This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f0cc583fd2 IGNITE-19142 IncomingSnapshotCopier.cancel() blocks forever 
if called from multiple threads (#1851)
f0cc583fd2 is described below

commit f0cc583fd200c97778fd428fa4a9d3cb1e83826c
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Mar 28 21:09:47 2023 +0400

    IGNITE-19142 IncomingSnapshotCopier.cancel() blocks forever if called from 
multiple threads (#1851)
---
 .../snapshot/incoming/IncomingSnapshotCopier.java   |  9 +++++++++
 .../incoming/IncomingSnapshotCopierTest.java        | 21 +++++++++++++++++++++
 2 files changed, 30 insertions(+)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 904eb21d71..ee8edecf6a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -72,6 +73,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
 
     private final SnapshotUri snapshotUri;
 
+    /** Used to make sure that we execute cancellation at most once. */
+    private final AtomicBoolean cancellationGuard = new AtomicBoolean();
+
     /** Busy lock for synchronous rebalance cancellation. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -151,6 +155,11 @@ public class IncomingSnapshotCopier extends SnapshotCopier 
{
 
     @Override
     public void cancel() {
+        // Cancellation from one thread must not block cancellations from 
other threads, hence this check.
+        if (!cancellationGuard.compareAndSet(false, true)) {
+            return;
+        }
+
         busyLock.block();
 
         LOG.info("Copier is canceled for partition [{}]", 
createPartitionInfo());
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index b8e0847d33..c7b531c1e3 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -106,7 +106,9 @@ import 
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
 
 /**
  * For {@link IncomingSnapshotCopier} testing.
@@ -595,6 +597,25 @@ public class IncomingSnapshotCopierTest {
         verify(mvGc, times(1)).addStorage(eq(tablePartitionId), 
any(StorageUpdateHandler.class));
     }
 
+    @Test
+    @Timeout(1)
+    void cancellationsFromMultipleThreadsDoNotBlockEachOther() throws 
Exception {
+        PartitionSnapshotStorage partitionSnapshotStorage = 
mock(PartitionSnapshotStorage.class, Answers.RETURNS_DEEP_STUBS);
+
+        
when(partitionSnapshotStorage.partition().partitionKey()).thenReturn(new 
PartitionKey(UUID.randomUUID(), 0));
+
+        IncomingSnapshotCopier copier = new IncomingSnapshotCopier(
+                partitionSnapshotStorage,
+                SnapshotUri.fromStringUri(SnapshotUri.toStringUri(snapshotId, 
NODE_NAME))
+        );
+
+        Thread anotherThread = new Thread(copier::cancel);
+        anotherThread.start();
+        anotherThread.join();
+
+        copier.cancel();
+    }
+
     private TableConfiguration getTableConfig() {
         return tablesConfig.tables().get("foo");
     }

Reply via email to