This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch ignite-2.13
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-2.13 by this push:
new 522c4b90402 IGNITE-16740 SQL Calcite: Implement batches in JDBC and
ODBC (#9917)
522c4b90402 is described below
commit 522c4b904028f5261f195f5edfd0a5fbc4a9a47d
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Tue Apr 5 11:23:28 2022 +0300
IGNITE-16740 SQL Calcite: Implement batches in JDBC and ODBC (#9917)
(cherry picked from commit 8c9f2809f69763a3635a5e067fd5fc805dada644)
---
.../query/calcite/CalciteQueryProcessor.java | 154 +++++++++++++--------
.../query/calcite/jdbc/JdbcQueryTest.java | 71 ++++++++++
.../processors/query/GridQueryProcessor.java | 23 ++-
.../internal/processors/query/NoOpQueryEngine.java | 18 ++-
.../internal/processors/query/QueryEngine.java | 24 +++-
modules/platforms/cpp/CMakeLists.txt | 6 +
modules/platforms/cpp/core-test/src/test_utils.cpp | 1 -
.../cpp/odbc-test/src/cross_engine_test.cpp | 5 +-
modules/platforms/cpp/odbc-test/src/test_utils.cpp | 1 -
.../cpp/thin-client-test/src/test_utils.cpp | 1 -
10 files changed, 228 insertions(+), 76 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
index beb383638c6..5450a62b842 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessor.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.function.Function;
import org.apache.calcite.DataContexts;
import org.apache.calcite.config.Lex;
import org.apache.calcite.config.NullCollation;
@@ -32,6 +33,7 @@ import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.hint.HintStrategyTable;
import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.parser.SqlParser;
@@ -326,87 +328,121 @@ public class CalciteQueryProcessor extends
GridProcessorAdapter implements Query
QueryPlan plan = queryPlanCache().queryPlan(new
CacheKey(schema.getName(), sql));
if (plan != null) {
- RootQuery<Object[]> qry = new RootQuery<>(
- sql,
- schema,
- params,
- qryCtx,
- exchangeSvc,
- (q) -> qryReg.unregister(q.id()),
- log,
- queryPlannerTimeout
+ return Collections.singletonList(
+ executeQuery(qryCtx, qry -> plan, schemaName, sql, null,
params)
);
+ }
- qryReg.register(qry);
+ SqlNodeList qryList = Commons.parse(sql,
FRAMEWORK_CONFIG.getParserConfig());
- try {
- return Collections.singletonList(executionSvc.executePlan(
- qry,
- plan
- ));
- }
- catch (Exception e) {
- boolean isCanceled = qry.isCancelled();
+ List<FieldsQueryCursor<List<?>>> cursors = new
ArrayList<>(qryList.size());
+ List<RootQuery<Object[]>> qrys = new ArrayList<>(qryList.size());
- qry.cancel();
+ for (final SqlNode sqlNode: qryList) {
+ FieldsQueryCursor<List<?>> cursor = executeQuery(qryCtx, qry -> {
+ if (qryList.size() == 1) {
+ return queryPlanCache().queryPlan(
+ new CacheKey(schemaName, sql), // Use source SQL to
avoid redundant parsing next time.
+ () -> prepareSvc.prepareSingle(sqlNode,
qry.planningContext())
+ );
+ }
+ else
+ return prepareSvc.prepareSingle(sqlNode,
qry.planningContext());
+ }, schemaName, sqlNode.toString(), qrys, params);
- qryReg.unregister(qry.id());
+ cursors.add(cursor);
+ }
- if (isCanceled)
- throw new IgniteSQLException("The query was cancelled
while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
- else
- throw e;
+ return cursors;
+ }
- }
+ /** {@inheritDoc} */
+ @Override public List<FieldsQueryCursor<List<?>>> queryBatched(
+ @Nullable QueryContext qryCtx,
+ String schemaName,
+ String sql,
+ List<Object[]> batchedParams
+ ) throws IgniteSQLException {
+ SqlNodeList qryNodeList = Commons.parse(sql,
FRAMEWORK_CONFIG.getParserConfig());
+
+ if (qryNodeList.size() != 1) {
+ throw new IgniteSQLException("Multiline statements are not
supported in batched query",
+ IgniteQueryErrorCode.PARSING);
}
- SqlNodeList qryList = Commons.parse(sql,
FRAMEWORK_CONFIG.getParserConfig());
- List<FieldsQueryCursor<List<?>>> cursors = new
ArrayList<>(qryList.size());
+ SqlNode qryNode = qryNodeList.get(0);
- List<RootQuery<Object[]>> qrys = new ArrayList<>(qryList.size());
+ if (qryNode.getKind() != SqlKind.INSERT && qryNode.getKind() !=
SqlKind.UPDATE
+ && qryNode.getKind() != SqlKind.MERGE && qryNode.getKind() !=
SqlKind.DELETE) {
+ throw new IgniteSQLException("Unexpected operation kind for
batched query [kind=" + qryNode.getKind() + "]",
+ IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+ }
- for (final SqlNode sqlNode: qryList) {
- RootQuery<Object[]> qry = new RootQuery<>(
- sqlNode.toString(),
- schemaHolder.schema(schemaName), // Update schema for each
query in multiple statements.
- params,
- qryCtx,
- exchangeSvc,
- (q) -> qryReg.unregister(q.id()),
- log,
- queryPlannerTimeout
- );
+ List<FieldsQueryCursor<List<?>>> cursors = new
ArrayList<>(batchedParams.size());
- qrys.add(qry);
+ List<RootQuery<Object[]>> qrys = new ArrayList<>(batchedParams.size());
- qryReg.register(qry);
+ Function<RootQuery<Object[]>, QueryPlan> planSupplier = new
Function<RootQuery<Object[]>, QueryPlan>() {
+ private QueryPlan plan;
- try {
- if (qryList.size() == 1) {
- plan = queryPlanCache().queryPlan(
- new CacheKey(schemaName, sql), // Use source SQL to
avoid redundant parsing next time.
- () -> prepareSvc.prepareSingle(sqlNode,
qry.planningContext()));
+ @Override public QueryPlan apply(RootQuery<Object[]> qry) {
+ if (plan == null) {
+ plan = queryPlanCache().queryPlan(new CacheKey(schemaName,
sql), () ->
+ prepareSvc.prepareSingle(qryNode,
qry.planningContext())
+ );
}
- else
- plan = prepareSvc.prepareSingle(sqlNode,
qry.planningContext());
- cursors.add(executionSvc.executePlan(qry, plan));
+ return plan;
}
- catch (Exception e) {
- boolean isCanceled = qry.isCancelled();
+ };
- qrys.forEach(RootQuery::cancel);
+ for (final Object[] batch: batchedParams)
+ cursors.add(executeQuery(qryCtx, planSupplier, schemaName, sql,
qrys, batch));
- qryReg.unregister(qry.id());
+ return cursors;
+ }
- if (isCanceled)
- throw new IgniteSQLException("The query was cancelled
while planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
- else
- throw e;
- }
+ /** */
+ private FieldsQueryCursor<List<?>> executeQuery(
+ @Nullable QueryContext qryCtx,
+ Function<RootQuery<Object[]>, QueryPlan> plan,
+ String schema,
+ String sql,
+ @Nullable List<RootQuery<Object[]>> qrys,
+ Object... params
+ ) {
+ RootQuery<Object[]> qry = new RootQuery<>(
+ sql,
+ schemaHolder.schema(schema),
+ params,
+ qryCtx,
+ exchangeSvc,
+ (q) -> qryReg.unregister(q.id()),
+ log,
+ queryPlannerTimeout
+ );
+
+ if (qrys != null)
+ qrys.add(qry);
+
+ qryReg.register(qry);
+
+ try {
+ return executionSvc.executePlan(qry, plan.apply(qry));
}
+ catch (Exception e) {
+ boolean isCanceled = qry.isCancelled();
- return cursors;
+ if (qrys != null)
+ qrys.forEach(RootQuery::cancel);
+
+ qryReg.unregister(qry.id());
+
+ if (isCanceled)
+ throw new IgniteSQLException("The query was cancelled while
planning", IgniteQueryErrorCode.QUERY_CANCELED, e);
+ else
+ throw e;
+ }
}
/** */
diff --git
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcQueryTest.java
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcQueryTest.java
index bfa5199604a..02301e21469 100644
---
a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcQueryTest.java
+++
b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/jdbc/JdbcQueryTest.java
@@ -17,18 +17,22 @@
package org.apache.ignite.internal.processors.query.calcite.jdbc;
+import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
+import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.calcite.CalciteQueryEngineConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.SqlConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -108,6 +112,73 @@ public class JdbcQueryTest extends GridCommonAbstractTest {
stmt.close();
}
+ /** Test batched execution of statement. */
+ @Test
+ public void testBatch() throws Exception {
+ stmt.execute("CREATE TABLE Person(\"id\" INT, PRIMARY KEY(\"id\"),
\"name\" VARCHAR)");
+
+ for (int i = 0; i < 10; ++i)
+ stmt.addBatch(String.format("INSERT INTO Person VALUES (%d,
'Name')", i));
+
+ stmt.executeBatch();
+
+ try (ResultSet rs = stmt.executeQuery("select * from Person p order by
\"id\" asc")) {
+ for (int i = 0; rs.next(); ++i) {
+ assertEquals(i, rs.getInt(1));
+ assertEquals("Name", rs.getString(2));
+ }
+ }
+ }
+
+ /** Test batched execution of prepared statement. */
+ @Test
+ public void testBatchPrepared() throws Exception {
+ stmt.execute("CREATE TABLE Person(\"id\" INT, PRIMARY KEY(\"id\"),
\"name\" VARCHAR)");
+
+ try (PreparedStatement stmt0 = conn.prepareStatement("INSERT INTO
Person VALUES (?, ?);" +
+ "INSERT INTO Person VALUES (?, ?)")) {
+ stmt0.setInt(1, 0);
+ stmt0.setString(2, "Name");
+ stmt0.setInt(2, 1);
+ stmt0.setString(4, "Name");
+ stmt0.addBatch();
+
+ GridTestUtils.assertThrows(log, stmt0::executeBatch,
BatchUpdateException.class,
+ "Multiline statements are not supported in batched query");
+ }
+
+ try (PreparedStatement stmt0 = conn.prepareStatement("SELECT * FROM
Person WHERE id = ?")) {
+ stmt0.setInt(1, 0);
+ stmt0.addBatch();
+
+ GridTestUtils.assertThrows(log, stmt0::executeBatch,
BatchUpdateException.class,
+ "Unexpected operation kind for batched query [kind=SELECT]");
+ }
+
+ int[] ret;
+ try (PreparedStatement stmt0 = conn.prepareStatement("INSERT INTO
Person VALUES (?, ?), (?, ?)")) {
+ for (int i = 0; i < 1000; i += 2) {
+ stmt0.setInt(1, i);
+ stmt0.setString(2, "Name");
+ stmt0.setInt(3, i + 1);
+ stmt0.setString(4, "Name");
+ stmt0.addBatch();
+ }
+
+ ret = stmt0.executeBatch();
+ }
+
+ try (ResultSet rs = stmt.executeQuery("select * from Person p order by
\"id\" asc")) {
+ int i = 0;
+ for (; rs.next(); ++i) {
+ assertEquals(i, rs.getInt(1));
+ assertEquals("Name", rs.getString(2));
+ }
+ assertEquals(ret.length * 2, i);
+ assertTrue(Arrays.stream(ret).anyMatch(k -> k == 2));
+ }
+ }
+
/**
* @throws SQLException If failed.
*/
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 1ad448bbb30..5fbd9ae8f9c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -91,6 +91,7 @@ import
org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
import org.apache.ignite.internal.processors.cache.query.IndexQueryDesc;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
import
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
@@ -2994,12 +2995,22 @@ public class GridQueryProcessor extends
GridProcessorAdapter {
QueryEngine qryEngine = engineForQuery(cliCtx, qry);
if (qryEngine != null) {
- res = qryEngine.query(
- QueryContext.of(qry, cliCtx, cancel),
- schemaName,
- qry.getSql(),
- qry.getArgs() != null ? qry.getArgs() :
X.EMPTY_OBJECT_ARRAY
- );
+ if (qry instanceof SqlFieldsQueryEx &&
((SqlFieldsQueryEx)qry).isBatched()) {
+ res = qryEngine.queryBatched(
+ QueryContext.of(qry, cliCtx, cancel),
+ schemaName,
+ qry.getSql(),
+ ((SqlFieldsQueryEx)qry).batchedArguments()
+ );
+ }
+ else {
+ res = qryEngine.query(
+ QueryContext.of(qry, cliCtx, cancel),
+ schemaName,
+ qry.getSql(),
+ qry.getArgs() != null ? qry.getArgs() :
X.EMPTY_OBJECT_ARRAY
+ );
+ }
}
else {
res = idx.querySqlFields(
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
index 27d1ce79f81..4730f65bca0 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/NoOpQueryEngine.java
@@ -36,8 +36,22 @@ public class NoOpQueryEngine extends GridProcessorAdapter
implements QueryEngine
}
/** {@inheritDoc} */
- @Override public List<FieldsQueryCursor<List<?>>> query(@Nullable
QueryContext ctx, String schemaName, String query,
- Object... params) throws IgniteSQLException {
+ @Override public List<FieldsQueryCursor<List<?>>> query(
+ @Nullable QueryContext ctx,
+ String schemaName,
+ String query,
+ Object... params
+ ) throws IgniteSQLException {
+ return Collections.emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<FieldsQueryCursor<List<?>>> queryBatched(
+ @Nullable QueryContext ctx,
+ String schemaName,
+ String query,
+ List<Object[]> batchedParams
+ ) throws IgniteSQLException {
return Collections.emptyList();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
index e1206afc8ab..003f37339a1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/QueryEngine.java
@@ -34,12 +34,32 @@ public interface QueryEngine extends GridProcessor {
* @param schemaName Schema name.
* @param qry Query.
* @param params Optional query parameters.
- * @return Query cursor.
+ * @return List of query cursors. Size of list depends on number of
distinct queries in {@code qry}.
* @throws IgniteSQLException If failed.
*/
- List<FieldsQueryCursor<List<?>>> query(@Nullable QueryContext ctx, String
schemaName, String qry, Object... params)
+ List<FieldsQueryCursor<List<?>>> query(
+ @Nullable QueryContext ctx,
+ String schemaName,
+ String qry,
+ Object... params
+ )
throws IgniteSQLException;
+ /**
+ * @param ctx Query context, may be null.
+ * @param schemaName Schema name.
+ * @param qry Query.
+ * @param batchedParams Optional query parameters.
+ * @return List of query cursors. Size of list equals to size of {@code
batchedParams}.
+ * @throws IgniteSQLException If failed.
+ */
+ List<FieldsQueryCursor<List<?>>> queryBatched(
+ @Nullable QueryContext ctx,
+ String schemaName,
+ String qry,
+ List<Object[]> batchedParams
+ ) throws IgniteSQLException;
+
/** */
Collection<? extends RunningQuery> runningQueries();
diff --git a/modules/platforms/cpp/CMakeLists.txt
b/modules/platforms/cpp/CMakeLists.txt
index a9b88a68e43..85d5586b5b6 100644
--- a/modules/platforms/cpp/CMakeLists.txt
+++ b/modules/platforms/cpp/CMakeLists.txt
@@ -75,6 +75,12 @@ if (${WITH_SANITIZERS} AND (CMAKE_CXX_COMPILER_ID MATCHES
"Clang" OR CMAKE_CXX_C
message("Built with sanitizers. Sanitizers options for ctest:
${SANITIZERS_ENV}")
endif()
+if (CMAKE_BUILD_TYPE STREQUAL "Debug")
+ if (CMAKE_CXX_COMPILER_ID MATCHES "Clang")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fstandalone-debug")
+ endif()
+endif()
+
if (${WITH_CORE} OR ${WITH_TESTS})
find_package(Java 1.8 REQUIRED)
find_package(JNI REQUIRED)
diff --git a/modules/platforms/cpp/core-test/src/test_utils.cpp
b/modules/platforms/cpp/core-test/src/test_utils.cpp
index 917bc628830..a123a46f130 100644
--- a/modules/platforms/cpp/core-test/src/test_utils.cpp
+++ b/modules/platforms/cpp/core-test/src/test_utils.cpp
@@ -62,7 +62,6 @@ namespace ignite_test
cfg.jvmOpts.push_back("-XX:+HeapDumpOnOutOfMemoryError");
cfg.jvmOpts.push_back("-Duser.timezone=GMT");
cfg.jvmOpts.push_back("-DIGNITE_QUIET=false");
- cfg.jvmOpts.push_back("-DIGNITE_CONSOLE_APPENDER=false");
cfg.jvmOpts.push_back("-DIGNITE_UPDATE_NOTIFIER=false");
cfg.jvmOpts.push_back("-DIGNITE_LOG_CLASSPATH_CONTENT_ON_STARTUP=false");
cfg.jvmOpts.push_back("-Duser.language=en");
diff --git a/modules/platforms/cpp/odbc-test/src/cross_engine_test.cpp
b/modules/platforms/cpp/odbc-test/src/cross_engine_test.cpp
index de03e1ff2f7..39ea43d27e3 100644
--- a/modules/platforms/cpp/odbc-test/src/cross_engine_test.cpp
+++ b/modules/platforms/cpp/odbc-test/src/cross_engine_test.cpp
@@ -63,10 +63,7 @@ struct CrossEngineTestSuiteFixture : public
odbc::OdbcTestSuite
:
EngineMode::ToString(EngineMode::H2));
InsertTestStrings(10, false);
-
- // Batching is not supported for CALCITE
- if (EType != EngineMode::CALCITE)
- InsertTestBatch(11, 2000, 1989);
+ InsertTestBatch(11, 2000, 1989);
}
template<EngineMode::Type EType>
diff --git a/modules/platforms/cpp/odbc-test/src/test_utils.cpp
b/modules/platforms/cpp/odbc-test/src/test_utils.cpp
index bdd32d8afc3..0f1affd2f61 100644
--- a/modules/platforms/cpp/odbc-test/src/test_utils.cpp
+++ b/modules/platforms/cpp/odbc-test/src/test_utils.cpp
@@ -112,7 +112,6 @@ namespace ignite_test
cfg.jvmOpts.push_back("-XX:+HeapDumpOnOutOfMemoryError");
cfg.jvmOpts.push_back("-Duser.timezone=GMT");
cfg.jvmOpts.push_back("-DIGNITE_QUIET=false");
- cfg.jvmOpts.push_back("-DIGNITE_CONSOLE_APPENDER=false");
cfg.jvmOpts.push_back("-DIGNITE_UPDATE_NOTIFIER=false");
cfg.jvmOpts.push_back("-DIGNITE_LOG_CLASSPATH_CONTENT_ON_STARTUP=false");
cfg.jvmOpts.push_back("-Duser.language=en");
diff --git a/modules/platforms/cpp/thin-client-test/src/test_utils.cpp
b/modules/platforms/cpp/thin-client-test/src/test_utils.cpp
index 0c56d70660d..468cc36e47d 100644
--- a/modules/platforms/cpp/thin-client-test/src/test_utils.cpp
+++ b/modules/platforms/cpp/thin-client-test/src/test_utils.cpp
@@ -66,7 +66,6 @@ namespace ignite_test
cfg.jvmOpts.push_back("-XX:+HeapDumpOnOutOfMemoryError");
cfg.jvmOpts.push_back("-Duser.timezone=GMT");
cfg.jvmOpts.push_back("-DIGNITE_QUIET=false");
- cfg.jvmOpts.push_back("-DIGNITE_CONSOLE_APPENDER=false");
cfg.jvmOpts.push_back("-DIGNITE_UPDATE_NOTIFIER=false");
cfg.jvmOpts.push_back("-DIGNITE_LOG_CLASSPATH_CONTENT_ON_STARTUP=false");
cfg.jvmOpts.push_back("-Duser.language=en");