This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ead4b1e798 IGNITE-21714 Prevent threads from being hijacked in 
KV/Record view APIs (#3378)
ead4b1e798 is described below

commit ead4b1e798fe7106287702f7517b1d4453d4315d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Mar 8 17:23:31 2024 +0400

    IGNITE-21714 Prevent threads from being hijacked in KV/Record view APIs 
(#3378)
---
 buildscripts/java-integration-test.gradle          |   2 +
 buildscripts/java-junit5.gradle                    |   1 +
 .../handler/ClientInboundMessageHandler.java       |  13 +-
 .../internal/lang/IgniteExceptionMapperUtil.java   |   6 +
 .../ignite/internal/thread/PublicApiThreading.java |  67 ++++
 .../ignite/internal/index/IndexManagerTest.java    |   3 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |   4 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   7 +-
 .../internal/test/WatchListenerInhibitor.java      |  30 ++
 modules/table/build.gradle                         |   1 -
 .../rebalance/ItRebalanceDistributedTest.java      |   4 +-
 .../threading/ItKvRecordApiThreadingTest.java      | 382 +++++++++++++++++++++
 .../ignite/internal/table/AbstractTableView.java   |  94 ++++-
 .../internal/table/KeyValueBinaryViewImpl.java     |  81 +++--
 .../ignite/internal/table/KeyValueViewImpl.java    |  94 ++---
 .../internal/table/RecordBinaryViewImpl.java       |  86 +++--
 .../ignite/internal/table/RecordViewImpl.java      |  84 +++--
 .../apache/ignite/internal/table/TableImpl.java    |  20 +-
 .../internal/table/distributed/TableManager.java   |  18 +-
 .../table/KeyValueBinaryViewOperationsTest.java    |   5 +-
 .../internal/table/KeyValueViewOperationsTest.java |   4 +-
 .../table/RecordBinaryViewOperationsTest.java      |   8 +-
 .../internal/table/RecordViewOperationsTest.java   |   6 +-
 .../table/distributed/TableManagerTest.java        |   4 +-
 24 files changed, 832 insertions(+), 192 deletions(-)

diff --git a/buildscripts/java-integration-test.gradle 
b/buildscripts/java-integration-test.gradle
index a5793d66fe..7e3167c7a8 100644
--- a/buildscripts/java-integration-test.gradle
+++ b/buildscripts/java-integration-test.gradle
@@ -30,6 +30,8 @@ testing {
                 implementation libs.junit5.api
                 implementation libs.junit5.impl
                 implementation libs.junit5.params
+                implementation libs.junit.pioneer
+
                 implementation libs.mockito.core
                 implementation libs.mockito.junit
                 implementation libs.hamcrest.core
diff --git a/buildscripts/java-junit5.gradle b/buildscripts/java-junit5.gradle
index 5ca6ba1e29..2084cb536b 100644
--- a/buildscripts/java-junit5.gradle
+++ b/buildscripts/java-junit5.gradle
@@ -34,6 +34,7 @@ dependencies {
     testImplementation libs.junit5.impl
     testImplementation libs.junit5.api
     testImplementation libs.junit5.params
+    testImplementation libs.junit.pioneer
 
     testImplementation libs.jansi.core
     testImplementation libs.log4j.api
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 8979942cf5..a04e1f0c41 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -128,6 +128,7 @@ import 
org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
+import org.apache.ignite.internal.thread.PublicApiThreading;
 import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteException;
@@ -566,7 +567,17 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
             // Observable timestamp should be calculated after the operation 
is processed; reserve space, write later.
             int observableTimestampIdx = out.reserveLong();
 
-            CompletableFuture fut = processOperation(in, out, opCode, 
requestId);
+            CompletableFuture fut;
+
+            // Enclosing in 'internal call' to save resubmission to the async 
continuation thread pool on return. This will only
+            // work if the corresponding call (like an async KeyValueView 
method) is invoked in this same thread, but in most cases this
+            // will be true.
+            PublicApiThreading.startInternalCall();
+            try {
+                fut = processOperation(in, out, opCode, requestId);
+            } finally {
+                PublicApiThreading.endInternalCall();
+            }
 
             if (fut == null) {
                 // Operation completed synchronously.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
index fbae12e574..b77c94023b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/lang/IgniteExceptionMapperUtil.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.lang;
 
 import static java.util.Collections.unmodifiableMap;
+import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
@@ -126,6 +127,11 @@ public class IgniteExceptionMapperUtil {
      * @return New CompletableFuture.
      */
     public static <T> CompletableFuture<T> 
convertToPublicFuture(CompletableFuture<T> origin) {
+        if (isCompletedSuccessfully(origin)) {
+            // No need to translate exceptions.
+            return origin;
+        }
+
         return origin
                 .handle((res, err) -> {
                     if (err != null) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
new file mode 100644
index 0000000000..fa82b26717
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import java.util.function.Supplier;
+
+/**
+ * Logic related to the threading concern of public APIs: it allows to 
raise/clear/check a thread-local flag which gives us ability to
+ * see whether a public API call actually comes from an Ignite internal code 
and not from an actual user.
+ */
+public class PublicApiThreading {
+    private static final ThreadLocal<Boolean> INTERNAL_CALL = new 
ThreadLocal<>();
+
+    /**
+     * Raises the 'internal call' state; the state is stored in the current 
thread.
+     */
+    public static void startInternalCall() {
+        INTERNAL_CALL.set(true);
+    }
+
+    /**
+     * Clears the 'internal call' state; the state is stored in the current 
thread.
+     */
+    public static void endInternalCall() {
+        INTERNAL_CALL.remove();
+    }
+
+    /**
+     * Executes the call while the 'internal call' state is raised; so if the 
call invokes {@link #inInternalCall()},
+     * if will get {@code true}.
+     *
+     * @param call Call to execute as internal.
+     * @return Call result.
+     */
+    public static <T> T doInternalCall(Supplier<T> call) {
+        startInternalCall();
+
+        try {
+            return call.get();
+        } finally {
+            endInternalCall();
+        }
+    }
+
+    /**
+     * Returns {@code} true if the 'internal call' status is currently raised 
in the current thread.
+     */
+    public static boolean inInternalCall() {
+        Boolean value = INTERNAL_CALL.get();
+        return value != null && value;
+    }
+}
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index b6ee9dee73..7c1f7b1efa 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -199,7 +199,8 @@ public class IndexManagerTest extends 
BaseIgniteAbstractTest {
                 new ConstantSchemaVersions(1),
                 marshallers,
                 mock(IgniteSql.class),
-                table.primaryKeyIndexId()
+                table.primaryKeyIndexId(),
+                ForkJoinPool.commonPool()
         ));
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 1f9d7a988e..e468284ba0 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -58,6 +58,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -571,7 +572,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 sqlRef::get,
                 resourcesRegistry,
                 rebalanceScheduler,
-                lowWatermark
+                lowWatermark,
+                ForkJoinPool.commonPool()
         );
 
         var indexManager = new IndexManager(
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 59514b0468..11f2ac141e 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -39,8 +39,10 @@ import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.function.BiPredicate;
@@ -376,6 +378,8 @@ public class IgniteImpl implements Ignite {
     /** Remote triggered resources registry. */
     private final RemotelyTriggeredResourceRegistry resourcesRegistry;
 
+    private final Executor asyncContinuationExecutor = 
ForkJoinPool.commonPool();
+
     /**
      * The Constructor.
      *
@@ -729,7 +733,8 @@ public class IgniteImpl implements Ignite {
                 this::sql,
                 resourcesRegistry,
                 rebalanceScheduler,
-                lowWatermark
+                lowWatermark,
+                asyncContinuationExecutor
         );
 
         indexManager = new IndexManager(
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
index b28365b3a5..a93ed1068a 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldV
 import java.lang.reflect.Field;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Supplier;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
@@ -111,4 +112,33 @@ public class WatchListenerInhibitor {
     public void stopInhibit() {
         inhibitFuture.complete(null);
     }
+
+    /**
+     * Executes an action enclosed in watch inhibition: that is, before 
execution inhibition gets started, and after the execution
+     * it gets stopped.
+     *
+     * @param action Action to execute.
+     * @return Action result.
+     */
+    public <T> T withInhibition(Supplier<? extends T> action) {
+        startInhibit();
+
+        try {
+            return action.get();
+        } finally {
+            stopInhibit();
+        }
+    }
+
+    /**
+     * Executes an action enclosed in watch inhibition: that is, before 
execution inhibition gets started, and after the execution
+     * it gets stopped.
+     *
+     * @param ignite Node on which to inhibit watch processing.
+     * @param action Action to execute.
+     * @return Action result.
+     */
+    public static <T> T withInhibition(Ignite ignite, Supplier<? extends T> 
action) {
+        return metastorageEventsInhibitor(ignite).withInhibition(action);
+    }
 }
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 009354c26b..bd52b7a05b 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -80,7 +80,6 @@ dependencies {
     testImplementation libs.mockito.junit
     testImplementation libs.hamcrest.core
     testImplementation libs.hamcrest.optional
-    testImplementation libs.junit.pioneer
     testImplementation libs.jmh.core
     testImplementation libs.javax.annotations
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 33354bd9f1..0a2b0e47b0 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -72,6 +72,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -1215,7 +1216,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     () -> mock(IgniteSql.class),
                     resourcesRegistry,
                     rebalanceScheduler,
-                    lowWatermark
+                    lowWatermark,
+                    ForkJoinPool.commonPool()
             ) {
                 @Override
                 protected TxStateTableStorage createTxStateTableStorage(
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
new file mode 100644
index 0000000000..a43c8b9ea6
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.threading;
+
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.lang.IgniteSystemProperties;
+import org.apache.ignite.internal.streamer.SimplePublisher;
+import org.apache.ignite.internal.test.WatchListenerInhibitor;
+import org.apache.ignite.internal.testframework.TestIgnitionManager;
+import org.apache.ignite.internal.testframework.WithSystemProperty;
+import org.apache.ignite.internal.thread.IgniteThread;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
+
+@WithSystemProperty(key = IgniteSystemProperties.THREAD_ASSERTIONS_ENABLED, 
value = "false")
+@SuppressWarnings("resource")
+class ItKvRecordApiThreadingTest extends ClusterPerClassIntegrationTest {
+    private static final String TABLE_NAME = "test";
+    private static final int KEY = 1;
+
+    private static final Record KEY_RECORD = new Record(1, "");
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @BeforeAll
+    void createTable() {
+        sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val 
VARCHAR)");
+    }
+
+    @BeforeEach
+    void upsertRecord() {
+        plainKeyValueView().put(null, KEY, "one");
+    }
+
+    private static KeyValueView<Integer, String> plainKeyValueView() {
+        return testTable().keyValueView(Integer.class, String.class);
+    }
+
+    private static KeyValueView<Tuple, Tuple> binaryKeyValueView() {
+        return testTable().keyValueView();
+    }
+
+    private static Table testTable() {
+        return CLUSTER.aliveNode().tables().table(TABLE_NAME);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @CartesianTest
+    void keyValueViewFuturesCompleteInContinuationsPool(
+            @Enum KeyValueViewAsyncOperation operation,
+            @Enum KeyValueViewKind kind
+    ) {
+        assumeTrue(kind.supportsGetNullable() || !operation.isGetNullable());
+
+        KeyValueView tableView = kind.view();
+
+        @SuppressWarnings("unchecked") CompletableFuture<Thread> 
completerFuture = forcingSwitchFromUserThread(
+                () -> operation.executeOn(tableView, kind.context())
+                        .thenApply(unused -> Thread.currentThread())
+        );
+
+        assertThat(completerFuture, willBe(commonPoolThread()));
+    }
+
+    private static Matcher<Object> commonPoolThread() {
+        return both(hasProperty("name", 
startsWith("ForkJoinPool.commonPool-worker-")))
+                .and(not(instanceOf(IgniteThread.class)));
+    }
+
+    private static <T> T forcingSwitchFromUserThread(Supplier<? extends T> 
action) {
+        return WatchListenerInhibitor.withInhibition(CLUSTER.aliveNode(), () 
-> {
+            waitForSchemaSyncRequiringWait();
+
+            return action.get();
+        });
+    }
+
+    private static void waitForSchemaSyncRequiringWait() {
+        try {
+            Thread.sleep(TestIgnitionManager.DEFAULT_DELAY_DURATION_MS + 1);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static KeyValueContext<Integer, String> plainKeyValueContext() {
+        return new KeyValueContext<>(KEY, "one", "two");
+    }
+
+    private static KeyValueContext<Tuple, Tuple> binaryKeyValueContext() {
+        return new KeyValueContext<>(Tuple.create().set("id", KEY), 
Tuple.create().set("val", "one"), Tuple.create().set("val", "two"));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @CartesianTest
+    void 
keyValueViewFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
+            @Enum KeyValueViewAsyncOperation operation,
+            @Enum KeyValueViewKind kind
+    ) {
+        assumeTrue(kind.supportsGetNullable() || !operation.isGetNullable());
+
+        KeyValueView tableView = kind.view();
+
+        @SuppressWarnings("unchecked") CompletableFuture<Thread> 
completerFuture = forcingSwitchFromUserThread(
+                () -> PublicApiThreading.doInternalCall(
+                        () -> operation.executeOn(tableView, kind.context())
+                                .thenApply(unused -> Thread.currentThread())
+                )
+        );
+
+        assertThat(completerFuture, willBe(anIgniteThread()));
+    }
+
+    private static Matcher<Object> anIgniteThread() {
+        return instanceOf(IgniteThread.class);
+    }
+
+    @SuppressWarnings("rawtypes")
+    @CartesianTest
+    void recordViewFuturesCompleteInContinuationsPool(
+            @Enum RecordViewAsyncOperation operation,
+            @Enum RecordViewKind kind
+    ) {
+        RecordView tableView = kind.view();
+
+        @SuppressWarnings("unchecked") CompletableFuture<Thread> 
completerFuture = forcingSwitchFromUserThread(
+                () -> operation.executeOn(tableView, kind.context())
+                        .thenApply(unused -> Thread.currentThread())
+        );
+
+        assertThat(completerFuture, willBe(commonPoolThread()));
+    }
+
+    @SuppressWarnings("rawtypes")
+    @CartesianTest
+    void 
recordViewFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
+            @Enum RecordViewAsyncOperation operation,
+            @Enum RecordViewKind kind
+    ) {
+        RecordView tableView = kind.view();
+
+        @SuppressWarnings("unchecked") CompletableFuture<Thread> 
completerFuture = forcingSwitchFromUserThread(
+                () -> PublicApiThreading.doInternalCall(
+                        () -> operation.executeOn(tableView, kind.context())
+                                .thenApply(unused -> Thread.currentThread())
+                )
+        );
+
+        assertThat(completerFuture, willBe(anIgniteThread()));
+    }
+
+    private static RecordView<Record> plainRecordView() {
+        return testTable().recordView(Record.class);
+    }
+
+    private static RecordView<Tuple> binaryRecordView() {
+        return testTable().recordView();
+    }
+
+    private static RecordContext<Record> plainRecordContext() {
+        return new RecordContext<>(KEY_RECORD, new Record(KEY, "one"), new 
Record(KEY, "two"));
+    }
+
+    private static RecordContext<Tuple> binaryRecordContext() {
+        return new RecordContext<>(KEY_RECORD.toKeyTuple(), new Record(KEY, 
"one").toFullTuple(), new Record(KEY, "two").toFullTuple());
+    }
+
+    private enum KeyValueViewAsyncOperation {
+        GET_ASYNC((view, context) -> view.getAsync(null, context.key)),
+        GET_NULLABLE_ASYNC((view, context) -> view.getNullableAsync(null, 
context.key)),
+        GET_OR_DEFAULT_ASYNC((view, context) -> view.getOrDefaultAsync(null, 
context.key, context.anotherValue)),
+        GET_ALL_ASYNC((view, context) -> view.getAllAsync(null, 
List.of(context.key))),
+        CONTAINS_ASYNC((view, context) -> view.containsAsync(null, 
context.key)),
+        PUT_ASYNC((view, context) -> view.putAsync(null, context.key, 
context.usualValue)),
+        PUT_ALL_ASYNC((view, context) -> view.putAllAsync(null, 
Map.of(context.key, context.usualValue))),
+        GET_AND_PUT_ASYNC((view, context) -> view.getAndPutAsync(null, 
context.key, context.usualValue)),
+        GET_NULLABLE_AND_PUT_ASYNC((view, context) -> 
view.getNullableAndPutAsync(null, context.key, context.usualValue)),
+        PUT_IF_ABSENT_ASYNC((view, context) -> view.putIfAbsentAsync(null, 
context.key, context.usualValue)),
+        REMOVE_ASYNC((view, context) -> view.removeAsync(null, context.key)),
+        REMOVE_EXACT_ASYNC((view, context) -> view.removeAsync(null, 
context.key, context.usualValue)),
+        REMOVE_ALL_ASYNC((view, context) -> view.removeAllAsync(null, 
List.of(context.key))),
+        GET_AND_REMOVE_ASYNC((view, context) -> view.getAndRemoveAsync(null, 
context.key)),
+        GET_NULLABLE_AND_REMOVE_ASYNC((view, context) -> 
view.getNullableAndRemoveAsync(null, context.key)),
+        REPLACE_ASYNC((view, context) -> view.replaceAsync(null, context.key, 
context.usualValue)),
+        REPLACE_EXACT_ASYNC((view, context) -> view.replaceAsync(null, 
context.key, context.usualValue, context.anotherValue)),
+        GET_AND_REPLACE_ASYNC((view, context) -> view.getAndReplaceAsync(null, 
context.key, context.usualValue)),
+        GET_NULLABLE_AND_REPLACE_ASYNC((view, context) -> 
view.getNullableAndReplaceAsync(null, context.key, context.usualValue)),
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        STREAM_DATA((view, context) -> {
+            CompletableFuture<?> future;
+            try (var publisher = new SimplePublisher()) {
+                future = view.streamData(publisher, null);
+                publisher.submit(Map.entry(context.key, context.usualValue));
+            }
+            return future;
+        }),
+        QUERY_ASYNC((view, context) -> view.queryAsync(null, null)),
+        QUERY_ASYNC_WITH_INDEX_NAME((view, context) -> view.queryAsync(null, 
null, null)),
+        QUERY_ASYNC_WITH_INDEX_NAME_AND_OPTS((view, context) -> 
view.queryAsync(null, null, null, null));
+
+        private final BiFunction<KeyValueView<Object, Object>, 
KeyValueContext<Object, Object>, CompletableFuture<?>> action;
+
+        KeyValueViewAsyncOperation(BiFunction<KeyValueView<Object, Object>, 
KeyValueContext<Object, Object>, CompletableFuture<?>> action) {
+            this.action = action;
+        }
+
+        <K, V> CompletableFuture<?> executeOn(KeyValueView<K, V> tableView, 
KeyValueContext<K, V> context) {
+            return action.apply((KeyValueView<Object, Object>) tableView, 
(KeyValueContext<Object, Object>) context);
+        }
+
+        boolean isGetNullable() {
+            switch (this) {
+                case GET_NULLABLE_ASYNC:
+                case GET_NULLABLE_AND_PUT_ASYNC:
+                case GET_NULLABLE_AND_REMOVE_ASYNC:
+                case GET_NULLABLE_AND_REPLACE_ASYNC:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+    }
+
+    private static class KeyValueContext<K, V> {
+        final K key;
+        final V usualValue;
+        final V anotherValue;
+
+        private KeyValueContext(K key, V usualValue, V anotherValue) {
+            this.key = key;
+            this.usualValue = usualValue;
+            this.anotherValue = anotherValue;
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    private enum KeyValueViewKind {
+        PLAIN, BINARY;
+
+        KeyValueView view() {
+            return this == PLAIN ? plainKeyValueView() : binaryKeyValueView();
+        }
+
+        KeyValueContext context() {
+            return this == PLAIN ? plainKeyValueContext() : 
binaryKeyValueContext();
+        }
+
+        boolean supportsGetNullable() {
+            return this == PLAIN;
+        }
+    }
+
+    @SuppressWarnings({"FieldCanBeLocal", "unused"})
+    private static class Record {
+        private int id;
+        private String val;
+
+        private Record() {
+        }
+
+        private Record(int id, String val) {
+            this.id = id;
+            this.val = val;
+        }
+
+        Tuple toKeyTuple() {
+            return Tuple.create().set("id", id);
+        }
+
+        Tuple toFullTuple() {
+            return Tuple.create().set("id", id).set("val", val);
+        }
+    }
+
+    private enum RecordViewAsyncOperation {
+        GET_ASYNC((view, context) -> view.getAsync(null, context.keyRecord)),
+        GET_ALL_ASYNC((view, context) -> view.getAllAsync(null, 
List.of(context.keyRecord))),
+        CONTAINS_ASYNC((view, context) -> view.containsAsync(null, 
context.keyRecord)),
+        UPSERT_ASYNC((view, context) -> view.upsertAsync(null, 
context.fullRecord)),
+        UPSERT_ALL_ASYNC((view, context) -> view.upsertAllAsync(null, 
List.of(context.fullRecord))),
+        GET_AND_UPSERT_ASYNC((view, context) -> view.getAndUpsertAsync(null, 
context.fullRecord)),
+        INSERT_ASYNC((view, context) -> view.insertAsync(null, 
context.fullRecord)),
+        INSERT_ALL_ASYNC((view, context) -> view.insertAllAsync(null, 
List.of(context.fullRecord))),
+        REPLACE_ASYNC((view, context) -> view.replaceAsync(null, 
context.fullRecord)),
+        REPLACE_EXACT_ASYNC((view, context) -> view.replaceAsync(null, 
context.fullRecord, context.anotherFullRecord)),
+        GET_AND_REPLACE_ASYNC((view, context) -> view.getAndReplaceAsync(null, 
context.keyRecord)),
+        DELETE_ASYNC((view, context) -> view.deleteAsync(null, 
context.keyRecord)),
+        DELETE_EXACT_ASYNC((view, context) -> view.deleteExactAsync(null, 
context.fullRecord)),
+        GET_AND_DELETE_ASYNC((view, context) -> view.getAndDeleteAsync(null, 
context.keyRecord)),
+        DELETE_ALL_ASYNC((view, context) -> view.deleteAllAsync(null, 
List.of(context.keyRecord))),
+        DELETE_ALL_EXACT_ASYNC((view, context) -> 
view.deleteAllExactAsync(null, List.of(context.keyRecord))),
+        @SuppressWarnings({"rawtypes", "unchecked"})
+        STREAM_DATA((view, context) -> {
+            CompletableFuture<?> future;
+            try (var publisher = new SimplePublisher()) {
+                future = view.streamData(publisher, null);
+                publisher.submit(context.fullRecord);
+            }
+            return future;
+        }),
+        QUERY_ASYNC((view, context) -> view.queryAsync(null, null)),
+        QUERY_ASYNC_WITH_INDEX_NAME((view, context) -> view.queryAsync(null, 
null, null)),
+        QUERY_ASYNC_WITH_INDEX_NAME_AND_OPTS((view, context) -> 
view.queryAsync(null, null, null, null));
+
+        private final BiFunction<RecordView<Object>, RecordContext<Object>, 
CompletableFuture<?>> action;
+
+        RecordViewAsyncOperation(BiFunction<RecordView<Object>, 
RecordContext<Object>, CompletableFuture<?>> action) {
+            this.action = action;
+        }
+
+        <R> CompletableFuture<?> executeOn(RecordView<R> tableView, 
RecordContext<R> context) {
+            return action.apply((RecordView<Object>) tableView, 
(RecordContext<Object>) context);
+        }
+    }
+
+    private static class RecordContext<R> {
+        final R keyRecord;
+        final R fullRecord;
+        final R anotherFullRecord;
+
+        private RecordContext(R keyRecord, R fullRecord, R anotherFullRecord) {
+            this.keyRecord = keyRecord;
+            this.fullRecord = fullRecord;
+            this.anotherFullRecord = anotherFullRecord;
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    private enum RecordViewKind {
+        PLAIN, BINARY;
+
+        RecordView view() {
+            return this == PLAIN ? plainRecordView() : binaryRecordView();
+        }
+
+        RecordContext context() {
+            return this == PLAIN ? plainRecordContext() : 
binaryRecordContext();
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 7cf46edcdb..3ba9fa63c4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -29,7 +29,10 @@ import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
 import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.schema.Column;
@@ -38,10 +41,11 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.table.criteria.CursorAdapter;
 import org.apache.ignite.internal.table.criteria.QueryCriteriaAsyncCursor;
 import org.apache.ignite.internal.table.criteria.SqlSerializer;
+import org.apache.ignite.internal.table.criteria.SqlSerializer.Builder;
 import 
org.apache.ignite.internal.table.distributed.replicator.InternalSchemaVersionMismatchException;
 import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
+import org.apache.ignite.internal.thread.PublicApiThreading;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.AsyncCursor;
 import org.apache.ignite.lang.Cursor;
 import org.apache.ignite.sql.IgniteSql;
@@ -73,6 +77,8 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
     /** Marshallers provider. */
     protected final MarshallersProvider marshallers;
 
+    private final Executor asyncContinuationExecutor;
+
     /**
      * Constructor.
      *
@@ -81,15 +87,24 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
      * @param schemaReg Schema registry.
      * @param sql Ignite SQL facade.
      * @param marshallers Marshallers provider.
+     * @param asyncContinuationExecutor Executor to which execution will be 
resubmitted when leaving asynchronous public API endpoints
+     *     (to prevent the user from stealing Ignite threads).
      */
-    AbstractTableView(InternalTable tbl, SchemaVersions schemaVersions, 
SchemaRegistry schemaReg, IgniteSql sql,
-            MarshallersProvider marshallers) {
+    AbstractTableView(
+            InternalTable tbl,
+            SchemaVersions schemaVersions,
+            SchemaRegistry schemaReg,
+            IgniteSql sql,
+            MarshallersProvider marshallers,
+            Executor asyncContinuationExecutor
+    ) {
         this.tbl = tbl;
         this.schemaVersions = schemaVersions;
         this.sql = sql;
+        this.marshallers = marshallers;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
 
         this.rowConverter = new TableViewRowConverter(schemaReg);
-        this.marshallers = marshallers;
     }
 
     /**
@@ -99,19 +114,40 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
      * @param <T> Future result type.
      * @return Future result.
      */
-    protected final <T> T sync(CompletableFuture<T> fut) {
+    protected final <T> T sync(Supplier<CompletableFuture<T>> fut) {
+        // Enclose in 'internal call' to prevent resubmission to the async 
continuation pool on future completion
+        // as such a resubmission is useless in the sync case and will just 
increase latency.
+        PublicApiThreading.startInternalCall();
+
         try {
-            return fut.get();
+            return fut.get().get();
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt(); // Restore interrupt flag.
 
             throw 
sneakyThrow(IgniteExceptionMapperUtil.mapToPublicException(e));
         } catch (ExecutionException e) {
-            Throwable cause = ExceptionUtils.unwrapCause(e);
+            Throwable cause = unwrapCause(e);
             throw sneakyThrow(cause);
+        } finally {
+            PublicApiThreading.endInternalCall();
         }
     }
 
+    /**
+     * Combines the effect of {@link #withSchemaSync(Transaction, KvAction)}, 
{@link #preventThreadHijack(CompletableFuture)} and
+     * {@link 
IgniteExceptionMapperUtil#convertToPublicFuture(CompletableFuture)}.
+     *
+     * @param <T> Type of the data the action returns.
+     * @param tx Transaction or {@code null}.
+     * @param action Action to execute.
+     * @return Future of whatever the action returns.
+     */
+    protected final <T> CompletableFuture<T> doOperation(@Nullable Transaction 
tx, KvAction<T> action) {
+        CompletableFuture<T> future = preventThreadHijack(withSchemaSync(tx, 
action));
+
+        return convertToPublicFuture(future);
+    }
+
     /**
      * Executes the provided KV action in the given transaction, maintaining 
Schema Synchronization semantics: that is, before executing
      * the action, a check is made to make sure that the current node has 
schemas complete wrt timestamp corresponding to the operation
@@ -139,7 +175,7 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
                 ? schemaVersions.schemaVersionAtNow(tbl.tableId())
                 : schemaVersions.schemaVersionAt(((InternalTransaction) 
tx).startTimestamp(), tbl.tableId());
 
-        CompletableFuture<T> future = schemaVersionFuture
+        return schemaVersionFuture
                 .thenCompose(schemaVersion -> action.act(schemaVersion)
                         .handle((res, ex) -> {
                             if 
(isOrCausedBy(InternalSchemaVersionMismatchException.class, ex)) {
@@ -155,8 +191,29 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
                             }
                         }))
                 .thenCompose(identity());
+    }
 
-        return convertToPublicFuture(future);
+    /**
+     * Prevents Ignite internal threads from being hijacked by the user code. 
If that happened, the user code could have blocked
+     * Ignite threads deteriorating progress.
+     *
+     * <p>This is done by completing the future in the async continuation 
thread pool if it would have been completed in an Ignite thread.
+     * This does not happen for synchronous operations as it's impossible to 
hijack a thread using such operations.
+     *
+     * <p>The switch to the async continuation pool is also skipped when it's 
known that the call is made by other Ignite component
+     * and not by the user.
+     *
+     * @param originalFuture Operation future.
+     * @return Future that will be completed in the async continuation thread 
pool ({@link ForkJoinPool#commonPool()} by default).
+     */
+    protected final <T> CompletableFuture<T> 
preventThreadHijack(CompletableFuture<T> originalFuture) {
+        if (originalFuture.isDone() || PublicApiThreading.inInternalCall()) {
+            return originalFuture;
+        }
+
+        // The future is not complete yet, so it will be completed on an 
Ignite thread, so we need to complete the user-facing future
+        // in the continuation pool.
+        return originalFuture.whenCompleteAsync((res, ex) -> {}, 
asyncContinuationExecutor);
     }
 
     /**
@@ -189,7 +246,7 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
     /** {@inheritDoc} */
     @Override
     public Cursor<R> query(@Nullable Transaction tx, @Nullable Criteria 
criteria, @Nullable String indexName, CriteriaQueryOptions opts) {
-        return new CursorAdapter<>(sync(queryAsync(tx, criteria, indexName, 
opts)));
+        return new CursorAdapter<>(sync(() -> queryAsync(tx, criteria, 
indexName, opts)));
     }
 
     /** {@inheritDoc} */
@@ -202,10 +259,10 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
     ) {
         CriteriaQueryOptions opts0 = opts == null ? 
CriteriaQueryOptions.DEFAULT : opts;
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        CompletableFuture<AsyncCursor<R>> future = doOperation(tx, 
(schemaVersion) -> {
             SchemaDescriptor schema = 
rowConverter.registry().schema(schemaVersion);
 
-            SqlSerializer ser = new SqlSerializer.Builder()
+            SqlSerializer ser = new Builder()
                     .tableName(tbl.name())
                     .columns(schema.columnNames())
                     .indexName(indexName)
@@ -228,19 +285,20 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
                             session.closeAsync();
                         }
                     });
-        })
-                .exceptionally(th -> {
-                    throw new 
CompletionException(mapToPublicCriteriaException(unwrapCause(th)));
-                });
+        });
+
+        return future.exceptionally(th -> {
+            throw new 
CompletionException(mapToPublicCriteriaException(unwrapCause(th)));
+        });
     }
 
 
     /**
      * Action representing some KV operation. When executed, the action is 
supplied with schema version corresponding
-     * to the operation timestamp (see {@link #withSchemaSync(Transaction, 
KvAction)} for details).
+     * to the operation timestamp (see {@link #doOperation(Transaction, 
KvAction)} for details).
      *
      * @param <R> Type of the result.
-     * @see #withSchemaSync(Transaction, KvAction)
+     * @see #doOperation(Transaction, KvAction)
      */
     @FunctionalInterface
     protected interface KvAction<R> {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
index ff6a58ccaa..65ede03a30 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table;
 
+import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
@@ -26,6 +28,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -69,17 +72,20 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
      * @param tbl Table storage.
      * @param schemaReg Schema registry.
      * @param schemaVersions Schema versions access.
-     * @param marshallers Marshallers provider.
      * @param sql Ignite SQL facade.
+     * @param marshallers Marshallers provider.
+     * @param asyncContinuationExecutor Executor to which execution will be 
resubmitted when leaving asynchronous public API
+     *         endpoints (so as to prevent the user from stealing Ignite 
threads).
      */
     public KeyValueBinaryViewImpl(
             InternalTable tbl,
             SchemaRegistry schemaReg,
             SchemaVersions schemaVersions,
+            IgniteSql sql,
             MarshallersProvider marshallers,
-            IgniteSql sql
+            Executor asyncContinuationExecutor
     ) {
-        super(tbl, schemaVersions, schemaReg, sql, marshallers);
+        super(tbl, schemaVersions, schemaReg, sql, marshallers, 
asyncContinuationExecutor);
 
         marshallerCache = new TupleMarshallerCache(schemaReg);
     }
@@ -87,7 +93,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public Tuple get(@Nullable Transaction tx, Tuple key) {
-        return sync(getAsync(tx, key));
+        return sync(() -> getAsync(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -95,7 +101,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     public CompletableFuture<Tuple> getAsync(@Nullable Transaction tx, Tuple 
key) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row keyRow = marshal(key, null, schemaVersion);
 
             return tbl.get(keyRow, (InternalTransaction) tx).thenApply(row -> 
unmarshalValue(row, schemaVersion));
@@ -125,7 +131,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public Tuple getOrDefault(@Nullable Transaction tx, Tuple key, Tuple 
defaultValue) {
-        return sync(getOrDefaultAsync(tx, key, defaultValue));
+        return sync(() -> getOrDefaultAsync(tx, key, defaultValue));
     }
 
     /** {@inheritDoc} */
@@ -133,7 +139,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     public CompletableFuture<Tuple> getOrDefaultAsync(@Nullable Transaction 
tx, Tuple key, Tuple defaultValue) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshal(key, null, schemaVersion);
 
             return tbl.get(keyRow, (InternalTransaction) tx)
@@ -144,7 +150,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public Map<Tuple, Tuple> getAll(@Nullable Transaction tx, 
Collection<Tuple> keys) {
-        return sync(getAllAsync(tx, keys));
+        return sync(() -> getAllAsync(tx, keys));
     }
 
     /** {@inheritDoc} */
@@ -152,7 +158,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     public CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@Nullable 
Transaction tx, Collection<Tuple> keys) {
         checkKeysForNulls(keys);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             List<BinaryRowEx> keyRows = marshalKeys(keys, schemaVersion);
 
             return tbl.getAll(keyRows, (InternalTransaction) 
tx).thenApply(rows -> unmarshalValues(rows, schemaVersion));
@@ -182,7 +188,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public void put(@Nullable Transaction tx, Tuple key, Tuple val) {
-        sync(putAsync(tx, key, val));
+        sync(() -> putAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -191,7 +197,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
         Objects.requireNonNull(key);
         Objects.requireNonNull(val);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(key, val, schemaVersion);
 
             return tbl.upsert(row, (InternalTransaction) tx);
@@ -201,7 +207,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public void putAll(@Nullable Transaction tx, Map<Tuple, Tuple> pairs) {
-        sync(putAllAsync(tx, pairs));
+        sync(() -> putAllAsync(tx, pairs));
     }
 
     /** {@inheritDoc} */
@@ -213,7 +219,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
             Objects.requireNonNull(entry.getValue());
         }
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.upsertAll(marshalPairs(pairs.entrySet(), schemaVersion, 
null), (InternalTransaction) tx);
         });
     }
@@ -221,7 +227,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public Tuple getAndPut(@Nullable Transaction tx, Tuple key, Tuple val) {
-        return sync(getAndPutAsync(tx, key, val));
+        return sync(() -> getAndPutAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -230,7 +236,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
         Objects.requireNonNull(key);
         Objects.requireNonNull(val);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(key, val, schemaVersion);
 
             return tbl.getAndUpsert(row, (InternalTransaction) 
tx).thenApply(resultRow -> unmarshalValue(resultRow, schemaVersion));
@@ -261,7 +267,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public boolean putIfAbsent(@Nullable Transaction tx, Tuple key, Tuple val) 
{
-        return sync(putIfAbsentAsync(tx, key, val));
+        return sync(() -> putIfAbsentAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -270,7 +276,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
         Objects.requireNonNull(key);
         Objects.requireNonNull(val);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(key, val, schemaVersion);
 
             return tbl.insert(row, (InternalTransaction) tx);
@@ -280,13 +286,13 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public boolean remove(@Nullable Transaction tx, Tuple key) {
-        return sync(removeAsync(tx, key));
+        return sync(() -> removeAsync(tx, key));
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean remove(@Nullable Transaction tx, Tuple key, Tuple val) {
-        return sync(removeAsync(tx, key, val));
+        return sync(() -> removeAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -294,7 +300,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, 
Tuple key) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(key, null, schemaVersion);
 
             return tbl.delete(row, (InternalTransaction) tx);
@@ -307,7 +313,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
         Objects.requireNonNull(key);
         Objects.requireNonNull(val);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(key, val, schemaVersion);
 
             return tbl.deleteExact(row, (InternalTransaction) tx);
@@ -317,7 +323,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public Collection<Tuple> removeAll(@Nullable Transaction tx, 
Collection<Tuple> keys) {
-        return sync(removeAllAsync(tx, keys));
+        return sync(() -> removeAllAsync(tx, keys));
     }
 
     /** {@inheritDoc} */
@@ -325,7 +331,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     public CompletableFuture<Collection<Tuple>> removeAllAsync(@Nullable 
Transaction tx, Collection<Tuple> keys) {
         checkKeysForNulls(keys);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             List<BinaryRowEx> keyRows = marshalKeys(keys, schemaVersion);
 
             return tbl.deleteAll(keyRows, (InternalTransaction) 
tx).thenApply(rows -> unmarshalKeys(rows, schemaVersion));
@@ -337,7 +343,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     public Tuple getAndRemove(@Nullable Transaction tx, Tuple key) {
         Objects.requireNonNull(key);
 
-        return sync(getAndRemoveAsync(tx, key));
+        return sync(() -> getAndRemoveAsync(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -345,7 +351,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     public CompletableFuture<Tuple> getAndRemoveAsync(@Nullable Transaction 
tx, Tuple key) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.getAndDelete(marshal(key, null, schemaVersion), 
(InternalTransaction) tx)
                     .thenApply(row -> unmarshalValue(row, schemaVersion));
         });
@@ -374,13 +380,13 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public boolean replace(@Nullable Transaction tx, Tuple key, Tuple val) {
-        return sync(replaceAsync(tx, key, val));
+        return sync(() -> replaceAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean replace(@Nullable Transaction tx, Tuple key, Tuple oldVal, 
Tuple newVal) {
-        return sync(replaceAsync(tx, key, oldVal, newVal));
+        return sync(() -> replaceAsync(tx, key, oldVal, newVal));
     }
 
     /** {@inheritDoc} */
@@ -389,7 +395,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
         Objects.requireNonNull(key);
         Objects.requireNonNull(val);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(key, val, schemaVersion);
 
             return tbl.replace(row, (InternalTransaction) tx);
@@ -408,7 +414,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
         Objects.requireNonNull(oldVal);
         Objects.requireNonNull(newVal);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row oldRow = marshal(key, oldVal, schemaVersion);
             Row newRow = marshal(key, newVal, schemaVersion);
 
@@ -419,7 +425,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
     /** {@inheritDoc} */
     @Override
     public Tuple getAndReplace(@Nullable Transaction tx, Tuple key, Tuple val) 
{
-        return sync(getAndReplaceAsync(tx, key, val));
+        return sync(() -> getAndReplaceAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -428,7 +434,7 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
         Objects.requireNonNull(key);
         Objects.requireNonNull(val);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.getAndReplace(marshal(key, val, schemaVersion), 
(InternalTransaction) tx)
                     .thenApply(row -> unmarshalValue(row, schemaVersion));
         });
@@ -450,8 +456,11 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
      * @throws UnsupportedOperationException unconditionally.
      */
     @Override
-    public CompletableFuture<NullableValue<Tuple>> 
getNullableAndReplaceAsync(@Nullable Transaction tx, Tuple key,
-            Tuple val) {
+    public CompletableFuture<NullableValue<Tuple>> getNullableAndReplaceAsync(
+            @Nullable Transaction tx,
+            Tuple key,
+            Tuple val
+    ) {
         throw new UnsupportedOperationException("Binary view doesn't allow 
null tuples.");
     }
 
@@ -558,9 +567,11 @@ public class KeyValueBinaryViewImpl extends 
AbstractTableView<Entry<Tuple, Tuple
         StreamerBatchSender<Entry<Tuple, Tuple>, Integer> batchSender = 
(partitionId, items, deleted) ->
                 withSchemaSync(
                         null,
-                        schemaVersion -> 
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted, 
partitionId));
+                        schemaVersion -> 
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted, 
partitionId)
+                );
 
-        return DataStreamer.streamData(publisher, options, batchSender, 
partitioner);
+        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner);
+        return convertToPublicFuture(preventThreadHijack(future));
     }
 
     private List<BinaryRowEx> marshalPairs(Collection<Entry<Tuple, Tuple>> 
pairs, int schemaVersion, @Nullable BitSet deleted) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
index 8cc00ffa70..3c4d74c27c 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueViewImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table;
 
+import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
@@ -26,6 +28,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -82,8 +85,10 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
      * @param tbl Table storage.
      * @param schemaRegistry Schema registry.
      * @param schemaVersions Schema versions access.
-     * @param marshallers Marshallers provider.
      * @param sql Ignite SQL facade.
+     * @param marshallers Marshallers provider.
+     * @param asyncContinuationExecutor Executor to which execution will be 
resubmitted when leaving asynchronous public API
+     *         endpoints (so as to prevent the user from stealing Ignite 
threads).
      * @param keyMapper Key class mapper.
      * @param valueMapper Value class mapper.
      */
@@ -91,12 +96,13 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
             InternalTable tbl,
             SchemaRegistry schemaRegistry,
             SchemaVersions schemaVersions,
-            MarshallersProvider marshallers,
             IgniteSql sql,
+            MarshallersProvider marshallers,
+            Executor asyncContinuationExecutor,
             Mapper<K> keyMapper,
             Mapper<V> valueMapper
     ) {
-        super(tbl, schemaVersions, schemaRegistry, sql, marshallers);
+        super(tbl, schemaVersions, schemaRegistry, sql, marshallers, 
asyncContinuationExecutor);
 
         this.keyMapper = keyMapper;
         this.valueMapper = valueMapper;
@@ -107,7 +113,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public V get(@Nullable Transaction tx, K key) {
-        return sync(getAsync(tx, key));
+        return sync(() -> getAsync(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -115,7 +121,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<V> getAsync(@Nullable Transaction tx, K key) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshal(key, schemaVersion);
 
             return tbl.get(keyRow, (InternalTransaction) 
tx).thenApply(binaryRow -> unmarshalValue(binaryRow, schemaVersion));
@@ -125,7 +131,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public NullableValue<V> getNullable(@Nullable Transaction tx, K key) {
-        return sync(getNullableAsync(tx, key));
+        return sync(() -> getNullableAsync(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -133,7 +139,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<NullableValue<V>> getNullableAsync(@Nullable 
Transaction tx, K key) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshal(key, schemaVersion);
 
             return tbl.get(keyRow, (InternalTransaction) tx)
@@ -144,7 +150,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public V getOrDefault(@Nullable Transaction tx, K key, V defaultValue) {
-        return sync(getOrDefaultAsync(tx, key, defaultValue));
+        return sync(() -> getOrDefaultAsync(tx, key, defaultValue));
     }
 
     /** {@inheritDoc} */
@@ -152,7 +158,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<V> getOrDefaultAsync(@Nullable Transaction tx, K 
key, V defaultValue) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshal(key, schemaVersion);
 
             return tbl.get(keyRow, (InternalTransaction) tx)
@@ -163,7 +169,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public Map<K, V> getAll(@Nullable Transaction tx, Collection<K> keys) {
-        return sync(getAllAsync(tx, keys));
+        return sync(() -> getAllAsync(tx, keys));
     }
 
     /** {@inheritDoc} */
@@ -171,7 +177,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<Map<K, V>> getAllAsync(@Nullable Transaction tx, 
Collection<K> keys) {
         checkKeysForNulls(keys);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Collection<BinaryRowEx> rows = marshal(keys, schemaVersion);
 
             return tbl.getAll(rows, (InternalTransaction) 
tx).thenApply(resultRows -> unmarshalPairs(resultRows, schemaVersion));
@@ -189,7 +195,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public boolean contains(@Nullable Transaction tx, K key) {
-        return sync(containsAsync(tx, key));
+        return sync(() -> containsAsync(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -197,7 +203,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx, 
K key) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshal(key, schemaVersion);
 
             return tbl.get(keyRow, (InternalTransaction) 
tx).thenApply(Objects::nonNull);
@@ -207,7 +213,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public void put(@Nullable Transaction tx, K key, @Nullable V val) {
-        sync(putAsync(tx, key, val));
+        sync(() -> putAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -215,7 +221,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<Void> putAsync(@Nullable Transaction tx, K key, 
@Nullable V val) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshal(key, val, schemaVersion);
 
             return tbl.upsert(row, (InternalTransaction) tx);
@@ -225,7 +231,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public void putAll(@Nullable Transaction tx, Map<K, V> pairs) {
-        sync(putAllAsync(tx, pairs));
+        sync(() -> putAllAsync(tx, pairs));
     }
 
     /** {@inheritDoc} */
@@ -236,7 +242,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
             Objects.requireNonNull(entry.getKey());
         }
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Collection<BinaryRowEx> rows = marshalPairs(pairs.entrySet(), 
schemaVersion, null);
 
             return tbl.upsertAll(rows, (InternalTransaction) tx);
@@ -246,7 +252,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public V getAndPut(@Nullable Transaction tx, K key, @Nullable V val) {
-        return sync(getAndPutAsync(tx, key, val));
+        return sync(() -> getAndPutAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -255,7 +261,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
         Objects.requireNonNull(key);
         Objects.requireNonNull(val);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.getAndUpsert(marshal(key, val, schemaVersion), 
(InternalTransaction) tx)
                     .thenApply(binaryRow -> unmarshalValue(binaryRow, 
schemaVersion));
         });
@@ -264,7 +270,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public NullableValue<V> getNullableAndPut(@Nullable Transaction tx, K key, 
@Nullable V val) {
-        return sync(getNullableAndPutAsync(tx, key, val));
+        return sync(() -> getNullableAndPutAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -272,7 +278,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<NullableValue<V>> 
getNullableAndPutAsync(@Nullable Transaction tx, K key, @Nullable V val) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshal(key, val, schemaVersion);
 
             return tbl.getAndUpsert(row, (InternalTransaction) tx)
@@ -283,7 +289,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public boolean putIfAbsent(@Nullable Transaction tx, K key, @Nullable V 
val) {
-        return sync(putIfAbsentAsync(tx, key, val));
+        return sync(() -> putIfAbsentAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -291,7 +297,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<Boolean> putIfAbsentAsync(@Nullable Transaction 
tx, K key, @Nullable V val) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshal(key, val, schemaVersion);
 
             return tbl.insert(row, (InternalTransaction) tx);
@@ -301,13 +307,13 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public boolean remove(@Nullable Transaction tx, K key) {
-        return sync(removeAsync(tx, key));
+        return sync(() -> removeAsync(tx, key));
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean remove(@Nullable Transaction tx, K key, @Nullable V val) {
-        return sync(removeAsync(tx, key, val));
+        return sync(() -> removeAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -315,7 +321,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K 
key) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshal(key, schemaVersion);
 
             return tbl.delete(row, (InternalTransaction) tx);
@@ -327,7 +333,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<Boolean> removeAsync(@Nullable Transaction tx, K 
key, @Nullable V val) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshal(key, val, schemaVersion);
 
             return tbl.deleteExact(row, (InternalTransaction) tx);
@@ -337,7 +343,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public Collection<K> removeAll(@Nullable Transaction tx, Collection<K> 
keys) {
-        return sync(removeAllAsync(tx, keys));
+        return sync(() -> removeAllAsync(tx, keys));
     }
 
     /** {@inheritDoc} */
@@ -345,7 +351,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<Collection<K>> removeAllAsync(@Nullable 
Transaction tx, Collection<K> keys) {
         checkKeysForNulls(keys);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Collection<BinaryRowEx> rows = marshal(keys, schemaVersion);
 
             return tbl.deleteAll(rows, (InternalTransaction) 
tx).thenApply(resultRows -> unmarshalKeys(resultRows, schemaVersion));
@@ -355,7 +361,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public V getAndRemove(@Nullable Transaction tx, K key) {
-        return sync(getAndRemoveAsync(tx, key));
+        return sync(() -> getAndRemoveAsync(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -363,7 +369,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<V> getAndRemoveAsync(@Nullable Transaction tx, K 
key) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshal(key, schemaVersion);
 
             return tbl.getAndDelete(keyRow, (InternalTransaction) tx)
@@ -374,7 +380,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public NullableValue<V> getNullableAndRemove(@Nullable Transaction tx, K 
key) {
-        return sync(getNullableAndRemoveAsync(tx, key));
+        return sync(() -> getNullableAndRemoveAsync(tx, key));
     }
 
     /** {@inheritDoc} */
@@ -382,7 +388,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<NullableValue<V>> 
getNullableAndRemoveAsync(@Nullable Transaction tx, K key) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshal(key, schemaVersion);
 
             return tbl.getAndDelete(keyRow, (InternalTransaction) tx)
@@ -393,13 +399,13 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public boolean replace(@Nullable Transaction tx, K key, @Nullable V val) {
-        return sync(replaceAsync(tx, key, val));
+        return sync(() -> replaceAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean replace(@Nullable Transaction tx, K key, @Nullable V 
oldVal, @Nullable V newVal) {
-        return sync(replaceAsync(tx, key, oldVal, newVal));
+        return sync(() -> replaceAsync(tx, key, oldVal, newVal));
     }
 
     /** {@inheritDoc} */
@@ -407,7 +413,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, K 
key, @Nullable V val) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshal(key, val, schemaVersion);
 
             return tbl.replace(row, (InternalTransaction) tx);
@@ -419,7 +425,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, K 
key, @Nullable V oldVal, @Nullable V newVal) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx oldRow = marshal(key, oldVal, schemaVersion);
             BinaryRowEx newRow = marshal(key, newVal, schemaVersion);
 
@@ -430,7 +436,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public V getAndReplace(@Nullable Transaction tx, K key, @Nullable V val) {
-        return sync(getAndReplaceAsync(tx, key, val));
+        return sync(() -> getAndReplaceAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -439,7 +445,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
         Objects.requireNonNull(key);
         Objects.requireNonNull(val);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.getAndReplace(marshal(key, val, schemaVersion), 
(InternalTransaction) tx)
                     .thenApply(binaryRow -> unmarshalValue(binaryRow, 
schemaVersion));
         });
@@ -448,7 +454,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     /** {@inheritDoc} */
     @Override
     public NullableValue<V> getNullableAndReplace(@Nullable Transaction tx, K 
key, @Nullable V val) {
-        return sync(getNullableAndReplaceAsync(tx, key, val));
+        return sync(() -> getNullableAndReplaceAsync(tx, key, val));
     }
 
     /** {@inheritDoc} */
@@ -456,7 +462,7 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
     public CompletableFuture<NullableValue<V>> 
getNullableAndReplaceAsync(@Nullable Transaction tx, K key, @Nullable V val) {
         Objects.requireNonNull(key);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshal(key, val, schemaVersion);
 
             return tbl.getAndReplace(row, (InternalTransaction) tx)
@@ -700,9 +706,11 @@ public class KeyValueViewImpl<K, V> extends 
AbstractTableView<Entry<K, V>> imple
         StreamerBatchSender<Entry<K, V>, Integer> batchSender = (partitionId, 
items, deleted) ->
                 withSchemaSync(
                         null,
-                        schemaVersion -> 
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted, 
partitionId));
+                        schemaVersion -> 
this.tbl.updateAll(marshalPairs(items, schemaVersion, deleted), deleted, 
partitionId)
+                );
 
-        return DataStreamer.streamData(publisher, options, batchSender, 
partitioner);
+        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner);
+        return convertToPublicFuture(preventThreadHijack(future));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
index d62cce9023..84fa6db137 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordBinaryViewImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table;
 
+import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
@@ -24,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Flow.Publisher;
 import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -57,17 +60,20 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
      * @param tbl The table.
      * @param schemaRegistry Table schema registry.
      * @param schemaVersions Schema versions access.
-     * @param marshallers Marshallers provider.
      * @param sql Ignite SQL facade.
+     * @param marshallers Marshallers provider.
+     * @param asyncContinuationExecutor Executor to which execution will be 
resubmitted when leaving asynchronous public API
+     *         endpoints (so as to prevent the user from stealing Ignite 
threads).
      */
     public RecordBinaryViewImpl(
             InternalTable tbl,
             SchemaRegistry schemaRegistry,
             SchemaVersions schemaVersions,
+            IgniteSql sql,
             MarshallersProvider marshallers,
-            IgniteSql sql
+            Executor asyncContinuationExecutor
     ) {
-        super(tbl, schemaVersions, schemaRegistry, sql, marshallers);
+        super(tbl, schemaVersions, schemaRegistry, sql, marshallers, 
asyncContinuationExecutor);
 
         marshallerCache = new TupleMarshallerCache(schemaRegistry);
     }
@@ -75,7 +81,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public Tuple get(@Nullable Transaction tx, Tuple keyRec) {
-        return sync(getAsync(tx, keyRec));
+        return sync(() -> getAsync(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -83,7 +89,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Tuple> getAsync(@Nullable Transaction tx, Tuple 
keyRec) {
         Objects.requireNonNull(keyRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row keyRow = marshal(keyRec, schemaVersion, true); // Convert to 
portable format to pass TX/storage layer.
 
             return tbl.get(keyRow, (InternalTransaction) tx).thenApply(row -> 
wrap(row, schemaVersion));
@@ -101,14 +107,14 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
 
     @Override
     public List<Tuple> getAll(@Nullable Transaction tx, Collection<Tuple> 
keyRecs) {
-        return sync(getAllAsync(tx, keyRecs));
+        return sync(() -> getAllAsync(tx, keyRecs));
     }
 
     @Override
     public CompletableFuture<List<Tuple>> getAllAsync(@Nullable Transaction 
tx, Collection<Tuple> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.getAll(mapToBinary(keyRecs, schemaVersion, true), 
(InternalTransaction) tx)
                     .thenApply(binaryRows -> wrap(binaryRows, schemaVersion, 
true));
         });
@@ -117,7 +123,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public boolean contains(@Nullable Transaction tx, Tuple keyRec) {
-        return sync(containsAsync(tx, keyRec));
+        return sync(() -> containsAsync(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -125,7 +131,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx, 
Tuple keyRec) {
         Objects.requireNonNull(keyRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row keyRow = marshal(keyRec, schemaVersion, true); // Convert to 
portable format to pass TX/storage layer.
 
             return tbl.get(keyRow, (InternalTransaction) 
tx).thenApply(Objects::nonNull);
@@ -135,7 +141,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public void upsert(@Nullable Transaction tx, Tuple rec) {
-        sync(upsertAsync(tx, rec));
+        sync(() -> upsertAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -143,7 +149,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Void> upsertAsync(@Nullable Transaction tx, Tuple 
rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(rec, schemaVersion, false);
 
             return tbl.upsert(row, (InternalTransaction) tx);
@@ -153,7 +159,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public void upsertAll(@Nullable Transaction tx, Collection<Tuple> recs) {
-        sync(upsertAllAsync(tx, recs));
+        sync(() -> upsertAllAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
@@ -161,7 +167,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx, 
Collection<Tuple> recs) {
         Objects.requireNonNull(recs);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.upsertAll(mapToBinary(recs, schemaVersion, false), 
(InternalTransaction) tx);
         });
     }
@@ -169,7 +175,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public Tuple getAndUpsert(@Nullable Transaction tx, Tuple rec) {
-        return sync(getAndUpsertAsync(tx, rec));
+        return sync(() -> getAndUpsertAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -177,7 +183,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Tuple> getAndUpsertAsync(@Nullable Transaction 
tx, Tuple rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(rec, schemaVersion, false);
 
             return tbl.getAndUpsert(row, (InternalTransaction) 
tx).thenApply(resultRow -> wrap(resultRow, schemaVersion));
@@ -187,7 +193,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public boolean insert(@Nullable Transaction tx, Tuple rec) {
-        return sync(insertAsync(tx, rec));
+        return sync(() -> insertAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -195,7 +201,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Boolean> insertAsync(@Nullable Transaction tx, 
Tuple rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(rec, schemaVersion, false);
 
             return tbl.insert(row, (InternalTransaction) tx);
@@ -205,7 +211,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public List<Tuple> insertAll(@Nullable Transaction tx, Collection<Tuple> 
recs) {
-        return sync(insertAllAsync(tx, recs));
+        return sync(() -> insertAllAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
@@ -213,7 +219,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<List<Tuple>> insertAllAsync(@Nullable Transaction 
tx, Collection<Tuple> recs) {
         Objects.requireNonNull(recs);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.insertAll(mapToBinary(recs, schemaVersion, false), 
(InternalTransaction) tx)
                     .thenApply(rows -> wrap(rows, schemaVersion, false));
         });
@@ -222,13 +228,13 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public boolean replace(@Nullable Transaction tx, Tuple rec) {
-        return sync(replaceAsync(tx, rec));
+        return sync(() -> replaceAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean replace(@Nullable Transaction tx, Tuple oldRec, Tuple 
newRec) {
-        return sync(replaceAsync(tx, oldRec, newRec));
+        return sync(() -> replaceAsync(tx, oldRec, newRec));
     }
 
     /** {@inheritDoc} */
@@ -236,7 +242,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, 
Tuple rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(rec, schemaVersion, false);
 
             return tbl.replace(row, (InternalTransaction) tx);
@@ -249,7 +255,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
         Objects.requireNonNull(oldRec);
         Objects.requireNonNull(newRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row oldRow = marshal(oldRec, schemaVersion, false);
             Row newRow = marshal(newRec, schemaVersion, false);
 
@@ -260,7 +266,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public Tuple getAndReplace(@Nullable Transaction tx, Tuple rec) {
-        return sync(getAndReplaceAsync(tx, rec));
+        return sync(() -> getAndReplaceAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -268,7 +274,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Tuple> getAndReplaceAsync(@Nullable Transaction 
tx, Tuple rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(rec, schemaVersion, false);
 
             return tbl.getAndReplace(row, (InternalTransaction) 
tx).thenApply(resultRow -> wrap(resultRow, schemaVersion));
@@ -278,7 +284,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public boolean delete(@Nullable Transaction tx, Tuple keyRec) {
-        return sync(deleteAsync(tx, keyRec));
+        return sync(() -> deleteAsync(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -286,7 +292,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Boolean> deleteAsync(@Nullable Transaction tx, 
Tuple keyRec) {
         Objects.requireNonNull(keyRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row keyRow = marshal(keyRec, schemaVersion, true);
 
             return tbl.delete(keyRow, (InternalTransaction) tx);
@@ -296,7 +302,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public boolean deleteExact(@Nullable Transaction tx, Tuple rec) {
-        return sync(deleteExactAsync(tx, rec));
+        return sync(() -> deleteExactAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -304,7 +310,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Boolean> deleteExactAsync(@Nullable Transaction 
tx, Tuple rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row row = marshal(rec, schemaVersion, false);
 
             return tbl.deleteExact(row, (InternalTransaction) tx);
@@ -314,7 +320,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public Tuple getAndDelete(@Nullable Transaction tx, Tuple keyRec) {
-        return sync(getAndDeleteAsync(tx, keyRec));
+        return sync(() -> getAndDeleteAsync(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -322,7 +328,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<Tuple> getAndDeleteAsync(@Nullable Transaction 
tx, Tuple keyRec) {
         Objects.requireNonNull(keyRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Row keyRow = marshal(keyRec, schemaVersion, true);
 
             return tbl.getAndDelete(keyRow, (InternalTransaction) 
tx).thenApply(row -> wrap(row, schemaVersion));
@@ -332,7 +338,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public List<Tuple> deleteAll(@Nullable Transaction tx, Collection<Tuple> 
keyRecs) {
-        return sync(deleteAllAsync(tx, keyRecs));
+        return sync(() -> deleteAllAsync(tx, keyRecs));
     }
 
     /** {@inheritDoc} */
@@ -340,7 +346,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<List<Tuple>> deleteAllAsync(@Nullable Transaction 
tx, Collection<Tuple> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.deleteAll(mapToBinary(keyRecs, schemaVersion, true), 
(InternalTransaction) tx)
                     .thenApply(rows -> wrapKeys(rows, schemaVersion));
         });
@@ -349,7 +355,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     /** {@inheritDoc} */
     @Override
     public List<Tuple> deleteAllExact(@Nullable Transaction tx, 
Collection<Tuple> recs) {
-        return sync(deleteAllExactAsync(tx, recs));
+        return sync(() -> deleteAllExactAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
@@ -357,7 +363,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
     public CompletableFuture<List<Tuple>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<Tuple> recs) {
         Objects.requireNonNull(recs);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.deleteAllExact(mapToBinary(recs, schemaVersion, false), 
(InternalTransaction) tx)
                     .thenApply(rows -> wrap(rows, schemaVersion, false));
         });
@@ -484,9 +490,13 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
         Objects.requireNonNull(publisher);
 
         var partitioner = new 
TupleStreamerPartitionAwarenessProvider(rowConverter.registry(), 
tbl.partitions());
-        StreamerBatchSender<Tuple, Integer> batchSender = this::updateAll;
+        StreamerBatchSender<Tuple, Integer> batchSender = (partitionId, rows, 
deleted) ->
+                withSchemaSync(null,
+                        schemaVersion -> this.tbl.updateAll(mapToBinary(rows, 
schemaVersion, deleted), deleted, partitionId)
+                );
 
-        return DataStreamer.streamData(publisher, options, batchSender, 
partitioner);
+        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner);
+        return convertToPublicFuture(preventThreadHijack(future));
     }
 
     /**
@@ -498,7 +508,7 @@ public class RecordBinaryViewImpl extends 
AbstractTableView<Tuple> implements Re
      * @return Future that will be completed when the stream is finished.
      */
     public CompletableFuture<Void> updateAll(int partitionId, 
Collection<Tuple> rows, @Nullable BitSet deleted) {
-        return withSchemaSync(null,
+        return doOperation(null,
                 schemaVersion -> this.tbl.updateAll(mapToBinary(rows, 
schemaVersion, deleted), deleted, partitionId));
     }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index cf7ec8df18..279c09e827 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.table;
 
+import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
@@ -24,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Flow.Publisher;
 import java.util.function.Function;
 import org.apache.ignite.internal.marshaller.Marshaller;
@@ -73,19 +76,22 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
      * @param tbl Table.
      * @param schemaRegistry Schema registry.
      * @param schemaVersions Schema versions access.
+     * @param sql Ignite SQL facade.
      * @param marshallers Marshallers provider.
+     * @param asyncContinuationExecutor Executor to which execution will be 
resubmitted when leaving asynchronous public API
+     *         endpoints (so as to prevent the user from stealing Ignite 
threads).
      * @param mapper Record class mapper.
-     * @param sql Ignite SQL facade.
      */
     public RecordViewImpl(
             InternalTable tbl,
             SchemaRegistry schemaRegistry,
             SchemaVersions schemaVersions,
+            IgniteSql sql,
             MarshallersProvider marshallers,
-            Mapper<R> mapper,
-            IgniteSql sql
+            Executor asyncContinuationExecutor,
+            Mapper<R> mapper
     ) {
-        super(tbl, schemaVersions, schemaRegistry, sql, marshallers);
+        super(tbl, schemaVersions, schemaRegistry, sql, marshallers, 
asyncContinuationExecutor);
 
         this.mapper = mapper;
         marshallerFactory = (schema) -> new RecordMarshallerImpl<>(schema, 
marshallers, mapper);
@@ -94,7 +100,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public R get(@Nullable Transaction tx, R keyRec) {
-        return sync(getAsync(tx, keyRec));
+        return sync(() -> getAsync(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -102,7 +108,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<R> getAsync(@Nullable Transaction tx, R keyRec) {
         Objects.requireNonNull(keyRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshalKey(keyRec, schemaVersion);
 
             return tbl.get(keyRow, (InternalTransaction) 
tx).thenApply(binaryRow -> unmarshal(binaryRow, schemaVersion));
@@ -111,14 +117,14 @@ public class RecordViewImpl<R> extends 
AbstractTableView<R> implements RecordVie
 
     @Override
     public List<R> getAll(@Nullable Transaction tx, Collection<R> keyRecs) {
-        return sync(getAllAsync(tx, keyRecs));
+        return sync(() -> getAllAsync(tx, keyRecs));
     }
 
     @Override
     public CompletableFuture<List<R>> getAllAsync(@Nullable Transaction tx, 
Collection<R> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.getAll(marshalKeys(keyRecs, schemaVersion), 
(InternalTransaction) tx)
                     .thenApply(binaryRows -> unmarshal(binaryRows, 
schemaVersion, true));
         });
@@ -127,7 +133,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public boolean contains(@Nullable Transaction tx, R keyRec) {
-        return sync(containsAsync(tx, keyRec));
+        return sync(() -> containsAsync(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -135,7 +141,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<Boolean> containsAsync(@Nullable Transaction tx, 
R keyRec) {
         Objects.requireNonNull(keyRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshalKey(keyRec, schemaVersion);
 
             return tbl.get(keyRow, (InternalTransaction) 
tx).thenApply(Objects::nonNull);
@@ -145,7 +151,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public void upsert(@Nullable Transaction tx, R rec) {
-        sync(upsertAsync(tx, rec));
+        sync(() -> upsertAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -153,7 +159,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<Void> upsertAsync(@Nullable Transaction tx, R 
rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshal(rec, schemaVersion);
 
             return tbl.upsert(keyRow, (InternalTransaction) tx);
@@ -163,7 +169,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public void upsertAll(@Nullable Transaction tx, Collection<R> recs) {
-        sync(upsertAllAsync(tx, recs));
+        sync(() -> upsertAllAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
@@ -171,7 +177,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<Void> upsertAllAsync(@Nullable Transaction tx, 
Collection<R> recs) {
         Objects.requireNonNull(recs);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             return tbl.upsertAll(marshal(recs, schemaVersion), 
(InternalTransaction) tx);
         });
     }
@@ -179,7 +185,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public R getAndUpsert(@Nullable Transaction tx, R rec) {
-        return sync(getAndUpsertAsync(tx, rec));
+        return sync(() -> getAndUpsertAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -187,7 +193,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<R> getAndUpsertAsync(@Nullable Transaction tx, R 
rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshal(rec, schemaVersion);
 
             return tbl.getAndUpsert(keyRow, (InternalTransaction) 
tx).thenApply(binaryRow -> unmarshal(binaryRow, schemaVersion));
@@ -197,7 +203,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public boolean insert(@Nullable Transaction tx, R rec) {
-        return sync(insertAsync(tx, rec));
+        return sync(() -> insertAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -205,7 +211,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<Boolean> insertAsync(@Nullable Transaction tx, R 
rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx keyRow = marshal(rec, schemaVersion);
 
             return tbl.insert(keyRow, (InternalTransaction) tx);
@@ -215,7 +221,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public List<R> insertAll(@Nullable Transaction tx, Collection<R> recs) {
-        return sync(insertAllAsync(tx, recs));
+        return sync(() -> insertAllAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
@@ -223,7 +229,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<List<R>> insertAllAsync(@Nullable Transaction tx, 
Collection<R> recs) {
         Objects.requireNonNull(recs);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Collection<BinaryRowEx> rows = marshal(recs, schemaVersion);
 
             return tbl.insertAll(rows, (InternalTransaction) 
tx).thenApply(binaryRows -> unmarshal(binaryRows, schemaVersion, false));
@@ -233,13 +239,13 @@ public class RecordViewImpl<R> extends 
AbstractTableView<R> implements RecordVie
     /** {@inheritDoc} */
     @Override
     public boolean replace(@Nullable Transaction tx, R rec) {
-        return sync(replaceAsync(tx, rec));
+        return sync(() -> replaceAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean replace(@Nullable Transaction tx, R oldRec, R newRec) {
-        return sync(replaceAsync(tx, oldRec, newRec));
+        return sync(() -> replaceAsync(tx, oldRec, newRec));
     }
 
     /** {@inheritDoc} */
@@ -247,7 +253,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<Boolean> replaceAsync(@Nullable Transaction tx, R 
rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx newRow = marshal(rec, schemaVersion);
 
             return tbl.replace(newRow, (InternalTransaction) tx);
@@ -260,7 +266,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
         Objects.requireNonNull(oldRec);
         Objects.requireNonNull(newRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx oldRow = marshal(oldRec, schemaVersion);
             BinaryRowEx newRow = marshal(newRec, schemaVersion);
 
@@ -271,7 +277,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public R getAndReplace(@Nullable Transaction tx, R rec) {
-        return sync(getAndReplaceAsync(tx, rec));
+        return sync(() -> getAndReplaceAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -279,7 +285,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<R> getAndReplaceAsync(@Nullable Transaction tx, R 
rec) {
         Objects.requireNonNull(rec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshal(rec, schemaVersion);
 
             return tbl.getAndReplace(row, (InternalTransaction) 
tx).thenApply(binaryRow -> unmarshal(binaryRow, schemaVersion));
@@ -289,7 +295,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public boolean delete(@Nullable Transaction tx, R keyRec) {
-        return sync(deleteAsync(tx, keyRec));
+        return sync(() -> deleteAsync(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -297,7 +303,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<Boolean> deleteAsync(@Nullable Transaction tx, R 
keyRec) {
         Objects.requireNonNull(keyRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshalKey(keyRec, schemaVersion);
 
             return tbl.delete(row, (InternalTransaction) tx);
@@ -307,7 +313,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public boolean deleteExact(@Nullable Transaction tx, R rec) {
-        return sync(deleteExactAsync(tx, rec));
+        return sync(() -> deleteExactAsync(tx, rec));
     }
 
     /** {@inheritDoc} */
@@ -315,7 +321,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<Boolean> deleteExactAsync(@Nullable Transaction 
tx, R keyRec) {
         Objects.requireNonNull(keyRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshal(keyRec, schemaVersion);
 
             return tbl.deleteExact(row, (InternalTransaction) tx);
@@ -325,7 +331,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public R getAndDelete(@Nullable Transaction tx, R keyRec) {
-        return sync(getAndDeleteAsync(tx, keyRec));
+        return sync(() -> getAndDeleteAsync(tx, keyRec));
     }
 
     /** {@inheritDoc} */
@@ -333,7 +339,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<R> getAndDeleteAsync(@Nullable Transaction tx, R 
keyRec) {
         Objects.requireNonNull(keyRec);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             BinaryRowEx row = marshalKey(keyRec, schemaVersion);
 
             return tbl.getAndDelete(row, (InternalTransaction) 
tx).thenApply(binaryRow -> unmarshal(binaryRow, schemaVersion));
@@ -343,7 +349,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public List<R> deleteAll(@Nullable Transaction tx, Collection<R> keyRecs) {
-        return sync(deleteAllAsync(tx, keyRecs));
+        return sync(() -> deleteAllAsync(tx, keyRecs));
     }
 
     /** {@inheritDoc} */
@@ -351,7 +357,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<List<R>> deleteAllAsync(@Nullable Transaction tx, 
Collection<R> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Collection<BinaryRowEx> rows = marshal(keyRecs, schemaVersion);
 
             return tbl.deleteAll(rows, (InternalTransaction) 
tx).thenApply(binaryRows -> unmarshal(binaryRows, schemaVersion, false));
@@ -361,7 +367,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     /** {@inheritDoc} */
     @Override
     public List<R> deleteAllExact(@Nullable Transaction tx, Collection<R> 
recs) {
-        return sync(deleteAllExactAsync(tx, recs));
+        return sync(() -> deleteAllExactAsync(tx, recs));
     }
 
     /** {@inheritDoc} */
@@ -369,7 +375,7 @@ public class RecordViewImpl<R> extends AbstractTableView<R> 
implements RecordVie
     public CompletableFuture<List<R>> deleteAllExactAsync(@Nullable 
Transaction tx, Collection<R> keyRecs) {
         Objects.requireNonNull(keyRecs);
 
-        return withSchemaSync(tx, (schemaVersion) -> {
+        return doOperation(tx, (schemaVersion) -> {
             Collection<BinaryRowEx> rows = marshal(keyRecs, schemaVersion);
 
             return tbl.deleteAllExact(rows, (InternalTransaction) tx)
@@ -572,9 +578,11 @@ public class RecordViewImpl<R> extends 
AbstractTableView<R> implements RecordVie
         StreamerBatchSender<R, Integer> batchSender = (partitionId, items, 
deleted) ->
                 withSchemaSync(
                         null,
-                        schemaVersion -> this.tbl.updateAll(marshal(items, 
schemaVersion, deleted), deleted, partitionId));
+                        schemaVersion -> this.tbl.updateAll(marshal(items, 
schemaVersion, deleted), deleted, partitionId)
+                );
 
-        return DataStreamer.streamData(publisher, options, batchSender, 
partitioner);
+        CompletableFuture<Void> future = DataStreamer.streamData(publisher, 
options, batchSender, partitioner);
+        return convertToPublicFuture(preventThreadHijack(future));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index e29c7667aa..11d805f409 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.marshaller.MarshallerException;
@@ -76,6 +78,8 @@ public class TableImpl implements TableViewInternal {
 
     private final int pkId;
 
+    private final Executor asyncContinuationExecutor;
+
     /**
      * Constructor.
      *
@@ -85,6 +89,8 @@ public class TableImpl implements TableViewInternal {
      * @param marshallers Marshallers provider.
      * @param sql Ignite SQL facade.
      * @param pkId ID of a primary index.
+     * @param asyncContinuationExecutor Executor to which execution will be 
resubmitted when leaving asynchronous public API endpoints
+     *     (so as to prevent the user from stealing Ignite threads).
      */
     public TableImpl(
             InternalTable tbl,
@@ -92,7 +98,8 @@ public class TableImpl implements TableViewInternal {
             SchemaVersions schemaVersions,
             MarshallersProvider marshallers,
             IgniteSql sql,
-            int pkId
+            int pkId,
+            Executor asyncContinuationExecutor
     ) {
         this.tbl = tbl;
         this.lockManager = lockManager;
@@ -100,6 +107,7 @@ public class TableImpl implements TableViewInternal {
         this.marshallers = marshallers;
         this.sql = sql;
         this.pkId = pkId;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
     }
 
     /**
@@ -121,7 +129,7 @@ public class TableImpl implements TableViewInternal {
             IgniteSql sql,
             int pkId
     ) {
-        this(tbl, lockManager, schemaVersions, new 
ReflectionMarshallersProvider(), sql, pkId);
+        this(tbl, lockManager, schemaVersions, new 
ReflectionMarshallersProvider(), sql, pkId, ForkJoinPool.commonPool());
 
         this.schemaReg = schemaReg;
     }
@@ -166,22 +174,22 @@ public class TableImpl implements TableViewInternal {
 
     @Override
     public <R> RecordView<R> recordView(Mapper<R> recMapper) {
-        return new RecordViewImpl<>(tbl, schemaReg, schemaVersions, 
marshallers, recMapper, sql);
+        return new RecordViewImpl<>(tbl, schemaReg, schemaVersions, sql, 
marshallers, asyncContinuationExecutor, recMapper);
     }
 
     @Override
     public RecordView<Tuple> recordView() {
-        return new RecordBinaryViewImpl(tbl, schemaReg, schemaVersions, 
marshallers, sql);
+        return new RecordBinaryViewImpl(tbl, schemaReg, schemaVersions, sql, 
marshallers, asyncContinuationExecutor);
     }
 
     @Override
     public <K, V> KeyValueView<K, V> keyValueView(Mapper<K> keyMapper, 
Mapper<V> valMapper) {
-        return new KeyValueViewImpl<>(tbl, schemaReg, schemaVersions, 
marshallers, sql, keyMapper, valMapper);
+        return new KeyValueViewImpl<>(tbl, schemaReg, schemaVersions, sql, 
marshallers, asyncContinuationExecutor, keyMapper, valMapper);
     }
 
     @Override
     public KeyValueView<Tuple, Tuple> keyValueView() {
-        return new KeyValueBinaryViewImpl(tbl, schemaReg, schemaVersions, 
marshallers, sql);
+        return new KeyValueBinaryViewImpl(tbl, schemaReg, schemaVersions, sql, 
marshallers, asyncContinuationExecutor);
     }
 
     @Override
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 561ff10ddf..89e5676705 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -391,6 +391,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry;
 
+    private final Executor asyncContinuationExecutor;
+
     /**
      * Creates a new table manager.
      *
@@ -416,6 +418,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
      * @param sql A supplier function that returns {@link IgniteSql}.
      * @param rebalanceScheduler Executor for scheduling rebalance routine.
      * @param lowWatermark Low watermark.
+     * @param asyncContinuationExecutor Executor to which execution will be 
resubmitted when leaving asynchronous public API endpoints
+     *     (so as to prevent the user from stealing Ignite threads).
      */
     public TableManager(
             String nodeName,
@@ -448,7 +452,8 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             Supplier<IgniteSql> sql,
             RemotelyTriggeredResourceRegistry 
remotelyTriggeredResourceRegistry,
             ScheduledExecutorService rebalanceScheduler,
-            LowWatermark lowWatermark
+            LowWatermark lowWatermark,
+            Executor asyncContinuationExecutor
     ) {
         this.topologyService = topologyService;
         this.raftMgr = raftMgr;
@@ -473,6 +478,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.remotelyTriggeredResourceRegistry = 
remotelyTriggeredResourceRegistry;
         this.rebalanceScheduler = rebalanceScheduler;
         this.lowWatermark = lowWatermark;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
         this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -1267,7 +1273,15 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 tableRaftService
         );
 
-        var table = new TableImpl(internalTable, lockMgr, schemaVersions, 
marshallers, sql.get(), tableDescriptor.primaryKeyIndexId());
+        var table = new TableImpl(
+                internalTable,
+                lockMgr,
+                schemaVersions,
+                marshallers,
+                sql.get(),
+                tableDescriptor.primaryKeyIndexId(),
+                asyncContinuationExecutor
+        );
 
         tablesVv.update(causalityToken, (ignore, e) -> inBusyLock(busyLock, () 
-> {
             if (e != null) {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
index 53faae7d06..19636d9db6 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.verify;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ForkJoinPool;
 import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
@@ -458,7 +459,9 @@ public class KeyValueBinaryViewOperationsTest extends 
TableKvOperationsTestBase
                 internalTable,
                 new DummySchemaManagerImpl(schema),
                 schemaVersions,
-                marshallers, mock(IgniteSql.class)
+                mock(IgniteSql.class),
+                marshallers,
+                ForkJoinPool.commonPool()
         );
 
         BinaryRow resultRow = new 
TupleMarshallerImpl(schema).marshal(Tuple.create().set("ID", 1L).set("VAL", 
2L));
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
index b998131b97..9cb632d123 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsTest.java
@@ -57,6 +57,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.marshaller.MarshallersProvider;
 import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
@@ -726,8 +727,9 @@ public class KeyValueViewOperationsTest extends 
TableKvOperationsTestBase {
                 internalTable,
                 new DummySchemaManagerImpl(schema),
                 schemaVersions,
-                marshallers,
                 mock(IgniteSql.class),
+                marshallers,
+                ForkJoinPool.commonPool(),
                 keyMapper,
                 valMapper
         );
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
index cb8fe29c2c..f5904e412f 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
@@ -40,6 +40,7 @@ import static org.mockito.Mockito.verify;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ForkJoinPool;
 import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
@@ -611,7 +612,12 @@ public class RecordBinaryViewOperationsTest extends 
TableKvOperationsTestBase {
         ReflectionMarshallersProvider marshallers = new 
ReflectionMarshallersProvider();
 
         RecordView<Tuple> view = new RecordBinaryViewImpl(
-                internalTable, new DummySchemaManagerImpl(schema), 
schemaVersions, marshallers, mock(IgniteSql.class)
+                internalTable,
+                new DummySchemaManagerImpl(schema),
+                schemaVersions,
+                mock(IgniteSql.class),
+                marshallers,
+                ForkJoinPool.commonPool()
         );
 
         BinaryRow resultRow = new 
TupleMarshallerImpl(schema).marshal(Tuple.create().set("id", 1L).set("val", 
2L));
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
index 64a80bb584..f3918d3a96 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordViewOperationsTest.java
@@ -55,6 +55,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
 import java.util.stream.Collectors;
 import org.apache.ignite.internal.marshaller.ReflectionMarshallersProvider;
 import 
org.apache.ignite.internal.marshaller.testobjects.TestObjectWithAllTypes;
@@ -409,9 +410,10 @@ public class RecordViewOperationsTest extends 
TableKvOperationsTestBase {
                 internalTable,
                 new DummySchemaManagerImpl(schema),
                 schemaVersions,
+                mock(IgniteSql.class),
                 marshallers,
-                recMapper,
-                mock(IgniteSql.class)
+                ForkJoinPool.commonPool(),
+                recMapper
         );
     }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 7b041ea1a6..2fa2603f43 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -65,6 +65,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Phaser;
 import java.util.concurrent.ScheduledExecutorService;
@@ -777,7 +778,8 @@ public class TableManagerTest extends IgniteAbstractTest {
                 () -> mock(IgniteSql.class),
                 new RemotelyTriggeredResourceRegistry(),
                 mock(ScheduledExecutorService.class),
-                new LowWatermark(NODE_NAME, gcConfig.lowWatermark(), clock, 
tm, vaultManager, mock(FailureProcessor.class))
+                new LowWatermark(NODE_NAME, gcConfig.lowWatermark(), clock, 
tm, vaultManager, mock(FailureProcessor.class)),
+                ForkJoinPool.commonPool()
         ) {
 
             @Override

Reply via email to