This is an automated email from the ASF dual-hosted git repository.
anovikov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 151585bdc2 IGNITE-20977 Basic criteria queries for keyValue view
(#2934)
151585bdc2 is described below
commit 151585bdc23f6571ae6014d365ad2ff2a480926d
Author: Andrey Novikov <[email protected]>
AuthorDate: Thu Jan 18 14:40:19 2024 +0700
IGNITE-20977 Basic criteria queries for keyValue view (#2934)
---
.../apache/ignite/lang/util/IgniteNameUtils.java | 13 +
.../java/org/apache/ignite/table/KeyValueView.java | 3 +-
.../internal/client/table/AbstractClientView.java | 75 ++++++
.../client/table/ClientKeyValueBinaryView.java | 22 +-
.../internal/client/table/ClientKeyValueView.java | 37 ++-
.../client/table/ClientRecordBinaryView.java | 30 ---
.../internal/client/table/ClientRecordView.java | 44 ++--
.../configuration/util/ConfigurationUtil.java | 71 ------
.../configuration/util/ConfigurationUtilTest.java | 58 -----
.../table/criteria/QueryCriteriaAsyncCursor.java | 36 ++-
.../internal/table/criteria/SqlRowProjection.java | 276 +++++++++++++++++++++
.../internal/table/criteria/SqlSerializer.java | 6 +-
.../ignite/internal/util/CollectionUtils.java | 71 ++++++
.../ignite/internal/util/ExceptionUtils.java | 11 +
.../ignite/internal/util/CollectionUtilsTest.java | 62 +++++
.../ignite/internal/util/IgniteNameUtilsTest.java | 11 +-
.../ignite/internal/marshaller/TupleReader.java | 171 +++++++++++++
.../ignite/internal/table/ItCriteriaQueryTest.java | 275 +++++++++++++++++++-
.../ignite/internal/table/AbstractTableView.java | 96 +++++--
.../internal/table/KeyValueBinaryViewImpl.java | 17 +-
.../ignite/internal/table/KeyValueViewImpl.java | 42 +++-
.../internal/table/RecordBinaryViewImpl.java | 39 +--
.../ignite/internal/table/RecordViewImpl.java | 53 ++--
23 files changed, 1209 insertions(+), 310 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/lang/util/IgniteNameUtils.java
b/modules/api/src/main/java/org/apache/ignite/lang/util/IgniteNameUtils.java
index 22d2ccb580..e13954f072 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/util/IgniteNameUtils.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/util/IgniteNameUtils.java
@@ -71,6 +71,19 @@ public final class IgniteNameUtils {
return "\"" + name + "\"";
}
+ /**
+ * Wraps the given name with double quotes if it is not an upper-case,
unquoted name,
+ * e.g. "myColumn" -> "\"myColumn\"", "MYCOLUMN" -> "MYCOLUMN"
+ *
+ * @param name Object name.
+ * @return Quoted object name.
+ */
+ public static String quoteIfNeeded(String name) {
+ String simpleName = parseSimpleName(name);
+
+ return name.equals(simpleName) || name.equals(quote(simpleName)) ?
name : quote(name);
+ }
+
/**
* Identifier chain tokenizer.
*
diff --git
a/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
b/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
index 1d3137bedd..b1f5923df3 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.lang.MarshallerException;
import org.apache.ignite.lang.NullableValue;
import org.apache.ignite.lang.UnexpectedNullValueException;
+import org.apache.ignite.table.criteria.CriteriaQuerySource;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -35,7 +36,7 @@ import org.jetbrains.annotations.Nullable;
* @apiNote 'Key/value class field' >-< 'table column' mapping laid down
in implementation.
* @see org.apache.ignite.table.mapper.Mapper
*/
-public interface KeyValueView<K, V> extends DataStreamerTarget<Entry<K, V>> {
+public interface KeyValueView<K, V> extends DataStreamerTarget<Entry<K, V>>,
CriteriaQuerySource<Entry<K, V>> {
/**
* Gets a value associated with a given key.
*
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
index c7eabdb65e..d665f5b096 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
@@ -19,13 +19,25 @@ package org.apache.ignite.internal.client.table;
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.internal.client.ClientUtils.sync;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.lang.util.IgniteNameUtils.parseSimpleName;
import java.util.Arrays;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Function;
+import org.apache.ignite.internal.client.sql.ClientSessionBuilder;
+import org.apache.ignite.internal.client.sql.ClientStatementBuilder;
import org.apache.ignite.internal.table.criteria.CursorAdapter;
+import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
import org.apache.ignite.internal.table.criteria.SqlSerializer;
+import org.apache.ignite.lang.AsyncCursor;
import org.apache.ignite.lang.Cursor;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
import org.apache.ignite.table.criteria.Criteria;
import org.apache.ignite.table.criteria.CriteriaQueryOptions;
import org.apache.ignite.table.criteria.CriteriaQuerySource;
@@ -50,6 +62,26 @@ abstract class AbstractClientView<T> implements
CriteriaQuerySource<T> {
this.tbl = tbl;
}
+ /**
+ * Maps columns to their names.
+ *
+ * @param columns Target columns.
+ * @param startInclusive The first index to cover.
+ * @param endExclusive Index immediately past the last index to cover.
+ * @return Column names.
+ */
+ protected static String[] columnNames(ClientColumn[] columns, int
startInclusive, int endExclusive) {
+ int sz = endExclusive - startInclusive;
+
+ String[] columnNames = new String[sz];
+
+ for (int i = 0; i < sz; i++) {
+ columnNames[i] = columns[startInclusive + i].name();
+ }
+
+ return columnNames;
+ }
+
/**
* Construct SQL query and arguments for prepare statement.
*
@@ -70,9 +102,52 @@ abstract class AbstractClientView<T> implements
CriteriaQuerySource<T> {
.build();
}
+ /**
+ * Creates a conversion function from the objects contained in the result set
to criteria query objects.
+ *
+ * @param meta Result set columns' metadata.
+ * @param schema Schema.
+ * @return Conversion function (if {@code null}, conversion isn't
required).
+ */
+ protected @Nullable Function<SqlRow, T> queryMapper(ResultSetMetadata
meta, ClientSchema schema) {
+ return null;
+ }
+
/** {@inheritDoc} */
@Override
public Cursor<T> query(@Nullable Transaction tx, @Nullable Criteria
criteria, @Nullable CriteriaQueryOptions opts) {
return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts)));
}
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<AsyncCursor<T>> queryAsync(
+ @Nullable Transaction tx,
+ @Nullable Criteria criteria,
+ @Nullable CriteriaQueryOptions opts
+ ) {
+ CriteriaQueryOptions opts0 = opts == null ?
CriteriaQueryOptions.DEFAULT : opts;
+
+ return tbl.getLatestSchema()
+ .thenCompose((schema) -> {
+ SqlSerializer ser = createSqlSerializer(tbl.name(),
schema.columns(), criteria);
+
+ Statement statement = new
ClientStatementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
+ Session session = new
ClientSessionBuilder(tbl.channel()).build();
+
+ return session.executeAsync(tx, statement,
ser.getArguments())
+ .<AsyncCursor<T>>thenApply(resultSet -> {
+ ResultSetMetadata meta = resultSet.metadata();
+
+ assert meta != null : "Metadata can't be
null.";
+
+ return new
QueryCriteriaAsyncCursor<>(resultSet, queryMapper(meta, schema),
session::closeAsync);
+ })
+ .exceptionally(th -> {
+ session.closeAsync();
+
+ throw new CompletionException(unwrapCause(th));
+ });
+ });
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
index 72cee01445..6ac0eeab30 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
@@ -29,11 +29,16 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
import org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.NullableValue;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
@@ -46,10 +51,7 @@ import org.jetbrains.annotations.Nullable;
* <p>NB: Binary view doesn't allow null tuples. Methods return either a tuple
that represents the value, or {@code null} if no value
* exists for the given key.
*/
-public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
- /** Underlying table. */
- private final ClientTable tbl;
-
+public class ClientKeyValueBinaryView extends AbstractClientView<Entry<Tuple,
Tuple>> implements KeyValueView<Tuple, Tuple> {
/** Tuple serializer. */
private final ClientTupleSerializer ser;
@@ -59,9 +61,8 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
* @param tbl Table.
*/
public ClientKeyValueBinaryView(ClientTable tbl) {
- assert tbl != null;
+ super(tbl);
- this.tbl = tbl;
ser = new ClientTupleSerializer(tbl.tableId());
}
@@ -461,4 +462,13 @@ public class ClientKeyValueBinaryView implements
KeyValueView<Tuple, Tuple> {
return ClientDataStreamer.streamData(publisher, opts, batchSender,
provider, tbl);
}
+
+ /** {@inheritDoc} */
+ @Override
+ protected Function<SqlRow, Entry<Tuple, Tuple>>
queryMapper(ResultSetMetadata meta, ClientSchema schema) {
+ String[] keyCols = columnNames(schema.columns(), 0,
schema.keyColumnCount());
+ String[] valCols = columnNames(schema.columns(),
schema.keyColumnCount(), schema.columns().length);
+
+ return (row) -> new IgniteBiTuple<>(new SqlRowProjection(row, meta,
keyCols), new SqlRowProjection(row, meta, valCols));
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
index 2bc3c9ccf6..52765f1a3d 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
@@ -34,6 +34,7 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
import org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
@@ -41,14 +42,19 @@ import
org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.TuplePart;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.marshaller.ClientMarshallerReader;
import org.apache.ignite.internal.marshaller.ClientMarshallerWriter;
import org.apache.ignite.internal.marshaller.Marshaller;
import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.TupleReader;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NullableValue;
import org.apache.ignite.lang.UnexpectedNullValueException;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.mapper.Mapper;
@@ -58,10 +64,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Client key-value view implementation.
*/
-public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
- /** Underlying table. */
- private final ClientTable tbl;
-
+public class ClientKeyValueView<K, V> extends AbstractClientView<Entry<K, V>>
implements KeyValueView<K, V> {
/** Key serializer. */
private final ClientRecordSerializer<K> keySer;
@@ -76,12 +79,11 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
* @param valMapper value mapper.
*/
public ClientKeyValueView(ClientTable tbl, Mapper<K> keyMapper, Mapper<V>
valMapper) {
- assert tbl != null;
+ super(tbl);
+
assert keyMapper != null;
assert valMapper != null;
- this.tbl = tbl;
-
keySer = new ClientRecordSerializer<>(tbl.tableId(), keyMapper);
valSer = new ClientRecordSerializer<>(tbl.tableId(), valMapper);
}
@@ -544,6 +546,27 @@ public class ClientKeyValueView<K, V> implements
KeyValueView<K, V> {
return ClientDataStreamer.streamData(publisher, opts, batchSender,
provider, tbl);
}
+ /** {@inheritDoc} */
+ @Override
+ protected Function<SqlRow, Entry<K, V>> queryMapper(ResultSetMetadata
meta, ClientSchema schema) {
+ String[] keyCols = columnNames(schema.columns(), 0,
schema.keyColumnCount());
+ String[] valCols = columnNames(schema.columns(),
schema.keyColumnCount(), schema.columns().length);
+
+ Marshaller keyMarsh = schema.getMarshaller(keySer.mapper(),
TuplePart.KEY, true);
+ Marshaller valMarsh = schema.getMarshaller(valSer.mapper(),
TuplePart.VAL, true);
+
+ return (row) -> {
+ try {
+ return new IgniteBiTuple<>(
+ (K) keyMarsh.readObject(new TupleReader(new
SqlRowProjection(row, meta, keyCols)), null),
+ (V) valMarsh.readObject(new TupleReader(new
SqlRowProjection(row, meta, valCols)), null)
+ );
+ } catch (MarshallerException e) {
+ throw new org.apache.ignite.lang.MarshallerException(e);
+ }
+ };
+ }
+
private static <T> T throwIfNull(T obj) {
if (obj == null) {
throw new UnexpectedNullValueException("Got unexpected null value:
use `getNullable` sibling method instead.");
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
index 5fb64df33d..a7dec2c150 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
@@ -29,19 +29,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.internal.client.proto.ClientOp;
-import org.apache.ignite.internal.client.sql.ClientSessionBuilder;
-import org.apache.ignite.internal.client.sql.ClientStatementBuilder;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
-import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
-import org.apache.ignite.internal.table.criteria.SqlSerializer;
-import org.apache.ignite.lang.AsyncCursor;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Statement;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
-import org.apache.ignite.table.criteria.Criteria;
-import org.apache.ignite.table.criteria.CriteriaQueryOptions;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -384,25 +375,4 @@ public class ClientRecordBinaryView extends
AbstractClientView<Tuple> implements
return ClientDataStreamer.streamData(publisher, opts, batchSender,
provider, tbl);
}
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<AsyncCursor<Tuple>> queryAsync(
- @Nullable Transaction tx,
- @Nullable Criteria criteria,
- @Nullable CriteriaQueryOptions opts
- ) {
- var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts;
-
- return tbl.getLatestSchema()
- .thenCompose((schema) -> {
- SqlSerializer ser = createSqlSerializer(tbl.name(),
schema.columns(), criteria);
-
- Statement statement = new
ClientStatementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
- Session session = new
ClientSessionBuilder(tbl.channel()).build();
-
- return session.executeAsync(tx, statement,
ser.getArguments())
- .thenApply(resultSet -> new
QueryCriteriaAsyncCursor<>(resultSet, session::close));
- });
- }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
index 37a2507e71..7fa6994844 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
@@ -27,21 +27,19 @@ import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
import org.apache.ignite.client.RetryLimitPolicy;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.TuplePart;
-import org.apache.ignite.internal.client.sql.ClientSessionBuilder;
-import org.apache.ignite.internal.client.sql.ClientStatementBuilder;
+import org.apache.ignite.internal.marshaller.Marshaller;
+import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.TupleReader;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
-import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
-import org.apache.ignite.internal.table.criteria.SqlSerializer;
-import org.apache.ignite.lang.AsyncCursor;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Statement;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.RecordView;
-import org.apache.ignite.table.criteria.Criteria;
-import org.apache.ignite.table.criteria.CriteriaQueryOptions;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -385,22 +383,16 @@ public class ClientRecordView<R> extends
AbstractClientView<R> implements Record
/** {@inheritDoc} */
@Override
- public CompletableFuture<AsyncCursor<R>> queryAsync(
- @Nullable Transaction tx,
- @Nullable Criteria criteria,
- @Nullable CriteriaQueryOptions opts
- ) {
- var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts;
-
- return tbl.getLatestSchema()
- .thenCompose((schema) -> {
- SqlSerializer ser = createSqlSerializer(tbl.name(),
schema.columns(), criteria);
-
- Statement statement = new
ClientStatementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
- Session session = new
ClientSessionBuilder(tbl.channel()).build();
-
- return session.executeAsync(tx, this.ser.mapper(),
statement, ser.getArguments())
- .thenApply(resultSet -> new
QueryCriteriaAsyncCursor<>(resultSet, session::close));
- });
+ protected Function<SqlRow, R> queryMapper(ResultSetMetadata meta,
ClientSchema schema) {
+ String[] cols = columnNames(schema.columns(), 0,
schema.columns().length);
+ Marshaller marsh = schema.getMarshaller(ser.mapper(),
TuplePart.KEY_AND_VAL, true);
+
+ return (row) -> {
+ try {
+ return (R) marsh.readObject(new TupleReader(new
SqlRowProjection(row, meta, cols)), null);
+ } catch (MarshallerException e) {
+ throw new org.apache.ignite.lang.MarshallerException(e);
+ }
+ };
}
}
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
index 24e5f50a96..7c2525f770 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
@@ -44,7 +44,6 @@ import java.util.RandomAccess;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
-import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.ignite.configuration.ConfigurationProperty;
import
org.apache.ignite.configuration.ConfigurationWrongPolymorphicTypeIdException;
@@ -82,9 +81,6 @@ public class ConfigurationUtil {
public static final ConfigurationSource EMPTY_CFG_SRC = new
ConfigurationSource() {
};
- /** Special object for determining that there is no next element. */
- private static final Object NO_NEXT_ELEMENT = new Object();
-
/**
* Seperator string for both public and internal representations of
configuration keys.
*/
@@ -945,73 +941,6 @@ public class ConfigurationUtil {
};
}
- /**
- * Maps iterable via provided mapper function.
- *
- * @param iterable Basic iterable.
- * @param mapper Conversion function.
- * @param predicate Predicate to apply to each element of basic iterable.
- * @param <T1> Base type of the iterable.
- * @param <T2> Type for view.
- * @return Mapped iterable.
- */
- public static <T1, T2> Iterable<T2> mapIterable(
- @Nullable Iterable<? extends T1> iterable,
- @Nullable Function<? super T1, ? extends T2> mapper,
- @Nullable Predicate<? super T1> predicate
- ) {
- if (iterable == null) {
- return Collections.emptyList();
- }
-
- if (mapper == null && predicate == null) {
- return (Iterable<T2>) iterable;
- }
-
- return new Iterable<>() {
- @Override
- public Iterator<T2> iterator() {
- Iterator<? extends T1> innerIterator = iterable.iterator();
- return new Iterator<>() {
- @Nullable
- T1 current = advance();
-
- /** {@inheritDoc} */
- @Override
- public boolean hasNext() {
- return current != NO_NEXT_ELEMENT;
- }
-
- /** {@inheritDoc} */
- @Override
- public T2 next() {
- T1 current = this.current;
-
- if (current == NO_NEXT_ELEMENT) {
- throw new NoSuchElementException();
- }
-
- this.current = advance();
-
- return mapper == null ? (T2) current :
mapper.apply(current);
- }
-
- private @Nullable T1 advance() {
- while (innerIterator.hasNext()) {
- T1 next = innerIterator.next();
-
- if (predicate == null || predicate.test(next)) {
- return next;
- }
- }
-
- return (T1) NO_NEXT_ELEMENT;
- }
- };
- }
- };
- }
-
/**
* Leaf configuration source.
*/
diff --git
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationUtilTest.java
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationUtilTest.java
index 2ac4b6decf..dfa29bd5d8 100644
---
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationUtilTest.java
+++
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationUtilTest.java
@@ -46,7 +46,6 @@ import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.nullValue;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -63,7 +62,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
-import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.UUID;
@@ -1167,62 +1165,6 @@ public class ConfigurationUtilTest {
assertFalse(iterator.hasNext());
}
- @Test
- void testMapIterableWithPredicate() {
- assertFalse(mapIterable(null, null, null).iterator().hasNext());
- assertFalse(mapIterable(emptyList(), null, null).iterator().hasNext());
-
- assertEquals(List.of(1), collect(mapIterable(List.of(1), null, null)));
- assertEquals(List.of(1), collect(mapIterable(List.of(1), identity(),
null)));
- assertEquals(List.of(1), collect(mapIterable(List.of(1), null, integer
-> true)));
- assertEquals(List.of(1), collect(mapIterable(List.of(1), identity(),
integer -> true)));
- assertEquals(List.of(), collect(mapIterable(List.of(1), null, integer
-> false)));
- assertEquals(List.of(), collect(mapIterable(List.of(1), identity(),
integer -> false)));
-
- assertEquals(List.of("1", "2", "3"), collect(mapIterable(List.of(1, 2,
3), String::valueOf, null)));
- assertEquals(List.of("3"), collect(mapIterable(List.of(1, 2, 3),
String::valueOf, integer -> integer > 2)));
-
- Iterator<String> iterator1 = mapIterable(List.of(1, 2, 3, 4),
String::valueOf, integer -> true).iterator();
- assertEquals("1", iterator1.next());
- assertEquals("2", iterator1.next());
- assertEquals("3", iterator1.next());
- assertEquals("4", iterator1.next());
-
- Iterator<String> iterator2 = mapIterable(List.of(1, 2, 3, 4),
String::valueOf, null).iterator();
- assertEquals("1", iterator2.next());
- assertEquals("2", iterator2.next());
- assertEquals("3", iterator2.next());
- assertEquals("4", iterator2.next());
-
- Iterator<Integer> iterator3 = mapIterable(List.of(1, 2, 3, 4),
identity(), integer -> integer < 3).iterator();
- assertEquals(1, iterator3.next());
- assertEquals(2, iterator3.next());
-
- Iterator<Integer> iterator4 = mapIterable(List.of(1, 2, 3, 4),
identity(), integer -> false).iterator();
- assertFalse(iterator4.hasNext());
-
- assertDoesNotThrow(() -> mapIterable(Arrays.asList(new
Integer[]{null}), null, null).iterator().next());
- assertDoesNotThrow(() -> mapIterable(Arrays.asList(new
Integer[]{null}), null, integer -> true).iterator().next());
- assertDoesNotThrow(() -> mapIterable(Arrays.asList(null, 1), null,
null).iterator().next());
- assertDoesNotThrow(() -> mapIterable(Arrays.asList(null, 1), null,
integer -> true).iterator().next());
-
- assertThrows(
- NoSuchElementException.class,
- () -> mapIterable(Arrays.asList(new Integer[]{null}), null,
integer -> false).iterator().next()
- );
-
- assertThrows(
- NoSuchElementException.class,
- () -> {
- Iterator<Object> iterator =
mapIterable(Arrays.asList(null, 1), null, Objects::nonNull).iterator();
- iterator.next();
- iterator.next();
- }
- );
-
- assertThrows(UnsupportedOperationException.class, () ->
mapIterable(List.of(1), null, null).iterator().remove());
- }
-
/**
* Patches super root and returns flat representation of the changes.
Passed {@code superRoot} object will contain patched tree when
* method execution is completed.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java
index 1ee79939e8..1880faaa1a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java
@@ -17,15 +17,27 @@
package org.apache.ignite.internal.table.criteria;
+import static org.apache.ignite.internal.util.CollectionUtils.mapIterable;
+
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import org.apache.ignite.lang.AsyncCursor;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
+import org.jetbrains.annotations.Nullable;
/**
* Wrapper over {@link AsyncResultSet} for criteria queries.
+ *
+ * @param <R> A type of the objects contained by wrapped result set. This will
be either {@link SqlRow} if no explicit mapper is provided
+ * or a particular type defined by supplied mapper.
+ * @param <T> The type of elements returned by this cursor.
*/
-public class QueryCriteriaAsyncCursor<T> implements AsyncCursor<T> {
- private final AsyncResultSet<T> ars;
+public class QueryCriteriaAsyncCursor<T, R> implements AsyncCursor<T> {
+ private final AsyncResultSet<R> ars;
+
+ @Nullable
+ private final Function<R, T> mapper;
private final Runnable closeRun;
@@ -33,17 +45,19 @@ public class QueryCriteriaAsyncCursor<T> implements
AsyncCursor<T> {
* Constructor.
*
* @param ars Asynchronous result set.
- * @param closeRun Callback to be invoked after result is closed.
+ * @param mapper Conversion function for objects contained by result set
(if {@code null}, conversion isn't required).
+ * @param closeRun Callback to be invoked after result set is closed.
*/
- public QueryCriteriaAsyncCursor(AsyncResultSet<? extends T> ars, Runnable
closeRun) {
- this.ars = (AsyncResultSet<T>) ars;
+ public QueryCriteriaAsyncCursor(AsyncResultSet<R> ars, @Nullable
Function<R, T> mapper, Runnable closeRun) {
+ this.ars = ars;
+ this.mapper = mapper;
this.closeRun = closeRun;
}
/** {@inheritDoc} */
@Override
public Iterable<T> currentPage() {
- return ars.currentPage();
+ return mapIterable(ars.currentPage(), mapper, null);
}
/** {@inheritDoc} */
@@ -54,12 +68,14 @@ public class QueryCriteriaAsyncCursor<T> implements
AsyncCursor<T> {
/** {@inheritDoc} */
@Override
- public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
+ public CompletableFuture<? extends AsyncCursor<T>> fetchNextPage() {
return ars.fetchNextPage()
- .whenComplete((v, t) -> {
- if (t == null && !hasMorePages()) {
- closeRun.run();
+ .thenApply((rs) -> {
+ if (!hasMorePages()) {
+ closeAsync();
}
+
+ return new QueryCriteriaAsyncCursor<>(rs, mapper,
closeRun);
});
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlRowProjection.java
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlRowProjection.java
new file mode 100644
index 0000000000..63b09bfd44
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlRowProjection.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.criteria;
+
+import static org.apache.ignite.lang.util.IgniteNameUtils.quoteIfNeeded;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link Tuple} view that exposes a chosen list of columns of a {@link SqlRow}.
+ *
+ * <p>Index-based accessors take a position within the projection and translate it to the position of the same column in the
+ * underlying row. Name-based accessors, as well as {@link #set(String, Object)}, are forwarded to the underlying row unchanged,
+ * i.e. they are not routed through the projection's own name-to-index map.
+ */
+public class SqlRowProjection implements Tuple {
+    /** Underlying row the values are read from. */
+    private final SqlRow row;
+
+    /** Position within the projection by column name (names are stored in the form produced by {@code quoteIfNeeded}). */
+    private final Object2IntMap<String> columnsIndices;
+
+    /** For every position within the projection, the index of the corresponding column in the underlying row. */
+    private final int[] rowIndexMapping;
+
+    /**
+     * Constructor.
+     *
+     * @param row Row data.
+     * @param meta Metadata for query results, used to resolve the row index of every projected column.
+     * @param cols Target column names, in projection order.
+     */
+    public SqlRowProjection(SqlRow row, ResultSetMetadata meta, String[] cols) {
+        int size = cols.length;
+
+        this.row = row;
+        this.rowIndexMapping = new int[size];
+        this.columnsIndices = new Object2IntOpenHashMap<>(size);
+
+        int position = 0;
+
+        for (String col : cols) {
+            String name = quoteIfNeeded(col);
+
+            columnsIndices.put(name, position);
+            rowIndexMapping[position] = meta.indexOf(name);
+
+            position++;
+        }
+    }
+
+    /**
+     * Translates a position within the projection to the column index of the underlying row.
+     *
+     * @param columnIndex Position within the projection.
+     * @return Column index in the underlying row.
+     */
+    private int rowIndex(int columnIndex) {
+        return rowIndexMapping[columnIndex];
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int columnCount() {
+        return rowIndexMapping.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String columnName(int columnIndex) {
+        return row.columnName(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int columnIndex(String columnName) {
+        // The name is looked up exactly as passed; -1 means the projection has no entry under that name.
+        return columnsIndices.getOrDefault(columnName, -1);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> @Nullable T valueOrDefault(String columnName, @Nullable T defaultValue) {
+        return row.valueOrDefault(columnName, defaultValue);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Tuple set(String columnName, @Nullable Object value) {
+        return row.set(columnName, value);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> @Nullable T value(String columnName) throws IllegalArgumentException {
+        return row.value(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> @Nullable T value(int columnIndex) {
+        return row.value(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean booleanValue(String columnName) {
+        return row.booleanValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean booleanValue(int columnIndex) {
+        return row.booleanValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte byteValue(String columnName) {
+        return row.byteValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte byteValue(int columnIndex) {
+        return row.byteValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public short shortValue(String columnName) {
+        return row.shortValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public short shortValue(int columnIndex) {
+        return row.shortValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int intValue(String columnName) {
+        return row.intValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int intValue(int columnIndex) {
+        return row.intValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long longValue(String columnName) {
+        return row.longValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long longValue(int columnIndex) {
+        return row.longValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public float floatValue(String columnName) {
+        return row.floatValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public float floatValue(int columnIndex) {
+        return row.floatValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public double doubleValue(String columnName) {
+        return row.doubleValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public double doubleValue(int columnIndex) {
+        return row.doubleValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String stringValue(String columnName) {
+        return row.stringValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String stringValue(int columnIndex) {
+        return row.stringValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID uuidValue(String columnName) {
+        return row.uuidValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID uuidValue(int columnIndex) {
+        return row.uuidValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public BitSet bitmaskValue(String columnName) {
+        return row.bitmaskValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public BitSet bitmaskValue(int columnIndex) {
+        return row.bitmaskValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalDate dateValue(String columnName) {
+        return row.dateValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalDate dateValue(int columnIndex) {
+        return row.dateValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalTime timeValue(String columnName) {
+        return row.timeValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalTime timeValue(int columnIndex) {
+        return row.timeValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalDateTime datetimeValue(String columnName) {
+        return row.datetimeValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalDateTime datetimeValue(int columnIndex) {
+        return row.datetimeValue(rowIndex(columnIndex));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Instant timestampValue(String columnName) {
+        return row.timestampValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Instant timestampValue(int columnIndex) {
+        return row.timestampValue(rowIndex(columnIndex));
+    }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
index 1b45243724..bfc887cc8c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.table.criteria;
import static org.apache.ignite.internal.util.StringUtils.nullOrBlank;
-import static org.apache.ignite.lang.util.IgniteNameUtils.quote;
+import static org.apache.ignite.lang.util.IgniteNameUtils.quoteIfNeeded;
import java.util.Arrays;
import java.util.Collection;
@@ -157,10 +157,6 @@ public class SqlSerializer implements
CriteriaVisitor<Void> {
}
}
- private static String quoteIfNeeded(String name) {
- return name.chars().allMatch(Character::isUpperCase) ? name :
quote(name);
- }
-
/**
* Builder.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 10602c7458..6bb497517f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import org.jetbrains.annotations.Nullable;
@@ -46,6 +47,9 @@ import org.jetbrains.annotations.Nullable;
* Utility class provides various method to work with collections.
*/
public final class CollectionUtils {
+ /** Special object for determining that there is no next element. */
+ private static final Object NO_NEXT_ELEMENT = new Object();
+
/** Stub. */
private CollectionUtils() {
// No op.
@@ -521,4 +525,71 @@ public final class CollectionUtils {
return Set.copyOf(set);
}
+
+    /**
+     * Returns a lazy view of the given iterable in which elements are optionally filtered and then optionally converted.
+     *
+     * <p>The predicate is applied to the source elements (before conversion). Every iterator of the returned view pre-fetches
+     * the next matching source element, so the source iterator is advanced as soon as the view's iterator is created.
+     *
+     * @param iterable Basic iterable (if {@code null}, an empty iterable is returned).
+     * @param mapper Conversion function (if {@code null}, elements are returned as is).
+     * @param predicate Predicate to apply to each element of basic iterable (if {@code null}, all elements are accepted).
+     * @param <T1> Base type of the iterable.
+     * @param <T2> Type for view.
+     * @return Mapped iterable; the basic iterable itself when neither a mapper nor a predicate is given.
+     */
+    public static <T1, T2> Iterable<T2> mapIterable(
+            @Nullable Iterable<? extends T1> iterable,
+            @Nullable Function<? super T1, ? extends T2> mapper,
+            @Nullable Predicate<? super T1> predicate
+    ) {
+        if (iterable == null) {
+            return Collections.emptyList();
+        }
+
+        boolean nothingToDo = mapper == null && predicate == null;
+
+        if (nothingToDo) {
+            return (Iterable<T2>) iterable;
+        }
+
+        return () -> {
+            Iterator<? extends T1> source = iterable.iterator();
+
+            return new Iterator<T2>() {
+                /** Element to be returned by the next call of {@code next()}, or the "no next element" marker. */
+                @Nullable
+                private T1 pending = fetch();
+
+                /** {@inheritDoc} */
+                @Override
+                public boolean hasNext() {
+                    return pending != NO_NEXT_ELEMENT;
+                }
+
+                /** {@inheritDoc} */
+                @Override
+                public T2 next() {
+                    T1 result = pending;
+
+                    if (result == NO_NEXT_ELEMENT) {
+                        throw new NoSuchElementException();
+                    }
+
+                    // Look ahead first, convert afterwards.
+                    pending = fetch();
+
+                    if (mapper != null) {
+                        return mapper.apply(result);
+                    }
+
+                    return (T2) result;
+                }
+
+                /** Pulls source elements until one passes the predicate; returns the marker when the source is exhausted. */
+                private @Nullable T1 fetch() {
+                    T1 candidate;
+
+                    do {
+                        if (!source.hasNext()) {
+                            return (T1) NO_NEXT_ELEMENT;
+                        }
+
+                        candidate = source.next();
+                    } while (predicate != null && !predicate.test(candidate));
+
+                    return candidate;
+                }
+            };
+        };
+    }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index acef8dc60c..fd958a7685 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -602,6 +602,17 @@ public final class ExceptionUtils {
return new IgniteException(INTERNAL_ERR, e.getMessage(), e);
}
+    /**
+     * Checks whether the given throwable, or any throwable in its {@code cause} chain, is an instance of the given class.
+     *
+     * <p>The chain is walked iteratively and the walk stops once the chain loops back on itself, so a cyclic cause chain
+     * (which can be built with {@link Throwable#initCause(Throwable)}) yields {@code false} instead of a
+     * {@link StackOverflowError}.
+     *
+     * @param exceptionClass Exception class to look for.
+     * @param ex Throwable to check (if {@code null}, {@code false} is returned).
+     * @return {@code True} if {@code ex} or one of its causes is an instance of {@code exceptionClass}, {@code false} otherwise.
+     */
+    public static boolean isOrCausedBy(Class<? extends Exception> exceptionClass, @Nullable Throwable ex) {
+        // 'slow' follows 'cur' at half speed. In an acyclic chain 'cur' always stays strictly ahead of 'slow', so they can
+        // only point to the same throwable again if the chain is cyclic; by then 'cur' has inspected every element.
+        Throwable slow = ex;
+        boolean moveSlow = false;
+
+        Throwable cur = ex;
+
+        while (cur != null) {
+            if (exceptionClass.isInstance(cur)) {
+                return true;
+            }
+
+            cur = cur.getCause();
+
+            if (moveSlow) {
+                slow = slow.getCause();
+            }
+
+            moveSlow = !moveSlow;
+
+            if (cur != null && cur == slow) {
+                // Cyclic cause chain: everything reachable has already been checked.
+                return false;
+            }
+        }
+
+        return false;
+    }
+
/**
* Creates and return a copy of an exception that is a cause of the given
{@code exception}.
* If the original exception does not contain a cause, then the original
exception will be returned.
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
index 4bf160b553..ea30781cad 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
@@ -18,16 +18,20 @@
package org.apache.ignite.internal.util;
import static java.util.Collections.emptyIterator;
+import static java.util.Collections.emptyList;
+import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.util.CollectionUtils.concat;
import static org.apache.ignite.internal.util.CollectionUtils.difference;
import static org.apache.ignite.internal.util.CollectionUtils.intersect;
import static org.apache.ignite.internal.util.CollectionUtils.last;
+import static org.apache.ignite.internal.util.CollectionUtils.mapIterable;
import static org.apache.ignite.internal.util.CollectionUtils.setOf;
import static org.apache.ignite.internal.util.CollectionUtils.union;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -42,6 +46,8 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.Set;
import java.util.Spliterators;
import java.util.stream.StreamSupport;
@@ -291,4 +297,60 @@ public class CollectionUtilsTest {
assertEquals(1, last(List.of(1)));
assertEquals(2, last(List.of(1, 2)));
}
+
+ /**
+ * Exercises {@code mapIterable} with every combination of absent/present mapper and predicate, including
+ * {@code null} inputs, {@code null} elements and iterator exhaustion.
+ */
+ @Test
+ void testMapIterableWithPredicate() {
+ // A null or empty source produces an iterable without elements.
+ assertFalse(mapIterable(null, null, null).iterator().hasNext());
+ assertFalse(mapIterable(emptyList(), null, null).iterator().hasNext());
+
+ // Single element: identity mapping and constant predicates.
+ assertEquals(List.of(1), collect(mapIterable(List.of(1), null, null)));
+ assertEquals(List.of(1), collect(mapIterable(List.of(1), identity(),
null)));
+ assertEquals(List.of(1), collect(mapIterable(List.of(1), null, integer
-> true)));
+ assertEquals(List.of(1), collect(mapIterable(List.of(1), identity(),
integer -> true)));
+ assertEquals(List.of(), collect(mapIterable(List.of(1), null, integer
-> false)));
+ assertEquals(List.of(), collect(mapIterable(List.of(1), identity(),
integer -> false)));
+
+ // The predicate is evaluated against the source element (an Integer), the mapper produces the output element.
+ assertEquals(List.of("1", "2", "3"), collect(mapIterable(List.of(1, 2,
3), String::valueOf, null)));
+ assertEquals(List.of("3"), collect(mapIterable(List.of(1, 2, 3),
String::valueOf, integer -> integer > 2)));
+
+ // next() may be called repeatedly without calling hasNext() in between.
+ Iterator<String> iterator1 = mapIterable(List.of(1, 2, 3, 4),
String::valueOf, integer -> true).iterator();
+ assertEquals("1", iterator1.next());
+ assertEquals("2", iterator1.next());
+ assertEquals("3", iterator1.next());
+ assertEquals("4", iterator1.next());
+
+ Iterator<String> iterator2 = mapIterable(List.of(1, 2, 3, 4),
String::valueOf, null).iterator();
+ assertEquals("1", iterator2.next());
+ assertEquals("2", iterator2.next());
+ assertEquals("3", iterator2.next());
+ assertEquals("4", iterator2.next());
+
+ Iterator<Integer> iterator3 = mapIterable(List.of(1, 2, 3, 4),
identity(), integer -> integer < 3).iterator();
+ assertEquals(1, iterator3.next());
+ assertEquals(2, iterator3.next());
+
+ Iterator<Integer> iterator4 = mapIterable(List.of(1, 2, 3, 4),
identity(), integer -> false).iterator();
+ assertFalse(iterator4.hasNext());
+
+ // A null element is a regular element and must not be mistaken for the end of iteration.
+ assertDoesNotThrow(() -> mapIterable(Arrays.asList(new
Integer[]{null}), null, null).iterator().next());
+ assertDoesNotThrow(() -> mapIterable(Arrays.asList(new
Integer[]{null}), null, integer -> true).iterator().next());
+ assertDoesNotThrow(() -> mapIterable(Arrays.asList(null, 1), null,
null).iterator().next());
+ assertDoesNotThrow(() -> mapIterable(Arrays.asList(null, 1), null,
integer -> true).iterator().next());
+
+ // Everything is filtered out, so the very first next() fails.
+ assertThrows(
+ NoSuchElementException.class,
+ () -> mapIterable(Arrays.asList(new Integer[]{null}), null,
integer -> false).iterator().next()
+ );
+
+ // Only one element passes the filter, so the second next() fails.
+ assertThrows(
+ NoSuchElementException.class,
+ () -> {
+ Iterator<Object> iterator =
mapIterable(Arrays.asList(null, 1), null, Objects::nonNull).iterator();
+ iterator.next();
+ iterator.next();
+ }
+ );
+
+ // Without mapper and predicate the source iterable is returned as is, so remove() is answered by the iterator of List.of(1).
+ assertThrows(UnsupportedOperationException.class, () ->
mapIterable(List.of(1), null, null).iterator().remove());
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteNameUtilsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteNameUtilsTest.java
index 3219a85891..34afbc4962 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteNameUtilsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteNameUtilsTest.java
@@ -36,7 +36,7 @@ import org.junit.jupiter.params.provider.ValueSource;
public class IgniteNameUtilsTest {
@ParameterizedTest
@CsvSource({
- "foo, FOO", "fOo, FOO", "FOO, FOO", "1o0, 1O0", "@#$, @#$",
+ "foo, FOO", "fOo, FOO", "FOO, FOO", "\"FOO\", FOO", "1o0, 1O0",
"@#$, @#$",
"\"foo\", foo", "\"fOo\", fOo", "\"f.f\", f.f", "\"f\"\"f\", f\"f",
})
public void validSimpleNames(String source, String expected) {
@@ -59,4 +59,13 @@ public class IgniteNameUtilsTest {
equalTo("Fully qualified name is not expected [name=" + source
+ "]"),
containsString("Malformed name [name=" + source))));
}
+
+    /** Checks the result of {@code IgniteNameUtils.quoteIfNeeded} for every "source, expected" pair listed below. */
+    @ParameterizedTest
+    @CsvSource({
+            "foo, \"foo\"",
+            "fOo, \"fOo\"",
+            "FOO, FOO",
+            "\"FOO\", \"FOO\"",
+            "1o0, \"1o0\"",
+            "@#$, @#$",
+            "\"foo\", \"foo\"",
+            "\"fOo\", \"fOo\"",
+            "\"f.f\", \"f.f\""
+    })
+    public void quoteIfNeeded(String source, String expected) {
+        String actual = IgniteNameUtils.quoteIfNeeded(source);
+
+        assertThat(actual, equalTo(expected));
+    }
}
diff --git
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/TupleReader.java
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/TupleReader.java
new file mode 100644
index 0000000000..93e93c5e79
--- /dev/null
+++
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/TupleReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.table.Tuple;
+
+/**
+ * Adapter from a {@link Tuple} to a {@link MarshallerReader}.
+ *
+ * <p>Values are read sequentially by column index: every {@code read*} call, as well as {@link #skipValue()}, moves the
+ * reader to the next column. The position is plain mutable state; this class does not synchronize access to it.
+ */
+public class TupleReader implements MarshallerReader {
+    /** Tuple the values are read from. */
+    private final Tuple tuple;
+
+    /** Index of the column that the next read returns. */
+    private int index;
+
+    /**
+     * Creates a reader positioned at the first column of the tuple.
+     *
+     * @param tuple Tuple to read from.
+     */
+    public TupleReader(Tuple tuple) {
+        this(tuple, 0);
+    }
+
+    /**
+     * Creates a reader positioned at the given column.
+     *
+     * @param tuple Tuple to read from.
+     * @param index Index of the first column to read.
+     */
+    TupleReader(Tuple tuple, int index) {
+        this.tuple = tuple;
+        this.index = index;
+    }
+
+    @Override
+    public void skipValue() {
+        index++;
+    }
+
+    @Override
+    public boolean readBoolean() {
+        return tuple.booleanValue(index++);
+    }
+
+    @Override
+    public Boolean readBooleanBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public byte readByte() {
+        return tuple.byteValue(index++);
+    }
+
+    @Override
+    public Byte readByteBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public short readShort() {
+        return tuple.shortValue(index++);
+    }
+
+    @Override
+    public Short readShortBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public int readInt() {
+        return tuple.intValue(index++);
+    }
+
+    @Override
+    public Integer readIntBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public long readLong() {
+        return tuple.longValue(index++);
+    }
+
+    @Override
+    public Long readLongBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public float readFloat() {
+        return tuple.floatValue(index++);
+    }
+
+    @Override
+    public Float readFloatBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public double readDouble() {
+        return tuple.doubleValue(index++);
+    }
+
+    @Override
+    public Double readDoubleBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public String readString() {
+        return tuple.stringValue(index++);
+    }
+
+    @Override
+    public UUID readUuid() {
+        return tuple.uuidValue(index++);
+    }
+
+    @Override
+    public byte[] readBytes() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public BitSet readBitSet() {
+        return tuple.bitmaskValue(index++);
+    }
+
+    @Override
+    public BigInteger readBigInt() {
+        return tuple.value(index++);
+    }
+
+    /**
+     * Reads a decimal value from the current column and moves to the next column.
+     *
+     * <p>The stored value may be a {@link BigDecimal}, which is returned as is, or a {@link BigInteger}, which is treated as
+     * an unscaled value and combined with {@code scale}. A {@code null} column value is returned as {@code null}.
+     *
+     * @param scale Scale applied when the stored value is a {@link BigInteger}.
+     * @return Decimal value, or {@code null} if the column holds {@code null}.
+     */
+    @Override
+    public BigDecimal readBigDecimal(int scale) {
+        // Read as Object: letting the generic accessor infer BigInteger would add a cast that fails for BigDecimal values.
+        Object val = tuple.value(index++);
+
+        if (val == null) {
+            return null;
+        }
+
+        if (val instanceof BigDecimal) {
+            // NOTE(review): returned without adjusting to 'scale' - confirm whether callers expect setScale(scale) here.
+            return (BigDecimal) val;
+        }
+
+        return new BigDecimal((BigInteger) val, scale);
+    }
+
+    @Override
+    public LocalDate readDate() {
+        return tuple.dateValue(index++);
+    }
+
+    @Override
+    public LocalTime readTime() {
+        return tuple.timeValue(index++);
+    }
+
+    @Override
+    public Instant readTimestamp() {
+        return tuple.timestampValue(index++);
+    }
+
+    @Override
+    public LocalDateTime readDateTime() {
+        return tuple.datetimeValue(index++);
+    }
+}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java
index be646bc4a3..754682b0aa 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.table;
import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.matchers.TupleMatcher.tupleValue;
import static org.apache.ignite.lang.util.IgniteNameUtils.quote;
@@ -37,22 +39,27 @@ import static
org.apache.ignite.table.criteria.Criteria.notNullValue;
import static org.apache.ignite.table.criteria.Criteria.nullValue;
import static org.apache.ignite.table.criteria.CriteriaQueryOptions.builder;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Spliterator;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.AsyncCursor;
import org.apache.ignite.lang.Cursor;
@@ -62,6 +69,7 @@ import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.criteria.CriteriaQuerySource;
import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
@@ -129,19 +137,21 @@ public class ItCriteriaQueryTest extends
ClusterPerClassIntegrationTest {
Table table = CLUSTER.aliveNode().tables().table(TABLE_NAME);
Table clientTable = CLIENT.tables().table(TABLE_NAME);
+ Function<TestObject, Tuple> objMapper = (obj) ->
Tuple.create().set("id", obj.id).set("name", obj.name)
+ .set("salary", obj.salary).set("hash", obj.hash);
+
return Stream.of(
Arguments.of(table.recordView(), identity()),
+ Arguments.of(table.recordView(TestObject.class), objMapper),
Arguments.of(clientTable.recordView(), identity()),
- Arguments.of(clientTable.recordView(TestObject.class),
- (Function<TestObject, Tuple>) (obj) ->
Tuple.create().set("id", obj.id).set("name", obj.name)
- .set("salary", obj.salary).set("hash",
obj.hash))
+ Arguments.of(clientTable.recordView(TestObject.class),
objMapper)
);
}
@ParameterizedTest
@MethodSource
public <T> void testRecordViewQuery(CriteriaQuerySource<T> view,
Function<T, Tuple> mapper) {
- IgniteTestUtils.assertThrows(
+ assertThrows(
IgniteException.class,
() -> view.query(null, columnValue("id", equalTo("2"))),
"Dynamic parameter requires adding explicit type cast"
@@ -223,6 +233,175 @@ public class ItCriteriaQueryTest extends
ClusterPerClassIntegrationTest {
}
}
+    /**
+     * Supplies arguments for {@code testKeyValueView}: a key-value view and a function that turns one of its entries into a
+     * pair of tuples. Binary and typed views are taken both from a cluster node and from the client.
+     */
+    private static Stream<Arguments> testKeyValueView() {
+        Table serverTable = CLUSTER.aliveNode().tables().table(TABLE_NAME);
+        Table thinClientTable = CLIENT.tables().table(TABLE_NAME);
+
+        Function<Entry<TestObjectKey, TestObject>, Entry<Tuple, Tuple>> toTuples = (pair) -> {
+            TestObjectKey objKey = pair.getKey();
+            TestObject objVal = pair.getValue();
+
+            Tuple keyTuple = Tuple.create().set("id", objKey.id);
+            Tuple valTuple = Tuple.create()
+                    .set("name", objVal.name)
+                    .set("salary", objVal.salary)
+                    .set("hash", objVal.hash);
+
+            return new IgniteBiTuple<>(keyTuple, valTuple);
+        };
+
+        return Stream.of(
+                // Entries of binary views already consist of tuples, so no conversion is needed for them.
+                Arguments.of(serverTable.keyValueView(), identity()),
+                Arguments.of(serverTable.keyValueView(TestObjectKey.class, TestObject.class), toTuples),
+                Arguments.of(thinClientTable.keyValueView(), identity()),
+                Arguments.of(thinClientTable.keyValueView(TestObjectKey.class, TestObject.class), toTuples)
+        );
+    }
+
+ /**
+ * Runs criteria queries against a key-value view and checks the returned entries.
+ *
+ * @param view View under test.
+ * @param mapper Converts an entry returned by the view to a pair of key and value tuples for matching.
+ */
+ @ParameterizedTest
+ @MethodSource
+ public <T> void testKeyValueView(CriteriaQuerySource<T> view, Function<T,
Entry<Tuple, Tuple>> mapper) {
+ // A String argument for the "id" column (matched as an int everywhere below) must be rejected.
+ assertThrows(
+ IgniteException.class,
+ () -> view.query(null, columnValue("id", equalTo("2"))),
+ "Dynamic parameter requires adding explicit type cast"
+ );
+
+ // Expected key/value matchers for the three rows the assertions below rely on.
+ Matcher<Tuple> personKey0 = tupleValue("id", is(0));
+ Matcher<Tuple> person0 = allOf(tupleValue("name",
Matchers.nullValue()), tupleValue("salary", is(0.0d)),
+ tupleValue("hash", is("hash0".getBytes())));
+
+ Matcher<Tuple> personKey1 = tupleValue("id", is(1));
+ Matcher<Tuple> person1 = allOf(tupleValue("name", is("name1")),
tupleValue("salary", is(10.0d)),
+ tupleValue("hash", is("hash1".getBytes())));
+
+ Matcher<Tuple> personKey2 = tupleValue("id", is(2));
+ Matcher<Tuple> person2 = allOf(tupleValue("name", is("name2")),
tupleValue("salary", is(20.0d)),
+ tupleValue("hash", is("hash2".getBytes())));
+
+ // No criteria: all rows are returned.
+ try (Cursor<T> cur = view.query(null, null)) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(3),
+ hasEntry(personKey0, person0),
+ hasEntry(personKey1, person1),
+ hasEntry(personKey2, person2)
+ ));
+ }
+
+ // Equality, on an int column and on a byte[] column.
+ try (Cursor<T> cur = view.query(null, columnValue("id", equalTo(2)))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(1),
+ hasEntry(personKey2, person2)
+ ));
+ }
+
+ try (Cursor<T> cur = view.query(null, columnValue("hash",
equalTo("hash2".getBytes())))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(1),
+ hasEntry(personKey2, person2)
+ ));
+ }
+
+ // Inequality, on an int column and on a byte[] column.
+ try (Cursor<T> cur = view.query(null, columnValue("id",
notEqualTo(2)))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(2),
+ hasEntry(personKey0, person0),
+ hasEntry(personKey1, person1)
+ ));
+ }
+
+ try (Cursor<T> cur = view.query(null, columnValue("hash",
notEqualTo("hash2".getBytes())))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(2),
+ hasEntry(personKey0, person0),
+ hasEntry(personKey1, person1)
+ ));
+ }
+
+ // Range comparisons.
+ try (Cursor<T> cur = view.query(null, columnValue("id",
greaterThan(1)))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(1),
+ hasEntry(personKey2, person2)
+ ));
+ }
+
+ try (Cursor<T> cur = view.query(null, columnValue("id",
greaterThanOrEqualTo(1)))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(2),
+ hasEntry(personKey1, person1),
+ hasEntry(personKey2, person2)
+ ));
+ }
+
+ try (Cursor<T> cur = view.query(null, columnValue("id", lessThan(1))))
{
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(1),
+ hasEntry(personKey0, person0)
+ ));
+ }
+
+ try (Cursor<T> cur = view.query(null, columnValue("id",
lessThanOrEqualTo(1)))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(2),
+ hasEntry(personKey0, person0),
+ hasEntry(personKey1, person1)
+ ));
+ }
+
+ // Null checks: only the row with id 0 has a null name.
+ try (Cursor<T> cur = view.query(null, columnValue("name",
nullValue()))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(1),
+ hasEntry(personKey0, person0)
+ ));
+ }
+
+ try (Cursor<T> cur = view.query(null, columnValue("name",
notNullValue()))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(2),
+ hasEntry(personKey1, person1),
+ hasEntry(personKey2, person2)
+ ));
+ }
+
+ // IN / NOT IN on an int column.
+ try (Cursor<T> cur = view.query(null, columnValue("id", in(1, 2)))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(2),
+ hasEntry(personKey1, person1),
+ hasEntry(personKey2, person2)
+ ));
+ }
+
+ try (Cursor<T> cur = view.query(null, columnValue("id", notIn(1, 2))))
{
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(1),
+ hasEntry(personKey0, person0)
+ ));
+ }
+
+ // IN / NOT IN on a byte[] column, including a single null argument.
+ try (Cursor<T> cur = view.query(null, columnValue("hash",
in("hash1".getBytes(), "hash2".getBytes())))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(2),
+ hasEntry(personKey1, person1),
+ hasEntry(personKey2, person2)
+ ));
+ }
+
+ try (Cursor<T> cur = view.query(null, columnValue("hash", in((byte[])
null)))) {
+ assertThat(mapToTupleMap(cur, mapper), anEmptyMap());
+ }
+
+ try (Cursor<T> cur = view.query(null, columnValue("hash",
notIn("hash1".getBytes(), "hash2".getBytes())))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(1),
+ hasEntry(personKey0, person0)
+ ));
+ }
+
+ try (Cursor<T> cur = view.query(null, columnValue("hash",
notIn((byte[]) null)))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(3),
+ hasEntry(personKey0, person0),
+ hasEntry(personKey1, person1),
+ hasEntry(personKey2, person2)
+ ));
+ }
+ }
+
@Test
public void testOptions() {
RecordView<TestObject> view =
CLIENT.tables().table(TABLE_NAME).recordView(TestObject.class);
@@ -238,16 +417,18 @@ public class ItCriteriaQueryTest extends
ClusterPerClassIntegrationTest {
Table table = CLUSTER.aliveNode().tables().table(QUOTED_TABLE_NAME);
Table clientTable = CLIENT.tables().table(QUOTED_TABLE_NAME);
- Mapper<QuotedObject> pojoMapper = Mapper.builder(QuotedObject.class)
+ Mapper<QuotedObject> recMapper = Mapper.builder(QuotedObject.class)
.map("colUmn", quote("colUmn"))
.automap()
.build();
+ Function<QuotedObject, Tuple> objMapper = (obj) ->
Tuple.create(Map.of("id", obj.id, quote("colUmn"), obj.colUmn));
+
return Stream.of(
Arguments.of(table.recordView(), identity()),
+ Arguments.of(table.recordView(recMapper), objMapper),
Arguments.of(clientTable.recordView(), identity()),
- Arguments.of(clientTable.recordView(pojoMapper),
- (Function<QuotedObject, Tuple>) (obj) ->
Tuple.create(Map.of("id", obj.id, quote("colUmn"), obj.colUmn)))
+ Arguments.of(clientTable.recordView(recMapper), objMapper)
);
}
@@ -261,17 +442,95 @@ public class ItCriteriaQueryTest extends
ClusterPerClassIntegrationTest {
}
}
+ private static Stream<Arguments> testKeyViewWithQuotes() {
+ Table table = CLUSTER.aliveNode().tables().table(QUOTED_TABLE_NAME);
+ Table clientTable = CLIENT.tables().table(QUOTED_TABLE_NAME);
+
+ Mapper<QuotedObjectKey> keyMapper = Mapper.of(QuotedObjectKey.class);
+ Mapper<QuotedObject> valMapper = Mapper.builder(QuotedObject.class)
+ .map("colUmn", quote("colUmn"))
+ .build();
+
+ Function<Entry<QuotedObjectKey, QuotedObject>, Entry<Tuple, Tuple>>
kvMapper = (entry) ->
+ new IgniteBiTuple<>(Tuple.create(Map.of("id",
entry.getKey().id)), Tuple.create(Map.of(quote("colUmn"),
+ entry.getValue().colUmn)));
+
+ return Stream.of(
+ Arguments.of(table.keyValueView(), identity()),
+ Arguments.of(table.keyValueView(keyMapper, valMapper),
kvMapper),
+ Arguments.of(clientTable.keyValueView(), identity()),
+ Arguments.of(clientTable.keyValueView(keyMapper, valMapper),
kvMapper)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ public <T> void testKeyViewWithQuotes(CriteriaQuerySource<T> view,
Function<T, Entry<Tuple, Tuple>> mapper) {
+ try (Cursor<T> cur = view.query(null, columnValue(quote("colUmn"),
equalTo("name1")))) {
+ assertThat(mapToTupleMap(cur, mapper), allOf(
+ aMapWithSize(1),
+ hasEntry(tupleValue("id", is(1)),
tupleValue(quote("colUmn"), is("name1")))
+ ));
+ }
+ }
+
+ private static Stream<Arguments> testSessionClosing() {
+ Table table = CLUSTER.aliveNode().tables().table(TABLE_NAME);
+
+ Transaction tx = CLUSTER.aliveNode().transactions().begin();
+ tx.rollback();
+
+ return Stream.of(
+ Arguments.of(table.recordView(), tx),
+ Arguments.of(table.recordView(TestObject.class), tx),
+ Arguments.of(table.keyValueView(), tx),
+ Arguments.of(table.keyValueView(TestObjectKey.class,
TestObject.class), tx)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+ public void testSessionClosing(CriteriaQuerySource<?> view, Transaction
tx) {
+ int baseSessionsCount = activeSessionsCount();
+
+ assertThrows(
+ IgniteException.class,
+ () -> view.query(tx, columnValue("id", equalTo(2))),
+ "Transaction is already finished"
+ );
+
+ assertEquals(baseSessionsCount, activeSessionsCount());
+ }
+
+ private static int activeSessionsCount() {
+ return ((IgniteSqlImpl) CLUSTER.aliveNode().sql()).sessions().size();
+ }
+
private static <T> List<Tuple> mapToTupleList(Cursor<T> cur, Function<T,
Tuple> mapper) {
return StreamSupport.stream(spliteratorUnknownSize(cur,
Spliterator.ORDERED), false)
.map(mapper)
.collect(toList());
}
+ private static <T, K, V> Map<K, V> mapToTupleMap(Cursor<T> cur,
Function<T, Entry<K, V>> mapper) {
+ return StreamSupport.stream(spliteratorUnknownSize(cur,
Spliterator.ORDERED), false)
+ .map(mapper)
+ .collect(toMap(Entry::getKey, Entry::getValue));
+ }
+
+ static class QuotedObjectKey {
+ int id;
+ }
+
static class QuotedObject {
int id;
String colUmn;
}
+ static class TestObjectKey {
+ int id;
+ }
+
static class TestObject {
int id;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 01406d72c9..66cb9dbe89 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -20,28 +20,43 @@ package org.apache.ignite.internal.table;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.isOrCausedBy;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
-import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.criteria.CursorAdapter;
+import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
import org.apache.ignite.internal.table.criteria.SqlSerializer;
import
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.AsyncCursor;
+import org.apache.ignite.lang.Cursor;
import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
import org.apache.ignite.table.criteria.Criteria;
+import org.apache.ignite.table.criteria.CriteriaQueryOptions;
+import org.apache.ignite.table.criteria.CriteriaQuerySource;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
/**
* Base class for Table views.
*/
-abstract class AbstractTableView {
+abstract class AbstractTableView<R> implements CriteriaQuerySource<R> {
/** Internal table. */
protected final InternalTable tbl;
@@ -136,26 +151,77 @@ abstract class AbstractTableView {
return convertToPublicFuture(future);
}
- private static boolean isOrCausedBy(Class<? extends Exception>
exceptionClass, @Nullable Throwable ex) {
- return ex != null && (exceptionClass.isInstance(ex) ||
isOrCausedBy(exceptionClass, ex.getCause()));
+ /**
+ * Maps columns to their names.
+ *
+ * @param columns Target columns.
+ * @return Column names.
+ */
+ protected static String[] columnNames(Column[] columns) {
+ String[] columnNames = new String[columns.length];
+
+ for (int i = 0; i < columns.length; i++) {
+ columnNames[i] = columns[i].name();
+ }
+
+ return columnNames;
}
/**
- * Construct SQL query and arguments for prepare statement.
+ * Creates a function that converts rows of the result set into
criteria query objects.
*
- * @param tableName Table name.
- * @param columnNames Column names.
- * @param criteria The predicate to filter entries or {@code null} to
return all entries from the underlying table.
- * @return SQL query and it's arguments.
+ * @param meta Result set columns' metadata.
+ * @param schema Schema.
+ * @return Conversion function, or {@code null} if no conversion is
required.
*/
- static SqlSerializer createSqlSerializer(String tableName,
Collection<String> columnNames, @Nullable Criteria criteria) {
- return new SqlSerializer.Builder()
- .tableName(tableName)
- .columns(columnNames)
- .where(criteria)
- .build();
+ protected @Nullable Function<SqlRow, R> queryMapper(ResultSetMetadata
meta, SchemaDescriptor schema) {
+ return null;
}
+ /** {@inheritDoc} */
+ @Override
+ public Cursor<R> query(@Nullable Transaction tx, @Nullable Criteria
criteria, CriteriaQueryOptions opts) {
+ return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts)));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<AsyncCursor<R>> queryAsync(
+ @Nullable Transaction tx,
+ @Nullable Criteria criteria,
+ @Nullable CriteriaQueryOptions opts
+ ) {
+ CriteriaQueryOptions opts0 = opts == null ?
CriteriaQueryOptions.DEFAULT : opts;
+
+ return withSchemaSync(tx, (schemaVersion) -> {
+ SchemaDescriptor schema =
rowConverter.registry().schema(schemaVersion);
+
+ SqlSerializer ser = new SqlSerializer.Builder()
+ .tableName(tbl.name())
+ .columns(schema.columnNames())
+ .where(criteria)
+ .build();
+
+ Statement statement =
sql.statementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
+ Session session = sql.createSession();
+
+ return session.executeAsync(tx, statement, ser.getArguments())
+ .<AsyncCursor<R>>thenApply(resultSet -> {
+ ResultSetMetadata meta = resultSet.metadata();
+
+ assert meta != null : "Metadata can't be null.";
+
+ return new QueryCriteriaAsyncCursor<>(resultSet,
queryMapper(meta, schema), session::closeAsync);
+ })
+ .exceptionally(th -> {
+ session.closeAsync();
+
+ throw new CompletionException(unwrapCause(th));
+ });
+ });
+ }
+
+
/**
* Action representing some KV operation. When executed, the action is
supplied with schema version corresponding
* to the operation timestamp (see {@link #withSchemaSync(Transaction,
KvAction)} for details).
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index 0ad9e64c67..60ff561d8e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -26,12 +26,16 @@ import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -39,6 +43,8 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.MarshallerException;
import org.apache.ignite.lang.NullableValue;
import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
@@ -51,7 +57,7 @@ import org.jetbrains.annotations.Nullable;
* <p>NB: Binary view doesn't allow null tuples. Methods return either a tuple
that represents the value, or {@code null} if no value
* exists for the given key.
*/
-public class KeyValueBinaryViewImpl extends AbstractTableView implements
KeyValueView<Tuple, Tuple> {
+public class KeyValueBinaryViewImpl extends AbstractTableView<Entry<Tuple,
Tuple>> implements KeyValueView<Tuple, Tuple> {
private final TupleMarshallerCache marshallerCache;
/**
@@ -557,4 +563,13 @@ public class KeyValueBinaryViewImpl extends
AbstractTableView implements KeyValu
return rows;
}
+
+ /** {@inheritDoc} */
+ @Override
+ protected Function<SqlRow, Entry<Tuple, Tuple>>
queryMapper(ResultSetMetadata meta, SchemaDescriptor schema) {
+ return (row) -> new IgniteBiTuple<>(
+ new SqlRowProjection(row, meta,
columnNames(schema.keyColumns().columns())),
+ new SqlRowProjection(row, meta,
columnNames(schema.valueColumns().columns()))
+ );
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 51bdfe46b5..d193078f71 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.table;
+import static
org.apache.ignite.internal.marshaller.Marshaller.createMarshaller;
+import static
org.apache.ignite.internal.schema.marshaller.MarshallerUtil.toMarshallerColumns;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -27,15 +30,20 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.marshaller.Marshaller;
import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.TupleReader;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
import
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -43,6 +51,8 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NullableValue;
import org.apache.ignite.lang.UnexpectedNullValueException;
import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.mapper.Mapper;
@@ -52,7 +62,13 @@ import org.jetbrains.annotations.Nullable;
/**
* Key-value view implementation.
*/
-public class KeyValueViewImpl<K, V> extends AbstractTableView implements
KeyValueView<K, V> {
+public class KeyValueViewImpl<K, V> extends AbstractTableView<Entry<K, V>>
implements KeyValueView<K, V> {
+ /** Key class mapper. */
+ private final Mapper<K> keyMapper;
+
+ /** Value class mapper. */
+ private final Mapper<V> valueMapper;
+
/** Marshaller factory. */
private final Function<SchemaDescriptor, KvMarshaller<K, V>>
marshallerFactory;
@@ -79,6 +95,9 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView
implements KeyValu
) {
super(tbl, schemaVersions, schemaRegistry, sql);
+ this.keyMapper = keyMapper;
+ this.valueMapper = valueMapper;
+
marshallerFactory = (schema) -> new KvMarshallerImpl<>(schema,
keyMapper, valueMapper);
}
@@ -677,4 +696,25 @@ public class KeyValueViewImpl<K, V> extends
AbstractTableView implements KeyValu
return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
}
+
+ /** {@inheritDoc} */
+ @Override
+ protected Function<SqlRow, Entry<K, V>> queryMapper(ResultSetMetadata
meta, SchemaDescriptor schema) {
+ Column[] keyCols = schema.keyColumns().columns();
+ Column[] valCols = schema.valueColumns().columns();
+
+ Marshaller keyMarsh = createMarshaller(toMarshallerColumns(keyCols),
keyMapper, false, true);
+ Marshaller valMarsh = createMarshaller(toMarshallerColumns(valCols),
valueMapper, false, true);
+
+ return (row) -> {
+ try {
+ return new IgniteBiTuple<>(
+ (K) keyMarsh.readObject(new TupleReader(new
SqlRowProjection(row, meta, columnNames(keyCols))), null),
+ (V) valMarsh.readObject(new TupleReader(new
SqlRowProjection(row, meta, columnNames(valCols))), null)
+ );
+ } catch (MarshallerException e) {
+ throw new org.apache.ignite.lang.MarshallerException(e);
+ }
+ };
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 12368105c9..422c99a46f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -26,36 +26,26 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
-import org.apache.ignite.internal.table.criteria.CursorAdapter;
-import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
-import org.apache.ignite.internal.table.criteria.SqlSerializer;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.lang.AsyncCursor;
-import org.apache.ignite.lang.Cursor;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.MarshallerException;
import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Statement;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
-import org.apache.ignite.table.criteria.Criteria;
-import org.apache.ignite.table.criteria.CriteriaQueryOptions;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
/**
* Table view implementation for binary objects.
*/
-public class RecordBinaryViewImpl extends AbstractTableView implements
RecordView<Tuple> {
+public class RecordBinaryViewImpl extends AbstractTableView<Tuple> implements
RecordView<Tuple> {
private final TupleMarshallerCache marshallerCache;
/**
@@ -446,31 +436,4 @@ public class RecordBinaryViewImpl extends
AbstractTableView implements RecordVie
return DataStreamer.streamData(publisher, options, batchSender,
partitioner);
}
-
- /** {@inheritDoc} */
- @Override
- public Cursor<Tuple> query(@Nullable Transaction tx, @Nullable Criteria
criteria, @Nullable CriteriaQueryOptions opts) {
- return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts)));
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<AsyncCursor<Tuple>> queryAsync(
- @Nullable Transaction tx,
- @Nullable Criteria criteria,
- @Nullable CriteriaQueryOptions opts
- ) {
- var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts;
-
- return withSchemaSync(tx, (schemaVersion) -> {
- SchemaDescriptor schema =
rowConverter.registry().schema(schemaVersion);
- SqlSerializer ser = createSqlSerializer(tbl.name(),
schema.columnNames(), criteria);
-
- Statement statement =
sql.statementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
- Session session = sql.createSession();
-
- return session.executeAsync(tx, statement, ser.getArguments())
- .thenApply(resultSet -> new
QueryCriteriaAsyncCursor<>(resultSet, session::close));
- });
- }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 4b6e8b0ed6..92ba1f71df 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.table;
+import static
org.apache.ignite.internal.marshaller.Marshaller.createMarshaller;
+import static
org.apache.ignite.internal.schema.marshaller.MarshallerUtil.toMarshallerColumns;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -25,29 +28,27 @@ import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.function.Function;
+import org.apache.ignite.internal.marshaller.Marshaller;
+import org.apache.ignite.internal.marshaller.TupleReader;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.RecordMarshaller;
import
org.apache.ignite.internal.schema.marshaller.reflection.RecordMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.streamer.StreamerBatchSender;
-import org.apache.ignite.internal.table.criteria.CursorAdapter;
-import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
-import org.apache.ignite.internal.table.criteria.SqlSerializer;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.lang.AsyncCursor;
-import org.apache.ignite.lang.Cursor;
+import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.lang.MarshallerException;
import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.DataStreamerOptions;
import org.apache.ignite.table.RecordView;
-import org.apache.ignite.table.criteria.Criteria;
-import org.apache.ignite.table.criteria.CriteriaQueryOptions;
import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -55,7 +56,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Record view implementation.
*/
-public class RecordViewImpl<R> extends AbstractTableView implements
RecordView<R> {
+public class RecordViewImpl<R> extends AbstractTableView<R> implements
RecordView<R> {
/** Record class mapper. */
private final Mapper<R> mapper;
@@ -541,28 +542,16 @@ public class RecordViewImpl<R> extends AbstractTableView
implements RecordView<R
/** {@inheritDoc} */
@Override
- public Cursor<R> query(@Nullable Transaction tx, @Nullable Criteria
criteria, @Nullable CriteriaQueryOptions opts) {
- return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts)));
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<AsyncCursor<R>> queryAsync(
- @Nullable Transaction tx,
- @Nullable Criteria criteria,
- @Nullable CriteriaQueryOptions opts
- ) {
- var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts;
+ protected Function<SqlRow, R> queryMapper(ResultSetMetadata meta,
SchemaDescriptor schema) {
+ Column[] cols = ArrayUtils.concat(schema.keyColumns().columns(),
schema.valueColumns().columns());
+ Marshaller marsh = createMarshaller(toMarshallerColumns(cols), mapper,
false, true);
- return withSchemaSync(tx, (schemaVersion) -> {
- SchemaDescriptor schema =
rowConverter.registry().schema(schemaVersion);
- SqlSerializer ser = createSqlSerializer(tbl.name(),
schema.columnNames(), criteria);
-
- Statement statement =
sql.statementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
- Session session = sql.createSession();
-
- return session.executeAsync(tx, mapper, statement,
ser.getArguments())
- .thenApply(resultSet -> new
QueryCriteriaAsyncCursor<>(resultSet, session::close));
- });
+ return (row) -> {
+ try {
+ return (R) marsh.readObject(new TupleReader(new
SqlRowProjection(row, meta, columnNames(cols))), null);
+ } catch (org.apache.ignite.internal.marshaller.MarshallerException
e) {
+ throw new MarshallerException(e);
+ }
+ };
}
}