This is an automated email from the ASF dual-hosted git repository. rpuch pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 6d0da43d37f IGNITE-25674 Fix race between closing OutgoingSnapshot and iterating tx states (#6037) 6d0da43d37f is described below commit 6d0da43d37fe01e5b9878d58e1aa50ed53833757 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Fri Jun 13 16:50:46 2025 +0400 IGNITE-25674 Fix race between closing OutgoingSnapshot and iterating tx states (#6037) --- .../raft/snapshot/outgoing/OutgoingSnapshot.java | 47 +++++++++++++++++----- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java index 0a6fda28903..655b2908267 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/outgoing/OutgoingSnapshot.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.Queue; import java.util.Set; import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.internal.catalog.CatalogService; import org.apache.ignite.internal.lang.IgniteBiTuple; @@ -63,6 +64,7 @@ import org.apache.ignite.internal.tx.TxMeta; import org.apache.ignite.internal.tx.message.TxMessagesFactory; import org.apache.ignite.internal.tx.message.TxMetaMessage; import org.apache.ignite.internal.util.Cursor; +import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.jetbrains.annotations.Nullable; /** @@ -138,7 +140,8 @@ public class OutgoingSnapshot { */ private volatile boolean finishedTxData; - private volatile boolean closed = false; + private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); + private final AtomicBoolean closedGuard = new AtomicBoolean(); /** * Creates a new instance. @@ -274,15 +277,19 @@ public class OutgoingSnapshot { SnapshotMetaResponse handleSnapshotMetaRequest(SnapshotMetaRequest request) { assert Objects.equals(request.id(), id) : "Expected id " + id + " but got " + request.id(); - if (closed) { + if (!busyLock.enterBusy()) { return logThatAlreadyClosedAndReturnNull(); } - PartitionSnapshotMeta meta = frozenMeta; + try { + PartitionSnapshotMeta meta = frozenMeta; - assert meta != null : "No snapshot meta yet, probably the snapshot scope was not yet frozen"; + assert meta != null : "No snapshot meta yet, probably the snapshot scope was not yet frozen"; - return PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMetaResponse().meta(meta).build(); + return PARTITION_REPLICATION_MESSAGES_FACTORY.snapshotMetaResponse().meta(meta).build(); + } finally { + busyLock.leaveBusy(); + } } @Nullable @@ -298,12 +305,20 @@ public class OutgoingSnapshot { */ @Nullable SnapshotMvDataResponse handleSnapshotMvDataRequest(SnapshotMvDataRequest request) { - if (closed) { + if (!busyLock.enterBusy()) { return logThatAlreadyClosedAndReturnNull(); } + try { + return handleSnapshotMvDataRequestInternal(request); + } finally { + busyLock.leaveBusy(); + } + } + + private SnapshotMvDataResponse handleSnapshotMvDataRequestInternal(SnapshotMvDataRequest request) { long totalBatchSize = 0; - List<SnapshotMvDataResponse.ResponseEntry> batch = new ArrayList<>(); + List<ResponseEntry> batch = new ArrayList<>(); while (true) { acquireMvLock(); @@ -460,10 +475,18 @@ public class OutgoingSnapshot { */ @Nullable SnapshotTxDataResponse handleSnapshotTxDataRequest(SnapshotTxDataRequest request) { - if (closed) { + if (!busyLock.enterBusy()) { return logThatAlreadyClosedAndReturnNull(); } + try { + return handleSnapshotTxDataRequestInternal(request); + } finally { + busyLock.leaveBusy(); + } + } + + private SnapshotTxDataResponse handleSnapshotTxDataRequestInternal(SnapshotTxDataRequest request) { List<IgniteBiTuple<UUID, TxMeta>> rows = new ArrayList<>(); boolean finishedTxData = this.finishedTxData; @@ -606,6 +629,12 @@ public class OutgoingSnapshot { * Closes the snapshot releasing the underlying resources. */ public void close() { + if (!closedGuard.compareAndSet(false, true)) { + return; + } + + busyLock.block(); + if (!finishedTxData) { Cursor<IgniteBiTuple<UUID, TxMeta>> txCursor = txDataCursor; @@ -614,7 +643,5 @@ public class OutgoingSnapshot { finishedTxData = true; } } - - closed = true; } }