This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d0da43d37f IGNITE-25674 Fix race between closing OutgoingSnapshot and 
iterating tx states (#6037)
6d0da43d37f is described below

commit 6d0da43d37fe01e5b9878d58e1aa50ed53833757
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Fri Jun 13 16:50:46 2025 +0400

    IGNITE-25674 Fix race between closing OutgoingSnapshot and iterating tx 
states (#6037)
---
 .../raft/snapshot/outgoing/OutgoingSnapshot.java   | 47 +++++++++++++++++-----
 1 file changed, 37 insertions(+), 10 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
index 0a6fda28903..655b2908267 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -34,6 +34,7 @@ import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -63,6 +64,7 @@ import org.apache.ignite.internal.tx.TxMeta;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxMetaMessage;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -138,7 +140,8 @@ public class OutgoingSnapshot {
      */
     private volatile boolean finishedTxData;
 
-    private volatile boolean closed = false;
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+    private final AtomicBoolean closedGuard = new AtomicBoolean();
 
     /**
      * Creates a new instance.
@@ -274,15 +277,19 @@ public class OutgoingSnapshot {
     SnapshotMetaResponse handleSnapshotMetaRequest(SnapshotMetaRequest 
request) {
         assert Objects.equals(request.id(), id) : "Expected id " + id + " but 
got " + request.id();
 
-        if (closed) {
+        if (!busyLock.enterBusy()) {
             return logThatAlreadyClosedAndReturnNull();
         }
 
-        PartitionSnapshotMeta meta = frozenMeta;
+        try {
+            PartitionSnapshotMeta meta = frozenMeta;
 
-        assert meta != null : "No snapshot meta yet, probably the snapshot 
scope was not yet frozen";
+            assert meta != null : "No snapshot meta yet, probably the snapshot 
scope was not yet frozen";
 
-        return 
PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMetaResponse().meta(meta).build();
+            return 
PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMetaResponse().meta(meta).build();
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     @Nullable
@@ -298,12 +305,20 @@ public class OutgoingSnapshot {
      */
     @Nullable
     SnapshotMvDataResponse handleSnapshotMvDataRequest(SnapshotMvDataRequest 
request) {
-        if (closed) {
+        if (!busyLock.enterBusy()) {
             return logThatAlreadyClosedAndReturnNull();
         }
 
+        try {
+            return handleSnapshotMvDataRequestInternal(request);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private SnapshotMvDataResponse 
handleSnapshotMvDataRequestInternal(SnapshotMvDataRequest request) {
         long totalBatchSize = 0;
-        List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>();
+        List<ResponseEntry> batch = new ArrayList<>();
 
         while (true) {
             acquireMvLock();
@@ -460,10 +475,18 @@ public class OutgoingSnapshot {
      */
     @Nullable
     SnapshotTxDataResponse handleSnapshotTxDataRequest(SnapshotTxDataRequest 
request) {
-        if (closed) {
+        if (!busyLock.enterBusy()) {
             return logThatAlreadyClosedAndReturnNull();
         }
 
+        try {
+            return handleSnapshotTxDataRequestInternal(request);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    private SnapshotTxDataResponse 
handleSnapshotTxDataRequestInternal(SnapshotTxDataRequest request) {
         List<IgniteBiTuple<UUID, TxMeta>> rows = new ArrayList<>();
 
         boolean finishedTxData = this.finishedTxData;
@@ -606,6 +629,12 @@ public class OutgoingSnapshot {
      * Closes the snapshot releasing the underlying resources.
      */
     public void close() {
+        if (!closedGuard.compareAndSet(false, true)) {
+            return;
+        }
+
+        busyLock.block();
+
         if (!finishedTxData) {
             Cursor<IgniteBiTuple<UUID, TxMeta>> txCursor = txDataCursor;
 
@@ -614,7 +643,5 @@ public class OutgoingSnapshot {
                 finishedTxData = true;
             }
         }
-
-        closed = true;
     }
 }

Reply via email to