This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a5140d30ba IGNITE-22693 Add protection against internal thread hijacking to PartitionManager (#4061)
a5140d30ba is described below
commit a5140d30ba43adba2b590e581ac8f225198a437e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Jul 9 17:16:24 2024 +0400
IGNITE-22693 Add protection against internal thread hijacking to PartitionManager (#4061)
---
.../threading/ItKvRecordApiThreadingTest.java | 1 -
.../ItPartitionManagerApiThreadingTest.java | 120 +++++++++++++++++++++
.../table/distributed/PublicApiThreadingTable.java | 3 +-
.../PublicApiThreadingPartitionManager.java | 68 ++++++++++++
4 files changed, 190 insertions(+), 2 deletions(-)
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
index 3ba62cd6e4..08410419da 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
@@ -50,7 +50,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junitpioneer.jupiter.cartesian.CartesianTest;
import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
-@SuppressWarnings("resource")
class ItKvRecordApiThreadingTest extends ClusterPerClassIntegrationTest {
private static final String TABLE_NAME = "test";
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItPartitionManagerApiThreadingTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItPartitionManagerApiThreadingTest.java
new file mode 100644
index 0000000000..7060a932e7
--- /dev/null
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItPartitionManagerApiThreadingTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.threading;
+
+import static java.lang.Thread.currentThread;
+import static org.apache.ignite.internal.PublicApiThreadingTests.anIgniteThread;
+import static org.apache.ignite.internal.PublicApiThreadingTests.asyncContinuationPool;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
+import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.either;
+import static org.hamcrest.Matchers.is;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+class ItPartitionManagerApiThreadingTest extends ClusterPerClassIntegrationTest {
+ private static final String TABLE_NAME = "test";
+
+ private static final int KEY = 1;
+
+ private static final Tuple KEY_TUPLE = Tuple.create().set("id", 1);
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @BeforeEach
+ void createTable() {
+ sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val VARCHAR)");
+ }
+
+ @AfterEach
+ void dropTable() {
+ sql("DROP TABLE " + TABLE_NAME);
+ }
+
+ @ParameterizedTest
+ @EnumSource(PartitionManagerAsyncOperation.class)
+ void partitionManagerFuturesCompleteInContinuationsPool(PartitionManagerAsyncOperation operation) {
+ PartitionManager partitionManager = partitionManager();
+
+ CompletableFuture<Thread> completerFuture = operation.executeOn(partitionManager)
+ .thenApply(unused -> currentThread());
+
+ assertThat(completerFuture, willBe(either(is(currentThread())).or(asyncContinuationPool())));
+ }
+
+ private static PartitionManager partitionManager() {
+ return testTable().partitionManager();
+ }
+
+ private static Table testTable() {
+ return CLUSTER.aliveNode().tables().table(TABLE_NAME);
+ }
+
+ @ParameterizedTest
+ @EnumSource(PartitionManagerAsyncOperation.class)
+ void partitionManagerFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(PartitionManagerAsyncOperation operation) {
+ PartitionManager partitionManager = partitionManagerForInternalUse();
+
+ CompletableFuture<Thread> completerFuture = operation.executeOn(partitionManager)
+ .thenApply(unused -> currentThread());
+
+ assertThat(completerFuture, willBe(either(is(currentThread())).or(anIgniteThread())));
+ }
+
+ private static PartitionManager partitionManagerForInternalUse() {
+ return testTableForInternalUse().partitionManager();
+ }
+
+ private static Table testTableForInternalUse() {
+ TableManager internalIgniteTables = unwrapTableManager(CLUSTER.aliveNode().tables());
+ return internalIgniteTables.table(TABLE_NAME);
+ }
+
+ private enum PartitionManagerAsyncOperation {
+ PRIMARY_REPLICA_ASYNC(distribution -> distribution.primaryReplicaAsync(new HashPartition(0))),
+ PRIMARY_REPLICAS_ASYNC(distribution -> distribution.primaryReplicasAsync()),
+ PARTITION_BY_TUPLE_ASYNC(distribution -> distribution.partitionAsync(KEY_TUPLE)),
+ PARTITION_BY_KEY_ASYNC(distribution -> distribution.partitionAsync(KEY, Mapper.of(Integer.class)));
+
+ private final Function<PartitionManager, CompletableFuture<?>> action;
+
+ PartitionManagerAsyncOperation(Function<PartitionManager, CompletableFuture<?>> action) {
+ this.action = action;
+ }
+
+ CompletableFuture<?> executeOn(PartitionManager partitionManager) {
+ return action.apply(partitionManager);
+ }
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
index d7731f26e9..5ea03084e6 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.internal.table.PublicApiThreadingKeyValueView;
import org.apache.ignite.internal.table.PublicApiThreadingRecordView;
+import org.apache.ignite.internal.table.partition.PublicApiThreadingPartitionManager;
import org.apache.ignite.internal.thread.PublicApiThreading;
import org.apache.ignite.internal.wrapper.Wrapper;
import org.apache.ignite.table.KeyValueView;
@@ -56,7 +57,7 @@ class PublicApiThreadingTable implements Table, Wrapper {
@Override
public PartitionManager partitionManager() {
- return table.partitionManager();
+ return new PublicApiThreadingPartitionManager(table.partitionManager(), asyncContinuationExecutor);
}
@Override
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/PublicApiThreadingPartitionManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/PublicApiThreadingPartitionManager.java
new file mode 100644
index 0000000000..f14bd507a7
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/PublicApiThreadingPartitionManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.partition;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+
+/**
+ * Wrapper around {@link PartitionManager} that maintains public API invariants relating to threading.
+ * That is, it adds protection against thread hijacking by users.
+ *
+ * @see PublicApiThreading#preventThreadHijack(CompletableFuture, Executor)
+ */
+public class PublicApiThreadingPartitionManager implements PartitionManager {
+ private final PartitionManager partitionManager;
+ private final Executor asyncContinuationExecutor;
+
+ public PublicApiThreadingPartitionManager(PartitionManager partitionManager, Executor asyncContinuationExecutor) {
+ this.partitionManager = partitionManager;
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
+ }
+
+ @Override
+ public CompletableFuture<ClusterNode> primaryReplicaAsync(Partition partition) {
+ return preventThreadHijack(partitionManager.primaryReplicaAsync(partition));
+ }
+
+ @Override
+ public CompletableFuture<Map<Partition, ClusterNode>> primaryReplicasAsync() {
+ return preventThreadHijack(partitionManager.primaryReplicasAsync());
+ }
+
+ @Override
+ public <K> CompletableFuture<Partition> partitionAsync(K key, Mapper<K> mapper) {
+ return preventThreadHijack(partitionManager.partitionAsync(key, mapper));
+ }
+
+ @Override
+ public CompletableFuture<Partition> partitionAsync(Tuple key) {
+ return preventThreadHijack(partitionManager.partitionAsync(key));
+ }
+
+ private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> originalFuture) {
+ return PublicApiThreading.preventThreadHijack(originalFuture, asyncContinuationExecutor);
+ }
+}