This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f6fd49566f Improve DurabilityQueue:  - All retries are appended to a 
delay queue so overlaps can be pruned  - Quorum successes are not retried if 
there are superseding sync points covering the ranges  - User-initiated 
requests are not purged unless the request has timed out or otherwise completed 
 - Overlapping requests are queued up against the next to run Also (C*):  - 
Catch-up with quorums on restart  - Manage an ordered set of keys in cache for 
faster range searches Also (Acc [...]
f6fd49566f is described below

commit f6fd49566ff1dbcdc921a1b5876c53713e4c12d5
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Nov 7 11:26:50 2025 +0000

    Improve DurabilityQueue:
     - All retries are appended to a delay queue so overlaps can be pruned
     - Quorum successes are not retried if there are superseding sync points 
covering the ranges
     - User-initiated requests are not purged unless the request has timed out 
or otherwise completed
     - Overlapping requests are queued up against the next to run
    Also (C*):
     - Catch-up with quorums on restart
     - Manage an ordered set of keys in cache for faster range searches
    Also (Accord):
     - Update copy of BTree and import IntervalBTree
     - Fix RedundantStatus WAS_OWNED_OVERRIDE_MASK
     - Add Catchup mechanism to reach parity with a quorum on restart
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21013
---
 modules/accord                                     |   2 +-
 .../org/apache/cassandra/config/AccordSpec.java    |   7 +
 .../cassandra/service/accord/AccordCache.java      |  49 +++-
 .../service/accord/AccordCommandStore.java         |  13 +-
 .../cassandra/service/accord/AccordService.java    | 160 ++++++++----
 .../cassandra/service/accord/AccordTask.java       |   4 +-
 .../cassandra/service/accord/OrderedKeys.java      | 193 ++++++++++++++
 .../serializers/GetDurableBeforeSerializers.java   |   4 +-
 .../accord/serializers/SetDurableSerializers.java  |  31 +--
 .../org/apache/cassandra/utils/btree/BTree.java    |  27 +-
 .../test/accord/AccordJournalIntegrationTest.java  |  10 +-
 .../accord/journal/AccordJournalReplayTest.java    |   5 +-
 .../JournalAccessRouteIndexOnStartupRaceTest.java  |   4 +-
 .../apache/cassandra/utils/OrderedKeysTest.java    | 282 +++++++++++++++++++++
 14 files changed, 702 insertions(+), 89 deletions(-)

diff --git a/modules/accord b/modules/accord
index a681b1bf7d..de03d14e48 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit a681b1bf7d9e8d1d797af2fd54bdd180a4863406
+Subproject commit de03d14e4821b669fecad5dcdb2de2dba3a9d926
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java 
b/src/java/org/apache/cassandra/config/AccordSpec.java
index 8a5801e41f..1e2379a564 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -178,6 +178,13 @@ public class AccordSpec
     public TransactionalMode default_transactional_mode = 
TransactionalMode.off;
     public boolean ephemeralReadEnabled = true;
     public boolean state_cache_listener_jfr_enabled = true;
+
+    public DurationSpec.IntSecondsBound catchup_on_start_success_latency = new 
DurationSpec.IntSecondsBound(60);
+    public DurationSpec.IntSecondsBound catchup_on_start_fail_latency = new 
DurationSpec.IntSecondsBound(900);
+    public int catchup_on_start_max_attempts = 5;
+    public boolean catchup_on_start_exit_on_failure = true;
+    public boolean catchup_on_start = true;
+
     public final JournalSpec journal = new JournalSpec();
 
     public enum MixedTimeSourceHandling
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java 
b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index 20ce3d3b1f..0a1dd623f0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -21,19 +21,17 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.ToLongFunction;
-import java.util.stream.Stream;
-
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -68,6 +66,7 @@ import 
org.apache.cassandra.service.accord.serializers.Version;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.NoSpamLogger.NoSpamLogStatement;
 import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
 
 import static accord.utils.Invariants.illegalState;
 import static accord.utils.Invariants.require;
@@ -112,6 +111,7 @@ public class AccordCache implements CacheSize
         long estimateShrunkHeapSize(Object shrunk);
         boolean validate(AccordCommandStore commandStore, K key, V value);
         S safeRef(AccordCacheEntry<K, V> node);
+        default Comparator<K> keyComparator() { return null; }
 
         default AccordCacheEntry<K, V> newEntry(K key, AccordCache.Type<K, V, 
?>.Instance owner)
         {
@@ -274,7 +274,7 @@ public class AccordCache implements CacheSize
             entry.savingOrWaitingToSave().identity.onSuccess(onSuccess);
     }
 
-    private void evict(AccordCacheEntry<?, ?> node, boolean updateUnreferenced)
+    private <K> void evict(AccordCacheEntry<K, ?> node, boolean 
updateUnreferenced)
     {
         if (logger.isTraceEnabled())
             logger.trace("Evicting {}", node);
@@ -297,7 +297,7 @@ public class AccordCache implements CacheSize
         if (node.status() == LOADED && VALIDATE_LOAD_ON_EVICT)
             owner.validateLoadEvicted(node);
 
-        AccordCacheEntry<?, ?> self = node.owner.cache.remove(node.key());
+        AccordCacheEntry<K, ?> self = node.owner.remove(node.key());
         Invariants.require(self.references() == 0);
         require(self == node, "Leaked node detected; was attempting to remove 
%s but cache had %s", node, self);
         node.notifyListeners(Listener::onEvict);
@@ -400,6 +400,8 @@ public class AccordCache implements CacheSize
             // TODO (desired): don't need to store key separately as stored in 
node; ideally use a hash set that allows us to get the current entry
             private final Map<K, AccordCacheEntry<K, V>> cache = new 
Object2ObjectHashMap<>();
             private List<Listener<K, V>> listeners = null;
+            // TODO (expected): update this after releasing the lock
+            private OrderedKeys<K> orderedKeys;
 
             public Instance(AccordCommandStore commandStore)
             {
@@ -435,7 +437,6 @@ public class AccordCache implements CacheSize
 
             private AccordCacheEntry<K, V> acquire(K key, boolean onlyIfLoaded)
             {
-                @SuppressWarnings("unchecked")
                 AccordCacheEntry<K, V> node = cache.get(key);
                 return node == null
                        ? acquireAbsent(key, onlyIfLoaded)
@@ -454,8 +455,10 @@ public class AccordCache implements CacheSize
                 node.increment();
 
                 Object prev = cache.put(key, node);
-                node.initSize(parent());
                 Invariants.require(prev == null, "%s not absent from cache: %s 
already present", key, node);
+                if (orderedKeys != null)
+                    orderedKeys.add(key);
+                node.initSize(parent());
                 ++size;
                 node.notifyListeners(Listener::onAdd);
                 maybeShrinkOrEvictSomeNodes();
@@ -555,9 +558,12 @@ public class AccordCache implements CacheSize
                 maybeShrinkOrEvictSomeNodes();
             }
 
-            public Stream<AccordCacheEntry<K, V>> stream()
+            AccordCacheEntry<K, ?> remove(K key)
             {
-                return cache.values().stream();
+                AccordCacheEntry<K, ?> result = cache.remove(key);
+                if (orderedKeys != null && result != null)
+                    orderedKeys.remove(key);
+                return result;
             }
 
             final Type<K, V, S> parent()
@@ -565,6 +571,14 @@ public class AccordCache implements CacheSize
                 return Type.this;
             }
 
+            public Iterable<K> keysBetween(K start, boolean startInclusive, K 
end, boolean endInclusive)
+            {
+                if (orderedKeys == null)
+                    orderedKeys = new OrderedKeys<>(adapter.keyComparator(), 
cache.keySet());
+
+                return orderedKeys.between(start, startInclusive, end, 
endInclusive);
+            }
+
             @Override
             public Iterator<AccordCacheEntry<K, V>> iterator()
             {
@@ -602,11 +616,6 @@ public class AccordCache implements CacheSize
                 return cache.get(key);
             }
 
-            public Set<K> keySet()
-            {
-                return cache.keySet();
-            }
-
             @VisibleForTesting
             public boolean isReferenced(K key)
             {
@@ -1039,6 +1048,12 @@ public class AccordCache implements CacheSize
         {
             return newNode.apply(key, owner);
         }
+
+        @Override
+        public Comparator<K> keyComparator()
+        {
+            return Comparator.comparing(a -> ((Comparable) a));
+        }
     }
 
     static class SettableWrapper<K, V, S> extends FunctionalAdapter<K, V, S>
@@ -1148,6 +1163,12 @@ public class AccordCache implements CacheSize
         {
             return new AccordSafeCommandsForKey(node);
         }
+
+        @Override
+        public Comparator<RoutingKey> keyComparator()
+        {
+            return RoutingKey::compareAsRoutingKey;
+        }
     }
 
     public static class CommandAdapter implements Adapter<TxnId, Command, 
AccordSafeCommand>
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index d47f0ed0c3..5c1df8f136 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -55,6 +55,7 @@ import accord.local.RedundantBefore;
 import accord.local.SafeCommandStore;
 import accord.local.cfk.CommandsForKey;
 import accord.primitives.PartialTxn;
+import accord.primitives.Range;
 import accord.primitives.Ranges;
 import accord.primitives.RoutableKey;
 import accord.primitives.Route;
@@ -490,12 +491,16 @@ public class AccordCommandStore extends CommandStore
             Ready ready = new Ready();
             try (ExclusiveCaches caches = lockCaches())
             {
-                for (AccordCacheEntry<RoutingKey, CommandsForKey> e : 
caches.commandsForKeys())
+                for (Range range : ranges)
                 {
-                    if (ranges.contains(e.key()) && e.isModified())
+                    for (RoutingKey k : 
caches.commandsForKeys().keysBetween(range.start(), range.startInclusive(), 
range.end(), range.endInclusive()))
                     {
-                        ready.increment();
-                        caches.global().saveWhenReadyExclusive(e, ready);
+                        AccordCacheEntry<RoutingKey, CommandsForKey> e = 
caches.commandsForKeys().getUnsafe(k);
+                        if (e.isModified())
+                        {
+                            ready.increment();
+                            caches.global().saveWhenReadyExclusive(e, ready);
+                        }
                     }
                 }
             }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index bcf58650a9..fa26ea24b6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -38,13 +38,16 @@ import javax.annotation.concurrent.GuardedBy;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
+import accord.local.Catchup;
 import accord.topology.ActiveEpochs;
 import accord.topology.EpochReady;
 import accord.primitives.Txn;
+import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
 import org.apache.cassandra.metrics.AccordReplicaMetrics;
 import org.apache.cassandra.metrics.AccordSystemMetrics;
 import org.apache.cassandra.service.accord.api.AccordViolationHandler;
-import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
 import org.apache.cassandra.utils.concurrent.ImmediateFuture;
 import org.slf4j.Logger;
@@ -142,6 +145,7 @@ import static 
org.apache.cassandra.config.DatabaseDescriptor.getAccordShardDurab
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getAccordShardDurabilityMaxSplits;
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getAccordShardDurabilityTargetSplits;
 import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
+import static org.apache.cassandra.db.SystemKeyspace.BootstrapState.COMPLETED;
 import static org.apache.cassandra.journal.Params.ReplayMode.RESET;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordReadBookkeeping;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordWriteBookkeeping;
@@ -288,30 +292,7 @@ public class AccordService implements IAccordService, 
Shutdownable
 
         AccordService as = new 
AccordService(AccordTopology.tcmIdToAccord(tcmId));
         unsafeInstance = as;
-        as.node.unsafeSetReplaying(true);
-        try
-        {
-            as.startup();
-            replayJournal(as);
-        }
-        finally
-        {
-            as.node.unsafeSetReplaying(false);
-        }
-
-        as.finishInitialization();
-
-        as.fastPathCoordinator.start();
-
-        
ClusterMetadataService.instance().log().addListener(as.fastPathCoordinator);
-        
as.node.durability().shards().reconfigure(Ints.checkedCast(getAccordShardDurabilityTargetSplits()),
-                                                  
Ints.checkedCast(getAccordShardDurabilityMaxSplits()),
-                                                  
Ints.checkedCast(getAccordShardDurabilityCycle(SECONDS)), SECONDS);
-        
as.node.durability().global().setGlobalCycleTime(Ints.checkedCast(getAccordGlobalDurabilityCycle(SECONDS)),
 SECONDS);
-        as.state = State.STARTED;
-        // Only enable durability scheduling _after_ we have fully replayed 
journal
-        as.node.durability().start();
-
+        as.startup();
         instance = as;
         
         AccordReplicaMetrics.touch();
@@ -326,7 +307,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     public static boolean replayJournal(AccordService as)
     {
         logger.info("Starting journal replay.");
-        long before = Clock.Global.nanoTime();
+        long start = nanoTime();
         if (as.journalConfiguration().replayMode() == RESET)
             AccordKeyspace.truncateCommandsForKey();
 
@@ -337,8 +318,8 @@ public class AccordService implements IAccordService, 
Shutdownable
         as.journal.unsafeSetStarted();
         as.node.commandStores().forAllUnsafe(cs -> 
cs.unsafeProgressLog().start());
 
-        long after = Clock.Global.nanoTime();
-        logger.info("Finished journal replay. {}ms elapsed", 
NANOSECONDS.toMillis(after - before));
+        long end = nanoTime();
+        logger.info("Finished journal replay. {}s elapsed", 
String.format("%.2f", NANOSECONDS.toMillis(end - start)/1000.0));
         return true;
     }
 
@@ -414,29 +395,117 @@ public class AccordService implements IAccordService, 
Shutdownable
     {
         if (state != State.INIT)
             return;
-        journal.start(node);
-        node.load();
 
-        ClusterMetadata metadata = ClusterMetadata.current();
-        endpointMapper.updateMapping(metadata);
+        node.unsafeSetReplaying(true);
+        try
+        {
+            journal.start(node);
+            node.load();
+
+            ClusterMetadata metadata = ClusterMetadata.current();
+            endpointMapper.updateMapping(metadata);
+
+            List<TopologyUpdate> images = journal.replayTopologies();
+            if (!images.isEmpty())
+            {
+                // Initialise command stores using latest topology from the 
log;
+                // if there are no local command stores, don't report any 
topologies and simply fetch the latest known in the cluster
+                // this avoids a registered (not joined) node learning of 
topologies, then later restarting with some intervening
+                // epochs having been garbage collected by the other nodes in 
the cluster
+                TopologyUpdate last = images.get(images.size() - 1);
+                if (!last.commandStores.isEmpty())
+                {
+                    node.commandStores().initializeTopologyUnsafe(last);
+
+                    // Replay local epochs
+                    for (TopologyUpdate image : images)
+                        node.topology().reportTopology(image.global);
+                }
+            }
+            replayJournal(this);
+        }
+        finally
+        {
+            node.unsafeSetReplaying(false);
+        }
 
-        List<TopologyUpdate> images = journal.replayTopologies();
-        if (!images.isEmpty())
+        finishInitialization();
+        catchup();
+
+        fastPathCoordinator.start();
+        
ClusterMetadataService.instance().log().addListener(fastPathCoordinator);
+
+        
node.durability().shards().reconfigure(Ints.checkedCast(getAccordShardDurabilityTargetSplits()),
+                                               
Ints.checkedCast(getAccordShardDurabilityMaxSplits()),
+                                               
Ints.checkedCast(getAccordShardDurabilityCycle(SECONDS)), SECONDS);
+        
node.durability().global().setGlobalCycleTime(Ints.checkedCast(getAccordGlobalDurabilityCycle(SECONDS)),
 SECONDS);
+        // Only enable durability scheduling _after_ we have fully replayed 
journal
+        node.durability().start();
+        state = State.STARTED;
+    }
+
+    void catchup()
+    {
+        AccordSpec spec = DatabaseDescriptor.getAccord();
+        if (!spec.catchup_on_start)
         {
-            // Initialise command stores using latest topology from the log;
-            // if there are no local command stores, don't report any 
topologies and simply fetch the latest known in the cluster
-            // this avoids a registered (not joined) node learning of 
topologies, then later restarting with some intervening
-            // epochs having been garbage collected by the other nodes in the 
cluster
-            TopologyUpdate last = images.get(images.size() - 1);
-            if (!last.commandStores.isEmpty())
+            logger.info("Not catching up with peers");
+            return;
+        }
+
+        BootstrapState bootstrapState = SystemKeyspace.getBootstrapState();
+        if (bootstrapState == COMPLETED)
+        {
+            long maxLatencyNanos = 
spec.catchup_on_start_fail_latency.toNanoseconds();
+            int attempts = 1;
+            while (true)
             {
-                node.commandStores().initializeTopologyUnsafe(last);
+                logger.info("Catching up with quorum...");
+                long start = nanoTime();
+                long failAt = start + maxLatencyNanos;
+                Future<Void> f = toFuture(Catchup.catchup(node));
+                if (!f.awaitUntilThrowUncheckedOnInterrupt(failAt))
+                {
+                    if (spec.catchup_on_start_exit_on_failure)
+                    {
+                        logger.error("Catch up exceeded maximum latency of 
{}ns; shutting down", maxLatencyNanos);
+                        throw new RuntimeException("Could not catch up with 
peers");
+                    }
+                    logger.error("Catch up exceeded maximum latency of {}ns; 
starting up", maxLatencyNanos);
+                    break;
+                }
+
+                Throwable failed = f.cause();
+                if (failed != null)
+                    throw new RuntimeException("Could not catch up with 
peers", failed);
+
+                long end = nanoTime();
+                double seconds = NANOSECONDS.toMillis(end - start)/1000.0;
+                logger.info("Finished catching up with all quorums. {}s 
elapsed.", String.format("%.2f", seconds));
+
+                // TODO (expected): make configurable
+                if (seconds <= 
spec.catchup_on_start_success_latency.toSeconds())
+                    break;
 
-                // Replay local epochs
-                for (TopologyUpdate image : images)
-                    node.topology().reportTopology(image.global);
+                if (++attempts > spec.catchup_on_start_max_attempts)
+                {
+                    if (spec.catchup_on_start_exit_on_failure)
+                    {
+                        logger.error("Catch up was slow, aborting after {} 
attempts and shutting down", attempts);
+                        throw new RuntimeException("Could not catch up with 
peers");
+                    }
+
+                    logger.info("Catch up was slow; continuing to startup 
after {} attempts.", attempts - 1);
+                    break;
+                }
+
+                logger.info("Catch up was slow, so we may behind again; 
retrying");
             }
         }
+        else
+        {
+            logger.info("Not catching up with quorum, as bootstrap state is 
{}", bootstrapState);
+        }
     }
 
     /**
@@ -444,8 +513,7 @@ public class AccordService implements IAccordService, 
Shutdownable
      *  the latest epoch known to the node prior to restart. After that, we 
replay journal itself, and only after
      *  that we finish initializaiton and replay the rest of epochs.
      */
-    @VisibleForTesting
-    public void finishInitialization()
+    void finishInitialization()
     {
         endpointMapper.updateMapping(ClusterMetadata.current());
         TopologyManager topology = node.topology();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java 
b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index 55d5e8b1aa..d2b87d25b1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -1001,9 +1001,9 @@ public abstract class AccordTask<R> extends 
SubmittableTask implements Function<
 
         void startInternal(Caches caches)
         {
-            for (RoutingKey key : caches.commandsForKeys().keySet())
+            for (Range range : ranges)
             {
-                if (ranges.contains(key))
+                for (RoutingKey key : 
caches.commandsForKeys().keysBetween(range.start(), range.startInclusive(), 
range.end(), range.endInclusive()))
                     intersectingKeys.add((TokenKey) key);
             }
             caches.commandsForKeys().register(keyWatcher);
diff --git a/src/java/org/apache/cassandra/service/accord/OrderedKeys.java 
b/src/java/org/apache/cassandra/service/accord/OrderedKeys.java
new file mode 100644
index 0000000000..a8f7acf6f4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/OrderedKeys.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.accord;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.cassandra.utils.BulkIterator;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+/**
+ * Maintains an ordered set of keys backed by a BTree, buffering additions and 
removals for batch updates.
+ */
+public class OrderedKeys<K> implements Iterable<K>
+{
+    private final Comparator<K> comparator;
+
+    private Object[] map;
+    private final Object[] buffer = new Object[32];
+    private int bufferRemovals;  // Bit field: 1 = removal, 0 = addition
+    private int bufferCount;
+
+    public OrderedKeys(Comparator<K> comparator)
+    {
+        this.comparator = comparator;
+        this.map = BTree.empty();
+    }
+
+    public OrderedKeys(Comparator<K> comparator, Collection<K> init)
+    {
+        this.comparator = comparator;
+        BTree.Builder<K> builder = BTree.builder(comparator);
+        for (K k : init)
+            builder.add(k);
+        this.map = builder.build();
+    }
+
+    public void add(K key, int removalBit)
+    {
+        if (bufferCount == buffer.length)
+            flush();
+        bufferRemovals |= removalBit << bufferCount;
+        buffer[bufferCount++] = key;
+    }
+
+    public void add(K key)
+    {
+        add(key, 0);
+    }
+
+    public void remove(K key)
+    {
+        add(key, 1);
+    }
+
+    /**
+     * Flushes all buffered additions and removals to the underlying BTree.
+     */
+    public void flush()
+    {
+        if (bufferCount == 0)
+            return;
+
+        // Partition the buffer: additions first, then removals
+        int additionEnd = 0, removalStart = bufferCount;
+        while (additionEnd != removalStart)
+        {
+            if (0 == (bufferRemovals & (1 << additionEnd))) ++additionEnd;
+            else if (0 != (bufferRemovals & (1 << (removalStart - 1)))) 
--removalStart;
+            else
+            {
+                Object tmp = buffer[additionEnd];
+                buffer[additionEnd++] = buffer[--removalStart];
+                buffer[removalStart] = tmp;
+            }
+        }
+
+        // Sort both partitions
+        Arrays.sort(buffer, 0, removalStart, (Comparator) comparator);
+        Arrays.sort(buffer, removalStart, bufferCount, (Comparator) 
comparator);
+
+        int i = 0, j = removalStart;
+
+        // Merge additions and removals
+        additionEnd = 0;
+        int removalEnd = removalStart;
+        while (i < removalStart && j < bufferCount)
+        {
+            int c = comparator.compare((K) buffer[i], (K) buffer[j]);
+            if (c == 0)
+            {
+                // matched pairs can be ignored
+                ++i; ++j;
+            }
+            else if (c < 0)
+            {
+                // Addition is smaller
+                if (additionEnd != i)
+                    buffer[additionEnd] = buffer[i];
+                ++additionEnd;
+                ++i;
+            }
+            else
+            {
+                // Removal is smaller
+                if (removalEnd != j)
+                    buffer[removalEnd] = buffer[j];
+                ++removalEnd;
+                ++j;
+            }
+        }
+
+        // Copy remaining additions
+        if (i != removalStart)
+        {
+            if (additionEnd == i) additionEnd = removalStart;
+            else
+            {
+                int count = removalStart - i;
+                System.arraycopy(buffer, i, buffer, additionEnd, count);
+                additionEnd += count;
+            }
+        }
+
+        // Copy remaining removals
+        if (j != bufferCount)
+        {
+            if (removalEnd == j) removalEnd = bufferCount;
+            else
+            {
+                int count = bufferCount - j;
+                System.arraycopy(buffer, j, buffer, removalEnd, count);
+                removalEnd += count;
+            }
+        }
+
+        // Apply additions to BTree
+        if (additionEnd != 0)
+            map = BTree.update(map, BTree.build(BulkIterator.of(buffer, 0), 
additionEnd, UpdateFunction.noOp()), comparator);
+
+        // Apply removals from BTree
+        if (removalEnd != removalStart)
+            map = BTree.subtract(map, BTree.build(BulkIterator.of(buffer, 
removalStart), removalEnd - removalStart, UpdateFunction.noOp()), comparator);
+
+        // Clear buffer
+        Arrays.fill(buffer, 0, bufferCount, null);
+        bufferRemovals = 0;
+        bufferCount = 0;
+    }
+
+    public Iterable<K> between(K start, boolean startInclusive, K end, boolean 
endInclusive)
+    {
+        return () -> {
+            flush();
+            return BTree.slice(map, comparator, start, startInclusive, end, 
endInclusive, BTree.Dir.ASC);
+        };
+    }
+
+    public Iterator<K> iterator()
+    {
+        flush();
+        return BTree.iterator(map);
+    }
+
+    public int size()
+    {
+        flush();
+        return BTree.size(map);
+    }
+
+    public int bufferSize()
+    {
+        return bufferCount;
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/GetDurableBeforeSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/GetDurableBeforeSerializers.java
index c314e51a79..4ba6d7366f 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/GetDurableBeforeSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/GetDurableBeforeSerializers.java
@@ -52,7 +52,7 @@ public class GetDurableBeforeSerializers
         @Override
         public void serialize(DurableBeforeReply msg, DataOutputPlus out) 
throws IOException
         {
-            
CommandStoreSerializers.durableBefore.serialize(msg.durableBeforeMap, out);
+            CommandStoreSerializers.durableBefore.serialize(msg.durableBefore, 
out);
         }
 
         @Override
@@ -64,7 +64,7 @@ public class GetDurableBeforeSerializers
         @Override
         public long serializedSize(DurableBeforeReply msg)
         {
-            return 
CommandStoreSerializers.durableBefore.serializedSize(msg.durableBeforeMap);
+            return 
CommandStoreSerializers.durableBefore.serializedSize(msg.durableBefore);
         }
     };
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
index c855103b42..a5e7ef92dc 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
@@ -22,8 +22,9 @@ import java.io.IOException;
 import accord.messages.SetGloballyDurable;
 import accord.messages.SetShardDurable;
 import accord.primitives.Deps;
-import accord.primitives.FullRoute;
-import accord.primitives.SyncPoint;
+import accord.primitives.MinimalSyncPoint;
+import accord.primitives.RangeRoute;
+import accord.primitives.Route;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import org.apache.cassandra.io.UnversionedSerializer;
@@ -38,7 +39,7 @@ public class SetDurableSerializers
         @Override
         public void serialize(SetShardDurable msg, DataOutputPlus out) throws 
IOException
         {
-            syncPoint.serialize(msg.exclusiveSyncPoint, out);
+            syncPoint.serialize(msg.syncPoint, out);
             CommandSerializers.outcomeDurability.serialize(msg.durability, 
out);
         }
 
@@ -52,7 +53,7 @@ public class SetDurableSerializers
         @Override
         public long serializedSize(SetShardDurable msg)
         {
-            return syncPoint.serializedSize(msg.exclusiveSyncPoint)
+            return syncPoint.serializedSize(msg.syncPoint)
                 + 
CommandSerializers.outcomeDurability.serializedSize(msg.durability);
         }
     };
@@ -78,34 +79,34 @@ public class SetDurableSerializers
         }
     };
 
-    public static final UnversionedSerializer<SyncPoint> syncPoint = new 
UnversionedSerializer<>()
+    public static final UnversionedSerializer<MinimalSyncPoint> syncPoint = 
new UnversionedSerializer<>()
     {
         @Override
-        public void serialize(SyncPoint sp, DataOutputPlus out) throws 
IOException
+        public void serialize(MinimalSyncPoint sp, DataOutputPlus out) throws 
IOException
         {
             CommandSerializers.txnId.serialize(sp.syncId, out);
             ExecuteAtSerializer.serialize(sp.syncId, sp.executeAt, out);
-            DepsSerializers.deps.serialize(sp.waitFor, out);
-            KeySerializers.fullRoute.serialize(sp.route, out);
+            DepsSerializers.deps.serialize(Deps.NONE, out);
+            KeySerializers.route.serialize(sp.route, out);
         }
 
         @Override
-        public SyncPoint deserialize(DataInputPlus in) throws IOException
+        public MinimalSyncPoint deserialize(DataInputPlus in) throws 
IOException
         {
             TxnId syncId = CommandSerializers.txnId.deserialize(in);
             Timestamp executeAt = ExecuteAtSerializer.deserialize(syncId, in);
-            Deps waitFor = DepsSerializers.deps.deserialize(in);
-            FullRoute<?> route = KeySerializers.fullRoute.deserialize(in);
-            return SyncPoint.SerializationSupport.construct(syncId, executeAt, 
waitFor, route);
+            DepsSerializers.deps.deserialize(in);
+            Route<?> route = KeySerializers.route.deserialize(in);
+            return MinimalSyncPoint.SerializationSupport.construct(syncId, 
executeAt, (RangeRoute) route);
         }
 
         @Override
-        public long serializedSize(SyncPoint sp)
+        public long serializedSize(MinimalSyncPoint sp)
         {
             return   CommandSerializers.txnId.serializedSize(sp.syncId)
                    + ExecuteAtSerializer.serializedSize(sp.syncId, 
sp.executeAt)
-                   + DepsSerializers.deps.serializedSize(sp.waitFor)
-                   + KeySerializers.fullRoute.serializedSize(sp.route);
+                   + DepsSerializers.deps.serializedSize(Deps.NONE)
+                   + KeySerializers.route.serializedSize(sp.route);
         }
     };
 }
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java 
b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 0ef17c1696..0f84b6a5a9 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -75,7 +75,7 @@ public class BTree
      * _DO NOT_ attempt to modify this field directly. Instead, use 
BRANCH_SHIFT above instead, as branch factor
      * should _always_ be a power of 2.
      */
-    static final int BRANCH_FACTOR = 1 << BRANCH_SHIFT;
+    public static final int BRANCH_FACTOR = 1 << BRANCH_SHIFT;
     public static final int MIN_KEYS = BRANCH_FACTOR / 2 - 1;
     public static final int MAX_KEYS = BRANCH_FACTOR - 1;
     public static final long STOP_SENTINEL_VALUE = Long.MAX_VALUE;
@@ -538,6 +538,17 @@ public class BTree
         }
     }
 
+    /**
+     * Subtracts {@code subtract} from {@code toUpdate}.
+     */
+    public static <Compare> Object[] subtract(Object[] toUpdate, Object[] 
subtract, Comparator<Compare> comparator)
+    {
+        try (Subtraction subtraction = Subtraction.get(comparator))
+        {
+            return subtraction.subtract(toUpdate, subtract);
+        }
+    }
+
     public static void reverseInSitu(Object[] tree)
     {
         reverseInSitu(tree, height(tree), true);
@@ -4240,6 +4251,20 @@ public class BTree
         }
     }
 
+    static class Subtraction<K, T extends K> extends AbstractSubtraction<K, T>
+    {
+        static final ThreadLocal<Subtraction> SHARED = new ThreadLocal<>();
+
+        static <K, T extends K> Subtraction<K, T> get(Comparator<K> comparator)
+        {
+            Subtraction subtraction = SHARED.get();
+            if (subtraction == null)
+                SHARED.set(subtraction = new Subtraction());
+            subtraction.comparator = comparator;
+            return subtraction;
+        }
+    }
+
     private static abstract class AbstractTransformer<I, O> extends 
AbstractSeekingTransformer<I, O>
     {
         @Override
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
index f77bfa2016..d0d3db5e01 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
@@ -47,6 +47,7 @@ public class AccordJournalIntegrationTest extends TestBaseImpl
     {
         try (WithProperties wp = new 
WithProperties().set(CassandraRelevantProperties.DTEST_ACCORD_JOURNAL_SANITY_CHECK_ENABLED,
 "true");
              Cluster cluster = init(Cluster.build(1)
+                                           .withConfig(config -> 
config.set("accord.catchup_on_start", "false"))
                                            .withoutVNodes()
                                            .start()))
         {
@@ -95,6 +96,7 @@ public class AccordJournalIntegrationTest extends TestBaseImpl
     {
         try (Cluster cluster = Cluster.build(1)
                                       .withoutVNodes()
+                                      .withConfig(config -> 
config.set("accord.catchup_on_start", "false"))
                                       .start())
         {
             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
@@ -122,7 +124,13 @@ public class AccordJournalIntegrationTest extends 
TestBaseImpl
     @Test
     public void restartWithEpochChanges() throws IOException
     {
-        try (Cluster cluster = Cluster.build(3).withoutVNodes().withConfig(c 
-> c.with(GOSSIP).with(NETWORK)).start())
+        try (Cluster cluster = Cluster.build(3)
+                                      .withoutVNodes()
+                                      .withConfig(c -> {
+                                          c.with(GOSSIP).with(NETWORK);
+                                          c.set("accord.catchup_on_start", 
"false");
+                                      })
+                                      .start())
         {
             init(cluster);
             final String TABLE = createTable(cluster);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalReplayTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalReplayTest.java
index 0e8a01c228..efabf33022 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalReplayTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalReplayTest.java
@@ -95,6 +95,7 @@ public class AccordJournalReplayTest extends TestBaseImpl
                                                                   
.set("accord.shard_durability_target_splits", "1")
                                                                   
.set("accord.retry_syncpoint", "1s*attempts")
                                                                   
.set("accord.retry_durability", "1s*attempts")
+                                                                  
.set("accord.catchup_on_start", "false")
                                                                   
.with(NETWORK, GOSSIP))
                                       .start())
         {
@@ -125,8 +126,8 @@ public class AccordJournalReplayTest extends TestBaseImpl
                 Command command = Command.Executed.executed(txnId, 
SaveStatus.PreApplied, Status.Durability.NotDurable, 
StoreParticipants.execute(commandStore.unsafeGetRangesForEpoch(), route, txnId, 
txnId.epoch()), Ballot.ZERO, txnId, txn.intersecting(route, true), 
deps.intersecting(route), Ballot.ZERO, waitingOn, writes, 
ResultSerializers.APPLIED);
                 commandStore.journal.saveCommand(commandStore.id(), new 
Journal.CommandUpdate(null, command), () -> {});
 
-                SyncPoint<accord.primitives.Range> syncPoint = 
AccordService.getBlocking(CoordinateSyncPoint.exclusive(node, syncPointId, 
Ranges.of(key.asRange())));
-                AccordService.getBlocking(ExecuteSyncPoint.coordinate(node, 
syncPoint, 1).onQuorum());
+                SyncPoint syncPoint = 
AccordService.getBlocking(CoordinateSyncPoint.exclusive(node, syncPointId, 
Ranges.of(key.asRange())));
+                AccordService.getBlocking(ExecuteSyncPoint.coordinate(node, 
syncPoint, 1).onQuorumOrDone());
                 
Keyspace.open("ks").getColumnFamilyStore("tbl").forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
                 
Keyspace.open("system_accord").getColumnFamilyStore("commands_for_key").forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
                 spinUntilTrue(() -> {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
index c9ec047071..039bfde51c 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
@@ -64,7 +64,9 @@ public class JournalAccessRouteIndexOnStartupRaceTest extends 
TestBaseImpl
     @Test
     public void test() throws IOException
     {
-        try (Cluster cluster = 
Cluster.build(1).withInstanceInitializer(BBHelper::install).start())
+        try (Cluster cluster = Cluster.build(1)
+                                      .withConfig(config -> 
config.set("accord.catchup_on_start", "false"))
+                                      
.withInstanceInitializer(BBHelper::install).start())
         {
             IInvokableInstance node = cluster.get(1);
             node.nodetoolResult("disableautocompaction", ACCORD_KEYSPACE_NAME, 
JOURNAL).asserts().success();
diff --git a/test/unit/org/apache/cassandra/utils/OrderedKeysTest.java 
b/test/unit/org/apache/cassandra/utils/OrderedKeysTest.java
new file mode 100644
index 0000000000..57dac2c393
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/OrderedKeysTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.service.accord.OrderedKeys;
+import org.quicktheories.WithQuickTheories;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+import org.quicktheories.impl.Constraint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class OrderedKeysTest implements WithQuickTheories
+{
+    private Gen<List<Op>> operationsGen()
+    {
+        Gen<Integer> sizeGen = SourceDSL.integers().between(0, 200);
+        Gen<Integer> keyGen = SourceDSL.integers().between(0, 1000);
+        Gen<Boolean> opGen = SourceDSL.booleans().all();
+        return rng -> {
+            Set<Integer> additions = new HashSet<>();
+            int size = sizeGen.generate(rng);
+            List<Op> ops = new ArrayList<>(size);
+            for (int i = 0; i < size; i++)
+            {
+                while (true)
+                {
+                    if (opGen.generate(rng))
+                    {
+                        int key = keyGen.generate(rng);
+                        // Do not add same key twice
+                        if (additions.contains(key))
+                            continue;
+                        ops.add(new AddOp(key));
+                        additions.add(key);
+                    }
+                    else if (!additions.isEmpty())
+                    {
+                        int idx = 0;
+                        if (additions.size() > 1)
+                            idx = (int) rng.next(Constraint.between(0, 
additions.size() - 1));
+
+                        Iterator<Integer> iter = additions.iterator();
+                        Integer key = iter.next();
+                        for (int j = 0; j < idx; j++)
+                            key = iter.next();
+
+                        ops.add(new RemoveOp(key));
+                        additions.remove(key);
+                    }
+                    break;
+                }
+            }
+
+            return ops;
+        };
+    }
+
+    interface Op
+    {
+        void apply(OrderedKeys<Integer> orderedKeys, TreeSet<Integer> 
reference, Map<Integer, Object> cache);
+    }
+
+    static class AddOp implements Op
+    {
+        final Integer key;
+
+        AddOp(Integer key)
+        {
+            this.key = key;
+        }
+
+        @Override
+        public void apply(OrderedKeys<Integer> orderedKeys, TreeSet<Integer> 
model, Map<Integer, Object> cache)
+        {
+            orderedKeys.add(key);
+            cache.put(key, key.toString());
+            model.add(key);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Add(" + key + ")";
+        }
+    }
+
+    static class RemoveOp implements Op
+    {
+        final Integer key;
+
+        RemoveOp(Integer key)
+        {
+            this.key = key;
+        }
+
+        @Override
+        public void apply(OrderedKeys<Integer> orderedKeys, TreeSet<Integer> 
model, Map<Integer, Object> cache)
+        {
+            orderedKeys.remove(key);
+            Object removed = cache.remove(key);
+            Assert.assertEquals(removed, key.toString());
+            Invariants.require(model.remove(key));
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Remove(" + key + ")";
+        }
+    }
+
+    @Test
+    public void simpleTest()
+    {
+        OrderedKeys<Integer> keys = new OrderedKeys<>(Integer::compareTo);
+        keys.add(1);
+        keys.add(5);
+        keys.add(3);
+
+        keys.flush();
+        assertEquals(3, keys.size());
+
+        Iterator<Integer> iter = keys.iterator();
+        assertEquals(Integer.valueOf(1), iter.next());
+        assertEquals(Integer.valueOf(3), iter.next());
+        assertEquals(Integer.valueOf(5), iter.next());
+        assertFalse(iter.hasNext());
+
+        // Remove a key
+        keys.remove(3);
+        keys.flush();
+        assertEquals(2, keys.size());
+
+        iter = keys.iterator();
+        assertEquals(Integer.valueOf(1), iter.next());
+        assertEquals(Integer.valueOf(5), iter.next());
+        assertFalse(iter.hasNext());
+    }
+
+    @Test
+    public void testDuplicates()
+    {
+        OrderedKeys<Integer> keys = new OrderedKeys<>(Integer::compareTo);
+        keys.add(1);
+        keys.remove(1);
+        keys.add(1);
+
+        keys.flush();
+
+        Iterator<Integer> iter = keys.iterator();
+        assertEquals((Integer) 1, iter.next());
+        assertFalse(iter.hasNext());
+    }
+
+    @Test
+    public void testBetween()
+    {
+        OrderedKeys<Integer> keys = new OrderedKeys<>(Integer::compareTo);
+
+        for (int i = 0; i < 10; i++)
+            keys.add(i);
+        keys.flush();
+
+        // both bounds inclusive
+        List<Integer> result = new ArrayList<>();
+        keys.between(3, true, 7, true).forEach(result::add);
+        assertEquals(List.of(3, 4, 5, 6, 7), result);
+
+        // both bounds exclusive
+        result.clear();
+        keys.between(3, false, 7, false).forEach(result::add);
+        assertEquals(List.of(4, 5, 6), result);
+
+        // lower bound inclusive, upper bound exclusive
+        result.clear();
+        keys.between(3, true, 7, false).forEach(result::add);
+        assertEquals(List.of(3, 4, 5, 6), result);
+    }
+
+    @Test
+    public void testBuffering()
+    {
+        OrderedKeys<Integer> keys = new OrderedKeys<>(Integer::compareTo);
+
+        for (int i = 0; i < 10; i++)
+            keys.add(i);
+
+        assertTrue("Buffer should have pending operations", keys.bufferSize() 
== 10);
+
+        // After flush, buffer should be empty
+        keys.flush();
+        assertEquals(0, keys.bufferSize());
+        assertEquals(10, keys.size());
+    }
+
+    @Test
+    public void testFlushOnOverflow()
+    {
+        OrderedKeys<Integer> keys = new OrderedKeys<>(Integer::compareTo);
+
+        // add more than 32 keys (default buffer size)
+        for (int i = 0; i < 50; i++)
+            keys.add(i);
+
+        assertTrue("Buffer should have flushed", keys.bufferSize() == (50 - 
32));
+    }
+
+    @Test
+    public void randomOperationsMatchReference()
+    {
+        qt().withExamples(1000)
+            .withShrinkCycles(0)
+            .withFixedSeed(1)
+            .forAll(operationsGen())
+            .checkAssert((operations) -> {
+                Map<Integer, Object> cache = new HashMap<>();
+                OrderedKeys<Integer> orderedKeys = new 
OrderedKeys<>(Integer::compareTo);
+                TreeSet<Integer> model = new TreeSet<>();
+
+                for (int i = 0; i < operations.size(); i++)
+                {
+                    Op op = operations.get(i);
+                    op.apply(orderedKeys, model, cache);
+                    if (Math.random() < 0.1)
+                    {
+                        orderedKeys.flush();
+                        assertMatchesReference(operations.subList(0, i), 
orderedKeys, model, cache, "After: " + op);
+                    }
+                }
+
+                orderedKeys.flush();
+                assertMatchesReference(operations, orderedKeys, model, cache, 
"Final state");
+            });
+    }
+
+    private void assertMatchesReference(List<Op> ops, OrderedKeys<Integer> 
orderedKeys, TreeSet<Integer> model,  Map<Integer, Object> cache, String 
message)
+    {
+        List<Integer> actualList = new ArrayList<>();
+        orderedKeys.iterator().forEachRemaining(actualList::add);
+        List<Integer> expectedList = new ArrayList<>(model);
+
+        if (!expectedList.equals(actualList))
+        {
+            throw new AssertionError(String.format("%s\n" +
+                                                   "Ops:          %s\n" +
+                                                   "Ordered Keys: %s\n" +
+                                                   "Cache:        %s\n" +
+                                                   "Model:        %s", 
message, ops, actualList, cache, model));
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to