This is an automated email from the ASF dual-hosted git repository.

ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9191e2233f IGNITE-21872 Sql. Implement IgniteSql#executeBatchAsync() 
accepting Statement (#3801)
9191e2233f is described below

commit 9191e2233f6b0aad78d966ba7124e3435514a1c1
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Thu May 23 14:28:21 2024 +0300

    IGNITE-21872 Sql. Implement IgniteSql#executeBatchAsync() accepting 
Statement (#3801)
---
 .../ignite/internal/sql/api/ItSqlApiBaseTest.java  | 56 +++++++++++++++++-----
 .../internal/sql/api/ItSqlAsynchronousApiTest.java |  9 +++-
 .../sql/api/ItSqlClientAsynchronousApiTest.java    | 18 +++++++
 .../sql/api/ItSqlClientSynchronousApiTest.java     | 25 ++++++----
 .../internal/sql/api/ItSqlSynchronousApiTest.java  |  8 +++-
 .../sql/threading/ItSqlApiThreadingTest.java       |  5 +-
 .../ignite/internal/sql/api/IgniteSqlImpl.java     | 42 +++++++++-------
 7 files changed, 114 insertions(+), 49 deletions(-)

diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
index 79564f192b..0ba0d825b9 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
@@ -570,34 +570,64 @@ public abstract class ItSqlApiBaseTest extends 
BaseSqlIntegrationTest {
     public void batch() {
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
 
-        IgniteSql sql = CLUSTER.aliveNode().sql();
+        // Execute batch using query.
+        {
+            BatchedArguments args = BatchedArguments.of(0, 0);
 
-        BatchedArguments args = BatchedArguments.of(0, 0);
+            for (int i = 1; i < ROW_COUNT; ++i) {
+                args.add(i, i);
+            }
 
-        for (int i = 1; i < ROW_COUNT; ++i) {
-            args.add(i, i);
+            long[] batchRes = executeBatch("INSERT INTO TEST VALUES (?, ?)", 
args);
+
+            Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
         }
 
-        long[] batchRes = executeBatch(sql, "INSERT INTO TEST VALUES (?, ?)", 
args);
+        // Execute batch using statement.
+        {
+            BatchedArguments args = BatchedArguments.of(ROW_COUNT, ROW_COUNT);
+
+            for (int i = ROW_COUNT + 1; i < ROW_COUNT * 2; ++i) {
+                args.add(i, i);
+            }
+
+            Statement statement = igniteSql().createStatement("INSERT INTO 
TEST VALUES (?, ?)");
 
-        Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
+            long[] batchRes = executeBatch(statement, args);
+
+            Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
+        }
 
         // Check that data are inserted OK
         List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID");
-        IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i, 
res.get(i).get(0)));
+        IntStream.range(0, ROW_COUNT * 2).forEach(i -> assertEquals(i, 
res.get(i).get(0)));
+
+        BatchedArguments args = BatchedArguments.of(-1, -1);
 
         // Check invalid query type
         assertThrowsSqlException(
                 SqlBatchException.class,
                 Sql.STMT_VALIDATION_ERR,
                 "Invalid SQL statement type",
-                () -> executeBatch(sql, "SELECT * FROM TEST", args));
+                () -> executeBatch("SELECT * FROM TEST", args));
 
         assertThrowsSqlException(
                 SqlBatchException.class,
                 Sql.STMT_VALIDATION_ERR,
                 "Invalid SQL statement type",
-                () -> executeBatch(sql, "CREATE TABLE TEST1(ID INT PRIMARY 
KEY, VAL0 INT)", args));
+                () -> executeBatch("CREATE TABLE TEST1(ID INT PRIMARY KEY, 
VAL0 INT)", args));
+
+        // Check that statement parameters taken into account.
+        Statement statement = igniteSql().statementBuilder()
+                .defaultSchema("NON_EXISTING_SCHEMA")
+                .query("INSERT INTO TEST VALUES (?, ?)")
+                .build();
+
+        assertThrowsSqlException(
+                SqlBatchException.class,
+                Sql.SCHEMA_NOT_FOUND_ERR,
+                "Schema not found [schemaName=NON_EXISTING_SCHEMA]",
+                () -> executeBatch(statement, args));
     }
 
     @Test
@@ -606,8 +636,6 @@ public abstract class ItSqlApiBaseTest extends 
BaseSqlIntegrationTest {
 
         sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
 
-        IgniteSql sql = CLUSTER.aliveNode().sql();
-
         BatchedArguments args = BatchedArguments.of(0, 0);
 
         for (int i = 1; i < ROW_COUNT; ++i) {
@@ -622,7 +650,7 @@ public abstract class ItSqlApiBaseTest extends 
BaseSqlIntegrationTest {
                 SqlBatchException.class,
                 Sql.CONSTRAINT_VIOLATION_ERR,
                 "PK unique constraint is violated",
-                () -> executeBatch(sql, "INSERT INTO TEST VALUES (?, ?)", args)
+                () -> executeBatch("INSERT INTO TEST VALUES (?, ?)", args)
         );
 
         assertEquals(err, ex.updateCounters().length);
@@ -890,7 +918,9 @@ public abstract class ItSqlApiBaseTest extends 
BaseSqlIntegrationTest {
         return assertThrowsSqlException(code, msg, () -> execute(sql, query, 
args));
     }
 
-    protected abstract long[] executeBatch(IgniteSql sql, String query, 
BatchedArguments args);
+    protected abstract long[] executeBatch(String query, BatchedArguments 
args);
+
+    protected abstract long[] executeBatch(Statement statement, 
BatchedArguments args);
 
     protected ResultProcessor execute(Integer expectedPages, Transaction tx, 
IgniteSql sql, String query, Object... args) {
         return execute(expectedPages, tx, sql, sql.createStatement(query), 
args);
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 02480a6e4d..9446c7ae8d 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -101,8 +101,13 @@ public class ItSqlAsynchronousApiTest extends 
ItSqlApiBaseTest {
     }
 
     @Override
-    protected long[] executeBatch(IgniteSql sql, String query, 
BatchedArguments args) {
-        return await(sql.executeBatchAsync(null, query, args));
+    protected long[] executeBatch(String query, BatchedArguments args) {
+        return await(igniteSql().executeBatchAsync(null, query, args));
+    }
+
+    @Override
+    protected long[] executeBatch(Statement statement, BatchedArguments args) {
+        return await(igniteSql().executeBatchAsync(null, statement, args));
     }
 
     @Override
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index ad98d06eda..0cfde82266 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -23,6 +23,8 @@ import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 
 /**
  * Tests for asynchronous client SQL API.
@@ -49,4 +51,20 @@ public class ItSqlClientAsynchronousApiTest extends 
ItSqlAsynchronousApiTest {
     protected IgniteTransactions igniteTx() {
         return client.transactions();
     }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059";)
+    @Override
+    public void batch() {
+        // TODO Method should be completely removed from this class after 
IGNITE-17059.
+        super.batch();
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059";)
+    @Override
+    public void batchIncomplete() {
+        // TODO Method should be completely removed from this class after 
IGNITE-17059.
+        super.batchIncomplete();
+    }
 }
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 66ca236798..8b6375d0ce 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -42,24 +43,28 @@ public class ItSqlClientSynchronousApiTest extends 
ItSqlSynchronousApiTest {
     }
 
     @Override
-    @Test
-    public void resultSetCloseShouldFinishImplicitTransaction() {
-        super.resultSetCloseShouldFinishImplicitTransaction();
+    protected IgniteSql igniteSql() {
+        return client.sql();
     }
 
     @Override
-    @Test
-    public void errors() throws InterruptedException {
-        super.errors();
+    protected IgniteTransactions igniteTx() {
+        return client.transactions();
     }
 
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059";)
     @Override
-    protected IgniteSql igniteSql() {
-        return client.sql();
+    public void batch() {
+        // TODO Method should be completely removed from this class after 
IGNITE-17059.
+        super.batch();
     }
 
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059";)
     @Override
-    protected IgniteTransactions igniteTx() {
-        return client.transactions();
+    public void batchIncomplete() {
+        // TODO Method should be completely removed from this class after 
IGNITE-17059.
+        super.batchIncomplete();
     }
 }
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index d1fcc8f2c7..329f82745e 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -39,10 +39,14 @@ public class ItSqlSynchronousApiTest extends 
ItSqlApiBaseTest {
     }
 
     @Override
-    protected long[] executeBatch(IgniteSql sql, String query, 
BatchedArguments args) {
-        return sql.executeBatch(null, query, args);
+    protected long[] executeBatch(String query, BatchedArguments args) {
+        return igniteSql().executeBatch(null, query, args);
     }
 
+    @Override
+    protected long[] executeBatch(Statement statement, BatchedArguments args) {
+        return igniteSql().executeBatch(null, statement, args);
+    }
 
     @Override
     protected ResultProcessor execute(Integer expectedPages, Transaction tx, 
IgniteSql sql, Statement statement, Object... args) {
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
index 4271a70600..f73f942ff3 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
@@ -197,10 +197,7 @@ class ItSqlApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         // EXECUTE_QUERY_WITH_MAPPER_ASYNC(sql -> sql.executeAsync(null, 
(Mapper<?>) null, SELECT_QUERY)),
         // EXECUTE_STATEMENT_WITH_MAPPER_ASYNC(sql -> sql.executeAsync(null, 
(Mapper<?>) null, sql.createStatement(SELECT_QUERY))),
         EXECUTE_BATCH_QUERY_ASYNC(sql -> sql.executeBatchAsync(null, 
UPDATE_QUERY, BatchedArguments.of(10_000))),
-        // TODO: IGNITE-21872 - uncomment the following lines.
-        // EXECUTE_BATCH_STATEMENT_ASYNC(
-        //         sql -> sql.executeBatchAsync(null, 
sql.createStatement(UPDATE_QUERY), BatchedArguments.of(10_000))
-        // ),
+        EXECUTE_BATCH_STATEMENT_ASYNC(sql -> sql.executeBatchAsync(null, 
sql.createStatement(UPDATE_QUERY), BatchedArguments.of(10_000))),
         EXECUTE_SCRIPT_ASYNC(sql -> sql.executeScriptAsync(SELECT_QUERY));
 
         private final Function<IgniteSql, CompletableFuture<?>> action;
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index d660307c2b..88259e2403 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -266,7 +267,11 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
     /** {@inheritDoc} */
     @Override
     public long[] executeBatch(@Nullable Transaction transaction, Statement 
dmlStatement, BatchedArguments batch) {
-        throw new UnsupportedOperationException("Not implemented yet.");
+        try {
+            return executeBatchAsync(transaction, dmlStatement, batch).join();
+        } catch (CompletionException e) {
+            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+        }
     }
 
     /** {@inheritDoc} */
@@ -288,7 +293,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
             String query,
             @Nullable Object... arguments
     ) {
-        return executeAsyncInternal(transaction, new StatementImpl(query), 
arguments);
+        return executeAsyncInternal(transaction, createStatement(query), 
arguments);
     }
 
     /** {@inheritDoc} */
@@ -336,11 +341,7 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
         CompletableFuture<AsyncResultSet<SqlRow>> result;
 
         try {
-            SqlProperties properties = SqlPropertiesHelper.newBuilder()
-                    .set(QueryProperty.ALLOWED_QUERY_TYPES, 
SqlQueryType.SINGLE_STMT_TYPES)
-                    .set(QueryProperty.TIME_ZONE_ID, statement.timeZoneId())
-                    .set(QueryProperty.DEFAULT_SCHEMA, 
statement.defaultSchema())
-                    .build();
+            SqlProperties properties = 
createPropertiesFromStatement(SqlQueryType.SINGLE_STMT_TYPES, statement);
 
             result = queryProcessor.queryAsync(properties, transactions, 
(InternalTransaction) transaction, statement.query(), arguments)
                     .thenCompose(cur -> {
@@ -385,20 +386,24 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, String query, BatchedArguments batch) {
+        return executeBatchAsync(transaction, createStatement(query), batch);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
         if (!busyLock.enterBusy()) {
             return CompletableFuture.failedFuture(nodeIsStoppingException());
         }
 
         try {
-            SqlProperties properties = SqlPropertiesHelper.newBuilder()
-                    .set(QueryProperty.ALLOWED_QUERY_TYPES, 
EnumSet.of(SqlQueryType.DML))
-                    .build();
+            SqlProperties properties = 
createPropertiesFromStatement(EnumSet.of(SqlQueryType.DML), statement);
 
             return executeBatchCore(
                     queryProcessor,
                     transactions,
                     (InternalTransaction) transaction,
-                    query,
+                    statement.query(),
                     batch,
                     properties,
                     busyLock::enterBusy,
@@ -412,13 +417,6 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction 
transaction, Statement statement, BatchedArguments batch) {
-        // TODO: IGNITE-21872 - implement.
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-
     /**
      * Execute batch of DML statements.
      *
@@ -599,6 +597,14 @@ public class IgniteSqlImpl implements IgniteSql, 
IgniteComponent {
         }
     }
 
+    private static SqlProperties 
createPropertiesFromStatement(Set<SqlQueryType> queryType, Statement statement) 
{
+        return SqlPropertiesHelper.newBuilder()
+                .set(QueryProperty.ALLOWED_QUERY_TYPES, queryType)
+                .set(QueryProperty.TIME_ZONE_ID, statement.timeZoneId())
+                .set(QueryProperty.DEFAULT_SCHEMA, statement.defaultSchema())
+                .build();
+    }
+
     private int registerCursor(AsyncSqlCursor<?> cursor) {
         int cursorId = cursorIdGen.incrementAndGet();
 

Reply via email to