This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7818239ed7 IGNITE-21729 Prevent threads from being hijacked via async
cursors in KV/Record view APIs (#3393)
7818239ed7 is described below
commit 7818239ed7412fc125425997e7d030d67610ce0b
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Mon Mar 11 21:00:10 2024 +0400
IGNITE-21729 Prevent threads from being hijacked via async cursors in
KV/Record view APIs (#3393)
---
.../ignite/internal/thread/PublicApiThreading.java | 35 ++-
.../internal/thread/PublicApiThreadingTest.java | 158 +++++++++++++
.../threading/ItKvRecordApiThreadingTest.java | 256 +++++++++++++++++----
.../ignite/internal/table/AbstractTableView.java | 15 +-
.../internal/table/AntiHijackAsyncCursor.java | 67 ++++++
5 files changed, 477 insertions(+), 54 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
index fa82b26717..221e0dd27d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/PublicApiThreading.java
@@ -17,11 +17,14 @@
package org.apache.ignite.internal.thread;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;
/**
- * Logic related to the threading concern of public APIs: it allows to
raise/clear/check a thread-local flag which gives us ability to
- * see whether a public API call actually comes from an Ignite internal code
and not from an actual user.
+ * Logic related to the threading concern of public APIs: it prevents
Ignite threads from being hijacked by the user's code and provides
+ * mechanisms supporting this protection.
*/
public class PublicApiThreading {
private static final ThreadLocal<Boolean> INTERNAL_CALL = new
ThreadLocal<>();
@@ -48,6 +51,10 @@ public class PublicApiThreading {
* @return Call result.
*/
public static <T> T doInternalCall(Supplier<T> call) {
+ if (inInternalCall()) {
+ return call.get();
+ }
+
startInternalCall();
try {
@@ -64,4 +71,28 @@ public class PublicApiThreading {
Boolean value = INTERNAL_CALL.get();
return value != null && value;
}
+
+ /**
+ * Prevents Ignite internal threads from being hijacked by the user code.
If that happened, the user code could block
+ * Ignite threads, hindering progress.
+ *
+ * <p>This is done by completing the future in the async continuation
thread pool if it would have been completed in an Ignite thread.
+ *
+ * <p>The switch to the async continuation pool is also skipped when it's
known that the call is made by another Ignite component
+ * and not by the user.
+ *
+ * @param originalFuture Operation future.
+ * @param asyncContinuationExecutor Executor to which execution will be
resubmitted when leaving asynchronous public API endpoints
+ * (to prevent the user from stealing Ignite threads).
+ * @return Future that will be completed in the async continuation thread
pool ({@link ForkJoinPool#commonPool()} by default).
+ */
+ public static <T> CompletableFuture<T>
preventThreadHijack(CompletableFuture<T> originalFuture, Executor
asyncContinuationExecutor) {
+ if (originalFuture.isDone() || inInternalCall()) {
+ return originalFuture;
+ }
+
+ // The future is not complete yet, so it will be completed on an
Ignite thread, so we need to complete the user-facing future
+ // in the continuation pool.
+ return originalFuture.whenCompleteAsync((res, ex) -> {},
asyncContinuationExecutor);
+ }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/thread/PublicApiThreadingTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/thread/PublicApiThreadingTest.java
new file mode 100644
index 0000000000..5576969049
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/thread/PublicApiThreadingTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.thread;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.hasProperty;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.hamcrest.core.CombinableMatcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class PublicApiThreadingTest {
+ private final ExecutorService internalThreadPool =
Executors.newSingleThreadExecutor(task -> new TestThread(task, true));
+ private final ExecutorService asyncContinuationExecutor =
Executors.newSingleThreadExecutor(task -> new TestThread(task, false));
+
+ private static CompletableFuture<Thread>
getCompletionThreadFuture(CompletableFuture<Void> publicFuture) {
+ return publicFuture.thenApply(unused -> Thread.currentThread());
+ }
+
+ private static CombinableMatcher<Object> internalPoolThread() {
+ return both(instanceOf(TestThread.class)).and(hasProperty("internal",
is(true)));
+ }
+
+ private static CombinableMatcher<Object> asyncContinuationThread() {
+ return both(instanceOf(TestThread.class)).and(hasProperty("internal",
is(false)));
+ }
+
+ @AfterEach
+ void clearInInternalCallStatus() {
+ PublicApiThreading.endInternalCall();
+ }
+
+ @AfterEach
+ void shutdownThreadPools() {
+ IgniteUtils.shutdownAndAwaitTermination(asyncContinuationExecutor, 10,
SECONDS);
+ IgniteUtils.shutdownAndAwaitTermination(internalThreadPool, 10,
SECONDS);
+ }
+
+ @Test
+ void notInInternalCallByDefault() {
+ assertFalse(PublicApiThreading.inInternalCall());
+ }
+
+ @Test
+ void startInternalCallRaisesInInternalCallStatus() {
+ PublicApiThreading.startInternalCall();
+
+ assertTrue(PublicApiThreading.inInternalCall());
+ }
+
+ @Test
+ void endInternalCallClearsInInternalCallStatus() {
+ PublicApiThreading.startInternalCall();
+
+ PublicApiThreading.endInternalCall();
+
+ assertFalse(PublicApiThreading.inInternalCall());
+ }
+
+ @Test
+ void doInternalCallExecutesClosureWithFlagRaised() {
+ PublicApiThreading.doInternalCall(() -> {
+ assertTrue(PublicApiThreading.inInternalCall());
+
+ return null;
+ });
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void doInternalCallRestoresFlagAfterFinishing(boolean
flagBeforeDoInternalCall) {
+ if (flagBeforeDoInternalCall) {
+ PublicApiThreading.startInternalCall();
+ }
+
+ PublicApiThreading.doInternalCall(() -> null);
+
+ assertEquals(flagBeforeDoInternalCall,
PublicApiThreading.inInternalCall());
+ }
+
+ @Test
+ void doesNotSwitchThreadWhenFutureIsCompleteRightAway() {
+ CompletableFuture<Void> publicFuture =
PublicApiThreading.preventThreadHijack(nullCompletedFuture(),
asyncContinuationExecutor);
+ CompletableFuture<Thread> completionThreadFuture =
getCompletionThreadFuture(publicFuture);
+
+ assertThat(completionThreadFuture, willBe(Thread.currentThread()));
+ }
+
+ @Test
+ void doesNotSwitchThreadWhenDoingInternalCall() {
+ CompletableFuture<Void> internallyCompletedFuture = new
CompletableFuture<>();
+
+ CompletableFuture<Void> publicFuture =
PublicApiThreading.doInternalCall(
+ () ->
PublicApiThreading.preventThreadHijack(internallyCompletedFuture,
asyncContinuationExecutor)
+ );
+ CompletableFuture<Thread> completionThreadFuture =
getCompletionThreadFuture(publicFuture);
+
+ internallyCompletedFuture.completeAsync(() -> null,
internalThreadPool);
+
+ assertThat(completionThreadFuture, willBe(internalPoolThread()));
+ }
+
+ @Test
+ void switchesToAsyncContinuationPoolWhenFutureIsNotCompleteRightAway() {
+ CompletableFuture<Void> internallyCompletedFuture = new
CompletableFuture<>();
+
+ CompletableFuture<Void> publicFuture =
PublicApiThreading.preventThreadHijack(internallyCompletedFuture,
asyncContinuationExecutor);
+ CompletableFuture<Thread> completionThreadFuture =
getCompletionThreadFuture(publicFuture);
+
+ internallyCompletedFuture.completeAsync(() -> null,
internalThreadPool);
+
+ assertThat(completionThreadFuture, willBe(asyncContinuationThread()));
+ }
+
+ @SuppressWarnings("ClassExplicitlyExtendsThread")
+ public static class TestThread extends Thread {
+ private final boolean internal;
+
+ TestThread(Runnable target, boolean internal) {
+ super(target);
+ this.internal = internal;
+ }
+
+ public boolean isInternal() {
+ return internal;
+ }
+ }
+}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
index a43c8b9ea6..c7cfc73d53 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
@@ -17,20 +17,28 @@
package org.apache.ignite.internal.threading;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
+import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.IntStream;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.streamer.SimplePublisher;
@@ -39,10 +47,12 @@ import
org.apache.ignite.internal.testframework.TestIgnitionManager;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.thread.IgniteThread;
import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.lang.AsyncCursor;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.criteria.CriteriaQuerySource;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -53,23 +63,34 @@ import
org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
@SuppressWarnings("resource")
class ItKvRecordApiThreadingTest extends ClusterPerClassIntegrationTest {
private static final String TABLE_NAME = "test";
+
private static final int KEY = 1;
private static final Record KEY_RECORD = new Record(1, "");
+ private static final int MORE_THAN_DEFAULT_STATEMENT_PAGE_SIZE = 2048;
+
@Override
protected int initialNodes() {
return 1;
}
+ private static Matcher<Object> asyncContinuationPool() {
+ return both(hasProperty("name",
startsWith("ForkJoinPool.commonPool-worker-")))
+ .and(not(instanceOf(IgniteThread.class)));
+ }
+
@BeforeAll
void createTable() {
sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val
VARCHAR)");
- }
- @BeforeEach
- void upsertRecord() {
- plainKeyValueView().put(null, KEY, "one");
+ // Putting more rows than twice the default query page size to make
sure that CriteriaQuerySource#query() returns a non-closed
+ // cursor even after we fetch its second page.
+ // TODO: Instead, configure pageSize=1 on each #query() call when
https://issues.apache.org/jira/browse/IGNITE-18647 is fixed.
+ Map<Integer, String> valuesForQuerying = IntStream.range(KEY + 1, KEY
+ 1 + 2 * MORE_THAN_DEFAULT_STATEMENT_PAGE_SIZE)
+ .boxed()
+ .collect(toMap(identity(), Object::toString));
+ plainKeyValueView().putAll(null, valuesForQuerying);
}
private static KeyValueView<Integer, String> plainKeyValueView() {
@@ -84,27 +105,32 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
return CLUSTER.aliveNode().tables().table(TABLE_NAME);
}
- @SuppressWarnings("rawtypes")
+ @BeforeEach
+ void upsertRecord() {
+ KeyValueView<Integer, String> view = plainKeyValueView();
+
+ // #KEY is used by tests related to KV operations and queries.
+ view.put(null, KEY, "one");
+ }
+
@CartesianTest
void keyValueViewFuturesCompleteInContinuationsPool(
@Enum KeyValueViewAsyncOperation operation,
@Enum KeyValueViewKind kind
) {
- assumeTrue(kind.supportsGetNullable() || !operation.isGetNullable());
+ assumeTrue(
+ kind.supportsGetNullable() || !operation.isGetNullable(),
+ "Skipping the test as getNullable() is not supported by views
of kind " + kind
+ );
- KeyValueView tableView = kind.view();
+ KeyValueView<?, ?> tableView = kind.view();
- @SuppressWarnings("unchecked") CompletableFuture<Thread>
completerFuture = forcingSwitchFromUserThread(
+ CompletableFuture<Thread> completerFuture =
forcingSwitchFromUserThread(
() -> operation.executeOn(tableView, kind.context())
.thenApply(unused -> Thread.currentThread())
);
- assertThat(completerFuture, willBe(commonPoolThread()));
- }
-
- private static Matcher<Object> commonPoolThread() {
- return both(hasProperty("name",
startsWith("ForkJoinPool.commonPool-worker-")))
- .and(not(instanceOf(IgniteThread.class)));
+ assertThat(completerFuture, willBe(asyncContinuationPool()));
}
private static <T> T forcingSwitchFromUserThread(Supplier<? extends T>
action) {
@@ -133,7 +159,6 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
return new KeyValueContext<>(Tuple.create().set("id", KEY),
Tuple.create().set("val", "one"), Tuple.create().set("val", "two"));
}
- @SuppressWarnings("rawtypes")
@CartesianTest
void
keyValueViewFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
@Enum KeyValueViewAsyncOperation operation,
@@ -141,9 +166,9 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
) {
assumeTrue(kind.supportsGetNullable() || !operation.isGetNullable());
- KeyValueView tableView = kind.view();
+ KeyValueView<?, ?> tableView = kind.view();
- @SuppressWarnings("unchecked") CompletableFuture<Thread>
completerFuture = forcingSwitchFromUserThread(
+ CompletableFuture<Thread> completerFuture =
forcingSwitchFromUserThread(
() -> PublicApiThreading.doInternalCall(
() -> operation.executeOn(tableView, kind.context())
.thenApply(unused -> Thread.currentThread())
@@ -157,31 +182,29 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
return instanceOf(IgniteThread.class);
}
- @SuppressWarnings("rawtypes")
@CartesianTest
void recordViewFuturesCompleteInContinuationsPool(
@Enum RecordViewAsyncOperation operation,
@Enum RecordViewKind kind
) {
- RecordView tableView = kind.view();
+ RecordView<?> tableView = kind.view();
- @SuppressWarnings("unchecked") CompletableFuture<Thread>
completerFuture = forcingSwitchFromUserThread(
+ CompletableFuture<Thread> completerFuture =
forcingSwitchFromUserThread(
() -> operation.executeOn(tableView, kind.context())
.thenApply(unused -> Thread.currentThread())
);
- assertThat(completerFuture, willBe(commonPoolThread()));
+ assertThat(completerFuture, willBe(asyncContinuationPool()));
}
- @SuppressWarnings("rawtypes")
@CartesianTest
void
recordViewFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
@Enum RecordViewAsyncOperation operation,
@Enum RecordViewKind kind
) {
- RecordView tableView = kind.view();
+ RecordView<?> tableView = kind.view();
- @SuppressWarnings("unchecked") CompletableFuture<Thread>
completerFuture = forcingSwitchFromUserThread(
+ CompletableFuture<Thread> completerFuture =
forcingSwitchFromUserThread(
() -> PublicApiThreading.doInternalCall(
() -> operation.executeOn(tableView, kind.context())
.thenApply(unused -> Thread.currentThread())
@@ -207,6 +230,113 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
return new RecordContext<>(KEY_RECORD.toKeyTuple(), new Record(KEY,
"one").toFullTuple(), new Record(KEY, "two").toFullTuple());
}
+ @CartesianTest
+ void commonViewFuturesCompleteInContinuationsPool(@Enum
CommonViewAsyncOperation operation, @Enum ViewKind kind) {
+ CriteriaQuerySource<?> tableView = kind.criteriaQuerySource();
+
+ CompletableFuture<Thread> completerFuture =
forcingSwitchFromUserThread(
+ () -> operation.executeOn(tableView)
+ .thenApply(unused -> Thread.currentThread())
+ );
+
+ assertThat(completerFuture, willBe(asyncContinuationPool()));
+ }
+
+ @CartesianTest
+ void
commonViewFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
+ @Enum CommonViewAsyncOperation operation,
+ @Enum ViewKind kind
+ ) {
+ CriteriaQuerySource<?> tableView = kind.criteriaQuerySource();
+
+ CompletableFuture<Thread> completerFuture =
forcingSwitchFromUserThread(
+ () -> PublicApiThreading.doInternalCall(
+ () -> operation.executeOn(tableView)
+ .thenApply(unused -> Thread.currentThread())
+ )
+ );
+
+ assertThat(completerFuture, willBe(anIgniteThread()));
+ }
+
+ @CartesianTest
+ void asyncCursorFuturesCompleteInContinuationsPool(@Enum
AsyncCursorAsyncOperation operation, @Enum ViewKind kind) throws Exception {
+ AsyncCursor<?> firstPage = kind.criteriaQuerySource().queryAsync(null,
null).get(10, SECONDS);
+
+ CompletableFuture<Thread> completerFuture =
operation.executeOn(firstPage)
+ .thenApply(unused -> Thread.currentThread());
+
+ // The future might get completed in the calling thread as we don't
force a wait inside Ignite
+ // (because we cannot do this with fetching next page or closing).
+ assertThat(completerFuture, willBe(
+ either(is(Thread.currentThread())).or(asyncContinuationPool())
+ ));
+ }
+
+ @CartesianTest
+ void
asyncCursorFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(
+ @Enum AsyncCursorAsyncOperation operation,
+ @Enum ViewKind kind
+ ) throws Exception {
+ AsyncCursor<?> firstPage = kind.criteriaQuerySource().queryAsync(null,
null).get(10, SECONDS);
+
+ CompletableFuture<Thread> completerFuture =
PublicApiThreading.doInternalCall(
+ () -> operation.executeOn(firstPage)
+ .thenApply(unused -> Thread.currentThread())
+ );
+
+ // The future might get completed in the calling thread as we don't
force a wait inside Ignite
+ // (because we cannot do this with fetching next page or closing).
+ assertThat(completerFuture, willBe(
+ either(is(Thread.currentThread())).or(anIgniteThread())
+ ));
+ }
+
+ /**
+ * This test differs from {@link
#asyncCursorFuturesCompleteInContinuationsPool(AsyncCursorAsyncOperation,
ViewKind)} in that it obtains
+ * the future to test from a call on a cursor obtained from a cursor, not
from a view.
+ */
+ @CartesianTest
+ void asyncCursorFuturesAfterFetchCompleteInContinuationsPool(@Enum
AsyncCursorAsyncOperation operation, @Enum ViewKind kind)
+ throws Exception {
+ AsyncCursor<?> firstPage = kind.criteriaQuerySource().queryAsync(null,
null).get(10, SECONDS);
+ AsyncCursor<?> secondPage = firstPage.fetchNextPage().get(10, SECONDS);
+
+ CompletableFuture<Thread> completerFuture =
operation.executeOn(secondPage)
+ .thenApply(unused -> Thread.currentThread());
+
+ // The future might get completed in the calling thread as we don't
force a wait inside Ignite
+ // (because we cannot do this with fetching next page or closing).
+ assertThat(completerFuture, willBe(
+ either(is(Thread.currentThread())).or(asyncContinuationPool())
+ ));
+ }
+
+ /**
+ * This test differs from
+ * {@link
#asyncCursorFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(AsyncCursorAsyncOperation,
ViewKind)} in that
+ * it obtains the future to test from a call on a cursor obtained from a
cursor, not from a view.
+ */
+ @CartesianTest
+ void
asyncCursorFuturesAfterFetchFromInternalCallsAreNotResubmittedToContinuationsPool(
+ @Enum AsyncCursorAsyncOperation operation,
+ @Enum ViewKind kind
+ ) throws Exception {
+ AsyncCursor<?> firstPage = kind.criteriaQuerySource().queryAsync(null,
null).get(10, SECONDS);
+ AsyncCursor<?> secondPage = firstPage.fetchNextPage().get(10, SECONDS);
+
+ CompletableFuture<Thread> completerFuture =
PublicApiThreading.doInternalCall(
+ () -> operation.executeOn(secondPage)
+ .thenApply(unused -> Thread.currentThread())
+ );
+
+ // The future might get completed in the calling thread as we don't
force a wait inside Ignite
+ // (because we cannot do this with fetching next page or closing).
+ assertThat(completerFuture, willBe(
+ either(is(Thread.currentThread())).or(anIgniteThread())
+ ));
+ }
+
private enum KeyValueViewAsyncOperation {
GET_ASYNC((view, context) -> view.getAsync(null, context.key)),
GET_NULLABLE_ASYNC((view, context) -> view.getNullableAsync(null,
context.key)),
@@ -227,18 +357,14 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
REPLACE_EXACT_ASYNC((view, context) -> view.replaceAsync(null,
context.key, context.usualValue, context.anotherValue)),
GET_AND_REPLACE_ASYNC((view, context) -> view.getAndReplaceAsync(null,
context.key, context.usualValue)),
GET_NULLABLE_AND_REPLACE_ASYNC((view, context) ->
view.getNullableAndReplaceAsync(null, context.key, context.usualValue)),
- @SuppressWarnings({"rawtypes", "unchecked"})
STREAM_DATA((view, context) -> {
CompletableFuture<?> future;
- try (var publisher = new SimplePublisher()) {
+ try (var publisher = new SimplePublisher<Entry<Object, Object>>())
{
future = view.streamData(publisher, null);
publisher.submit(Map.entry(context.key, context.usualValue));
}
return future;
- }),
- QUERY_ASYNC((view, context) -> view.queryAsync(null, null)),
- QUERY_ASYNC_WITH_INDEX_NAME((view, context) -> view.queryAsync(null,
null, null)),
- QUERY_ASYNC_WITH_INDEX_NAME_AND_OPTS((view, context) ->
view.queryAsync(null, null, null, null));
+ });
private final BiFunction<KeyValueView<Object, Object>,
KeyValueContext<Object, Object>, CompletableFuture<?>> action;
@@ -246,7 +372,7 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
this.action = action;
}
- <K, V> CompletableFuture<?> executeOn(KeyValueView<K, V> tableView,
KeyValueContext<K, V> context) {
+ CompletableFuture<?> executeOn(KeyValueView<?, ?> tableView,
KeyValueContext<?, ?> context) {
return action.apply((KeyValueView<Object, Object>) tableView,
(KeyValueContext<Object, Object>) context);
}
@@ -275,15 +401,14 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
}
}
- @SuppressWarnings("rawtypes")
private enum KeyValueViewKind {
PLAIN, BINARY;
- KeyValueView view() {
+ KeyValueView<?, ?> view() {
return this == PLAIN ? plainKeyValueView() : binaryKeyValueView();
}
- KeyValueContext context() {
+ KeyValueContext<?, ?> context() {
return this == PLAIN ? plainKeyValueContext() :
binaryKeyValueContext();
}
@@ -331,18 +456,14 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
GET_AND_DELETE_ASYNC((view, context) -> view.getAndDeleteAsync(null,
context.keyRecord)),
DELETE_ALL_ASYNC((view, context) -> view.deleteAllAsync(null,
List.of(context.keyRecord))),
DELETE_ALL_EXACT_ASYNC((view, context) ->
view.deleteAllExactAsync(null, List.of(context.keyRecord))),
- @SuppressWarnings({"rawtypes", "unchecked"})
STREAM_DATA((view, context) -> {
CompletableFuture<?> future;
- try (var publisher = new SimplePublisher()) {
+ try (var publisher = new SimplePublisher<>()) {
future = view.streamData(publisher, null);
publisher.submit(context.fullRecord);
}
return future;
- }),
- QUERY_ASYNC((view, context) -> view.queryAsync(null, null)),
- QUERY_ASYNC_WITH_INDEX_NAME((view, context) -> view.queryAsync(null,
null, null)),
- QUERY_ASYNC_WITH_INDEX_NAME_AND_OPTS((view, context) ->
view.queryAsync(null, null, null, null));
+ });
private final BiFunction<RecordView<Object>, RecordContext<Object>,
CompletableFuture<?>> action;
@@ -350,7 +471,7 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
this.action = action;
}
- <R> CompletableFuture<?> executeOn(RecordView<R> tableView,
RecordContext<R> context) {
+ CompletableFuture<?> executeOn(RecordView<?> tableView,
RecordContext<?> context) {
return action.apply((RecordView<Object>) tableView,
(RecordContext<Object>) context);
}
}
@@ -367,16 +488,63 @@ class ItKvRecordApiThreadingTest extends
ClusterPerClassIntegrationTest {
}
}
- @SuppressWarnings("rawtypes")
private enum RecordViewKind {
PLAIN, BINARY;
- RecordView view() {
+ RecordView<?> view() {
return this == PLAIN ? plainRecordView() : binaryRecordView();
}
- RecordContext context() {
+ RecordContext<?> context() {
return this == PLAIN ? plainRecordContext() :
binaryRecordContext();
}
}
+
+ private enum CommonViewAsyncOperation {
+ QUERY_ASYNC(view -> view.queryAsync(null, null)),
+ QUERY_ASYNC_WITH_INDEX_NAME(view -> view.queryAsync(null, null, null)),
+ QUERY_ASYNC_WITH_INDEX_NAME_AND_OPTS(view -> view.queryAsync(null,
null, null, null));
+
+ private final Function<CriteriaQuerySource<Object>,
CompletableFuture<?>> action;
+
+ CommonViewAsyncOperation(Function<CriteriaQuerySource<Object>,
CompletableFuture<?>> action) {
+ this.action = action;
+ }
+
+ CompletableFuture<?> executeOn(CriteriaQuerySource<?> tableView) {
+ return action.apply((CriteriaQuerySource<Object>) tableView);
+ }
+ }
+
+ private enum ViewKind {
+ PLAIN_KEY_VALUE(() -> plainKeyValueView()),
+ BINARY_KEY_VALUE(() -> binaryKeyValueView()),
+ PLAIN_RECORD(() -> plainRecordView()),
+ BINARY_RECORD(() -> binaryRecordView());
+
+ private final Supplier<CriteriaQuerySource<?>> viewSupplier;
+
+ ViewKind(Supplier<CriteriaQuerySource<?>> viewSupplier) {
+ this.viewSupplier = viewSupplier;
+ }
+
+ CriteriaQuerySource<?> criteriaQuerySource() {
+ return viewSupplier.get();
+ }
+ }
+
+ private enum AsyncCursorAsyncOperation {
+ FETCH_NEXT_PAGE(cursor -> cursor.fetchNextPage()),
+ CLOSE(cursor -> cursor.closeAsync());
+
+ private final Function<AsyncCursor<Object>, CompletableFuture<?>>
action;
+
+ AsyncCursorAsyncOperation(Function<AsyncCursor<Object>,
CompletableFuture<?>> action) {
+ this.action = action;
+ }
+
+ CompletableFuture<?> executeOn(AsyncCursor<?> cursor) {
+ return action.apply((AsyncCursor<Object>) cursor);
+ }
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 3ba9fa63c4..e146007c0d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -207,13 +207,7 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
* @return Future that will be completed in the async continuation thread
pool ({@link ForkJoinPool#commonPool()} by default).
*/
protected final <T> CompletableFuture<T>
preventThreadHijack(CompletableFuture<T> originalFuture) {
- if (originalFuture.isDone() || PublicApiThreading.inInternalCall()) {
- return originalFuture;
- }
-
- // The future is not complete yet, so it will be completed on an
Ignite thread, so we need to complete the user-facing future
- // in the continuation pool.
- return originalFuture.whenCompleteAsync((res, ex) -> {},
asyncContinuationExecutor);
+ return PublicApiThreading.preventThreadHijack(originalFuture,
asyncContinuationExecutor);
}
/**
@@ -278,7 +272,12 @@ abstract class AbstractTableView<R> implements
CriteriaQuerySource<R> {
assert meta != null : "Metadata can't be null.";
- return new QueryCriteriaAsyncCursor<>(resultSet,
queryMapper(meta, schema), session::closeAsync);
+ AsyncCursor<R> cursor = new QueryCriteriaAsyncCursor<>(
+ resultSet,
+ queryMapper(meta, schema),
+ session::closeAsync
+ );
+ return new AntiHijackAsyncCursor<>(cursor,
asyncContinuationExecutor);
})
.whenComplete((ignore, err) -> {
if (err != null) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/AntiHijackAsyncCursor.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/AntiHijackAsyncCursor.java
new file mode 100644
index 0000000000..272b3af3b4
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/AntiHijackAsyncCursor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static
org.apache.ignite.internal.thread.PublicApiThreading.preventThreadHijack;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.lang.AsyncCursor;
+
+/**
+ * Wrapper around {@link AsyncCursor} which prevents Ignite internal threads
from being hijacked by the user via asynchronous methods.
+ *
+ * @see PublicApiThreading
+ */
+public class AntiHijackAsyncCursor<T> implements AsyncCursor<T> {
+ private final AsyncCursor<T> cursor;
+
+ private final Executor asyncContinuationExecutor;
+
+ public AntiHijackAsyncCursor(AsyncCursor<T> cursor, Executor
asyncContinuationExecutor) {
+ this.cursor = cursor;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
+ }
+
+ @Override
+ public Iterable<T> currentPage() {
+ return cursor.currentPage();
+ }
+
+ @Override
+ public int currentPageSize() {
+ return cursor.currentPageSize();
+ }
+
+ @Override
+ public CompletableFuture<? extends AsyncCursor<T>> fetchNextPage() {
+ return preventThreadHijack(cursor.fetchNextPage(),
asyncContinuationExecutor)
+ .thenApply(nextCursor -> new
AntiHijackAsyncCursor<>(nextCursor, asyncContinuationExecutor));
+ }
+
+ @Override
+ public boolean hasMorePages() {
+ return cursor.hasMorePages();
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return preventThreadHijack(cursor.closeAsync(),
asyncContinuationExecutor);
+ }
+}