This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
The following commit(s) were added to refs/heads/trunk by this push:
new 12da4693 ConcurrentModificationException with
CommandStore.waitingOnSync
12da4693 is described below
commit 12da4693b449b30d0420673579a06025b7b3e484
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Aug 6 12:28:29 2025 +0100
ConcurrentModificationException with CommandStore.waitingOnSync
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20821
---
.../src/main/java/accord/local/CommandStore.java | 135 ++++++++++++---------
1 file changed, 75 insertions(+), 60 deletions(-)
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java
b/accord-core/src/main/java/accord/local/CommandStore.java
index 14bf6677..376ff236 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -568,7 +568,10 @@ public abstract class CommandStore implements
SequentialAsyncExecutor
TxnId minForEpoch = TxnId.minForEpoch(epoch);
Ranges remaining = redundantBefore.removeWitnessed(minForEpoch,
ranges);
AsyncResults.SettableResult<Void> whenDone = new
AsyncResults.SettableResult<>();
- waitingOnSync.put(epoch, new WaitingOnSync(whenDone, remaining));
+ synchronized (waitingOnSync)
+ {
+ waitingOnSync.put(epoch, new WaitingOnSync(whenDone, remaining));
+ }
ensureReadyToCoordinate(epoch, ranges);
return whenDone;
}
@@ -665,56 +668,65 @@ public abstract class CommandStore implements
SequentialAsyncExecutor
protected final Ranges isWaitingOnSync(TxnId syncId, Ranges ranges)
{
- if (waitingOnSync.isEmpty())
- return Ranges.EMPTY;
-
- Ranges waitingOn = Ranges.EMPTY;
- for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ synchronized (waitingOnSync)
{
- if (e.getKey() > syncId.epoch())
- break;
+ if (waitingOnSync.isEmpty())
+ return Ranges.EMPTY;
- Ranges remaining = e.getValue().waitingOn;
- Ranges intersecting = remaining.slice(ranges, Minimal);
- if (!intersecting.isEmpty())
+ Ranges waitingOn = Ranges.EMPTY;
+ for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
{
- ranges = ranges.without(intersecting);
- waitingOn = waitingOn.with(intersecting);
+ if (e.getKey() > syncId.epoch())
+ break;
+
+ Ranges remaining = e.getValue().waitingOn;
+ Ranges intersecting = remaining.slice(ranges, Minimal);
+ if (!intersecting.isEmpty())
+ {
+ ranges = ranges.without(intersecting);
+ waitingOn = waitingOn.with(intersecting);
+ }
}
- }
- return waitingOn;
+ return waitingOn;
+ }
}
protected final void markSyncing(TxnId syncId, Ranges ranges)
{
- if (waitingOnSync.isEmpty())
- return;
-
- for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ synchronized (waitingOnSync)
{
- if (e.getKey() > syncId.epoch())
- break;
+ if (waitingOnSync.isEmpty())
+ return;
- Ranges remaining = e.getValue().waitingOn.without(ranges);
- if (e.getValue().waitingOn != remaining)
- e.getValue().waitingOn = remaining;
+ for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ {
+ if (e.getKey() > syncId.epoch())
+ break;
+
+ Ranges remaining = e.getValue().waitingOn.without(ranges);
+ if (e.getValue().waitingOn != remaining)
+ e.getValue().waitingOn = remaining;
+ }
}
}
protected final void unmarkSyncing(TxnId syncId, Ranges ranges)
{
- if (waitingOnSync.isEmpty())
- return;
-
- for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ synchronized (waitingOnSync)
{
- if (e.getKey() > syncId.epoch())
- break;
+ if (waitingOnSync.isEmpty())
+ return;
+
+ for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ {
+ if (e.getKey() > syncId.epoch())
+ break;
- Ranges unmark = e.getValue().waitingOnDurable.slice(ranges,
Minimal);
- if (!unmark.isEmpty())
- e.getValue().waitingOn = e.getValue().waitingOn.with(unmark);
+ Ranges unmark = e.getValue().waitingOnDurable.slice(ranges,
Minimal);
+ if (!unmark.isEmpty())
+ e.getValue().waitingOn =
e.getValue().waitingOn.with(unmark);
+ }
}
}
@@ -723,39 +735,42 @@ public abstract class CommandStore implements
SequentialAsyncExecutor
RedundantBefore addRedundantBefore = RedundantBefore.create(ranges,
syncId, LOCALLY_WITNESSED_ONLY);
safeStore.upsertRedundantBefore(addRedundantBefore);
- if (waitingOnSync.isEmpty())
- return;
-
- LongHashSet remove = null;
- for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
+ synchronized (waitingOnSync)
{
- if (e.getKey() > syncId.epoch())
- break;
-
- Ranges waitingOn = e.getValue().waitingOn;
- Ranges waitingOnDurable = e.getValue().waitingOnDurable;
- Ranges synced = waitingOnDurable.slice(ranges, Minimal);
- boolean intersects = waitingOnDurable.intersects(ranges);
- if (intersects)
+ if (waitingOnSync.isEmpty())
+ return;
+
+ LongHashSet remove = null;
+ for (Map.Entry<Long, WaitingOnSync> e : waitingOnSync.entrySet())
{
- e.getValue().waitingOn = waitingOn = waitingOn.without(ranges);
- e.getValue().waitingOnDurable = waitingOnDurable =
waitingOnDurable.without(ranges);
- if (waitingOnDurable.isEmpty())
+ if (e.getKey() > syncId.epoch())
+ break;
+
+ Ranges waitingOn = e.getValue().waitingOn;
+ Ranges waitingOnDurable = e.getValue().waitingOnDurable;
+ Ranges synced = waitingOnDurable.slice(ranges, Minimal);
+ boolean intersects = waitingOnDurable.intersects(ranges);
+ if (intersects)
{
- logger.debug("Completed full sync for {} on epoch {} using
{}", e.getValue().allRanges, e.getKey(), syncId);
- e.getValue().whenDone.trySuccess(null);
- if (remove == null)
- remove = new LongHashSet();
- remove.add(e.getKey());
- }
- else
- {
- logger.debug("Completed partial sync for {} on epoch {}
using {}; {} still to sync and {} to sync durably", synced, e.getKey(), syncId,
waitingOn, waitingOnDurable);
+ e.getValue().waitingOn = waitingOn =
waitingOn.without(ranges);
+ e.getValue().waitingOnDurable = waitingOnDurable =
waitingOnDurable.without(ranges);
+ if (waitingOnDurable.isEmpty())
+ {
+ logger.debug("Completed full sync for {} on epoch {}
using {}", e.getValue().allRanges, e.getKey(), syncId);
+ e.getValue().whenDone.trySuccess(null);
+ if (remove == null)
+ remove = new LongHashSet();
+ remove.add(e.getKey());
+ }
+ else
+ {
+ logger.debug("Completed partial sync for {} on epoch
{} using {}; {} still to sync and {} to sync durably", synced, e.getKey(),
syncId, waitingOn, waitingOnDurable);
+ }
}
}
+ if (remove != null)
+ remove.forEach(waitingOnSync::remove);
}
- if (remove != null)
- remove.forEach(waitingOnSync::remove);
}
public void markShardStale(SafeCommandStore safeStore, Timestamp
staleSince, Ranges ranges, boolean isSincePrecise)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]