This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ed2833fc69 IGNITE-24078 Sql. Fixed handling of DDL/KILL statements using jdbc batch (#4960)
ed2833fc69 is described below

commit ed2833fc69b22b1a67a51ffa1749acadf4899569
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Wed Dec 25 15:57:20 2024 +0300

    IGNITE-24078 Sql. Fixed handling of DDL/KILL statements using jdbc batch (#4960)
---
 .../client/handler/JdbcQueryEventHandlerImpl.java  | 51 ++++++++-----
 .../apache/ignite/jdbc/ItJdbcBatchSelfTest.java    | 88 ++++++++++++++++++++++
 .../sql/engine/exec/fsm/ValidationHelper.java      |  4 +-
 3 files changed, 124 insertions(+), 19 deletions(-)

diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
index d87c24c19f..1b286c6a57 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/JdbcQueryEventHandlerImpl.java
@@ -67,6 +67,7 @@ import org.apache.ignite.internal.sql.engine.property.SqlProperties;
 import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
+import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
 import org.apache.ignite.lang.CancelHandle;
 import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.tx.IgniteTransactions;
@@ -231,12 +232,9 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu
                     token,
                     query,
                     OBJECT_EMPTY_ARRAY,
-                    queryTimeoutMillis
-            ).thenApply(cnt -> {
-                list.add(cnt > Integer.MAX_VALUE ? Statement.SUCCESS_NO_INFO : 
cnt.intValue());
-
-                return list;
-            }));
+                    queryTimeoutMillis,
+                    list
+            ));
         }
 
         return tail.handle((ignored, t) -> {
@@ -270,12 +268,8 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu
 
         for (Object[] args : argList) {
             tail = tail.thenCompose(list -> executeAndCollectUpdateCount(
-                    connectionContext, tx, token, req.getQuery(), args, 
timeoutMillis
-            ).thenApply(cnt -> {
-                list.add(cnt > Integer.MAX_VALUE ? Statement.SUCCESS_NO_INFO : 
cnt.intValue());
-
-                return list;
-            }));
+                    connectionContext, tx, token, req.getQuery(), args, 
timeoutMillis, list
+            ));
         }
 
         return tail.handle((ignored, t) -> {
@@ -289,14 +283,15 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu
         });
     }
 
-    private CompletableFuture<Long> executeAndCollectUpdateCount(
+    private CompletableFuture<IntArrayList> executeAndCollectUpdateCount(
             JdbcConnectionContext context,
             @Nullable InternalTransaction tx,
             CancellationToken token,
             String sql,
             Object[] arg,
-            long timeoutMillis) {
-
+            long timeoutMillis,
+            IntArrayList resultUpdateCounters
+    ) {
         if (!context.valid()) {
             return CompletableFuture.failedFuture(new 
IgniteInternalException(CONNECTION_ERR, "Connection is closed"));
         }
@@ -312,8 +307,30 @@ public class JdbcQueryEventHandlerImpl extends JdbcHandlerBase implements JdbcQu
                 arg == null ? OBJECT_EMPTY_ARRAY : arg
         );
 
-        return result.thenCompose(cursor -> cursor.requestNextAsync(1))
-                .thenApply(batch -> (Long) batch.items().get(0).get(0));
+        return result.thenCompose(cursor -> cursor.requestNextAsync(1)
+                .thenApply(batch -> {
+                    int updateCounter = handleBatchResult(cursor.queryType(), 
batch);
+
+                    resultUpdateCounters.add(updateCounter);
+
+                    return resultUpdateCounters;
+                }));
+    }
+
+    private static int handleBatchResult(SqlQueryType type, 
BatchedResult<InternalSqlRow> result) {
+        switch (type) {
+            case DDL:
+            case KILL:
+                return Statement.SUCCESS_NO_INFO;
+            case DML:
+                Long updateCounts = (Long) result.items().get(0).get(0);
+
+                assert updateCounts != null : "Invalid DML result";
+
+                return updateCounts > Integer.MAX_VALUE ? 
Statement.SUCCESS_NO_INFO : updateCounts.intValue();
+            default:
+                throw new IllegalStateException("Unexpected query type: " + 
type);
+        }
     }
 
     private static JdbcBatchExecuteResult handleBatchException(Throwable e, 
String query, int[] counters) {
diff --git a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
index 96f4a35ac4..db6f6bc930 100644
--- a/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
+++ b/modules/jdbc/src/integrationTest/java/org/apache/ignite/jdbc/ItJdbcBatchSelfTest.java
@@ -17,9 +17,12 @@
 
 package org.apache.ignite.jdbc;
 
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static 
org.apache.ignite.jdbc.util.JdbcTestUtils.assertThrowsSqlException;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -38,6 +41,8 @@ import java.sql.Timestamp;
 import java.time.LocalDate;
 import java.time.LocalTime;
 import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.app.IgniteImpl;
@@ -46,12 +51,19 @@ import org.apache.ignite.internal.jdbc.JdbcStatement;
 import org.apache.ignite.internal.jdbc.proto.IgniteQueryErrorCode;
 import org.apache.ignite.internal.jdbc.proto.SqlStateCode;
 import org.apache.ignite.internal.restart.RestartProofIgnite;
+import org.apache.ignite.internal.sql.engine.QueryCancelledException;
+import org.apache.ignite.internal.sql.engine.SqlQueryProcessor;
+import org.apache.ignite.internal.sql.engine.exec.fsm.QueryInfo;
+import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
 import org.apache.ignite.internal.tx.TxManager;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 /**
  * Statement test.
@@ -136,6 +148,51 @@ public class ItJdbcBatchSelfTest extends AbstractJdbcSelfTest {
         }
     }
 
+    @Test
+    public void testBatchWithDdl() throws SQLException {
+        stmt.addBatch("CREATE TABLE t1(ID INT PRIMARY KEY)");
+        stmt.addBatch("CREATE TABLE t2(ID INT PRIMARY KEY)");
+        stmt.addBatch("INSERT INTO t1 VALUES (1)");
+        stmt.addBatch("INSERT INTO t2 VALUES (1), (2)");
+
+        int[] updCnts = stmt.executeBatch();
+
+        assertEquals(4, updCnts.length, "Invalid update counts size");
+        assertEquals(Statement.SUCCESS_NO_INFO, updCnts[0]);
+        assertEquals(Statement.SUCCESS_NO_INFO, updCnts[1]);
+        assertEquals(1, updCnts[2]);
+        assertEquals(2, updCnts[3]);
+    }
+
+    @Test
+    public void testBatchWithKill() throws SQLException {
+        try (Statement targetQueryStatement = conn.createStatement()) {
+            try (ResultSet rs = targetQueryStatement.executeQuery("SELECT x 
FROM system_range(0, 100000);")) {
+                IgniteImpl ignite = unwrapIgniteImpl(CLUSTER.aliveNode());
+                SqlQueryProcessor queryProcessor = (SqlQueryProcessor) 
ignite.queryEngine();
+
+                List<QueryInfo> queries = queryProcessor.runningQueries();
+
+                assertThat(queries, hasSize(1));
+                UUID targetId = queries.get(0).id();
+
+                stmt.addBatch("KILL QUERY '" + targetId + "'");
+                stmt.executeBatch();
+
+                SqlTestUtils.waitUntilRunningQueriesCount(queryProcessor, 
is(0));
+
+                //noinspection ThrowableNotThrown
+                assertThrowsSqlException(
+                        QueryCancelledException.CANCEL_MSG, () -> {
+                            //noinspection StatementWithEmptyBody
+                            while (rs.next()) {
+                            }
+                        }
+                );
+            }
+        }
+    }
+
     @Test
     public void testMultipleStatementForBatchIsNotAllowed() throws 
SQLException {
         String insertStmt = "insert into Person (id, firstName, lastName, age) 
values";
@@ -218,6 +275,18 @@ public class ItJdbcBatchSelfTest extends AbstractJdbcSelfTest {
         }
     }
 
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("forbiddenStatements")
+    public void testForbiddenQueryTypes(String sql, String expectedError) 
throws SQLException {
+        stmt.addBatch(sql);
+
+        assertThrowsSqlException(
+                BatchUpdateException.class,
+                expectedError,
+                stmt::executeBatch
+        );
+    }
+
     @Test
     public void testBatchException() throws Exception {
         final int successUpdates = 5;
@@ -850,4 +919,23 @@ public class ItJdbcBatchSelfTest extends AbstractJdbcSelfTest {
             return cnt.getLong(1);
         }
     }
+
+    private static List<Arguments> forbiddenStatements() {
+        return List.of(
+                Arguments.of("SELECT * FROM Person",
+                        "Invalid SQL statement type."),
+
+                Arguments.of("EXPLAIN PLAN FOR DELETE FROM Person",
+                        "Invalid SQL statement type."),
+
+                Arguments.of("START TRANSACTION",
+                        "Transaction control statement can not be executed as 
an independent statement."),
+
+                Arguments.of("COMMIT",
+                        "Transaction control statement can not be executed as 
an independent statement."),
+
+                Arguments.of("START TRANSACTION; COMMIT",
+                        "Multiple statements are not allowed.")
+        );
+    }
 }
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ValidationHelper.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ValidationHelper.java
index 63a85ab8e5..001889255a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ValidationHelper.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/ValidationHelper.java
@@ -41,13 +41,13 @@ public final class ValidationHelper {
         SqlQueryType queryType = parsedResult.queryType();
 
         if (parsedResult.queryType() == SqlQueryType.TX_CONTROL) {
-            String message = "Transaction control statement can not be 
executed as an independent statement";
+            String message = "Transaction control statement can not be 
executed as an independent statement.";
 
             throw new SqlException(STMT_VALIDATION_ERR, message);
         }
 
         if (!allowedTypes.contains(queryType)) {
-            String message = format("Invalid SQL statement type. Expected {} 
but got {}", allowedTypes, queryType);
+            String message = format("Invalid SQL statement type. Expected {} 
but got {}.", allowedTypes, queryType);
 
             throw new SqlException(STMT_VALIDATION_ERR, message);
         }

Reply via email to