This is an automated email from the ASF dual-hosted git repository.
ppa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9191e2233f IGNITE-21872 Sql. Implement IgniteSql#executeBatchAsync() accepting Statement (#3801)
9191e2233f is described below
commit 9191e2233f6b0aad78d966ba7124e3435514a1c1
Author: Pavel Pereslegin <[email protected]>
AuthorDate: Thu May 23 14:28:21 2024 +0300
IGNITE-21872 Sql. Implement IgniteSql#executeBatchAsync() accepting Statement (#3801)
---
.../ignite/internal/sql/api/ItSqlApiBaseTest.java | 56 +++++++++++++++++-----
.../internal/sql/api/ItSqlAsynchronousApiTest.java | 9 +++-
.../sql/api/ItSqlClientAsynchronousApiTest.java | 18 +++++++
.../sql/api/ItSqlClientSynchronousApiTest.java | 25 ++++++----
.../internal/sql/api/ItSqlSynchronousApiTest.java | 8 +++-
.../sql/threading/ItSqlApiThreadingTest.java | 5 +-
.../ignite/internal/sql/api/IgniteSqlImpl.java | 42 +++++++++-------
7 files changed, 114 insertions(+), 49 deletions(-)
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
index 79564f192b..0ba0d825b9 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlApiBaseTest.java
@@ -570,34 +570,64 @@ public abstract class ItSqlApiBaseTest extends
BaseSqlIntegrationTest {
public void batch() {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- IgniteSql sql = CLUSTER.aliveNode().sql();
+ // Execute batch using query.
+ {
+ BatchedArguments args = BatchedArguments.of(0, 0);
- BatchedArguments args = BatchedArguments.of(0, 0);
+ for (int i = 1; i < ROW_COUNT; ++i) {
+ args.add(i, i);
+ }
- for (int i = 1; i < ROW_COUNT; ++i) {
- args.add(i, i);
+ long[] batchRes = executeBatch("INSERT INTO TEST VALUES (?, ?)",
args);
+
+ Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
}
- long[] batchRes = executeBatch(sql, "INSERT INTO TEST VALUES (?, ?)",
args);
+ // Execute batch using statement.
+ {
+ BatchedArguments args = BatchedArguments.of(ROW_COUNT, ROW_COUNT);
+
+ for (int i = ROW_COUNT + 1; i < ROW_COUNT * 2; ++i) {
+ args.add(i, i);
+ }
+
+ Statement statement = igniteSql().createStatement("INSERT INTO
TEST VALUES (?, ?)");
- Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
+ long[] batchRes = executeBatch(statement, args);
+
+ Arrays.stream(batchRes).forEach(r -> assertEquals(1L, r));
+ }
// Check that data are inserted OK
List<List<Object>> res = sql("SELECT ID FROM TEST ORDER BY ID");
- IntStream.range(0, ROW_COUNT).forEach(i -> assertEquals(i,
res.get(i).get(0)));
+ IntStream.range(0, ROW_COUNT * 2).forEach(i -> assertEquals(i,
res.get(i).get(0)));
+
+ BatchedArguments args = BatchedArguments.of(-1, -1);
// Check invalid query type
assertThrowsSqlException(
SqlBatchException.class,
Sql.STMT_VALIDATION_ERR,
"Invalid SQL statement type",
- () -> executeBatch(sql, "SELECT * FROM TEST", args));
+ () -> executeBatch("SELECT * FROM TEST", args));
assertThrowsSqlException(
SqlBatchException.class,
Sql.STMT_VALIDATION_ERR,
"Invalid SQL statement type",
- () -> executeBatch(sql, "CREATE TABLE TEST1(ID INT PRIMARY
KEY, VAL0 INT)", args));
+ () -> executeBatch("CREATE TABLE TEST1(ID INT PRIMARY KEY,
VAL0 INT)", args));
+
+ // Check that statement parameters taken into account.
+ Statement statement = igniteSql().statementBuilder()
+ .defaultSchema("NON_EXISTING_SCHEMA")
+ .query("INSERT INTO TEST VALUES (?, ?)")
+ .build();
+
+ assertThrowsSqlException(
+ SqlBatchException.class,
+ Sql.SCHEMA_NOT_FOUND_ERR,
+ "Schema not found [schemaName=NON_EXISTING_SCHEMA]",
+ () -> executeBatch(statement, args));
}
@Test
@@ -606,8 +636,6 @@ public abstract class ItSqlApiBaseTest extends
BaseSqlIntegrationTest {
sql("CREATE TABLE TEST(ID INT PRIMARY KEY, VAL0 INT)");
- IgniteSql sql = CLUSTER.aliveNode().sql();
-
BatchedArguments args = BatchedArguments.of(0, 0);
for (int i = 1; i < ROW_COUNT; ++i) {
@@ -622,7 +650,7 @@ public abstract class ItSqlApiBaseTest extends
BaseSqlIntegrationTest {
SqlBatchException.class,
Sql.CONSTRAINT_VIOLATION_ERR,
"PK unique constraint is violated",
- () -> executeBatch(sql, "INSERT INTO TEST VALUES (?, ?)", args)
+ () -> executeBatch("INSERT INTO TEST VALUES (?, ?)", args)
);
assertEquals(err, ex.updateCounters().length);
@@ -890,7 +918,9 @@ public abstract class ItSqlApiBaseTest extends
BaseSqlIntegrationTest {
return assertThrowsSqlException(code, msg, () -> execute(sql, query,
args));
}
- protected abstract long[] executeBatch(IgniteSql sql, String query,
BatchedArguments args);
+ protected abstract long[] executeBatch(String query, BatchedArguments
args);
+
+ protected abstract long[] executeBatch(Statement statement,
BatchedArguments args);
protected ResultProcessor execute(Integer expectedPages, Transaction tx,
IgniteSql sql, String query, Object... args) {
return execute(expectedPages, tx, sql, sql.createStatement(query),
args);
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
index 02480a6e4d..9446c7ae8d 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlAsynchronousApiTest.java
@@ -101,8 +101,13 @@ public class ItSqlAsynchronousApiTest extends
ItSqlApiBaseTest {
}
@Override
- protected long[] executeBatch(IgniteSql sql, String query,
BatchedArguments args) {
- return await(sql.executeBatchAsync(null, query, args));
+ protected long[] executeBatch(String query, BatchedArguments args) {
+ return await(igniteSql().executeBatchAsync(null, query, args));
+ }
+
+ @Override
+ protected long[] executeBatch(Statement statement, BatchedArguments args) {
+ return await(igniteSql().executeBatchAsync(null, statement, args));
}
@Override
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
index ad98d06eda..0cfde82266 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientAsynchronousApiTest.java
@@ -23,6 +23,8 @@ import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
/**
* Tests for asynchronous client SQL API.
@@ -49,4 +51,20 @@ public class ItSqlClientAsynchronousApiTest extends
ItSqlAsynchronousApiTest {
protected IgniteTransactions igniteTx() {
return client.transactions();
}
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059")
+ @Override
+ public void batch() {
+ // TODO Method should be completely removed from this class after
IGNITE-17059.
+ super.batch();
+ }
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059")
+ @Override
+ public void batchIncomplete() {
+ // TODO Method should be completely removed from this class after
IGNITE-17059.
+ super.batchIncomplete();
+ }
}
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
index 66ca236798..8b6375d0ce 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientSynchronousApiTest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -42,24 +43,28 @@ public class ItSqlClientSynchronousApiTest extends
ItSqlSynchronousApiTest {
}
@Override
- @Test
- public void resultSetCloseShouldFinishImplicitTransaction() {
- super.resultSetCloseShouldFinishImplicitTransaction();
+ protected IgniteSql igniteSql() {
+ return client.sql();
}
@Override
- @Test
- public void errors() throws InterruptedException {
- super.errors();
+ protected IgniteTransactions igniteTx() {
+ return client.transactions();
}
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059")
@Override
- protected IgniteSql igniteSql() {
- return client.sql();
+ public void batch() {
+ // TODO Method should be completely removed from this class after
IGNITE-17059.
+ super.batch();
}
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17059")
@Override
- protected IgniteTransactions igniteTx() {
- return client.transactions();
+ public void batchIncomplete() {
+ // TODO Method should be completely removed from this class after
IGNITE-17059.
+ super.batchIncomplete();
}
}
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
index d1fcc8f2c7..329f82745e 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlSynchronousApiTest.java
@@ -39,10 +39,14 @@ public class ItSqlSynchronousApiTest extends
ItSqlApiBaseTest {
}
@Override
- protected long[] executeBatch(IgniteSql sql, String query,
BatchedArguments args) {
- return sql.executeBatch(null, query, args);
+ protected long[] executeBatch(String query, BatchedArguments args) {
+ return igniteSql().executeBatch(null, query, args);
}
+ @Override
+ protected long[] executeBatch(Statement statement, BatchedArguments args) {
+ return igniteSql().executeBatch(null, statement, args);
+ }
@Override
protected ResultProcessor execute(Integer expectedPages, Transaction tx,
IgniteSql sql, Statement statement, Object... args) {
diff --git
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
index 4271a70600..f73f942ff3 100644
---
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
+++
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/threading/ItSqlApiThreadingTest.java
@@ -197,10 +197,7 @@ class ItSqlApiThreadingTest extends
ClusterPerClassIntegrationTest {
// EXECUTE_QUERY_WITH_MAPPER_ASYNC(sql -> sql.executeAsync(null,
(Mapper<?>) null, SELECT_QUERY)),
// EXECUTE_STATEMENT_WITH_MAPPER_ASYNC(sql -> sql.executeAsync(null,
(Mapper<?>) null, sql.createStatement(SELECT_QUERY))),
EXECUTE_BATCH_QUERY_ASYNC(sql -> sql.executeBatchAsync(null,
UPDATE_QUERY, BatchedArguments.of(10_000))),
- // TODO: IGNITE-21872 - uncomment the following lines.
- // EXECUTE_BATCH_STATEMENT_ASYNC(
- // sql -> sql.executeBatchAsync(null,
sql.createStatement(UPDATE_QUERY), BatchedArguments.of(10_000))
- // ),
+ EXECUTE_BATCH_STATEMENT_ASYNC(sql -> sql.executeBatchAsync(null,
sql.createStatement(UPDATE_QUERY), BatchedArguments.of(10_000))),
EXECUTE_SCRIPT_ASYNC(sql -> sql.executeScriptAsync(SELECT_QUERY));
private final Function<IgniteSql, CompletableFuture<?>> action;
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index d660307c2b..88259e2403 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -266,7 +267,11 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
/** {@inheritDoc} */
@Override
public long[] executeBatch(@Nullable Transaction transaction, Statement
dmlStatement, BatchedArguments batch) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ try {
+ return executeBatchAsync(transaction, dmlStatement, batch).join();
+ } catch (CompletionException e) {
+ throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+ }
}
/** {@inheritDoc} */
@@ -288,7 +293,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
String query,
@Nullable Object... arguments
) {
- return executeAsyncInternal(transaction, new StatementImpl(query),
arguments);
+ return executeAsyncInternal(transaction, createStatement(query),
arguments);
}
/** {@inheritDoc} */
@@ -336,11 +341,7 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
CompletableFuture<AsyncResultSet<SqlRow>> result;
try {
- SqlProperties properties = SqlPropertiesHelper.newBuilder()
- .set(QueryProperty.ALLOWED_QUERY_TYPES,
SqlQueryType.SINGLE_STMT_TYPES)
- .set(QueryProperty.TIME_ZONE_ID, statement.timeZoneId())
- .set(QueryProperty.DEFAULT_SCHEMA,
statement.defaultSchema())
- .build();
+ SqlProperties properties =
createPropertiesFromStatement(SqlQueryType.SINGLE_STMT_TYPES, statement);
result = queryProcessor.queryAsync(properties, transactions,
(InternalTransaction) transaction, statement.query(), arguments)
.thenCompose(cur -> {
@@ -385,20 +386,24 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
/** {@inheritDoc} */
@Override
public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, String query, BatchedArguments batch) {
+ return executeBatchAsync(transaction, createStatement(query), batch);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(nodeIsStoppingException());
}
try {
- SqlProperties properties = SqlPropertiesHelper.newBuilder()
- .set(QueryProperty.ALLOWED_QUERY_TYPES,
EnumSet.of(SqlQueryType.DML))
- .build();
+ SqlProperties properties =
createPropertiesFromStatement(EnumSet.of(SqlQueryType.DML), statement);
return executeBatchCore(
queryProcessor,
transactions,
(InternalTransaction) transaction,
- query,
+ statement.query(),
batch,
properties,
busyLock::enterBusy,
@@ -412,13 +417,6 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
}
}
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, Statement statement, BatchedArguments batch) {
- // TODO: IGNITE-21872 - implement.
- throw new UnsupportedOperationException("Not implemented yet.");
- }
-
/**
* Execute batch of DML statements.
*
@@ -599,6 +597,14 @@ public class IgniteSqlImpl implements IgniteSql,
IgniteComponent {
}
}
+ private static SqlProperties
createPropertiesFromStatement(Set<SqlQueryType> queryType, Statement statement)
{
+ return SqlPropertiesHelper.newBuilder()
+ .set(QueryProperty.ALLOWED_QUERY_TYPES, queryType)
+ .set(QueryProperty.TIME_ZONE_ID, statement.timeZoneId())
+ .set(QueryProperty.DEFAULT_SCHEMA, statement.defaultSchema())
+ .build();
+ }
+
private int registerCursor(AsyncSqlCursor<?> cursor) {
int cursorId = cursorIdGen.incrementAndGet();