This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a5140d30ba IGNITE-22693 Add protection against internal thread 
hijacking to PartitionManager (#4061)
a5140d30ba is described below

commit a5140d30ba43adba2b590e581ac8f225198a437e
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Jul 9 17:16:24 2024 +0400

    IGNITE-22693 Add protection against internal thread hijacking to 
PartitionManager (#4061)
---
 .../threading/ItKvRecordApiThreadingTest.java      |   1 -
 .../ItPartitionManagerApiThreadingTest.java        | 120 +++++++++++++++++++++
 .../table/distributed/PublicApiThreadingTable.java |   3 +-
 .../PublicApiThreadingPartitionManager.java        |  68 ++++++++++++
 4 files changed, 190 insertions(+), 2 deletions(-)

diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
index 3ba62cd6e4..08410419da 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItKvRecordApiThreadingTest.java
@@ -50,7 +50,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junitpioneer.jupiter.cartesian.CartesianTest;
 import org.junitpioneer.jupiter.cartesian.CartesianTest.Enum;
 
-@SuppressWarnings("resource")
 class ItKvRecordApiThreadingTest extends ClusterPerClassIntegrationTest {
     private static final String TABLE_NAME = "test";
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItPartitionManagerApiThreadingTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItPartitionManagerApiThreadingTest.java
new file mode 100644
index 0000000000..7060a932e7
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/threading/ItPartitionManagerApiThreadingTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.threading;
+
+import static java.lang.Thread.currentThread;
+import static 
org.apache.ignite.internal.PublicApiThreadingTests.anIgniteThread;
+import static 
org.apache.ignite.internal.PublicApiThreadingTests.asyncContinuationPool;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.either;
+import static org.hamcrest.Matchers.is;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.partition.HashPartition;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.PartitionManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+class ItPartitionManagerApiThreadingTest extends 
ClusterPerClassIntegrationTest {
+    private static final String TABLE_NAME = "test";
+
+    private static final int KEY = 1;
+
+    private static final Tuple KEY_TUPLE = Tuple.create().set("id", 1);
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @BeforeEach
+    void createTable() {
+        sql("CREATE TABLE " + TABLE_NAME + " (id INT PRIMARY KEY, val 
VARCHAR)");
+    }
+
+    @AfterEach
+    void dropTable() {
+        sql("DROP TABLE " + TABLE_NAME);
+    }
+
+    @ParameterizedTest
+    @EnumSource(PartitionManagerAsyncOperation.class)
+    void 
partitionManagerFuturesCompleteInContinuationsPool(PartitionManagerAsyncOperation
 operation) {
+        PartitionManager partitionManager = partitionManager();
+
+        CompletableFuture<Thread> completerFuture = 
operation.executeOn(partitionManager)
+                .thenApply(unused -> currentThread());
+
+        assertThat(completerFuture, 
willBe(either(is(currentThread())).or(asyncContinuationPool())));
+    }
+
+    private static PartitionManager partitionManager() {
+        return testTable().partitionManager();
+    }
+
+    private static Table testTable() {
+        return CLUSTER.aliveNode().tables().table(TABLE_NAME);
+    }
+
+    @ParameterizedTest
+    @EnumSource(PartitionManagerAsyncOperation.class)
+    void 
partitionManagerFuturesFromInternalCallsAreNotResubmittedToContinuationsPool(PartitionManagerAsyncOperation
 operation) {
+        PartitionManager partitionManager = partitionManagerForInternalUse();
+
+        CompletableFuture<Thread> completerFuture = 
operation.executeOn(partitionManager)
+                .thenApply(unused -> currentThread());
+
+        assertThat(completerFuture, 
willBe(either(is(currentThread())).or(anIgniteThread())));
+    }
+
+    private static PartitionManager partitionManagerForInternalUse() {
+        return testTableForInternalUse().partitionManager();
+    }
+
+    private static Table testTableForInternalUse() {
+        TableManager internalIgniteTables = 
unwrapTableManager(CLUSTER.aliveNode().tables());
+        return internalIgniteTables.table(TABLE_NAME);
+    }
+
+    private enum PartitionManagerAsyncOperation {
+        PRIMARY_REPLICA_ASYNC(distribution -> 
distribution.primaryReplicaAsync(new HashPartition(0))),
+        PRIMARY_REPLICAS_ASYNC(distribution -> 
distribution.primaryReplicasAsync()),
+        PARTITION_BY_TUPLE_ASYNC(distribution -> 
distribution.partitionAsync(KEY_TUPLE)),
+        PARTITION_BY_KEY_ASYNC(distribution -> 
distribution.partitionAsync(KEY, Mapper.of(Integer.class)));
+
+        private final Function<PartitionManager, CompletableFuture<?>> action;
+
+        PartitionManagerAsyncOperation(Function<PartitionManager, 
CompletableFuture<?>> action) {
+            this.action = action;
+        }
+
+        CompletableFuture<?> executeOn(PartitionManager partitionManager) {
+            return action.apply(partitionManager);
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
index d7731f26e9..5ea03084e6 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/PublicApiThreadingTable.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import org.apache.ignite.internal.table.PublicApiThreadingKeyValueView;
 import org.apache.ignite.internal.table.PublicApiThreadingRecordView;
+import 
org.apache.ignite.internal.table.partition.PublicApiThreadingPartitionManager;
 import org.apache.ignite.internal.thread.PublicApiThreading;
 import org.apache.ignite.internal.wrapper.Wrapper;
 import org.apache.ignite.table.KeyValueView;
@@ -56,7 +57,7 @@ class PublicApiThreadingTable implements Table, Wrapper {
 
     @Override
     public PartitionManager partitionManager() {
-        return table.partitionManager();
+        return new 
PublicApiThreadingPartitionManager(table.partitionManager(), 
asyncContinuationExecutor);
     }
 
     @Override
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/partition/PublicApiThreadingPartitionManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/PublicApiThreadingPartitionManager.java
new file mode 100644
index 0000000000..f14bd507a7
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/partition/PublicApiThreadingPartitionManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.partition;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import org.apache.ignite.internal.thread.PublicApiThreading;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.mapper.Mapper;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionManager;
+
+/**
+ * Wrapper around {@link PartitionManager} that maintains public API 
invariants relating to threading.
+ * That is, it adds protection against thread hijacking by users.
+ *
+ * @see PublicApiThreading#preventThreadHijack(CompletableFuture, Executor)
+ */
+public class PublicApiThreadingPartitionManager implements PartitionManager {
+    private final PartitionManager partitionManager;
+    private final Executor asyncContinuationExecutor;
+
+    public PublicApiThreadingPartitionManager(PartitionManager 
partitionManager, Executor asyncContinuationExecutor) {
+        this.partitionManager = partitionManager;
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
+    }
+
+    @Override
+    public CompletableFuture<ClusterNode> primaryReplicaAsync(Partition 
partition) {
+        return 
preventThreadHijack(partitionManager.primaryReplicaAsync(partition));
+    }
+
+    @Override
+    public CompletableFuture<Map<Partition, ClusterNode>> 
primaryReplicasAsync() {
+        return preventThreadHijack(partitionManager.primaryReplicasAsync());
+    }
+
+    @Override
+    public <K> CompletableFuture<Partition> partitionAsync(K key, Mapper<K> 
mapper) {
+        return preventThreadHijack(partitionManager.partitionAsync(key, 
mapper));
+    }
+
+    @Override
+    public CompletableFuture<Partition> partitionAsync(Tuple key) {
+        return preventThreadHijack(partitionManager.partitionAsync(key));
+    }
+
+    private <T> CompletableFuture<T> preventThreadHijack(CompletableFuture<T> 
originalFuture) {
+        return PublicApiThreading.preventThreadHijack(originalFuture, 
asyncContinuationExecutor);
+    }
+}

Reply via email to