This is an automated email from the ASF dual-hosted git repository.
sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f0cc583fd2 IGNITE-19142 IncomingSnapshotCopier.cancel() blocks forever
if called from multiple threads (#1851)
f0cc583fd2 is described below
commit f0cc583fd200c97778fd428fa4a9d3cb1e83826c
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Mar 28 21:09:47 2023 +0400
IGNITE-19142 IncomingSnapshotCopier.cancel() blocks forever if called from
multiple threads (#1851)
---
.../snapshot/incoming/IncomingSnapshotCopier.java | 9 +++++++++
.../incoming/IncomingSnapshotCopierTest.java | 21 +++++++++++++++++++++
2 files changed, 30 insertions(+)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 904eb21d71..ee8edecf6a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -25,6 +25,7 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -72,6 +73,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
private final SnapshotUri snapshotUri;
+ /** Used to make sure that we execute cancellation at most once. */
+ private final AtomicBoolean cancellationGuard = new AtomicBoolean();
+
/** Busy lock for synchronous rebalance cancellation. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -151,6 +155,11 @@ public class IncomingSnapshotCopier extends SnapshotCopier
{
@Override
public void cancel() {
+ // Cancellation from one thread must not block cancellations from
other threads, hence this check.
+ if (!cancellationGuard.compareAndSet(false, true)) {
+ return;
+ }
+
busyLock.block();
LOG.info("Copier is canceled for partition [{}]",
createPartitionInfo());
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index b8e0847d33..c7b531c1e3 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -106,7 +106,9 @@ import
org.apache.ignite.raft.jraft.storage.snapshot.SnapshotCopier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Answers;
/**
* For {@link IncomingSnapshotCopier} testing.
@@ -595,6 +597,25 @@ public class IncomingSnapshotCopierTest {
verify(mvGc, times(1)).addStorage(eq(tablePartitionId),
any(StorageUpdateHandler.class));
}
+ @Test
+ @Timeout(1)
+ void cancellationsFromMultipleThreadsDoNotBlockEachOther() throws
Exception {
+ PartitionSnapshotStorage partitionSnapshotStorage =
mock(PartitionSnapshotStorage.class, Answers.RETURNS_DEEP_STUBS);
+
+
when(partitionSnapshotStorage.partition().partitionKey()).thenReturn(new
PartitionKey(UUID.randomUUID(), 0));
+
+ IncomingSnapshotCopier copier = new IncomingSnapshotCopier(
+ partitionSnapshotStorage,
+ SnapshotUri.fromStringUri(SnapshotUri.toStringUri(snapshotId,
NODE_NAME))
+ );
+
+ Thread anotherThread = new Thread(copier::cancel);
+ anotherThread.start();
+ anotherThread.join();
+
+ copier.cancel();
+ }
+
private TableConfiguration getTableConfig() {
return tablesConfig.tables().get("foo");
}