This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ca95035383 IGNITE-20887 Sql. Avoid using blocking api in sql threads
(#2854)
ca95035383 is described below
commit ca95035383771e2de85310be3dff2efc4e80c7e7
Author: korlov42 <[email protected]>
AuthorDate: Thu Nov 23 09:56:39 2023 +0200
IGNITE-20887 Sql. Avoid using blocking api in sql threads (#2854)
---
.../requests/jdbc/JdbcQueryCursorSelfTest.java | 13 +-
.../benchmark/AbstractMultiNodeBenchmark.java | 67 ++++--
.../ignite/internal/benchmark/InsertBenchmark.java | 18 +-
.../internal/sql/engine/AsyncSqlCursorImpl.java | 62 +++--
.../sql/engine/QueryTransactionWrapper.java | 12 +-
.../internal/sql/engine/SqlQueryProcessor.java | 21 +-
.../internal/sql/engine/exec/rel/ModifyNode.java | 104 ++++----
.../internal/sql/engine/exec/TestDownstream.java | 54 +++++
.../exec/rel/DataSourceScanNodeSelfTest.java | 27 +--
.../engine/exec/rel/ModifyNodeExecutionTest.java | 266 +++++++++++++++++++++
10 files changed, 507 insertions(+), 137 deletions(-)
diff --git
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
index 3be4999c98..62a748b07d 100644
---
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
+++
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.client.handler.requests.jdbc;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
import java.util.ArrayList;
import java.util.List;
@@ -26,28 +27,30 @@ import
org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
import org.apache.ignite.internal.sql.engine.QueryTransactionWrapper;
import org.apache.ignite.internal.sql.engine.SqlQueryType;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
import org.apache.ignite.internal.util.AsyncWrapper;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
/**
* Test class for {@link JdbcQueryCursor}.
*/
-@ExtendWith(MockitoExtension.class)
public class JdbcQueryCursorSelfTest extends BaseIgniteAbstractTest {
- @Mock
private QueryTransactionWrapper txWrapper;
private static final List<Integer> ROWS = List.of(1, 2, 3);
private static final int TOTAL_ROWS_COUNT = ROWS.size();
+ @BeforeEach
+ void initTxMock() {
+ txWrapper = new
QueryTransactionWrapper(mock(InternalTransaction.class), false);
+ }
+
/** Tests corner cases of setting the {@code maxRows} parameter. */
@ParameterizedTest(name = "maxRows={0}, fetchSize={1}")
@MethodSource("maxRowsTestParameters")
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index dd3560f77a..f13101bfdc 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -34,6 +34,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.InitParameters;
import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -59,39 +60,57 @@ public class AbstractMultiNodeBenchmark {
protected static final String FIELD_VAL = "a".repeat(100);
- protected static final String TABLE_NAME = "usertable";
+ protected static final String TABLE_NAME = "USERTABLE";
+
+ protected static final String ZONE_NAME = TABLE_NAME + "_ZONE";
protected static IgniteImpl clusterNode;
- @Param({"true", "false"})
+ @Param({"false", "true"})
private boolean fsync;
/**
* Starts ignite node and creates table {@link #TABLE_NAME}.
*/
@Setup
- public final void nodeSetUp() throws IOException {
+ public final void nodeSetUp() throws Exception {
startCluster();
- var queryEngine = clusterNode.queryEngine();
-
- var sql = "CREATE TABLE " + TABLE_NAME + "(\n"
- + " ycsb_key int PRIMARY KEY,\n"
- + " field1 varchar(100),\n"
- + " field2 varchar(100),\n"
- + " field3 varchar(100),\n"
- + " field4 varchar(100),\n"
- + " field5 varchar(100),\n"
- + " field6 varchar(100),\n"
- + " field7 varchar(100),\n"
- + " field8 varchar(100),\n"
- + " field9 varchar(100),\n"
- + " field10 varchar(100)\n"
- + ");";
-
- getAllFromCursor(
-
await(queryEngine.querySingleAsync(SqlPropertiesHelper.emptyProperties(),
clusterNode.transactions(), null, sql))
- );
+ try {
+ var queryEngine = clusterNode.queryEngine();
+
+ var createZoneStatement = "CREATE ZONE " + ZONE_NAME + " WITH
partitions=" + partitionCount();
+
+ getAllFromCursor(
+ await(queryEngine.querySingleAsync(
+ SqlPropertiesHelper.emptyProperties(),
clusterNode.transactions(), null, createZoneStatement
+ ))
+ );
+
+ var createTableStatement = "CREATE TABLE " + TABLE_NAME + "(\n"
+ + " ycsb_key int PRIMARY KEY,\n"
+ + " field1 varchar(100),\n"
+ + " field2 varchar(100),\n"
+ + " field3 varchar(100),\n"
+ + " field4 varchar(100),\n"
+ + " field5 varchar(100),\n"
+ + " field6 varchar(100),\n"
+ + " field7 varchar(100),\n"
+ + " field8 varchar(100),\n"
+ + " field9 varchar(100),\n"
+ + " field10 varchar(100)\n"
+ + ") WITH primary_zone='" + ZONE_NAME + "'";
+
+ getAllFromCursor(
+ await(queryEngine.querySingleAsync(
+ SqlPropertiesHelper.emptyProperties(),
clusterNode.transactions(), null, createTableStatement
+ ))
+ );
+ } catch (Throwable th) {
+ nodeTearDown();
+
+ throw th;
+ }
}
/**
@@ -167,4 +186,8 @@ public class AbstractMultiNodeBenchmark {
protected int nodes() {
return 3;
}
+
+ protected int partitionCount() {
+ return CatalogUtils.DEFAULT_PARTITION_COUNT;
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
index d47d137948..b77f4931cf 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
@@ -29,6 +29,7 @@ import java.util.stream.IntStream;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.table.KeyValueView;
@@ -55,7 +56,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
* Benchmark for insertion operation, comparing KV, JDBC and SQL APIs.
*/
@State(Scope.Benchmark)
-@Fork(1)
+@Fork(3)
@Threads(1)
@Warmup(iterations = 10, time = 2)
@Measurement(iterations = 20, time = 2)
@@ -65,6 +66,9 @@ public class InsertBenchmark extends
AbstractMultiNodeBenchmark {
@Param({"1", "2", "3"})
private int clusterSize;
+ @Param({"1", "2", "4", "8", "16", "32"})
+ private int partitionCount;
+
/**
* Benchmark for SQL insert via embedded client.
*/
@@ -111,9 +115,6 @@ public class InsertBenchmark extends
AbstractMultiNodeBenchmark {
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + InsertBenchmark.class.getSimpleName() + ".*")
- .forks(1)
- .threads(1)
- .mode(Mode.AverageTime)
.build();
new Runner(opt).run();
@@ -154,7 +155,9 @@ public class InsertBenchmark extends
AbstractMultiNodeBenchmark {
private int id = 0;
void executeQuery() {
- session.execute(null, statement, id++);
+ try (ResultSet<?> rs = session.execute(null, statement, id++)) {
+ // NO-OP
+ }
}
}
@@ -318,4 +321,9 @@ public class InsertBenchmark extends
AbstractMultiNodeBenchmark {
protected int nodes() {
return clusterSize;
}
+
+ @Override
+ protected int partitionCount() {
+ return partitionCount;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 421f780cc0..689ecbbd31 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.sql.engine;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
+import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.sql.ResultSetMetadata;
@@ -32,6 +35,9 @@ import org.jetbrains.annotations.Nullable;
* @param <T> Type of elements.
*/
public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final CompletableFuture<Void> closeResult = new
CompletableFuture<>();
+
private final SqlQueryType queryType;
private final ResultSetMetadata meta;
private final QueryTransactionWrapper txWrapper;
@@ -100,20 +106,28 @@ public class AsyncSqlCursorImpl<T> implements
AsyncSqlCursor<T> {
/** {@inheritDoc} */
@Override
public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) {
- return dataCursor.requestNextAsync(rows).handle((batch, t) -> {
- if (t != null) {
- // Always rollback a transaction in case of an error.
- txWrapper.rollback();
-
- throw new CompletionException(wrapIfNecessary(t));
- }
-
- if (!batch.hasMore()) {
- closeAsync();
- }
-
- return batch;
- });
+ return dataCursor.requestNextAsync(rows)
+ .thenApply(batch -> {
+ CompletableFuture<Void> fut = batch.hasMore()
+ ? Commons.completedFuture()
+ : closeAsync();
+
+ return fut.thenApply(none -> batch);
+ })
+ .exceptionally(rootEx -> {
+ // Always rollback a transaction in case of an error.
+ return txWrapper.rollback()
+ .handle((none, rollbackEx) -> {
+ Throwable wrapped = wrapIfNecessary(rootEx);
+
+ if (rollbackEx != null) {
+ wrapped.addSuppressed(rollbackEx);
+ }
+
+ throw new CompletionException(wrapped);
+ });
+ })
+ .thenCompose(Function.identity());
}
/** {@inheritDoc} */
@@ -135,12 +149,22 @@ public class AsyncSqlCursorImpl<T> implements
AsyncSqlCursor<T> {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> closeAsync() {
- // Commit implicit transaction, if any.
- txWrapper.commitImplicit();
-
- onClose.run();
+ if (!closed.compareAndSet(false, true)) {
+ return closeResult;
+ }
- return dataCursor.closeAsync();
+ dataCursor.closeAsync()
+ .thenCompose(ignored -> txWrapper.commitImplicit())
+ .thenRun(onClose)
+ .whenComplete((r, e) -> {
+ if (e != null) {
+ closeResult.completeExceptionally(e);
+ } else {
+ closeResult.complete(null);
+ }
+ });
+
+ return closeResult;
}
private static Throwable wrapIfNecessary(Throwable t) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
index 2729f4efa5..4b8a5097e8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.sql.engine;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.tx.InternalTransaction;
/**
@@ -42,16 +44,18 @@ public class QueryTransactionWrapper {
/**
* Commits an implicit transaction, if one has been started.
*/
- void commitImplicit() {
+ CompletableFuture<Void> commitImplicit() {
if (implicit) {
- transaction.commit();
+ return transaction.commitAsync();
}
+
+ return Commons.completedFuture();
}
/**
* Rolls back a transaction.
*/
- void rollback() {
- transaction.rollback();
+ CompletableFuture<Void> rollback() {
+ return transaction.rollbackAsync();
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 7830570d2c..1f5cd31304 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -661,18 +661,17 @@ public class SqlQueryProcessor implements QueryProcessor {
if (ex != null) {
cursorFuture.completeExceptionally(ex);
cancelAll(ex);
-
- return;
- }
-
- txWrapper.commitImplicit();
-
- if (nextCursorFuture != null) {
- taskExecutor.execute(this::processNext);
}
-
- cursorFuture.complete(res);
- });
+ })
+ .thenCompose(res -> txWrapper.commitImplicit()
+ .thenRun(() -> {
+ if (nextCursorFuture != null) {
+
taskExecutor.execute(this::processNext);
+ }
+
+ cursorFuture.complete(res);
+ })
+ );
} catch (Exception e) {
cursorFuture.completeExceptionally(e);
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
index 76e6fa4f9d..8f563b8f6c 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
@@ -87,9 +87,7 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
private int requested;
- private boolean inLoop;
-
- private State state = State.UPDATING;
+ private boolean inFlightUpdate;
/**
* Constructor.
@@ -124,9 +122,7 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
requested = rowsCnt;
- if (!inLoop) {
- tryEnd();
- }
+ requestNextBatchIfNeeded();
}
/** {@inheritDoc} */
@@ -134,7 +130,6 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
public void push(RowT row) throws Exception {
assert downstream() != null;
assert waiting > 0;
- assert state == State.UPDATING;
checkState();
@@ -142,11 +137,13 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
rows.add(row);
- flushTuples(false);
+ assert rows.size() <= MODIFY_BATCH_SIZE;
- if (waiting == 0) {
- source().request(waiting = MODIFY_BATCH_SIZE);
+ if (needToFlush()) {
+ flushTuples();
}
+
+ requestNextBatchIfNeeded();
}
/** {@inheritDoc} */
@@ -158,9 +155,15 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
checkState();
waiting = -1;
- state = State.UPDATED;
- tryEnd();
+ if (needToFlush()) {
+ flushTuples();
+ } else {
+ // special case: if there is nothing to flush, and no in-flight batch,
+ // then the source most probably was empty, and we just need to pass
+ // this signal through to downstream
+ tryEnd();
+ }
}
/** {@inheritDoc} */
@@ -179,50 +182,42 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
return this;
}
- private void tryEnd() throws Exception {
- assert downstream() != null;
-
- if (state == State.UPDATING && waiting == 0) {
+ private void requestNextBatchIfNeeded() throws Exception {
+ if (waiting == 0 && rows.isEmpty()) {
source().request(waiting = MODIFY_BATCH_SIZE);
}
+ }
- if (state == State.UPDATED && requested > 0) {
- flushTuples(true);
-
- state = State.END;
+ private void tryEnd() throws Exception {
+ assert downstream() != null;
- inLoop = true;
- try {
- requested--;
-
downstream().push(context().rowHandler().factory(MODIFY_RESULT).create(updatedRows));
- } finally {
- inLoop = false;
- }
- }
+ if (waiting == -1 && requested > 0 && !inFlightUpdate &&
rows.isEmpty()) {
+
downstream().push(context().rowHandler().factory(MODIFY_RESULT).create(updatedRows));
- if (state == State.END && requested > 0) {
requested = 0;
downstream().end();
}
}
- private void flushTuples(boolean force) {
- if (nullOrEmpty(rows) || (!force && rows.size() < MODIFY_BATCH_SIZE)) {
- return;
- }
+ private void flushTuples() {
+ assert !nullOrEmpty(rows);
+
+ inFlightUpdate = true;
List<RowT> rows = this.rows;
this.rows = new ArrayList<>(MODIFY_BATCH_SIZE);
+ CompletableFuture<?> modifyResult;
+
switch (modifyOp) {
case INSERT:
- table.insertAll(context(), rows).join();
+ modifyResult = table.insertAll(context(), rows);
break;
case UPDATE:
inlineUpdates(0, rows);
- table.upsertAll(context(), rows).join();
+ modifyResult = table.upsertAll(context(), rows);
break;
case MERGE:
@@ -241,20 +236,43 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
mergeParts.add(table.upsertAll(context(),
split.getSecond()));
}
- CompletableFuture.allOf(
+ modifyResult = CompletableFuture.allOf(
mergeParts.toArray(CompletableFuture[]::new)
- ).join();
+ );
break;
case DELETE:
- table.deleteAll(context(), rows).join();
+ modifyResult = table.deleteAll(context(), rows);
break;
default:
throw new UnsupportedOperationException(modifyOp.name());
}
- updatedRows += rows.size();
+ modifyResult.whenComplete((r, e) -> context().execute(() -> {
+ if (e != null) {
+ onError(e);
+
+ return;
+ }
+
+ inFlightUpdate = false;
+
+ updatedRows += rows.size();
+
+ if (needToFlush()) {
+ flushTuples();
+ }
+
+ requestNextBatchIfNeeded();
+
+ tryEnd();
+ }, this::onError));
+ }
+
+ private boolean needToFlush() {
+ return !inFlightUpdate
+ && (rows.size() >= MODIFY_BATCH_SIZE || (!rows.isEmpty() &&
waiting == -1));
}
/** See {@link #mapping(TableDescriptor, List)}. */
@@ -414,12 +432,4 @@ public class ModifyNode<RowT> extends AbstractNode<RowT>
implements SingleNode<R
return mapping;
}
-
- private enum State {
- UPDATING,
-
- UPDATED,
-
- END
- }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TestDownstream.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TestDownstream.java
new file mode 100644
index 0000000000..a0eaf8433f
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TestDownstream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.sql.engine.exec.rel.Downstream;
+
+/**
+ * Test implementation of {@link Downstream} that collects all rows in a list
+ * and then completes a future with that list or with an exception in case {@link #onError(Throwable)}
+ * was invoked on this downstream.
+ *
+ * @param <T> A type of the row.
+ */
+public class TestDownstream<T> implements Downstream<T> {
+ private final List<T> rows = new ArrayList<>();
+ private final CompletableFuture<List<T>> completion = new
CompletableFuture<>();
+
+ @Override
+ public void push(T row) {
+ rows.add(row);
+ }
+
+ @Override
+ public void end() throws Exception {
+ completion.complete(rows);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ completion.completeExceptionally(e);
+ }
+
+ public CompletableFuture<List<T>> result() {
+ return completion;
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
index 1adb68c924..037742f1d9 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
@@ -22,12 +22,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
-import java.util.ArrayList;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
@@ -45,6 +43,7 @@ import
org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ScannableDataSource;
import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.TestDownstream;
import org.apache.ignite.internal.sql.engine.exec.row.BaseTypeSpec;
import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
import org.apache.ignite.internal.sql.engine.exec.row.TypeSpec;
@@ -241,13 +240,13 @@ public class DataSourceScanNodeSelfTest extends
AbstractExecutionTest<RowWrapper
context, factory, fromRowSchema(ROW_SCHEMA), dataSource,
predicate, projection, requiredFields
);
- DrainAllDownstream<RowWrapper> downstream = new DrainAllDownstream<>();
+ TestDownstream<RowWrapper> downstream = new TestDownstream<>();
node.onRegister(downstream);
context.execute(() -> node.request(Integer.MAX_VALUE), node::onError);
- return await(downstream.completion);
+ return await(downstream.result());
}
static class IterableDataSource implements ScannableDataSource {
@@ -296,26 +295,6 @@ public class DataSourceScanNodeSelfTest extends
AbstractExecutionTest<RowWrapper
}
}
- static class DrainAllDownstream<T> implements Downstream<T> {
- private final List<T> rows = new ArrayList<>();
- private final CompletableFuture<List<T>> completion = new
CompletableFuture<>();
-
- @Override
- public void push(T row) {
- rows.add(row);
- }
-
- @Override
- public void end() throws Exception {
- completion.complete(rows);
- }
-
- @Override
- public void onError(Throwable e) {
- completion.completeExceptionally(e);
- }
- }
-
@Override
protected RowHandler<RowWrapper> rowHandler() {
return SqlRowHandler.INSTANCE;
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
new file mode 100644
index 0000000000..14fc811758
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static
org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode.MODIFY_BATCH_SIZE;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.calcite.rel.core.TableModify.Operation;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.TestDownstream;
+import org.apache.ignite.internal.sql.engine.exec.UpdatableTable;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.framework.DataProvider;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Test to verify {@link ModifyNode}.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(MockitoExtension.class)
+public class ModifyNodeExecutionTest extends AbstractExecutionTest<RowWrapper>
{
+ private static final RowSchema INT_LONG_SCHEMA = RowSchema.builder()
+ .addField(NativeTypes.INT32)
+ .addField(NativeTypes.INT64)
+ .build();
+
+ @SuppressWarnings("OverridableMethodCallDuringObjectConstruction")
+ private final RowHandler<RowWrapper> handler = rowHandler();
+ private final RowHandler.RowFactory<RowWrapper> rowFactory =
handler.factory(INT_LONG_SCHEMA);
+
+ @Mock
+ private UpdatableTable updatableTable;
+
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE,
MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+ void nodeReportsExpectedNumberOfUpdatedRowsOnInsert(int sourceSize) {
+ ExecutionContext<RowWrapper> context = executionContext();
+
+ Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+ ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+ context, updatableTable, Operation.INSERT, null
+ );
+
+ TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+ modifyNode.register(List.of(sourceNode));
+ modifyNode.onRegister(downstream);
+
+ if (sourceSize > 0) {
+ when(updatableTable.insertAll(any(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ }
+
+ context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+ List<RowWrapper> result = await(downstream.result());
+
+ assertThat(result, notNullValue());
+ assertThat(result.get(0), notNullValue());
+ assertThat(handler.get(0, result.get(0)), is((long) sourceSize));
+ verify(updatableTable,
times(numberOfBatches(sourceSize))).insertAll(any(), any());
+ verify(updatableTable).descriptor();
+ verifyNoMoreInteractions(updatableTable);
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE,
MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+ void nodeReportsExpectedNumberOfUpdatedRowsOnUpdate(int sourceSize) {
+ ExecutionContext<RowWrapper> context = executionContext();
+
+ Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+ ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+ context, updatableTable, Operation.UPDATE, null
+ );
+
+ TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+ modifyNode.register(List.of(sourceNode));
+ modifyNode.onRegister(downstream);
+
+ if (sourceSize > 0) {
+ when(updatableTable.upsertAll(any(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ }
+
+ context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+ List<RowWrapper> result = await(downstream.result());
+
+ assertThat(result, notNullValue());
+ assertThat(result.get(0), notNullValue());
+ assertThat(handler.get(0, result.get(0)), is((long) sourceSize));
+ verify(updatableTable,
times(numberOfBatches(sourceSize))).upsertAll(any(), any());
+ verify(updatableTable).descriptor();
+ verifyNoMoreInteractions(updatableTable);
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE,
MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+ void nodeReportsExpectedNumberOfUpdatedRowsOnDelete(int sourceSize) {
+ ExecutionContext<RowWrapper> context = executionContext();
+
+ Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+ ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+ context, updatableTable, Operation.DELETE, null
+ );
+
+ TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+ modifyNode.register(List.of(sourceNode));
+ modifyNode.onRegister(downstream);
+
+ if (sourceSize > 0) {
+ when(updatableTable.deleteAll(any(), any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ }
+
+ context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+ List<RowWrapper> result = await(downstream.result());
+
+ assertThat(result, notNullValue());
+ assertThat(result.get(0), notNullValue());
+ assertThat(handler.get(0, result.get(0)), is((long) sourceSize));
+ verify(updatableTable,
times(numberOfBatches(sourceSize))).deleteAll(any(), any());
+ verify(updatableTable).descriptor();
+ verifyNoMoreInteractions(updatableTable);
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE,
MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+ void exceptionIsPassedThroughToErrorHandlerOnInsert(int sourceSize) {
+ ExecutionContext<RowWrapper> context = executionContext();
+
+ Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+ ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+ context, updatableTable, Operation.INSERT, null
+ );
+
+ TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+ modifyNode.register(List.of(sourceNode));
+ modifyNode.onRegister(downstream);
+
+ RuntimeException expected = new RuntimeException("this is expected");
+ when(updatableTable.insertAll(any(), any()))
+ .thenReturn(CompletableFuture.failedFuture(expected));
+
+ context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+ assertThat(downstream.result(), willThrow(is(expected)));
+ verify(updatableTable).insertAll(any(), any());
+ verify(updatableTable).descriptor();
+ verifyNoMoreInteractions(updatableTable);
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE,
MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+ void exceptionIsPassedThroughToErrorHandlerOnUpdate(int sourceSize) {
+ ExecutionContext<RowWrapper> context = executionContext();
+
+ Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+ ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+ context, updatableTable, Operation.UPDATE, null
+ );
+
+ TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+ modifyNode.register(List.of(sourceNode));
+ modifyNode.onRegister(downstream);
+
+ RuntimeException expected = new RuntimeException("this is expected");
+ when(updatableTable.upsertAll(any(), any()))
+ .thenReturn(CompletableFuture.failedFuture(expected));
+
+ context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+ assertThat(downstream.result(), willThrow(is(expected)));
+ verify(updatableTable).upsertAll(any(), any());
+ verify(updatableTable).descriptor();
+ verifyNoMoreInteractions(updatableTable);
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE,
MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+ void exceptionIsPassedThroughToErrorHandlerOnDelete(int sourceSize) {
+ ExecutionContext<RowWrapper> context = executionContext();
+
+ Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+ ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+ context, updatableTable, Operation.DELETE, null
+ );
+
+ TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+ modifyNode.register(List.of(sourceNode));
+ modifyNode.onRegister(downstream);
+
+ RuntimeException expected = new RuntimeException("this is expected");
+ when(updatableTable.deleteAll(any(), any()))
+ .thenReturn(CompletableFuture.failedFuture(expected));
+
+ context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+ assertThat(downstream.result(), willThrow(is(expected)));
+ verify(updatableTable).deleteAll(any(), any());
+ verify(updatableTable).descriptor();
+ verifyNoMoreInteractions(updatableTable);
+ }
+
+ private static int numberOfBatches(int rowCount) {
+ return rowCount / MODIFY_BATCH_SIZE + (rowCount % MODIFY_BATCH_SIZE ==
0 ? 0 : 1);
+ }
+
+ private Node<RowWrapper> createSource(int rowCount,
ExecutionContext<RowWrapper> context) {
+ return new ScanNode<>(
+ context, DataProvider.fromRow(rowFactory.create(1, 1L),
rowCount)
+ );
+ }
+
+ @Override
+ protected RowHandler<RowWrapper> rowHandler() {
+ return SqlRowHandler.INSTANCE;
+ }
+}