This is an automated email from the ASF dual-hosted git repository.

anovikov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 151585bdc2 IGNITE-20977 Basic criteria queries for keyValue view 
(#2934)
151585bdc2 is described below

commit 151585bdc23f6571ae6014d365ad2ff2a480926d
Author: Andrey Novikov <[email protected]>
AuthorDate: Thu Jan 18 14:40:19 2024 +0700

    IGNITE-20977 Basic criteria queries for keyValue view (#2934)
---
 .../apache/ignite/lang/util/IgniteNameUtils.java   |  13 +
 .../java/org/apache/ignite/table/KeyValueView.java |   3 +-
 .../internal/client/table/AbstractClientView.java  |  75 ++++++
 .../client/table/ClientKeyValueBinaryView.java     |  22 +-
 .../internal/client/table/ClientKeyValueView.java  |  37 ++-
 .../client/table/ClientRecordBinaryView.java       |  30 ---
 .../internal/client/table/ClientRecordView.java    |  44 ++--
 .../configuration/util/ConfigurationUtil.java      |  71 ------
 .../configuration/util/ConfigurationUtilTest.java  |  58 -----
 .../table/criteria/QueryCriteriaAsyncCursor.java   |  36 ++-
 .../internal/table/criteria/SqlRowProjection.java  | 276 +++++++++++++++++++++
 .../internal/table/criteria/SqlSerializer.java     |   6 +-
 .../ignite/internal/util/CollectionUtils.java      |  71 ++++++
 .../ignite/internal/util/ExceptionUtils.java       |  11 +
 .../ignite/internal/util/CollectionUtilsTest.java  |  62 +++++
 .../ignite/internal/util/IgniteNameUtilsTest.java  |  11 +-
 .../ignite/internal/marshaller/TupleReader.java    | 171 +++++++++++++
 .../ignite/internal/table/ItCriteriaQueryTest.java | 275 +++++++++++++++++++-
 .../ignite/internal/table/AbstractTableView.java   |  96 +++++--
 .../internal/table/KeyValueBinaryViewImpl.java     |  17 +-
 .../ignite/internal/table/KeyValueViewImpl.java    |  42 +++-
 .../internal/table/RecordBinaryViewImpl.java       |  39 +--
 .../ignite/internal/table/RecordViewImpl.java      |  53 ++--
 23 files changed, 1209 insertions(+), 310 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/lang/util/IgniteNameUtils.java 
b/modules/api/src/main/java/org/apache/ignite/lang/util/IgniteNameUtils.java
index 22d2ccb580..e13954f072 100644
--- a/modules/api/src/main/java/org/apache/ignite/lang/util/IgniteNameUtils.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/util/IgniteNameUtils.java
@@ -71,6 +71,19 @@ public final class IgniteNameUtils {
         return "\"" + name + "\"";
     }
 
+    /**
+     * Wraps the given name with double quotes if it not upper case not-quoted 
name,
+     *     e.g. "myColumn" -&gt; "\"myColumn\"", "MYCOLUMN" -&gt; "MYCOLUMN"
+     *
+     * @param name Object name.
+     * @return Quoted object name.
+     */
+    public static String quoteIfNeeded(String name) {
+        String simpleName = parseSimpleName(name);
+
+        return name.equals(simpleName) || name.equals(quote(simpleName)) ? 
name : quote(name);
+    }
+
     /**
      * Identifier chain tokenizer.
      *
diff --git 
a/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java 
b/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
index 1d3137bedd..b1f5923df3 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.lang.MarshallerException;
 import org.apache.ignite.lang.NullableValue;
 import org.apache.ignite.lang.UnexpectedNullValueException;
+import org.apache.ignite.table.criteria.CriteriaQuerySource;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -35,7 +36,7 @@ import org.jetbrains.annotations.Nullable;
  * @apiNote 'Key/value class field' &gt;-&lt; 'table column' mapping laid down 
in implementation.
  * @see org.apache.ignite.table.mapper.Mapper
  */
-public interface KeyValueView<K, V> extends DataStreamerTarget<Entry<K, V>> {
+public interface KeyValueView<K, V> extends DataStreamerTarget<Entry<K, V>>, 
CriteriaQuerySource<Entry<K, V>> {
     /**
      * Gets a value associated with a given key.
      *
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
index c7eabdb65e..d665f5b096 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/AbstractClientView.java
@@ -19,13 +19,25 @@ package org.apache.ignite.internal.client.table;
 
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.internal.client.ClientUtils.sync;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.lang.util.IgniteNameUtils.parseSimpleName;
 
 import java.util.Arrays;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.function.Function;
+import org.apache.ignite.internal.client.sql.ClientSessionBuilder;
+import org.apache.ignite.internal.client.sql.ClientStatementBuilder;
 import org.apache.ignite.internal.table.criteria.CursorAdapter;
+import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
 import org.apache.ignite.internal.table.criteria.SqlSerializer;
+import org.apache.ignite.lang.AsyncCursor;
 import org.apache.ignite.lang.Cursor;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
 import org.apache.ignite.table.criteria.Criteria;
 import org.apache.ignite.table.criteria.CriteriaQueryOptions;
 import org.apache.ignite.table.criteria.CriteriaQuerySource;
@@ -50,6 +62,26 @@ abstract class AbstractClientView<T> implements 
CriteriaQuerySource<T> {
         this.tbl = tbl;
     }
 
+    /**
+     * Map columns to it's names.
+     *
+     * @param columns Target columns.
+     * @param startInclusive The first index to cover.
+     * @param endExclusive Index immediately past the last index to cover.
+     * @return Column names.
+     */
+    protected static String[] columnNames(ClientColumn[] columns, int 
startInclusive, int endExclusive) {
+        int sz = endExclusive - startInclusive;
+
+        String[] columnNames = new String[sz];
+
+        for (int i = 0; i < sz; i++) {
+            columnNames[i] = columns[startInclusive + i].name();
+        }
+
+        return columnNames;
+    }
+
     /**
      * Construct SQL query and arguments for prepare statement.
      *
@@ -70,9 +102,52 @@ abstract class AbstractClientView<T> implements 
CriteriaQuerySource<T> {
                 .build();
     }
 
+    /**
+     * Create conversion function for objects contained by result set to 
criteria query objects.
+     *
+     * @param meta Result set columns' metadata.
+     * @param schema Schema.
+     * @return Conversion function (if {@code null} conversions isn't 
required).
+     */
+    protected @Nullable Function<SqlRow, T> queryMapper(ResultSetMetadata 
meta, ClientSchema schema) {
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Override
     public Cursor<T> query(@Nullable Transaction tx, @Nullable Criteria 
criteria, @Nullable CriteriaQueryOptions opts) {
         return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts)));
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncCursor<T>> queryAsync(
+            @Nullable Transaction tx,
+            @Nullable Criteria criteria,
+            @Nullable CriteriaQueryOptions opts
+    ) {
+        CriteriaQueryOptions opts0 = opts == null ? 
CriteriaQueryOptions.DEFAULT : opts;
+
+        return tbl.getLatestSchema()
+                .thenCompose((schema) -> {
+                    SqlSerializer ser = createSqlSerializer(tbl.name(), 
schema.columns(), criteria);
+
+                    Statement statement = new 
ClientStatementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
+                    Session session = new 
ClientSessionBuilder(tbl.channel()).build();
+
+                    return session.executeAsync(tx, statement, 
ser.getArguments())
+                            .<AsyncCursor<T>>thenApply(resultSet -> {
+                                ResultSetMetadata meta = resultSet.metadata();
+
+                                assert meta != null : "Metadata can't be 
null.";
+
+                                return new 
QueryCriteriaAsyncCursor<>(resultSet, queryMapper(meta, schema), 
session::closeAsync);
+                            })
+                            .exceptionally(th -> {
+                                session.closeAsync();
+
+                                throw new CompletionException(unwrapCause(th));
+                            });
+                });
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
index 72cee01445..6ac0eeab30 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueBinaryView.java
@@ -29,11 +29,16 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
 import org.apache.ignite.client.RetryLimitPolicy;
 import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.streamer.StreamerBatchSender;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.NullableValue;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
@@ -46,10 +51,7 @@ import org.jetbrains.annotations.Nullable;
  * <p>NB: Binary view doesn't allow null tuples. Methods return either a tuple 
that represents the value, or {@code null} if no value
  * exists for the given key.
  */
-public class ClientKeyValueBinaryView implements KeyValueView<Tuple, Tuple> {
-    /** Underlying table. */
-    private final ClientTable tbl;
-
+public class ClientKeyValueBinaryView extends AbstractClientView<Entry<Tuple, 
Tuple>> implements KeyValueView<Tuple, Tuple> {
     /** Tuple serializer. */
     private final ClientTupleSerializer ser;
 
@@ -59,9 +61,8 @@ public class ClientKeyValueBinaryView implements 
KeyValueView<Tuple, Tuple> {
      * @param tbl Table.
      */
     public ClientKeyValueBinaryView(ClientTable tbl) {
-        assert tbl != null;
+        super(tbl);
 
-        this.tbl = tbl;
         ser = new ClientTupleSerializer(tbl.tableId());
     }
 
@@ -461,4 +462,13 @@ public class ClientKeyValueBinaryView implements 
KeyValueView<Tuple, Tuple> {
 
         return ClientDataStreamer.streamData(publisher, opts, batchSender, 
provider, tbl);
     }
+
+    /** {@inheritDoc} */
+    @Override
+    protected Function<SqlRow, Entry<Tuple, Tuple>> 
queryMapper(ResultSetMetadata meta, ClientSchema schema) {
+        String[] keyCols = columnNames(schema.columns(), 0, 
schema.keyColumnCount());
+        String[] valCols = columnNames(schema.columns(), 
schema.keyColumnCount(), schema.columns().length);
+
+        return (row) -> new IgniteBiTuple<>(new SqlRowProjection(row, meta, 
keyCols), new SqlRowProjection(row, meta, valCols));
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
index 2bc3c9ccf6..52765f1a3d 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientKeyValueView.java
@@ -34,6 +34,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
 import org.apache.ignite.client.RetryLimitPolicy;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
@@ -41,14 +42,19 @@ import 
org.apache.ignite.internal.client.PayloadInputChannel;
 import org.apache.ignite.internal.client.PayloadOutputChannel;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.TuplePart;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.marshaller.ClientMarshallerReader;
 import org.apache.ignite.internal.marshaller.ClientMarshallerWriter;
 import org.apache.ignite.internal.marshaller.Marshaller;
 import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.TupleReader;
 import org.apache.ignite.internal.streamer.StreamerBatchSender;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NullableValue;
 import org.apache.ignite.lang.UnexpectedNullValueException;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.mapper.Mapper;
@@ -58,10 +64,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Client key-value view implementation.
  */
-public class ClientKeyValueView<K, V> implements KeyValueView<K, V> {
-    /** Underlying table. */
-    private final ClientTable tbl;
-
+public class ClientKeyValueView<K, V> extends AbstractClientView<Entry<K, V>> 
implements KeyValueView<K, V> {
     /** Key serializer.  */
     private final ClientRecordSerializer<K> keySer;
 
@@ -76,12 +79,11 @@ public class ClientKeyValueView<K, V> implements 
KeyValueView<K, V> {
      * @param valMapper value mapper.
      */
     public ClientKeyValueView(ClientTable tbl, Mapper<K> keyMapper, Mapper<V> 
valMapper) {
-        assert tbl != null;
+        super(tbl);
+
         assert keyMapper != null;
         assert valMapper != null;
 
-        this.tbl = tbl;
-
         keySer = new ClientRecordSerializer<>(tbl.tableId(), keyMapper);
         valSer = new ClientRecordSerializer<>(tbl.tableId(), valMapper);
     }
@@ -544,6 +546,27 @@ public class ClientKeyValueView<K, V> implements 
KeyValueView<K, V> {
         return ClientDataStreamer.streamData(publisher, opts, batchSender, 
provider, tbl);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    protected Function<SqlRow, Entry<K, V>> queryMapper(ResultSetMetadata 
meta, ClientSchema schema) {
+        String[] keyCols = columnNames(schema.columns(), 0, 
schema.keyColumnCount());
+        String[] valCols = columnNames(schema.columns(), 
schema.keyColumnCount(), schema.columns().length);
+
+        Marshaller keyMarsh = schema.getMarshaller(keySer.mapper(), 
TuplePart.KEY, true);
+        Marshaller valMarsh = schema.getMarshaller(valSer.mapper(), 
TuplePart.VAL, true);
+
+        return (row) -> {
+            try {
+                return new IgniteBiTuple<>(
+                        (K) keyMarsh.readObject(new TupleReader(new 
SqlRowProjection(row, meta, keyCols)), null),
+                        (V) valMarsh.readObject(new TupleReader(new 
SqlRowProjection(row, meta, valCols)), null)
+                );
+            } catch (MarshallerException e) {
+                throw new org.apache.ignite.lang.MarshallerException(e);
+            }
+        };
+    }
+
     private static <T> T throwIfNull(T obj) {
         if (obj == null) {
             throw new UnexpectedNullValueException("Got unexpected null value: 
use `getNullable` sibling method instead.");
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
index 5fb64df33d..a7dec2c150 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordBinaryView.java
@@ -29,19 +29,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.client.RetryLimitPolicy;
 import org.apache.ignite.internal.client.proto.ClientOp;
-import org.apache.ignite.internal.client.sql.ClientSessionBuilder;
-import org.apache.ignite.internal.client.sql.ClientStatementBuilder;
 import org.apache.ignite.internal.streamer.StreamerBatchSender;
-import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
-import org.apache.ignite.internal.table.criteria.SqlSerializer;
-import org.apache.ignite.lang.AsyncCursor;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Statement;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
-import org.apache.ignite.table.criteria.Criteria;
-import org.apache.ignite.table.criteria.CriteriaQueryOptions;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -384,25 +375,4 @@ public class ClientRecordBinaryView extends 
AbstractClientView<Tuple> implements
 
         return ClientDataStreamer.streamData(publisher, opts, batchSender, 
provider, tbl);
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<AsyncCursor<Tuple>> queryAsync(
-            @Nullable Transaction tx,
-            @Nullable Criteria criteria,
-            @Nullable CriteriaQueryOptions opts
-    ) {
-        var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts;
-
-        return tbl.getLatestSchema()
-                .thenCompose((schema) -> {
-                    SqlSerializer ser = createSqlSerializer(tbl.name(), 
schema.columns(), criteria);
-
-                    Statement statement = new 
ClientStatementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
-                    Session session = new 
ClientSessionBuilder(tbl.channel()).build();
-
-                    return session.executeAsync(tx, statement, 
ser.getArguments())
-                            .thenApply(resultSet -> new 
QueryCriteriaAsyncCursor<>(resultSet, session::close));
-                });
-    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
index 37a2507e71..7fa6994844 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientRecordView.java
@@ -27,21 +27,19 @@ import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
 import org.apache.ignite.client.RetryLimitPolicy;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.internal.client.proto.TuplePart;
-import org.apache.ignite.internal.client.sql.ClientSessionBuilder;
-import org.apache.ignite.internal.client.sql.ClientStatementBuilder;
+import org.apache.ignite.internal.marshaller.Marshaller;
+import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.TupleReader;
 import org.apache.ignite.internal.streamer.StreamerBatchSender;
-import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
-import org.apache.ignite.internal.table.criteria.SqlSerializer;
-import org.apache.ignite.lang.AsyncCursor;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Statement;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.RecordView;
-import org.apache.ignite.table.criteria.Criteria;
-import org.apache.ignite.table.criteria.CriteriaQueryOptions;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
@@ -385,22 +383,16 @@ public class ClientRecordView<R> extends 
AbstractClientView<R> implements Record
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<AsyncCursor<R>> queryAsync(
-            @Nullable Transaction tx,
-            @Nullable Criteria criteria,
-            @Nullable CriteriaQueryOptions opts
-    ) {
-        var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts;
-
-        return tbl.getLatestSchema()
-                .thenCompose((schema) -> {
-                    SqlSerializer ser = createSqlSerializer(tbl.name(), 
schema.columns(), criteria);
-
-                    Statement statement = new 
ClientStatementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
-                    Session session = new 
ClientSessionBuilder(tbl.channel()).build();
-
-                    return session.executeAsync(tx, this.ser.mapper(), 
statement, ser.getArguments())
-                            .thenApply(resultSet -> new 
QueryCriteriaAsyncCursor<>(resultSet, session::close));
-                });
+    protected Function<SqlRow, R> queryMapper(ResultSetMetadata meta, 
ClientSchema schema) {
+        String[] cols = columnNames(schema.columns(), 0, 
schema.columns().length);
+        Marshaller marsh = schema.getMarshaller(ser.mapper(), 
TuplePart.KEY_AND_VAL, true);
+
+        return (row) -> {
+            try {
+                return (R) marsh.readObject(new TupleReader(new 
SqlRowProjection(row, meta, cols)), null);
+            } catch (MarshallerException e) {
+                throw new org.apache.ignite.lang.MarshallerException(e);
+            }
+        };
     }
 }
diff --git 
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
 
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
index 24e5f50a96..7c2525f770 100644
--- 
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
+++ 
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/util/ConfigurationUtil.java
@@ -44,7 +44,6 @@ import java.util.RandomAccess;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Function;
-import java.util.function.Predicate;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.ConfigurationProperty;
 import 
org.apache.ignite.configuration.ConfigurationWrongPolymorphicTypeIdException;
@@ -82,9 +81,6 @@ public class ConfigurationUtil {
     public static final ConfigurationSource EMPTY_CFG_SRC = new 
ConfigurationSource() {
     };
 
-    /** Special object for determining that there is no next element. */
-    private static final Object NO_NEXT_ELEMENT = new Object();
-
     /**
      * Seperator string for both public and internal representations of 
configuration keys.
      */
@@ -945,73 +941,6 @@ public class ConfigurationUtil {
         };
     }
 
-    /**
-     * Maps iterable via provided mapper function.
-     *
-     * @param iterable Basic iterable.
-     * @param mapper Conversion function.
-     * @param predicate Predicate to apply to each element of basic iterable.
-     * @param <T1> Base type of the iterable.
-     * @param <T2> Type for view.
-     * @return Mapped iterable.
-     */
-    public static <T1, T2> Iterable<T2> mapIterable(
-            @Nullable Iterable<? extends T1> iterable,
-            @Nullable Function<? super T1, ? extends T2> mapper,
-            @Nullable Predicate<? super T1> predicate
-    ) {
-        if (iterable == null) {
-            return Collections.emptyList();
-        }
-
-        if (mapper == null && predicate == null) {
-            return (Iterable<T2>) iterable;
-        }
-
-        return new Iterable<>() {
-            @Override
-            public Iterator<T2> iterator() {
-                Iterator<? extends T1> innerIterator = iterable.iterator();
-                return new Iterator<>() {
-                    @Nullable
-                    T1 current = advance();
-
-                    /** {@inheritDoc} */
-                    @Override
-                    public boolean hasNext() {
-                        return current != NO_NEXT_ELEMENT;
-                    }
-
-                    /** {@inheritDoc} */
-                    @Override
-                    public T2 next() {
-                        T1 current = this.current;
-
-                        if (current == NO_NEXT_ELEMENT) {
-                            throw new NoSuchElementException();
-                        }
-
-                        this.current = advance();
-
-                        return mapper == null ? (T2) current : 
mapper.apply(current);
-                    }
-
-                    private @Nullable T1 advance() {
-                        while (innerIterator.hasNext()) {
-                            T1 next = innerIterator.next();
-
-                            if (predicate == null || predicate.test(next)) {
-                                return next;
-                            }
-                        }
-
-                        return (T1) NO_NEXT_ELEMENT;
-                    }
-                };
-            }
-        };
-    }
-
     /**
      * Leaf configuration source.
      */
diff --git 
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationUtilTest.java
 
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationUtilTest.java
index 2ac4b6decf..dfa29bd5d8 100644
--- 
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationUtilTest.java
+++ 
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/util/ConfigurationUtilTest.java
@@ -46,7 +46,6 @@ import static org.hamcrest.Matchers.hasToString;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.matchesPattern;
 import static org.hamcrest.Matchers.nullValue;
-import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -63,7 +62,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Objects;
 import java.util.Set;
 import java.util.Spliterators;
 import java.util.UUID;
@@ -1167,62 +1165,6 @@ public class ConfigurationUtilTest {
         assertFalse(iterator.hasNext());
     }
 
-    @Test
-    void testMapIterableWithPredicate() {
-        assertFalse(mapIterable(null, null, null).iterator().hasNext());
-        assertFalse(mapIterable(emptyList(), null, null).iterator().hasNext());
-
-        assertEquals(List.of(1), collect(mapIterable(List.of(1), null, null)));
-        assertEquals(List.of(1), collect(mapIterable(List.of(1), identity(), 
null)));
-        assertEquals(List.of(1), collect(mapIterable(List.of(1), null, integer 
-> true)));
-        assertEquals(List.of(1), collect(mapIterable(List.of(1), identity(), 
integer -> true)));
-        assertEquals(List.of(), collect(mapIterable(List.of(1), null, integer 
-> false)));
-        assertEquals(List.of(), collect(mapIterable(List.of(1), identity(), 
integer -> false)));
-
-        assertEquals(List.of("1", "2", "3"), collect(mapIterable(List.of(1, 2, 
3), String::valueOf, null)));
-        assertEquals(List.of("3"), collect(mapIterable(List.of(1, 2, 3), 
String::valueOf, integer -> integer > 2)));
-
-        Iterator<String> iterator1 = mapIterable(List.of(1, 2, 3, 4), 
String::valueOf, integer -> true).iterator();
-        assertEquals("1", iterator1.next());
-        assertEquals("2", iterator1.next());
-        assertEquals("3", iterator1.next());
-        assertEquals("4", iterator1.next());
-
-        Iterator<String> iterator2 = mapIterable(List.of(1, 2, 3, 4), 
String::valueOf, null).iterator();
-        assertEquals("1", iterator2.next());
-        assertEquals("2", iterator2.next());
-        assertEquals("3", iterator2.next());
-        assertEquals("4", iterator2.next());
-
-        Iterator<Integer> iterator3 = mapIterable(List.of(1, 2, 3, 4), 
identity(), integer -> integer < 3).iterator();
-        assertEquals(1, iterator3.next());
-        assertEquals(2, iterator3.next());
-
-        Iterator<Integer> iterator4 = mapIterable(List.of(1, 2, 3, 4), 
identity(), integer -> false).iterator();
-        assertFalse(iterator4.hasNext());
-
-        assertDoesNotThrow(() -> mapIterable(Arrays.asList(new 
Integer[]{null}), null, null).iterator().next());
-        assertDoesNotThrow(() -> mapIterable(Arrays.asList(new 
Integer[]{null}), null, integer -> true).iterator().next());
-        assertDoesNotThrow(() -> mapIterable(Arrays.asList(null, 1), null, 
null).iterator().next());
-        assertDoesNotThrow(() -> mapIterable(Arrays.asList(null, 1), null, 
integer -> true).iterator().next());
-
-        assertThrows(
-                NoSuchElementException.class,
-                () -> mapIterable(Arrays.asList(new Integer[]{null}), null, 
integer -> false).iterator().next()
-        );
-
-        assertThrows(
-                NoSuchElementException.class,
-                () -> {
-                    Iterator<Object> iterator = 
mapIterable(Arrays.asList(null, 1), null, Objects::nonNull).iterator();
-                    iterator.next();
-                    iterator.next();
-                }
-        );
-
-        assertThrows(UnsupportedOperationException.class, () -> 
mapIterable(List.of(1), null, null).iterator().remove());
-    }
-
     /**
      * Patches super root and returns flat representation of the changes. 
Passed {@code superRoot} object will contain patched tree when
      * method execution is completed.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java
index 1ee79939e8..1880faaa1a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/QueryCriteriaAsyncCursor.java
@@ -17,15 +17,27 @@
 
 package org.apache.ignite.internal.table.criteria;
 
+import static org.apache.ignite.internal.util.CollectionUtils.mapIterable;
+
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import org.apache.ignite.lang.AsyncCursor;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Wrapper over {@link AsyncResultSet} for criteria queries.
+ *
+ * @param <R> A type of the objects contained by wrapped result set. This will 
be either {@link SqlRow} if no explicit mapper is provided
+ *      or a particular type defined by supplied mapper.
+ * @param <T> The type of elements returned by this cursor.
  */
-public class QueryCriteriaAsyncCursor<T> implements AsyncCursor<T> {
-    private final AsyncResultSet<T> ars;
+public class QueryCriteriaAsyncCursor<T, R> implements AsyncCursor<T> {
+    private final AsyncResultSet<R> ars;
+
+    @Nullable
+    private final Function<R, T> mapper;
 
     private final Runnable closeRun;
 
@@ -33,17 +45,19 @@ public class QueryCriteriaAsyncCursor<T> implements 
AsyncCursor<T> {
      * Constructor.
      *
      * @param ars Asynchronous result set.
-     * @param closeRun Callback to be invoked after result is closed.
+     * @param mapper Conversion function for objects contained by result set 
(if {@code null}, conversion isn't required).
+     * @param closeRun Callback to be invoked after result set is closed.
      */
-    public QueryCriteriaAsyncCursor(AsyncResultSet<? extends T> ars, Runnable 
closeRun) {
-        this.ars = (AsyncResultSet<T>) ars;
+    public QueryCriteriaAsyncCursor(AsyncResultSet<R> ars, @Nullable 
Function<R, T> mapper, Runnable closeRun) {
+        this.ars = ars;
+        this.mapper = mapper;
         this.closeRun = closeRun;
     }
 
     /** {@inheritDoc} */
     @Override
     public Iterable<T> currentPage() {
-        return ars.currentPage();
+        return mapIterable(ars.currentPage(), mapper, null);
     }
 
     /** {@inheritDoc} */
@@ -54,12 +68,14 @@ public class QueryCriteriaAsyncCursor<T> implements 
AsyncCursor<T> {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
+    public CompletableFuture<? extends AsyncCursor<T>> fetchNextPage() {
         return ars.fetchNextPage()
-                .whenComplete((v, t) -> {
-                    if (t == null && !hasMorePages()) {
-                        closeRun.run();
+                .thenApply((rs) -> {
+                    if (!hasMorePages()) {
+                        closeAsync();
                     }
+
+                    return new QueryCriteriaAsyncCursor<>(rs, mapper, 
closeRun);
                 });
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlRowProjection.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlRowProjection.java
new file mode 100644
index 0000000000..63b09bfd44
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlRowProjection.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.criteria;
+
+import static org.apache.ignite.lang.util.IgniteNameUtils.quoteIfNeeded;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Projection under {@link SqlRow}.
+ */
+public class SqlRowProjection implements Tuple {
+    private final SqlRow row;
+
+    /** Column`s name map. */
+    private final Object2IntMap<String> columnsIndices;
+
+    private final int[] rowIndexMapping;
+
+    /**
+     * Constructor.
+     *
+     * @param row Row data.
+     * @param meta Metadata for query results.
+     * @param cols Target column names.
+     */
+    public SqlRowProjection(SqlRow row, ResultSetMetadata meta, String[] cols) 
{
+        this.row = row;
+
+        rowIndexMapping = new int[cols.length];
+        columnsIndices = new Object2IntOpenHashMap<>(cols.length);
+
+        for (int i = 0; i < cols.length; i++) {
+            String quotedColumnName = quoteIfNeeded(cols[i]);
+
+            columnsIndices.put(quotedColumnName, i);
+            rowIndexMapping[i] = meta.indexOf(quotedColumnName);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int columnCount() {
+        return rowIndexMapping.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String columnName(int columnIndex) {
+        return row.columnName(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int columnIndex(String columnName) {
+        return columnsIndices.getOrDefault(columnName, -1);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> @Nullable T valueOrDefault(String columnName, @Nullable T 
defaultValue) {
+        return row.valueOrDefault(columnName, defaultValue);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Tuple set(String columnName, @Nullable Object value) {
+        return row.set(columnName, value);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> @Nullable T value(String columnName) throws 
IllegalArgumentException {
+        return row.value(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> @Nullable T value(int columnIndex) {
+        return row.value(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean booleanValue(String columnName) {
+        return row.booleanValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean booleanValue(int columnIndex) {
+        return row.booleanValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte byteValue(String columnName) {
+        return row.byteValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public byte byteValue(int columnIndex) {
+        return row.byteValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public short shortValue(String columnName) {
+        return row.shortValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public short shortValue(int columnIndex) {
+        return row.shortValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int intValue(String columnName) {
+        return row.intValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int intValue(int columnIndex) {
+        return row.intValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long longValue(String columnName) {
+        return row.longValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long longValue(int columnIndex) {
+        return row.longValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public float floatValue(String columnName) {
+        return row.floatValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public float floatValue(int columnIndex) {
+        return row.floatValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public double doubleValue(String columnName) {
+        return row.doubleValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public double doubleValue(int columnIndex) {
+        return row.doubleValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String stringValue(String columnName) {
+        return row.stringValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String stringValue(int columnIndex) {
+        return row.stringValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID uuidValue(String columnName) {
+        return row.uuidValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public UUID uuidValue(int columnIndex) {
+        return row.uuidValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public BitSet bitmaskValue(String columnName) {
+        return row.bitmaskValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public BitSet bitmaskValue(int columnIndex) {
+        return row.bitmaskValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalDate dateValue(String columnName) {
+        return row.dateValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalDate dateValue(int columnIndex) {
+        return row.dateValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalTime timeValue(String columnName) {
+        return row.timeValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalTime timeValue(int columnIndex) {
+        return row.timeValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalDateTime datetimeValue(String columnName) {
+        return row.datetimeValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public LocalDateTime datetimeValue(int columnIndex) {
+        return row.datetimeValue(rowIndexMapping[columnIndex]);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Instant timestampValue(String columnName) {
+        return row.timestampValue(columnName);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Instant timestampValue(int columnIndex) {
+        return row.timestampValue(rowIndexMapping[columnIndex]);
+    }
+}
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
index 1b45243724..bfc887cc8c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/table/criteria/SqlSerializer.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal.table.criteria;
 
 import static org.apache.ignite.internal.util.StringUtils.nullOrBlank;
-import static org.apache.ignite.lang.util.IgniteNameUtils.quote;
+import static org.apache.ignite.lang.util.IgniteNameUtils.quoteIfNeeded;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -157,10 +157,6 @@ public class SqlSerializer implements 
CriteriaVisitor<Void> {
         }
     }
 
-    private static String quoteIfNeeded(String name) {
-        return name.chars().allMatch(Character::isUpperCase) ? name : 
quote(name);
-    }
-
     /**
      * Builder.
      */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 10602c7458..6bb497517f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.function.Predicate;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import org.jetbrains.annotations.Nullable;
@@ -46,6 +47,9 @@ import org.jetbrains.annotations.Nullable;
  * Utility class provides various method to work with collections.
  */
 public final class CollectionUtils {
+    /** Special object for determining that there is no next element. */
+    private static final Object NO_NEXT_ELEMENT = new Object();
+
     /** Stub. */
     private CollectionUtils() {
         // No op.
@@ -521,4 +525,71 @@ public final class CollectionUtils {
 
         return Set.copyOf(set);
     }
+
+    /**
+     * Maps iterable via provided mapper function.
+     *
+     * @param iterable Basic iterable.
+     * @param mapper Conversion function.
+     * @param predicate Predicate to apply to each element of basic iterable.
+     * @param <T1> Base type of the iterable.
+     * @param <T2> Type for view.
+     * @return Mapped iterable.
+     */
+    public static <T1, T2> Iterable<T2> mapIterable(
+            @Nullable Iterable<? extends T1> iterable,
+            @Nullable Function<? super T1, ? extends T2> mapper,
+            @Nullable Predicate<? super T1> predicate
+    ) {
+        if (iterable == null) {
+            return Collections.emptyList();
+        }
+
+        if (mapper == null && predicate == null) {
+            return (Iterable<T2>) iterable;
+        }
+
+        return new Iterable<>() {
+            @Override
+            public Iterator<T2> iterator() {
+                Iterator<? extends T1> innerIterator = iterable.iterator();
+                return new Iterator<>() {
+                    @Nullable
+                    T1 current = advance();
+
+                    /** {@inheritDoc} */
+                    @Override
+                    public boolean hasNext() {
+                        return current != NO_NEXT_ELEMENT;
+                    }
+
+                    /** {@inheritDoc} */
+                    @Override
+                    public T2 next() {
+                        T1 current = this.current;
+
+                        if (current == NO_NEXT_ELEMENT) {
+                            throw new NoSuchElementException();
+                        }
+
+                        this.current = advance();
+
+                        return mapper == null ? (T2) current : 
mapper.apply(current);
+                    }
+
+                    private @Nullable T1 advance() {
+                        while (innerIterator.hasNext()) {
+                            T1 next = innerIterator.next();
+
+                            if (predicate == null || predicate.test(next)) {
+                                return next;
+                            }
+                        }
+
+                        return (T1) NO_NEXT_ELEMENT;
+                    }
+                };
+            }
+        };
+    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index acef8dc60c..fd958a7685 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -602,6 +602,17 @@ public final class ExceptionUtils {
         return new IgniteException(INTERNAL_ERR, e.getMessage(), e);
     }
 
+    /**
+     * Checks if passed in {@code 'Throwable'} has given class in {@code 
'cause'} hierarchy <b>including</b> that throwable itself.
+     *
+     * @param exceptionClass Cause class to check.
+     * @param ex Throwable to check (if {@code null}, {@code false} is 
returned).
+     * @return {@code True} if one of the causing exception is an instance of 
passed in classes, {@code false} otherwise.
+     */
+    public static boolean isOrCausedBy(Class<? extends Exception> 
exceptionClass, @Nullable Throwable ex) {
+        return ex != null && (exceptionClass.isInstance(ex) || 
isOrCausedBy(exceptionClass, ex.getCause()));
+    }
+
     /**
      * Creates and return a copy of an exception that is a cause of the given 
{@code exception}.
      * If the original exception does not contain a cause, then the original 
exception will be returned.
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
index 4bf160b553..ea30781cad 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
@@ -18,16 +18,20 @@
 package org.apache.ignite.internal.util;
 
 import static java.util.Collections.emptyIterator;
+import static java.util.Collections.emptyList;
+import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.util.CollectionUtils.concat;
 import static org.apache.ignite.internal.util.CollectionUtils.difference;
 import static org.apache.ignite.internal.util.CollectionUtils.intersect;
 import static org.apache.ignite.internal.util.CollectionUtils.last;
+import static org.apache.ignite.internal.util.CollectionUtils.mapIterable;
 import static org.apache.ignite.internal.util.CollectionUtils.setOf;
 import static org.apache.ignite.internal.util.CollectionUtils.union;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -42,6 +46,8 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
 import java.util.Spliterators;
 import java.util.stream.StreamSupport;
@@ -291,4 +297,60 @@ public class CollectionUtilsTest {
         assertEquals(1, last(List.of(1)));
         assertEquals(2, last(List.of(1, 2)));
     }
+
+    @Test
+    void testMapIterableWithPredicate() {
+        assertFalse(mapIterable(null, null, null).iterator().hasNext());
+        assertFalse(mapIterable(emptyList(), null, null).iterator().hasNext());
+
+        assertEquals(List.of(1), collect(mapIterable(List.of(1), null, null)));
+        assertEquals(List.of(1), collect(mapIterable(List.of(1), identity(), 
null)));
+        assertEquals(List.of(1), collect(mapIterable(List.of(1), null, integer 
-> true)));
+        assertEquals(List.of(1), collect(mapIterable(List.of(1), identity(), 
integer -> true)));
+        assertEquals(List.of(), collect(mapIterable(List.of(1), null, integer 
-> false)));
+        assertEquals(List.of(), collect(mapIterable(List.of(1), identity(), 
integer -> false)));
+
+        assertEquals(List.of("1", "2", "3"), collect(mapIterable(List.of(1, 2, 
3), String::valueOf, null)));
+        assertEquals(List.of("3"), collect(mapIterable(List.of(1, 2, 3), 
String::valueOf, integer -> integer > 2)));
+
+        Iterator<String> iterator1 = mapIterable(List.of(1, 2, 3, 4), 
String::valueOf, integer -> true).iterator();
+        assertEquals("1", iterator1.next());
+        assertEquals("2", iterator1.next());
+        assertEquals("3", iterator1.next());
+        assertEquals("4", iterator1.next());
+
+        Iterator<String> iterator2 = mapIterable(List.of(1, 2, 3, 4), 
String::valueOf, null).iterator();
+        assertEquals("1", iterator2.next());
+        assertEquals("2", iterator2.next());
+        assertEquals("3", iterator2.next());
+        assertEquals("4", iterator2.next());
+
+        Iterator<Integer> iterator3 = mapIterable(List.of(1, 2, 3, 4), 
identity(), integer -> integer < 3).iterator();
+        assertEquals(1, iterator3.next());
+        assertEquals(2, iterator3.next());
+
+        Iterator<Integer> iterator4 = mapIterable(List.of(1, 2, 3, 4), 
identity(), integer -> false).iterator();
+        assertFalse(iterator4.hasNext());
+
+        assertDoesNotThrow(() -> mapIterable(Arrays.asList(new 
Integer[]{null}), null, null).iterator().next());
+        assertDoesNotThrow(() -> mapIterable(Arrays.asList(new 
Integer[]{null}), null, integer -> true).iterator().next());
+        assertDoesNotThrow(() -> mapIterable(Arrays.asList(null, 1), null, 
null).iterator().next());
+        assertDoesNotThrow(() -> mapIterable(Arrays.asList(null, 1), null, 
integer -> true).iterator().next());
+
+        assertThrows(
+                NoSuchElementException.class,
+                () -> mapIterable(Arrays.asList(new Integer[]{null}), null, 
integer -> false).iterator().next()
+        );
+
+        assertThrows(
+                NoSuchElementException.class,
+                () -> {
+                    Iterator<Object> iterator = 
mapIterable(Arrays.asList(null, 1), null, Objects::nonNull).iterator();
+                    iterator.next();
+                    iterator.next();
+                }
+        );
+
+        assertThrows(UnsupportedOperationException.class, () -> 
mapIterable(List.of(1), null, null).iterator().remove());
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteNameUtilsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteNameUtilsTest.java
index 3219a85891..34afbc4962 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteNameUtilsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteNameUtilsTest.java
@@ -36,7 +36,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 public class IgniteNameUtilsTest {
     @ParameterizedTest
     @CsvSource({
-            "foo, FOO", "fOo, FOO", "FOO, FOO", "1o0, 1O0", "@#$, @#$",
+            "foo, FOO", "fOo, FOO", "FOO, FOO", "\"FOO\", FOO", "1o0, 1O0", 
"@#$, @#$",
             "\"foo\", foo", "\"fOo\", fOo", "\"f.f\", f.f", "\"f\"\"f\", f\"f",
     })
     public void validSimpleNames(String source, String expected) {
@@ -59,4 +59,13 @@ public class IgniteNameUtilsTest {
                 equalTo("Fully qualified name is not expected [name=" + source 
+ "]"),
                 containsString("Malformed name [name=" + source))));
     }
+
+    @ParameterizedTest
+    @CsvSource({
+            "foo, \"foo\"", "fOo, \"fOo\"", "FOO, FOO", "\"FOO\", \"FOO\"", 
"1o0, \"1o0\"", "@#$, @#$",
+            "\"foo\", \"foo\"", "\"fOo\", \"fOo\"", "\"f.f\", \"f.f\""
+    })
+    public void quoteIfNeeded(String source, String expected) {
+        assertThat(IgniteNameUtils.quoteIfNeeded(source), equalTo(expected));
+    }
 }
diff --git 
a/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/TupleReader.java
 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/TupleReader.java
new file mode 100644
index 0000000000..93e93c5e79
--- /dev/null
+++ 
b/modules/marshaller-common/src/main/java/org/apache/ignite/internal/marshaller/TupleReader.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.table.Tuple;
+
+/**
+ * Adapter from a {@link Tuple} to a {@link MarshallerReader}.
+ */
+public class TupleReader implements MarshallerReader {
+    private final Tuple tuple;
+
+    private int index;
+
+    public TupleReader(Tuple tuple) {
+        this(tuple, 0);
+    }
+
+    TupleReader(Tuple tuple, int index) {
+        this.tuple = tuple;
+        this.index = index;
+    }
+
+    @Override
+    public void skipValue() {
+        index++;
+    }
+
+    @Override
+    public boolean readBoolean() {
+        return tuple.booleanValue(index++);
+    }
+
+    @Override
+    public Boolean readBooleanBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public byte readByte() {
+        return tuple.byteValue(index++);
+    }
+
+    @Override
+    public Byte readByteBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public short readShort() {
+        return tuple.shortValue(index++);
+    }
+
+    @Override
+    public Short readShortBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public int readInt() {
+        return tuple.intValue(index++);
+    }
+
+    @Override
+    public Integer readIntBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public long readLong() {
+        return tuple.longValue(index++);
+    }
+
+    @Override
+    public Long readLongBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public float readFloat() {
+        return tuple.floatValue(index++);
+    }
+
+    @Override
+    public Float readFloatBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public double readDouble() {
+        return tuple.doubleValue(index++);
+    }
+
+    @Override
+    public Double readDoubleBoxed() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public String readString() {
+        return tuple.stringValue(index++);
+    }
+
+    @Override
+    public UUID readUuid() {
+        return tuple.uuidValue(index++);
+    }
+
+    @Override
+    public byte[] readBytes() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public BitSet readBitSet() {
+        return tuple.bitmaskValue(index++);
+    }
+
+    @Override
+    public BigInteger readBigInt() {
+        return tuple.value(index++);
+    }
+
+    @Override
+    public BigDecimal readBigDecimal(int scale) {
+        return new BigDecimal(tuple.value(index++), scale);
+    }
+
+    @Override
+    public LocalDate readDate() {
+        return tuple.dateValue(index++);
+    }
+
+    @Override
+    public LocalTime readTime() {
+        return tuple.timeValue(index++);
+    }
+
+    @Override
+    public Instant readTimestamp() {
+        return tuple.timestampValue(index++);
+    }
+
+    @Override
+    public LocalDateTime readDateTime() {
+        return tuple.datetimeValue(index++);
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java
index be646bc4a3..754682b0aa 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItCriteriaQueryTest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.table;
 import static java.util.Spliterators.spliteratorUnknownSize;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toMap;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.matchers.TupleMatcher.tupleValue;
 import static org.apache.ignite.lang.util.IgniteNameUtils.quote;
@@ -37,22 +39,27 @@ import static 
org.apache.ignite.table.criteria.Criteria.notNullValue;
 import static org.apache.ignite.table.criteria.Criteria.nullValue;
 import static org.apache.ignite.table.criteria.CriteriaQueryOptions.builder;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Spliterator;
 import java.util.function.Function;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.sql.api.IgniteSqlImpl;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.AsyncCursor;
 import org.apache.ignite.lang.Cursor;
@@ -62,6 +69,7 @@ import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.criteria.CriteriaQuerySource;
 import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.AfterAll;
@@ -129,19 +137,21 @@ public class ItCriteriaQueryTest extends 
ClusterPerClassIntegrationTest {
         Table table = CLUSTER.aliveNode().tables().table(TABLE_NAME);
         Table clientTable = CLIENT.tables().table(TABLE_NAME);
 
+        Function<TestObject, Tuple> objMapper = (obj) -> 
Tuple.create().set("id", obj.id).set("name", obj.name)
+                .set("salary", obj.salary).set("hash", obj.hash);
+
         return Stream.of(
                 Arguments.of(table.recordView(), identity()),
+                Arguments.of(table.recordView(TestObject.class), objMapper),
                 Arguments.of(clientTable.recordView(), identity()),
-                Arguments.of(clientTable.recordView(TestObject.class),
-                        (Function<TestObject, Tuple>) (obj) -> 
Tuple.create().set("id", obj.id).set("name", obj.name)
-                                .set("salary", obj.salary).set("hash", 
obj.hash))
+                Arguments.of(clientTable.recordView(TestObject.class), 
objMapper)
         );
     }
 
     @ParameterizedTest
     @MethodSource
     public <T> void testRecordViewQuery(CriteriaQuerySource<T> view, 
Function<T, Tuple> mapper) {
-        IgniteTestUtils.assertThrows(
+        assertThrows(
                 IgniteException.class,
                 () -> view.query(null, columnValue("id", equalTo("2"))),
                 "Dynamic parameter requires adding explicit type cast"
@@ -223,6 +233,175 @@ public class ItCriteriaQueryTest extends 
ClusterPerClassIntegrationTest {
         }
     }
 
+    private static Stream<Arguments> testKeyValueView() {
+        Table table = CLUSTER.aliveNode().tables().table(TABLE_NAME);
+        Table clientTable = CLIENT.tables().table(TABLE_NAME);
+
+        Function<Entry<TestObjectKey, TestObject>, Entry<Tuple, Tuple>> 
kvMapper = (entry) -> {
+            TestObjectKey key = entry.getKey();
+            TestObject val = entry.getValue();
+
+            return new IgniteBiTuple<>(Tuple.create().set("id", key.id), 
Tuple.create().set("name", val.name).set("salary", val.salary)
+                    .set("hash", val.hash));
+        };
+
+        return Stream.of(
+                Arguments.of(table.keyValueView(), identity()),
+                Arguments.of(table.keyValueView(TestObjectKey.class, 
TestObject.class), kvMapper),
+                Arguments.of(clientTable.keyValueView(), identity()),
+                Arguments.of(clientTable.keyValueView(TestObjectKey.class, 
TestObject.class), kvMapper)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    public <T> void testKeyValueView(CriteriaQuerySource<T> view, Function<T, 
Entry<Tuple, Tuple>> mapper) {
+        assertThrows(
+                IgniteException.class,
+                () -> view.query(null, columnValue("id", equalTo("2"))),
+                "Dynamic parameter requires adding explicit type cast"
+        );
+
+        Matcher<Tuple> personKey0 = tupleValue("id", is(0));
+        Matcher<Tuple> person0 = allOf(tupleValue("name", 
Matchers.nullValue()), tupleValue("salary", is(0.0d)),
+                tupleValue("hash", is("hash0".getBytes())));
+
+        Matcher<Tuple> personKey1 = tupleValue("id", is(1));
+        Matcher<Tuple> person1 = allOf(tupleValue("name", is("name1")), 
tupleValue("salary", is(10.0d)),
+                tupleValue("hash", is("hash1".getBytes())));
+
+        Matcher<Tuple> personKey2 = tupleValue("id", is(2));
+        Matcher<Tuple> person2 = allOf(tupleValue("name", is("name2")), 
tupleValue("salary", is(20.0d)),
+                tupleValue("hash", is("hash2".getBytes())));
+
+        try (Cursor<T> cur = view.query(null, null)) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(3),
+                    hasEntry(personKey0, person0),
+                    hasEntry(personKey1, person1),
+                    hasEntry(personKey2, person2)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("id", equalTo(2)))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(1),
+                    hasEntry(personKey2, person2)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("hash", 
equalTo("hash2".getBytes())))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(1),
+                    hasEntry(personKey2, person2)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("id", 
notEqualTo(2)))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(2),
+                    hasEntry(personKey0, person0),
+                    hasEntry(personKey1, person1)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("hash", 
notEqualTo("hash2".getBytes())))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(2),
+                    hasEntry(personKey0, person0),
+                    hasEntry(personKey1, person1)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("id", 
greaterThan(1)))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(1),
+                    hasEntry(personKey2, person2)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("id", 
greaterThanOrEqualTo(1)))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(2),
+                    hasEntry(personKey1, person1),
+                    hasEntry(personKey2, person2)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("id", lessThan(1)))) 
{
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(1),
+                    hasEntry(personKey0, person0)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("id", 
lessThanOrEqualTo(1)))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(2),
+                    hasEntry(personKey0, person0),
+                    hasEntry(personKey1, person1)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("name", 
nullValue()))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(1),
+                    hasEntry(personKey0, person0)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("name", 
notNullValue()))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(2),
+                    hasEntry(personKey1, person1),
+                    hasEntry(personKey2, person2)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("id", in(1, 2)))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(2),
+                    hasEntry(personKey1, person1),
+                    hasEntry(personKey2, person2)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("id", notIn(1, 2)))) 
{
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(1),
+                    hasEntry(personKey0, person0)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("hash", 
in("hash1".getBytes(), "hash2".getBytes())))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(2),
+                    hasEntry(personKey1, person1),
+                    hasEntry(personKey2, person2)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("hash", in((byte[]) 
null)))) {
+            assertThat(mapToTupleMap(cur, mapper), anEmptyMap());
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("hash", 
notIn("hash1".getBytes(), "hash2".getBytes())))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(1),
+                    hasEntry(personKey0, person0)
+            ));
+        }
+
+        try (Cursor<T> cur = view.query(null, columnValue("hash", 
notIn((byte[]) null)))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(3),
+                    hasEntry(personKey0, person0),
+                    hasEntry(personKey1, person1),
+                    hasEntry(personKey2, person2)
+            ));
+        }
+    }
+
     @Test
     public void testOptions() {
         RecordView<TestObject> view = 
CLIENT.tables().table(TABLE_NAME).recordView(TestObject.class);
@@ -238,16 +417,18 @@ public class ItCriteriaQueryTest extends 
ClusterPerClassIntegrationTest {
         Table table = CLUSTER.aliveNode().tables().table(QUOTED_TABLE_NAME);
         Table clientTable = CLIENT.tables().table(QUOTED_TABLE_NAME);
 
-        Mapper<QuotedObject> pojoMapper = Mapper.builder(QuotedObject.class)
+        Mapper<QuotedObject> recMapper = Mapper.builder(QuotedObject.class)
                 .map("colUmn", quote("colUmn"))
                 .automap()
                 .build();
 
+        Function<QuotedObject, Tuple> objMapper = (obj) -> 
Tuple.create(Map.of("id", obj.id, quote("colUmn"), obj.colUmn));
+
         return Stream.of(
                 Arguments.of(table.recordView(), identity()),
+                Arguments.of(table.recordView(recMapper), objMapper),
                 Arguments.of(clientTable.recordView(), identity()),
-                Arguments.of(clientTable.recordView(pojoMapper),
-                        (Function<QuotedObject, Tuple>) (obj) -> 
Tuple.create(Map.of("id", obj.id, quote("colUmn"), obj.colUmn)))
+                Arguments.of(clientTable.recordView(recMapper), objMapper)
         );
     }
 
@@ -261,17 +442,95 @@ public class ItCriteriaQueryTest extends 
ClusterPerClassIntegrationTest {
         }
     }
 
+    private static Stream<Arguments> testKeyViewWithQuotes() {
+        Table table = CLUSTER.aliveNode().tables().table(QUOTED_TABLE_NAME);
+        Table clientTable = CLIENT.tables().table(QUOTED_TABLE_NAME);
+
+        Mapper<QuotedObjectKey> keyMapper = Mapper.of(QuotedObjectKey.class);
+        Mapper<QuotedObject> valMapper = Mapper.builder(QuotedObject.class)
+                .map("colUmn", quote("colUmn"))
+                .build();
+
+        Function<Entry<QuotedObjectKey, QuotedObject>, Entry<Tuple, Tuple>> 
kvMapper = (entry) ->
+                new IgniteBiTuple<>(Tuple.create(Map.of("id", 
entry.getKey().id)), Tuple.create(Map.of(quote("colUmn"),
+                        entry.getValue().colUmn)));
+
+        return Stream.of(
+                Arguments.of(table.keyValueView(), identity()),
+                Arguments.of(table.keyValueView(keyMapper, valMapper), 
kvMapper),
+                Arguments.of(clientTable.keyValueView(), identity()),
+                Arguments.of(clientTable.keyValueView(keyMapper, valMapper), 
kvMapper)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    public <T> void testKeyViewWithQuotes(CriteriaQuerySource<T> view, 
Function<T, Entry<Tuple, Tuple>> mapper) {
+        try (Cursor<T> cur = view.query(null, columnValue(quote("colUmn"), 
equalTo("name1")))) {
+            assertThat(mapToTupleMap(cur, mapper), allOf(
+                    aMapWithSize(1),
+                    hasEntry(tupleValue("id", is(1)), 
tupleValue(quote("colUmn"), is("name1")))
+            ));
+        }
+    }
+
+    private static Stream<Arguments> testSessionClosing() {
+        Table table = CLUSTER.aliveNode().tables().table(TABLE_NAME);
+
+        Transaction tx = CLUSTER.aliveNode().transactions().begin();
+        tx.rollback();
+
+        return Stream.of(
+                Arguments.of(table.recordView(), tx),
+                Arguments.of(table.recordView(TestObject.class), tx),
+                Arguments.of(table.keyValueView(), tx),
+                Arguments.of(table.keyValueView(TestObjectKey.class, 
TestObject.class), tx)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource
+    public void testSessionClosing(CriteriaQuerySource<?> view, Transaction 
tx) {
+        int baseSessionsCount = activeSessionsCount();
+
+        assertThrows(
+                IgniteException.class,
+                () -> view.query(tx, columnValue("id", equalTo(2))),
+                "Transaction is already finished"
+        );
+
+        assertEquals(baseSessionsCount, activeSessionsCount());
+    }
+
+    private static int activeSessionsCount() {
+        return ((IgniteSqlImpl) CLUSTER.aliveNode().sql()).sessions().size();
+    }
+
     private static <T> List<Tuple> mapToTupleList(Cursor<T> cur, Function<T, 
Tuple> mapper) {
         return StreamSupport.stream(spliteratorUnknownSize(cur, 
Spliterator.ORDERED), false)
                 .map(mapper)
                 .collect(toList());
     }
 
+    private static <T, K, V> Map<K, V> mapToTupleMap(Cursor<T> cur, 
Function<T, Entry<K, V>> mapper) {
+        return StreamSupport.stream(spliteratorUnknownSize(cur, 
Spliterator.ORDERED), false)
+                .map(mapper)
+                .collect(toMap(Entry::getKey, Entry::getValue));
+    }
+
+    static class QuotedObjectKey {
+        int id;
+    }
+
     static class QuotedObject {
         int id;
         String colUmn;
     }
 
+    static class TestObjectKey {
+        int id;
+    }
+
     static class TestObject {
         int id;
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 01406d72c9..66cb9dbe89 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -20,28 +20,43 @@ package org.apache.ignite.internal.table;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.function.Function.identity;
 import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.isOrCausedBy;
 import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
-import java.util.Collection;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
 import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.criteria.CursorAdapter;
+import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
 import org.apache.ignite.internal.table.criteria.SqlSerializer;
 import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.AsyncCursor;
+import org.apache.ignite.lang.Cursor;
 import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.sql.Statement;
 import org.apache.ignite.table.criteria.Criteria;
+import org.apache.ignite.table.criteria.CriteriaQueryOptions;
+import org.apache.ignite.table.criteria.CriteriaQuerySource;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Base class for Table views.
  */
-abstract class AbstractTableView {
+abstract class AbstractTableView<R> implements CriteriaQuerySource<R> {
     /** Internal table. */
     protected final InternalTable tbl;
 
@@ -136,26 +151,77 @@ abstract class AbstractTableView {
         return convertToPublicFuture(future);
     }
 
-    private static boolean isOrCausedBy(Class<? extends Exception> 
exceptionClass, @Nullable Throwable ex) {
-        return ex != null && (exceptionClass.isInstance(ex) || 
isOrCausedBy(exceptionClass, ex.getCause()));
+    /**
+     * Map columns to it's names.
+     *
+     * @param columns Target columns.
+     * @return Column names.
+     */
+    protected static String[] columnNames(Column[] columns) {
+        String[] columnNames = new String[columns.length];
+
+        for (int i = 0; i < columns.length; i++) {
+            columnNames[i] = columns[i].name();
+        }
+
+        return columnNames;
     }
 
     /**
-     * Construct SQL query and arguments for prepare statement.
+     * Create conversion function for objects contained by result set to 
criteria query objects.
      *
-     * @param tableName Table name.
-     * @param columnNames Column names.
-     * @param criteria The predicate to filter entries or {@code null} to 
return all entries from the underlying table.
-     * @return SQL query and it's arguments.
+     * @param meta Result set columns' metadata.
+     * @param schema Schema.
+     * @return Conversion function (if {@code null} conversions isn't 
required).
      */
-    static SqlSerializer createSqlSerializer(String tableName, 
Collection<String> columnNames, @Nullable Criteria criteria) {
-        return new SqlSerializer.Builder()
-                .tableName(tableName)
-                .columns(columnNames)
-                .where(criteria)
-                .build();
+    protected @Nullable Function<SqlRow, R> queryMapper(ResultSetMetadata 
meta, SchemaDescriptor schema) {
+        return null;
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public Cursor<R> query(@Nullable Transaction tx, @Nullable Criteria 
criteria, CriteriaQueryOptions opts) {
+        return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts)));
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<AsyncCursor<R>> queryAsync(
+            @Nullable Transaction tx,
+            @Nullable Criteria criteria,
+            @Nullable CriteriaQueryOptions opts
+    ) {
+        CriteriaQueryOptions opts0 = opts == null ? 
CriteriaQueryOptions.DEFAULT : opts;
+
+        return withSchemaSync(tx, (schemaVersion) -> {
+            SchemaDescriptor schema = 
rowConverter.registry().schema(schemaVersion);
+
+            SqlSerializer ser = new SqlSerializer.Builder()
+                    .tableName(tbl.name())
+                    .columns(schema.columnNames())
+                    .where(criteria)
+                    .build();
+
+            Statement statement = 
sql.statementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
+            Session session = sql.createSession();
+
+            return session.executeAsync(tx, statement, ser.getArguments())
+                    .<AsyncCursor<R>>thenApply(resultSet -> {
+                        ResultSetMetadata meta = resultSet.metadata();
+
+                        assert meta != null : "Metadata can't be null.";
+
+                        return new QueryCriteriaAsyncCursor<>(resultSet, 
queryMapper(meta, schema), session::closeAsync);
+                    })
+                    .exceptionally(th -> {
+                        session.closeAsync();
+
+                        throw new CompletionException(unwrapCause(th));
+                    });
+        });
+    }
+
+
     /**
      * Action representing some KV operation. When executed, the action is 
supplied with schema version corresponding
      * to the operation timestamp (see {@link #withSchemaSync(Transaction, 
KvAction)} for details).
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index 0ad9e64c67..60ff561d8e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -26,12 +26,16 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
+import java.util.function.Function;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.streamer.StreamerBatchSender;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -39,6 +43,8 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.MarshallerException;
 import org.apache.ignite.lang.NullableValue;
 import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.Tuple;
@@ -51,7 +57,7 @@ import org.jetbrains.annotations.Nullable;
  * <p>NB: Binary view doesn't allow null tuples. Methods return either a tuple 
that represents the value, or {@code null} if no value
  * exists for the given key.
  */
-public class KeyValueBinaryViewImpl extends AbstractTableView implements 
KeyValueView<Tuple, Tuple> {
+public class KeyValueBinaryViewImpl extends AbstractTableView<Entry<Tuple, 
Tuple>> implements KeyValueView<Tuple, Tuple> {
     private final TupleMarshallerCache marshallerCache;
 
     /**
@@ -557,4 +563,13 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView implements KeyValu
 
         return rows;
     }
+
+    /** {@inheritDoc} */
+    @Override
+    protected Function<SqlRow, Entry<Tuple, Tuple>> 
queryMapper(ResultSetMetadata meta, SchemaDescriptor schema) {
+        return (row) -> new IgniteBiTuple<>(
+                new SqlRowProjection(row, meta, 
columnNames(schema.keyColumns().columns())),
+                new SqlRowProjection(row, meta, 
columnNames(schema.valueColumns().columns()))
+        );
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 51bdfe46b5..d193078f71 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.table;
 
+import static 
org.apache.ignite.internal.marshaller.Marshaller.createMarshaller;
+import static 
org.apache.ignite.internal.schema.marshaller.MarshallerUtil.toMarshallerColumns;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,15 +30,20 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
+import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.marshaller.Marshaller;
 import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.internal.marshaller.TupleReader;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
 import 
org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.streamer.StreamerBatchSender;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -43,6 +51,8 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.NullableValue;
 import org.apache.ignite.lang.UnexpectedNullValueException;
 import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.mapper.Mapper;
@@ -52,7 +62,13 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Key-value view implementation.
  */
-public class KeyValueViewImpl<K, V> extends AbstractTableView implements 
KeyValueView<K, V> {
+public class KeyValueViewImpl<K, V> extends AbstractTableView<Entry<K, V>> 
implements KeyValueView<K, V> {
+    /** Key class mapper. */
+    private final Mapper<K> keyMapper;
+
+    /** Value class mapper. */
+    private final Mapper<V> valueMapper;
+
     /** Marshaller factory. */
     private final Function<SchemaDescriptor, KvMarshaller<K, V>> 
marshallerFactory;
 
@@ -79,6 +95,9 @@ public class KeyValueViewImpl<K, V> extends AbstractTableView 
implements KeyValu
     ) {
         super(tbl, schemaVersions, schemaRegistry, sql);
 
+        this.keyMapper = keyMapper;
+        this.valueMapper = valueMapper;
+
         marshallerFactory = (schema) -> new KvMarshallerImpl<>(schema, 
keyMapper, valueMapper);
     }
 
@@ -677,4 +696,25 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView implements KeyValu
 
         return DataStreamer.streamData(publisher, options, batchSender, 
partitioner);
     }
+
+    /** {@inheritDoc} */
+    @Override
+    protected Function<SqlRow, Entry<K, V>> queryMapper(ResultSetMetadata 
meta, SchemaDescriptor schema) {
+        Column[] keyCols = schema.keyColumns().columns();
+        Column[] valCols = schema.valueColumns().columns();
+
+        Marshaller keyMarsh = createMarshaller(toMarshallerColumns(keyCols), 
keyMapper, false, true);
+        Marshaller valMarsh = createMarshaller(toMarshallerColumns(valCols), 
valueMapper, false, true);
+
+        return (row) -> {
+            try {
+                return new IgniteBiTuple<>(
+                        (K) keyMarsh.readObject(new TupleReader(new 
SqlRowProjection(row, meta, columnNames(keyCols))), null),
+                        (V) valMarsh.readObject(new TupleReader(new 
SqlRowProjection(row, meta, columnNames(valCols))), null)
+                );
+            } catch (MarshallerException e) {
+                throw new org.apache.ignite.lang.MarshallerException(e);
+            }
+        };
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index 12368105c9..422c99a46f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -26,36 +26,26 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
 import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.streamer.StreamerBatchSender;
-import org.apache.ignite.internal.table.criteria.CursorAdapter;
-import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
-import org.apache.ignite.internal.table.criteria.SqlSerializer;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.lang.AsyncCursor;
-import org.apache.ignite.lang.Cursor;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.MarshallerException;
 import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Statement;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Tuple;
-import org.apache.ignite.table.criteria.Criteria;
-import org.apache.ignite.table.criteria.CriteriaQueryOptions;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Table view implementation for binary objects.
  */
-public class RecordBinaryViewImpl extends AbstractTableView implements 
RecordView<Tuple> {
+public class RecordBinaryViewImpl extends AbstractTableView<Tuple> implements 
RecordView<Tuple> {
     private final TupleMarshallerCache marshallerCache;
 
     /**
@@ -446,31 +436,4 @@ public class RecordBinaryViewImpl extends 
AbstractTableView implements RecordVie
 
         return DataStreamer.streamData(publisher, options, batchSender, 
partitioner);
     }
-
-    /** {@inheritDoc} */
-    @Override
-    public Cursor<Tuple> query(@Nullable Transaction tx, @Nullable Criteria 
criteria, @Nullable CriteriaQueryOptions opts) {
-        return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts)));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<AsyncCursor<Tuple>> queryAsync(
-            @Nullable Transaction tx,
-            @Nullable Criteria criteria,
-            @Nullable CriteriaQueryOptions opts
-    ) {
-        var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts;
-
-        return withSchemaSync(tx, (schemaVersion) -> {
-            SchemaDescriptor schema = 
rowConverter.registry().schema(schemaVersion);
-            SqlSerializer ser = createSqlSerializer(tbl.name(), 
schema.columnNames(), criteria);
-
-            Statement statement = 
sql.statementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
-            Session session = sql.createSession();
-
-            return session.executeAsync(tx, statement, ser.getArguments())
-                    .thenApply(resultSet -> new 
QueryCriteriaAsyncCursor<>(resultSet, session::close));
-        });
-    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 4b6e8b0ed6..92ba1f71df 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.table;
 
+import static 
org.apache.ignite.internal.marshaller.Marshaller.createMarshaller;
+import static 
org.apache.ignite.internal.schema.marshaller.MarshallerUtil.toMarshallerColumns;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -25,29 +28,27 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
+import org.apache.ignite.internal.marshaller.Marshaller;
+import org.apache.ignite.internal.marshaller.TupleReader;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.schema.marshaller.RecordMarshaller;
 import 
org.apache.ignite.internal.schema.marshaller.reflection.RecordMarshallerImpl;
 import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.streamer.StreamerBatchSender;
-import org.apache.ignite.internal.table.criteria.CursorAdapter;
-import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
-import org.apache.ignite.internal.table.criteria.SqlSerializer;
+import org.apache.ignite.internal.table.criteria.SqlRowProjection;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.lang.AsyncCursor;
-import org.apache.ignite.lang.Cursor;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.lang.MarshallerException;
 import org.apache.ignite.sql.IgniteSql;
-import org.apache.ignite.sql.Session;
-import org.apache.ignite.sql.Statement;
+import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.DataStreamerOptions;
 import org.apache.ignite.table.RecordView;
-import org.apache.ignite.table.criteria.Criteria;
-import org.apache.ignite.table.criteria.CriteriaQueryOptions;
 import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
@@ -55,7 +56,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Record view implementation.
  */
-public class RecordViewImpl<R> extends AbstractTableView implements 
RecordView<R> {
+public class RecordViewImpl<R> extends AbstractTableView<R> implements 
RecordView<R> {
     /** Record class mapper. */
     private final Mapper<R> mapper;
 
@@ -541,28 +542,16 @@ public class RecordViewImpl<R> extends AbstractTableView 
implements RecordView<R
 
     /** {@inheritDoc} */
     @Override
-    public Cursor<R> query(@Nullable Transaction tx, @Nullable Criteria 
criteria, @Nullable CriteriaQueryOptions opts) {
-        return new CursorAdapter<>(sync(queryAsync(tx, criteria, opts)));
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<AsyncCursor<R>> queryAsync(
-            @Nullable Transaction tx,
-            @Nullable Criteria criteria,
-            @Nullable CriteriaQueryOptions opts
-    ) {
-        var opts0 = opts == null ? CriteriaQueryOptions.DEFAULT : opts;
+    protected Function<SqlRow, R> queryMapper(ResultSetMetadata meta, 
SchemaDescriptor schema) {
+        Column[] cols = ArrayUtils.concat(schema.keyColumns().columns(), 
schema.valueColumns().columns());
+        Marshaller marsh = createMarshaller(toMarshallerColumns(cols), mapper, 
false, true);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
-            SchemaDescriptor schema = 
rowConverter.registry().schema(schemaVersion);
-            SqlSerializer ser = createSqlSerializer(tbl.name(), 
schema.columnNames(), criteria);
-
-            Statement statement = 
sql.statementBuilder().query(ser.toString()).pageSize(opts0.pageSize()).build();
-            Session session = sql.createSession();
-
-            return session.executeAsync(tx, mapper, statement, 
ser.getArguments())
-                    .thenApply(resultSet -> new 
QueryCriteriaAsyncCursor<>(resultSet, session::close));
-        });
+        return (row) -> {
+            try {
+                return (R) marsh.readObject(new TupleReader(new 
SqlRowProjection(row, meta, columnNames(cols))), null);
+            } catch (org.apache.ignite.internal.marshaller.MarshallerException 
e) {
+                throw new MarshallerException(e);
+            }
+        };
     }
 }


Reply via email to