This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ed2833fc69 IGNITE-24078 Sql. Fixed handling of DDL/KILL statements
using jdbc batch (#4960)
ed2833fc69 is described below
commit ed2833fc69b22b1a67a51ffa1749acadf4899569
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Wed Dec 25 15:57:20 2024 +0300
IGNITE-24078 Sql. Fixed handling of DDL/KILL statements using jdbc batch
(#4960)
---
.../client/handler/JdbcQueryEventHandlerImpl.java | 51 ++++++++-----
.../apache/ignite/jdbc/ItJdbcBatchSelfTest.java | 88 ++++++++++++++++++++++
.../sql/engine/exec/fsm/ValidationHelper.java | 4 +-
3 files changed, 124 insertions(+), 19 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index d87c24c19f..1b286c6a57 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -67,6 +67,7 @@ import
org.apache.ignite.internal.sql.engine.property.SqlProperties;
import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
+import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.tx.IgniteTransactions;
@@ -231,12 +232,9 @@ public class JdbcQueryEventHandlerImpl extends
JdbcHandlerBase implements JdbcQu
token,
query,
OBJECT_EMPTY_ARRAY,
- queryTimeoutMillis
- ).thenApply(cnt -> {
- list.add(cnt > Integer.MAX_VALUE ? Statement.SUCCESS_NO_INFO :
cnt.intValue());
-
- return list;
- }));
+ queryTimeoutMillis,
+ list
+ ));
}
return tail.handle((ignored, t) -> {
@@ -270,12 +268,8 @@ public class JdbcQueryEventHandlerImpl extends
JdbcHandlerBase implements JdbcQu
for (Object[] args : argList) {
tail = tail.thenCompose(list -> executeAndCollectUpdateCount(
- connectionContext, tx, token, req.getQuery(), args,
timeoutMillis
- ).thenApply(cnt -> {
- list.add(cnt > Integer.MAX_VALUE ? Statement.SUCCESS_NO_INFO :
cnt.intValue());
-
- return list;
- }));
+ connectionContext, tx, token, req.getQuery(), args,
timeoutMillis, list
+ ));
}
return tail.handle((ignored, t) -> {
@@ -289,14 +283,15 @@ public class JdbcQueryEventHandlerImpl extends
JdbcHandlerBase implements JdbcQu
});
}
- private CompletableFuture<Long> executeAndCollectUpdateCount(
+ private CompletableFuture<IntArrayList> executeAndCollectUpdateCount(
JdbcConnectionContext context,
@Nullable InternalTransaction tx,
CancellationToken token,
String sql,
Object[] arg,
- long timeoutMillis) {
-
+ long timeoutMillis,
+ IntArrayList resultUpdateCounters
+ ) {
if (!context.valid()) {
return CompletableFuture.failedFuture(new
IgniteInternalException(CONNECTION_ERR, "Connection is closed"));
}
@@ -312,8 +307,30 @@ public class JdbcQueryEventHandlerImpl extends
JdbcHandlerBase implements JdbcQu
arg == null ? OBJECT_EMPTY_ARRAY : arg
);
- return result.thenCompose(cursor -> cursor.requestNextAsync(1))
- .thenApply(batch -> (Long) batch.items().get(0).get(0));
+ return result.thenCompose(cursor -> cursor.requestNextAsync(1)
+ .thenApply(batch -> {
+ int updateCounter = handleBatchResult(cursor.queryType(),
batch);
+
+ resultUpdateCounters.add(updateCounter);
+
+ return resultUpdateCounters;
+ }));
+ }
+
+ private static int handleBatchResult(SqlQueryType type,
BatchedResult<InternalSqlRow> result) {
+ switch (type) {
+ case DDL:
+ case KILL:
+ return Statement.SUCCESS_NO_INFO;
+ case DML:
+ Long updateCounts = (Long) result.items().get(0).get(0);
+
+ assert updateCounts != null : "Invalid DML result";
+
+ return updateCounts > Integer.MAX_VALUE ?
Statement.SUCCESS_NO_INFO : updateCounts.intValue();
+ default:
+ throw new IllegalStateException("Unexpected query type: " +
type);
+ }
}
private static JdbcBatchExecuteResult handleBatchException(Throwable e,
String query, int[] counters) {
diff --git
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
index 96f4a35ac4..db6f6bc930 100644
---
a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
+++
b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
@@ -17,9 +17,12 @@
package org.apache.ignite.jdbc;
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.jdbc.util.JdbcTestUtils.assertThrowsSqlException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -38,6 +41,8 @@ import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.ignite.internal.app.IgniteImpl;
@@ -46,12 +51,19 @@ import org.apache.ignite.internal.jdbc.JdbcStatement;
import org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode;
import org.apache.ignite.internal.jdbc.proto.SqlStateCode;
import org.apache.ignite.internal.restart.RestartProofIgnite;
+import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.sql.engine.exec.fsm.QueryInfo;
+import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
import org.apache.ignite.internal.tx.TxManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
/**
* Statement test.
@@ -136,6 +148,51 @@ public class ItJdbcBatchSelfTest extends
AbstractJdbcSelfTest {
}
}
+ @Test
+ public void testBatchWithDdl() throws SQLException {
+ stmt.addBatch("CREATE TABLE t1(ID INT PRIMARY KEY)");
+ stmt.addBatch("CREATE TABLE t2(ID INT PRIMARY KEY)");
+ stmt.addBatch("INSERT INTO t1 VALUES (1)");
+ stmt.addBatch("INSERT INTO t2 VALUES (1), (2)");
+
+ int[] updCnts = stmt.executeBatch();
+
+ assertEquals(4, updCnts.length, "Invalid update counts size");
+ assertEquals(Statement.SUCCESS_NO_INFO, updCnts[0]);
+ assertEquals(Statement.SUCCESS_NO_INFO, updCnts[1]);
+ assertEquals(1, updCnts[2]);
+ assertEquals(2, updCnts[3]);
+ }
+
+ @Test
+ public void testBatchWithKill() throws SQLException {
+ try (Statement targetQueryStatement = conn.createStatement()) {
+ try (ResultSet rs = targetQueryStatement.executeQuery("SELECT x
FROM system_range(0, 100000);")) {
+ IgniteImpl ignite = unwrapIgniteImpl(CLUSTER.aliveNode());
+ SqlQueryProcessor queryProcessor = (SqlQueryProcessor)
ignite.queryEngine();
+
+ List<QueryInfo> queries = queryProcessor.runningQueries();
+
+ assertThat(queries, hasSize(1));
+ UUID targetId = queries.get(0).id();
+
+ stmt.addBatch("KILL QUERY '" + targetId + "'");
+ stmt.executeBatch();
+
+ SqlTestUtils.waitUntilRunningQueriesCount(queryProcessor,
is(0));
+
+ //noinspection ThrowableNotThrown
+ assertThrowsSqlException(
+ QueryCancelledException.CANCEL_MSG, () -> {
+ //noinspection StatementWithEmptyBody
+ while (rs.next()) {
+ }
+ }
+ );
+ }
+ }
+ }
+
@Test
public void testMultipleStatementForBatchIsNotAllowed() throws
SQLException {
String insertStmt = "insert into Person (id, firstName, lastName, age)
values";
@@ -218,6 +275,18 @@ public class ItJdbcBatchSelfTest extends
AbstractJdbcSelfTest {
}
}
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("forbiddenStatements")
+ public void testForbiddenQueryTypes(String sql, String expectedError)
throws SQLException {
+ stmt.addBatch(sql);
+
+ assertThrowsSqlException(
+ BatchUpdateException.class,
+ expectedError,
+ stmt::executeBatch
+ );
+ }
+
@Test
public void testBatchException() throws Exception {
final int successUpdates = 5;
@@ -850,4 +919,23 @@ public class ItJdbcBatchSelfTest extends
AbstractJdbcSelfTest {
return cnt.getLong(1);
}
}
+
+ private static List<Arguments> forbiddenStatements() {
+ return List.of(
+ Arguments.of("SELECT * FROM Person",
+ "Invalid SQL statement type."),
+
+ Arguments.of("EXPLAIN PLAN FOR DELETE FROM Person",
+ "Invalid SQL statement type."),
+
+ Arguments.of("START TRANSACTION",
+ "Transaction control statement can not be executed as
an independent statement."),
+
+ Arguments.of("COMMIT",
+ "Transaction control statement can not be executed as
an independent statement."),
+
+ Arguments.of("START TRANSACTION; COMMIT",
+ "Multiple statements are not allowed.")
+ );
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ValidationHelper.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ValidationHelper.java
index 63a85ab8e5..001889255a 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ValidationHelper.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ValidationHelper.java
@@ -41,13 +41,13 @@ public final class ValidationHelper {
SqlQueryType queryType = parsedResult.queryType();
if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
- String message = "Transaction control statement can not be
executed as an independent statement";
+ String message = "Transaction control statement can not be
executed as an independent statement.";
throw new SqlException(STMT_VALIDATION_ERR, message);
}
if (!allowedTypes.contains(queryType)) {
- String message = format("Invalid SQL statement type. Expected {}
but got {}", allowedTypes, queryType);
+ String message = format("Invalid SQL statement type. Expected {}
but got {}.", allowedTypes, queryType);
throw new SqlException(STMT_VALIDATION_ERR, message);
}