This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8d6264af0e IGNITE-20443 Sql. Implement script processing logic (#2789)
8d6264af0e is described below
commit 8d6264af0edb9752e2239f9b686abe5580056862
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Thu Nov 16 13:27:08 2023 +0300
IGNITE-20443 Sql. Implement script processing logic (#2789)
---
.../handler/requests/jdbc/JdbcQueryCursor.java | 19 ++
.../org/apache/ignite/client/fakes/FakeCursor.java | 10 +
.../client/fakes/FakeIgniteQueryProcessor.java | 11 +
.../sql/engine/ItSqlMultiStatementTest.java | 268 +++++++++++++++++++
.../ignite/internal/sql/engine/AsyncSqlCursor.java | 16 ++
.../internal/sql/engine/AsyncSqlCursorImpl.java | 43 +++
.../ignite/internal/sql/engine/QueryProcessor.java | 22 ++
.../internal/sql/engine/SqlQueryProcessor.java | 294 ++++++++++++++++++---
.../sql/engine/exec/ExecutionServiceImpl.java | 6 +
.../internal/sql/engine/sql/ScriptParseResult.java | 44 +--
.../sql/engine/sql/IgniteSqlParserTest.java | 15 ++
.../internal/sql/engine/util/QueryCheckerTest.java | 11 +
12 files changed, 711 insertions(+), 48 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
index c33f5fa3e3..232f2eb593 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursor.java
@@ -18,6 +18,7 @@
package org.apache.ignite.client.handler.requests.jdbc;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
@@ -87,4 +88,22 @@ public class JdbcQueryCursor<T> implements AsyncSqlCursor<T>
{
public ResultSetMetadata metadata() {
return cur.metadata();
}
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNextResult() {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20661
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<AsyncSqlCursor<T>> nextResult() {
+ if (!hasNextResult()) {
+ throw new NoSuchElementException("Query has no more results");
+ }
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20661
+ return null;
+ }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
index d941fb3a9f..340c1459d3 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCursor.java
@@ -69,4 +69,14 @@ public class FakeCursor implements
AsyncSqlCursor<List<Object>> {
public ResultSetMetadata metadata() {
return null;
}
+
+ @Override
+ public boolean hasNextResult() {
+ return false;
+ }
+
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>> nextResult() {
+ throw new UnsupportedOperationException();
+ }
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index 0a996f57d2..9302f7f4ee 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -41,6 +41,17 @@ public class FakeIgniteQueryProcessor implements
QueryProcessor {
return CompletableFuture.completedFuture(new FakeCursor());
}
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync(
+ SqlProperties properties,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction transaction,
+ String qry,
+ Object... params
+ ) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void start() {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java
new file mode 100644
index 0000000000..08db857f25
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItSqlMultiStatementTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.RUNTIME_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
+import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
+import org.apache.ignite.tx.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests to verify the execution of queries with multiple statements.
+ */
+@SuppressWarnings("ThrowableNotThrown")
+public class ItSqlMultiStatementTest extends BaseSqlIntegrationTest {
+ @AfterEach
+ void dropTables() {
+ dropAllTables();
+ }
+
+ @AfterEach
+ void checkNoPendingTransactions() {
+ assertEquals(0, txManager().pending());
+ }
+
+ @Test
+ void basicMultiStatementQuery() {
+ String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);"
+ + "INSERT INTO test VALUES (0, 0);"
+ + "EXPLAIN PLAN FOR SELECT * FROM test;"
+ + "SELECT * FROM test";
+
+ List<AsyncSqlCursor<List<Object>>> cursors =
fetchAllCursors(runScript(sql));
+ assertNotNull(cursors);
+
+ Iterator<AsyncSqlCursor<List<Object>>> curItr = cursors.iterator();
+
+ validateSingleResult(curItr.next(), true);
+ validateSingleResult(curItr.next(), 1L);
+ assertNotNull(curItr.next()); // skip EXPLAIN.
+ validateSingleResult(curItr.next(), 0, 0);
+
+ assertFalse(curItr.hasNext());
+
+ // Ensures that the script is executed completely, even if the cursor
data has not been read.
+ fetchAllCursors(runScript("INSERT INTO test VALUES (1, 1);"
+ + "INSERT INTO test VALUES (2, 2);"
+ + "SELECT * FROM test;"
+ + "INSERT INTO test VALUES (3, 3);"));
+
+ assertQuery("select * from test")
+ .returns(0, 0)
+ .returns(1, 1)
+ .returns(2, 2)
+ .returns(3, 3)
+ .check();
+ }
+
+ /** Checks single statement execution using multi-statement API. */
+ @Test
+ void singleStatementQuery() {
+ AsyncSqlCursor<List<Object>> cursor = runScript("CREATE TABLE test (id
INT PRIMARY KEY, val INT)");
+ assertNotNull(cursor);
+ validateSingleResult(cursor, true);
+ assertFalse(cursor.hasNextResult());
+ assertThrows(NoSuchElementException.class, cursor::nextResult, "Query
has no more results");
+
+ AsyncSqlCursor<List<Object>> cursor2 = runScript("INSERT INTO test
VALUES (0, 0)");
+ assertNotNull(cursor2);
+ validateSingleResult(cursor2, 1L);
+ assertFalse(cursor2.hasNextResult());
+
+ assertQuery("SELECT * FROM test").returns(0, 0).check();
+ }
+
+ @Test
+ void queryWithDynamicParameters() {
+ String sql = "CREATE TABLE test (id INT PRIMARY KEY, val VARCHAR
DEFAULT '3');"
+ + "INSERT INTO test VALUES(?, ?);"
+ + "INSERT INTO test VALUES(?, DEFAULT);"
+ + "INSERT INTO test VALUES(?, ?);";
+
+ fetchAllCursors(runScript(sql, null, 0, "1", 2, 4, "5"));
+
+ assertQuery("SELECT * FROM test")
+ .returns(0, "1")
+ .returns(2, "3")
+ .returns(4, "5")
+ .check();
+ }
+
+ @Test
+ void queryWithIncorrectNumberOfDynamicParametersFailsWithValidationError()
{
+ String sql = "CREATE TABLE test (id INT PRIMARY KEY, val INT);"
+ + "INSERT INTO test VALUES(?, ?);"
+ + "INSERT INTO test VALUES(?, ?);";
+
+ String expectedMessage = "Unexpected number of query parameters";
+
+ assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () ->
runScript(sql, null, 0));
+ assertThrowsSqlException(STMT_VALIDATION_ERR, expectedMessage, () ->
runScript(sql, null, 0, 1, 2, 3, 4, 5));
+ }
+
+ @Test
+ void transactionControlStatementDoesNotCreateCursor() {
+ assertThat(runScript("START TRANSACTION; COMMIT"), nullValue());
+
+ AsyncSqlCursor<List<Object>> cursor = runScript(
+ "START TRANSACTION;"
+ + "SELECT 1;"
+ + "COMMIT"
+ );
+
+ assertNotNull(cursor);
+ validateSingleResult(cursor, 1);
+
+ assertFalse(cursor.hasNextResult());
+ }
+
+ @Test
+ void scriptStopsExecutionOnError() {
+ // Runtime error.
+ AsyncSqlCursor<List<Object>> cursor = runScript(
+ "CREATE TABLE test (id INT PRIMARY KEY);"
+ + "SELECT 2/0;" // Runtime error.
+ + "INSERT INTO test VALUES (0)"
+ );
+ assertNotNull(cursor);
+ assertTrue(cursor.hasNextResult());
+
+ CompletableFuture<AsyncSqlCursor<List<Object>>> curFut0 =
cursor.nextResult();
+ assertThrowsSqlException(RUNTIME_ERR, "/ by zero", () ->
await(curFut0));
+
+ // Validation error.
+ assertThrowsSqlException(STMT_VALIDATION_ERR, "operator must have
compatible types",
+ () -> runScript("INSERT INTO test VALUES (?); INSERT INTO test
VALUES (1)", null, "Incompatible param"));
+
+ assertQuery("SELECT count(*) FROM test")
+ .returns(0L)
+ .check();
+
+ // Internal error.
+ cursor = runScript(
+ "INSERT INTO test VALUES(0);"
+ + "INSERT INTO test VALUES(1);"
+ + "SELECT (SELECT id FROM test);" // Internal error.
+ + "INSERT INTO test VALUES(2);"
+ );
+ assertNotNull(cursor);
+ assertTrue(cursor.hasNextResult());
+
+ cursor = await(cursor.nextResult());
+ assertNotNull(cursor);
+ assertTrue(cursor.hasNextResult());
+
+ CompletableFuture<AsyncSqlCursor<List<Object>>> cursFut =
cursor.nextResult();
+ assertThrowsSqlException(INTERNAL_ERR, "Subquery returned more than 1
value", () -> await(cursFut));
+
+ assertQuery("SELECT * FROM test")
+ .returns(0)
+ .returns(1)
+ .check();
+
+ // Internal error due to transaction exception.
+ Transaction tx = igniteTx().begin();
+ sql(tx, "INSERT INTO test VALUES(2);");
+ tx.commit();
+
+ assertThrowsSqlException(
+ TX_FAILED_READ_WRITE_OPERATION_ERR,
+ "Transaction is already finished",
+ () -> runScript(
+ "INSERT INTO test VALUES(3); INSERT INTO test
VALUES(4);",
+ (InternalTransaction) tx
+ )
+ );
+
+ assertQuery("SELECT * FROM test")
+ .returns(0)
+ .returns(1)
+ .returns(2)
+ .check();
+
+ // DDL inside external transaction.
+ assertThrowsSqlException(STMT_VALIDATION_ERR, "DDL doesn't support
transactions.",
+ () -> runScript("CREATE TABLE test2 (id INT PRIMARY KEY)",
(InternalTransaction) tx));
+ }
+
+ private @Nullable AsyncSqlCursor<List<Object>> runScript(String query) {
+ return runScript(query, null);
+ }
+
+ private @Nullable AsyncSqlCursor<List<Object>> runScript(
+ String query,
+ @Nullable InternalTransaction tx,
+ Object ... params
+ ) {
+ QueryProcessor qryProc = queryProcessor();
+
+ return
await(qryProc.queryScriptAsync(SqlPropertiesHelper.emptyProperties(),
igniteTx(), tx, query, params));
+ }
+
+ private static @Nullable List<AsyncSqlCursor<List<Object>>>
fetchAllCursors(@Nullable AsyncSqlCursor<List<Object>> cursor) {
+ if (cursor == null) {
+ return null;
+ }
+
+ List<AsyncSqlCursor<List<Object>>> cursors = new ArrayList<>();
+
+ cursors.add(cursor);
+
+ while (cursor.hasNextResult()) {
+ cursor = await(cursor.nextResult());
+
+ assertNotNull(cursor);
+
+ cursors.add(cursor);
+ }
+
+ return cursors;
+ }
+
+ private static void validateSingleResult(AsyncSqlCursor<List<Object>>
cursor, Object... expected) {
+ BatchedResult<List<Object>> res = await(cursor.requestNextAsync(1));
+ assertNotNull(res);
+ assertEquals(List.of(List.of(expected)), res.items());
+ assertFalse(res.hasMore());
+
+ cursor.closeAsync();
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java
index af8608a1b0..b314040d6f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursor.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.sql.engine;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.sql.ResultSetMetadata;
@@ -35,4 +37,18 @@ public interface AsyncSqlCursor<T> extends AsyncCursor<T> {
* Returns column metadata.
*/
ResultSetMetadata metadata();
+
+ /**
+ * Returns {@code true} if the current cursor is the result of a
multi-statement query
+ * and this statement is not the last one, {@code false} otherwise.
+ */
+ boolean hasNextResult();
+
+ /**
+ * Returns the future for the next statement of the query.
+ *
+ * @return Future that completes when the next statement completes.
+ * @throws NoSuchElementException if the query has no more statements to
execute.
+ */
+ CompletableFuture<AsyncSqlCursor<T>> nextResult();
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 71a63be42a..421f780cc0 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -17,12 +17,14 @@
package org.apache.ignite.internal.sql.engine;
+import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.sql.ResultSetMetadata;
+import org.jetbrains.annotations.Nullable;
/**
* Sql query cursor.
@@ -35,12 +37,14 @@ public class AsyncSqlCursorImpl<T> implements
AsyncSqlCursor<T> {
private final QueryTransactionWrapper txWrapper;
private final AsyncCursor<T> dataCursor;
private final Runnable onClose;
+ private final CompletableFuture<AsyncSqlCursor<T>> nextStatement;
/**
* Constructor.
*
* @param queryType Type of the query.
* @param meta The meta of the result set.
+ * @param txWrapper Transaction wrapper.
* @param dataCursor The result set.
* @param onClose Callback to invoke when cursor is closed.
*/
@@ -50,12 +54,35 @@ public class AsyncSqlCursorImpl<T> implements
AsyncSqlCursor<T> {
QueryTransactionWrapper txWrapper,
AsyncCursor<T> dataCursor,
Runnable onClose
+ ) {
+ this(queryType, meta, txWrapper, dataCursor, onClose, null);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param queryType Type of the query.
+ * @param meta The meta of the result set.
+ * @param txWrapper Transaction wrapper.
+ * @param dataCursor The result set.
+ * @param onClose Callback to invoke when cursor is closed.
+ * @param nextStatement Next statement future, non-null in the case of a
+ * multi-statement query and if current statement is not the last.
+ */
+ AsyncSqlCursorImpl(
+ SqlQueryType queryType,
+ ResultSetMetadata meta,
+ QueryTransactionWrapper txWrapper,
+ AsyncCursor<T> dataCursor,
+ Runnable onClose,
+ @Nullable CompletableFuture<AsyncSqlCursor<T>> nextStatement
) {
this.queryType = queryType;
this.meta = meta;
this.txWrapper = txWrapper;
this.dataCursor = dataCursor;
this.onClose = onClose;
+ this.nextStatement = nextStatement;
}
/** {@inheritDoc} */
@@ -89,6 +116,22 @@ public class AsyncSqlCursorImpl<T> implements
AsyncSqlCursor<T> {
});
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNextResult() {
+ return nextStatement != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<AsyncSqlCursor<T>> nextResult() {
+ if (nextStatement == null) {
+ throw new NoSuchElementException("Query has no more results");
+ }
+
+ return nextStatement;
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> closeAsync() {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index f89717a892..e3e924b8b1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -53,4 +53,26 @@ public interface QueryProcessor extends IgniteComponent {
String qry,
Object... params
);
+
+ /**
+ * Executes a multi-statement query (script) with the given properties and parameters.
+ *
+ * @param properties User query properties. See {@link QueryProperty} for
available properties.
+ * @param transactions Transactions facade.
+ * @param transaction A transaction to use for query execution. If null,
an implicit transaction
+ * will be started by provided transactions facade.
+ * @param qry Multi statement SQL query.
+ * @param params Query parameters.
+ * @return Sql cursor.
+ *
+ * @throws IgniteException in case of an error.
+ * @see QueryProperty
+ */
+ CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync(
+ SqlProperties properties,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction transaction,
+ String qry,
+ Object... params
+ );
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index cd914bc079..7830570d2c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -18,17 +18,22 @@
package org.apache.ignite.internal.sql.engine;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.lang.SqlExceptionMapperUtil.mapToPublicSqlException;
import static
org.apache.ignite.internal.sql.engine.util.Commons.FRAMEWORK_CONFIG;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
import static org.apache.ignite.lang.ErrorGroups.Sql.STMT_VALIDATION_ERR;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -351,6 +356,26 @@ public class SqlQueryProcessor implements QueryProcessor {
}
}
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync(
+ SqlProperties properties,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction transaction,
+ String qry,
+ Object... params
+ ) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
+ }
+
+ try {
+ return queryScript0(properties, transactions, transaction, qry,
params);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
private <T extends LifecycleAware> T registerService(T service) {
services.add(service);
@@ -374,25 +399,11 @@ public class SqlQueryProcessor implements QueryProcessor {
CompletableFuture<AsyncSqlCursor<List<Object>>> stage =
start.thenCompose(ignored -> {
ParsedResult result = parserService.parse(sql);
- validateParsedStatement(properties0, result, params,
explicitTransaction);
+ validateParsedStatement(properties0, result, params);
QueryTransactionWrapper txWrapper =
wrapTxOrStartImplicit(result.queryType(), transactions, explicitTransaction);
- return waitForActualSchema(schemaName,
txWrapper.unwrap().startTimestamp())
- .thenCompose(schema -> {
- BaseQueryContext ctx = BaseQueryContext.builder()
-
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
- .queryId(UUID.randomUUID())
- .cancel(queryCancel)
- .parameters(params)
- .build();
-
- return prepareSvc.prepareAsync(result,
ctx).thenApply(plan -> executePlan(txWrapper, ctx, plan));
- }).whenComplete((res, ex) -> {
- if (ex != null) {
- txWrapper.rollback();
- }
- });
+ return executeParsedStatement(schemaName, result, txWrapper,
queryCancel, params, false, null);
});
// TODO IGNITE-20078 Improve (or remove) CancellationException
handling.
@@ -407,6 +418,69 @@ public class SqlQueryProcessor implements QueryProcessor {
return stage;
}
+ private CompletableFuture<AsyncSqlCursor<List<Object>>> queryScript0(
+ SqlProperties properties,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction explicitTransaction,
+ String sql,
+ Object... params
+ ) {
+ SqlProperties properties0 = SqlPropertiesHelper.chain(properties,
DEFAULT_PROPERTIES);
+ String schemaName = properties0.get(QueryProperty.DEFAULT_SCHEMA);
+
+ CompletableFuture<?> start = new CompletableFuture<>();
+
+ CompletableFuture<AsyncSqlCursor<List<Object>>> parseFut = start
+ .thenApply(ignored -> parserService.parseScript(sql))
+ .thenCompose(parsedResults -> {
+ MultiStatementHandler handler = new MultiStatementHandler(
+ schemaName, transactions, explicitTransaction,
parsedResults, params);
+
+ return handler.processNext();
+ });
+
+ start.completeAsync(() -> null, taskExecutor);
+
+ return parseFut;
+ }
+
+ private CompletableFuture<AsyncSqlCursor<List<Object>>>
executeParsedStatement(
+ String schemaName,
+ ParsedResult parsedResult,
+ QueryTransactionWrapper txWrapper,
+ QueryCancel queryCancel,
+ Object[] params,
+ boolean waitForPrefetch,
+ @Nullable CompletableFuture<AsyncSqlCursor<List<Object>>>
nextStatement
+ ) {
+ return waitForActualSchema(schemaName,
txWrapper.unwrap().startTimestamp())
+ .thenCompose(schema -> {
+ PrefetchCallback callback = waitForPrefetch ? new
PrefetchCallback() : null;
+
+ BaseQueryContext ctx = BaseQueryContext.builder()
+
.frameworkConfig(Frameworks.newConfigBuilder(FRAMEWORK_CONFIG).defaultSchema(schema).build())
+ .queryId(UUID.randomUUID())
+ .cancel(queryCancel)
+ .prefetchCallback(callback)
+ .parameters(params)
+ .build();
+
+ CompletableFuture<AsyncSqlCursor<List<Object>>> fut =
prepareSvc.prepareAsync(parsedResult, ctx)
+ .thenApply(plan -> executePlan(txWrapper, ctx,
plan, nextStatement));
+
+ if (waitForPrefetch) {
+ fut = fut.thenCompose(cursor ->
callback.prefetchFuture().thenApply(ignore -> cursor));
+ }
+
+ return fut;
+ })
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ txWrapper.rollback();
+ }
+ });
+ }
+
private CompletableFuture<SchemaPlus> waitForActualSchema(String
schemaName, HybridTimestamp timestamp) {
try {
return
schemaSyncService.waitForMetadataCompleteness(timestamp).thenApply(unused -> {
@@ -426,7 +500,8 @@ public class SqlQueryProcessor implements QueryProcessor {
private AsyncSqlCursor<List<Object>> executePlan(
QueryTransactionWrapper txWrapper,
BaseQueryContext ctx,
- QueryPlan plan
+ QueryPlan plan,
+ @Nullable CompletableFuture<AsyncSqlCursor<List<Object>>>
nextStatement
) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
@@ -443,7 +518,8 @@ public class SqlQueryProcessor implements QueryProcessor {
plan.metadata(),
txWrapper,
dataCursor,
- () -> openedCursors.remove(queryId)
+ () -> openedCursors.remove(queryId),
+ nextStatement
);
Object old = openedCursors.put(queryId, cursor);
@@ -477,6 +553,14 @@ public class SqlQueryProcessor implements QueryProcessor {
return new QueryTransactionWrapper(tx, true);
}
+ if (SqlQueryType.DDL == queryType) {
+ throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't support
transactions.");
+ }
+
+ if (SqlQueryType.DML == queryType && outerTx.isReadOnly()) {
+ throw new SqlException(STMT_VALIDATION_ERR, "DML query cannot be
started by using read only transactions.");
+ }
+
return new QueryTransactionWrapper(outerTx, false);
}
@@ -489,22 +573,11 @@ public class SqlQueryProcessor implements QueryProcessor {
private static void validateParsedStatement(
SqlProperties properties,
ParsedResult parsedResult,
- Object[] params,
- @Nullable InternalTransaction outerTx
+ Object[] params
) {
Set<SqlQueryType> allowedTypes =
properties.get(QueryProperty.ALLOWED_QUERY_TYPES);
SqlQueryType queryType = parsedResult.queryType();
- if (outerTx != null) {
- if (SqlQueryType.DDL == queryType) {
- throw new SqlException(STMT_VALIDATION_ERR, "DDL doesn't
support transactions.");
- }
-
- if (SqlQueryType.DML == queryType && outerTx.isReadOnly()) {
- throw new SqlException(STMT_VALIDATION_ERR, "DML query cannot
be started by using read only transactions.");
- }
- }
-
if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
String message = "Transaction control statement can not be
executed as an independent statement";
@@ -517,10 +590,14 @@ public class SqlQueryProcessor implements QueryProcessor {
throw new SqlException(STMT_VALIDATION_ERR, message);
}
- if (parsedResult.dynamicParamsCount() != params.length) {
+ validateDynamicParameters(parsedResult.dynamicParamsCount(), params);
+ }
+
+ private static void validateDynamicParameters(int expectedParamsCount,
Object[] params) throws SqlException {
+ if (expectedParamsCount != params.length) {
String message = format(
"Unexpected number of query parameters. Provided {} but
there is only {} dynamic parameter(s).",
- params.length, parsedResult.dynamicParamsCount()
+ params.length, expectedParamsCount
);
throw new SqlException(STMT_VALIDATION_ERR, message);
@@ -535,4 +612,157 @@ public class SqlQueryProcessor implements QueryProcessor {
}
}
}
+
+ private class MultiStatementHandler {
+ private final String schemaName;
+ private final IgniteTransactions transactions;
+ private final @Nullable InternalTransaction explicitTransaction;
+ private final Queue<ScriptStatementParameters> statements;
+
+ MultiStatementHandler(
+ String schemaName,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction explicitTransaction,
+ List<ParsedResult> parsedResults,
+ Object[] params
+ ) {
+ this.schemaName = schemaName;
+ this.transactions = transactions;
+ this.explicitTransaction = explicitTransaction;
+ this.statements = prepareStatementsQueue(parsedResults, params);
+ }
+
+ CompletableFuture<AsyncSqlCursor<List<Object>>> processNext() {
+ if (statements == null) {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20463
Each tx control statement must return an empty cursor.
+ return CompletableFuture.completedFuture(null);
+ }
+
+ ScriptStatementParameters parameters = statements.poll();
+
+ assert parameters != null;
+
+ ParsedResult parsedResult = parameters.parsedResult;
+ Object[] dynamicParams = parameters.dynamicParams;
+ CompletableFuture<AsyncSqlCursor<List<Object>>> cursorFuture =
parameters.cursorFuture;
+ CompletableFuture<AsyncSqlCursor<List<Object>>> nextCursorFuture =
parameters.nextStatementFuture;
+
+ try {
+ if (cursorFuture.isDone()) {
+ return cursorFuture;
+ }
+
+ QueryTransactionWrapper txWrapper =
wrapTxOrStartImplicit(parsedResult.queryType(), transactions,
explicitTransaction);
+
+ QueryCancel cancel = new QueryCancel();
+
+ executeParsedStatement(schemaName, parsedResult, txWrapper,
cancel, dynamicParams, true, nextCursorFuture)
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ cursorFuture.completeExceptionally(ex);
+ cancelAll(ex);
+
+ return;
+ }
+
+ txWrapper.commitImplicit();
+
+ if (nextCursorFuture != null) {
+ taskExecutor.execute(this::processNext);
+ }
+
+ cursorFuture.complete(res);
+ });
+ } catch (Exception e) {
+ cursorFuture.completeExceptionally(e);
+
+ cancelAll(e);
+ }
+
+ return cursorFuture;
+ }
+
+ /**
+ * Returns a queue, each element of which represents the parameters
required to execute a single statement of the script.
+ */
+ private @Nullable Queue<ScriptStatementParameters>
prepareStatementsQueue(List<ParsedResult> parsedResults, Object[] params) {
+ List<ParsedResult> parsedResults0 = parsedResults.stream()
+ // TODO https://issues.apache.org/jira/browse/IGNITE-20463
Integrate TX-related statements
+ .filter(res -> res.queryType() !=
SqlQueryType.TX_CONTROL).collect(Collectors.toList());
+
+ if (parsedResults0.isEmpty()) {
+ return null;
+ }
+
+ int paramsCount =
parsedResults0.stream().mapToInt(ParsedResult::dynamicParamsCount).sum();
+ validateDynamicParameters(paramsCount, params);
+
+ ScriptStatementParameters[] results = new
ScriptStatementParameters[parsedResults0.size()];
+
+ // We fill parameters in reverse order, because each script
statement
+ // requires a reference to the future of the next statement.
+ for (int i = parsedResults0.size(); i > 0; i--) {
+ ParsedResult result = parsedResults0.get(i - 1);
+
+ Object[] params0 = Arrays.copyOfRange(params, paramsCount -
result.dynamicParamsCount(), paramsCount);
+ paramsCount -= result.dynamicParamsCount();
+
+ results[i - 1] = new ScriptStatementParameters(result, params0,
+ i < parsedResults0.size() ? results[i].cursorFuture :
null);
+ }
+
+ return new ArrayBlockingQueue<>(results.length, false,
List.of(results));
+ }
+
+ private void cancelAll(Throwable cause) {
+ for (ScriptStatementParameters parameters : statements) {
+ CompletableFuture<AsyncSqlCursor<List<Object>>> fut =
parameters.cursorFuture;
+
+ if (fut.isDone()) {
+ continue;
+ }
+
+ fut.completeExceptionally(new SqlException(
+ EXECUTION_CANCELLED_ERR,
+ "The script execution was canceled due to an error in
the previous statement.",
+ cause
+ ));
+ }
+ }
+
+ private class ScriptStatementParameters {
+ private final CompletableFuture<AsyncSqlCursor<List<Object>>>
cursorFuture = new CompletableFuture<>();
+ private final CompletableFuture<AsyncSqlCursor<List<Object>>>
nextStatementFuture;
+ private final ParsedResult parsedResult;
+ private final Object[] dynamicParams;
+
+ private ScriptStatementParameters(
+ ParsedResult parsedResult,
+ Object[] dynamicParams,
+ @Nullable CompletableFuture<AsyncSqlCursor<List<Object>>>
nextStatementFuture
+ ) {
+ this.parsedResult = parsedResult;
+ this.dynamicParams = dynamicParams;
+ this.nextStatementFuture = nextStatementFuture;
+ }
+ }
+ }
+
+ /** Completes its prefetch future (normally or exceptionally) when the prefetch callback is invoked. */
+ private static class PrefetchCallback implements QueryPrefetchCallback {
+ private final CompletableFuture<Void> prefetchFuture = new
CompletableFuture<>();
+
+ @Override
+ public void onPrefetchComplete(@Nullable Throwable ex) {
+ if (ex == null) {
+ prefetchFuture.complete(null);
+ } else {
+
prefetchFuture.completeExceptionally(mapToPublicSqlException(ex));
+ }
+ }
+
+ CompletableFuture<Void> prefetchFuture() {
+ return prefetchFuture;
+ }
+ }
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 8f8103c6d4..a7ea216d5d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -490,6 +490,12 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService, TopologyEve
root.exceptionally(t -> {
this.close(true);
+ QueryPrefetchCallback callback = ctx.prefetchCallback();
+
+ if (callback != null) {
+ taskExecutor.execute(() -> callback.onPrefetchComplete(t));
+ }
+
return null;
});
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java
index 2d59946b14..3b2b546fcc 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/sql/ScriptParseResult.java
@@ -22,9 +22,8 @@ import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.calcite.sql.SqlDynamicParam;
import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlShuttle;
import org.apache.ignite.internal.tostring.S;
-import org.jetbrains.annotations.Nullable;
/**
* Result of parsing SQL string that multiple statements.
@@ -41,9 +40,22 @@ public final class ScriptParseResult extends ParseResult {
return new ScriptParseResult(List.of(new StatementParseResult(list.get(0), dynamicParamsCount)), dynamicParamsCount);
}
- SqlDynamicParamsCounter paramsCounter = dynamicParamsCount == 0 ? null : new SqlDynamicParamsCounter();
+ SqlDynamicParamsAdjuster dynamicParamsAdjuster = new SqlDynamicParamsAdjuster();
+
List<StatementParseResult> results = list.stream()
- .map(node -> new StatementParseResult(node, paramsCounter == null ? 0 : paramsCounter.forNode(node)))
+ .map(node -> {
+ if (dynamicParamsCount == 0) {
+ return new StatementParseResult(node, 0);
+ }
+
+ dynamicParamsAdjuster.reset();
+
+ SqlNode newTree = dynamicParamsAdjuster.visitNode(node);
+
+ assert newTree != null;
+
+ return new StatementParseResult(newTree, dynamicParamsAdjuster.paramsCount());
+ })
.collect(Collectors.toList());
return new ScriptParseResult(results, dynamicParamsCount);
@@ -69,25 +81,25 @@ public final class ScriptParseResult extends ParseResult {
}
/**
- * Counts the number of {@link SqlDynamicParam} nodes in the tree.
+ * Adjusts the dynamic parameter indexes to match the single statement parameter indexes.
*/
@NotThreadSafe
- static class SqlDynamicParamsCounter extends SqlBasicVisitor<Object> {
- int count;
+ private static final class SqlDynamicParamsAdjuster extends SqlShuttle {
+ private int counter;
@Override
- public @Nullable Object visit(SqlDynamicParam param) {
- count++;
-
- return null;
+ public SqlNode visit(SqlDynamicParam param) {
+ return new SqlDynamicParam(counter++, param.getParserPosition());
}
- int forNode(SqlNode node) {
- count = 0;
-
- this.visitNode(node);
+ /** Resets the dynamic parameters counter. */
+ void reset() {
+ counter = 0;
+ }
- return count;
+ /** Returns the number of processed dynamic parameters. */
+ int paramsCount() {
+ return counter;
}
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java
index 8bc52a3c6a..cb5c48ad59 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/sql/IgniteSqlParserTest.java
@@ -93,6 +93,21 @@ public class IgniteSqlParserTest {
() -> IgniteSqlParser.parse("", StatementParseResult.MODE));
}
+ @Test
+ public void testEmptyStatements() {
+ assertThrowsSqlException(Sql.STMT_PARSE_ERR,
+ "Failed to parse query: Encountered \";\" at line 1, column 1",
+ () -> IgniteSqlParser.parse(";", ScriptParseResult.MODE));
+
+ assertThrowsSqlException(Sql.STMT_PARSE_ERR,
+ "Failed to parse query: Encountered \";\" at line 2, column 1",
+ () -> IgniteSqlParser.parse("--- comment\n;", ScriptParseResult.MODE));
+
+ assertThrowsSqlException(Sql.STMT_PARSE_ERR,
+ "Failed to parse query: Encountered \"<EOF>\" at line 1, column 11",
+ () -> IgniteSqlParser.parse("--- comment", ScriptParseResult.MODE));
+ }
+
@Test
public void testCommentedQuery() {
assertThrowsSqlException(
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
index 938e577abc..9fe1a6920c 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/util/QueryCheckerTest.java
@@ -293,6 +293,17 @@ public class QueryCheckerTest extends BaseIgniteAbstractTest {
return CompletableFuture.completedFuture(sqlCursor);
}
+ @Override
+ public CompletableFuture<AsyncSqlCursor<List<Object>>> queryScriptAsync(
+ SqlProperties properties,
+ IgniteTransactions transactions,
+ @Nullable InternalTransaction transaction,
+ String qry,
+ Object... params
+ ) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void start() {
// NO-OP