This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 91e11ecb9e IGNITE-18392 Add SQL ResultSet object mapping (#1616)
91e11ecb9e is described below

commit 91e11ecb9e85b272d8947a79aec55bd5dbe40d65
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Feb 3 07:42:31 2023 +0200

    IGNITE-18392 Add SQL ResultSet object mapping (#1616)
    
    Add generic `<T> ResultSet<T> execute` API, which maps SQL result set data to a user type.
    
    Before this change, we could insert and retrieve data in a strongly typed way with object mapping via the `RecordView<T>` API, but SQL results could only be accessed in a weakly typed way through `Tuple`. This change fixes the asymmetry.
---
 .../apache/ignite/example/sql/SqlApiExample.java   | 32 +++++---
 .../main/java/org/apache/ignite/sql/ResultSet.java | 10 ++-
 .../main/java/org/apache/ignite/sql/Session.java   | 96 ++++++++++++++++++++--
 .../apache/ignite/sql/SyncResultSetAdapter.java    | 22 ++---
 .../apache/ignite/sql/async/AsyncResultSet.java    | 14 +++-
 .../handler/requests/sql/ClientSqlCommon.java      |  2 +-
 .../handler/requests/sql/ClientSqlResultSet.java   |  7 +-
 .../internal/client/sql/ClientAsyncResultSet.java  | 93 +++++++++++++++++----
 .../ignite/internal/client/sql/ClientSession.java  | 37 ++++++++-
 .../org/apache/ignite/client/ClientSqlTest.java    | 10 +--
 .../apache/ignite/client/fakes/FakeSession.java    | 26 +++++-
 .../java/org/apache/ignite/internal/Cluster.java   |  5 +-
 .../raftsnapshot/ItTableRaftSnapshotsTest.java     |  4 +-
 .../internal/runner/app/ItDataSchemaSyncTest.java  |  3 +-
 .../runner/app/client/ItThinClientSqlTest.java     | 70 ++++++++++++++--
 .../internal/sql/api/ItSqlAsynchronousApiTest.java | 24 +++---
 .../internal/sql/api/ItSqlSynchronousApiTest.java  |  3 +-
 .../ignite/internal/sqllogic/SqlScriptRunner.java  |  3 +-
 .../internal/sql/api/AsyncResultSetImpl.java       | 11 +--
 .../ignite/internal/sql/api/SessionImpl.java       | 30 ++++++-
 .../internal/sql/engine/IgniteSqlApiTest.java      | 10 +--
 21 files changed, 415 insertions(+), 97 deletions(-)

diff --git 
a/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java 
b/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
index a5ad4cfb3c..0bc15b1e95 100644
--- a/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
+++ b/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
@@ -27,6 +27,7 @@ import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.mapper.Mapper;
 
 /**
  * Examples of using SQL API.
@@ -125,7 +126,7 @@ public class SqlApiExample {
 
                 System.out.println("\nAll accounts:");
 
-                try (ResultSet rs = ses.execute(null,
+                try (ResultSet<SqlRow> rs = ses.execute(null,
                         "SELECT a.FIRST_NAME, a.LAST_NAME, c.NAME FROM 
ACCOUNTS a "
                                 + "INNER JOIN CITIES c on c.ID = a.CITY_ID 
ORDER BY a.ACCOUNT_ID")) {
                     while (rs.hasNext()) {
@@ -146,16 +147,21 @@ public class SqlApiExample {
 
                 System.out.println("\nAccounts with balance lower than 
1,500:");
 
-                try (ResultSet rs = ses.execute(null,
-                        "SELECT a.FIRST_NAME, a.LAST_NAME, a.BALANCE FROM 
ACCOUNTS a WHERE a.BALANCE < 1500.0 "
-                                + "ORDER BY a.ACCOUNT_ID")) {
+                Statement statement = client.sql().statementBuilder()
+                        .query("SELECT a.FIRST_NAME as firstName, a.LAST_NAME 
as lastName, a.BALANCE FROM ACCOUNTS a "
+                                + "WHERE a.BALANCE < 1500.0 "
+                                + "ORDER BY a.ACCOUNT_ID")
+                        .build();
+
+                // POJO mapping.
+                try (ResultSet<AccountInfo> rs = ses.execute(null, 
Mapper.of(AccountInfo.class), statement)) {
                     while (rs.hasNext()) {
-                        SqlRow row = rs.next();
+                        AccountInfo row = rs.next();
 
                         System.out.println("    "
-                                + row.stringValue(1) + ", "
-                                + row.stringValue(2) + ", "
-                                + row.stringValue(3));
+                                + row.firstName + ", "
+                                + row.lastName + ", "
+                                + row.balance);
                     }
                 }
 
@@ -167,7 +173,7 @@ public class SqlApiExample {
 
                 System.out.println("\nDeleting one of the accounts...");
 
-                try (ResultSet rs = ses.execute(null, "DELETE FROM ACCOUNTS 
WHERE ACCOUNT_ID = ?", 1)) {
+                try (ResultSet<SqlRow> rs = ses.execute(null, "DELETE FROM 
ACCOUNTS WHERE ACCOUNT_ID = ?", 1)) {
                     System.out.println("\n Removed accounts: " + 
rs.affectedRows());
                 }
 
@@ -207,7 +213,7 @@ public class SqlApiExample {
      * @param resultSet Async result set.
      * @return Operation future.
      */
-    private static CompletionStage<Void> fetchAllRowsInto(AsyncResultSet 
resultSet) {
+    private static CompletionStage<Void> 
fetchAllRowsInto(AsyncResultSet<SqlRow> resultSet) {
         //
         // Process current page.
         //
@@ -230,4 +236,10 @@ public class SqlApiExample {
         //
         return 
resultSet.fetchNextPage().thenCompose(SqlApiExample::fetchAllRowsInto);
     }
+
+    private static class AccountInfo {
+        String firstName;
+        String lastName;
+        double balance;
+    }
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java 
b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
index 41640149f9..e6ab18132d 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.sql;
 
 import java.util.Iterator;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -31,8 +33,14 @@ import org.jetbrains.annotations.Nullable;
  *
  * <p>Note: one and only one of following is possible: {@link #hasRowSet()} 
returns {@code true}, or {@link #wasApplied()} returns
  * {@code true}, or {@link #affectedRows()} return zero or higher value.
+ *
+ * @param <T> A type of the objects contained by this result set (when row set 
is present). This will be either {@link SqlRow}
+ *     if no explicit mapper is provided or a particular type defined by 
supplied mapper.
+ *
+ * @see Session#execute(Transaction, String, Object...)
+ * @see Session#execute(Transaction, Mapper, String, Object...)
  */
-public interface ResultSet extends Iterator<SqlRow>, AutoCloseable {
+public interface ResultSet<T> extends Iterator<T>, AutoCloseable {
     /**
      * Returns metadata for the results if the result contains rows ({@link 
#hasRowSet()} returns {@code true}).
      *
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Session.java 
b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
index 3c3cfdc752..47beff4593 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/Session.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -50,11 +51,11 @@ public interface Session extends AutoCloseable {
      * @return SQL query results set.
      * @throws SqlException If failed.
      */
-    default ResultSet execute(@Nullable Transaction transaction, String query, 
@Nullable Object... arguments) {
+    default ResultSet<SqlRow> execute(@Nullable Transaction transaction, 
String query, @Nullable Object... arguments) {
         Objects.requireNonNull(query);
 
         try {
-            return new SyncResultSetAdapter(executeAsync(transaction, query, 
arguments).join());
+            return new SyncResultSetAdapter<>(executeAsync(transaction, query, 
arguments).join());
         } catch (CompletionException e) {
             throw IgniteException.wrap(e);
         }
@@ -68,11 +69,59 @@ public interface Session extends AutoCloseable {
      * @param arguments Arguments for the statement.
      * @return SQL query results set.
      */
-    default ResultSet execute(@Nullable Transaction transaction, Statement 
statement, @Nullable Object... arguments) {
+    default ResultSet<SqlRow> execute(@Nullable Transaction transaction, 
Statement statement, @Nullable Object... arguments) {
         Objects.requireNonNull(statement);
 
         try {
-            return new SyncResultSetAdapter(executeAsync(transaction, 
statement, arguments).join());
+            return new SyncResultSetAdapter<>(executeAsync(transaction, 
statement, arguments).join());
+        } catch (CompletionException e) {
+            throw IgniteException.wrap(e);
+        }
+    }
+
+    /**
+     * Executes single SQL statement and maps results to objects with the 
provided mapper.
+     *
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
+     * @param query SQL query template.
+     * @param arguments Arguments for the statement.
+     * @param <T> A type of object contained in result set.
+     * @return SQL query results set.
+     */
+    default <T> ResultSet<T> execute(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            String query,
+            @Nullable Object... arguments) {
+        Objects.requireNonNull(query);
+
+        try {
+            return new SyncResultSetAdapter<>(executeAsync(transaction, 
mapper, query, arguments).join());
+        } catch (CompletionException e) {
+            throw IgniteException.wrap(e);
+        }
+    }
+
+    /**
+     * Executes single SQL statement and maps results to objects with the 
provided mapper.
+     *
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
+     * @param statement SQL statement to execute.
+     * @param arguments Arguments for the statement.
+     * @param <T> A type of object contained in result set.
+     * @return SQL query results set.
+     */
+    default <T> ResultSet<T> execute(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            Statement statement,
+            @Nullable Object... arguments) {
+        Objects.requireNonNull(statement);
+
+        try {
+            return new SyncResultSetAdapter<>(executeAsync(transaction, 
mapper, statement, arguments).join());
         } catch (CompletionException e) {
             throw IgniteException.wrap(e);
         }
@@ -87,7 +136,7 @@ public interface Session extends AutoCloseable {
      * @return Operation future.
      * @throws SqlException If failed.
      */
-    CompletableFuture<AsyncResultSet> executeAsync(@Nullable Transaction 
transaction, String query, @Nullable Object... arguments);
+    CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments);
 
     /**
      * Executes SQL statement in an asynchronous way.
@@ -98,7 +147,42 @@ public interface Session extends AutoCloseable {
      * @return Operation future.
      * @throws SqlException If failed.
      */
-    CompletableFuture<AsyncResultSet> executeAsync(@Nullable Transaction 
transaction, Statement statement, @Nullable Object... arguments);
+    CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            Statement statement,
+            @Nullable Object... arguments);
+
+    /**
+     * Executes SQL statement in an asynchronous way and maps results to 
objects with the provided mapper.
+     *
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
+     * @param query SQL query template.
+     * @param arguments Arguments for the statement.
+     * @param <T> A type of object contained in result set.
+     * @return Operation future.
+     */
+    <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            String query,
+            @Nullable Object... arguments);
+
+    /**
+     * Executes SQL statement in an asynchronous way and maps results to 
objects with the provided mapper.
+     *
+     * @param transaction Transaction to execute the statement within or 
{@code null}.
+     * @param mapper Mapper that defines the row type and the way to map 
columns to the type members. See {@link Mapper#of}.
+     * @param statement SQL statement to execute.
+     * @param arguments Arguments for the statement.
+     * @param <T> A type of object contained in result set.
+     * @return Operation future.
+     */
+    <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            Statement statement,
+            @Nullable Object... arguments);
 
     /**
      * Executes SQL query in a reactive way.
diff --git 
a/modules/api/src/main/java/org/apache/ignite/sql/SyncResultSetAdapter.java 
b/modules/api/src/main/java/org/apache/ignite/sql/SyncResultSetAdapter.java
index 02b86e8b3a..603e986a15 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/SyncResultSetAdapter.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/SyncResultSetAdapter.java
@@ -29,19 +29,19 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Synchronous wrapper over {@link org.apache.ignite.sql.async.AsyncResultSet}.
  */
-class SyncResultSetAdapter implements ResultSet {
+class SyncResultSetAdapter<T> implements ResultSet<T> {
     /** Wrapped async result set. */
-    private final AsyncResultSet ars;
+    private final AsyncResultSet<T> ars;
 
     /** Iterator. */
-    private final IteratorImpl it;
+    private final IteratorImpl<T> it;
 
     /**
      * Constructor.
      *
      * @param ars Asynchronous result set.
      */
-    SyncResultSetAdapter(AsyncResultSet ars) {
+    SyncResultSetAdapter(AsyncResultSet<T> ars) {
         assert ars != null;
 
         this.ars = ars;
@@ -94,7 +94,7 @@ class SyncResultSetAdapter implements ResultSet {
 
     /** {@inheritDoc} */
     @Override
-    public SqlRow next() {
+    public T next() {
         if (it == null) {
             throw new NoRowSetExpectedException();
         }
@@ -102,14 +102,14 @@ class SyncResultSetAdapter implements ResultSet {
         return it.next();
     }
 
-    private static class IteratorImpl implements Iterator<SqlRow> {
-        private AsyncResultSet curRes;
+    private static class IteratorImpl<T> implements Iterator<T> {
+        private AsyncResultSet<T> curRes;
 
-        private CompletionStage<? extends AsyncResultSet> nextPageStage;
+        private CompletionStage<? extends AsyncResultSet<T>> nextPageStage;
 
-        private Iterator<SqlRow> curPage;
+        private Iterator<T> curPage;
 
-        IteratorImpl(AsyncResultSet ars) {
+        IteratorImpl(AsyncResultSet<T> ars) {
             curRes = ars;
 
             advance();
@@ -145,7 +145,7 @@ class SyncResultSetAdapter implements ResultSet {
         }
 
         @Override
-        public SqlRow next() {
+        public T next() {
             if (!hasNext()) {
                 throw new NoSuchElementException();
             }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java 
b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
index 4fc166f6a4..92cdfd1d35 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
@@ -21,7 +21,10 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.sql.NoRowSetExpectedException;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -46,9 +49,14 @@ import org.jetbrains.annotations.Nullable;
  *      }
  * </code></pre>
  *
+ * @param <T> A type of the objects contained by this result set (when row set 
is present). This will be either {@link SqlRow}
+ *     if no explicit mapper is provided or a particular type defined by 
supplied mapper.
+ *
  * @see ResultSet
+ * @see Session#executeAsync(Transaction, String, Object...)
+ * @see Session#executeAsync(Transaction, Mapper, String, Object...)
  */
-public interface AsyncResultSet {
+public interface AsyncResultSet<T> {
     /**
      * Returns metadata for the results if the result contains rows ({@link 
#hasRowSet()} returns {@code true}), or {@code null} if
      * inapplicable.
@@ -98,7 +106,7 @@ public interface AsyncResultSet {
      * @return Iterable over rows.
      * @throws NoRowSetExpectedException if no row set is expected as a query 
result.
      */
-    Iterable<SqlRow> currentPage();
+    Iterable<T> currentPage();
 
     /**
      * Returns the current page size if the query return rows.
@@ -118,7 +126,7 @@ public interface AsyncResultSet {
      * @return Operation future.
      * @throws NoRowSetExpectedException if no row set is expected as a query 
result.
      */
-    CompletableFuture<? extends AsyncResultSet> fetchNextPage();
+    CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage();
 
     /**
      * Returns whether there are more pages of results.
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
index 387ed74843..8a6487faca 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
@@ -29,7 +29,7 @@ import org.apache.ignite.sql.async.AsyncResultSet;
  * Common SQL request handling logic.
  */
 class ClientSqlCommon {
-    static void packCurrentPage(ClientMessagePacker out, AsyncResultSet 
asyncResultSet) {
+    static void packCurrentPage(ClientMessagePacker out, 
AsyncResultSet<SqlRow> asyncResultSet) {
         ResultSetMetadata meta = asyncResultSet.metadata();
         assert meta != null : "Metadata can't be null when row set is 
present.";
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
index b6d72eaa5c..4d4a28ca70 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.sql;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
 
 /**
@@ -26,7 +27,7 @@ import org.apache.ignite.sql.async.AsyncResultSet;
  */
 class ClientSqlResultSet {
     /** Result set. */
-    private final AsyncResultSet resultSet;
+    private final AsyncResultSet<SqlRow> resultSet;
 
     /** Session. */
     private final Session session;
@@ -37,7 +38,7 @@ class ClientSqlResultSet {
      * @param resultSet Result set.
      * @param session Session.
      */
-    public ClientSqlResultSet(AsyncResultSet resultSet, Session session) {
+    ClientSqlResultSet(AsyncResultSet<SqlRow> resultSet, Session session) {
         assert resultSet != null;
         assert session != null;
 
@@ -50,7 +51,7 @@ class ClientSqlResultSet {
      *
      * @return Result set.
      */
-    public AsyncResultSet resultSet() {
+    public AsyncResultSet<SqlRow> resultSet() {
         return resultSet;
     }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
index b289c7ae39..94ac524fe5 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
@@ -25,8 +25,17 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
 import org.apache.ignite.internal.client.ClientChannel;
+import org.apache.ignite.internal.client.proto.ClientColumnTypeConverter;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
 import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.client.proto.TuplePart;
+import org.apache.ignite.internal.client.table.ClientColumn;
+import org.apache.ignite.internal.client.table.ClientSchema;
+import org.apache.ignite.internal.marshaller.ClientMarshallerReader;
+import org.apache.ignite.internal.marshaller.Marshaller;
+import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.lang.ErrorGroups.Client;
+import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.ColumnMetadata;
 import org.apache.ignite.sql.CursorClosedException;
 import org.apache.ignite.sql.NoRowSetExpectedException;
@@ -34,12 +43,13 @@ import org.apache.ignite.sql.ResultSetMetadata;
 import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.mapper.Mapper;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Client async result set.
  */
-class ClientAsyncResultSet implements AsyncResultSet {
+class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
     /** Channel. */
     private final ClientChannel ch;
 
@@ -58,8 +68,16 @@ class ClientAsyncResultSet implements AsyncResultSet {
     /** Metadata. */
     private final ResultSetMetadata metadata;
 
+    /** Marshaller. Not null when object mapping is used. */
+    @Nullable
+    private final Marshaller marshaller;
+
+    /** Mapper. */
+    @Nullable
+    private final Mapper<T> mapper;
+
     /** Rows. */
-    private volatile List<SqlRow> rows;
+    private volatile List<T> rows;
 
     /** More pages flag. */
     private volatile boolean hasMorePages;
@@ -72,8 +90,9 @@ class ClientAsyncResultSet implements AsyncResultSet {
      *
      * @param ch Channel.
      * @param in Unpacker.
+     * @param mapper Mapper.
      */
-    public ClientAsyncResultSet(ClientChannel ch, ClientMessageUnpacker in) {
+    ClientAsyncResultSet(ClientChannel ch, ClientMessageUnpacker in, @Nullable 
Mapper<T> mapper) {
         this.ch = ch;
 
         resourceId = in.tryUnpackNil() ? null : in.unpackLong();
@@ -83,6 +102,11 @@ class ClientAsyncResultSet implements AsyncResultSet {
         affectedRows = in.unpackLong();
         metadata = hasRowSet ? new ClientResultSetMetadata(in) : null;
 
+        this.mapper = mapper;
+        marshaller = metadata != null && mapper != null && mapper.targetType() 
!= SqlRow.class
+                ? marshaller(metadata, mapper)
+                : null;
+
         if (hasRowSet) {
             readRows(in);
         }
@@ -115,7 +139,7 @@ class ClientAsyncResultSet implements AsyncResultSet {
     /** {@inheritDoc} */
     @SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
     @Override
-    public Iterable<SqlRow> currentPage() {
+    public Iterable<T> currentPage() {
         requireResultSet();
 
         return rows;
@@ -131,7 +155,7 @@ class ClientAsyncResultSet implements AsyncResultSet {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<? extends AsyncResultSet> fetchNextPage() {
+    public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
         requireResultSet();
 
         if (closed) {
@@ -187,18 +211,35 @@ class ClientAsyncResultSet implements AsyncResultSet {
         int size = in.unpackArrayHeader();
         int rowSize = metadata.columns().size();
 
-        var res = new ArrayList<SqlRow>(size);
+        var res = new ArrayList<T>(size);
 
-        for (int i = 0; i < size; i++) {
-            var row = new ArrayList<>(rowSize);
-            var tupleReader = new BinaryTupleReader(rowSize, 
in.readBinaryUnsafe());
+        if (marshaller == null) {
+            for (int i = 0; i < size; i++) {
+                var row = new ArrayList<>(rowSize);
+                var tupleReader = new BinaryTupleReader(rowSize, 
in.readBinaryUnsafe());
 
-            for (int j = 0; j < rowSize; j++) {
-                var col = metadata.columns().get(j);
-                row.add(readValue(tupleReader, j, col));
-            }
+                for (int j = 0; j < rowSize; j++) {
+                    var col = metadata.columns().get(j);
+                    row.add(readValue(tupleReader, j, col));
+                }
 
-            res.add(new ClientSqlRow(row, metadata));
+                res.add((T) new ClientSqlRow(row, metadata));
+            }
+        } else {
+            try {
+                for (int i = 0; i < size; i++) {
+                    var tupleReader = new BinaryTupleReader(rowSize, 
in.readBinaryUnsafe());
+                    var reader = new ClientMarshallerReader(tupleReader);
+
+                    res.add((T) marshaller.readObject(reader, null));
+                }
+            } catch (MarshallerException e) {
+                assert mapper != null;
+                throw new IgniteException(
+                        Client.CONFIGURATION_ERR,
+                        "Failed to map SQL result set to type '" + 
mapper.targetType() + "': " + e.getMessage(),
+                        e);
+            }
         }
 
         rows = Collections.unmodifiableList(res);
@@ -271,4 +312,28 @@ class ClientAsyncResultSet implements AsyncResultSet {
                 throw new UnsupportedOperationException("Unsupported column 
type: " + col.type());
         }
     }
+
+    private static <T> Marshaller marshaller(ResultSetMetadata metadata, 
Mapper<T> mapper) {
+        var schemaColumns = new ClientColumn[metadata.columns().size()];
+        List<ColumnMetadata> columns = metadata.columns();
+
+        for (int i = 0; i < columns.size(); i++) {
+            ColumnMetadata metaColumn = columns.get(i);
+
+            var schemaColumn = new ClientColumn(
+                    metaColumn.name(),
+                    
ClientColumnTypeConverter.columnTypeToOrdinal(metaColumn.type()),
+                    metaColumn.nullable(),
+                    true,
+                    false,
+                    i,
+                    metaColumn.scale(),
+                    metaColumn.precision());
+
+            schemaColumns[i] = schemaColumn;
+        }
+
+        var schema = new ClientSchema(0, schemaColumns);
+        return schema.getMarshaller(mapper, TuplePart.KEY_AND_VAL);
+    }
 }
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index 58da2833b6..d8d062caea 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -33,9 +33,11 @@ import 
org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
 import org.apache.ignite.internal.client.proto.ClientOp;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -43,6 +45,8 @@ import org.jetbrains.annotations.Nullable;
  * Client SQL session.
  */
 public class ClientSession implements Session {
+    private static final Mapper<SqlRow> sqlRowMapper = () -> SqlRow.class;
+
     private final ReliableChannel ch;
 
     @Nullable
@@ -88,7 +92,10 @@ public class ClientSession implements Session {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<AsyncResultSet> executeAsync(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments) {
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            String query,
+            @Nullable Object... arguments) {
         Objects.requireNonNull(query);
 
         ClientStatement statement = new ClientStatement(query, null, null, 
null, null);
@@ -98,8 +105,32 @@ public class ClientSession implements Session {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<AsyncResultSet> executeAsync(
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            Statement statement,
+            @Nullable Object... arguments) {
+        return executeAsync(transaction, sqlRowMapper, statement, arguments);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            String query,
+            @Nullable Object... arguments) {
+        Objects.requireNonNull(query);
+
+        ClientStatement statement = new ClientStatement(query, null, null, 
null, null);
+
+        return executeAsync(transaction, mapper, statement, arguments);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
             @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
             Statement statement,
             @Nullable Object... arguments) {
         Objects.requireNonNull(statement);
@@ -124,7 +155,7 @@ public class ClientSession implements Session {
             w.out().packString(clientStatement.query());
 
             w.out().packObjectArrayAsBinaryTuple(arguments);
-        }, r -> new ClientAsyncResultSet(r.clientChannel(), r.in()));
+        }, r -> new ClientAsyncResultSet(r.clientChannel(), r.in(), mapper));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
index a9ab9c30a4..c3ca203b9d 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
@@ -55,7 +55,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
     @Test
     public void testExecuteAsync() {
         Session session = client.sql().createSession();
-        AsyncResultSet resultSet = session.executeAsync(null, "SELECT 
1").join();
+        AsyncResultSet<SqlRow> resultSet = session.executeAsync(null, "SELECT 
1").join();
 
         assertTrue(resultSet.hasRowSet());
         assertFalse(resultSet.wasApplied());
@@ -68,7 +68,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
     @Test
     public void testExecute() {
         Session session = client.sql().createSession();
-        ResultSet resultSet = session.execute(null, "SELECT 1");
+        ResultSet<SqlRow> resultSet = session.execute(null, "SELECT 1");
 
         assertTrue(resultSet.hasRowSet());
         assertFalse(resultSet.wasApplied());
@@ -87,7 +87,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
                 .property("prop2", "2")
                 .build();
 
-        AsyncResultSet resultSet = session.executeAsync(null, "SELECT 
PROPS").join();
+        AsyncResultSet<SqlRow> resultSet = session.executeAsync(null, "SELECT 
PROPS").join();
 
         Map<String, Object> props = 
StreamSupport.stream(resultSet.currentPage().spliterator(), false)
                 .collect(Collectors.toMap(x -> x.stringValue(0), x -> 
x.value(1)));
@@ -118,7 +118,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
                 .property("prop3", "3")
                 .build();
 
-        AsyncResultSet resultSet = session.executeAsync(null, 
statement).join();
+        AsyncResultSet<SqlRow> resultSet = session.executeAsync(null, 
statement).join();
 
         Map<String, Object> props = 
StreamSupport.stream(resultSet.currentPage().spliterator(), false)
                 .collect(Collectors.toMap(x -> x.stringValue(0), x -> 
x.value(1)));
@@ -134,7 +134,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
     @Test
     public void testMetadata() {
         Session session = client.sql().createSession();
-        ResultSet resultSet = session.execute(null, "SELECT META");
+        ResultSet<SqlRow> resultSet = session.execute(null, "SELECT META");
         ResultSetMetadata meta = resultSet.metadata();
         SqlRow row = resultSet.next();
 
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
index dadc944946..1cdab40e1a 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
@@ -26,9 +26,11 @@ import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -75,13 +77,16 @@ public class FakeSession implements Session {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<AsyncResultSet> executeAsync(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments) {
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            String query,
+            @Nullable Object... arguments) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<AsyncResultSet> executeAsync(
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
             Statement statement,
             @Nullable Object... arguments) {
@@ -90,6 +95,23 @@ public class FakeSession implements Session {
         return CompletableFuture.completedFuture(new FakeAsyncResultSet(this, 
transaction, statement, arguments));
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(@Nullable 
Transaction transaction, @Nullable Mapper<T> mapper,
+            String query, @Nullable Object... arguments) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            Statement statement,
+            @Nullable Object... arguments) {
+        throw new UnsupportedOperationException();
+    }
+
     /** {@inheritDoc} */
     @Override
     public ReactiveResultSet executeReactive(@Nullable Transaction 
transaction, String query, @Nullable Object... arguments) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
index 0d12ea192c..67bfde67c5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -55,6 +55,7 @@ import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.TestInfo;
 
@@ -390,9 +391,9 @@ public class Cluster {
      * @param extractor Used to extract the result from a {@link ResultSet}.
      * @return Query result.
      */
-    public <T> T query(int nodeIndex, String sql, Function<ResultSet, T> 
extractor) {
+    public <T> T query(int nodeIndex, String sql, Function<ResultSet<SqlRow>, 
T> extractor) {
         return doInSession(nodeIndex, session -> {
-            try (ResultSet resultSet = session.execute(null, sql)) {
+            try (ResultSet<SqlRow> resultSet = session.execute(null, sql)) {
                 return extractor.apply(resultSet);
             }
         });
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 015941c00e..4de963abce 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -193,14 +193,14 @@ class ItTableRaftSnapshotsTest extends 
BaseIgniteAbstractTest {
                 || hasCause(e, SqlValidatorException.class, "Object 'TEST' not 
found");
     }
 
-    private <T> T queryWithRetry(int nodeIndex, String sql, 
Function<ResultSet, T> extractor) {
+    private <T> T queryWithRetry(int nodeIndex, String sql, 
Function<ResultSet<SqlRow>, T> extractor) {
         return withRetry(() -> cluster.query(nodeIndex, sql, extractor));
     }
 
     /**
      * Reads all rows from TEST table.
      */
-    private static List<IgniteBiTuple<Integer, String>> readRows(ResultSet rs) 
{
+    private static List<IgniteBiTuple<Integer, String>> 
readRows(ResultSet<SqlRow> rs) {
         List<IgniteBiTuple<Integer, String>> rows = new ArrayList<>();
 
         while (rs.hasNext()) {
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 9ba9d228b5..9874661e0b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.Tuple;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -212,7 +213,7 @@ public class ItDataSchemaSyncTest extends 
IgniteAbstractTest {
 
         Session ses = ignite1.sql().createSession();
 
-        ResultSet res = ses.execute(null, "SELECT valint2 FROM tbl1");
+        ResultSet<SqlRow> res = ses.execute(null, "SELECT valint2 FROM tbl1");
 
         for (int i = 0; i < 10; ++i) {
             assertNotNull(res.next().iterator().next());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
index a53531595a..2ef5ff273b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
@@ -38,9 +38,12 @@ import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Thin client SQL integration test.
@@ -48,7 +51,7 @@ import org.junit.jupiter.api.Test;
 public class ItThinClientSqlTest extends ItAbstractThinClientTest {
     @Test
     void testExecuteAsyncSimpleSelect() {
-        AsyncResultSet resultSet = client().sql()
+        AsyncResultSet<SqlRow> resultSet = client().sql()
                 .createSession()
                 .executeAsync(null, "select 1 as num, 'hello' as str")
                 .join();
@@ -74,7 +77,7 @@ public class ItThinClientSqlTest extends 
ItAbstractThinClientTest {
 
     @Test
     void testExecuteSimpleSelect() {
-        ResultSet resultSet = client().sql()
+        ResultSet<SqlRow> resultSet = client().sql()
                 .createSession()
                 .execute(null, "select 1 as num, 'hello' as str");
 
@@ -146,7 +149,7 @@ public class ItThinClientSqlTest extends 
ItAbstractThinClientTest {
             ses.execute(null, "INSERT INTO testExecuteDdlDml VALUES (?, ?)", 
i, "hello " + i);
         }
 
-        ResultSet selectRes = ses.execute(null, "SELECT * FROM 
testExecuteDdlDml ORDER BY ID");
+        ResultSet<SqlRow> selectRes = ses.execute(null, "SELECT * FROM 
testExecuteDdlDml ORDER BY ID");
 
         var rows = new ArrayList<SqlRow>();
         selectRes.forEachRemaining(rows::add);
@@ -186,7 +189,7 @@ public class ItThinClientSqlTest extends 
ItAbstractThinClientTest {
         }
 
         // Query data.
-        AsyncResultSet selectRes = session
+        AsyncResultSet<SqlRow> selectRes = session
                 .executeAsync(null, "SELECT VAL as MYVALUE, ID, ID + 1 FROM 
testExecuteAsyncDdlDml ORDER BY ID")
                 .join();
 
@@ -254,7 +257,7 @@ public class ItThinClientSqlTest extends 
ItAbstractThinClientTest {
         }
 
         // Query data.
-        ResultSet selectRes = session
+        ResultSet<SqlRow> selectRes = session
                 .execute(null, "SELECT VAL as MYVALUE, ID, ID + 1 FROM 
testExecuteDdlDml ORDER BY ID");
 
         assertTrue(selectRes.hasRowSet());
@@ -320,7 +323,7 @@ public class ItThinClientSqlTest extends 
ItAbstractThinClientTest {
 
         Statement statement = 
client().sql().statementBuilder().pageSize(4).query("SELECT ID FROM 
testFetchNextPage ORDER BY ID").build();
 
-        AsyncResultSet asyncResultSet = session.executeAsync(null, 
statement).join();
+        AsyncResultSet<SqlRow> asyncResultSet = session.executeAsync(null, 
statement).join();
 
         assertEquals(4, asyncResultSet.currentPageSize());
         assertTrue(asyncResultSet.hasMorePages());
@@ -364,4 +367,59 @@ public class ItThinClientSqlTest extends 
ItAbstractThinClientTest {
         var res = session.executeAsync(null, "SELECT VAL FROM testTx").join();
         assertEquals(1, res.currentPage().iterator().next().intValue(0));
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testResultSetMapping(boolean useStatement) {
+        Session session = client().sql().createSession();
+        String query = "select 123 + ? as num, 'Hello!' as str";
+
+        ResultSet<Pojo> resultSet = useStatement
+                ? session.execute(null, Mapper.of(Pojo.class), 
client().sql().statementBuilder().query(query).build(), 10)
+                : session.execute(null, Mapper.of(Pojo.class), query, 10);
+
+        assertTrue(resultSet.hasRowSet());
+
+        Pojo row = resultSet.next();
+
+        assertEquals(133, row.num);
+        assertEquals("Hello!", row.str);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testResultSetMappingAsync(boolean useStatement) {
+        Session session = client().sql().createSession();
+        String query = "select 1 as num, concat('hello ', ?) as str";
+
+        AsyncResultSet<Pojo> resultSet = useStatement
+                ? session.executeAsync(null, Mapper.of(Pojo.class), 
client().sql().statementBuilder().query(query).build(), "world").join()
+                : session.executeAsync(null, Mapper.of(Pojo.class), query, 
"world").join();
+
+        assertTrue(resultSet.hasRowSet());
+        assertEquals(1, resultSet.currentPageSize());
+
+        Pojo row = resultSet.currentPage().iterator().next();
+
+        assertEquals(1, row.num);
+        assertEquals("hello world", row.str);
+    }
+
+
+    @Test
+    void testResultSetMappingColumnNameMismatch() {
+        String query = "select 1 as foo, 2 as bar";
+
+        ResultSet<Pojo> resultSet = 
client().sql().createSession().execute(null, Mapper.of(Pojo.class), query);
+        Pojo row = resultSet.next();
+
+        assertEquals(0, row.num);
+        assertNull(row.str);
+    }
+
+    private static class Pojo {
+        public long num;
+
+        public String str;
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index e4b1242983..3ea81c2c2e 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -235,7 +235,7 @@ public class ItSqlAsynchronousApiTest extends 
AbstractBasicIntegrationTest {
         int txPrevCnt = txManagerInternal.finished();
 
         for (int i = 0; i < ROW_COUNT; ++i) {
-            CompletableFuture<AsyncResultSet> fut = ses.executeAsync(null, 
"CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)", i, i);
+            CompletableFuture<AsyncResultSet<SqlRow>> fut = 
ses.executeAsync(null, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)", i, i);
 
             AsyncResultSet asyncRes = null;
 
@@ -444,7 +444,7 @@ public class ItSqlAsynchronousApiTest extends 
AbstractBasicIntegrationTest {
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().build();
 
-        AsyncResultSet rs = await(ses.executeAsync(null, "SELECT COL1, COL0 
FROM TEST"));
+        AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1, 
COL0 FROM TEST"));
 
         // Validate columns metadata.
         ResultSetMetadata meta = rs.metadata();
@@ -488,7 +488,7 @@ public class ItSqlAsynchronousApiTest extends 
AbstractBasicIntegrationTest {
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().build();
 
-        AsyncResultSet ars = await(ses.executeAsync(null, "SELECT 1 as COL_A, 
2 as COL_B"));
+        AsyncResultSet<SqlRow> ars = await(ses.executeAsync(null, "SELECT 1 as 
COL_A, 2 as COL_B"));
 
         SqlRow r = CollectionUtils.first(ars.currentPage());
 
@@ -524,15 +524,15 @@ public class ItSqlAsynchronousApiTest extends 
AbstractBasicIntegrationTest {
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().defaultPageSize(1).build();
 
-        AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM 
TEST ORDER BY ID"));
+        AsyncResultSet<SqlRow> ars0 = await(ses.executeAsync(null, "SELECT ID 
FROM TEST ORDER BY ID"));
         var p0 = ars0.currentPage();
-        AsyncResultSet ars1 = await(ars0.fetchNextPage());
+        AsyncResultSet<SqlRow> ars1 = await(ars0.fetchNextPage());
         var p1 = ars1.currentPage();
-        AsyncResultSet ars2 = 
await(ars1.fetchNextPage().toCompletableFuture());
+        AsyncResultSet<SqlRow> ars2 = 
await(ars1.fetchNextPage().toCompletableFuture());
         var p2 = ars2.currentPage();
-        AsyncResultSet ars3 = await(ars1.fetchNextPage());
+        AsyncResultSet<SqlRow> ars3 = await(ars1.fetchNextPage());
         var p3 = ars3.currentPage();
-        AsyncResultSet ars4 = await(ars0.fetchNextPage());
+        AsyncResultSet<SqlRow> ars4 = await(ars0.fetchNextPage());
         var p4 = ars4.currentPage();
 
         assertSame(ars0, ars1);
@@ -689,12 +689,12 @@ public class ItSqlAsynchronousApiTest extends 
AbstractBasicIntegrationTest {
     }
 
     private static void checkDdl(boolean expectedApplied, Session ses, String 
sql, Transaction tx) {
-        CompletableFuture<AsyncResultSet> fut = ses.executeAsync(
+        CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync(
                 tx,
                 sql
         );
 
-        AsyncResultSet asyncRes = await(fut);
+        AsyncResultSet<SqlRow> asyncRes = await(fut);
 
         assertEquals(expectedApplied, asyncRes.wasApplied());
         assertFalse(asyncRes.hasMorePages());
@@ -736,7 +736,7 @@ public class ItSqlAsynchronousApiTest extends 
AbstractBasicIntegrationTest {
     }
 
     static class TestPageProcessor implements
-            Function<AsyncResultSet, CompletionStage<AsyncResultSet>> {
+            Function<AsyncResultSet<SqlRow>, 
CompletionStage<AsyncResultSet<SqlRow>>> {
         private int expectedPages;
 
         private final List<SqlRow> res = new ArrayList<>();
@@ -746,7 +746,7 @@ public class ItSqlAsynchronousApiTest extends 
AbstractBasicIntegrationTest {
         }
 
         @Override
-        public CompletionStage<AsyncResultSet> apply(AsyncResultSet rs) {
+        public CompletionStage<AsyncResultSet<SqlRow>> 
apply(AsyncResultSet<SqlRow> rs) {
             expectedPages--;
 
             assertTrue(rs.hasRowSet());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index bd59046349..976bc792a7 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlBatchException;
 import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.table.Table;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
@@ -211,7 +212,7 @@ public class ItSqlSynchronousApiTest extends 
AbstractBasicIntegrationTest {
         IgniteSql sql = igniteSql();
         Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT / 
4).build();
 
-        ResultSet rs = ses.execute(null, "SELECT ID FROM TEST");
+        ResultSet<SqlRow> rs = ses.execute(null, "SELECT ID FROM TEST");
 
         Set<Integer> set = Streams.stream(rs).map(r -> 
r.intValue(0)).collect(Collectors.toSet());
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/SqlScriptRunner.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/SqlScriptRunner.java
index b2af0e3c52..db130dae79 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/SqlScriptRunner.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/SqlScriptRunner.java
@@ -52,6 +52,7 @@ import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -168,7 +169,7 @@ public class SqlScriptRunner {
         log.info("Execute: " + sql);
 
         try (Session s = ignSql.createSession()) {
-            try (ResultSet rs = s.execute(null, sql)) {
+            try (ResultSet<SqlRow> rs = s.execute(null, sql)) {
                 if (rs.hasRowSet()) {
                     return Streams.stream(rs).map(
                                     r -> {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index a1d46be888..270d5c71ba 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -45,7 +45,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Asynchronous result set implementation.
  */
-public class AsyncResultSetImpl implements AsyncResultSet {
+public class AsyncResultSetImpl<T> implements AsyncResultSet<T> {
     private static final CompletableFuture<? extends AsyncResultSet> 
HAS_NO_MORE_PAGE_FUTURE =
             CompletableFuture.failedFuture(new 
SqlException(CURSOR_NO_MORE_PAGES_ERR, "There are no more pages."));
 
@@ -107,13 +107,14 @@ public class AsyncResultSetImpl implements AsyncResultSet 
{
 
     /** {@inheritDoc} */
     @Override
-    public Iterable<SqlRow> currentPage() {
+    public Iterable<T> currentPage() {
         requireResultSet();
 
         final Iterator<List<Object>> it0 = curPage.items().iterator();
         final ResultSetMetadata meta0 = cur.metadata();
 
-        return () -> new TransformingIterator<>(it0, (item) -> new 
SqlRowImpl(item, meta0));
+        // TODO: IGNITE-18695 map rows to objects when mapper is provided.
+        return () -> new TransformingIterator<>(it0, (item) -> (T) new 
SqlRowImpl(item, meta0));
     }
 
     /** {@inheritDoc} */
@@ -126,11 +127,11 @@ public class AsyncResultSetImpl implements AsyncResultSet 
{
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<? extends AsyncResultSet> fetchNextPage() {
+    public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
         requireResultSet();
 
         if (!hasMorePages()) {
-            return HAS_NO_MORE_PAGE_FUTURE;
+            return (CompletableFuture<? extends AsyncResultSet<T>>) 
HAS_NO_MORE_PAGE_FUTURE;
         } else {
             return cur.requestNextAsync(pageSize)
                     .thenApply(page -> {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index 743a56718b..a7c019a174 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -50,9 +50,11 @@ import org.apache.ignite.sql.BatchedArguments;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.SqlBatchException;
 import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.sql.SqlRow;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.sql.async.AsyncResultSet;
 import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.table.mapper.Mapper;
 import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 
@@ -154,7 +156,10 @@ public class SessionImpl implements Session {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<AsyncResultSet> executeAsync(@Nullable 
Transaction transaction, String query, @Nullable Object... arguments) {
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+            @Nullable Transaction transaction,
+            String query,
+            @Nullable Object... arguments) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(new 
SqlException(SESSION_NOT_FOUND_ERR, "Session is closed."));
         }
@@ -162,7 +167,7 @@ public class SessionImpl implements Session {
         try {
             QueryContext ctx = QueryContext.of(transaction);
 
-            CompletableFuture<AsyncResultSet> result = 
qryProc.querySingleAsync(sessionId, ctx, query, arguments)
+            CompletableFuture<AsyncResultSet<SqlRow>> result = 
qryProc.querySingleAsync(sessionId, ctx, query, arguments)
                     .thenCompose(cur -> cur.requestNextAsync(pageSize)
                             .thenApply(
                                     batchRes -> new AsyncResultSetImpl(
@@ -190,7 +195,7 @@ public class SessionImpl implements Session {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<AsyncResultSet> executeAsync(
+    public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
             @Nullable Transaction transaction,
             Statement statement,
             @Nullable Object... arguments
@@ -199,6 +204,25 @@ public class SessionImpl implements Session {
         return executeAsync(transaction, statement.query(), arguments);
     }
 
+    /** {@inheritDoc} */
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(@Nullable 
Transaction transaction, @Nullable Mapper<T> mapper,
+            String query, @Nullable Object... arguments) {
+        // TODO: IGNITE-18695.
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+            @Nullable Transaction transaction,
+            @Nullable Mapper<T> mapper,
+            Statement statement,
+            @Nullable Object... arguments) {
+        // TODO: IGNITE-18695.
+        throw new UnsupportedOperationException("Not implemented yet.");
+    }
+
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
index 946a0a5148..6b7a652666 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
@@ -129,7 +129,7 @@ public class IgniteSqlApiTest {
         final Session sess = igniteSql.createSession();
 
         // Execute DDL.
-        ResultSet rs = sess.execute(null, "CREATE TABLE IF NOT EXITS tbl (id 
INT PRIMARY KEY, val VARCHAR)");
+        ResultSet<SqlRow> rs = sess.execute(null, "CREATE TABLE IF NOT EXITS 
tbl (id INT PRIMARY KEY, val VARCHAR)");
 
         assertTrue(rs.wasApplied());
         assertFalse(rs.hasRowSet());
@@ -179,7 +179,7 @@ public class IgniteSqlApiTest {
 
         igniteTx.runInTransaction(tx -> {
             // Execute DML in tx.
-            ResultSet rs = sess.execute(tx, "INSERT INTO tbl VALUES (?, ?)", 
1, "str1");
+            ResultSet<SqlRow> rs = sess.execute(tx, "INSERT INTO tbl VALUES 
(?, ?)", 1, "str1");
 
             assertEquals(1, rs.affectedRows());
 
@@ -283,7 +283,7 @@ public class IgniteSqlApiTest {
         KeyValueView<Tuple, Tuple> table = getTable();
 
         class AsyncPageProcessor implements
-                Function<AsyncResultSet, CompletionStage<AsyncResultSet>> {
+                Function<AsyncResultSet<SqlRow>, 
CompletionStage<AsyncResultSet<SqlRow>>> {
             private final Transaction tx0;
 
             public AsyncPageProcessor(Transaction tx0) {
@@ -291,7 +291,7 @@ public class IgniteSqlApiTest {
             }
 
             @Override
-            public CompletionStage<AsyncResultSet> apply(AsyncResultSet rs) {
+            public CompletionStage<AsyncResultSet<SqlRow>> 
apply(AsyncResultSet<SqlRow> rs) {
                 for (SqlRow row : rs.currentPage()) {
                     table.getAsync(tx0, Tuple.create().set("id", 
row.intValue(0)));
                 }
@@ -348,7 +348,7 @@ public class IgniteSqlApiTest {
     @Disabled
     @Test
     public void testMetadata() {
-        ResultSet rs = igniteSql.createSession()
+        ResultSet<SqlRow> rs = igniteSql.createSession()
                 .execute(null, "SELECT id, val FROM tbl");
 
         SqlRow row = rs.next();

Reply via email to