This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk-tmp2 in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk-tmp2 by this push: new 5fff5240 ConcurrentModificationException with CommandStore.waitingOnSync 5fff5240 is described below commit 5fff5240cefa18f5629eeeff311022c1dd48b846 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Wed Aug 6 12:28:29 2025 +0100 ConcurrentModificationException with CommandStore.waitingOnSync patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20821 --- .../src/main/java/accord/local/CommandStore.java | 135 ++++++++++++--------- 1 file changed, 75 insertions(+), 60 deletions(-) diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java index 14bf6677..376ff236 100644 --- a/accord-core/src/main/java/accord/local/CommandStore.java +++ b/accord-core/src/main/java/accord/local/CommandStore.java @@ -568,7 +568,10 @@ public abstract class CommandStore implements SequentialAsyncExecutor TxnId minForEpoch = TxnId.minForEpoch(epoch); Ranges remaining = redundantBefore.removeWitnessed(minForEpoch, ranges); AsyncResults.SettableResult<Void> whenDone = new AsyncResults.SettableResult<>(); - waitingOnSync.put(epoch, new WaitingOnSync(whenDone, remaining)); + synchronized (waitingOnSync) + { + waitingOnSync.put(epoch, new WaitingOnSync(whenDone, remaining)); + } ensureReadyToCoordinate(epoch, ranges); return whenDone; } @@ -665,56 +668,65 @@ public abstract class CommandStore implements SequentialAsyncExecutor protected final Ranges isWaitingOnSync(TxnId syncId, Ranges ranges) { - if (waitingOnSync.isEmpty()) - return Ranges.EMPTY; - - Ranges waitingOn = Ranges.EMPTY; - for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet()) + synchronized (waitingOnSync) { - if (e.getKey() > syncId.epoch()) - break; + if (waitingOnSync.isEmpty()) + return Ranges.EMPTY; - Ranges remaining = e.getValue().waitingOn; - Ranges intersecting = remaining.slice(ranges, Minimal); - if (!intersecting.isEmpty()) + Ranges waitingOn = 
Ranges.EMPTY; + for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet()) { - ranges = ranges.without(intersecting); - waitingOn = waitingOn.with(intersecting); + if (e.getKey() > syncId.epoch()) + break; + + Ranges remaining = e.getValue().waitingOn; + Ranges intersecting = remaining.slice(ranges, Minimal); + if (!intersecting.isEmpty()) + { + ranges = ranges.without(intersecting); + waitingOn = waitingOn.with(intersecting); + } } - } - return waitingOn; + return waitingOn; + } } protected final void markSyncing(TxnId syncId, Ranges ranges) { - if (waitingOnSync.isEmpty()) - return; - - for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet()) + synchronized (waitingOnSync) { - if (e.getKey() > syncId.epoch()) - break; + if (waitingOnSync.isEmpty()) + return; - Ranges remaining = e.getValue().waitingOn.without(ranges); - if (e.getValue().waitingOn != remaining) - e.getValue().waitingOn = remaining; + for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet()) + { + if (e.getKey() > syncId.epoch()) + break; + + Ranges remaining = e.getValue().waitingOn.without(ranges); + if (e.getValue().waitingOn != remaining) + e.getValue().waitingOn = remaining; + } } } protected final void unmarkSyncing(TxnId syncId, Ranges ranges) { - if (waitingOnSync.isEmpty()) - return; - - for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet()) + synchronized (waitingOnSync) { - if (e.getKey() > syncId.epoch()) - break; + if (waitingOnSync.isEmpty()) + return; + + for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet()) + { + if (e.getKey() > syncId.epoch()) + break; - Ranges unmark = e.getValue().waitingOnDurable.slice(ranges, Minimal); - if (!unmark.isEmpty()) - e.getValue().waitingOn = e.getValue().waitingOn.with(unmark); + Ranges unmark = e.getValue().waitingOnDurable.slice(ranges, Minimal); + if (!unmark.isEmpty()) + e.getValue().waitingOn = e.getValue().waitingOn.with(unmark); + } } } @@ -723,39 +735,42 @@ public abstract class CommandStore 
implements SequentialAsyncExecutor RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, syncId, LOCALLY_WITNESSED_ONLY); safeStore.upsertRedundantBefore(addRedundantBefore); - if (waitingOnSync.isEmpty()) - return; - - LongHashSet remove = null; - for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet()) + synchronized (waitingOnSync) { - if (e.getKey() > syncId.epoch()) - break; - - Ranges waitingOn = e.getValue().waitingOn; - Ranges waitingOnDurable = e.getValue().waitingOnDurable; - Ranges synced = waitingOnDurable.slice(ranges, Minimal); - boolean intersects = waitingOnDurable.intersects(ranges); - if (intersects) + if (waitingOnSync.isEmpty()) + return; + + LongHashSet remove = null; + for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet()) { - e.getValue().waitingOn = waitingOn = waitingOn.without(ranges); - e.getValue().waitingOnDurable = waitingOnDurable = waitingOnDurable.without(ranges); - if (waitingOnDurable.isEmpty()) + if (e.getKey() > syncId.epoch()) + break; + + Ranges waitingOn = e.getValue().waitingOn; + Ranges waitingOnDurable = e.getValue().waitingOnDurable; + Ranges synced = waitingOnDurable.slice(ranges, Minimal); + boolean intersects = waitingOnDurable.intersects(ranges); + if (intersects) { - logger.debug("Completed full sync for {} on epoch {} using {}", e.getValue().allRanges, e.getKey(), syncId); - e.getValue().whenDone.trySuccess(null); - if (remove == null) - remove = new LongHashSet(); - remove.add(e.getKey()); - } - else - { - logger.debug("Completed partial sync for {} on epoch {} using {}; {} still to sync and {} to sync durably", synced, e.getKey(), syncId, waitingOn, waitingOnDurable); + e.getValue().waitingOn = waitingOn = waitingOn.without(ranges); + e.getValue().waitingOnDurable = waitingOnDurable = waitingOnDurable.without(ranges); + if (waitingOnDurable.isEmpty()) + { + logger.debug("Completed full sync for {} on epoch {} using {}", e.getValue().allRanges, e.getKey(), syncId); + 
e.getValue().whenDone.trySuccess(null); + if (remove == null) + remove = new LongHashSet(); + remove.add(e.getKey()); + } + else + { + logger.debug("Completed partial sync for {} on epoch {} using {}; {} still to sync and {} to sync durably", synced, e.getKey(), syncId, waitingOn, waitingOnDurable); + } } } + if (remove != null) + remove.forEach(waitingOnSync::remove); } - if (remove != null) - remove.forEach(waitingOnSync::remove); } public void markShardStale(SafeCommandStore safeStore, Timestamp staleSince, Ranges ranges, boolean isSincePrecise) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org