This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 39bc0f37d5 IGNITE-18984 Sql. Migrate JDBC batched methods on new internal API (#1866)
39bc0f37d5 is described below
commit 39bc0f37d597ec2d58db27e3eeb095097625d70f
Author: ygerzhedovich <[email protected]>
AuthorDate: Fri Mar 31 18:44:20 2023 +0300
IGNITE-18984 Sql. Migrate JDBC batched methods on new internal API (#1866)
---
.../internal/jdbc/proto/JdbcQueryEventHandler.java | 8 +-
.../client/handler/JdbcQueryEventHandlerImpl.java | 40 ++++---
.../jdbc/ClientJdbcExecuteBatchRequest.java | 4 +-
.../jdbc/ClientJdbcPreparedStmntBatchRequest.java | 4 +-
.../client/fakes/FakeIgniteQueryProcessor.java | 11 --
.../apache/ignite/jdbc/ItJdbcBatchSelfTest.java | 15 ++-
.../internal/jdbc/JdbcClientQueryEventHandler.java | 17 ++-
.../internal/jdbc/JdbcPreparedStatement.java | 2 +-
.../apache/ignite/internal/jdbc/JdbcStatement.java | 2 +-
.../internal/ClusterPerTestIntegrationTest.java | 11 +-
.../ignite/internal/sql/engine/QueryProcessor.java | 25 -----
.../internal/sql/engine/SqlQueryProcessor.java | 124 +--------------------
.../internal/sql/engine/StopCalciteModuleTest.java | 20 ++--
.../sql/engine/exec/MockedStructuresTest.java | 89 +++++++++------
14 files changed, 145 insertions(+), 227 deletions(-)
diff --git a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java
index 737c0a4983..a71c49a203 100644
--- a/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java
+++ b/modules/client-common/src/main/java/org/apache/ignite/internal/jdbc/proto/JdbcQueryEventHandler.java
@@ -47,6 +47,7 @@ public interface JdbcQueryEventHandler {
/**
* {@link JdbcQueryExecuteRequest} command handler.
*
+ * @param connectionId Identifier of the connection.
* @param req Execute query request.
* @return Result future.
*/
@@ -55,19 +56,20 @@ public interface JdbcQueryEventHandler {
/**
* {@link JdbcBatchExecuteRequest} command handler.
*
+ * @param connectionId Identifier of the connection.
* @param req Batch query request.
* @return Result future.
*/
- CompletableFuture<JdbcBatchExecuteResult>
batchAsync(JdbcBatchExecuteRequest req);
+ CompletableFuture<JdbcBatchExecuteResult> batchAsync(long connectionId,
JdbcBatchExecuteRequest req);
/**
* {@link JdbcBatchPreparedStmntRequest} command handler.
*
+ * @param connectionId The identifier of the connection.
* @param req Batch query request.
* @return Result future.
*/
- CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(
- JdbcBatchPreparedStmntRequest req);
+ CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(long
connectionId, JdbcBatchPreparedStmntRequest req);
/**
* {@link JdbcMetaTablesRequest} command handler.
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index 82b04200d4..9836ffc154 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -60,6 +60,7 @@ import
org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.lang.ErrorGroups.Client;
import org.apache.ignite.lang.ErrorGroups.Sql;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalCheckedException;
@@ -177,14 +178,14 @@ public class JdbcQueryEventHandlerImpl implements
JdbcQueryEventHandler {
/** {@inheritDoc} */
@Override
- public CompletableFuture<JdbcBatchExecuteResult>
batchAsync(JdbcBatchExecuteRequest req) {
+ public CompletableFuture<JdbcBatchExecuteResult> batchAsync(long
connectionId, JdbcBatchExecuteRequest req) {
List<String> queries = req.queries();
var counters = new IntArrayList(req.queries().size());
var tail = CompletableFuture.completedFuture(counters);
for (String query : queries) {
- tail = tail.thenCompose(list ->
executeAndCollectUpdateCount(req.schemaName(), query, OBJECT_EMPTY_ARRAY)
+ tail = tail.thenCompose(list ->
executeAndCollectUpdateCount(connectionId, query, OBJECT_EMPTY_ARRAY)
.thenApply(cnt -> {
list.add(cnt > Integer.MAX_VALUE ?
Statement.SUCCESS_NO_INFO : cnt.intValue());
@@ -203,15 +204,14 @@ public class JdbcQueryEventHandlerImpl implements
JdbcQueryEventHandler {
/** {@inheritDoc} */
@Override
- public CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(
- JdbcBatchPreparedStmntRequest req) {
+ public CompletableFuture<JdbcBatchExecuteResult>
batchPrepStatementAsync(long connectionId, JdbcBatchPreparedStmntRequest req) {
var argList = req.getArgs();
var counters = new IntArrayList(req.getArgs().size());
var tail = CompletableFuture.completedFuture(counters);
for (Object[] args : argList) {
- tail = tail.thenCompose(list ->
executeAndCollectUpdateCount(req.schemaName(), req.getQuery(), args)
+ tail = tail.thenCompose(list ->
executeAndCollectUpdateCount(connectionId, req.getQuery(), args)
.thenApply(cnt -> {
list.add(cnt > Integer.MAX_VALUE ?
Statement.SUCCESS_NO_INFO : cnt.intValue());
@@ -228,16 +228,25 @@ public class JdbcQueryEventHandlerImpl implements
JdbcQueryEventHandler {
});
}
- private CompletableFuture<Long> executeAndCollectUpdateCount(String
schema, String sql, Object[] arg) {
+ private CompletableFuture<Long> executeAndCollectUpdateCount(long
connectionId, String sql, Object[] arg) {
var context =
createQueryContext(JdbcStatementType.UPDATE_STATEMENT_TYPE);
- var cursors = processor.queryAsync(context, schema, sql, arg);
-
- if (cursors.size() != 1) {
- return CompletableFuture.failedFuture(new
IgniteInternalException("Multi statement queries are not supported in
batching"));
+ JdbcConnectionContext connectionContext;
+ try {
+ connectionContext =
resources.get(connectionId).get(JdbcConnectionContext.class);
+ } catch (IgniteInternalCheckedException exception) {
+ return CompletableFuture.failedFuture(new
IgniteInternalException(Client.CONNECTION_ERR));
}
- return cursors.get(0).thenCompose(cursor ->
cursor.requestNextAsync(1).thenApply(batch -> (Long)
batch.items().get(0).get(0)));
+ CompletableFuture<AsyncSqlCursor<List<Object>>> result =
connectionContext.doInSession(sessionId -> processor.querySingleAsync(
+ sessionId,
+ context,
+ sql,
+ arg == null ? OBJECT_EMPTY_ARRAY : arg
+ ));
+
+ return result.thenCompose(cursor -> cursor.requestNextAsync(1))
+ .thenApply(batch -> (Long) batch.items().get(0).get(0));
}
private JdbcBatchExecuteResult handleBatchException(Throwable e, String
query, int[] counters) {
@@ -290,7 +299,8 @@ public class JdbcQueryEventHandlerImpl implements
JdbcQueryEventHandler {
try (PrintWriter pw = new PrintWriter(sw)) {
// We need to remap QueryValidationException into a jdbc error.
- if (cause instanceof IgniteException && cause.getCause()
instanceof QueryValidationException) {
+ if (cause instanceof QueryValidationException
+ || (cause instanceof IgniteException && cause.getCause()
instanceof QueryValidationException)) {
pw.print("Given statement type does not match that declared by
JDBC driver.");
} else {
pw.print(cause.getMessage());
@@ -394,7 +404,7 @@ public class JdbcQueryEventHandlerImpl implements
JdbcQueryEventHandler {
potentiallyNotCreatedSessionId = recreateSession(null);
}
- final SessionId finalSessionId = potentiallyNotCreatedSessionId;
+ SessionId finalSessionId = potentiallyNotCreatedSessionId;
return action.perform(finalSessionId)
.handle((BiFunction<T, Throwable, Pair<T, Throwable>>)
Pair::new)
@@ -457,13 +467,13 @@ public class JdbcQueryEventHandlerImpl implements
JdbcQueryEventHandler {
* when the session is no longer needed.
*/
@FunctionalInterface
- private static interface SessionCleaner {
+ private interface SessionCleaner {
void clean(SessionId sessionId);
}
/** Interface describing an action that should be performed within the
session. */
@FunctionalInterface
- static interface SessionAwareAction<T> {
+ interface SessionAwareAction<T> {
CompletableFuture<T> perform(SessionId sessionId);
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java
index e6d183ca00..311a04a872 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcExecuteBatchRequest.java
@@ -42,8 +42,10 @@ public class ClientJdbcExecuteBatchRequest {
) {
var req = new JdbcBatchExecuteRequest();
+ long connectionId = in.unpackLong();
+
req.readBinary(in);
- return handler.batchAsync(req).thenAccept(res -> res.writeBinary(out));
+ return handler.batchAsync(connectionId, req).thenAccept(res ->
res.writeBinary(out));
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java
index d0017c6586..ba29ea7a0e 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/jdbc/ClientJdbcPreparedStmntBatchRequest.java
@@ -42,8 +42,10 @@ public class ClientJdbcPreparedStmntBatchRequest {
) {
var req = new JdbcBatchPreparedStmntRequest();
+ long connectionId = in.unpackLong();
+
req.readBinary(in);
- return handler.batchPrepStatementAsync(req).thenAccept(res ->
res.writeBinary(out));
+ return handler.batchPrepStatementAsync(connectionId,
req).thenAccept(res -> res.writeBinary(out));
}
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
index dc0ac6e481..08106c9cc6 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteQueryProcessor.java
@@ -47,17 +47,6 @@ public class FakeIgniteQueryProcessor implements
QueryProcessor {
return Collections.emptyList();
}
- @Override
- public List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
queryAsync(String schemaName, String qry, Object... params) {
- return List.of(CompletableFuture.completedFuture(new FakeCursor()));
- }
-
- @Override
- public List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
queryAsync(QueryContext context, String schemaName,
- String qry, Object... params) {
- return List.of(CompletableFuture.completedFuture(new FakeCursor()));
- }
-
@Override
public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
SessionId sessionid, QueryContext context, String qry,
diff --git a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
index cc581ac178..5634af9127 100644
--- a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
+++ b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
@@ -123,10 +123,21 @@ public class ItJdbcBatchSelfTest extends
AbstractJdbcSelfTest {
}
}
+ @Test
+ public void testMultipleStatementForBatchIsNotAllowed() throws
SQLException {
+ String insertStmt = "insert into Person (id, firstName, lastName, age)
values";
+ String ins1 = insertStmt + valuesRow(1);
+ String ins2 = insertStmt + valuesRow(2);
+
+ stmt.addBatch(ins1 + ";" + ins2);
+
+ assertThrows(BatchUpdateException.class, () -> stmt.executeBatch(),
"Multiple statements are not allowed.");
+ }
+
@Test
public void testBatchOnClosedStatement() throws SQLException {
- final Statement stmt2 = conn.createStatement();
- final PreparedStatement pstmt2 = conn.prepareStatement("");
+ Statement stmt2 = conn.createStatement();
+ PreparedStatement pstmt2 = conn.prepareStatement("");
stmt2.close();
pstmt2.close();
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java
index c69ed96559..083f53c84c 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcClientQueryEventHandler.java
@@ -82,8 +82,12 @@ public class JdbcClientQueryEventHandler implements
JdbcQueryEventHandler {
/** {@inheritDoc} */
@Override
- public CompletableFuture<JdbcBatchExecuteResult>
batchAsync(JdbcBatchExecuteRequest req) {
- return client.sendRequestAsync(ClientOp.JDBC_EXEC_BATCH, w ->
req.writeBinary(w.out()), r -> {
+ public CompletableFuture<JdbcBatchExecuteResult> batchAsync(long
connectionId, JdbcBatchExecuteRequest req) {
+ return client.sendRequestAsync(ClientOp.JDBC_EXEC_BATCH, w -> {
+ w.out().packLong(connectionId);
+
+ req.writeBinary(w.out());
+ }, r -> {
JdbcBatchExecuteResult res = new JdbcBatchExecuteResult();
res.readBinary(r.in());
@@ -94,9 +98,12 @@ public class JdbcClientQueryEventHandler implements
JdbcQueryEventHandler {
/** {@inheritDoc} */
@Override
- public CompletableFuture<JdbcBatchExecuteResult> batchPrepStatementAsync(
- JdbcBatchPreparedStmntRequest req) {
- return client.sendRequestAsync(ClientOp.JDBC_SQL_EXEC_PS_BATCH, w ->
req.writeBinary(w.out()), r -> {
+ public CompletableFuture<JdbcBatchExecuteResult>
batchPrepStatementAsync(long connectionId, JdbcBatchPreparedStmntRequest req) {
+ return client.sendRequestAsync(ClientOp.JDBC_SQL_EXEC_PS_BATCH, w -> {
+ w.out().packLong(connectionId);
+
+ req.writeBinary(w.out());
+ }, r -> {
JdbcBatchExecuteResult res = new JdbcBatchExecuteResult();
res.readBinary(r.in());
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java
index ccf16601f3..246ec5f2a6 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcPreparedStatement.java
@@ -115,7 +115,7 @@ public class JdbcPreparedStatement extends JdbcStatement
implements PreparedStat
= new JdbcBatchPreparedStmntRequest(conn.getSchema(), sql,
batchedArgs);
try {
- JdbcBatchExecuteResult res =
conn.handler().batchPrepStatementAsync(req).join();
+ JdbcBatchExecuteResult res =
conn.handler().batchPrepStatementAsync(conn.connectionId(), req).join();
if (!res.hasResults()) {
throw new BatchUpdateException(res.err(),
diff --git a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
index 2cb6946390..0808ae91ec 100644
--- a/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
+++ b/modules/jdbc/src/main/java/org/apache/ignite/internal/jdbc/JdbcStatement.java
@@ -550,7 +550,7 @@ public class JdbcStatement implements Statement {
JdbcBatchExecuteRequest req = new
JdbcBatchExecuteRequest(conn.getSchema(), batch);
try {
- JdbcBatchExecuteResult res = conn.handler().batchAsync(req).join();
+ JdbcBatchExecuteResult res =
conn.handler().batchAsync(conn.connectionId(), req).join();
if (!res.hasResults()) {
throw new BatchUpdateException(res.err(),
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index 4c8121d146..f2806ffaa4 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -27,6 +27,11 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.sql.engine.QueryContext;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -194,8 +199,12 @@ public abstract class ClusterPerTestIntegrationTest
extends IgniteIntegrationTes
}
protected final List<List<Object>> executeSql(String sql, Object... args) {
+ QueryProcessor qryProc = node(0).queryEngine();
+ SessionId sessionId =
qryProc.createSession(PropertiesHelper.emptyHolder());
+ QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
return getAllFromCursor(
- node(0).queryEngine().queryAsync("PUBLIC", sql,
args).get(0).join()
+ qryProc.querySingleAsync(sessionId, context, sql, args).join()
);
}
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
index 5cfadb9783..92bacfd50d 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryProcessor.java
@@ -56,31 +56,6 @@ public interface QueryProcessor extends IgniteComponent {
*/
List<SessionInfo> liveSessions();
- /**
- * Execute the query with given schema name and parameters.
- *
- * @param schemaName Schema name.
- * @param qry Sql query.
- * @param params Query parameters.
- * @return List of sql cursors.
- *
- * @throws IgniteException in case of an error.
- */
- List<CompletableFuture<AsyncSqlCursor<List<Object>>>> queryAsync(String
schemaName, String qry, Object... params);
-
- /**
- * Execute the query with given schema name and parameters.
- *
- * @param context User query context.
- * @param schemaName Schema name.
- * @param qry Sql query.
- * @param params Query parameters.
- * @return List of sql cursors.
- *
- * @throws IgniteException in case of an error.
- */
- List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
queryAsync(QueryContext context, String schemaName, String qry, Object...
params);
-
/**
* Execute the single statement query with given schema name and
parameters.
*
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 4f4aa60c03..4366c4f63a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -82,7 +82,6 @@ import
org.apache.ignite.internal.sql.engine.session.SessionManager;
import org.apache.ignite.internal.sql.engine.session.SessionProperty;
import org.apache.ignite.internal.sql.engine.sql.IgniteSqlParser;
import org.apache.ignite.internal.sql.engine.sql.ParseResult;
-import org.apache.ignite.internal.sql.engine.sql.ScriptParseResult;
import org.apache.ignite.internal.sql.engine.sql.StatementParseResult;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
@@ -175,7 +174,7 @@ public class SqlQueryProcessor implements QueryProcessor {
private final HybridClock clock;
/** Distributed catalog manager. */
- private CatalogManager catalogManager;
+ private final CatalogManager catalogManager;
/** Constructor. */
public SqlQueryProcessor(
@@ -334,29 +333,6 @@ public class SqlQueryProcessor implements QueryProcessor {
IgniteUtils.closeAll(Stream.concat(closableComponents,
closableListeners).collect(Collectors.toList()));
}
- /** {@inheritDoc} */
- @Override
- public List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
queryAsync(String schemaName, String qry, Object... params) {
- QueryContext context = QueryContext.create(SqlQueryType.ALL);
-
- return queryAsync(context, schemaName, qry, params);
- }
-
- /** {@inheritDoc} */
- @Override
- public List<CompletableFuture<AsyncSqlCursor<List<Object>>>>
queryAsync(QueryContext context, String schemaName,
- String qry, Object... params) {
- if (!busyLock.enterBusy()) {
- throw new IgniteInternalException(OPERATION_INTERRUPTED_ERR, new
NodeStoppingException());
- }
-
- try {
- return query0(context, schemaName, qry, params);
- } finally {
- busyLock.leaveBusy();
- }
- }
-
/** {@inheritDoc} */
@Override
public CompletableFuture<AsyncSqlCursor<List<Object>>> querySingleAsync(
@@ -502,104 +478,6 @@ public class SqlQueryProcessor implements QueryProcessor {
return stage;
}
- private List<CompletableFuture<AsyncSqlCursor<List<Object>>>> query0(
- QueryContext context,
- String schemaName,
- String sql,
- Object... params
- ) {
- SchemaPlus schema = sqlSchemaManager.schema(schemaName);
-
- if (schema == null) {
- throw new IgniteInternalException(SCHEMA_NOT_FOUND_ERR,
format("Schema not found [schemaName={}]", schemaName));
- }
-
- CompletableFuture<Void> start = new CompletableFuture<>();
-
- ScriptParseResult parseResult;
- List<CompletableFuture<AsyncSqlCursor<List<Object>>>> res;
-
- try {
- parseResult = IgniteSqlParser.parse(sql, ScriptParseResult.MODE);
- res = new ArrayList<>(parseResult.statements().size());
- } catch (Throwable th) {
- start.completeExceptionally(th);
-
- parseResult = new ScriptParseResult(Collections.emptyList(), 0);
- res =
Collections.singletonList(CompletableFuture.completedFuture(failedCursor(th)));
- }
-
- for (SqlNode sqlNode : parseResult.statements()) {
- try {
- validateParsedStatement(context, parseResult, sqlNode, params);
- } catch (Exception e) {
- start.completeExceptionally(e);
-
- res =
Collections.singletonList(CompletableFuture.completedFuture(failedCursor(e)));
- return res;
- }
-
- // Only rw transactions for now.
- InternalTransaction implicitTx = txManager.begin(false);
-
- final BaseQueryContext ctx = BaseQueryContext.builder()
- .cancel(new QueryCancel())
- .frameworkConfig(
- Frameworks.newConfigBuilder(FRAMEWORK_CONFIG)
- .defaultSchema(schema)
- .build()
- )
- .logger(LOG)
- .parameters(params)
- .plannerTimeout(PLANNER_TIMEOUT)
- .build();
-
- // TODO https://issues.apache.org/jira/browse/IGNITE-17746 Fix
query execution flow.
- CompletableFuture<AsyncSqlCursor<List<Object>>> stage = start
- .thenCompose(none -> prepareSvc.prepareAsync(sqlNode, ctx))
- .thenApply(plan -> {
- SqlQueryType queryType = plan.type();
- assert queryType != null : "Expected a full plan but
got a fragment: " + plan;
-
- return new AsyncSqlCursorImpl<>(
- queryType,
- plan.metadata(),
- implicitTx,
- executionSrvc.executePlan(implicitTx, plan,
ctx)
- );
- });
-
- stage.whenComplete((cur, ex) -> {
- if (ex instanceof CancellationException) {
- ctx.cancel().cancel();
- }
- });
-
- res.add(stage);
- }
-
- start.completeAsync(() -> null, taskExecutor);
-
- return res;
- }
-
- private static <T> AsyncSqlCursor<T> failedCursor(Throwable th) {
- return new AsyncSqlCursorImpl<>(
- null, null, null,
- new AsyncCursor<>() {
- @Override
- public CompletableFuture<BatchedResult<T>>
requestNextAsync(int rows) {
- return CompletableFuture.failedFuture(th);
- }
-
- @Override
- public CompletableFuture<Void> closeAsync() {
- return CompletableFuture.completedFuture(null);
- }
- }
- );
- }
-
private abstract static class AbstractTableEventListener implements
EventListener<TableEventParameters> {
protected final SqlSchemaManagerImpl schemaHolder;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index cf5811b285..c78501322f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -68,6 +68,8 @@ import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.sql.engine.exec.ExecutionCancelledException;
import org.apache.ignite.internal.sql.engine.framework.NoOpTransaction;
import
org.apache.ignite.internal.sql.engine.planner.AbstractPlannerTest.TestHashIndex;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
@@ -261,19 +263,22 @@ public class StopCalciteModuleTest {
await(testRevisionRegister.moveRevision.apply(0L));
- var cursors = qryProc.queryAsync(
- "PUBLIC",
+ SessionId sessionId =
qryProc.createSession(PropertiesHelper.emptyHolder());
+ QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
+ var cursors = qryProc.querySingleAsync(
+ sessionId,
+ context,
"SELECT * FROM TEST"
);
- await(cursors.get(0).thenCompose(cursor ->
cursor.requestNextAsync(1)));
+ await(cursors.thenCompose(cursor -> cursor.requestNextAsync(1)));
assertTrue(isThereNodeThreads(NODE_NAME));
qryProc.stop();
- var request = cursors.get(0)
- .thenCompose(cursor -> cursor.requestNextAsync(1));
+ var request = cursors.thenCompose(cursor ->
cursor.requestNextAsync(1));
// Check cursor closed.
await(request.exceptionally(t -> {
@@ -286,8 +291,9 @@ public class StopCalciteModuleTest {
assertTrue(request.isCompletedExceptionally());
// Check execute query on stopped node.
- assertTrue(assertThrows(IgniteInternalException.class, () ->
qryProc.queryAsync(
- "PUBLIC",
+ assertTrue(assertThrows(IgniteInternalException.class, () ->
qryProc.querySingleAsync(
+ sessionId,
+ context,
"SELECT 1"
)).getCause() instanceof NodeStoppingException);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 06253e8b62..feac7aacb3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -71,7 +71,11 @@ import
org.apache.ignite.internal.schema.configuration.TableView;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
import org.apache.ignite.internal.sql.engine.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.QueryContext;
import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.storage.impl.TestDataStorageModule;
@@ -300,32 +304,37 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
public void testCreateTable() {
SqlQueryProcessor finalQueryProc = queryProc;
+ SessionId sessionId =
queryProc.createSession(PropertiesHelper.emptyHolder());
+ QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
String curMethodName = getCurrentMethodName();
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY,
c2 varbinary(255)) "
+ "with primary_zone='zone123'", curMethodName);
- readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+ readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
.equalsIgnoreCase(curMethodName)));
String finalNewTblSql1 = newTblSql;
- assertThrows(TableAlreadyExistsException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql1)));
+ assertThrows(TableAlreadyExistsException.class,
+ () -> readFirst(finalQueryProc.querySingleAsync(sessionId,
context, finalNewTblSql1)));
String finalNewTblSql2 = String.format("CREATE TABLE \"PUBLIC\".%s (c1
int PRIMARY KEY, c2 varbinary(255)) "
+ "with primary_zone='zone123'", curMethodName);
- assertThrows(TableAlreadyExistsException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC", finalNewTblSql2)));
+ assertThrows(TableAlreadyExistsException.class,
+ () -> readFirst(finalQueryProc.querySingleAsync(sessionId,
context, finalNewTblSql2)));
- assertThrows(SqlException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(SqlException.class, () ->
readFirst(finalQueryProc.querySingleAsync(sessionId, context,
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with
partitions__wrong=1,primary_zone='zone123'")));
- assertThrows(SqlException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(SqlException.class, () ->
readFirst(finalQueryProc.querySingleAsync(sessionId, context,
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with
replicas__wrong=1,primary_zone='zone123'")));
- assertThrows(SqlException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(SqlException.class, () ->
readFirst(finalQueryProc.querySingleAsync(sessionId, context,
"CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with
primary_zone__wrong='zone123'")));
newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varchar(255))",
@@ -333,7 +342,7 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
String finalNewTblSql3 = newTblSql;
- assertDoesNotThrow(() -> await(finalQueryProc.queryAsync("PUBLIC",
finalNewTblSql3).get(0)));
+ assertDoesNotThrow(() ->
readFirst(finalQueryProc.querySingleAsync(sessionId, context,
finalNewTblSql3)));
}
/**
@@ -343,16 +352,19 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
public void testCreateTableWithDistributionZone() {
String tableName = getCurrentMethodName().toUpperCase();
+ SessionId sessionId =
queryProc.createSession(PropertiesHelper.emptyHolder());
+ QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
String zoneName = "zone123";
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY,
c2 varbinary(255)) ",
tableName);
- readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+ readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
assertEquals(DistributionZoneManager.DEFAULT_ZONE_ID,
tblsCfg.tables().get(tableName).zoneId().value());
- readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + tableName));
+ readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE "
+ tableName));
int zoneId =
dstZnsCfg.distributionZones().get(zoneName).zoneId().value();
@@ -361,18 +373,18 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) "
+ "with primary_zone='%s'", tableName, zoneName);
- readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+ readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
assertEquals(zoneId, tblsCfg.tables().get(tableName).zoneId().value());
- readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " + tableName));
+ readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE "
+ tableName));
when(distributionZoneManager.getZoneId(zoneName)).thenThrow(DistributionZoneNotFoundException.class);
Exception exception = assertThrows(
IgniteException.class,
- () -> readFirst(queryProc.queryAsync("PUBLIC",
+ () -> readFirst(queryProc.querySingleAsync(sessionId, context,
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) "
+ "with primary_zone='%s'", tableName,
zoneName)))
);
@@ -387,26 +399,29 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
public void testDropTable() {
String curMethodName = getCurrentMethodName();
+ SessionId sessionId =
queryProc.createSession(PropertiesHelper.emptyHolder());
+ QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY,
c2 varchar(255))", curMethodName);
- readFirst(queryProc.queryAsync("PUBLIC", newTblSql));
+ readFirst(queryProc.querySingleAsync(sessionId, context, newTblSql));
- readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE " +
curMethodName));
+ readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE "
+ curMethodName));
SqlQueryProcessor finalQueryProc = queryProc;
- assertThrows(TableNotFoundException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(TableNotFoundException.class, () ->
readFirst(finalQueryProc.querySingleAsync(sessionId, context,
"DROP TABLE " + curMethodName + "_not_exist")));
- assertThrows(TableNotFoundException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(TableNotFoundException.class, () ->
readFirst(finalQueryProc.querySingleAsync(sessionId, context,
"DROP TABLE " + curMethodName)));
- assertThrows(TableNotFoundException.class, () ->
readFirst(finalQueryProc.queryAsync("PUBLIC",
+ assertThrows(TableNotFoundException.class, () ->
readFirst(finalQueryProc.querySingleAsync(sessionId, context,
"DROP TABLE PUBLIC." + curMethodName)));
- readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS
PUBLIC." + curMethodName + "_not_exist"));
+ readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE
IF EXISTS PUBLIC." + curMethodName + "_not_exist"));
- readFirst(queryProc.queryAsync("PUBLIC", "DROP TABLE IF EXISTS
PUBLIC." + curMethodName));
+ readFirst(queryProc.querySingleAsync(sessionId, context, "DROP TABLE
IF EXISTS PUBLIC." + curMethodName));
assertTrue(tblManager.tables().stream().noneMatch(t -> t.name()
.equalsIgnoreCase("PUBLIC." + curMethodName)));
@@ -416,8 +431,12 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
void createTableWithTableOptions() {
String method = getCurrentMethodName();
- assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
- "PUBLIC",
+ SessionId sessionId =
queryProc.createSession(PropertiesHelper.emptyHolder());
+ QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
+ assertDoesNotThrow(() -> readFirst(queryProc.querySingleAsync(
+ sessionId,
+ context,
String.format(
"CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with primary_zone='zone123'",
method + 4
@@ -426,8 +445,9 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
IgniteException exception = assertThrows(
IgniteException.class,
- () -> readFirst(queryProc.queryAsync(
- "PUBLIC",
+ () -> readFirst(queryProc.querySingleAsync(
+ sessionId,
+ context,
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with %s='%s'", method + 6, method, method)
))
);
@@ -439,8 +459,12 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
void createTableWithDataStorageOptions() {
String method = getCurrentMethodName();
- assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
- "PUBLIC",
+ SessionId sessionId =
queryProc.createSession(PropertiesHelper.emptyHolder());
+ QueryContext context = QueryContext.create(SqlQueryType.ALL);
+
+ assertDoesNotThrow(() -> readFirst(queryProc.querySingleAsync(
+ sessionId,
+ context,
String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with dataRegion='default'", method + 0)
)));
@@ -449,10 +473,13 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
equalTo(DEFAULT_DATA_REGION_NAME)
);
- assertDoesNotThrow(() -> readFirst(queryProc.queryAsync(
- "PUBLIC",
- String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with DATAREGION='test_region'", method + 1)
- )));
+ assertDoesNotThrow(() -> readFirst(
+ queryProc.querySingleAsync(
+ sessionId,
+ context,
+ String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2
varbinary(255)) with DATAREGION='test_region'", method + 1)
+ )
+ ));
assertThat(
((RocksDbDataStorageView) tableView(method +
1).dataStorage()).dataRegion(),
@@ -549,8 +576,8 @@ public class MockedStructuresTest extends
IgniteAbstractTest {
return tableManager;
}
- private <T> BatchedResult<T>
readFirst(List<CompletableFuture<AsyncSqlCursor<T>>> cursors) {
- return await(await(cursors.get(0)).requestNextAsync(512));
+ private <T> BatchedResult<T>
readFirst(CompletableFuture<AsyncSqlCursor<List<Object>>> cursors) {
+ return (BatchedResult<T>) await(await(cursors).requestNextAsync(512));
}
private @Nullable TableView tableView(String tableName) {