This is an automated email from the ASF dual-hosted git repository. jooger pushed a commit to branch jdbc_over_thin_sql in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 3d2a8ebaa749ab0bf0b3729e8ce3ab810bd8cd27 Author: Pavel Pereslegin <[email protected]> AuthorDate: Wed Aug 20 09:41:30 2025 +0300 IGNITE-26087 Ability to obtain results of a multi-statement query execution using the internal thin client SQL API. (#6397) --- .../ignite/internal/client/proto/ClientOp.java | 3 + .../ignite/internal/client/sql/QueryModifier.java | 7 +- .../client/proto/sql/QueryModifierTest.java | 5 +- .../handler/ClientInboundMessageHandler.java | 4 + .../client/handler/ClientResourceRegistry.java | 6 + .../handler/requests/sql/ClientSqlCommon.java | 152 ++++++ .../sql/ClientSqlCursorNextResultRequest.java | 64 +++ .../requests/sql/ClientSqlExecuteRequest.java | 134 ++---- .../handler/requests/sql/ClientSqlProperties.java | 2 +- .../handler/requests/sql/ClientSqlCommonTest.java | 2 +- .../apache/ignite/client/ClientOperationType.java | 5 + .../org/apache/ignite/client/RetryReadPolicy.java | 1 + .../apache/ignite/internal/client/ClientUtils.java | 3 + .../internal/client/sql/ClientAsyncResultSet.java | 68 ++- .../ignite/internal/client/sql/ClientSql.java | 6 +- .../org/apache/ignite/client/ClientSqlTest.java | 4 + .../org/apache/ignite/client/fakes/FakeCursor.java | 5 +- .../client/ItThinClientMultistatementSqlTest.java | 529 +++++++++++++++++++++ .../runner/app/client/ItThinClientSqlTest.java | 21 +- .../internal/sql/api/AsyncResultSetImpl.java | 5 + .../engine/util/SqlExceptionMapperProvider.java | 4 + 21 files changed, 916 insertions(+), 114 deletions(-) diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java index d0a3f5bb0b5..377745e3f08 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java @@ -200,6 +200,9 @@ public class ClientOp { /** Response to a server->client operation. */ public static final int SERVER_OP_RESPONSE = 73; + /** Get next result set. */ + public static final int SQL_CURSOR_NEXT_RESULT_SET = 74; + /** Reserved for extensions: min. */ @SuppressWarnings("unused") public static final int RESERVED_EXTENSION_RANGE_START = 1000; diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java b/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java index 60bfcc28e74..d910549559c 100644 --- a/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java +++ b/modules/client-common/src/main/java/org/apache/ignite/internal/client/sql/QueryModifier.java @@ -34,13 +34,16 @@ public enum QueryModifier { ALLOW_APPLIED_RESULT(2), /** Queries with transaction control statements. */ - ALLOW_TX_CONTROL(3); + ALLOW_TX_CONTROL(3), + + /** Queries with multiple statements. */ + ALLOW_MULTISTATEMENT(4); /** A set containing all modifiers. **/ public static final Set<QueryModifier> ALL = EnumSet.allOf(QueryModifier.class); /** A set of modifiers that can apply to single statements. **/ - public static final Set<QueryModifier> SINGLE_STMT_MODIFIERS = EnumSet.complementOf(EnumSet.of(ALLOW_TX_CONTROL)); + public static final Set<QueryModifier> SINGLE_STMT_MODIFIERS = EnumSet.complementOf(EnumSet.of(ALLOW_TX_CONTROL, ALLOW_MULTISTATEMENT)); private static final QueryModifier[] VALS = new QueryModifier[values().length]; diff --git a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java index 443f604e5d3..a4821d85333 100644 --- a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java +++ b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/sql/QueryModifierTest.java @@ -58,7 +58,10 @@ public class QueryModifierTest { Arguments.of(Set.of(QueryModifier.ALLOW_ROW_SET_RESULT, QueryModifier.ALLOW_AFFECTED_ROWS_RESULT)), Arguments.of(Set.of(QueryModifier.ALLOW_AFFECTED_ROWS_RESULT, QueryModifier.ALLOW_APPLIED_RESULT)), Arguments.of(Set.of(QueryModifier.ALLOW_APPLIED_RESULT, QueryModifier.ALLOW_TX_CONTROL)), - Arguments.of(Set.of(QueryModifier.ALLOW_TX_CONTROL, QueryModifier.ALLOW_ROW_SET_RESULT)), + Arguments.of(Set.of(QueryModifier.ALLOW_TX_CONTROL, QueryModifier.ALLOW_MULTISTATEMENT)), + Arguments.of(Set.of(QueryModifier.ALLOW_MULTISTATEMENT, QueryModifier.ALLOW_ROW_SET_RESULT)), + Arguments.of(Set.of(QueryModifier.ALLOW_ROW_SET_RESULT, QueryModifier.ALLOW_MULTISTATEMENT, + QueryModifier.ALLOW_APPLIED_RESULT)), Arguments.of(QueryModifier.ALL)); } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java index b4e90eb9b46..98ee804a10e 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java @@ -80,6 +80,7 @@ import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcTableMetadataReq import org.apache.ignite.client.handler.requests.jdbc.JdbcMetadataCatalog; import org.apache.ignite.client.handler.requests.sql.ClientSqlCursorCloseRequest; import org.apache.ignite.client.handler.requests.sql.ClientSqlCursorNextPageRequest; +import org.apache.ignite.client.handler.requests.sql.ClientSqlCursorNextResultRequest; import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteBatchRequest; import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteRequest; import org.apache.ignite.client.handler.requests.sql.ClientSqlExecuteScriptRequest; @@ -959,6 +960,9 @@ public class ClientInboundMessageHandler clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT) ); + case ClientOp.SQL_CURSOR_NEXT_RESULT_SET: + return ClientSqlCursorNextResultRequest.process(in, resources, partitionOperationsExecutor, metrics); + case ClientOp.OPERATION_CANCEL: return ClientOperationCancelRequest.process(in, cancelHandles); diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java index ed418d70a2a..23fc40cc33a 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientResourceRegistry.java @@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.ignite.internal.lang.IgniteInternalCheckedException; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.util.IgniteSpinBusyLock; +import org.jetbrains.annotations.TestOnly; /** * Per-connection resource registry. @@ -140,6 +141,11 @@ public class ClientResourceRegistry { } } + @TestOnly + public int size() { + return res.size(); + } + /** * Enters the lock. */ diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java index dc251f2bf58..ff3d3e7f6a7 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommon.java @@ -23,15 +23,28 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.ignite.client.handler.ClientHandlerMetricSource; +import org.apache.ignite.client.handler.ClientResource; +import org.apache.ignite.client.handler.ClientResourceRegistry; +import org.apache.ignite.client.handler.ResponseWriter; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.sql.QueryModifier; +import org.apache.ignite.internal.lang.IgniteInternalCheckedException; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.sql.api.AsyncResultSetImpl; +import org.apache.ignite.internal.sql.engine.AsyncSqlCursor; +import org.apache.ignite.internal.sql.engine.InternalSqlRow; import org.apache.ignite.internal.sql.engine.SqlQueryType; +import org.apache.ignite.internal.sql.engine.prepare.partitionawareness.PartitionAwarenessMetadata; +import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.sql.ColumnMetadata; import org.apache.ignite.sql.ColumnMetadata.ColumnOrigin; import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.sql.async.AsyncResultSet; +import org.jetbrains.annotations.Nullable; /** * Common SQL request handling logic. @@ -223,6 +236,9 @@ class ClientSqlCommon { queryTypes.add(SqlQueryType.TX_CONTROL); break; + case ALLOW_MULTISTATEMENT: + break; + default: throw new IllegalArgumentException("Unexpected modifier " + queryModifier); } @@ -230,4 +246,140 @@ class ClientSqlCommon { return queryTypes; } + + static CompletableFuture<ResponseWriter> writeResultSetAsync( + ClientResourceRegistry resources, + AsyncResultSetImpl asyncResultSet, + ClientHandlerMetricSource metrics, + int pageSize, + boolean includePartitionAwarenessMeta, + boolean sqlDirectTxMappingSupported, + boolean sqlMultiStatementSupported + ) { + try { + Long nextResultResourceId = sqlMultiStatementSupported && asyncResultSet.cursor().hasNextResult() + ? saveNextResultResource(asyncResultSet.cursor().nextResult(), pageSize, resources) + : null; + + if ((asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages())) { + metrics.cursorsActiveIncrement(); + + var clientResultSet = new ClientSqlResultSet(asyncResultSet, metrics); + + ClientResource resource = new ClientResource( + clientResultSet, + clientResultSet::closeAsync); + + var resourceId = resources.put(resource); + + return CompletableFuture.completedFuture(out -> + writeResultSet(out, asyncResultSet, resourceId, includePartitionAwarenessMeta, + sqlDirectTxMappingSupported, sqlMultiStatementSupported, nextResultResourceId)); + } + + return asyncResultSet.closeAsync() + .thenApply(v -> (ResponseWriter) out -> + writeResultSet(out, asyncResultSet, null, includePartitionAwarenessMeta, + sqlDirectTxMappingSupported, sqlMultiStatementSupported, nextResultResourceId)); + + } catch (IgniteInternalCheckedException e) { + // Resource registry was closed. + return asyncResultSet + .closeAsync() + .thenRun(() -> { + throw new IgniteInternalException(e.getMessage(), e); + }); + } + } + + private static Long saveNextResultResource( + CompletableFuture<AsyncSqlCursor<InternalSqlRow>> nextResultFuture, + int pageSize, + ClientResourceRegistry resources + ) throws IgniteInternalCheckedException { + ClientResource resource = new ClientResource( + new CursorWithPageSize(nextResultFuture, pageSize), + () -> nextResultFuture.thenApply(AsyncCursor::closeAsync)); + + return resources.put(resource); + } + + private static void writeResultSet( + ClientMessagePacker out, + AsyncResultSetImpl res, + @Nullable Long resourceId, + boolean includePartitionAwarenessMeta, + boolean sqlDirectTxMappingSupported, + boolean sqlMultiStatementsSupported, + @Nullable Long nextResultResourceId + ) { + out.packLongNullable(resourceId); + + out.packBoolean(res.hasRowSet()); + out.packBoolean(res.hasMorePages()); + out.packBoolean(res.wasApplied()); + out.packLong(res.affectedRows()); + + packMeta(out, res.metadata()); + + if (includePartitionAwarenessMeta) { + packPartitionAwarenessMeta(out, res.partitionAwarenessMetadata(), sqlDirectTxMappingSupported); + } + + if (sqlMultiStatementsSupported) { + out.packLongNullable(nextResultResourceId); + } + + if (res.hasRowSet()) { + packCurrentPage(out, res); + } + } + + private static void packMeta(ClientMessagePacker out, @Nullable ResultSetMetadata meta) { + // TODO IGNITE-17179 metadata caching - avoid sending same meta over and over. + if (meta == null || meta.columns() == null) { + out.packInt(0); + return; + } + + packColumns(out, meta.columns()); + } + + private static void packPartitionAwarenessMeta( + ClientMessagePacker out, + @Nullable PartitionAwarenessMetadata meta, + boolean sqlDirectTxMappingSupported + ) { + if (meta == null) { + out.packNil(); + return; + } + + out.packInt(meta.tableId()); + out.packIntArray(meta.indexes()); + out.packIntArray(meta.hash()); + + if (sqlDirectTxMappingSupported) { + out.packByte(meta.directTxMode().id); + } + } + + /** Holder of the cursor future and page size. */ + static class CursorWithPageSize { + private final CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture; + private final int pageSize; + + CursorWithPageSize(CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture, int pageSize) { + this.cursorFuture = cursorFuture; + this.pageSize = pageSize; + } + + CompletableFuture<AsyncSqlCursor<InternalSqlRow>> cursorFuture() { + return cursorFuture; + } + + int pageSize() { + return pageSize; + } + } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java new file mode 100644 index 00000000000..5e8752fae65 --- /dev/null +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCursorNextResultRequest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client.handler.requests.sql; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import org.apache.ignite.client.handler.ClientHandlerMetricSource; +import org.apache.ignite.client.handler.ClientResource; +import org.apache.ignite.client.handler.ClientResourceRegistry; +import org.apache.ignite.client.handler.ResponseWriter; +import org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.CursorWithPageSize; +import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; +import org.apache.ignite.internal.lang.IgniteInternalCheckedException; +import org.apache.ignite.internal.sql.api.AsyncResultSetImpl; +import org.apache.ignite.sql.SqlRow; + +/** + * Client SQL cursor next result. + */ +public class ClientSqlCursorNextResultRequest { + /** + * Processes the request. + * + * @param in Unpacker. + * @return Future representing result of operation. + */ + public static CompletableFuture<ResponseWriter> process( + ClientMessageUnpacker in, + ClientResourceRegistry resources, + Executor operationExecutor, + ClientHandlerMetricSource metrics + ) throws IgniteInternalCheckedException { + long resourceId = in.unpackLong(); + ClientResource resource = resources.remove(resourceId); + CursorWithPageSize cursorWithPageSize = resource.get(CursorWithPageSize.class); + int pageSize = cursorWithPageSize.pageSize(); + + return cursorWithPageSize.cursorFuture() + .thenComposeAsync(cur -> cur.requestNextAsync(pageSize) + .thenApply(batchRes -> new AsyncResultSetImpl<SqlRow>( + cur, + batchRes, + pageSize + ) + ).thenCompose(asyncResultSet -> + ClientSqlCommon.writeResultSetAsync(resources, asyncResultSet, metrics, pageSize, false, false, true) + ).thenApply(rsWriter -> rsWriter), operationExecutor); + } +} diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java index 89bfbd7bc07..758bc01a494 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java @@ -17,39 +17,38 @@ package org.apache.ignite.client.handler.requests.sql; -import static org.apache.ignite.client.handler.requests.sql.ClientSqlCommon.packCurrentPage; import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx; import static org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta; import static org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException; +import static org.apache.ignite.internal.util.CompletableFutures.allOf; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; +import java.util.function.Function; import org.apache.ignite.client.handler.ClientHandlerMetricSource; -import org.apache.ignite.client.handler.ClientResource; import org.apache.ignite.client.handler.ClientResourceRegistry; import org.apache.ignite.client.handler.NotificationSender; import org.apache.ignite.client.handler.ResponseWriter; -import org.apache.ignite.internal.client.proto.ClientMessagePacker; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.hlc.HybridTimestampTracker; -import org.apache.ignite.internal.lang.IgniteInternalCheckedException; -import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.sql.api.AsyncResultSetImpl; +import org.apache.ignite.internal.sql.engine.AsyncSqlCursor; +import org.apache.ignite.internal.sql.engine.InternalSqlRow; import org.apache.ignite.internal.sql.engine.QueryProcessor; import org.apache.ignite.internal.sql.engine.SqlProperties; -import org.apache.ignite.internal.sql.engine.prepare.partitionawareness.PartitionAwarenessMetadata; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.util.ArrayUtils; import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.lang.CancelHandle; import org.apache.ignite.lang.CancellationToken; -import org.apache.ignite.sql.ResultSetMetadata; import org.apache.ignite.sql.SqlRow; import org.apache.ignite.tx.Transaction; import org.jetbrains.annotations.Nullable; @@ -57,7 +56,6 @@ import org.jetbrains.annotations.Nullable; /** * Client SQL execute request. */ -@SuppressWarnings({"rawtypes", "unchecked"}) public class ClientSqlExecuteRequest { /** * Processes the request. @@ -128,7 +126,8 @@ public class ClientSqlExecuteRequest { () -> cancelHandles.remove(requestId), arguments ).thenCompose(asyncResultSet -> - writeResultSetAsync(resources, asyncResultSet, metrics, includePartitionAwarenessMeta, sqlDirectTxMappingSupported)) + ClientSqlCommon.writeResultSetAsync(resources, asyncResultSet, metrics, props.pageSize(), + includePartitionAwarenessMeta, sqlDirectTxMappingSupported, sqlMultistatementsSupported)) .thenApply(rsWriter -> out -> { if (tx != null) { writeTxMeta(out, timestampTracker, clockService, tx, resIdHolder[0]); @@ -146,95 +145,6 @@ public class ClientSqlExecuteRequest { return arguments == null ? ArrayUtils.OBJECT_EMPTY_ARRAY : arguments; } - private static CompletableFuture<ResponseWriter> writeResultSetAsync( - ClientResourceRegistry resources, - AsyncResultSetImpl asyncResultSet, - ClientHandlerMetricSource metrics, - boolean includePartitionAwarenessMeta, - boolean sqlDirectTxMappingSupported - ) { - if (asyncResultSet.hasRowSet() && asyncResultSet.hasMorePages()) { - try { - metrics.cursorsActiveIncrement(); - - var clientResultSet = new ClientSqlResultSet(asyncResultSet, metrics); - - ClientResource resource = new ClientResource( - clientResultSet, - clientResultSet::closeAsync); - - var resourceId = resources.put(resource); - - return CompletableFuture.completedFuture(out -> - writeResultSet(out, asyncResultSet, resourceId, includePartitionAwarenessMeta, sqlDirectTxMappingSupported)); - } catch (IgniteInternalCheckedException e) { - return asyncResultSet - .closeAsync() - .thenRun(() -> { - throw new IgniteInternalException(e.getMessage(), e); - }); - } - } - - return asyncResultSet.closeAsync() - .thenApply(v -> (ResponseWriter) out -> - writeResultSet(out, asyncResultSet, null, includePartitionAwarenessMeta, sqlDirectTxMappingSupported)); - } - - private static void writeResultSet( - ClientMessagePacker out, - AsyncResultSetImpl res, - @Nullable Long resourceId, - boolean includePartitionAwarenessMeta, - boolean sqlDirectTxMappingSupported - ) { - out.packLongNullable(resourceId); - - out.packBoolean(res.hasRowSet()); - out.packBoolean(res.hasMorePages()); - out.packBoolean(res.wasApplied()); - out.packLong(res.affectedRows()); - - packMeta(out, res.metadata()); - - if (includePartitionAwarenessMeta) { - packPartitionAwarenessMeta(out, res.partitionAwarenessMetadata(), sqlDirectTxMappingSupported); - } - - if (res.hasRowSet()) { - packCurrentPage(out, res); - } - } - - private static void packMeta(ClientMessagePacker out, @Nullable ResultSetMetadata meta) { - // TODO IGNITE-17179 metadata caching - avoid sending same meta over and over. - if (meta == null || meta.columns() == null) { - out.packInt(0); - return; - } - - ClientSqlCommon.packColumns(out, meta.columns()); - } - - private static void packPartitionAwarenessMeta( - ClientMessagePacker out, - @Nullable PartitionAwarenessMetadata meta, - boolean sqlDirectTxMappingSupported - ) { - if (meta == null) { - out.packNil(); - return; - } - - out.packInt(meta.tableId()); - out.packIntArray(meta.indexes()); - out.packIntArray(meta.hash()); - - if (sqlDirectTxMappingSupported) { - out.packByte(meta.directTxMode().id); - } - } - private static CompletableFuture<AsyncResultSetImpl<SqlRow>> executeAsync( @Nullable Transaction transaction, QueryProcessor qryProc, @@ -256,7 +166,7 @@ public class ClientSqlExecuteRequest { arguments ) .thenCompose(cur -> { - cur.onClose().whenComplete((none, ignore) -> onComplete.run()); + doWhenAllCursorsComplete(cur, onComplete); return cur.requestNextAsync(pageSize) .thenApply( @@ -280,4 +190,30 @@ public class ClientSqlExecuteRequest { return CompletableFuture.failedFuture(mapToPublicSqlException(e)); } } + + private static void doWhenAllCursorsComplete(AsyncSqlCursor<InternalSqlRow> cursor, Runnable action) { + List<CompletableFuture<?>> dependency = new ArrayList<>(); + var cursorChainTraverser = new Function<AsyncSqlCursor<?>, CompletableFuture<AsyncSqlCursor<?>>>() { + @Override + public CompletableFuture<AsyncSqlCursor<?>> apply(AsyncSqlCursor<?> cursor) { + dependency.add(cursor.onClose()); + + if (cursor.hasNextResult()) { + return cursor.nextResult().thenCompose(this); + } + + return allOf(dependency) + .thenRun(action) + .thenApply(ignored -> cursor); + } + }; + + cursorChainTraverser + .apply(cursor) + .exceptionally(ex -> { + action.run(); + + return null; + }); + } } diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java index 2931dd315eb..23f010e94ad 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlProperties.java @@ -75,7 +75,7 @@ class ClientSqlProperties { SqlProperties sqlProperties = new SqlProperties() .queryTimeout(queryTimeout) .allowedQueryTypes(ClientSqlCommon.convertQueryModifierToQueryType(queryModifiers)) - .allowMultiStatement(false); + .allowMultiStatement(queryModifiers.contains(QueryModifier.ALLOW_MULTISTATEMENT)); if (schema != null) { sqlProperties.defaultSchema(schema); diff --git a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java index da7041d4baa..c18556a5135 100644 --- a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java +++ b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/sql/ClientSqlCommonTest.java @@ -43,7 +43,7 @@ public class ClientSqlCommonTest { void testConvertQueryModifierToQueryType(QueryModifier type) { Set<SqlQueryType> sqlQueryTypes = ClientSqlCommon.convertQueryModifierToQueryType(Set.of(type)); - assertFalse(sqlQueryTypes.isEmpty()); + assertThat(sqlQueryTypes.isEmpty(), is(type == QueryModifier.ALLOW_MULTISTATEMENT)); sqlQueryTypes.forEach(sqlQueryType -> { switch (type) { diff --git a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java index b2a2d2f0820..f0a9f13f20a 100644 --- a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java +++ b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java @@ -175,6 +175,11 @@ public enum ClientOperationType { */ SQL_CURSOR_NEXT_PAGE, + /** + * SQL Cursor Next ResultSet. + */ + SQL_CURSOR_NEXT_RESULT_SET, + /** * Send streamer batch ({@link DataStreamerTarget#streamData}). */ diff --git a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java index d6c20a7855c..44e0373106a 100644 --- a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java +++ b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java @@ -59,6 +59,7 @@ public class RetryReadPolicy extends RetryLimitPolicy { case SQL_EXECUTE: case SQL_EXECUTE_BATCH: case SQL_CURSOR_NEXT_PAGE: + case SQL_CURSOR_NEXT_RESULT_SET: case SQL_EXECUTE_SCRIPT: case STREAMER_BATCH_SEND: case PRIMARY_REPLICAS_GET: diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java index ad969d2d3d5..f8661224bc1 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java @@ -175,6 +175,9 @@ public class ClientUtils { case ClientOp.SQL_CURSOR_NEXT_PAGE: return ClientOperationType.SQL_CURSOR_NEXT_PAGE; + case ClientOp.SQL_CURSOR_NEXT_RESULT_SET: + return ClientOperationType.SQL_CURSOR_NEXT_RESULT_SET; + case ClientOp.SQL_CURSOR_CLOSE: return null; diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java index 82f1efaba4b..97d1cfd09fe 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientAsyncResultSet.java @@ -22,7 +22,9 @@ import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.internal.binarytuple.BinaryTupleReader; import org.apache.ignite.internal.client.ClientChannel; import org.apache.ignite.internal.client.proto.ClientMessageUnpacker; @@ -46,7 +48,7 @@ import org.jetbrains.annotations.Nullable; /** * Client async result set. */ -class ClientAsyncResultSet<T> implements AsyncResultSet<T> { +public class ClientAsyncResultSet<T> implements AsyncResultSet<T> { /** Channel. */ private final ClientChannel ch; @@ -84,6 +86,17 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> { /** Closed flag. */ private volatile boolean closed; + /** ID of the resource that holds the next cursor, can be {@code null} if current result set is the last one. */ + @Nullable + private final Long nextResultResourceId; + + /** Future that holds the next result set, can be {@code null} if the current result set is the last one. */ + @Nullable + private final CompletableFuture<ClientAsyncResultSet<T>> nextResultFuture; + + /** A flag indicating whether the next result set already was requested or not. */ + private final AtomicBoolean nextResultSetRetrieved = new AtomicBoolean(); + /** * Constructor. * @@ -93,6 +106,7 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> { * @param mapper Mapper. * @param partitionAwarenessEnabled Whether partitions awareness is enabled, hence response may contain related metadata. * @param sqlDirectMappingSupported Whether direct mapping is supported, hence response may contain additional metadata. + * @param sqlMultiStatementsSupported Whether iteration over the results of script execution is supported. */ ClientAsyncResultSet( ClientChannel ch, @@ -100,7 +114,8 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> { ClientMessageUnpacker in, @Nullable Mapper<T> mapper, boolean partitionAwarenessEnabled, - boolean sqlDirectMappingSupported + boolean sqlDirectMappingSupported, + boolean sqlMultiStatementsSupported ) { this.ch = ch; @@ -117,6 +132,14 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> { partitionAwarenessMetadata = null; } + if (sqlMultiStatementsSupported && !in.tryUnpackNil()) { + nextResultResourceId = in.unpackLong(); + nextResultFuture = new CompletableFuture<>(); + } else { + nextResultResourceId = null; + nextResultFuture = null; + } + this.mapper = mapper; marshaller = metadata != null && mapper != null && mapper.targetType() != SqlRow.class ? marshaller(metadata, marshallers, mapper) @@ -139,6 +162,47 @@ class ClientAsyncResultSet<T> implements AsyncResultSet<T> { return hasRowSet; } + /** + * Returns flag indicating whether the current result set is the result of + * a multi-statement query and this statement is not the last one. + */ + public boolean hasNextResultSet() { + return nextResultResourceId != null; + } + + /** + * Retrieves the next result set of a multi-statement query. + * + * @return Next result set. + * @throws NoSuchElementException if the query has no more statements to execute. + */ + public CompletableFuture<ClientAsyncResultSet<T>> nextResultSet() { + if (nextResultResourceId == null) { + return CompletableFuture.failedFuture(new NoSuchElementException("Query has no more results")); + } + + assert nextResultFuture != null; + + if (!nextResultSetRetrieved.compareAndSet(false, true)) { + return nextResultFuture; + } + + ch.<ClientAsyncResultSet<T>>serviceAsync(ClientOp.SQL_CURSOR_NEXT_RESULT_SET, + w -> w.out().packLong(nextResultResourceId), + r -> new ClientAsyncResultSet<>( + r.clientChannel(), null, r.in(), null, false, false, true + )) + .whenComplete((r, e) -> { + if (e != null) { + nextResultFuture.completeExceptionally(e); + } else { + nextResultFuture.complete(r); + } + }); + + return nextResultFuture; + } + /** {@inheritDoc} */ @Override public long affectedRows() { diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java index 9d9e01f875b..568c0df0300 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java @@ -321,6 +321,9 @@ public class ClientSql implements IgniteSql { Statement statement, @Nullable Object... arguments ) { + assert mapper == null || mapper.targetType() == SqlRow.class + || !queryModifiers.contains(QueryModifier.ALLOW_MULTISTATEMENT) : "Mapper is not supported for multi-statements."; + Objects.requireNonNull(statement); PartitionMappingProvider mappingProvider = mappingProviderCache.getIfPresent(new PaCacheKey(statement)); @@ -373,10 +376,11 @@ public class ClientSql implements IgniteSql { && r.clientChannel().protocolContext().isFeatureSupported(SQL_PARTITION_AWARENESS); boolean sqlDirectMappingSupported = r.clientChannel().protocolContext().isFeatureSupported(SQL_DIRECT_TX_MAPPING); + boolean sqlMultistatementsSupported = r.clientChannel().protocolContext().allFeaturesSupported(SQL_MULTISTATEMENT_SUPPORT); DirectTxUtils.readTx(r, ctx, tx, ch.observableTimestamp()); ClientAsyncResultSet<T> rs = new ClientAsyncResultSet<>( - r.clientChannel(), marshallers, r.in(), mapper, tryUnpackPaMeta, sqlDirectMappingSupported + r.clientChannel(), marshallers, r.in(), mapper, tryUnpackPaMeta, sqlDirectMappingSupported, sqlMultistatementsSupported ); ClientPartitionAwarenessMetadata partitionAwarenessMetadata = rs.partitionAwarenessMetadata(); diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java index 682cab4bfdc..8411c929f86 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java +++ b/modules/client/src/test/java/org/apache/ignite/client/ClientSqlTest.java @@ -307,6 +307,10 @@ public class ClientSqlTest extends AbstractClientTableTest { expected = "TX_CONTROL"; break; + case ALLOW_MULTISTATEMENT: + expected = "MULTISTATEMENT"; + break; + default: throw new IllegalArgumentException("Unexpected type: " + modifier); } diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java index 51759bb99ce..9372dc85e0d 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java @@ -32,6 +32,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.ignite.internal.sql.engine.AsyncSqlCursor; import org.apache.ignite.internal.sql.engine.InternalSqlRow; import org.apache.ignite.internal.sql.engine.SqlProperties; @@ -121,8 +122,8 @@ public class FakeCursor implements AsyncSqlCursor<InternalSqlRow> { columns.add(new FakeColumnMetadata("col1", ColumnType.INT32)); } else if ("SELECT ALLOWED QUERY TYPES".equals(qry)) { paMeta = null; - String row = properties.allowedQueryTypes().stream().map(SqlQueryType::name).sorted() - .collect(Collectors.joining(", ")); + String row = Stream.concat(properties.allowedQueryTypes().stream().map(SqlQueryType::name).sorted(), + properties.allowMultiStatement() ? Stream.of("MULTISTATEMENT") : Stream.empty()).collect(Collectors.joining(", ")); rows.add(getRow(row)); columns.add(new FakeColumnMetadata("col1", ColumnType.STRING)); } else { diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientMultistatementSqlTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientMultistatementSqlTest.java new file mode 100644 index 00000000000..bfb3ab34645 --- /dev/null +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientMultistatementSqlTest.java @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.runner.app.client; + +import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl; +import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException; +import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows; +import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR; +import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR; +import static org.apache.ignite.lang.ErrorGroups.Sql.TX_CONTROL_INSIDE_EXTERNAL_TX_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import org.apache.ignite.client.handler.ClientResourceRegistry; +import org.apache.ignite.internal.client.sql.ClientAsyncResultSet; +import org.apache.ignite.internal.client.sql.ClientSql; +import org.apache.ignite.internal.client.sql.QueryModifier; +import org.apache.ignite.internal.sql.SyncResultSetAdapter; +import org.apache.ignite.internal.sql.engine.SqlQueryProcessor; +import org.apache.ignite.internal.tx.TxManager; +import org.apache.ignite.lang.CancelHandle; +import org.apache.ignite.lang.CancellationToken; +import org.apache.ignite.sql.ResultSet; +import org.apache.ignite.sql.SqlRow; +import org.apache.ignite.sql.Statement; +import org.apache.ignite.sql.async.AsyncResultSet; +import org.apache.ignite.table.Table; +import org.apache.ignite.table.mapper.Mapper; +import org.apache.ignite.tx.Transaction; +import org.awaitility.Awaitility; +import org.jetbrains.annotations.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Thin client SQL multi-statement integration test. + * + * <p>Tests to check internal API for reading script execution results. + */ +@SuppressWarnings("resource") +public class ItThinClientMultistatementSqlTest extends ItAbstractThinClientTest { + private final List<ClientAsyncResultSet<SqlRow>> resultsToClose = new ArrayList<>(); + + private int resourcesBefore; + + @BeforeEach + void setup() { + resultsToClose.forEach(resultSet -> await(resultSet.closeAsync())); + + resourcesBefore = countResources(); + } + + @AfterEach + protected void checkNoPendingTransactionsAndOpenedCursors() { + Awaitility.await().timeout(5, TimeUnit.SECONDS).untilAsserted(() -> { + for (int i = 0; i < nodes(); i++) { + assertThat("node=" + i, queryProcessor(i).openedCursors(), is(0)); + } + + for (int i = 0; i < nodes(); i++) { + assertThat("node=" + i, txManager(i).pending(), is(0)); + } + + assertThat(countResources() - resourcesBefore, is(0)); + + for (int i = 0; i < nodes(); i++) { + int cancelHandlesCount = unwrapIgniteImpl(server(i)).clientInboundMessageHandler().cancelHandlesCount(); + + assertThat("node=" + i, cancelHandlesCount, is(0)); + } + }); + + String dropTablesScript = client().tables().tables().stream() + .map(Table::name) + .map(name -> "DROP TABLE " + name) + .collect(Collectors.joining(";\n")); + + if (!dropTablesScript.isEmpty()) { + client().sql().executeScript(dropTablesScript); + } + } + + /** Ensures that we can get the next result set after the current one is closed. */ + @Test + void checkIterationOverResultSets() { + ClientAsyncResultSet<SqlRow> asyncRs = runSql("SELECT 1; SELECT 2; SELECT 3;"); + + // First result set. + { + SyncResultSetAdapter<SqlRow> rs = new SyncResultSetAdapter<>(asyncRs); + assertThat(rs.hasRowSet(), is(true)); + assertThat(rs.next().intValue(0), is(1)); + + rs.close(); + } + + assertThat(asyncRs.hasNextResultSet(), is(true)); + + CompletableFuture<ClientAsyncResultSet<SqlRow>> nextResultFut = asyncRs.nextResultSet(); + // Ensures that the next result is cached locally + // and subsequent calls do not request data from the server. + assertThat(nextResultFut, is(asyncRs.nextResultSet())); + + asyncRs = await(asyncRs.nextResultSet()); + + // Second result set. + { + SyncResultSetAdapter<SqlRow> rs = new SyncResultSetAdapter<>(asyncRs); + assertThat(rs.hasRowSet(), is(true)); + assertThat(rs.next().intValue(0), is(2)); + + rs.close(); + } + + assertThat(asyncRs.hasNextResultSet(), is(true)); + // Ensures that the next result is cached locally + // and subsequent calls do not request data from the server. + assertThat(asyncRs.nextResultSet(), is(asyncRs.nextResultSet())); + asyncRs = await(asyncRs.nextResultSet()); + + // Second result set. + { + SyncResultSetAdapter<SqlRow> rs = new SyncResultSetAdapter<>(asyncRs); + assertThat(rs.hasRowSet(), is(true)); + assertThat(rs.next().intValue(0), is(3)); + + rs.close(); + } + + assertThat(asyncRs.hasNextResultSet(), is(false)); + } + + @Test + void basicMultiStatementQuery() { + String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);" + + "INSERT INTO test VALUES (3, 3);" + + "UPDATE test SET val=7 WHERE id=3;" + + "EXPLAIN PLAN FOR SELECT * FROM test;" + + "SELECT * FROM test;" + + "DELETE FROM test;" + + "DROP TABLE IF EXISTS non_existing_table;"; + + List<ClientAsyncResultSet<SqlRow>> resultSets = fetchAllResults(runSql(sql)); + Iterator<ClientAsyncResultSet<SqlRow>> curItr = resultSets.iterator(); + + ResultValidator.ddl(curItr.next(), true); + ResultValidator.dml(curItr.next(), 1); + ResultValidator.dml(curItr.next(), 1); + assertNotNull(curItr.next()); // skip EXPLAIN. + ResultValidator.singleRow(curItr.next(), 3, 7); + ResultValidator.dml(curItr.next(), 1); + ResultValidator.ddl(curItr.next(), false); + + assertThat(curItr.hasNext(), is(false)); + + resultSets.forEach(AsyncResultSet::closeAsync); + + // Ensures that the script is executed completely, even if the cursor data has not been read. + executeSql("INSERT INTO test VALUES (1, 1);" + + "INSERT INTO test VALUES (2, 2);" + + "SELECT * FROM test;" + + "INSERT INTO test VALUES (3, 3);"); + + expectRowsCount(null, "test", 3); + } + + @Test + public void txControlStatement() { + String query = "START TRANSACTION; COMMIT"; + + // Execution of the TX_CONTROL statement is allowed. + { + EnumSet<QueryModifier> modifiers = EnumSet.of( + QueryModifier.ALLOW_TX_CONTROL, + QueryModifier.ALLOW_MULTISTATEMENT + ); + + ClientAsyncResultSet<SqlRow> startFuture = runSql((Transaction) null, null, modifiers, query); + List<ClientAsyncResultSet<SqlRow>> resultSets = fetchAllResults(startFuture); + + assertThat(resultSets, hasSize(2)); + + ClientAsyncResultSet<SqlRow> rs = resultSets.get(0); + assertThat(rs.hasNextResultSet(), is(true)); + assertThat(rs.hasRowSet(), is(false)); + assertThat(rs.wasApplied(), is(false)); + assertThat(rs.affectedRows(), is(-1L)); + + rs = resultSets.get(1); + assertThat(rs.hasNextResultSet(), is(false)); + assertThat(rs.hasRowSet(), is(false)); + assertThat(rs.wasApplied(), is(false)); + assertThat(rs.affectedRows(), is(-1L)); + } + + // Execution of the TX_CONTROL statement is not allowed. + { + EnumSet<QueryModifier> modifiers = EnumSet.of( + QueryModifier.ALLOW_ROW_SET_RESULT, + QueryModifier.ALLOW_MULTISTATEMENT + ); + + assertThrowsSqlException( + STMT_VALIDATION_ERR, + "Invalid SQL statement type.", + () -> runSql((Transaction) null, null, modifiers, query) + ); + } + } + + @Test + void throwsNoSuchElementExceptionIfNoMoreResults() { + ClientAsyncResultSet<SqlRow> resulSet = runSql("SELECT 1"); + + assertThat(resulSet.hasNextResultSet(), is(false)); + + //noinspection ThrowableNotThrown + assertThrows(NoSuchElementException.class, () -> await(resulSet.nextResultSet()), "Query has no more results"); + } + + @Test + void queryWithDynamicParameters() { + String sql = "CREATE TABLE test (id INT PRIMARY KEY, val VARCHAR DEFAULT '3');" + + "INSERT INTO test VALUES(?, ?);" + + "INSERT INTO test VALUES(?, DEFAULT);" + + "INSERT INTO test VALUES(?, ?);"; + + executeSql(sql, 0, "1", 2, 4, "5"); + expectRowsCount(null, "test", 3); + } + + @Test + void explicitTransaction() { + executeSql("CREATE TABLE test (id INT PRIMARY KEY);"); + + Transaction tx = client().transactions().begin(); + executeSql(tx, "INSERT INTO test VALUES (0); INSERT INTO test VALUES (1); INSERT INTO test VALUES (2)"); + + expectRowsCount(tx, "test", 3); + expectRowsCount(null, "test", 0); + + tx.commit(); + + expectRowsCount(null, "test", 3); + } + + @Test + void queryWithIncorrectNumberOfDynamicParametersFailsWithValidationError() { + String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);" + + "INSERT INTO test VALUES(?, ?);" + + "INSERT INTO test VALUES(?, ?);"; + + String expectedMessage = "Unexpected number of query parameters"; + + assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () -> runSql(sql, 0)); + assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () -> runSql(sql, 0, 1, 2, 3, 4, 5)); + } + + @Test + void scriptStopsExecutionOnError() { + // Runtime error. + { + assertThrowsSqlException( + RUNTIME_ERR, + "Division by zero", + () -> executeSql( + "CREATE TABLE test (id INT PRIMARY KEY);" + + "INSERT INTO test VALUES (2/0);" // Runtime error. + + "INSERT INTO test VALUES (0)" + ) + ); + + expectRowsCount(null, "test", 0); + } + + // Validation error. + { + assertThrowsSqlException( + STMT_VALIDATION_ERR, + "Values passed to VALUES operator must have compatible types", + () -> executeSql( + "INSERT INTO test VALUES (?), (?)", + "1", 2 + ) + ); + + expectRowsCount(null, "test", 0); + } + + // Internal error. + { + assertThrowsSqlException( + RUNTIME_ERR, + "Subquery returned more than 1 value", + () -> executeSql( + "INSERT INTO test VALUES(0);" + + "INSERT INTO test VALUES(1);" + + "DELETE FROM test WHERE id = (SELECT id FROM test);" // Internal error. + + "INSERT INTO test VALUES(2);" + ) + ); + + expectRowsCount(null, "test", 2); + } + + // Same as above, but inside script managed transaction. + { + assertThrowsSqlException( + RUNTIME_ERR, + "Subquery returned more than 1 value", + () -> executeSql( + "START TRANSACTION;" + + "INSERT INTO test VALUES(2);" + + "INSERT INTO test VALUES(3);" + + "DELETE FROM test WHERE id = (SELECT id FROM test);" // Internal error. + + "INSERT INTO test VALUES(4);" + + "COMMIT;" + ) + ); + + expectRowsCount(null, "test", 2); + } + + // Attempt to start script-managed transaction inside external transaction. + { + Transaction tx = client().transactions().begin(); + + assertThrowsSqlException( + TX_CONTROL_INSIDE_EXTERNAL_TX_ERR, + "Transaction control statement cannot be executed within an external transaction.", + () -> executeSql( + tx, + "START TRANSACTION; COMMIT;" + ) + ); + + tx.rollback(); + } + + // Internal error due to transaction exception. + { + Transaction tx = client().transactions().begin(); + client().sql().execute(tx, "INSERT INTO test VALUES(2);").close(); + tx.commit(); + + assertThrowsSqlException( + TX_ALREADY_FINISHED_ERR, + "Transaction is already finished", + () -> executeSql( + tx, + "INSERT INTO test VALUES(3); INSERT INTO test VALUES(4);" + ) + ); + + expectRowsCount(null, "test", 3); + } + } + + @Test + public void cancelScript() { + StringBuilder query = new StringBuilder(); + + int statementsCount = 100; + + for (int j = 0; j < statementsCount; j++) { + query.append("SELECT x FROM TABLE(SYSTEM_RANGE(0, 100))").append(";"); + } + + CancelHandle cancelHandle = CancelHandle.create(); + CancellationToken token = cancelHandle.token(); + + List<ClientAsyncResultSet<SqlRow>> allResults = fetchAllResults(runSql((Transaction) null, token, null, query.toString())); + + assertThat(allResults, hasSize(statementsCount)); + + cancelHandle.cancel(); + + allResults.forEach(rs -> { + expectQueryCancelled(() -> { + AsyncResultSet<SqlRow> res; + do { + res = await(rs.fetchNextPage()); + } while (res.hasMorePages()); + }); + + rs.closeAsync(); + }); + } + + private void expectRowsCount(@Nullable Transaction tx, String table, long expectedCount) { + try (ResultSet<SqlRow> rs = client().sql().execute(tx, "SELECT COUNT(*) FROM " + table)) { + assertThat(rs.next().longValue(0), is(expectedCount)); + } + } + + private SqlQueryProcessor queryProcessor(int idx) { + return ((SqlQueryProcessor) unwrapIgniteImpl(server(idx)).queryEngine()); + } + + private TxManager txManager(int idx) { + return unwrapIgniteImpl(server(idx)).txManager(); + } + + private int countResources() { + int count = 0; + + for (int i = 0; i < nodes(); i++) { + ClientResourceRegistry resources = unwrapIgniteImpl(server(i)).clientInboundMessageHandler().resources(); + + count += resources.size(); + } + + return count; + } + + private ClientAsyncResultSet<SqlRow> runSql(String query, Object ... args) { + return runSql(null, null, null, query, args); + } + + private ClientAsyncResultSet<SqlRow> runSql( + @Nullable Transaction tx, + @Nullable CancellationToken cancelToken, + @Nullable Set<QueryModifier> queryModifiers, + String query, + Object... args + ) { + ClientSql clientSql = (ClientSql) client().sql(); + Statement stmt = clientSql.statementBuilder().query(query).pageSize(1).build(); + + return (ClientAsyncResultSet<SqlRow>) await( + clientSql.executeAsyncInternal( + tx, + (Mapper<SqlRow>) null, + cancelToken, + queryModifiers == null ? QueryModifier.ALL : queryModifiers, + stmt, + args + ) + ); + } + + private void executeSql(String sql, Object... args) { + executeSql(null, sql, args); + } + + private void executeSql(@Nullable Transaction tx, String sql, Object ... args) { + fetchAllResults(runSql(tx, null, null, sql, args)) + .forEach(rs -> await(rs.closeAsync())); + } + + private List<ClientAsyncResultSet<SqlRow>> fetchAllResults(ClientAsyncResultSet<SqlRow> resultSet) { + List<ClientAsyncResultSet<SqlRow>> resultSets = new ArrayList<>(); + + resultSets.add(resultSet); + resultsToClose.add(resultSet); + + ClientAsyncResultSet<SqlRow> resultSet0 = resultSet; + + while (resultSet0.hasNextResultSet()) { + resultSet0 = await(resultSet0.nextResultSet()); + + assertNotNull(resultSet0); + + resultSets.add(resultSet0); + + resultsToClose.add(resultSet0); + } + + return resultSets; + } + + private static class ResultValidator { + static void dml(ClientAsyncResultSet<SqlRow> resultSet, long affectedRows) { + assertThat(resultSet.hasRowSet(), is(false)); + assertThat(resultSet.affectedRows(), is(affectedRows)); + } + + private static void ddl(ClientAsyncResultSet<SqlRow> resultSet, boolean wasApplied) { + assertThat(resultSet.hasRowSet(), is(false)); + assertThat(resultSet.wasApplied(), is(wasApplied)); + } + + private static void singleRow(ClientAsyncResultSet<SqlRow> resultSet, Object... expected) { + assertThat(resultSet.hasRowSet(), is(true)); + Iterator<SqlRow> pageIter = resultSet.currentPage().iterator(); + SqlRow row = pageIter.next(); + int rowSize = row.metadata().columns().size(); + List<Object> actual = new ArrayList<>(rowSize); + + for (int i = 0; i < rowSize; i++) { + actual.add(row.value(i)); + } + + assertThat(List.of(expected), equalTo(actual)); + assertThat(pageIter.hasNext(), is(false)); + } + } +} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java index 7f63ace6266..a9ae5856c41 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java @@ -760,11 +760,12 @@ public class ItThinClientSqlTest extends ItAbstractThinClientTest { Statement ddlStatement = client().sql().createStatement("CREATE TABLE x(id INT PRIMARY KEY)"); Statement dmlStatement = client().sql().createStatement("INSERT INTO x VALUES (1), (2), (3)"); Statement selectStatement = client().sql().createStatement("SELECT * FROM x"); + Statement multiStatement = client().sql().createStatement("SELECT 1; SELECT 2;"); BiConsumer<Statement, Set<QueryModifier>> check = (stmt, types) -> { await(sql.executeAsyncInternal( null, - () -> SqlRow.class, + null, null, types, stmt @@ -816,10 +817,20 @@ public class ItThinClientSqlTest extends ItAbstractThinClientTest { ); } - // No exception expected with correct modifiers. - check.accept(ddlStatement, QueryModifier.ALL); - check.accept(dmlStatement, QueryModifier.ALL); - check.accept(selectStatement, QueryModifier.ALL); + // Incorrect modifier for multi-statement. + { + IgniteTestUtils.assertThrows( + SqlException.class, + () -> check.accept(multiStatement, QueryModifier.SINGLE_STMT_MODIFIERS), + "Multiple statements are not allowed." + ); + } + + // No exception expected with correct query modifier. + check.accept(ddlStatement, QueryModifier.SINGLE_STMT_MODIFIERS); + check.accept(dmlStatement, QueryModifier.SINGLE_STMT_MODIFIERS); + check.accept(selectStatement, QueryModifier.SINGLE_STMT_MODIFIERS); + check.accept(multiStatement, QueryModifier.ALL); } private static class Pojo { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java index e9a12b3ffa9..2112fad417c 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java @@ -77,6 +77,11 @@ public class AsyncResultSetImpl<T> implements AsyncResultSet<T> { return cursor.partitionAwarenessMetadata(); } + /** Returns query cursor. */ + public AsyncSqlCursor<InternalSqlRow> cursor() { + return cursor; + } + /** {@inheritDoc} */ @Override public boolean hasRowSet() { diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java index 1508984a20f..c47ef5d4419 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/SqlExceptionMapperProvider.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.catalog.CatalogValidationException; import org.apache.ignite.internal.lang.IgniteExceptionMapper; import org.apache.ignite.internal.lang.IgniteExceptionMappersProvider; import org.apache.ignite.internal.sql.engine.QueryCancelledException; +import org.apache.ignite.internal.sql.engine.TxControlInsideExternalTxNotSupportedException; import org.apache.ignite.internal.sql.engine.exec.RemoteFragmentExecutionException; import org.apache.ignite.lang.ErrorGroups.Common; import org.apache.ignite.lang.IgniteException; @@ -76,6 +77,9 @@ public class SqlExceptionMapperProvider implements IgniteExceptionMappersProvide mappers.add(unchecked(InternalCompilerException.class, err -> new SqlException(Common.INTERNAL_ERR, "Expression compiler error. " + err.getMessage(), err))); + mappers.add(unchecked(TxControlInsideExternalTxNotSupportedException.class, + err -> new SqlException(err.code(), err.getMessage(), err))); + return mappers; } }
