This is an automated email from the ASF dual-hosted git repository.

korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ca95035383 IGNITE-20887 Sql. Avoid using blocking api in sql threads 
(#2854)
ca95035383 is described below

commit ca95035383771e2de85310be3dff2efc4e80c7e7
Author: korlov42 <[email protected]>
AuthorDate: Thu Nov 23 09:56:39 2023 +0200

    IGNITE-20887 Sql. Avoid using blocking api in sql threads (#2854)
---
 .../requests/jdbc/JdbcQueryCursorSelfTest.java     |  13 +-
 .../benchmark/AbstractMultiNodeBenchmark.java      |  67 ++++--
 .../ignite/internal/benchmark/InsertBenchmark.java |  18 +-
 .../internal/sql/engine/AsyncSqlCursorImpl.java    |  62 +++--
 .../sql/engine/QueryTransactionWrapper.java        |  12 +-
 .../internal/sql/engine/SqlQueryProcessor.java     |  21 +-
 .../internal/sql/engine/exec/rel/ModifyNode.java   | 104 ++++----
 .../internal/sql/engine/exec/TestDownstream.java   |  54 +++++
 .../exec/rel/DataSourceScanNodeSelfTest.java       |  27 +--
 .../engine/exec/rel/ModifyNodeExecutionTest.java   | 266 +++++++++++++++++++++
 10 files changed, 507 insertions(+), 137 deletions(-)

diff --git 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
index 3be4999c98..62a748b07d 100644
--- 
a/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
+++ 
b/modules/client-handler/src/test/java/org/apache/ignite/client/handler/requests/jdbc/JdbcQueryCursorSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.client.handler.requests.jdbc;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -26,28 +27,30 @@ import 
org.apache.ignite.internal.sql.engine.AsyncSqlCursorImpl;
 import org.apache.ignite.internal.sql.engine.QueryTransactionWrapper;
 import org.apache.ignite.internal.sql.engine.SqlQueryType;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
 import org.apache.ignite.internal.util.AsyncWrapper;
-import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
 
 /**
  * Test class for {@link JdbcQueryCursor}.
  */
-@ExtendWith(MockitoExtension.class)
 public class JdbcQueryCursorSelfTest extends BaseIgniteAbstractTest {
-    @Mock
     private QueryTransactionWrapper txWrapper;
 
     private static final List<Integer> ROWS = List.of(1, 2, 3);
 
     private static final int TOTAL_ROWS_COUNT = ROWS.size();
 
+    @BeforeEach
+    void initTxMock() {
+        txWrapper = new 
QueryTransactionWrapper(mock(InternalTransaction.class), false);
+    }
+
     /** Tests corner cases of setting the {@code maxRows} parameter. */
     @ParameterizedTest(name = "maxRows={0}, fetchSize={1}")
     @MethodSource("maxRowsTestParameters")
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
index dd3560f77a..f13101bfdc 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java
@@ -34,6 +34,7 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgnitionManager;
 import org.apache.ignite.InitParameters;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.catalog.commands.CatalogUtils;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.sql.engine.property.SqlPropertiesHelper;
 import org.apache.ignite.internal.testframework.TestIgnitionManager;
@@ -59,39 +60,57 @@ public class AbstractMultiNodeBenchmark {
 
     protected static final String FIELD_VAL = "a".repeat(100);
 
-    protected static final String TABLE_NAME = "usertable";
+    protected static final String TABLE_NAME = "USERTABLE";
+
+    protected static final String ZONE_NAME = TABLE_NAME + "_ZONE";
 
     protected static IgniteImpl clusterNode;
 
-    @Param({"true", "false"})
+    @Param({"false", "true"})
     private boolean fsync;
 
     /**
      * Starts ignite node and creates table {@link #TABLE_NAME}.
      */
     @Setup
-    public final void nodeSetUp() throws IOException {
+    public final void nodeSetUp() throws Exception {
         startCluster();
 
-        var queryEngine = clusterNode.queryEngine();
-
-        var sql = "CREATE TABLE " + TABLE_NAME + "(\n"
-                + "    ycsb_key int PRIMARY KEY,\n"
-                + "    field1   varchar(100),\n"
-                + "    field2   varchar(100),\n"
-                + "    field3   varchar(100),\n"
-                + "    field4   varchar(100),\n"
-                + "    field5   varchar(100),\n"
-                + "    field6   varchar(100),\n"
-                + "    field7   varchar(100),\n"
-                + "    field8   varchar(100),\n"
-                + "    field9   varchar(100),\n"
-                + "    field10  varchar(100)\n"
-                + ");";
-
-        getAllFromCursor(
-                
await(queryEngine.querySingleAsync(SqlPropertiesHelper.emptyProperties(), 
clusterNode.transactions(), null, sql))
-        );
+        try {
+            var queryEngine = clusterNode.queryEngine();
+
+            var createZoneStatement = "CREATE ZONE " + ZONE_NAME + " WITH 
partitions=" + partitionCount();
+
+            getAllFromCursor(
+                    await(queryEngine.querySingleAsync(
+                            SqlPropertiesHelper.emptyProperties(), 
clusterNode.transactions(), null, createZoneStatement
+                    ))
+            );
+
+            var createTableStatement = "CREATE TABLE " + TABLE_NAME + "(\n"
+                    + "    ycsb_key int PRIMARY KEY,\n"
+                    + "    field1   varchar(100),\n"
+                    + "    field2   varchar(100),\n"
+                    + "    field3   varchar(100),\n"
+                    + "    field4   varchar(100),\n"
+                    + "    field5   varchar(100),\n"
+                    + "    field6   varchar(100),\n"
+                    + "    field7   varchar(100),\n"
+                    + "    field8   varchar(100),\n"
+                    + "    field9   varchar(100),\n"
+                    + "    field10  varchar(100)\n"
+                    + ") WITH primary_zone='" + ZONE_NAME + "'";
+
+            getAllFromCursor(
+                    await(queryEngine.querySingleAsync(
+                            SqlPropertiesHelper.emptyProperties(), 
clusterNode.transactions(), null, createTableStatement
+                    ))
+            );
+        } catch (Throwable th) {
+            nodeTearDown();
+
+            throw th;
+        }
     }
 
     /**
@@ -167,4 +186,8 @@ public class AbstractMultiNodeBenchmark {
     protected int nodes() {
         return 3;
     }
+
+    protected int partitionCount() {
+        return CatalogUtils.DEFAULT_PARTITION_COUNT;
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
index d47d137948..b77f4931cf 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/InsertBenchmark.java
@@ -29,6 +29,7 @@ import java.util.stream.IntStream;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.sql.IgniteSql;
+import org.apache.ignite.sql.ResultSet;
 import org.apache.ignite.sql.Session;
 import org.apache.ignite.sql.Statement;
 import org.apache.ignite.table.KeyValueView;
@@ -55,7 +56,7 @@ import org.openjdk.jmh.runner.options.OptionsBuilder;
  * Benchmark for insertion operation, comparing KV, JDBC and SQL APIs.
  */
 @State(Scope.Benchmark)
-@Fork(1)
+@Fork(3)
 @Threads(1)
 @Warmup(iterations = 10, time = 2)
 @Measurement(iterations = 20, time = 2)
@@ -65,6 +66,9 @@ public class InsertBenchmark extends 
AbstractMultiNodeBenchmark {
     @Param({"1", "2", "3"})
     private int clusterSize;
 
+    @Param({"1", "2", "4", "8", "16", "32"})
+    private int partitionCount;
+
     /**
      * Benchmark for SQL insert via embedded client.
      */
@@ -111,9 +115,6 @@ public class InsertBenchmark extends 
AbstractMultiNodeBenchmark {
     public static void main(String[] args) throws RunnerException {
         Options opt = new OptionsBuilder()
                 .include(".*" + InsertBenchmark.class.getSimpleName() + ".*")
-                .forks(1)
-                .threads(1)
-                .mode(Mode.AverageTime)
                 .build();
 
         new Runner(opt).run();
@@ -154,7 +155,9 @@ public class InsertBenchmark extends 
AbstractMultiNodeBenchmark {
         private int id = 0;
 
         void executeQuery() {
-            session.execute(null, statement, id++);
+            try (ResultSet<?> rs = session.execute(null, statement, id++)) {
+                // NO-OP
+            }
         }
     }
 
@@ -318,4 +321,9 @@ public class InsertBenchmark extends 
AbstractMultiNodeBenchmark {
     protected int nodes() {
         return clusterSize;
     }
+
+    @Override
+    protected int partitionCount() {
+        return partitionCount;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
index 421f780cc0..689ecbbd31 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/AsyncSqlCursorImpl.java
@@ -20,7 +20,10 @@ package org.apache.ignite.internal.sql.engine;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import org.apache.ignite.internal.lang.SqlExceptionMapperUtil;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.util.AsyncCursor;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.sql.ResultSetMetadata;
@@ -32,6 +35,9 @@ import org.jetbrains.annotations.Nullable;
  * @param <T> Type of elements.
  */
 public class AsyncSqlCursorImpl<T> implements AsyncSqlCursor<T> {
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final CompletableFuture<Void> closeResult = new 
CompletableFuture<>();
+
     private final SqlQueryType queryType;
     private final ResultSetMetadata meta;
     private final QueryTransactionWrapper txWrapper;
@@ -100,20 +106,28 @@ public class AsyncSqlCursorImpl<T> implements 
AsyncSqlCursor<T> {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) {
-        return dataCursor.requestNextAsync(rows).handle((batch, t) -> {
-            if (t != null) {
-                // Always rollback a transaction in case of an error.
-                txWrapper.rollback();
-
-                throw new CompletionException(wrapIfNecessary(t));
-            }
-
-            if (!batch.hasMore()) {
-                closeAsync();
-            }
-
-            return batch;
-        });
+        return dataCursor.requestNextAsync(rows)
+                .thenApply(batch -> {
+                    CompletableFuture<Void> fut = batch.hasMore()
+                            ? Commons.completedFuture()
+                            : closeAsync();
+
+                    return fut.thenApply(none -> batch);
+                })
+                .exceptionally(rootEx -> {
+                    // Always rollback a transaction in case of an error.
+                    return txWrapper.rollback()
+                            .handle((none, rollbackEx) -> {
+                                Throwable wrapped = wrapIfNecessary(rootEx);
+
+                                if (rollbackEx != null) {
+                                    wrapped.addSuppressed(rollbackEx);
+                                }
+
+                                throw new CompletionException(wrapped);
+                            });
+                })
+                .thenCompose(Function.identity());
     }
 
     /** {@inheritDoc} */
@@ -135,12 +149,22 @@ public class AsyncSqlCursorImpl<T> implements 
AsyncSqlCursor<T> {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> closeAsync() {
-        // Commit implicit transaction, if any.
-        txWrapper.commitImplicit();
-
-        onClose.run();
+        if (!closed.compareAndSet(false, true)) {
+            return closeResult;
+        }
 
-        return dataCursor.closeAsync();
+        dataCursor.closeAsync()
+                .thenCompose(ignored -> txWrapper.commitImplicit())
+                .thenRun(onClose)
+                .whenComplete((r, e) -> {
+                    if (e != null) {
+                        closeResult.completeExceptionally(e);
+                    } else {
+                        closeResult.complete(null);
+                    }
+                });
+
+        return closeResult;
     }
 
     private static Throwable wrapIfNecessary(Throwable t) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
index 2729f4efa5..4b8a5097e8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapper.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.sql.engine;
 
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.tx.InternalTransaction;
 
 /**
@@ -42,16 +44,18 @@ public class QueryTransactionWrapper {
     /**
      * Commits an implicit transaction, if one has been started.
      */
-    void commitImplicit() {
+    CompletableFuture<Void> commitImplicit() {
         if (implicit) {
-            transaction.commit();
+            return transaction.commitAsync();
         }
+
+        return Commons.completedFuture();
     }
 
     /**
      * Rolls back a transaction.
      */
-    void rollback() {
-        transaction.rollback();
+    CompletableFuture<Void> rollback() {
+        return transaction.rollbackAsync();
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 7830570d2c..1f5cd31304 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -661,18 +661,17 @@ public class SqlQueryProcessor implements QueryProcessor {
                             if (ex != null) {
                                 cursorFuture.completeExceptionally(ex);
                                 cancelAll(ex);
-
-                                return;
-                            }
-
-                            txWrapper.commitImplicit();
-
-                            if (nextCursorFuture != null) {
-                                taskExecutor.execute(this::processNext);
                             }
-
-                            cursorFuture.complete(res);
-                        });
+                        })
+                        .thenCompose(res -> txWrapper.commitImplicit()
+                                .thenRun(() -> {
+                                    if (nextCursorFuture != null) {
+                                        
taskExecutor.execute(this::processNext);
+                                    }
+
+                                    cursorFuture.complete(res);
+                                })
+                        );
             } catch (Exception e) {
                 cursorFuture.completeExceptionally(e);
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
index 76e6fa4f9d..8f563b8f6c 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNode.java
@@ -87,9 +87,7 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
 
     private int requested;
 
-    private boolean inLoop;
-
-    private State state = State.UPDATING;
+    private boolean inFlightUpdate;
 
     /**
      * Constructor.
@@ -124,9 +122,7 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
 
         requested = rowsCnt;
 
-        if (!inLoop) {
-            tryEnd();
-        }
+        requestNextBatchIfNeeded();
     }
 
     /** {@inheritDoc} */
@@ -134,7 +130,6 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
     public void push(RowT row) throws Exception {
         assert downstream() != null;
         assert waiting > 0;
-        assert state == State.UPDATING;
 
         checkState();
 
@@ -142,11 +137,13 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
 
         rows.add(row);
 
-        flushTuples(false);
+        assert rows.size() <= MODIFY_BATCH_SIZE;
 
-        if (waiting == 0) {
-            source().request(waiting = MODIFY_BATCH_SIZE);
+        if (needToFlush()) {
+            flushTuples();
         }
+
+        requestNextBatchIfNeeded();
     }
 
     /** {@inheritDoc} */
@@ -158,9 +155,15 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
         checkState();
 
         waiting = -1;
-        state = State.UPDATED;
 
-        tryEnd();
+        if (needToFlush()) {
+            flushTuples();
+        } else {
+            // special case: if there is nothing to flush, and no in-flight 
batch,
+            // then the source most probably was empty, and we just need to 
pass
+            // through this signal to downstream
+            tryEnd();
+        }
     }
 
     /** {@inheritDoc} */
@@ -179,50 +182,42 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
         return this;
     }
 
-    private void tryEnd() throws Exception {
-        assert downstream() != null;
-
-        if (state == State.UPDATING && waiting == 0) {
+    private void requestNextBatchIfNeeded() throws Exception {
+        if (waiting == 0 && rows.isEmpty()) {
             source().request(waiting = MODIFY_BATCH_SIZE);
         }
+    }
 
-        if (state == State.UPDATED && requested > 0) {
-            flushTuples(true);
-
-            state = State.END;
+    private void tryEnd() throws Exception {
+        assert downstream() != null;
 
-            inLoop = true;
-            try {
-                requested--;
-                
downstream().push(context().rowHandler().factory(MODIFY_RESULT).create(updatedRows));
-            } finally {
-                inLoop = false;
-            }
-        }
+        if (waiting == -1 && requested > 0 && !inFlightUpdate && 
rows.isEmpty()) {
+            
downstream().push(context().rowHandler().factory(MODIFY_RESULT).create(updatedRows));
 
-        if (state == State.END && requested > 0) {
             requested = 0;
             downstream().end();
         }
     }
 
-    private void flushTuples(boolean force) {
-        if (nullOrEmpty(rows) || (!force && rows.size() < MODIFY_BATCH_SIZE)) {
-            return;
-        }
+    private void flushTuples() {
+        assert !nullOrEmpty(rows);
+
+        inFlightUpdate = true;
 
         List<RowT> rows = this.rows;
         this.rows = new ArrayList<>(MODIFY_BATCH_SIZE);
 
+        CompletableFuture<?> modifyResult;
+
         switch (modifyOp) {
             case INSERT:
-                table.insertAll(context(), rows).join();
+                modifyResult = table.insertAll(context(), rows);
 
                 break;
             case UPDATE:
                 inlineUpdates(0, rows);
 
-                table.upsertAll(context(), rows).join();
+                modifyResult = table.upsertAll(context(), rows);
 
                 break;
             case MERGE:
@@ -241,20 +236,43 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
                     mergeParts.add(table.upsertAll(context(), 
split.getSecond()));
                 }
 
-                CompletableFuture.allOf(
+                modifyResult = CompletableFuture.allOf(
                         mergeParts.toArray(CompletableFuture[]::new)
-                ).join();
+                );
 
                 break;
             case DELETE:
-                table.deleteAll(context(), rows).join();
+                modifyResult = table.deleteAll(context(), rows);
 
                 break;
             default:
                 throw new UnsupportedOperationException(modifyOp.name());
         }
 
-        updatedRows += rows.size();
+        modifyResult.whenComplete((r, e) -> context().execute(() -> {
+            if (e != null) {
+                onError(e);
+
+                return;
+            }
+
+            inFlightUpdate = false;
+
+            updatedRows += rows.size();
+
+            if (needToFlush()) {
+                flushTuples();
+            }
+
+            requestNextBatchIfNeeded();
+
+            tryEnd();
+        }, this::onError));
+    }
+
+    private boolean needToFlush() {
+        return !inFlightUpdate
+                && (rows.size() >= MODIFY_BATCH_SIZE || (!rows.isEmpty() && 
waiting == -1));
     }
 
     /** See {@link #mapping(TableDescriptor, List)}. */
@@ -414,12 +432,4 @@ public class ModifyNode<RowT> extends AbstractNode<RowT> 
implements SingleNode<R
 
         return mapping;
     }
-
-    private enum State {
-        UPDATING,
-
-        UPDATED,
-
-        END
-    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TestDownstream.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TestDownstream.java
new file mode 100644
index 0000000000..a0eaf8433f
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/TestDownstream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.sql.engine.exec.rel.Downstream;
+
+/**
+ * Test implementation of {@link Downstream} that collects all rows in a list
+ * and then completes a future with that list or with an exception in case 
{@link #onError(Throwable)}
+ * was invoked on this downstream.
+ *
+ * @param <T> A type of the row.
+ */
+public class TestDownstream<T> implements Downstream<T> {
+    private final List<T> rows = new ArrayList<>();
+    private final CompletableFuture<List<T>> completion = new 
CompletableFuture<>();
+
+    @Override
+    public void push(T row) {
+        rows.add(row);
+    }
+
+    @Override
+    public void end() throws Exception {
+        completion.complete(rows);
+    }
+
+    @Override
+    public void onError(Throwable e) {
+        completion.completeExceptionally(e);
+    }
+
+    public CompletableFuture<List<T>> result() {
+        return completion;
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
index 1adb68c924..037742f1d9 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/DataSourceScanNodeSelfTest.java
@@ -22,12 +22,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.notNullValue;
 
-import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
@@ -45,6 +43,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.ScannableDataSource;
 import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
 import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.TestDownstream;
 import org.apache.ignite.internal.sql.engine.exec.row.BaseTypeSpec;
 import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
 import org.apache.ignite.internal.sql.engine.exec.row.TypeSpec;
@@ -241,13 +240,13 @@ public class DataSourceScanNodeSelfTest extends 
AbstractExecutionTest<RowWrapper
                 context, factory, fromRowSchema(ROW_SCHEMA), dataSource, 
predicate, projection, requiredFields
         );
 
-        DrainAllDownstream<RowWrapper> downstream = new DrainAllDownstream<>();
+        TestDownstream<RowWrapper> downstream = new TestDownstream<>();
 
         node.onRegister(downstream);
 
         context.execute(() -> node.request(Integer.MAX_VALUE), node::onError);
 
-        return await(downstream.completion);
+        return await(downstream.result());
     }
 
     static class IterableDataSource implements ScannableDataSource {
@@ -296,26 +295,6 @@ public class DataSourceScanNodeSelfTest extends 
AbstractExecutionTest<RowWrapper
         }
     }
 
-    static class DrainAllDownstream<T> implements Downstream<T> {
-        private final List<T> rows = new ArrayList<>();
-        private final CompletableFuture<List<T>> completion = new 
CompletableFuture<>();
-
-        @Override
-        public void push(T row) {
-            rows.add(row);
-        }
-
-        @Override
-        public void end() throws Exception {
-            completion.complete(rows);
-        }
-
-        @Override
-        public void onError(Throwable e) {
-            completion.completeExceptionally(e);
-        }
-    }
-
     @Override
     protected RowHandler<RowWrapper> rowHandler() {
         return SqlRowHandler.INSTANCE;
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
new file mode 100644
index 0000000000..14fc811758
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/ModifyNodeExecutionTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.rel;
+
+import static org.apache.ignite.internal.sql.engine.exec.rel.AbstractNode.MODIFY_BATCH_SIZE;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.calcite.rel.core.TableModify.Operation;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler;
+import org.apache.ignite.internal.sql.engine.exec.SqlRowHandler.RowWrapper;
+import org.apache.ignite.internal.sql.engine.exec.TestDownstream;
+import org.apache.ignite.internal.sql.engine.exec.UpdatableTable;
+import org.apache.ignite.internal.sql.engine.exec.row.RowSchema;
+import org.apache.ignite.internal.sql.engine.framework.DataProvider;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Test to verify {@link ModifyNode}.
+ */
+@SuppressWarnings("resource")
+@ExtendWith(MockitoExtension.class)
+public class ModifyNodeExecutionTest extends AbstractExecutionTest<RowWrapper> {
+    private static final RowSchema INT_LONG_SCHEMA = RowSchema.builder()
+            .addField(NativeTypes.INT32)
+            .addField(NativeTypes.INT64)
+            .build();
+
+    @SuppressWarnings("OverridableMethodCallDuringObjectConstruction")
+    private final RowHandler<RowWrapper> handler = rowHandler();
+    private final RowHandler.RowFactory<RowWrapper> rowFactory = handler.factory(INT_LONG_SCHEMA);
+
+    @Mock
+    private UpdatableTable updatableTable;
+
+
+    @ParameterizedTest
+    @ValueSource(ints = {0, 1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE, MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+    void nodeReportsExpectedNumberOfUpdatedRowsOnInsert(int sourceSize) {
+        ExecutionContext<RowWrapper> context = executionContext();
+
+        Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+        ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+                context, updatableTable, Operation.INSERT, null
+        );
+
+        TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+        modifyNode.register(List.of(sourceNode));
+        modifyNode.onRegister(downstream);
+
+        if (sourceSize > 0) {
+            when(updatableTable.insertAll(any(), any()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+        }
+
+        context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+        List<RowWrapper> result = await(downstream.result());
+
+        assertThat(result, notNullValue());
+        assertThat(result.get(0), notNullValue());
+        assertThat(handler.get(0, result.get(0)), is((long) sourceSize));
+        verify(updatableTable, times(numberOfBatches(sourceSize))).insertAll(any(), any());
+        verify(updatableTable).descriptor();
+        verifyNoMoreInteractions(updatableTable);
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {0, 1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE, MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+    void nodeReportsExpectedNumberOfUpdatedRowsOnUpdate(int sourceSize) {
+        ExecutionContext<RowWrapper> context = executionContext();
+
+        Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+        ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+                context, updatableTable, Operation.UPDATE, null
+        );
+
+        TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+        modifyNode.register(List.of(sourceNode));
+        modifyNode.onRegister(downstream);
+
+        if (sourceSize > 0) {
+            when(updatableTable.upsertAll(any(), any()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+        }
+
+        context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+        List<RowWrapper> result = await(downstream.result());
+
+        assertThat(result, notNullValue());
+        assertThat(result.get(0), notNullValue());
+        assertThat(handler.get(0, result.get(0)), is((long) sourceSize));
+        verify(updatableTable, times(numberOfBatches(sourceSize))).upsertAll(any(), any());
+        verify(updatableTable).descriptor();
+        verifyNoMoreInteractions(updatableTable);
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {0, 1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE, MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+    void nodeReportsExpectedNumberOfUpdatedRowsOnDelete(int sourceSize) {
+        ExecutionContext<RowWrapper> context = executionContext();
+
+        Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+        ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+                context, updatableTable, Operation.DELETE, null
+        );
+
+        TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+        modifyNode.register(List.of(sourceNode));
+        modifyNode.onRegister(downstream);
+
+        if (sourceSize > 0) {
+            when(updatableTable.deleteAll(any(), any()))
+                    .thenReturn(CompletableFuture.completedFuture(null));
+        }
+
+        context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+        List<RowWrapper> result = await(downstream.result());
+
+        assertThat(result, notNullValue());
+        assertThat(result.get(0), notNullValue());
+        assertThat(handler.get(0, result.get(0)), is((long) sourceSize));
+        verify(updatableTable, times(numberOfBatches(sourceSize))).deleteAll(any(), any());
+        verify(updatableTable).descriptor();
+        verifyNoMoreInteractions(updatableTable);
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE, MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+    void exceptionIsPassedThroughToErrorHandlerOnInsert(int sourceSize) {
+        ExecutionContext<RowWrapper> context = executionContext();
+
+        Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+        ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+                context, updatableTable, Operation.INSERT, null
+        );
+
+        TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+        modifyNode.register(List.of(sourceNode));
+        modifyNode.onRegister(downstream);
+
+        RuntimeException expected = new RuntimeException("this is expected");
+        when(updatableTable.insertAll(any(), any()))
+                .thenReturn(CompletableFuture.failedFuture(expected));
+
+        context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+        assertThat(downstream.result(), willThrow(is(expected)));
+        verify(updatableTable).insertAll(any(), any());
+        verify(updatableTable).descriptor();
+        verifyNoMoreInteractions(updatableTable);
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE, MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+    void exceptionIsPassedThroughToErrorHandlerOnUpdate(int sourceSize) {
+        ExecutionContext<RowWrapper> context = executionContext();
+
+        Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+        ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+                context, updatableTable, Operation.UPDATE, null
+        );
+
+        TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+        modifyNode.register(List.of(sourceNode));
+        modifyNode.onRegister(downstream);
+
+        RuntimeException expected = new RuntimeException("this is expected");
+        when(updatableTable.upsertAll(any(), any()))
+                .thenReturn(CompletableFuture.failedFuture(expected));
+
+        context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+        assertThat(downstream.result(), willThrow(is(expected)));
+        verify(updatableTable).upsertAll(any(), any());
+        verify(updatableTable).descriptor();
+        verifyNoMoreInteractions(updatableTable);
+    }
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, MODIFY_BATCH_SIZE - 1, MODIFY_BATCH_SIZE, MODIFY_BATCH_SIZE + 1, 2 * MODIFY_BATCH_SIZE})
+    void exceptionIsPassedThroughToErrorHandlerOnDelete(int sourceSize) {
+        ExecutionContext<RowWrapper> context = executionContext();
+
+        Node<RowWrapper> sourceNode = createSource(sourceSize, context);
+
+        ModifyNode<RowWrapper> modifyNode = new ModifyNode<>(
+                context, updatableTable, Operation.DELETE, null
+        );
+
+        TestDownstream<RowWrapper> downstream = new TestDownstream<>();
+
+        modifyNode.register(List.of(sourceNode));
+        modifyNode.onRegister(downstream);
+
+        RuntimeException expected = new RuntimeException("this is expected");
+        when(updatableTable.deleteAll(any(), any()))
+                .thenReturn(CompletableFuture.failedFuture(expected));
+
+        context.execute(() -> modifyNode.request(1), modifyNode::onError);
+
+        assertThat(downstream.result(), willThrow(is(expected)));
+        verify(updatableTable).deleteAll(any(), any());
+        verify(updatableTable).descriptor();
+        verifyNoMoreInteractions(updatableTable);
+    }
+
+    private static int numberOfBatches(int rowCount) {
+        return rowCount / MODIFY_BATCH_SIZE + (rowCount % MODIFY_BATCH_SIZE == 0 ? 0 : 1);
+    }
+
+    private Node<RowWrapper> createSource(int rowCount, ExecutionContext<RowWrapper> context) {
+        return new ScanNode<>(
+                context, DataProvider.fromRow(rowFactory.create(1, 1L), rowCount)
+        );
+    }
+
+    @Override
+    protected RowHandler<RowWrapper> rowHandler() {
+        return SqlRowHandler.INSTANCE;
+    }
+}


Reply via email to