This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk-tmp2
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk-tmp2 by this push:
     new 5fff5240 ConcurrentModificationException with 
CommandStore.waitingOnSync
5fff5240 is described below

commit 5fff5240cefa18f5629eeeff311022c1dd48b846
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Wed Aug 6 12:28:29 2025 +0100

    ConcurrentModificationException with CommandStore.waitingOnSync
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20821
---
 .../src/main/java/accord/local/CommandStore.java   | 135 ++++++++++++---------
 1 file changed, 75 insertions(+), 60 deletions(-)

diff --git a/accord-core/src/main/java/accord/local/CommandStore.java 
b/accord-core/src/main/java/accord/local/CommandStore.java
index 14bf6677..376ff236 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -568,7 +568,10 @@ public abstract class CommandStore implements 
SequentialAsyncExecutor
         TxnId minForEpoch = TxnId.minForEpoch(epoch);
         Ranges remaining = redundantBefore.removeWitnessed(minForEpoch, 
ranges);
         AsyncResults.SettableResult<Void> whenDone = new 
AsyncResults.SettableResult<>();
-        waitingOnSync.put(epoch, new WaitingOnSync(whenDone, remaining));
+        synchronized (waitingOnSync)
+        {
+            waitingOnSync.put(epoch, new WaitingOnSync(whenDone, remaining));
+        }
         ensureReadyToCoordinate(epoch, ranges);
         return whenDone;
     }
@@ -665,56 +668,65 @@ public abstract class CommandStore implements 
SequentialAsyncExecutor
 
     protected final Ranges isWaitingOnSync(TxnId syncId, Ranges ranges)
     {
-        if (waitingOnSync.isEmpty())
-            return Ranges.EMPTY;
-
-        Ranges waitingOn = Ranges.EMPTY;
-        for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+        synchronized (waitingOnSync)
         {
-            if (e.getKey() > syncId.epoch())
-                break;
+            if (waitingOnSync.isEmpty())
+                return Ranges.EMPTY;
 
-            Ranges remaining = e.getValue().waitingOn;
-            Ranges intersecting = remaining.slice(ranges, Minimal);
-            if (!intersecting.isEmpty())
+            Ranges waitingOn = Ranges.EMPTY;
+            for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
             {
-                ranges = ranges.without(intersecting);
-                waitingOn = waitingOn.with(intersecting);
+                if (e.getKey() > syncId.epoch())
+                    break;
+
+                Ranges remaining = e.getValue().waitingOn;
+                Ranges intersecting = remaining.slice(ranges, Minimal);
+                if (!intersecting.isEmpty())
+                {
+                    ranges = ranges.without(intersecting);
+                    waitingOn = waitingOn.with(intersecting);
+                }
             }
-        }
 
-        return waitingOn;
+            return waitingOn;
+        }
     }
 
     protected final void markSyncing(TxnId syncId, Ranges ranges)
     {
-        if (waitingOnSync.isEmpty())
-            return;
-
-        for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+        synchronized (waitingOnSync)
         {
-            if (e.getKey() > syncId.epoch())
-                break;
+            if (waitingOnSync.isEmpty())
+                return;
 
-            Ranges remaining = e.getValue().waitingOn.without(ranges);
-            if (e.getValue().waitingOn != remaining)
-                e.getValue().waitingOn = remaining;
+            for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+            {
+                if (e.getKey() > syncId.epoch())
+                    break;
+
+                Ranges remaining = e.getValue().waitingOn.without(ranges);
+                if (e.getValue().waitingOn != remaining)
+                    e.getValue().waitingOn = remaining;
+            }
         }
     }
 
     protected final void unmarkSyncing(TxnId syncId, Ranges ranges)
     {
-        if (waitingOnSync.isEmpty())
-            return;
-
-        for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+        synchronized (waitingOnSync)
         {
-            if (e.getKey() > syncId.epoch())
-                break;
+            if (waitingOnSync.isEmpty())
+                return;
+
+            for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+            {
+                if (e.getKey() > syncId.epoch())
+                    break;
 
-            Ranges unmark = e.getValue().waitingOnDurable.slice(ranges, 
Minimal);
-            if (!unmark.isEmpty())
-                e.getValue().waitingOn = e.getValue().waitingOn.with(unmark);
+                Ranges unmark = e.getValue().waitingOnDurable.slice(ranges, 
Minimal);
+                if (!unmark.isEmpty())
+                    e.getValue().waitingOn = 
e.getValue().waitingOn.with(unmark);
+            }
         }
     }
 
@@ -723,39 +735,42 @@ public abstract class CommandStore implements 
SequentialAsyncExecutor
         RedundantBefore addRedundantBefore = RedundantBefore.create(ranges, 
syncId, LOCALLY_WITNESSED_ONLY);
         safeStore.upsertRedundantBefore(addRedundantBefore);
 
-        if (waitingOnSync.isEmpty())
-            return;
-
-        LongHashSet remove = null;
-        for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+        synchronized (waitingOnSync)
         {
-            if (e.getKey() > syncId.epoch())
-                break;
-
-            Ranges waitingOn = e.getValue().waitingOn;
-            Ranges waitingOnDurable = e.getValue().waitingOnDurable;
-            Ranges synced = waitingOnDurable.slice(ranges, Minimal);
-            boolean intersects = waitingOnDurable.intersects(ranges);
-            if (intersects)
+            if (waitingOnSync.isEmpty())
+                return;
+
+            LongHashSet remove = null;
+            for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
             {
-                e.getValue().waitingOn = waitingOn = waitingOn.without(ranges);
-                e.getValue().waitingOnDurable = waitingOnDurable = 
waitingOnDurable.without(ranges);
-                if (waitingOnDurable.isEmpty())
+                if (e.getKey() > syncId.epoch())
+                    break;
+
+                Ranges waitingOn = e.getValue().waitingOn;
+                Ranges waitingOnDurable = e.getValue().waitingOnDurable;
+                Ranges synced = waitingOnDurable.slice(ranges, Minimal);
+                boolean intersects = waitingOnDurable.intersects(ranges);
+                if (intersects)
                 {
-                    logger.debug("Completed full sync for {} on epoch {} using 
{}", e.getValue().allRanges, e.getKey(), syncId);
-                    e.getValue().whenDone.trySuccess(null);
-                    if (remove == null)
-                        remove = new LongHashSet();
-                    remove.add(e.getKey());
-                }
-                else
-                {
-                    logger.debug("Completed partial sync for {} on epoch {} 
using {}; {} still to sync and {} to sync durably", synced, e.getKey(), syncId, 
waitingOn, waitingOnDurable);
+                    e.getValue().waitingOn = waitingOn = 
waitingOn.without(ranges);
+                    e.getValue().waitingOnDurable = waitingOnDurable = 
waitingOnDurable.without(ranges);
+                    if (waitingOnDurable.isEmpty())
+                    {
+                        logger.debug("Completed full sync for {} on epoch {} 
using {}", e.getValue().allRanges, e.getKey(), syncId);
+                        e.getValue().whenDone.trySuccess(null);
+                        if (remove == null)
+                            remove = new LongHashSet();
+                        remove.add(e.getKey());
+                    }
+                    else
+                    {
+                        logger.debug("Completed partial sync for {} on epoch 
{} using {}; {} still to sync and {} to sync durably", synced, e.getKey(), 
syncId, waitingOn, waitingOnDurable);
+                    }
                 }
             }
+            if (remove != null)
+                remove.forEach(waitingOnSync::remove);
         }
-        if (remove != null)
-            remove.forEach(waitingOnSync::remove);
     }
 
     public void markShardStale(SafeCommandStore safeStore, Timestamp 
staleSince, Ranges ranges, boolean isSincePrecise)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to