This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 91e11ecb9e IGNITE-18392 Add SQL ResultSet object mapping (#1616)
91e11ecb9e is described below
commit 91e11ecb9e85b272d8947a79aec55bd5dbe40d65
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Fri Feb 3 07:42:31 2023 +0200
IGNITE-18392 Add SQL ResultSet object mapping (#1616)
Add a generic `<T> ResultSet<T> execute` API, which maps SQL result set data
to a user-defined type.
Previously, we could insert and retrieve data in a strongly typed way with
object mapping via the `RecordView<T>` API, but SQL results could only be accessed
in a weakly typed way through `Tuple`. This change fixes that asymmetry.
---
.../apache/ignite/example/sql/SqlApiExample.java | 32 +++++---
.../main/java/org/apache/ignite/sql/ResultSet.java | 10 ++-
.../main/java/org/apache/ignite/sql/Session.java | 96 ++++++++++++++++++++--
.../apache/ignite/sql/SyncResultSetAdapter.java | 22 ++---
.../apache/ignite/sql/async/AsyncResultSet.java | 14 +++-
.../handler/requests/sql/ClientSqlCommon.java | 2 +-
.../handler/requests/sql/ClientSqlResultSet.java | 7 +-
.../internal/client/sql/ClientAsyncResultSet.java | 93 +++++++++++++++++----
.../ignite/internal/client/sql/ClientSession.java | 37 ++++++++-
.../org/apache/ignite/client/ClientSqlTest.java | 10 +--
.../apache/ignite/client/fakes/FakeSession.java | 26 +++++-
.../java/org/apache/ignite/internal/Cluster.java | 5 +-
.../raftsnapshot/ItTableRaftSnapshotsTest.java | 4 +-
.../internal/runner/app/ItDataSchemaSyncTest.java | 3 +-
.../runner/app/client/ItThinClientSqlTest.java | 70 ++++++++++++++--
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 24 +++---
.../internal/sql/api/ItSqlSynchronousApiTest.java | 3 +-
.../ignite/internal/sqllogic/SqlScriptRunner.java | 3 +-
.../internal/sql/api/AsyncResultSetImpl.java | 11 +--
.../ignite/internal/sql/api/SessionImpl.java | 30 ++++++-
.../internal/sql/engine/IgniteSqlApiTest.java | 10 +--
21 files changed, 415 insertions(+), 97 deletions(-)
diff --git
a/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
b/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
index a5ad4cfb3c..0bc15b1e95 100644
--- a/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
+++ b/examples/src/main/java/org/apache/ignite/example/sql/SqlApiExample.java
@@ -27,6 +27,7 @@ import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.mapper.Mapper;
/**
* Examples of using SQL API.
@@ -125,7 +126,7 @@ public class SqlApiExample {
System.out.println("\nAll accounts:");
- try (ResultSet rs = ses.execute(null,
+ try (ResultSet<SqlRow> rs = ses.execute(null,
"SELECT a.FIRST_NAME, a.LAST_NAME, c.NAME FROM
ACCOUNTS a "
+ "INNER JOIN CITIES c on c.ID = a.CITY_ID
ORDER BY a.ACCOUNT_ID")) {
while (rs.hasNext()) {
@@ -146,16 +147,21 @@ public class SqlApiExample {
System.out.println("\nAccounts with balance lower than
1,500:");
- try (ResultSet rs = ses.execute(null,
- "SELECT a.FIRST_NAME, a.LAST_NAME, a.BALANCE FROM
ACCOUNTS a WHERE a.BALANCE < 1500.0 "
- + "ORDER BY a.ACCOUNT_ID")) {
+ Statement statement = client.sql().statementBuilder()
+ .query("SELECT a.FIRST_NAME as firstName, a.LAST_NAME
as lastName, a.BALANCE FROM ACCOUNTS a "
+ + "WHERE a.BALANCE < 1500.0 "
+ + "ORDER BY a.ACCOUNT_ID")
+ .build();
+
+ // POJO mapping.
+ try (ResultSet<AccountInfo> rs = ses.execute(null,
Mapper.of(AccountInfo.class), statement)) {
while (rs.hasNext()) {
- SqlRow row = rs.next();
+ AccountInfo row = rs.next();
System.out.println(" "
- + row.stringValue(1) + ", "
- + row.stringValue(2) + ", "
- + row.stringValue(3));
+ + row.firstName + ", "
+ + row.lastName + ", "
+ + row.balance);
}
}
@@ -167,7 +173,7 @@ public class SqlApiExample {
System.out.println("\nDeleting one of the accounts...");
- try (ResultSet rs = ses.execute(null, "DELETE FROM ACCOUNTS
WHERE ACCOUNT_ID = ?", 1)) {
+ try (ResultSet<SqlRow> rs = ses.execute(null, "DELETE FROM
ACCOUNTS WHERE ACCOUNT_ID = ?", 1)) {
System.out.println("\n Removed accounts: " +
rs.affectedRows());
}
@@ -207,7 +213,7 @@ public class SqlApiExample {
* @param resultSet Async result set.
* @return Operation future.
*/
- private static CompletionStage<Void> fetchAllRowsInto(AsyncResultSet
resultSet) {
+ private static CompletionStage<Void>
fetchAllRowsInto(AsyncResultSet<SqlRow> resultSet) {
//
// Process current page.
//
@@ -230,4 +236,10 @@ public class SqlApiExample {
//
return
resultSet.fetchNextPage().thenCompose(SqlApiExample::fetchAllRowsInto);
}
+
+ private static class AccountInfo {
+ String firstName;
+ String lastName;
+ double balance;
+ }
}
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
index 41640149f9..e6ab18132d 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/ResultSet.java
@@ -18,6 +18,8 @@
package org.apache.ignite.sql;
import java.util.Iterator;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
/**
@@ -31,8 +33,14 @@ import org.jetbrains.annotations.Nullable;
*
* <p>Note: one and only one of following is possible: {@link #hasRowSet()}
returns {@code true}, or {@link #wasApplied()} returns
* {@code true}, or {@link #affectedRows()} return zero or higher value.
+ *
+ * @param <T> A type of the objects contained by this result set (when row set
is present). This will be either {@link SqlRow}
+ * if no explicit mapper is provided or a particular type defined by
supplied mapper.
+ *
+ * @see Session#execute(Transaction, String, Object...)
+ * @see Session#execute(Transaction, Mapper, String, Object...)
*/
-public interface ResultSet extends Iterator<SqlRow>, AutoCloseable {
+public interface ResultSet<T> extends Iterator<T>, AutoCloseable {
/**
* Returns metadata for the results if the result contains rows ({@link
#hasRowSet()} returns {@code true}).
*
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Session.java
b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
index 3c3cfdc752..47beff4593 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/Session.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -50,11 +51,11 @@ public interface Session extends AutoCloseable {
* @return SQL query results set.
* @throws SqlException If failed.
*/
- default ResultSet execute(@Nullable Transaction transaction, String query,
@Nullable Object... arguments) {
+ default ResultSet<SqlRow> execute(@Nullable Transaction transaction,
String query, @Nullable Object... arguments) {
Objects.requireNonNull(query);
try {
- return new SyncResultSetAdapter(executeAsync(transaction, query,
arguments).join());
+ return new SyncResultSetAdapter<>(executeAsync(transaction, query,
arguments).join());
} catch (CompletionException e) {
throw IgniteException.wrap(e);
}
@@ -68,11 +69,59 @@ public interface Session extends AutoCloseable {
* @param arguments Arguments for the statement.
* @return SQL query results set.
*/
- default ResultSet execute(@Nullable Transaction transaction, Statement
statement, @Nullable Object... arguments) {
+ default ResultSet<SqlRow> execute(@Nullable Transaction transaction,
Statement statement, @Nullable Object... arguments) {
Objects.requireNonNull(statement);
try {
- return new SyncResultSetAdapter(executeAsync(transaction,
statement, arguments).join());
+ return new SyncResultSetAdapter<>(executeAsync(transaction,
statement, arguments).join());
+ } catch (CompletionException e) {
+ throw IgniteException.wrap(e);
+ }
+ }
+
+ /**
+ * Executes single SQL statement and maps results to objects with the
provided mapper.
+ *
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
+ * @param query SQL query template.
+ * @param arguments Arguments for the statement.
+ * @param <T> A type of object contained in result set.
+ * @return SQL query results set.
+ */
+ default <T> ResultSet<T> execute(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ String query,
+ @Nullable Object... arguments) {
+ Objects.requireNonNull(query);
+
+ try {
+ return new SyncResultSetAdapter<>(executeAsync(transaction,
mapper, query, arguments).join());
+ } catch (CompletionException e) {
+ throw IgniteException.wrap(e);
+ }
+ }
+
+ /**
+ * Executes single SQL statement and maps results to objects with the
provided mapper.
+ *
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
+ * @param statement SQL statement to execute.
+ * @param arguments Arguments for the statement.
+ * @param <T> A type of object contained in result set.
+ * @return SQL query results set.
+ */
+ default <T> ResultSet<T> execute(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ Statement statement,
+ @Nullable Object... arguments) {
+ Objects.requireNonNull(statement);
+
+ try {
+ return new SyncResultSetAdapter<>(executeAsync(transaction,
mapper, statement, arguments).join());
} catch (CompletionException e) {
throw IgniteException.wrap(e);
}
@@ -87,7 +136,7 @@ public interface Session extends AutoCloseable {
* @return Operation future.
* @throws SqlException If failed.
*/
- CompletableFuture<AsyncResultSet> executeAsync(@Nullable Transaction
transaction, String query, @Nullable Object... arguments);
+ CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(@Nullable
Transaction transaction, String query, @Nullable Object... arguments);
/**
* Executes SQL statement in an asynchronous way.
@@ -98,7 +147,42 @@ public interface Session extends AutoCloseable {
* @return Operation future.
* @throws SqlException If failed.
*/
- CompletableFuture<AsyncResultSet> executeAsync(@Nullable Transaction
transaction, Statement statement, @Nullable Object... arguments);
+ CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ Statement statement,
+ @Nullable Object... arguments);
+
+ /**
+ * Executes SQL statement in an asynchronous way and maps results to
objects with the provided mapper.
+ *
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
+ * @param query SQL query template.
+ * @param arguments Arguments for the statement.
+ * @param <T> A type of object contained in result set.
+ * @return Operation future.
+ */
+ <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ String query,
+ @Nullable Object... arguments);
+
+ /**
+ * Executes SQL statement in an asynchronous way and maps results to
objects with the provided mapper.
+ *
+ * @param transaction Transaction to execute the statement within or
{@code null}.
+ * @param mapper Mapper that defines the row type and the way to map
columns to the type members. See {@link Mapper#of}.
+ * @param statement SQL statement to execute.
+ * @param arguments Arguments for the statement.
+ * @param <T> A type of object contained in result set.
+ * @return Operation future.
+ */
+ <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ Statement statement,
+ @Nullable Object... arguments);
/**
* Executes SQL query in a reactive way.
diff --git
a/modules/api/src/main/java/org/apache/ignite/sql/SyncResultSetAdapter.java
b/modules/api/src/main/java/org/apache/ignite/sql/SyncResultSetAdapter.java
index 02b86e8b3a..603e986a15 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/SyncResultSetAdapter.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/SyncResultSetAdapter.java
@@ -29,19 +29,19 @@ import org.jetbrains.annotations.Nullable;
/**
* Synchronous wrapper over {@link org.apache.ignite.sql.async.AsyncResultSet}.
*/
-class SyncResultSetAdapter implements ResultSet {
+class SyncResultSetAdapter<T> implements ResultSet<T> {
/** Wrapped async result set. */
- private final AsyncResultSet ars;
+ private final AsyncResultSet<T> ars;
/** Iterator. */
- private final IteratorImpl it;
+ private final IteratorImpl<T> it;
/**
* Constructor.
*
* @param ars Asynchronous result set.
*/
- SyncResultSetAdapter(AsyncResultSet ars) {
+ SyncResultSetAdapter(AsyncResultSet<T> ars) {
assert ars != null;
this.ars = ars;
@@ -94,7 +94,7 @@ class SyncResultSetAdapter implements ResultSet {
/** {@inheritDoc} */
@Override
- public SqlRow next() {
+ public T next() {
if (it == null) {
throw new NoRowSetExpectedException();
}
@@ -102,14 +102,14 @@ class SyncResultSetAdapter implements ResultSet {
return it.next();
}
- private static class IteratorImpl implements Iterator<SqlRow> {
- private AsyncResultSet curRes;
+ private static class IteratorImpl<T> implements Iterator<T> {
+ private AsyncResultSet<T> curRes;
- private CompletionStage<? extends AsyncResultSet> nextPageStage;
+ private CompletionStage<? extends AsyncResultSet<T>> nextPageStage;
- private Iterator<SqlRow> curPage;
+ private Iterator<T> curPage;
- IteratorImpl(AsyncResultSet ars) {
+ IteratorImpl(AsyncResultSet<T> ars) {
curRes = ars;
advance();
@@ -145,7 +145,7 @@ class SyncResultSetAdapter implements ResultSet {
}
@Override
- public SqlRow next() {
+ public T next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
index 4fc166f6a4..92cdfd1d35 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/async/AsyncResultSet.java
@@ -21,7 +21,10 @@ import java.util.concurrent.CompletableFuture;
import org.apache.ignite.sql.NoRowSetExpectedException;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.ResultSetMetadata;
+import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlRow;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
/**
@@ -46,9 +49,14 @@ import org.jetbrains.annotations.Nullable;
* }
* </code></pre>
*
+ * @param <T> A type of the objects contained by this result set (when row set
is present). This will be either {@link SqlRow}
+ * if no explicit mapper is provided or a particular type defined by
supplied mapper.
+ *
* @see ResultSet
+ * @see Session#executeAsync(Transaction, String, Object...)
+ * @see Session#executeAsync(Transaction, Mapper, String, Object...)
*/
-public interface AsyncResultSet {
+public interface AsyncResultSet<T> {
/**
* Returns metadata for the results if the result contains rows ({@link
#hasRowSet()} returns {@code true}), or {@code null} if
* inapplicable.
@@ -98,7 +106,7 @@ public interface AsyncResultSet {
* @return Iterable over rows.
* @throws NoRowSetExpectedException if no row set is expected as a query
result.
*/
- Iterable<SqlRow> currentPage();
+ Iterable<T> currentPage();
/**
* Returns the current page size if the query return rows.
@@ -118,7 +126,7 @@ public interface AsyncResultSet {
* @return Operation future.
* @throws NoRowSetExpectedException if no row set is expected as a query
result.
*/
- CompletableFuture<? extends AsyncResultSet> fetchNextPage();
+ CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage();
/**
* Returns whether there are more pages of results.
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
index 387ed74843..8a6487faca 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java
@@ -29,7 +29,7 @@ import org.apache.ignite.sql.async.AsyncResultSet;
* Common SQL request handling logic.
*/
class ClientSqlCommon {
- static void packCurrentPage(ClientMessagePacker out, AsyncResultSet
asyncResultSet) {
+ static void packCurrentPage(ClientMessagePacker out,
AsyncResultSet<SqlRow> asyncResultSet) {
ResultSetMetadata meta = asyncResultSet.metadata();
assert meta != null : "Metadata can't be null when row set is
present.";
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
index b6d72eaa5c..4d4a28ca70 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlResultSet.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.sql;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
/**
@@ -26,7 +27,7 @@ import org.apache.ignite.sql.async.AsyncResultSet;
*/
class ClientSqlResultSet {
/** Result set. */
- private final AsyncResultSet resultSet;
+ private final AsyncResultSet<SqlRow> resultSet;
/** Session. */
private final Session session;
@@ -37,7 +38,7 @@ class ClientSqlResultSet {
* @param resultSet Result set.
* @param session Session.
*/
- public ClientSqlResultSet(AsyncResultSet resultSet, Session session) {
+ ClientSqlResultSet(AsyncResultSet<SqlRow> resultSet, Session session) {
assert resultSet != null;
assert session != null;
@@ -50,7 +51,7 @@ class ClientSqlResultSet {
*
* @return Result set.
*/
- public AsyncResultSet resultSet() {
+ public AsyncResultSet<SqlRow> resultSet() {
return resultSet;
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
index b289c7ae39..94ac524fe5 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java
@@ -25,8 +25,17 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
import org.apache.ignite.internal.client.ClientChannel;
+import org.apache.ignite.internal.client.proto.ClientColumnTypeConverter;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
+import org.apache.ignite.internal.client.proto.TuplePart;
+import org.apache.ignite.internal.client.table.ClientColumn;
+import org.apache.ignite.internal.client.table.ClientSchema;
+import org.apache.ignite.internal.marshaller.ClientMarshallerReader;
+import org.apache.ignite.internal.marshaller.Marshaller;
+import org.apache.ignite.internal.marshaller.MarshallerException;
+import org.apache.ignite.lang.ErrorGroups.Client;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.CursorClosedException;
import org.apache.ignite.sql.NoRowSetExpectedException;
@@ -34,12 +43,13 @@ import org.apache.ignite.sql.ResultSetMetadata;
import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.mapper.Mapper;
import org.jetbrains.annotations.Nullable;
/**
* Client async result set.
*/
-class ClientAsyncResultSet implements AsyncResultSet {
+class ClientAsyncResultSet<T> implements AsyncResultSet<T> {
/** Channel. */
private final ClientChannel ch;
@@ -58,8 +68,16 @@ class ClientAsyncResultSet implements AsyncResultSet {
/** Metadata. */
private final ResultSetMetadata metadata;
+ /** Marshaller. Not null when object mapping is used. */
+ @Nullable
+ private final Marshaller marshaller;
+
+ /** Mapper. */
+ @Nullable
+ private final Mapper<T> mapper;
+
/** Rows. */
- private volatile List<SqlRow> rows;
+ private volatile List<T> rows;
/** More pages flag. */
private volatile boolean hasMorePages;
@@ -72,8 +90,9 @@ class ClientAsyncResultSet implements AsyncResultSet {
*
* @param ch Channel.
* @param in Unpacker.
+ * @param mapper Mapper.
*/
- public ClientAsyncResultSet(ClientChannel ch, ClientMessageUnpacker in) {
+ ClientAsyncResultSet(ClientChannel ch, ClientMessageUnpacker in, @Nullable
Mapper<T> mapper) {
this.ch = ch;
resourceId = in.tryUnpackNil() ? null : in.unpackLong();
@@ -83,6 +102,11 @@ class ClientAsyncResultSet implements AsyncResultSet {
affectedRows = in.unpackLong();
metadata = hasRowSet ? new ClientResultSetMetadata(in) : null;
+ this.mapper = mapper;
+ marshaller = metadata != null && mapper != null && mapper.targetType()
!= SqlRow.class
+ ? marshaller(metadata, mapper)
+ : null;
+
if (hasRowSet) {
readRows(in);
}
@@ -115,7 +139,7 @@ class ClientAsyncResultSet implements AsyncResultSet {
/** {@inheritDoc} */
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
@Override
- public Iterable<SqlRow> currentPage() {
+ public Iterable<T> currentPage() {
requireResultSet();
return rows;
@@ -131,7 +155,7 @@ class ClientAsyncResultSet implements AsyncResultSet {
/** {@inheritDoc} */
@Override
- public CompletableFuture<? extends AsyncResultSet> fetchNextPage() {
+ public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
requireResultSet();
if (closed) {
@@ -187,18 +211,35 @@ class ClientAsyncResultSet implements AsyncResultSet {
int size = in.unpackArrayHeader();
int rowSize = metadata.columns().size();
- var res = new ArrayList<SqlRow>(size);
+ var res = new ArrayList<T>(size);
- for (int i = 0; i < size; i++) {
- var row = new ArrayList<>(rowSize);
- var tupleReader = new BinaryTupleReader(rowSize,
in.readBinaryUnsafe());
+ if (marshaller == null) {
+ for (int i = 0; i < size; i++) {
+ var row = new ArrayList<>(rowSize);
+ var tupleReader = new BinaryTupleReader(rowSize,
in.readBinaryUnsafe());
- for (int j = 0; j < rowSize; j++) {
- var col = metadata.columns().get(j);
- row.add(readValue(tupleReader, j, col));
- }
+ for (int j = 0; j < rowSize; j++) {
+ var col = metadata.columns().get(j);
+ row.add(readValue(tupleReader, j, col));
+ }
- res.add(new ClientSqlRow(row, metadata));
+ res.add((T) new ClientSqlRow(row, metadata));
+ }
+ } else {
+ try {
+ for (int i = 0; i < size; i++) {
+ var tupleReader = new BinaryTupleReader(rowSize,
in.readBinaryUnsafe());
+ var reader = new ClientMarshallerReader(tupleReader);
+
+ res.add((T) marshaller.readObject(reader, null));
+ }
+ } catch (MarshallerException e) {
+ assert mapper != null;
+ throw new IgniteException(
+ Client.CONFIGURATION_ERR,
+ "Failed to map SQL result set to type '" +
mapper.targetType() + "': " + e.getMessage(),
+ e);
+ }
}
rows = Collections.unmodifiableList(res);
@@ -271,4 +312,28 @@ class ClientAsyncResultSet implements AsyncResultSet {
throw new UnsupportedOperationException("Unsupported column
type: " + col.type());
}
}
+
+ private static <T> Marshaller marshaller(ResultSetMetadata metadata,
Mapper<T> mapper) {
+ var schemaColumns = new ClientColumn[metadata.columns().size()];
+ List<ColumnMetadata> columns = metadata.columns();
+
+ for (int i = 0; i < columns.size(); i++) {
+ ColumnMetadata metaColumn = columns.get(i);
+
+ var schemaColumn = new ClientColumn(
+ metaColumn.name(),
+
ClientColumnTypeConverter.columnTypeToOrdinal(metaColumn.type()),
+ metaColumn.nullable(),
+ true,
+ false,
+ i,
+ metaColumn.scale(),
+ metaColumn.precision());
+
+ schemaColumns[i] = schemaColumn;
+ }
+
+ var schema = new ClientSchema(0, schemaColumns);
+ return schema.getMarshaller(mapper, TuplePart.KEY_AND_VAL);
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index 58da2833b6..d8d062caea 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -33,9 +33,11 @@ import
org.apache.ignite.internal.client.proto.ClientBinaryTupleUtils;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -43,6 +45,8 @@ import org.jetbrains.annotations.Nullable;
* Client SQL session.
*/
public class ClientSession implements Session {
+ private static final Mapper<SqlRow> sqlRowMapper = () -> SqlRow.class;
+
private final ReliableChannel ch;
@Nullable
@@ -88,7 +92,10 @@ public class ClientSession implements Session {
/** {@inheritDoc} */
@Override
- public CompletableFuture<AsyncResultSet> executeAsync(@Nullable
Transaction transaction, String query, @Nullable Object... arguments) {
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ String query,
+ @Nullable Object... arguments) {
Objects.requireNonNull(query);
ClientStatement statement = new ClientStatement(query, null, null,
null, null);
@@ -98,8 +105,32 @@ public class ClientSession implements Session {
/** {@inheritDoc} */
@Override
- public CompletableFuture<AsyncResultSet> executeAsync(
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ Statement statement,
+ @Nullable Object... arguments) {
+ return executeAsync(transaction, sqlRowMapper, statement, arguments);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ String query,
+ @Nullable Object... arguments) {
+ Objects.requireNonNull(query);
+
+ ClientStatement statement = new ClientStatement(query, null, null,
null, null);
+
+ return executeAsync(transaction, mapper, statement, arguments);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
@Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
Statement statement,
@Nullable Object... arguments) {
Objects.requireNonNull(statement);
@@ -124,7 +155,7 @@ public class ClientSession implements Session {
w.out().packString(clientStatement.query());
w.out().packObjectArrayAsBinaryTuple(arguments);
- }, r -> new ClientAsyncResultSet(r.clientChannel(), r.in()));
+ }, r -> new ClientAsyncResultSet(r.clientChannel(), r.in(), mapper));
}
/** {@inheritDoc} */
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
index a9ab9c30a4..c3ca203b9d 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java
@@ -55,7 +55,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
@Test
public void testExecuteAsync() {
Session session = client.sql().createSession();
- AsyncResultSet resultSet = session.executeAsync(null, "SELECT
1").join();
+ AsyncResultSet<SqlRow> resultSet = session.executeAsync(null, "SELECT
1").join();
assertTrue(resultSet.hasRowSet());
assertFalse(resultSet.wasApplied());
@@ -68,7 +68,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
@Test
public void testExecute() {
Session session = client.sql().createSession();
- ResultSet resultSet = session.execute(null, "SELECT 1");
+ ResultSet<SqlRow> resultSet = session.execute(null, "SELECT 1");
assertTrue(resultSet.hasRowSet());
assertFalse(resultSet.wasApplied());
@@ -87,7 +87,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
.property("prop2", "2")
.build();
- AsyncResultSet resultSet = session.executeAsync(null, "SELECT
PROPS").join();
+ AsyncResultSet<SqlRow> resultSet = session.executeAsync(null, "SELECT
PROPS").join();
Map<String, Object> props =
StreamSupport.stream(resultSet.currentPage().spliterator(), false)
.collect(Collectors.toMap(x -> x.stringValue(0), x ->
x.value(1)));
@@ -118,7 +118,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
.property("prop3", "3")
.build();
- AsyncResultSet resultSet = session.executeAsync(null,
statement).join();
+ AsyncResultSet<SqlRow> resultSet = session.executeAsync(null,
statement).join();
Map<String, Object> props =
StreamSupport.stream(resultSet.currentPage().spliterator(), false)
.collect(Collectors.toMap(x -> x.stringValue(0), x ->
x.value(1)));
@@ -134,7 +134,7 @@ public class ClientSqlTest extends AbstractClientTableTest {
@Test
public void testMetadata() {
Session session = client.sql().createSession();
- ResultSet resultSet = session.execute(null, "SELECT META");
+ ResultSet<SqlRow> resultSet = session.execute(null, "SELECT META");
ResultSetMetadata meta = resultSet.metadata();
SqlRow row = resultSet.next();
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
index dadc944946..1cdab40e1a 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
@@ -26,9 +26,11 @@ import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -75,13 +77,16 @@ public class FakeSession implements Session {
/** {@inheritDoc} */
@Override
- public CompletableFuture<AsyncResultSet> executeAsync(@Nullable
Transaction transaction, String query, @Nullable Object... arguments) {
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ String query,
+ @Nullable Object... arguments) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override
- public CompletableFuture<AsyncResultSet> executeAsync(
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
Statement statement,
@Nullable Object... arguments) {
@@ -90,6 +95,23 @@ public class FakeSession implements Session {
return CompletableFuture.completedFuture(new FakeAsyncResultSet(this,
transaction, statement, arguments));
}
+ /** {@inheritDoc} */
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(@Nullable
Transaction transaction, @Nullable Mapper<T> mapper,
+ String query, @Nullable Object... arguments) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ Statement statement,
+ @Nullable Object... arguments) {
+ throw new UnsupportedOperationException();
+ }
+
/** {@inheritDoc} */
@Override
public ReactiveResultSet executeReactive(@Nullable Transaction
transaction, String query, @Nullable Object... arguments) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
index 0d12ea192c..67bfde67c5 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/Cluster.java
@@ -55,6 +55,7 @@ import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.util.concurrent.ConcurrentHashSet;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.TestInfo;
@@ -390,9 +391,9 @@ public class Cluster {
* @param extractor Used to extract the result from a {@link ResultSet}.
* @return Query result.
*/
- public <T> T query(int nodeIndex, String sql, Function<ResultSet, T>
extractor) {
+ public <T> T query(int nodeIndex, String sql, Function<ResultSet<SqlRow>,
T> extractor) {
return doInSession(nodeIndex, session -> {
- try (ResultSet resultSet = session.execute(null, sql)) {
+ try (ResultSet<SqlRow> resultSet = session.execute(null, sql)) {
return extractor.apply(resultSet);
}
});
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
index 015941c00e..4de963abce 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItTableRaftSnapshotsTest.java
@@ -193,14 +193,14 @@ class ItTableRaftSnapshotsTest extends
BaseIgniteAbstractTest {
|| hasCause(e, SqlValidatorException.class, "Object 'TEST' not
found");
}
- private <T> T queryWithRetry(int nodeIndex, String sql,
Function<ResultSet, T> extractor) {
+ private <T> T queryWithRetry(int nodeIndex, String sql,
Function<ResultSet<SqlRow>, T> extractor) {
return withRetry(() -> cluster.query(nodeIndex, sql, extractor));
}
/**
* Reads all rows from TEST table.
*/
- private static List<IgniteBiTuple<Integer, String>> readRows(ResultSet rs)
{
+ private static List<IgniteBiTuple<Integer, String>>
readRows(ResultSet<SqlRow> rs) {
List<IgniteBiTuple<Integer, String>> rows = new ArrayList<>();
while (rs.hasNext()) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 9ba9d228b5..9874661e0b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -212,7 +213,7 @@ public class ItDataSchemaSyncTest extends
IgniteAbstractTest {
Session ses = ignite1.sql().createSession();
- ResultSet res = ses.execute(null, "SELECT valint2 FROM tbl1");
+ ResultSet<SqlRow> res = ses.execute(null, "SELECT valint2 FROM tbl1");
for (int i = 0; i < 10; ++i) {
assertNotNull(res.next().iterator().next());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
index a53531595a..2ef5ff273b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
@@ -38,9 +38,12 @@ import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Thin client SQL integration test.
@@ -48,7 +51,7 @@ import org.junit.jupiter.api.Test;
public class ItThinClientSqlTest extends ItAbstractThinClientTest {
@Test
void testExecuteAsyncSimpleSelect() {
- AsyncResultSet resultSet = client().sql()
+ AsyncResultSet<SqlRow> resultSet = client().sql()
.createSession()
.executeAsync(null, "select 1 as num, 'hello' as str")
.join();
@@ -74,7 +77,7 @@ public class ItThinClientSqlTest extends
ItAbstractThinClientTest {
@Test
void testExecuteSimpleSelect() {
- ResultSet resultSet = client().sql()
+ ResultSet<SqlRow> resultSet = client().sql()
.createSession()
.execute(null, "select 1 as num, 'hello' as str");
@@ -146,7 +149,7 @@ public class ItThinClientSqlTest extends
ItAbstractThinClientTest {
ses.execute(null, "INSERT INTO testExecuteDdlDml VALUES (?, ?)",
i, "hello " + i);
}
- ResultSet selectRes = ses.execute(null, "SELECT * FROM
testExecuteDdlDml ORDER BY ID");
+ ResultSet<SqlRow> selectRes = ses.execute(null, "SELECT * FROM
testExecuteDdlDml ORDER BY ID");
var rows = new ArrayList<SqlRow>();
selectRes.forEachRemaining(rows::add);
@@ -186,7 +189,7 @@ public class ItThinClientSqlTest extends
ItAbstractThinClientTest {
}
// Query data.
- AsyncResultSet selectRes = session
+ AsyncResultSet<SqlRow> selectRes = session
.executeAsync(null, "SELECT VAL as MYVALUE, ID, ID + 1 FROM
testExecuteAsyncDdlDml ORDER BY ID")
.join();
@@ -254,7 +257,7 @@ public class ItThinClientSqlTest extends
ItAbstractThinClientTest {
}
// Query data.
- ResultSet selectRes = session
+ ResultSet<SqlRow> selectRes = session
.execute(null, "SELECT VAL as MYVALUE, ID, ID + 1 FROM
testExecuteDdlDml ORDER BY ID");
assertTrue(selectRes.hasRowSet());
@@ -320,7 +323,7 @@ public class ItThinClientSqlTest extends
ItAbstractThinClientTest {
Statement statement =
client().sql().statementBuilder().pageSize(4).query("SELECT ID FROM
testFetchNextPage ORDER BY ID").build();
- AsyncResultSet asyncResultSet = session.executeAsync(null,
statement).join();
+ AsyncResultSet<SqlRow> asyncResultSet = session.executeAsync(null,
statement).join();
assertEquals(4, asyncResultSet.currentPageSize());
assertTrue(asyncResultSet.hasMorePages());
@@ -364,4 +367,59 @@ public class ItThinClientSqlTest extends
ItAbstractThinClientTest {
var res = session.executeAsync(null, "SELECT VAL FROM testTx").join();
assertEquals(1, res.currentPage().iterator().next().intValue(0));
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testResultSetMapping(boolean useStatement) {
+ Session session = client().sql().createSession();
+ String query = "select 123 + ? as num, 'Hello!' as str";
+
+ ResultSet<Pojo> resultSet = useStatement
+ ? session.execute(null, Mapper.of(Pojo.class),
client().sql().statementBuilder().query(query).build(), 10)
+ : session.execute(null, Mapper.of(Pojo.class), query, 10);
+
+ assertTrue(resultSet.hasRowSet());
+
+ Pojo row = resultSet.next();
+
+ assertEquals(133, row.num);
+ assertEquals("Hello!", row.str);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testResultSetMappingAsync(boolean useStatement) {
+ Session session = client().sql().createSession();
+ String query = "select 1 as num, concat('hello ', ?) as str";
+
+ AsyncResultSet<Pojo> resultSet = useStatement
+ ? session.executeAsync(null, Mapper.of(Pojo.class),
client().sql().statementBuilder().query(query).build(), "world").join()
+ : session.executeAsync(null, Mapper.of(Pojo.class), query,
"world").join();
+
+ assertTrue(resultSet.hasRowSet());
+ assertEquals(1, resultSet.currentPageSize());
+
+ Pojo row = resultSet.currentPage().iterator().next();
+
+ assertEquals(1, row.num);
+ assertEquals("hello world", row.str);
+ }
+
+
+ @Test
+ void testResultSetMappingColumnNameMismatch() {
+ String query = "select 1 as foo, 2 as bar";
+
+ ResultSet<Pojo> resultSet =
client().sql().createSession().execute(null, Mapper.of(Pojo.class), query);
+ Pojo row = resultSet.next();
+
+ assertEquals(0, row.num);
+ assertNull(row.str);
+ }
+
+ private static class Pojo {
+ public long num;
+
+ public String str;
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index e4b1242983..3ea81c2c2e 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -235,7 +235,7 @@ public class ItSqlAsynchronousApiTest extends
AbstractBasicIntegrationTest {
int txPrevCnt = txManagerInternal.finished();
for (int i = 0; i < ROW_COUNT; ++i) {
- CompletableFuture<AsyncResultSet> fut = ses.executeAsync(null,
"CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)", i, i);
+ CompletableFuture<AsyncResultSet<SqlRow>> fut =
ses.executeAsync(null, "CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)", i, i);
AsyncResultSet asyncRes = null;
@@ -444,7 +444,7 @@ public class ItSqlAsynchronousApiTest extends
AbstractBasicIntegrationTest {
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().build();
- AsyncResultSet rs = await(ses.executeAsync(null, "SELECT COL1, COL0
FROM TEST"));
+ AsyncResultSet<SqlRow> rs = await(ses.executeAsync(null, "SELECT COL1,
COL0 FROM TEST"));
// Validate columns metadata.
ResultSetMetadata meta = rs.metadata();
@@ -488,7 +488,7 @@ public class ItSqlAsynchronousApiTest extends
AbstractBasicIntegrationTest {
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().build();
- AsyncResultSet ars = await(ses.executeAsync(null, "SELECT 1 as COL_A,
2 as COL_B"));
+ AsyncResultSet<SqlRow> ars = await(ses.executeAsync(null, "SELECT 1 as
COL_A, 2 as COL_B"));
SqlRow r = CollectionUtils.first(ars.currentPage());
@@ -524,15 +524,15 @@ public class ItSqlAsynchronousApiTest extends
AbstractBasicIntegrationTest {
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().defaultPageSize(1).build();
- AsyncResultSet ars0 = await(ses.executeAsync(null, "SELECT ID FROM
TEST ORDER BY ID"));
+ AsyncResultSet<SqlRow> ars0 = await(ses.executeAsync(null, "SELECT ID
FROM TEST ORDER BY ID"));
var p0 = ars0.currentPage();
- AsyncResultSet ars1 = await(ars0.fetchNextPage());
+ AsyncResultSet<SqlRow> ars1 = await(ars0.fetchNextPage());
var p1 = ars1.currentPage();
- AsyncResultSet ars2 =
await(ars1.fetchNextPage().toCompletableFuture());
+ AsyncResultSet<SqlRow> ars2 =
await(ars1.fetchNextPage().toCompletableFuture());
var p2 = ars2.currentPage();
- AsyncResultSet ars3 = await(ars1.fetchNextPage());
+ AsyncResultSet<SqlRow> ars3 = await(ars1.fetchNextPage());
var p3 = ars3.currentPage();
- AsyncResultSet ars4 = await(ars0.fetchNextPage());
+ AsyncResultSet<SqlRow> ars4 = await(ars0.fetchNextPage());
var p4 = ars4.currentPage();
assertSame(ars0, ars1);
@@ -689,12 +689,12 @@ public class ItSqlAsynchronousApiTest extends
AbstractBasicIntegrationTest {
}
private static void checkDdl(boolean expectedApplied, Session ses, String
sql, Transaction tx) {
- CompletableFuture<AsyncResultSet> fut = ses.executeAsync(
+ CompletableFuture<AsyncResultSet<SqlRow>> fut = ses.executeAsync(
tx,
sql
);
- AsyncResultSet asyncRes = await(fut);
+ AsyncResultSet<SqlRow> asyncRes = await(fut);
assertEquals(expectedApplied, asyncRes.wasApplied());
assertFalse(asyncRes.hasMorePages());
@@ -736,7 +736,7 @@ public class ItSqlAsynchronousApiTest extends
AbstractBasicIntegrationTest {
}
static class TestPageProcessor implements
- Function<AsyncResultSet, CompletionStage<AsyncResultSet>> {
+ Function<AsyncResultSet<SqlRow>,
CompletionStage<AsyncResultSet<SqlRow>>> {
private int expectedPages;
private final List<SqlRow> res = new ArrayList<>();
@@ -746,7 +746,7 @@ public class ItSqlAsynchronousApiTest extends
AbstractBasicIntegrationTest {
}
@Override
- public CompletionStage<AsyncResultSet> apply(AsyncResultSet rs) {
+ public CompletionStage<AsyncResultSet<SqlRow>>
apply(AsyncResultSet<SqlRow> rs) {
expectedPages--;
assertTrue(rs.hasRowSet());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index bd59046349..976bc792a7 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -54,6 +54,7 @@ import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlBatchException;
import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -211,7 +212,7 @@ public class ItSqlSynchronousApiTest extends
AbstractBasicIntegrationTest {
IgniteSql sql = igniteSql();
Session ses = sql.sessionBuilder().defaultPageSize(ROW_COUNT /
4).build();
- ResultSet rs = ses.execute(null, "SELECT ID FROM TEST");
+ ResultSet<SqlRow> rs = ses.execute(null, "SELECT ID FROM TEST");
Set<Integer> set = Streams.stream(rs).map(r ->
r.intValue(0)).collect(Collectors.toSet());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/SqlScriptRunner.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/SqlScriptRunner.java
index b2af0e3c52..db130dae79 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/SqlScriptRunner.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/SqlScriptRunner.java
@@ -52,6 +52,7 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.SqlRow;
import org.jetbrains.annotations.NotNull;
/**
@@ -168,7 +169,7 @@ public class SqlScriptRunner {
log.info("Execute: " + sql);
try (Session s = ignSql.createSession()) {
- try (ResultSet rs = s.execute(null, sql)) {
+ try (ResultSet<SqlRow> rs = s.execute(null, sql)) {
if (rs.hasRowSet()) {
return Streams.stream(rs).map(
r -> {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index a1d46be888..270d5c71ba 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -45,7 +45,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Asynchronous result set implementation.
*/
-public class AsyncResultSetImpl implements AsyncResultSet {
+public class AsyncResultSetImpl<T> implements AsyncResultSet<T> {
private static final CompletableFuture<? extends AsyncResultSet>
HAS_NO_MORE_PAGE_FUTURE =
CompletableFuture.failedFuture(new
SqlException(CURSOR_NO_MORE_PAGES_ERR, "There are no more pages."));
@@ -107,13 +107,14 @@ public class AsyncResultSetImpl implements AsyncResultSet
{
/** {@inheritDoc} */
@Override
- public Iterable<SqlRow> currentPage() {
+ public Iterable<T> currentPage() {
requireResultSet();
final Iterator<List<Object>> it0 = curPage.items().iterator();
final ResultSetMetadata meta0 = cur.metadata();
- return () -> new TransformingIterator<>(it0, (item) -> new
SqlRowImpl(item, meta0));
+ // TODO: IGNITE-18695 map rows to objects when mapper is provided.
+ return () -> new TransformingIterator<>(it0, (item) -> (T) new
SqlRowImpl(item, meta0));
}
/** {@inheritDoc} */
@@ -126,11 +127,11 @@ public class AsyncResultSetImpl implements AsyncResultSet
{
/** {@inheritDoc} */
@Override
- public CompletableFuture<? extends AsyncResultSet> fetchNextPage() {
+ public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
requireResultSet();
if (!hasMorePages()) {
- return HAS_NO_MORE_PAGE_FUTURE;
+ return (CompletableFuture<? extends AsyncResultSet<T>>)
HAS_NO_MORE_PAGE_FUTURE;
} else {
return cur.requestNextAsync(pageSize)
.thenApply(page -> {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index 743a56718b..a7c019a174 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -50,9 +50,11 @@ import org.apache.ignite.sql.BatchedArguments;
import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlBatchException;
import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.async.AsyncResultSet;
import org.apache.ignite.sql.reactive.ReactiveResultSet;
+import org.apache.ignite.table.mapper.Mapper;
import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
@@ -154,7 +156,10 @@ public class SessionImpl implements Session {
/** {@inheritDoc} */
@Override
- public CompletableFuture<AsyncResultSet> executeAsync(@Nullable
Transaction transaction, String query, @Nullable Object... arguments) {
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
+ @Nullable Transaction transaction,
+ String query,
+ @Nullable Object... arguments) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new
SqlException(SESSION_NOT_FOUND_ERR, "Session is closed."));
}
@@ -162,7 +167,7 @@ public class SessionImpl implements Session {
try {
QueryContext ctx = QueryContext.of(transaction);
- CompletableFuture<AsyncResultSet> result =
qryProc.querySingleAsync(sessionId, ctx, query, arguments)
+ CompletableFuture<AsyncResultSet<SqlRow>> result =
qryProc.querySingleAsync(sessionId, ctx, query, arguments)
.thenCompose(cur -> cur.requestNextAsync(pageSize)
.thenApply(
batchRes -> new AsyncResultSetImpl(
@@ -190,7 +195,7 @@ public class SessionImpl implements Session {
/** {@inheritDoc} */
@Override
- public CompletableFuture<AsyncResultSet> executeAsync(
+ public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
Statement statement,
@Nullable Object... arguments
@@ -199,6 +204,25 @@ public class SessionImpl implements Session {
return executeAsync(transaction, statement.query(), arguments);
}
+ /** {@inheritDoc} */
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(@Nullable
Transaction transaction, @Nullable Mapper<T> mapper,
+ String query, @Nullable Object... arguments) {
+ // TODO: IGNITE-18695.
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
+ @Nullable Transaction transaction,
+ @Nullable Mapper<T> mapper,
+ Statement statement,
+ @Nullable Object... arguments) {
+ // TODO: IGNITE-18695.
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, String query, BatchedArguments batch) {
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
index 946a0a5148..6b7a652666 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/IgniteSqlApiTest.java
@@ -129,7 +129,7 @@ public class IgniteSqlApiTest {
final Session sess = igniteSql.createSession();
// Execute DDL.
- ResultSet rs = sess.execute(null, "CREATE TABLE IF NOT EXITS tbl (id
INT PRIMARY KEY, val VARCHAR)");
+ ResultSet<SqlRow> rs = sess.execute(null, "CREATE TABLE IF NOT EXITS
tbl (id INT PRIMARY KEY, val VARCHAR)");
assertTrue(rs.wasApplied());
assertFalse(rs.hasRowSet());
@@ -179,7 +179,7 @@ public class IgniteSqlApiTest {
igniteTx.runInTransaction(tx -> {
// Execute DML in tx.
- ResultSet rs = sess.execute(tx, "INSERT INTO tbl VALUES (?, ?)",
1, "str1");
+ ResultSet<SqlRow> rs = sess.execute(tx, "INSERT INTO tbl VALUES
(?, ?)", 1, "str1");
assertEquals(1, rs.affectedRows());
@@ -283,7 +283,7 @@ public class IgniteSqlApiTest {
KeyValueView<Tuple, Tuple> table = getTable();
class AsyncPageProcessor implements
- Function<AsyncResultSet, CompletionStage<AsyncResultSet>> {
+ Function<AsyncResultSet<SqlRow>,
CompletionStage<AsyncResultSet<SqlRow>>> {
private final Transaction tx0;
public AsyncPageProcessor(Transaction tx0) {
@@ -291,7 +291,7 @@ public class IgniteSqlApiTest {
}
@Override
- public CompletionStage<AsyncResultSet> apply(AsyncResultSet rs) {
+ public CompletionStage<AsyncResultSet<SqlRow>>
apply(AsyncResultSet<SqlRow> rs) {
for (SqlRow row : rs.currentPage()) {
table.getAsync(tx0, Tuple.create().set("id",
row.intValue(0)));
}
@@ -348,7 +348,7 @@ public class IgniteSqlApiTest {
@Disabled
@Test
public void testMetadata() {
- ResultSet rs = igniteSql.createSession()
+ ResultSet<SqlRow> rs = igniteSql.createSession()
.execute(null, "SELECT id, val FROM tbl");
SqlRow row = rs.next();