This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f6fd49566f Improve DurabilityQueue: - All retries are appended to a
delay queue so overlaps can be pruned - Quorum successes are not retried if
there are superseding sync points covering the ranges - User-initiated
requests are not purged unless the request has timed out or otherwise completed
- Overlapping requests are queued up against the next to run Also (C*): -
Catch-up with quorums on restart - Manage an ordered set of keys in cache for
faster range searches Also (Acc [...]
f6fd49566f is described below
commit f6fd49566ff1dbcdc921a1b5876c53713e4c12d5
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Nov 7 11:26:50 2025 +0000
Improve DurabilityQueue:
- All retries are appended to a delay queue so overlaps can be pruned
- Quorum successes are not retried if there are superseding sync points
covering the ranges
- User-initiated requests are not purged unless the request has timed out
or otherwise completed
- Overlapping requests are queued up against the next to run
Also (C*):
- Catch-up with quorums on restart
- Manage an ordered set of keys in cache for faster range searches
Also (Accord):
- Update copy of BTree and import IntervalBTree
- Fix RedundantStatus WAS_OWNED_OVERRIDE_MASK
- Add Catchup mechanism to reach parity with a quorum on restart
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21013
---
modules/accord | 2 +-
.../org/apache/cassandra/config/AccordSpec.java | 7 +
.../cassandra/service/accord/AccordCache.java | 49 +++-
.../service/accord/AccordCommandStore.java | 13 +-
.../cassandra/service/accord/AccordService.java | 160 ++++++++----
.../cassandra/service/accord/AccordTask.java | 4 +-
.../cassandra/service/accord/OrderedKeys.java | 193 ++++++++++++++
.../serializers/GetDurableBeforeSerializers.java | 4 +-
.../accord/serializers/SetDurableSerializers.java | 31 +--
.../org/apache/cassandra/utils/btree/BTree.java | 27 +-
.../test/accord/AccordJournalIntegrationTest.java | 10 +-
.../accord/journal/AccordJournalReplayTest.java | 5 +-
.../JournalAccessRouteIndexOnStartupRaceTest.java | 4 +-
.../apache/cassandra/utils/OrderedKeysTest.java | 282 +++++++++++++++++++++
14 files changed, 702 insertions(+), 89 deletions(-)
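
[Editor's note: a rough, hypothetical sketch of the delay-queue pruning idea
described in the commit message above. This is NOT the accord DurabilityQueue
API; every class, field, and method name below is invented for illustration.]

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    class RetryQueueSketch
    {
        static class Retry implements Delayed
        {
            final long startToken, endToken; // range covered by this sync point
            final long runAtNanos;
            volatile boolean superseded;

            Retry(long startToken, long endToken, long runAtNanos)
            {
                this.startToken = startToken; this.endToken = endToken; this.runAtNanos = runAtNanos;
            }

            public long getDelay(TimeUnit unit) { return unit.convert(runAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS); }
            public int compareTo(Delayed that) { return Long.compare(getDelay(TimeUnit.NANOSECONDS), that.getDelay(TimeUnit.NANOSECONDS)); }
        }

        final DelayQueue<Retry> retries = new DelayQueue<>();

        // All retries land in one delay queue; when a superseding sync point
        // covers a range, any pending retry inside it is marked superseded and
        // pruned instead of being re-run.
        void onRangeDurable(long startToken, long endToken)
        {
            for (Retry r : retries)
                if (r.startToken >= startToken && r.endToken <= endToken)
                    r.superseded = true;
        }

        void runLoop() throws InterruptedException
        {
            while (true)
            {
                Retry next = retries.take();
                if (!next.superseded)
                    execute(next);
            }
        }

        void execute(Retry retry) { /* re-coordinate the sync point */ }
    }
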
diff --git a/modules/accord b/modules/accord
index a681b1bf7d..de03d14e48 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit a681b1bf7d9e8d1d797af2fd54bdd180a4863406
+Subproject commit de03d14e4821b669fecad5dcdb2de2dba3a9d926
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java
b/src/java/org/apache/cassandra/config/AccordSpec.java
index 8a5801e41f..1e2379a564 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -178,6 +178,13 @@ public class AccordSpec
public TransactionalMode default_transactional_mode =
TransactionalMode.off;
public boolean ephemeralReadEnabled = true;
public boolean state_cache_listener_jfr_enabled = true;
+
+ public DurationSpec.IntSecondsBound catchup_on_start_success_latency = new
DurationSpec.IntSecondsBound(60);
+ public DurationSpec.IntSecondsBound catchup_on_start_fail_latency = new
DurationSpec.IntSecondsBound(900);
+ public int catchup_on_start_max_attempts = 5;
+ public boolean catchup_on_start_exit_on_failure = true;
+ public boolean catchup_on_start = true;
+
public final JournalSpec journal = new JournalSpec();
public enum MixedTimeSourceHandling
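
[Editor's note: the new AccordSpec fields above can be set from the in-jvm
dtest framework, mirroring the config.set(...) calls added to the tests later
in this patch. The fragment below is a sketch, not taken from the patch; it
assumes the "accord." + field-name key convention those tests use and that
DurationSpec values accept strings like "60s". Values shown are the patch
defaults.]

    Cluster cluster = Cluster.build(1)
                             .withConfig(config -> config.set("accord.catchup_on_start", "true")
                                                         .set("accord.catchup_on_start_success_latency", "60s")
                                                         .set("accord.catchup_on_start_fail_latency", "900s")
                                                         .set("accord.catchup_on_start_max_attempts", "5")
                                                         .set("accord.catchup_on_start_exit_on_failure", "true"))
                             .start();
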
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java
b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index 20ce3d3b1f..0a1dd623f0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -21,19 +21,17 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Comparator;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.ToLongFunction;
-import java.util.stream.Stream;
-
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
@@ -68,6 +66,7 @@ import
org.apache.cassandra.service.accord.serializers.Version;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.NoSpamLogger.NoSpamLogStatement;
import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.btree.BTree;
import static accord.utils.Invariants.illegalState;
import static accord.utils.Invariants.require;
@@ -112,6 +111,7 @@ public class AccordCache implements CacheSize
long estimateShrunkHeapSize(Object shrunk);
boolean validate(AccordCommandStore commandStore, K key, V value);
S safeRef(AccordCacheEntry<K, V> node);
+ default Comparator<K> keyComparator() { return null; }
default AccordCacheEntry<K, V> newEntry(K key, AccordCache.Type<K, V,
?>.Instance owner)
{
@@ -274,7 +274,7 @@ public class AccordCache implements CacheSize
entry.savingOrWaitingToSave().identity.onSuccess(onSuccess);
}
- private void evict(AccordCacheEntry<?, ?> node, boolean updateUnreferenced)
+ private <K> void evict(AccordCacheEntry<K, ?> node, boolean
updateUnreferenced)
{
if (logger.isTraceEnabled())
logger.trace("Evicting {}", node);
@@ -297,7 +297,7 @@ public class AccordCache implements CacheSize
if (node.status() == LOADED && VALIDATE_LOAD_ON_EVICT)
owner.validateLoadEvicted(node);
- AccordCacheEntry<?, ?> self = node.owner.cache.remove(node.key());
+ AccordCacheEntry<K, ?> self = node.owner.remove(node.key());
Invariants.require(self.references() == 0);
require(self == node, "Leaked node detected; was attempting to remove
%s but cache had %s", node, self);
node.notifyListeners(Listener::onEvict);
@@ -400,6 +400,8 @@ public class AccordCache implements CacheSize
// TODO (desired): don't need to store key separately as stored in
node; ideally use a hash set that allows us to get the current entry
private final Map<K, AccordCacheEntry<K, V>> cache = new
Object2ObjectHashMap<>();
private List<Listener<K, V>> listeners = null;
+ // TODO (expected): update this after releasing the lock
+ private OrderedKeys<K> orderedKeys;
public Instance(AccordCommandStore commandStore)
{
@@ -435,7 +437,6 @@ public class AccordCache implements CacheSize
private AccordCacheEntry<K, V> acquire(K key, boolean onlyIfLoaded)
{
- @SuppressWarnings("unchecked")
AccordCacheEntry<K, V> node = cache.get(key);
return node == null
? acquireAbsent(key, onlyIfLoaded)
@@ -454,8 +455,10 @@ public class AccordCache implements CacheSize
node.increment();
Object prev = cache.put(key, node);
- node.initSize(parent());
Invariants.require(prev == null, "%s not absent from cache: %s
already present", key, node);
+ if (orderedKeys != null)
+ orderedKeys.add(key);
+ node.initSize(parent());
++size;
node.notifyListeners(Listener::onAdd);
maybeShrinkOrEvictSomeNodes();
@@ -555,9 +558,12 @@ public class AccordCache implements CacheSize
maybeShrinkOrEvictSomeNodes();
}
- public Stream<AccordCacheEntry<K, V>> stream()
+ AccordCacheEntry<K, ?> remove(K key)
{
- return cache.values().stream();
+ AccordCacheEntry<K, ?> result = cache.remove(key);
+ if (orderedKeys != null && result != null)
+ orderedKeys.remove(key);
+ return result;
}
final Type<K, V, S> parent()
@@ -565,6 +571,14 @@ public class AccordCache implements CacheSize
return Type.this;
}
+ public Iterable<K> keysBetween(K start, boolean startInclusive, K
end, boolean endInclusive)
+ {
+ if (orderedKeys == null)
+ orderedKeys = new OrderedKeys<>(adapter.keyComparator(),
cache.keySet());
+
+ return orderedKeys.between(start, startInclusive, end,
endInclusive);
+ }
+
@Override
public Iterator<AccordCacheEntry<K, V>> iterator()
{
@@ -602,11 +616,6 @@ public class AccordCache implements CacheSize
return cache.get(key);
}
- public Set<K> keySet()
- {
- return cache.keySet();
- }
-
@VisibleForTesting
public boolean isReferenced(K key)
{
@@ -1039,6 +1048,12 @@ public class AccordCache implements CacheSize
{
return newNode.apply(key, owner);
}
+
+ @Override
+ public Comparator<K> keyComparator()
+ {
+ return Comparator.comparing(a -> ((Comparable) a));
+ }
}
static class SettableWrapper<K, V, S> extends FunctionalAdapter<K, V, S>
@@ -1148,6 +1163,12 @@ public class AccordCache implements CacheSize
{
return new AccordSafeCommandsForKey(node);
}
+
+ @Override
+ public Comparator<RoutingKey> keyComparator()
+ {
+ return RoutingKey::compareAsRoutingKey;
+ }
}
public static class CommandAdapter implements Adapter<TxnId, Command,
AccordSafeCommand>
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index d47f0ed0c3..5c1df8f136 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -55,6 +55,7 @@ import accord.local.RedundantBefore;
import accord.local.SafeCommandStore;
import accord.local.cfk.CommandsForKey;
import accord.primitives.PartialTxn;
+import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.RoutableKey;
import accord.primitives.Route;
@@ -490,12 +491,16 @@ public class AccordCommandStore extends CommandStore
Ready ready = new Ready();
try (ExclusiveCaches caches = lockCaches())
{
- for (AccordCacheEntry<RoutingKey, CommandsForKey> e :
caches.commandsForKeys())
+ for (Range range : ranges)
{
- if (ranges.contains(e.key()) && e.isModified())
+ for (RoutingKey k :
caches.commandsForKeys().keysBetween(range.start(), range.startInclusive(),
range.end(), range.endInclusive()))
{
- ready.increment();
- caches.global().saveWhenReadyExclusive(e, ready);
+ AccordCacheEntry<RoutingKey, CommandsForKey> e =
caches.commandsForKeys().getUnsafe(k);
+ if (e.isModified())
+ {
+ ready.increment();
+ caches.global().saveWhenReadyExclusive(e, ready);
+ }
}
}
}
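
[Editor's note: the hunk above replaces a full scan of the commandsForKeys
key set (testing ranges.contains(key) per key) with a per-range slice of an
ordered index. A self-contained sketch of the same shape, with Integer tokens
standing in for RoutingKey and int pairs standing in for Range:]

    import org.apache.cassandra.service.accord.OrderedKeys;

    class SliceInsteadOfScan
    {
        public static void main(String[] args)
        {
            OrderedKeys<Integer> cachedKeys = new OrderedKeys<>(Integer::compareTo);
            for (int token : new int[] { 1, 3, 5, 7, 9 })
                cachedKeys.add(token);

            // start/end pairs, both ends inclusive in this sketch
            int[][] ranges = { { 2, 6 }, { 8, 9 } };
            for (int[] range : ranges)
                for (int key : cachedKeys.between(range[0], true, range[1], true))
                    System.out.println(key); // prints 3, 5, 9 -- only keys inside each range
        }
    }
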
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index bcf58650a9..fa26ea24b6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -38,13 +38,16 @@ import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
+import accord.local.Catchup;
import accord.topology.ActiveEpochs;
import accord.topology.EpochReady;
import accord.primitives.Txn;
+import org.apache.cassandra.config.AccordSpec;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
import org.apache.cassandra.metrics.AccordReplicaMetrics;
import org.apache.cassandra.metrics.AccordSystemMetrics;
import org.apache.cassandra.service.accord.api.AccordViolationHandler;
-import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import org.slf4j.Logger;
@@ -142,6 +145,7 @@ import static
org.apache.cassandra.config.DatabaseDescriptor.getAccordShardDurab
import static
org.apache.cassandra.config.DatabaseDescriptor.getAccordShardDurabilityMaxSplits;
import static
org.apache.cassandra.config.DatabaseDescriptor.getAccordShardDurabilityTargetSplits;
import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
+import static org.apache.cassandra.db.SystemKeyspace.BootstrapState.COMPLETED;
import static org.apache.cassandra.journal.Params.ReplayMode.RESET;
import static
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordReadBookkeeping;
import static
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.accordWriteBookkeeping;
@@ -288,30 +292,7 @@ public class AccordService implements IAccordService,
Shutdownable
AccordService as = new
AccordService(AccordTopology.tcmIdToAccord(tcmId));
unsafeInstance = as;
- as.node.unsafeSetReplaying(true);
- try
- {
- as.startup();
- replayJournal(as);
- }
- finally
- {
- as.node.unsafeSetReplaying(false);
- }
-
- as.finishInitialization();
-
- as.fastPathCoordinator.start();
-
-
ClusterMetadataService.instance().log().addListener(as.fastPathCoordinator);
-
as.node.durability().shards().reconfigure(Ints.checkedCast(getAccordShardDurabilityTargetSplits()),
-
Ints.checkedCast(getAccordShardDurabilityMaxSplits()),
-
Ints.checkedCast(getAccordShardDurabilityCycle(SECONDS)), SECONDS);
-
as.node.durability().global().setGlobalCycleTime(Ints.checkedCast(getAccordGlobalDurabilityCycle(SECONDS)),
SECONDS);
- as.state = State.STARTED;
- // Only enable durability scheduling _after_ we have fully replayed
journal
- as.node.durability().start();
-
+ as.startup();
instance = as;
AccordReplicaMetrics.touch();
@@ -326,7 +307,7 @@ public class AccordService implements IAccordService,
Shutdownable
public static boolean replayJournal(AccordService as)
{
logger.info("Starting journal replay.");
- long before = Clock.Global.nanoTime();
+ long start = nanoTime();
if (as.journalConfiguration().replayMode() == RESET)
AccordKeyspace.truncateCommandsForKey();
@@ -337,8 +318,8 @@ public class AccordService implements IAccordService,
Shutdownable
as.journal.unsafeSetStarted();
as.node.commandStores().forAllUnsafe(cs ->
cs.unsafeProgressLog().start());
- long after = Clock.Global.nanoTime();
- logger.info("Finished journal replay. {}ms elapsed",
NANOSECONDS.toMillis(after - before));
+ long end = nanoTime();
+ logger.info("Finished journal replay. {}s elapsed",
String.format("%.2f", NANOSECONDS.toMillis(end - start)/1000.0));
return true;
}
@@ -414,29 +395,117 @@ public class AccordService implements IAccordService,
Shutdownable
{
if (state != State.INIT)
return;
- journal.start(node);
- node.load();
- ClusterMetadata metadata = ClusterMetadata.current();
- endpointMapper.updateMapping(metadata);
+ node.unsafeSetReplaying(true);
+ try
+ {
+ journal.start(node);
+ node.load();
+
+ ClusterMetadata metadata = ClusterMetadata.current();
+ endpointMapper.updateMapping(metadata);
+
+ List<TopologyUpdate> images = journal.replayTopologies();
+ if (!images.isEmpty())
+ {
+ // Initialise command stores using latest topology from the
log;
+ // if there are no local command stores, don't report any
topologies and simply fetch the latest known in the cluster
+ // this avoids a registered (not joined) node learning of
topologies, then later restarting with some intervening
+ // epochs having been garbage collected by the other nodes in
the cluster
+ TopologyUpdate last = images.get(images.size() - 1);
+ if (!last.commandStores.isEmpty())
+ {
+ node.commandStores().initializeTopologyUnsafe(last);
+
+ // Replay local epochs
+ for (TopologyUpdate image : images)
+ node.topology().reportTopology(image.global);
+ }
+ }
+ replayJournal(this);
+ }
+ finally
+ {
+ node.unsafeSetReplaying(false);
+ }
- List<TopologyUpdate> images = journal.replayTopologies();
- if (!images.isEmpty())
+ finishInitialization();
+ catchup();
+
+ fastPathCoordinator.start();
+
ClusterMetadataService.instance().log().addListener(fastPathCoordinator);
+
+
node.durability().shards().reconfigure(Ints.checkedCast(getAccordShardDurabilityTargetSplits()),
+
Ints.checkedCast(getAccordShardDurabilityMaxSplits()),
+
Ints.checkedCast(getAccordShardDurabilityCycle(SECONDS)), SECONDS);
+
node.durability().global().setGlobalCycleTime(Ints.checkedCast(getAccordGlobalDurabilityCycle(SECONDS)),
SECONDS);
+ // Only enable durability scheduling _after_ we have fully replayed
journal
+ node.durability().start();
+ state = State.STARTED;
+ }
+
+ void catchup()
+ {
+ AccordSpec spec = DatabaseDescriptor.getAccord();
+ if (!spec.catchup_on_start)
{
- // Initialise command stores using latest topology from the log;
- // if there are no local command stores, don't report any
topologies and simply fetch the latest known in the cluster
- // this avoids a registered (not joined) node learning of
topologies, then later restarting with some intervening
- // epochs having been garbage collected by the other nodes in the
cluster
- TopologyUpdate last = images.get(images.size() - 1);
- if (!last.commandStores.isEmpty())
+ logger.info("Not catching up with peers");
+ return;
+ }
+
+ BootstrapState bootstrapState = SystemKeyspace.getBootstrapState();
+ if (bootstrapState == COMPLETED)
+ {
+ long maxLatencyNanos =
spec.catchup_on_start_fail_latency.toNanoseconds();
+ int attempts = 1;
+ while (true)
{
- node.commandStores().initializeTopologyUnsafe(last);
+ logger.info("Catching up with quorum...");
+ long start = nanoTime();
+ long failAt = start + maxLatencyNanos;
+ Future<Void> f = toFuture(Catchup.catchup(node));
+ if (!f.awaitUntilThrowUncheckedOnInterrupt(failAt))
+ {
+ if (spec.catchup_on_start_exit_on_failure)
+ {
+ logger.error("Catch up exceeded maximum latency of
{}ns; shutting down", maxLatencyNanos);
+ throw new RuntimeException("Could not catch up with
peers");
+ }
+ logger.error("Catch up exceeded maximum latency of {}ns;
starting up", maxLatencyNanos);
+ break;
+ }
+
+ Throwable failed = f.cause();
+ if (failed != null)
+ throw new RuntimeException("Could not catch up with
peers", failed);
+
+ long end = nanoTime();
+ double seconds = NANOSECONDS.toMillis(end - start)/1000.0;
+ logger.info("Finished catching up with all quorums. {}s
elapsed.", String.format("%.2f", seconds));
+
+ // TODO (expected): make configurable
+ if (seconds <=
spec.catchup_on_start_success_latency.toSeconds())
+ break;
- // Replay local epochs
- for (TopologyUpdate image : images)
- node.topology().reportTopology(image.global);
+ if (++attempts > spec.catchup_on_start_max_attempts)
+ {
+ if (spec.catchup_on_start_exit_on_failure)
+ {
+ logger.error("Catch up was slow, aborting after {}
attempts and shutting down", attempts);
+ throw new RuntimeException("Could not catch up with
peers");
+ }
+
+ logger.info("Catch up was slow; continuing to startup
after {} attempts.", attempts - 1);
+ break;
+ }
+
+ logger.info("Catch up was slow, so we may behind again;
retrying");
}
}
+ else
+ {
+ logger.info("Not catching up with quorum, as bootstrap state is
{}", bootstrapState);
+ }
}
/**
@@ -444,8 +513,7 @@ public class AccordService implements IAccordService,
Shutdownable
* the latest epoch known to the node prior to restart. After that, we
replay the journal itself, and only after
* that we finish initialization and replay the rest of the epochs.
*/
- @VisibleForTesting
- public void finishInitialization()
+ void finishInitialization()
{
endpointMapper.updateMapping(ClusterMetadata.current());
TopologyManager topology = node.topology();
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java
b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index 55d5e8b1aa..d2b87d25b1 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -1001,9 +1001,9 @@ public abstract class AccordTask<R> extends
SubmittableTask implements Function<
void startInternal(Caches caches)
{
- for (RoutingKey key : caches.commandsForKeys().keySet())
+ for (Range range : ranges)
{
- if (ranges.contains(key))
+ for (RoutingKey key :
caches.commandsForKeys().keysBetween(range.start(), range.startInclusive(),
range.end(), range.endInclusive()))
intersectingKeys.add((TokenKey) key);
}
caches.commandsForKeys().register(keyWatcher);
diff --git a/src/java/org/apache/cassandra/service/accord/OrderedKeys.java
b/src/java/org/apache/cassandra/service/accord/OrderedKeys.java
new file mode 100644
index 0000000000..a8f7acf6f4
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/OrderedKeys.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.accord;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import org.apache.cassandra.utils.BulkIterator;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.UpdateFunction;
+
+/**
+ * Maintains an ordered set of keys backed by a BTree, buffering additions and
removals for batch updates.
+ */
+public class OrderedKeys<K> implements Iterable<K>
+{
+ private final Comparator<K> comparator;
+
+ private Object[] map;
+ private final Object[] buffer = new Object[32];
+ private int bufferRemovals; // Bit field: 1 = removal, 0 = addition
+ private int bufferCount;
+
+ public OrderedKeys(Comparator<K> comparator)
+ {
+ this.comparator = comparator;
+ this.map = BTree.empty();
+ }
+
+ public OrderedKeys(Comparator<K> comparator, Collection<K> init)
+ {
+ this.comparator = comparator;
+ BTree.Builder<K> builder = BTree.builder(comparator);
+ for (K k : init)
+ builder.add(k);
+ this.map = builder.build();
+ }
+
+ public void add(K key, int removalBit)
+ {
+ if (bufferCount == buffer.length)
+ flush();
+ bufferRemovals |= removalBit << bufferCount;
+ buffer[bufferCount++] = key;
+ }
+
+ public void add(K key)
+ {
+ add(key, 0);
+ }
+
+ public void remove(K key)
+ {
+ add(key, 1);
+ }
+
+ /**
+ * Flushes all buffered additions and removals to the underlying BTree.
+ */
+ public void flush()
+ {
+ if (bufferCount == 0)
+ return;
+
+ // Partition the buffer: additions first, then removals
+ int additionEnd = 0, removalStart = bufferCount;
+ while (additionEnd != removalStart)
+ {
+ if (0 == (bufferRemovals & (1 << additionEnd))) ++additionEnd;
+ else if (0 != (bufferRemovals & (1 << (removalStart - 1))))
--removalStart;
+ else
+ {
+ Object tmp = buffer[additionEnd];
+ buffer[additionEnd++] = buffer[--removalStart];
+ buffer[removalStart] = tmp;
+ }
+ }
+
+ // Sort both partitions
+ Arrays.sort(buffer, 0, removalStart, (Comparator) comparator);
+ Arrays.sort(buffer, removalStart, bufferCount, (Comparator)
comparator);
+
+ int i = 0, j = removalStart;
+
+ // Merge additions and removals
+ additionEnd = 0;
+ int removalEnd = removalStart;
+ while (i < removalStart && j < bufferCount)
+ {
+ int c = comparator.compare((K) buffer[i], (K) buffer[j]);
+ if (c == 0)
+ {
+ // matched pairs can be ignored
+ ++i; ++j;
+ }
+ else if (c < 0)
+ {
+ // Addition is smaller
+ if (additionEnd != i)
+ buffer[additionEnd] = buffer[i];
+ ++additionEnd;
+ ++i;
+ }
+ else
+ {
+ // Removal is smaller
+ if (removalEnd != j)
+ buffer[removalEnd] = buffer[j];
+ ++removalEnd;
+ ++j;
+ }
+ }
+
+ // Copy remaining additions
+ if (i != removalStart)
+ {
+ if (additionEnd == i) additionEnd = removalStart;
+ else
+ {
+ int count = removalStart - i;
+ System.arraycopy(buffer, i, buffer, additionEnd, count);
+ additionEnd += count;
+ }
+ }
+
+ // Copy remaining removals
+ if (j != bufferCount)
+ {
+ if (removalEnd == j) removalEnd = bufferCount;
+ else
+ {
+ int count = bufferCount - j;
+ System.arraycopy(buffer, j, buffer, removalEnd, count);
+ removalEnd += count;
+ }
+ }
+
+ // Apply additions to BTree
+ if (additionEnd != 0)
+ map = BTree.update(map, BTree.build(BulkIterator.of(buffer, 0),
additionEnd, UpdateFunction.noOp()), comparator);
+
+ // Apply removals from BTree
+ if (removalEnd != removalStart)
+ map = BTree.subtract(map, BTree.build(BulkIterator.of(buffer,
removalStart), removalEnd - removalStart, UpdateFunction.noOp()), comparator);
+
+ // Clear buffer
+ Arrays.fill(buffer, 0, bufferCount, null);
+ bufferRemovals = 0;
+ bufferCount = 0;
+ }
+
+ public Iterable<K> between(K start, boolean startInclusive, K end, boolean
endInclusive)
+ {
+ return () -> {
+ flush();
+ return BTree.slice(map, comparator, start, startInclusive, end,
endInclusive, BTree.Dir.ASC);
+ };
+ }
+
+ public Iterator<K> iterator()
+ {
+ flush();
+ return BTree.iterator(map);
+ }
+
+ public int size()
+ {
+ flush();
+ return BTree.size(map);
+ }
+
+ public int bufferSize()
+ {
+ return bufferCount;
+ }
+}
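
[Editor's note: a minimal usage sketch for the new OrderedKeys above.
Additions and removals are buffered (32 slots) and merged into the backing
BTree on flush; a matched add+remove of the same key within one buffer cancels
out, as the new OrderedKeysTest's testDuplicates also exercises.]

    import java.util.Comparator;
    import org.apache.cassandra.service.accord.OrderedKeys;

    class OrderedKeysUsage
    {
        public static void main(String[] args)
        {
            OrderedKeys<String> keys = new OrderedKeys<>(Comparator.naturalOrder());
            keys.add("b");
            keys.add("a");
            keys.remove("b");                      // still buffered; nothing hits the BTree yet
            System.out.println(keys.bufferSize()); // 3 pending operations

            keys.flush();                          // add "a"; add "b" and remove "b" cancel out
            System.out.println(keys.size());       // 1

            for (String k : keys.between("a", true, "z", true))
                System.out.println(k);             // "a" -- between() flushes lazily before slicing
        }
    }
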
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/GetDurableBeforeSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/GetDurableBeforeSerializers.java
index c314e51a79..4ba6d7366f 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/GetDurableBeforeSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/GetDurableBeforeSerializers.java
@@ -52,7 +52,7 @@ public class GetDurableBeforeSerializers
@Override
public void serialize(DurableBeforeReply msg, DataOutputPlus out)
throws IOException
{
-
CommandStoreSerializers.durableBefore.serialize(msg.durableBeforeMap, out);
+ CommandStoreSerializers.durableBefore.serialize(msg.durableBefore,
out);
}
@Override
@@ -64,7 +64,7 @@ public class GetDurableBeforeSerializers
@Override
public long serializedSize(DurableBeforeReply msg)
{
- return
CommandStoreSerializers.durableBefore.serializedSize(msg.durableBeforeMap);
+ return
CommandStoreSerializers.durableBefore.serializedSize(msg.durableBefore);
}
};
}
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
index c855103b42..a5e7ef92dc 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/SetDurableSerializers.java
@@ -22,8 +22,9 @@ import java.io.IOException;
import accord.messages.SetGloballyDurable;
import accord.messages.SetShardDurable;
import accord.primitives.Deps;
-import accord.primitives.FullRoute;
-import accord.primitives.SyncPoint;
+import accord.primitives.MinimalSyncPoint;
+import accord.primitives.RangeRoute;
+import accord.primitives.Route;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import org.apache.cassandra.io.UnversionedSerializer;
@@ -38,7 +39,7 @@ public class SetDurableSerializers
@Override
public void serialize(SetShardDurable msg, DataOutputPlus out) throws
IOException
{
- syncPoint.serialize(msg.exclusiveSyncPoint, out);
+ syncPoint.serialize(msg.syncPoint, out);
CommandSerializers.outcomeDurability.serialize(msg.durability,
out);
}
@@ -52,7 +53,7 @@ public class SetDurableSerializers
@Override
public long serializedSize(SetShardDurable msg)
{
- return syncPoint.serializedSize(msg.exclusiveSyncPoint)
+ return syncPoint.serializedSize(msg.syncPoint)
+
CommandSerializers.outcomeDurability.serializedSize(msg.durability);
}
};
@@ -78,34 +79,34 @@ public class SetDurableSerializers
}
};
- public static final UnversionedSerializer<SyncPoint> syncPoint = new
UnversionedSerializer<>()
+ public static final UnversionedSerializer<MinimalSyncPoint> syncPoint =
new UnversionedSerializer<>()
{
@Override
- public void serialize(SyncPoint sp, DataOutputPlus out) throws
IOException
+ public void serialize(MinimalSyncPoint sp, DataOutputPlus out) throws
IOException
{
CommandSerializers.txnId.serialize(sp.syncId, out);
ExecuteAtSerializer.serialize(sp.syncId, sp.executeAt, out);
- DepsSerializers.deps.serialize(sp.waitFor, out);
- KeySerializers.fullRoute.serialize(sp.route, out);
+ DepsSerializers.deps.serialize(Deps.NONE, out);
+ KeySerializers.route.serialize(sp.route, out);
}
@Override
- public SyncPoint deserialize(DataInputPlus in) throws IOException
+ public MinimalSyncPoint deserialize(DataInputPlus in) throws
IOException
{
TxnId syncId = CommandSerializers.txnId.deserialize(in);
Timestamp executeAt = ExecuteAtSerializer.deserialize(syncId, in);
- Deps waitFor = DepsSerializers.deps.deserialize(in);
- FullRoute<?> route = KeySerializers.fullRoute.deserialize(in);
- return SyncPoint.SerializationSupport.construct(syncId, executeAt,
waitFor, route);
+ DepsSerializers.deps.deserialize(in);
+ Route<?> route = KeySerializers.route.deserialize(in);
+ return MinimalSyncPoint.SerializationSupport.construct(syncId,
executeAt, (RangeRoute) route);
}
@Override
- public long serializedSize(SyncPoint sp)
+ public long serializedSize(MinimalSyncPoint sp)
{
return CommandSerializers.txnId.serializedSize(sp.syncId)
+ ExecuteAtSerializer.serializedSize(sp.syncId,
sp.executeAt)
- + DepsSerializers.deps.serializedSize(sp.waitFor)
- + KeySerializers.fullRoute.serializedSize(sp.route);
+ + DepsSerializers.deps.serializedSize(Deps.NONE)
+ + KeySerializers.route.serializedSize(sp.route);
}
};
}
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java
b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 0ef17c1696..0f84b6a5a9 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -75,7 +75,7 @@ public class BTree
* _DO NOT_ attempt to modify this field directly. Instead, use
BRANCH_SHIFT above instead, as branch factor
* should _always_ be a power of 2.
*/
- static final int BRANCH_FACTOR = 1 << BRANCH_SHIFT;
+ public static final int BRANCH_FACTOR = 1 << BRANCH_SHIFT;
public static final int MIN_KEYS = BRANCH_FACTOR / 2 - 1;
public static final int MAX_KEYS = BRANCH_FACTOR - 1;
public static final long STOP_SENTINEL_VALUE = Long.MAX_VALUE;
@@ -538,6 +538,17 @@ public class BTree
}
}
+ /**
+ * Subtracts {@code subtract} from {@code toUpdate}.
+ */
+ public static <Compare> Object[] subtract(Object[] toUpdate, Object[]
subtract, Comparator<Compare> comparator)
+ {
+ try (Subtraction subtraction = Subtraction.get(comparator))
+ {
+ return subtraction.subtract(toUpdate, subtract);
+ }
+ }
+
public static void reverseInSitu(Object[] tree)
{
reverseInSitu(tree, height(tree), true);
@@ -4240,6 +4251,20 @@ public class BTree
}
}
+ static class Subtraction<K, T extends K> extends AbstractSubtraction<K, T>
+ {
+ static final ThreadLocal<Subtraction> SHARED = new ThreadLocal<>();
+
+ static <K, T extends K> Subtraction<K, T> get(Comparator<K> comparator)
+ {
+ Subtraction subtraction = SHARED.get();
+ if (subtraction == null)
+ SHARED.set(subtraction = new Subtraction());
+ subtraction.comparator = comparator;
+ return subtraction;
+ }
+ }
+
private static abstract class AbstractTransformer<I, O> extends
AbstractSeekingTransformer<I, O>
{
@Override
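
[Editor's note: a sketch of the new BTree.subtract above, built with the same
BulkIterator/UpdateFunction.noOp() idiom that OrderedKeys.flush() uses in this
patch. Inputs are assumed to be pre-sorted by the comparator.]

    import java.util.Comparator;
    import org.apache.cassandra.utils.BulkIterator;
    import org.apache.cassandra.utils.btree.BTree;
    import org.apache.cassandra.utils.btree.UpdateFunction;

    class BTreeSubtractSketch
    {
        public static void main(String[] args)
        {
            Comparator<Integer> cmp = Comparator.naturalOrder();
            Integer[] all = { 1, 2, 3, 4, 5 };
            Integer[] drop = { 2, 4 };

            Object[] tree = BTree.build(BulkIterator.of(all, 0), all.length, UpdateFunction.noOp());
            Object[] minus = BTree.build(BulkIterator.of(drop, 0), drop.length, UpdateFunction.noOp());

            // Removes every element of `minus` from `tree`
            Object[] result = BTree.subtract(tree, minus, cmp);
            System.out.println(BTree.size(result)); // 3 -> {1, 3, 5}
        }
    }
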
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
index f77bfa2016..d0d3db5e01 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
@@ -47,6 +47,7 @@ public class AccordJournalIntegrationTest extends TestBaseImpl
{
try (WithProperties wp = new
WithProperties().set(CassandraRelevantProperties.DTEST_ACCORD_JOURNAL_SANITY_CHECK_ENABLED,
"true");
Cluster cluster = init(Cluster.build(1)
+ .withConfig(config ->
config.set("accord.catchup_on_start", "false"))
.withoutVNodes()
.start()))
{
@@ -95,6 +96,7 @@ public class AccordJournalIntegrationTest extends TestBaseImpl
{
try (Cluster cluster = Cluster.build(1)
.withoutVNodes()
+ .withConfig(config ->
config.set("accord.catchup_on_start", "false"))
.start())
{
cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
@@ -122,7 +124,13 @@ public class AccordJournalIntegrationTest extends
TestBaseImpl
@Test
public void restartWithEpochChanges() throws IOException
{
- try (Cluster cluster = Cluster.build(3).withoutVNodes().withConfig(c
-> c.with(GOSSIP).with(NETWORK)).start())
+ try (Cluster cluster = Cluster.build(3)
+ .withoutVNodes()
+ .withConfig(c -> {
+ c.with(GOSSIP).with(NETWORK);
+ c.set("accord.catchup_on_start",
"false");
+ })
+ .start())
{
init(cluster);
final String TABLE = createTable(cluster);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalReplayTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalReplayTest.java
index 0e8a01c228..efabf33022 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalReplayTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/AccordJournalReplayTest.java
@@ -95,6 +95,7 @@ public class AccordJournalReplayTest extends TestBaseImpl
.set("accord.shard_durability_target_splits", "1")
.set("accord.retry_syncpoint", "1s*attempts")
.set("accord.retry_durability", "1s*attempts")
+
.set("accord.catchup_on_start", "false")
.with(NETWORK, GOSSIP))
.start())
{
@@ -125,8 +126,8 @@ public class AccordJournalReplayTest extends TestBaseImpl
Command command = Command.Executed.executed(txnId,
SaveStatus.PreApplied, Status.Durability.NotDurable,
StoreParticipants.execute(commandStore.unsafeGetRangesForEpoch(), route, txnId,
txnId.epoch()), Ballot.ZERO, txnId, txn.intersecting(route, true),
deps.intersecting(route), Ballot.ZERO, waitingOn, writes,
ResultSerializers.APPLIED);
commandStore.journal.saveCommand(commandStore.id(), new
Journal.CommandUpdate(null, command), () -> {});
- SyncPoint<accord.primitives.Range> syncPoint =
AccordService.getBlocking(CoordinateSyncPoint.exclusive(node, syncPointId,
Ranges.of(key.asRange())));
- AccordService.getBlocking(ExecuteSyncPoint.coordinate(node,
syncPoint, 1).onQuorum());
+ SyncPoint syncPoint =
AccordService.getBlocking(CoordinateSyncPoint.exclusive(node, syncPointId,
Ranges.of(key.asRange())));
+ AccordService.getBlocking(ExecuteSyncPoint.coordinate(node,
syncPoint, 1).onQuorumOrDone());
Keyspace.open("ks").getColumnFamilyStore("tbl").forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
Keyspace.open("system_accord").getColumnFamilyStore("commands_for_key").forceBlockingFlush(ColumnFamilyStore.FlushReason.UNIT_TESTS);
spinUntilTrue(() -> {
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
index c9ec047071..039bfde51c 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/journal/JournalAccessRouteIndexOnStartupRaceTest.java
@@ -64,7 +64,9 @@ public class JournalAccessRouteIndexOnStartupRaceTest extends
TestBaseImpl
@Test
public void test() throws IOException
{
- try (Cluster cluster =
Cluster.build(1).withInstanceInitializer(BBHelper::install).start())
+ try (Cluster cluster = Cluster.build(1)
+ .withConfig(config ->
config.set("accord.catchup_on_start", "false"))
+
.withInstanceInitializer(BBHelper::install).start())
{
IInvokableInstance node = cluster.get(1);
node.nodetoolResult("disableautocompaction", ACCORD_KEYSPACE_NAME,
JOURNAL).asserts().success();
diff --git a/test/unit/org/apache/cassandra/utils/OrderedKeysTest.java
b/test/unit/org/apache/cassandra/utils/OrderedKeysTest.java
new file mode 100644
index 0000000000..57dac2c393
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/OrderedKeysTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.service.accord.OrderedKeys;
+import org.quicktheories.WithQuickTheories;
+import org.quicktheories.core.Gen;
+import org.quicktheories.generators.SourceDSL;
+import org.quicktheories.impl.Constraint;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class OrderedKeysTest implements WithQuickTheories
+{
+ private Gen<List<Op>> operationsGen()
+ {
+ Gen<Integer> sizeGen = SourceDSL.integers().between(0, 200);
+ Gen<Integer> keyGen = SourceDSL.integers().between(0, 1000);
+ Gen<Boolean> opGen = SourceDSL.booleans().all();
+ return rng -> {
+ Set<Integer> additions = new HashSet<>();
+ int size = sizeGen.generate(rng);
+ List<Op> ops = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ {
+ while (true)
+ {
+ if (opGen.generate(rng))
+ {
+ int key = keyGen.generate(rng);
+ // Do not add same key twice
+ if (additions.contains(key))
+ continue;
+ ops.add(new AddOp(key));
+ additions.add(key);
+ }
+ else if (!additions.isEmpty())
+ {
+ int idx = 0;
+ if (additions.size() > 1)
+ idx = (int) rng.next(Constraint.between(0,
additions.size() - 1));
+
+ Iterator<Integer> iter = additions.iterator();
+ Integer key = iter.next();
+ for (int j = 0; j < idx; j++)
+ key = iter.next();
+
+ ops.add(new RemoveOp(key));
+ additions.remove(key);
+ }
+ break;
+ }
+ }
+
+ return ops;
+ };
+ }
+
+ interface Op
+ {
+ void apply(OrderedKeys<Integer> orderedKeys, TreeSet<Integer>
reference, Map<Integer, Object> cache);
+ }
+
+ static class AddOp implements Op
+ {
+ final Integer key;
+
+ AddOp(Integer key)
+ {
+ this.key = key;
+ }
+
+ @Override
+ public void apply(OrderedKeys<Integer> orderedKeys, TreeSet<Integer>
model, Map<Integer, Object> cache)
+ {
+ orderedKeys.add(key);
+ cache.put(key, key.toString());
+ model.add(key);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Add(" + key + ")";
+ }
+ }
+
+ static class RemoveOp implements Op
+ {
+ final Integer key;
+
+ RemoveOp(Integer key)
+ {
+ this.key = key;
+ }
+
+ @Override
+ public void apply(OrderedKeys<Integer> orderedKeys, TreeSet<Integer>
model, Map<Integer, Object> cache)
+ {
+ orderedKeys.remove(key);
+ Object removed = cache.remove(key);
+ Assert.assertEquals(removed, key.toString());
+ Invariants.require(model.remove(key));
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Remove(" + key + ")";
+ }
+ }
+
+ @Test
+ public void simpleTest()
+ {
+ OrderedKeys<Integer> keys = new OrderedKeys<>(Integer::compareTo);
+ keys.add(1);
+ keys.add(5);
+ keys.add(3);
+
+ keys.flush();
+ assertEquals(3, keys.size());
+
+ Iterator<Integer> iter = keys.iterator();
+ assertEquals(Integer.valueOf(1), iter.next());
+ assertEquals(Integer.valueOf(3), iter.next());
+ assertEquals(Integer.valueOf(5), iter.next());
+ assertFalse(iter.hasNext());
+
+ // Remove a key
+ keys.remove(3);
+ keys.flush();
+ assertEquals(2, keys.size());
+
+ iter = keys.iterator();
+ assertEquals(Integer.valueOf(1), iter.next());
+ assertEquals(Integer.valueOf(5), iter.next());
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testDuplicates()
+ {
+ OrderedKeys<Integer> keys = new OrderedKeys<>(Integer::compareTo);
+ keys.add(1);
+ keys.remove(1);
+ keys.add(1);
+
+ keys.flush();
+
+ Iterator<Integer> iter = keys.iterator();
+ assertEquals((Integer) 1, iter.next());
+ assertFalse(iter.hasNext());
+ }
+
+ @Test
+ public void testBetween()
+ {
+ OrderedKeys<Integer> keys = new OrderedKeys<>(Integer::compareTo);
+
+ for (int i = 0; i < 10; i++)
+ keys.add(i);
+ keys.flush();
+
+ // incl
+ List<Integer> result = new ArrayList<>();
+ keys.between(3, true, 7, true).forEach(result::add);
+ assertEquals(List.of(3, 4, 5, 6, 7), result);
+
+ // excl
+ result.clear();
+ keys.between(3, false, 7, false).forEach(result::add);
+ assertEquals(List.of(4, 5, 6), result);
+
+ // incl/excl
+ result.clear();
+ keys.between(3, true, 7, false).forEach(result::add);
+ assertEquals(List.of(3, 4, 5, 6), result);
+ }
+
+ @Test
+ public void testBuffering()
+ {
+ OrderedKeys<Integer> keys = new OrderedKeys<>(Integer::compareTo);
+
+ for (int i = 0; i < 10; i++)
+ keys.add(i);
+
+ assertTrue("Buffer should have pending operations", keys.bufferSize()
== 10);
+
+ // After flush, buffer should be empty
+ keys.flush();
+ assertEquals(0, keys.bufferSize());
+ assertEquals(10, keys.size());
+ }
+
+ @Test
+ public void testFlushOnOverflow()
+ {
+ OrderedKeys<Integer> keys = new OrderedKeys<>(Integer::compareTo);
+
+ // add more than 32 keys (default buffer size)
+ for (int i = 0; i < 50; i++)
+ keys.add(i);
+
+ assertTrue("Buffer should have flushed", keys.bufferSize() == (50 -
32));
+ }
+
+ @Test
+ public void randomOperationsMatchReference()
+ {
+ qt().withExamples(1000)
+ .withShrinkCycles(0)
+ .withFixedSeed(1)
+ .forAll(operationsGen())
+ .checkAssert((operations) -> {
+ Map<Integer, Object> cache = new HashMap<>();
+ OrderedKeys<Integer> orderedKeys = new
OrderedKeys<>(Integer::compareTo);
+ TreeSet<Integer> model = new TreeSet<>();
+
+ for (int i = 0; i < operations.size(); i++)
+ {
+ Op op = operations.get(i);
+ op.apply(orderedKeys, model, cache);
+ if (Math.random() < 0.1)
+ {
+ orderedKeys.flush();
+ assertMatchesReference(operations.subList(0, i),
orderedKeys, model, cache, "After: " + op);
+ }
+ }
+
+ orderedKeys.flush();
+ assertMatchesReference(operations, orderedKeys, model, cache,
"Final state");
+ });
+ }
+
+ private void assertMatchesReference(List<Op> ops, OrderedKeys<Integer>
orderedKeys, TreeSet<Integer> model, Map<Integer, Object> cache, String
message)
+ {
+ List<Integer> actualList = new ArrayList<>();
+ orderedKeys.iterator().forEachRemaining(actualList::add);
+ List<Integer> expectedList = new ArrayList<>(model);
+
+ if (!expectedList.equals(actualList))
+ {
+ throw new AssertionError(String.format("%s\n" +
+ "Ops: %s\n" +
+ "Ordered Keys: %s\n" +
+ "Cache: %s\n" +
+ "Model: %s",
message, ops, actualList, cache, model));
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]