This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7818239ed7 IGNITE-21729 Prevent threads from being hijacked via async 
cursors in KV/Record view APIs (#3393)
7818239ed7 is described below

commit 7818239ed7412fc125425997e7d030d67610ce0b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Mar 11 21:00:10 2024 +0400

    IGNITE-21729 Prevent threads from being hijacked via async cursors in 
KV/Record view APIs (#3393)
---
 .../ignite/internal/thread/PublicApiThreading.java |  35 ++-
 .../internal/thread/PublicApiThreadingTest.java    | 158 +++++++++++++
 .../threading/ItKvRecordApiThreadingTest.java      | 256 +++++++++++++++++----
 .../ignite/internal/table/AbstractTableView.java   |  15 +-
 .../internal/table/AntiHijackAsyncCursor.java      |  67 ++++++
 5 files changed, 477 insertions(+), 54 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
index fa82b26717..221e0dd27d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
@@ -17,11 +17,14 @@
 
 package org.apache.ignite.internal.thread;
 
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.function.Supplier;
 
 /**
- * Logic related to the threading concern of public APIs: it allows to 
raise/clear/check a thread-local flag which gives us ability to
- * see whether a public API call actually comes from an Ignite internal code 
and not from an actual user.
+ * Logic related to the threading concern of public APIs: it allows to prevent 
Ignite thread hijack by the user's code and provides
+ * mechanisms supporting this protection.
  */
 public class PublicApiThreading {
     private static final ThreadLocal<Boolean> INTERNAL_CALL = new 
ThreadLocal<>();
@@ -48,6 +51,10 @@ public class PublicApiThreading {
      * @return Call result.
      */
     public static <T> T doInternalCall(Supplier<T> call) {
+        if (inInternalCall()) {
+            return call.get();
+        }
+
         startInternalCall();
 
         try {
@@ -64,4 +71,28 @@ public class PublicApiThreading {
         Boolean value = INTERNAL_CALL.get();
         return value != null && value;
     }
+
+    /**
+     * Prevents Ignite internal threads from being hijacked by the user code. 
If that happened, the user code could have blocked
+     * Ignite threads deteriorating progress.
+     *
+     * <p>This is done by completing the future in the async continuation 
thread pool if it would have been completed in an Ignite thread.
+     *
+     * <p>The switch to the async continuation pool is also skipped when it's 
known that the call is made by other Ignite component
+     * and not by the user.
+     *
+     * @param originalFuture Operation future.
+     * @param asyncContinuationExecutor Executor to which execution will be 
resubmitted when leaving asynchronous public API endpoints
+     *     (to prevent the user from stealing Ignite threads).
+     * @return Future that will be completed in the async continuation thread 
pool ({@link ForkJoinPool#commonPool()} by default).
+     */
+    public static  <T> CompletableFuture<T> 
preventThreadHijack(CompletableFuture<T> originalFuture, Executor 
asyncContinuationExecutor) {
+        if (originalFuture.isDone() || inInternalCall()) {
+            return originalFuture;
+        }
+
+        // The future is not complete yet, so it will be completed on an 
Ignite thread, so we need to complete the user-facing future
+        // in the continuation pool.
+        return originalFuture.whenCompleteAsync((res, ex) -> {}, 
asyncContinuationExecutor);
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/thread/PublicApiThreadingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/thread/PublicApiThreadingTest.java
new file mode 100644
index 0000000000..5576969049
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/thread/PublicApiThreadingTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.hamcrest.core.CombinableMatcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class PublicApiThreadingTest {
+    private final ExecutorService internalThreadPool = 
Executors.newSingleThreadExecutor(task -> new TestThread(task, true));
+    private final ExecutorService asyncContinuationExecutor = 
Executors.newSingleThreadExecutor(task -> new TestThread(task, false));
+
+    private static CompletableFuture<Thread> 
getCompletionThreadFuture(CompletableFuture<Void> publicFuture) {
+        return publicFuture.thenApply(unused -> Thread.currentThread());
+    }
+
+    private static CombinableMatcher<Object> internalPoolThread() {
+        return both(instanceOf(TestThread.class)).and(hasProperty("internal", 
is(true)));
+    }
+
+    private static CombinableMatcher<Object> asyncContinuationThread() {
+        return both(instanceOf(TestThread.class)).and(hasProperty("internal", 
is(false)));
+    }
+
+    @AfterEach
+    void clearInInternalCallStatus() {
+        PublicApiThreading.endInternalCall();
+    }
+
+    @AfterEach
+    void shutdownThreadPools() {
+        IgniteUtils.shutdownAndAwaitTermination(asyncContinuationExecutor, 10, 
SECONDS);
+        IgniteUtils.shutdownAndAwaitTermination(internalThreadPool, 10, 
SECONDS);
+    }
+
+    @Test
+    void notInInternalCallByDefault() {
+        assertFalse(PublicApiThreading.inInternalCall());
+    }
+
+    @Test
+    void startInternalCallRaisesInInternalCallStatus() {
+        PublicApiThreading.startInternalCall();
+
+        assertTrue(PublicApiThreading.inInternalCall());
+    }
+
+    @Test
+    void endInternalCallClearsInInternalCallStatus() {
+        PublicApiThreading.startInternalCall();
+
+        PublicApiThreading.endInternalCall();
+
+        assertFalse(PublicApiThreading.inInternalCall());
+    }
+
+    @Test
+    void doInternalCallExecutesClosureWithFlagRaised() {
+        PublicApiThreading.doInternalCall(() -> {
+            assertTrue(PublicApiThreading.inInternalCall());
+
+            return null;
+        });
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void doInternalCallRestoresFlagAfterFinishing(boolean 
flagBeforeDoInternalCall) {
+        if (flagBeforeDoInternalCall) {
+            PublicApiThreading.startInternalCall();
+        }
+
+        PublicApiThreading.doInternalCall(() -> null);
+
+        assertEquals(flagBeforeDoInternalCall, 
PublicApiThreading.inInternalCall());
+    }
+
+    @Test
+    void doesNotSwitchThreadWhenFutureIsCompleteRightAway() {
+        CompletableFuture<Void> publicFuture = 
PublicApiThreading.preventThreadHijack(nullCompletedFuture(), 
asyncContinuationExecutor);
+        CompletableFuture<Thread> completionThreadFuture = 
getCompletionThreadFuture(publicFuture);
+
+        assertThat(completionThreadFuture, willBe(Thread.currentThread()));
+    }
+
+    @Test
+    void doesNotSwitchThreadWhenDoingInternalCall() {
+        CompletableFuture<Void> internallyCompletedFuture = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> publicFuture = 
PublicApiThreading.doInternalCall(
+                () -> 
PublicApiThreading.preventThreadHijack(internallyCompletedFuture, 
asyncContinuationExecutor)
+        );
+        CompletableFuture<Thread> completionThreadFuture = 
getCompletionThreadFuture(publicFuture);
+
+        internallyCompletedFuture.completeAsync(() -> null, 
internalThreadPool);
+
+        assertThat(completionThreadFuture, willBe(internalPoolThread()));
+    }
+
+    @Test
+    void switchesToAsyncContinuationPoolWhenFutureIsNotCompleteRightAway() {
+        CompletableFuture<Void> internallyCompletedFuture = new 
CompletableFuture<>();
+
+        CompletableFuture<Void> publicFuture = 
PublicApiThreading.preventThreadHijack(internallyCompletedFuture, 
asyncContinuationExecutor);
+        CompletableFuture<Thread> completionThreadFuture = 
getCompletionThreadFuture(publicFuture);
+
+        internallyCompletedFuture.completeAsync(() -> null, 
internalThreadPool);
+
+        assertThat(completionThreadFuture, willBe(asyncContinuationThread()));
+    }
+
+    @SuppressWarnings("ClassExplicitlyExtendsThread")
+    public static class TestThread extends Thread {
+        private final boolean internal;
+
+        TestThread(Runnable target, boolean internal) {
+            super(target);
+            this.internal = internal;
+        }
+
+        public boolean isInternal() {
+            return internal;
+        }
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
index a43c8b9ea6..c7cfc73d53 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
@@ -17,20 +17,28 @@
 
 package org.apache.ignite.internal.threading;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.either;
 import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.stream.IntStream;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.lang.IgniteSystemProperties;
 import org.apache.ignite.internal.streamer.SimplePublisher;
@@ -39,10 +47,12 @@ import 
org.apache.ignite.internal.testframework.TestIgnitionManager;
 import org.apache.ignite.internal.testframework.WithSystemProperty;
 import org.apache.ignite.internal.thread.IgniteThread;
 import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.lang.AsyncCursor;
 import org.apache.ignite.table.KeyValueView;
 import org.apache.ignite.table.RecordView;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.criteria.CriteriaQuerySource;
 import org.hamcrest.Matcher;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -53,23 +63,34 @@ import 
org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
 @SuppressWarnings("resource")
 class ItKvRecordApiThreadingTest extends ClusterPerClassIntegrationTest {
     private static final String TABLE_NAME = "test";
+
     private static final int KEY = 1;
 
     private static final Record KEY_RECORD = new Record(1, "");
 
+    private static final int MORE_THAN_DEFAULT_STATEMENT_PAGE_SIZE = 2048;
+
     @Override
     protected int initialNodes() {
         return 1;
     }
 
+    private static Matcher<Object> asyncContinuationPool() {
+        return both(hasProperty("name", 
startsWith("ForkJoinPool.commonPool-worker-")))
+                .and(not(instanceOf(IgniteThread.class)));
+    }
+
     @BeforeAll
     void createTable() {
         sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val 
VARCHAR)");
-    }
 
-    @BeforeEach
-    void upsertRecord() {
-        plainKeyValueView().put(null, KEY, "one");
+        // Putting more than the doubled default query page size rows to make 
sure that CriteriaQuerySource#query() returns a non-closed
+        // cursor even after we call its second page.
+        // TODO: Instead, configure pageSize=1 on each #query() call when 
https://issues.apache.org/jira/browse/IGNITE-18647 is fixed.
+        Map<Integer, String> valuesForQuerying = IntStream.range(KEY + 1, KEY 
+ 1 + 2 * MORE_THAN_DEFAULT_STATEMENT_PAGE_SIZE)
+                .boxed()
+                .collect(toMap(identity(), Object::toString));
+        plainKeyValueView().putAll(null, valuesForQuerying);
     }
 
     private static KeyValueView<Integer, String> plainKeyValueView() {
@@ -84,27 +105,32 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         return CLUSTER.aliveNode().tables().table(TABLE_NAME);
     }
 
-    @SuppressWarnings("rawtypes")
+    @BeforeEach
+    void upsertRecord() {
+        KeyValueView<Integer, String> view = plainKeyValueView();
+
+        // #KEY is used by tests related to KV operations and queries.
+        view.put(null, KEY, "one");
+    }
+
     @CartesianTest
     void keyValueViewFuturesCompleteInContinuationsPool(
             @Enum KeyValueViewAsyncOperation operation,
             @Enum KeyValueViewKind kind
     ) {
-        assumeTrue(kind.supportsGetNullable() || !operation.isGetNullable());
+        assumeTrue(
+                kind.supportsGetNullable() || !operation.isGetNullable(),
+                "Skipping the test as getNullable() is not supported by views 
of kind " + kind
+        );
 
-        KeyValueView tableView = kind.view();
+        KeyValueView<?, ?> tableView = kind.view();
 
-        @SuppressWarnings("unchecked") CompletableFuture<Thread> 
completerFuture = forcingSwitchFromUserThread(
+        CompletableFuture<Thread> completerFuture = 
forcingSwitchFromUserThread(
                 () -> operation.executeOn(tableView, kind.context())
                         .thenApply(unused -> Thread.currentThread())
         );
 
-        assertThat(completerFuture, willBe(commonPoolThread()));
-    }
-
-    private static Matcher<Object> commonPoolThread() {
-        return both(hasProperty("name", 
startsWith("ForkJoinPool.commonPool-worker-")))
-                .and(not(instanceOf(IgniteThread.class)));
+        assertThat(completerFuture, willBe(asyncContinuationPool()));
     }
 
     private static <T> T forcingSwitchFromUserThread(Supplier<? extends T> 
action) {
@@ -133,7 +159,6 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         return new KeyValueContext<>(Tuple.create().set("id", KEY), 
Tuple.create().set("val", "one"), Tuple.create().set("val", "two"));
     }
 
-    @SuppressWarnings("rawtypes")
     @CartesianTest
     void 
keyValueViewFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
             @Enum KeyValueViewAsyncOperation operation,
@@ -141,9 +166,9 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
     ) {
         assumeTrue(kind.supportsGetNullable() || !operation.isGetNullable());
 
-        KeyValueView tableView = kind.view();
+        KeyValueView<?, ?> tableView = kind.view();
 
-        @SuppressWarnings("unchecked") CompletableFuture<Thread> 
completerFuture = forcingSwitchFromUserThread(
+        CompletableFuture<Thread> completerFuture = 
forcingSwitchFromUserThread(
                 () -> PublicApiThreading.doInternalCall(
                         () -> operation.executeOn(tableView, kind.context())
                                 .thenApply(unused -> Thread.currentThread())
@@ -157,31 +182,29 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         return instanceOf(IgniteThread.class);
     }
 
-    @SuppressWarnings("rawtypes")
     @CartesianTest
     void recordViewFuturesCompleteInContinuationsPool(
             @Enum RecordViewAsyncOperation operation,
             @Enum RecordViewKind kind
     ) {
-        RecordView tableView = kind.view();
+        RecordView<?> tableView = kind.view();
 
-        @SuppressWarnings("unchecked") CompletableFuture<Thread> 
completerFuture = forcingSwitchFromUserThread(
+        CompletableFuture<Thread> completerFuture = 
forcingSwitchFromUserThread(
                 () -> operation.executeOn(tableView, kind.context())
                         .thenApply(unused -> Thread.currentThread())
         );
 
-        assertThat(completerFuture, willBe(commonPoolThread()));
+        assertThat(completerFuture, willBe(asyncContinuationPool()));
     }
 
-    @SuppressWarnings("rawtypes")
     @CartesianTest
     void 
recordViewFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
             @Enum RecordViewAsyncOperation operation,
             @Enum RecordViewKind kind
     ) {
-        RecordView tableView = kind.view();
+        RecordView<?> tableView = kind.view();
 
-        @SuppressWarnings("unchecked") CompletableFuture<Thread> 
completerFuture = forcingSwitchFromUserThread(
+        CompletableFuture<Thread> completerFuture = 
forcingSwitchFromUserThread(
                 () -> PublicApiThreading.doInternalCall(
                         () -> operation.executeOn(tableView, kind.context())
                                 .thenApply(unused -> Thread.currentThread())
@@ -207,6 +230,113 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         return new RecordContext<>(KEY_RECORD.toKeyTuple(), new Record(KEY, 
"one").toFullTuple(), new Record(KEY, "two").toFullTuple());
     }
 
+    @CartesianTest
+    void commonViewFuturesCompleteInContinuationsPool(@Enum 
CommonViewAsyncOperation operation, @Enum ViewKind kind) {
+        CriteriaQuerySource<?> tableView = kind.criteriaQuerySource();
+
+        CompletableFuture<Thread> completerFuture = 
forcingSwitchFromUserThread(
+                () -> operation.executeOn(tableView)
+                        .thenApply(unused -> Thread.currentThread())
+        );
+
+        assertThat(completerFuture, willBe(asyncContinuationPool()));
+    }
+
+    @CartesianTest
+    void 
commonViewFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
+            @Enum CommonViewAsyncOperation operation,
+            @Enum ViewKind kind
+    ) {
+        CriteriaQuerySource<?> tableView = kind.criteriaQuerySource();
+
+        CompletableFuture<Thread> completerFuture = 
forcingSwitchFromUserThread(
+                () -> PublicApiThreading.doInternalCall(
+                        () -> operation.executeOn(tableView)
+                                .thenApply(unused -> Thread.currentThread())
+                )
+        );
+
+        assertThat(completerFuture, willBe(anIgniteThread()));
+    }
+
+    @CartesianTest
+    void asyncCursorFuturesCompleteInContinuationsPool(@Enum 
AsyncCursorAsyncOperation operation, @Enum ViewKind kind) throws Exception {
+        AsyncCursor<?> firstPage = kind.criteriaQuerySource().queryAsync(null, 
null).get(10, SECONDS);
+
+        CompletableFuture<Thread> completerFuture = 
operation.executeOn(firstPage)
+                        .thenApply(unused -> Thread.currentThread());
+
+        // The future might get completed in the calling thread as we don't 
force a wait inside Ignite
+        // (because we cannot do this with fetching next page or closing).
+        assertThat(completerFuture, willBe(
+                either(is(Thread.currentThread())).or(asyncContinuationPool())
+        ));
+    }
+
+    @CartesianTest
+    void 
asyncCursorFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
+            @Enum AsyncCursorAsyncOperation operation,
+            @Enum ViewKind kind
+    ) throws Exception {
+        AsyncCursor<?> firstPage = kind.criteriaQuerySource().queryAsync(null, 
null).get(10, SECONDS);
+
+        CompletableFuture<Thread> completerFuture = 
PublicApiThreading.doInternalCall(
+                () -> operation.executeOn(firstPage)
+                        .thenApply(unused -> Thread.currentThread())
+        );
+
+        // The future might get completed in the calling thread as we don't 
force a wait inside Ignite
+        // (because we cannot do this with fetching next page or closing).
+        assertThat(completerFuture, willBe(
+                either(is(Thread.currentThread())).or(anIgniteThread())
+        ));
+    }
+
+    /**
+     * This test differs from {@link 
#asyncCursorFuturesCompleteInContinuationsPool(AsyncCursorAsyncOperation, 
ViewKind)} in that it obtains
+     * the future to test from a call on a cursor obtained from a cursor, not 
from a view.
+     */
+    @CartesianTest
+    void asyncCursorFuturesAfterFetchCompleteInContinuationsPool(@Enum 
AsyncCursorAsyncOperation operation, @Enum ViewKind kind)
+            throws Exception {
+        AsyncCursor<?> firstPage = kind.criteriaQuerySource().queryAsync(null, 
null).get(10, SECONDS);
+        AsyncCursor<?> secondPage = firstPage.fetchNextPage().get(10, SECONDS);
+
+        CompletableFuture<Thread> completerFuture = 
operation.executeOn(secondPage)
+                .thenApply(unused -> Thread.currentThread());
+
+        // The future might get completed in the calling thread as we don't 
force a wait inside Ignite
+        // (because we cannot do this with fetching next page or closing).
+        assertThat(completerFuture, willBe(
+                either(is(Thread.currentThread())).or(asyncContinuationPool())
+        ));
+    }
+
+    /**
+     * This test differs from
+     * {@link 
#asyncCursorFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(AsyncCursorAsyncOperation,
 ViewKind)} in that
+     * it obtains the future to test from a call on a cursor obtained from a 
cursor, not from a view.
+     */
+    @CartesianTest
+    void 
asyncCursorFuturesAfterFetchFromInternalCallsAreNotResubmittedToContinuationsPool(
+            @Enum AsyncCursorAsyncOperation operation,
+            @Enum ViewKind kind
+    ) throws Exception {
+        AsyncCursor<?> firstPage = kind.criteriaQuerySource().queryAsync(null, 
null).get(10, SECONDS);
+        AsyncCursor<?> secondPage = firstPage.fetchNextPage().get(10, SECONDS);
+
+        CompletableFuture<Thread> completerFuture = 
PublicApiThreading.doInternalCall(
+                () -> operation.executeOn(secondPage)
+                        .thenApply(unused -> Thread.currentThread())
+        );
+
+        // The future might get completed in the calling thread as we don't 
force a wait inside Ignite
+        // (because we cannot do this with fetching next page or closing).
+        assertThat(completerFuture, willBe(
+                either(is(Thread.currentThread())).or(anIgniteThread())
+        ));
+    }
+
     private enum KeyValueViewAsyncOperation {
         GET_ASYNC((view, context) -> view.getAsync(null, context.key)),
         GET_NULLABLE_ASYNC((view, context) -> view.getNullableAsync(null, 
context.key)),
@@ -227,18 +357,14 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         REPLACE_EXACT_ASYNC((view, context) -> view.replaceAsync(null, 
context.key, context.usualValue, context.anotherValue)),
         GET_AND_REPLACE_ASYNC((view, context) -> view.getAndReplaceAsync(null, 
context.key, context.usualValue)),
         GET_NULLABLE_AND_REPLACE_ASYNC((view, context) -> 
view.getNullableAndReplaceAsync(null, context.key, context.usualValue)),
-        @SuppressWarnings({"rawtypes", "unchecked"})
         STREAM_DATA((view, context) -> {
             CompletableFuture<?> future;
-            try (var publisher = new SimplePublisher()) {
+            try (var publisher = new SimplePublisher<Entry<Object, Object>>()) 
{
                 future = view.streamData(publisher, null);
                 publisher.submit(Map.entry(context.key, context.usualValue));
             }
             return future;
-        }),
-        QUERY_ASYNC((view, context) -> view.queryAsync(null, null)),
-        QUERY_ASYNC_WITH_INDEX_NAME((view, context) -> view.queryAsync(null, 
null, null)),
-        QUERY_ASYNC_WITH_INDEX_NAME_AND_OPTS((view, context) -> 
view.queryAsync(null, null, null, null));
+        });
 
         private final BiFunction<KeyValueView<Object, Object>, 
KeyValueContext<Object, Object>, CompletableFuture<?>> action;
 
@@ -246,7 +372,7 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
             this.action = action;
         }
 
-        <K, V> CompletableFuture<?> executeOn(KeyValueView<K, V> tableView, 
KeyValueContext<K, V> context) {
+        CompletableFuture<?> executeOn(KeyValueView<?, ?> tableView, 
KeyValueContext<?, ?> context) {
             return action.apply((KeyValueView<Object, Object>) tableView, 
(KeyValueContext<Object, Object>) context);
         }
 
@@ -275,15 +401,14 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         }
     }
 
-    @SuppressWarnings("rawtypes")
     private enum KeyValueViewKind {
         PLAIN, BINARY;
 
-        KeyValueView view() {
+        KeyValueView<?, ?> view() {
             return this == PLAIN ? plainKeyValueView() : binaryKeyValueView();
         }
 
-        KeyValueContext context() {
+        KeyValueContext<?, ?> context() {
             return this == PLAIN ? plainKeyValueContext() : 
binaryKeyValueContext();
         }
 
@@ -331,18 +456,14 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         GET_AND_DELETE_ASYNC((view, context) -> view.getAndDeleteAsync(null, 
context.keyRecord)),
         DELETE_ALL_ASYNC((view, context) -> view.deleteAllAsync(null, 
List.of(context.keyRecord))),
         DELETE_ALL_EXACT_ASYNC((view, context) -> 
view.deleteAllExactAsync(null, List.of(context.keyRecord))),
-        @SuppressWarnings({"rawtypes", "unchecked"})
         STREAM_DATA((view, context) -> {
             CompletableFuture<?> future;
-            try (var publisher = new SimplePublisher()) {
+            try (var publisher = new SimplePublisher<>()) {
                 future = view.streamData(publisher, null);
                 publisher.submit(context.fullRecord);
             }
             return future;
-        }),
-        QUERY_ASYNC((view, context) -> view.queryAsync(null, null)),
-        QUERY_ASYNC_WITH_INDEX_NAME((view, context) -> view.queryAsync(null, 
null, null)),
-        QUERY_ASYNC_WITH_INDEX_NAME_AND_OPTS((view, context) -> 
view.queryAsync(null, null, null, null));
+        });
 
         private final BiFunction<RecordView<Object>, RecordContext<Object>, 
CompletableFuture<?>> action;
 
@@ -350,7 +471,7 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
             this.action = action;
         }
 
-        <R> CompletableFuture<?> executeOn(RecordView<R> tableView, 
RecordContext<R> context) {
+        CompletableFuture<?> executeOn(RecordView<?> tableView, 
RecordContext<?> context) {
             return action.apply((RecordView<Object>) tableView, 
(RecordContext<Object>) context);
         }
     }
@@ -367,16 +488,63 @@ class ItKvRecordApiThreadingTest extends 
ClusterPerClassIntegrationTest {
         }
     }
 
-    @SuppressWarnings("rawtypes")
     private enum RecordViewKind {
         PLAIN, BINARY;
 
-        RecordView view() {
+        RecordView<?> view() {
             return this == PLAIN ? plainRecordView() : binaryRecordView();
         }
 
-        RecordContext context() {
+        RecordContext<?> context() {
             return this == PLAIN ? plainRecordContext() : 
binaryRecordContext();
         }
     }
+
+    private enum CommonViewAsyncOperation {
+        QUERY_ASYNC(view -> view.queryAsync(null, null)),
+        QUERY_ASYNC_WITH_INDEX_NAME(view -> view.queryAsync(null, null, null)),
+        QUERY_ASYNC_WITH_INDEX_NAME_AND_OPTS(view -> view.queryAsync(null, 
null, null, null));
+
+        private final Function<CriteriaQuerySource<Object>, 
CompletableFuture<?>> action;
+
+        CommonViewAsyncOperation(Function<CriteriaQuerySource<Object>, 
CompletableFuture<?>> action) {
+            this.action = action;
+        }
+
+        CompletableFuture<?> executeOn(CriteriaQuerySource<?> tableView) {
+            return action.apply((CriteriaQuerySource<Object>) tableView);
+        }
+    }
+
+    private enum ViewKind {
+        PLAIN_KEY_VALUE(() -> plainKeyValueView()),
+        BINARY_KEY_VALUE(() -> binaryKeyValueView()),
+        PLAIN_RECORD(() -> plainRecordView()),
+        BINARY_RECORD(() -> binaryRecordView());
+
+        private final Supplier<CriteriaQuerySource<?>> viewSupplier;
+
+        ViewKind(Supplier<CriteriaQuerySource<?>> viewSupplier) {
+            this.viewSupplier = viewSupplier;
+        }
+
+        CriteriaQuerySource<?> criteriaQuerySource() {
+            return viewSupplier.get();
+        }
+    }
+
+    private enum AsyncCursorAsyncOperation {
+        FETCH_NEXT_PAGE(cursor -> cursor.fetchNextPage()),
+        CLOSE(cursor -> cursor.closeAsync());
+
+        private final Function<AsyncCursor<Object>, CompletableFuture<?>> 
action;
+
+        AsyncCursorAsyncOperation(Function<AsyncCursor<Object>, 
CompletableFuture<?>> action) {
+            this.action = action;
+        }
+
+        CompletableFuture<?> executeOn(AsyncCursor<?> cursor) {
+            return action.apply((AsyncCursor<Object>) cursor);
+        }
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 3ba9fa63c4..e146007c0d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -207,13 +207,7 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
      * @return Future that will be completed in the async continuation thread 
pool ({@link ForkJoinPool#commonPool()} by default).
      */
     protected final <T> CompletableFuture<T> 
preventThreadHijack(CompletableFuture<T> originalFuture) {
-        if (originalFuture.isDone() || PublicApiThreading.inInternalCall()) {
-            return originalFuture;
-        }
-
-        // The future is not complete yet, so it will be completed on an 
Ignite thread, so we need to complete the user-facing future
-        // in the continuation pool.
-        return originalFuture.whenCompleteAsync((res, ex) -> {}, 
asyncContinuationExecutor);
+        return PublicApiThreading.preventThreadHijack(originalFuture, 
asyncContinuationExecutor);
     }
 
     /**
@@ -278,7 +272,12 @@ abstract class AbstractTableView<R> implements 
CriteriaQuerySource<R> {
 
                         assert meta != null : "Metadata can't be null.";
 
-                        return new QueryCriteriaAsyncCursor<>(resultSet, 
queryMapper(meta, schema), session::closeAsync);
+                        AsyncCursor<R> cursor = new QueryCriteriaAsyncCursor<>(
+                                resultSet,
+                                queryMapper(meta, schema),
+                                session::closeAsync
+                        );
+                        return new AntiHijackAsyncCursor<>(cursor, 
asyncContinuationExecutor);
                     })
                     .whenComplete((ignore, err) -> {
                         if (err != null) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/AntiHijackAsyncCursor.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AntiHijackAsyncCursor.java
new file mode 100644
index 0000000000..272b3af3b4
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/AntiHijackAsyncCursor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static 
org.apache.ignite.internal.thread.PublicApiThreading.preventThreadHijack;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.lang.AsyncCursor;
+
+/**
+ * Wrapper around {@link AsyncCursor} which prevents Ignite internal threads 
from being hijacked by the user via asynchronous methods.
+ *
+ * @see PublicApiThreading
+ */
+public class AntiHijackAsyncCursor<T> implements AsyncCursor<T> {
+    private final AsyncCursor<T> cursor;
+
+    private final Executor asyncContinuationExecutor;
+
+    public AntiHijackAsyncCursor(AsyncCursor<T> cursor, Executor 
asyncContinuationExecutor) {
+        this.cursor = cursor;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
+    }
+
+    @Override
+    public Iterable<T> currentPage() {
+        return cursor.currentPage();
+    }
+
+    @Override
+    public int currentPageSize() {
+        return cursor.currentPageSize();
+    }
+
+    @Override
+    public CompletableFuture<? extends AsyncCursor<T>> fetchNextPage() {
+        return preventThreadHijack(cursor.fetchNextPage(), 
asyncContinuationExecutor)
+                .thenApply(nextCursor -> new 
AntiHijackAsyncCursor<>(nextCursor, asyncContinuationExecutor));
+    }
+
+    @Override
+    public boolean hasMorePages() {
+        return cursor.hasMorePages();
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return preventThreadHijack(cursor.closeAsync(), 
asyncContinuationExecutor);
+    }
+}


Reply via email to