This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new fdf009b40e CASSANDRA-19825: Fix various bugs and abstraction
deficiencies, including:
fdf009b40e is described below
commit fdf009b40e95aaf8242313cdb724f28ef948ca81
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Aug 2 14:17:42 2024 +0100
CASSANDRA-19825: Fix various bugs and abstraction deficiencies, including:
- Remove concept of non-participating home keys; home keys are required to
be a participant in the transaction
- Remove covering/covers concept
- Various invalidation/truncation/erase behaviours
patch by Benedict; reviewed by Blake for CASSANDRA-19825
---
modules/accord | 2 +-
.../db/compaction/CompactionIterator.java | 16 +++++-----
.../index/accord/CheckpointIntervalArrayIndex.java | 2 +-
.../service/accord/AccordFetchCoordinator.java | 6 +++-
.../service/accord/AccordObjectSizes.java | 16 +++++-----
.../cassandra/service/accord/AccordService.java | 11 ++++---
.../cassandra/service/accord/AccordTopology.java | 13 ++++----
.../cassandra/service/accord/IAccordService.java | 18 +++++++++--
.../service/accord/api/AccordTopologySorter.java | 3 +-
.../service/accord/fastpath/FastPathStrategy.java | 4 +--
.../fastpath/InheritKeyspaceFastPathStrategy.java | 4 +--
.../fastpath/ParameterizedFastPathStrategy.java | 13 ++++----
.../accord/fastpath/SimpleFastPathStrategy.java | 21 +++++++------
.../accord/repair/RepairSyncPointAdapter.java | 2 +-
.../accord/serializers/CommandSerializers.java | 6 +---
.../service/accord/serializers/DepsSerializer.java | 12 ++++----
.../serializers/InformHomeDurableSerializers.java | 13 ++------
.../service/accord/serializers/KeySerializers.java | 36 ++++++----------------
.../accord/serializers/TopologySerializers.java | 4 +--
.../cassandra/service/accord/txn/TxnRead.java | 16 ++++++++--
.../cassandra/service/accord/txn/TxnUpdate.java | 9 ++++++
.../accord/txn/UnrecoverableRepairUpdate.java | 7 +++++
.../cassandra/utils/CollectionSerializers.java | 10 ++++++
.../compaction/CompactionAccordIteratorsTest.java | 9 ++++--
.../index/accord/AccordIndexStressTest.java | 2 +-
.../cassandra/index/accord/RouteIndexTest.java | 2 +-
.../service/accord/AccordCommandStoreTest.java | 3 +-
.../service/accord/AccordCommandTest.java | 6 ++--
.../accord/AccordConfigurationServiceTest.java | 4 +--
.../service/accord/AccordKeyspaceTest.java | 2 +-
.../service/accord/AccordMessageSinkTest.java | 5 +--
.../cassandra/service/accord/AccordTestUtils.java | 15 ++++-----
.../service/accord/AccordTopologyUtils.java | 4 +--
.../ParameterizedFastPathStrategyTest.java | 4 +--
.../serializers/CheckStatusSerializersTest.java | 2 +-
.../serializers/CommandsForKeySerializerTest.java | 2 +-
.../org/apache/cassandra/utils/RangeTreeTest.java | 2 +-
37 files changed, 173 insertions(+), 133 deletions(-)
diff --git a/modules/accord b/modules/accord
index 5f360e0b5b..d6182e3b75 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 5f360e0b5b197156df0ef3d9985cd94d18ea1c92
+Subproject commit d6182e3b755920f908263ce0524c8bd763448164
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index e0dcb5682f..4dbeae7896 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Ordering;
import accord.local.Cleanup;
+import accord.local.CommandStores.RangesForEpoch;
import accord.local.DurableBefore;
import accord.local.RedundantBefore;
import accord.local.SaveStatus;
@@ -87,7 +88,6 @@ import org.apache.cassandra.service.accord.IAccordService;
import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.service.paxos.PaxosRepairHistory;
import org.apache.cassandra.service.paxos.uncommitted.PaxosRows;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.TimeUUID;
import static accord.local.Cleanup.TRUNCATE_WITH_OUTCOME;
@@ -784,6 +784,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
class AccordCommandsPurger extends AbstractPurger
{
final Int2ObjectHashMap<RedundantBefore> redundantBefores;
+ final Int2ObjectHashMap<RangesForEpoch> ranges;
final DurableBefore durableBefore;
int storeId;
@@ -791,9 +792,10 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
AccordCommandsPurger(Supplier<IAccordService> accordService)
{
- Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore>
redundantBeforesAndDurableBefore =
accordService.get().getRedundantBeforesAndDurableBefore();
- this.redundantBefores = redundantBeforesAndDurableBefore.left;
- this.durableBefore = redundantBeforesAndDurableBefore.right;
+ IAccordService.CompactionInfo compactionInfo =
accordService.get().getCompactionInfo();
+ this.redundantBefores = compactionInfo.redundantBefores;
+ this.ranges = compactionInfo.ranges;
+ this.durableBefore = compactionInfo.durableBefore;
}
protected void beginPartition(UnfilteredRowIterator partition)
@@ -815,7 +817,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
// When commands end up being sliced by compaction we need this to
discard tombstones and slices
// without enough information to run the rest of the cleanup logic
- if (Cleanup.isSafeToCleanup(durableBefore, txnId))
+ if (Cleanup.isSafeToCleanup(durableBefore, txnId,
ranges.get(storeId).allAt(txnId.epoch())))
return null;
Cell durabilityCell = row.getCell(CommandsColumns.durability);
@@ -878,7 +880,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
AccordTimestampsForKeyPurger(Supplier<IAccordService> accordService)
{
- this.redundantBefores =
accordService.get().getRedundantBeforesAndDurableBefore().left;
+ this.redundantBefores =
accordService.get().getCompactionInfo().redundantBefores;
}
protected void beginPartition(UnfilteredRowIterator partition)
@@ -954,7 +956,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
AccordCommandsForKeyPurger(CommandsForKeyAccessor accessor,
Supplier<IAccordService> accordService)
{
this.accessor = accessor;
- this.redundantBefores =
accordService.get().getRedundantBeforesAndDurableBefore().left;
+ this.redundantBefores =
accordService.get().getCompactionInfo().redundantBefores;
}
protected void beginPartition(UnfilteredRowIterator partition)
diff --git
a/src/java/org/apache/cassandra/index/accord/CheckpointIntervalArrayIndex.java
b/src/java/org/apache/cassandra/index/accord/CheckpointIntervalArrayIndex.java
index 9cf950410d..e478d26982 100644
---
a/src/java/org/apache/cassandra/index/accord/CheckpointIntervalArrayIndex.java
+++
b/src/java/org/apache/cassandra/index/accord/CheckpointIntervalArrayIndex.java
@@ -609,7 +609,7 @@ public class CheckpointIntervalArrayIndex
};
var searcher = new CheckpointIntervalArray<>(accessor,
indexInput, checkpoints.bounds, checkpoints.headers, checkpoints.lists,
checkpoints.maxScanAndCheckpointMatches);
- searcher.forEach(start, end, (i1, i2, i3, i4, index) -> {
+ searcher.forEachRange(start, end, (i1, i2, i3, i4, index) -> {
stats.matches++;
callback.accept(reader.copyTo(accessor.get(indexInput,
index), buffer));
}, (i1, i2, i3, i4, startIdx, endIdx) -> {
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java
b/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java
index 088c6c1331..e4ad2aa559 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java
@@ -35,6 +35,7 @@ import accord.local.CommandStore;
import accord.local.Node;
import accord.local.SafeCommandStore;
import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Routable;
@@ -290,6 +291,9 @@ public class AccordFetchCoordinator extends
AbstractFetchCoordinator implements
@Override
public Read slice(Ranges ranges) { return new StreamingRead(to,
this.ranges.slice(ranges)); }
+ @Override
+ public Read intersecting(Participants<?> participants) { return new
StreamingRead(to, this.ranges.intersecting(ranges)); }
+
@Override
public Read merge(Read other) { throw new
UnsupportedOperationException(); }
}
@@ -383,7 +387,7 @@ public class AccordFetchCoordinator extends
AbstractFetchCoordinator implements
protected PartialTxn rangeReadTxn(Ranges ranges)
{
StreamingRead read = new
StreamingRead(FBUtilities.getBroadcastAddressAndPort(), ranges);
- return new PartialTxn.InMemory(ranges, Txn.Kind.Read, ranges, read,
noopQuery, null);
+ return new PartialTxn.InMemory(Txn.Kind.Read, ranges, read, noopQuery,
null);
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index d3cd7a34fd..46e0d8c158 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -137,7 +137,7 @@ public class AccordObjectSizes
return EMPTY_ROUTING_KEYS_SIZE + routingKeysOnly(keys);
}
- private static final long EMPTY_FULL_KEY_ROUTE_SIZE = measure(new
FullKeyRoute(new TokenKey(null, null), true, new RoutingKey[0]));
+ private static final long EMPTY_FULL_KEY_ROUTE_SIZE = measure(new
FullKeyRoute(new TokenKey(null, null), new RoutingKey[0]));
public static long fullKeyRoute(FullKeyRoute route)
{
return EMPTY_FULL_KEY_ROUTE_SIZE
@@ -145,12 +145,11 @@ public class AccordObjectSizes
+ key(route.homeKey()); // TODO: we will probably dedup
homeKey, serializer dependent, but perhaps this is an acceptable error
}
- private static final long EMPTY_PARTIAL_KEY_ROUTE_KEYS_SIZE = measure(new
PartialKeyRoute(Ranges.EMPTY, new TokenKey(null, null), true, new
RoutingKey[0]));
+ private static final long EMPTY_PARTIAL_KEY_ROUTE_KEYS_SIZE = measure(new
PartialKeyRoute(new TokenKey(null, null), new RoutingKey[0]));
public static long partialKeyRoute(PartialKeyRoute route)
{
return EMPTY_PARTIAL_KEY_ROUTE_KEYS_SIZE
+ routingKeysOnly(route)
- + ranges(route.covering())
+ key(route.homeKey());
}
@@ -162,7 +161,7 @@ public class AccordObjectSizes
return size;
}
- private static final long EMPTY_FULL_RANGE_ROUTE_SIZE = measure(new
FullRangeRoute(new TokenKey(null, null), true, new Range[0]));
+ private static final long EMPTY_FULL_RANGE_ROUTE_SIZE = measure(new
FullRangeRoute(new TokenKey(null, null), new Range[0]));
public static long fullRangeRoute(FullRangeRoute route)
{
return EMPTY_FULL_RANGE_ROUTE_SIZE
@@ -170,12 +169,11 @@ public class AccordObjectSizes
+ key(route.homeKey()); // TODO: we will probably dedup
homeKey, serializer dependent, but perhaps this is an acceptable error
}
- private static final long EMPTY_PARTIAL_RANGE_ROUTE_KEYS_SIZE =
measure(new PartialRangeRoute(Ranges.EMPTY, new TokenKey(null, null), true, new
Range[0]));
+ private static final long EMPTY_PARTIAL_RANGE_ROUTE_KEYS_SIZE =
measure(new PartialRangeRoute(new TokenKey(null, null), new Range[0]));
public static long partialRangeRoute(PartialRangeRoute route)
{
return EMPTY_PARTIAL_RANGE_ROUTE_KEYS_SIZE
+ rangesOnly(route)
- + ranges(route.covering())
+ key(route.homeKey());
}
@@ -193,7 +191,7 @@ public class AccordObjectSizes
}
}
- private static final long EMPTY_TXN = measure(new
PartialTxn.InMemory(null, null, null, null, null, null));
+ private static final long EMPTY_TXN = measure(new
PartialTxn.InMemory(null, null, null, null, null));
public static long txn(PartialTxn txn)
{
long size = EMPTY_TXN;
@@ -283,13 +281,13 @@ public class AccordObjectSizes
private static CommonAttributes attrs(boolean hasDeps, boolean hasTxn)
{
- CommonAttributes.Mutable attrs = new
CommonAttributes.Mutable(EMPTY_TXNID).route(new FullKeyRoute(EMPTY_KEY, true,
new RoutingKey[]{ EMPTY_KEY }));
+ CommonAttributes.Mutable attrs = new
CommonAttributes.Mutable(EMPTY_TXNID).route(new FullKeyRoute(EMPTY_KEY, new
RoutingKey[]{ EMPTY_KEY }));
attrs.durability(Status.Durability.NotDurable);
if (hasDeps)
attrs.partialDeps(PartialDeps.NONE);
if (hasTxn)
- attrs.partialTxn(new PartialTxn.InMemory(null, null, null,
null, null, null));
+ attrs.partialTxn(new PartialTxn.InMemory(null, null, null,
null, null));
return attrs;
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 752e041f48..75450fff2f 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -45,6 +45,7 @@ import accord.coordinate.Exhausted;
import accord.coordinate.FailureAccumulator;
import accord.coordinate.TopologyMismatch;
import accord.impl.CoordinateDurabilityScheduling;
+import accord.local.CommandStores;
import accord.primitives.SyncPoint;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.statements.RequestValidations;
@@ -226,9 +227,9 @@ public class AccordService implements IAccordService,
Shutdownable
public void receive(Message<List<AccordSyncPropagator.Notification>>
message) {}
@Override
- public Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore>
getRedundantBeforesAndDurableBefore()
+ public CompactionInfo getCompactionInfo()
{
- return Pair.create(new Int2ObjectHashMap<>(), DurableBefore.EMPTY);
+ return new CompactionInfo(new Int2ObjectHashMap<>(), new
Int2ObjectHashMap<>(), DurableBefore.EMPTY);
}
};
@@ -773,17 +774,19 @@ public class AccordService implements IAccordService,
Shutdownable
}
@Override
- public Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore>
getRedundantBeforesAndDurableBefore()
+ public CompactionInfo getCompactionInfo()
{
Int2ObjectHashMap<RedundantBefore> redundantBefores = new
Int2ObjectHashMap<>();
+ Int2ObjectHashMap<CommandStores.RangesForEpoch>ranges = new
Int2ObjectHashMap<>();
AtomicReference<DurableBefore> durableBefore = new
AtomicReference<>(DurableBefore.EMPTY);
AsyncChains.getBlockingAndRethrow(node.commandStores().forEach(safeStore -> {
synchronized (redundantBefores)
{
redundantBefores.put(safeStore.commandStore().id(),
safeStore.commandStore().redundantBefore());
+ ranges.put(safeStore.commandStore().id(), safeStore.ranges());
}
durableBefore.set(DurableBefore.merge(durableBefore.get(),
safeStore.commandStore().durableBefore()));
}));
- return Pair.create(redundantBefores, durableBefore.get());
+ return new CompactionInfo(redundantBefores, ranges,
durableBefore.get());
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java
b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
index 0814c322d5..7bbfc0c250 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
@@ -32,6 +32,7 @@ import accord.local.Node;
import accord.topology.Shard;
import accord.topology.Topology;
import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
@@ -64,7 +65,7 @@ public class AccordTopology
private static class ShardLookup extends HashMap<accord.primitives.Range,
Shard>
{
- private Shard createOrReuse(accord.primitives.Range range,
List<Node.Id> nodes, Set<Node.Id> fastPathElectorate, Set<Node.Id> joining)
+ private Shard createOrReuse(accord.primitives.Range range,
SortedArrayList<Node.Id> nodes, Set<Node.Id> fastPathElectorate, Set<Node.Id>
joining)
{
Shard prev = get(range);
if (prev != null
@@ -81,10 +82,10 @@ public class AccordTopology
{
private final KeyspaceMetadata keyspace;
private final Range<Token> range;
- private final List<Node.Id> nodes;
+ private final SortedArrayList<Node.Id> nodes;
private final Set<Node.Id> pending;
- private KeyspaceShard(KeyspaceMetadata keyspace, Range<Token> range,
List<Node.Id> nodes, Set<Node.Id> pending)
+ private KeyspaceShard(KeyspaceMetadata keyspace, Range<Token> range,
SortedArrayList<Node.Id> nodes, Set<Node.Id> pending)
{
this.keyspace = keyspace;
this.range = range;
@@ -106,7 +107,7 @@ public class AccordTopology
{
TokenRange tokenRange = AccordTopology.range(metadata.id, range);
- Set<Node.Id> fastPath =
strategyFor(metadata).calculateFastPath(nodes, unavailable, dcMap);
+ SortedArrayList<Node.Id> fastPath =
strategyFor(metadata).calculateFastPath(nodes, unavailable, dcMap);
return lookup.createOrReuse(tokenRange, nodes, fastPath, pending);
}
@@ -122,10 +123,10 @@ public class AccordTopology
Sets.SetView<InetAddressAndPort> readOnly =
Sets.difference(readEndpoints, writeEndpoints);
Invariants.checkState(readOnly.isEmpty(), "Read only replicas
detected: %s", readOnly);
- List<Node.Id> nodes = writes.endpoints().stream()
+ SortedArrayList<Node.Id> nodes = new
SortedArrayList<>(writes.endpoints().stream()
.map(directory::peerId)
.map(AccordTopology::tcmIdToAccord)
- .sorted().collect(Collectors.toList());
+ .sorted().toArray(Node.Id[]::new));
Set<Node.Id> pending = readEndpoints.equals(writeEndpoints) ?
Collections.emptySet() :
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java
b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index ae0d08c48e..828ca1f287 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.service.accord;
import accord.api.BarrierType;
+import accord.local.CommandStores;
import accord.local.DurableBefore;
import accord.local.Node.Id;
import accord.local.RedundantBefore;
@@ -40,7 +41,6 @@ import
org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.service.accord.api.AccordScheduler;
import org.apache.cassandra.service.accord.txn.TxnResult;
import org.apache.cassandra.tcm.Epoch;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Future;
import javax.annotation.Nonnull;
@@ -108,10 +108,24 @@ public interface IAccordService
void receive(Message<List<AccordSyncPropagator.Notification>> message);
+ class CompactionInfo
+ {
+ public final Int2ObjectHashMap<RedundantBefore> redundantBefores;
+ public final Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges;
+ public final DurableBefore durableBefore;
+
+ public CompactionInfo(Int2ObjectHashMap<RedundantBefore>
redundantBefores, Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges,
DurableBefore durableBefore)
+ {
+ this.redundantBefores = redundantBefores;
+ this.ranges = ranges;
+ this.durableBefore = durableBefore;
+ }
+ }
+
/**
* Fetch the redundnant befores for every command store
*/
- Pair<Int2ObjectHashMap<RedundantBefore>, DurableBefore>
getRedundantBeforesAndDurableBefore();
+ CompactionInfo getCompactionInfo();
default Id nodeId() { throw new UnsupportedOperationException(); }
}
diff --git
a/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java
b/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java
index cbc46ef9a0..7d1ab21224 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordTopologySorter.java
@@ -27,6 +27,7 @@ import accord.local.Node;
import accord.topology.ShardSelection;
import accord.topology.Topologies;
import accord.topology.Topology;
+import accord.utils.SortedList;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.locator.Endpoint;
import org.apache.cassandra.locator.IEndpointSnitch;
@@ -61,7 +62,7 @@ public class AccordTopologySorter implements TopologySorter
return create(topologies.nodes());
}
- private AccordTopologySorter create(Set<Node.Id> nodes)
+ private AccordTopologySorter create(SortedList<Node.Id> nodes)
{
SortableEndpoints endpoints = SortableEndpoints.from(nodes,
mapper);
Comparator<Endpoint> comparator =
snitch.endpointComparator(FBUtilities.getBroadcastAddressAndPort(), endpoints);
diff --git
a/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java
b/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java
index 6978721a81..f6a81b0b66 100644
---
a/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java
+++
b/src/java/org/apache/cassandra/service/accord/fastpath/FastPathStrategy.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord.fastpath;
import java.io.IOException;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -29,6 +28,7 @@ import javax.annotation.Nullable;
import com.google.common.collect.ImmutableMap;
import accord.local.Node;
+import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -83,7 +83,7 @@ public interface FastPathStrategy
* @param dcMap
* @return
*/
- Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id>
unavailable, Map<Node.Id, String> dcMap);
+ SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id> nodes,
Set<Node.Id> unavailable, Map<Node.Id, String> dcMap);
Kind kind();
diff --git
a/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java
b/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java
index 08b7763a93..2ffc3c7cbf 100644
---
a/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java
+++
b/src/java/org/apache/cassandra/service/accord/fastpath/InheritKeyspaceFastPathStrategy.java
@@ -18,13 +18,13 @@
package org.apache.cassandra.service.accord.fastpath;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableMap;
import accord.local.Node;
+import accord.utils.SortedArrays.SortedArrayList;
public class InheritKeyspaceFastPathStrategy implements FastPathStrategy
{
@@ -35,7 +35,7 @@ public class InheritKeyspaceFastPathStrategy implements
FastPathStrategy
private InheritKeyspaceFastPathStrategy() {}
@Override
- public Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id>
unavailable, Map<Node.Id, String> dcMap)
+ public SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id>
nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap)
{
throw new IllegalStateException("InheritKeyspaceFastPathStrategy
should be replaced before calculateFastPath is called");
}
diff --git
a/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java
b/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java
index aef562f99a..dfe027367e 100644
---
a/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java
+++
b/src/java/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategy.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service.accord.fastpath;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -32,12 +33,12 @@ import java.util.stream.Collectors;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import accord.api.VisibleForImplementation;
import accord.local.Node;
import accord.topology.Shard;
import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -204,7 +205,7 @@ public class ParameterizedFastPathStrategy implements
FastPathStrategy
}
@Override
- public Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id>
unavailable, Map<Node.Id, String> dcMap)
+ public SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id>
nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap)
{
List<NodeSorter> sorters = new ArrayList<>(nodes.size());
@@ -221,12 +222,12 @@ public class ParameterizedFastPathStrategy implements
FastPathStrategy
int slowQuorum = Shard.slowPathQuorumSize(nodes.size());
int fpSize = Math.max(size, slowQuorum);
- ImmutableSet.Builder<Node.Id> builder = ImmutableSet.builder();
-
+ Node.Id[] array = new Node.Id[fpSize];
for (int i=0; i<fpSize; i++)
- builder.add(sorters.get(i).id);
+ array[i] = sorters.get(i).id;
- ImmutableSet<Node.Id> fastPath = builder.build();
+ Arrays.sort(array);
+ SortedArrayList<Node.Id> fastPath = new SortedArrayList<>(array);
Invariants.checkState(fastPath.size() >= slowQuorum);
return fastPath;
}
diff --git
a/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java
b/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java
index 3f2e27bdc8..37d51b6c82 100644
---
a/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java
+++
b/src/java/org/apache/cassandra/service/accord/fastpath/SimpleFastPathStrategy.java
@@ -18,16 +18,16 @@
package org.apache.cassandra.service.accord.fastpath;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import accord.local.Node;
import accord.topology.Shard;
+import accord.utils.ArrayBuffers;
import accord.utils.Invariants;
+import accord.utils.SortedArrays.SortedArrayList;
public class SimpleFastPathStrategy implements FastPathStrategy
{
@@ -38,26 +38,27 @@ public class SimpleFastPathStrategy implements
FastPathStrategy
private SimpleFastPathStrategy() {}
@Override
- public Set<Node.Id> calculateFastPath(List<Node.Id> nodes, Set<Node.Id>
unavailable, Map<Node.Id, String> dcMap)
+ public SortedArrayList<Node.Id> calculateFastPath(SortedArrayList<Node.Id>
nodes, Set<Node.Id> unavailable, Map<Node.Id, String> dcMap)
{
int maxFailures = Shard.maxToleratedFailures(nodes.size());
int discarded = 0;
- ImmutableSet.Builder<Node.Id> builder = ImmutableSet.builder();
+ if (unavailable.isEmpty())
+ return nodes;
+ Object[] tmp = ArrayBuffers.cachedAny().get(nodes.size());
for (int i=0,mi=nodes.size(); i<mi; i++)
{
Node.Id node = nodes.get(i);
if (unavailable.contains(node) && discarded < maxFailures)
- {
discarded++;
- continue;
- }
-
- builder.add(node);
+ else
+ tmp[i - discarded] = node;
}
- Set<Node.Id> fastPath = builder.build();
+ Node.Id[] array = new Node.Id[nodes.size() - discarded];
+ System.arraycopy(tmp, 0, array, 0, nodes.size() - discarded);
+ SortedArrayList<Node.Id> fastPath = new SortedArrayList<>(array);
Invariants.checkState(fastPath.size() >=
Shard.slowPathQuorumSize(nodes.size()));
return fastPath;
}
diff --git
a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
index 76e29adbc5..8cc04f60da 100644
---
a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
+++
b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
@@ -61,7 +61,7 @@ public class RepairSyncPointAdapter<S extends Seekables<?,
?>> extends Coordinat
public void execute(Node node, Topologies all, FullRoute<?> route,
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps,
BiConsumer<? super SyncPoint<S>, Throwable> callback)
{
RequiredResponseTracker tracker = new
RequiredResponseTracker(requiredResponses, all);
- ExecuteSyncPoint.ExecuteBlocking<S> execute = new
ExecuteSyncPoint.ExecuteBlocking<>(node, tracker, new SyncPoint<>(txnId, deps,
(S) txn.keys(), route), executeAt);
+ ExecuteSyncPoint.ExecuteBlocking<S> execute = new
ExecuteSyncPoint.ExecuteBlocking<>(node, new SyncPoint<>(txnId, deps, (S)
txn.keys(), route), tracker, executeAt);
execute.addCallback(callback);
execute.start();
}
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index fe16d3033e..cd76550262 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -36,7 +36,6 @@ import accord.local.Status.Known;
import accord.primitives.Ballot;
import accord.primitives.PartialTxn;
import accord.primitives.ProgressToken;
-import accord.primitives.Ranges;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
@@ -234,7 +233,6 @@ public class CommandSerializers
private void serializeWithoutKeys(PartialTxn txn, DataOutputPlus out,
int version) throws IOException
{
CommandSerializers.kind.serialize(txn.kind(), out, version);
- KeySerializers.ranges.serialize(txn.covering(), out, version);
readSerializer.serialize(txn.read(), out, version);
querySerializer.serialize(txn.query(), out, version);
out.writeBoolean(txn.update() != null);
@@ -245,18 +243,16 @@ public class CommandSerializers
private PartialTxn deserializeWithoutKeys(Seekables<?, ?> keys,
DataInputPlus in, int version) throws IOException
{
Txn.Kind kind = CommandSerializers.kind.deserialize(in, version);
- Ranges covering = KeySerializers.ranges.deserialize(in, version);
Read read = readSerializer.deserialize(in, version);
Query query = querySerializer.deserialize(in, version);
Update update = in.readBoolean() ?
updateSerializer.deserialize(in, version) : null;
- return new PartialTxn.InMemory(covering, kind, keys, read, query,
update);
+ return new PartialTxn.InMemory(kind, keys, read, query, update);
}
private long serializedSizeWithoutKeys(PartialTxn txn, int version)
{
long size = CommandSerializers.kind.serializedSize(txn.kind(),
version);
- size += KeySerializers.ranges.serializedSize(txn.covering(),
version);
size += readSerializer.serializedSize(txn.read(), version);
size += querySerializer.serializedSize(txn.query(), version);
size += TypeSizes.sizeof(txn.update() != null);
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
index dd78761cda..841d89882e 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
@@ -25,9 +25,9 @@ import accord.primitives.Deps;
import accord.primitives.KeyDeps;
import accord.primitives.Keys;
import accord.primitives.PartialDeps;
+import accord.primitives.Participants;
import accord.primitives.Range;
import accord.primitives.RangeDeps;
-import accord.primitives.Ranges;
import accord.primitives.Seekables;
import accord.primitives.TxnId;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -60,7 +60,7 @@ public abstract class DepsSerializer<D extends Deps> extends
IVersionedWithKeysS
@Override
PartialDeps deserialize(KeyDeps keyDeps, RangeDeps rangeDeps, KeyDeps
directKeyDeps, DataInputPlus in, int version) throws IOException
{
- Ranges covering = KeySerializers.ranges.deserialize(in, version);
+ Participants<?> covering =
KeySerializers.participants.deserialize(in, version);
return new PartialDeps(covering, keyDeps, rangeDeps,
directKeyDeps);
}
@@ -68,28 +68,28 @@ public abstract class DepsSerializer<D extends Deps>
extends IVersionedWithKeysS
public void serialize(PartialDeps partialDeps, DataOutputPlus out, int
version) throws IOException
{
super.serialize(partialDeps, out, version);
- KeySerializers.ranges.serialize(partialDeps.covering, out,
version);
+ KeySerializers.participants.serialize(partialDeps.covering, out,
version);
}
@Override
public void serialize(Seekables<?, ?> superset, PartialDeps
partialDeps, DataOutputPlus out, int version) throws IOException
{
super.serialize(superset, partialDeps, out, version);
- KeySerializers.ranges.serialize(partialDeps.covering, out,
version);
+ KeySerializers.participants.serialize(partialDeps.covering, out,
version);
}
@Override
public long serializedSize(PartialDeps partialDeps, int version)
{
return super.serializedSize(partialDeps, version)
- + KeySerializers.ranges.serializedSize(partialDeps.covering,
version);
+ +
KeySerializers.participants.serializedSize(partialDeps.covering, version);
}
@Override
public long serializedSize(Seekables<?, ?> keys, PartialDeps
partialDeps, int version)
{
return super.serializedSize(keys, partialDeps, version)
- + KeySerializers.ranges.serializedSize(partialDeps.covering,
version);
+ +
KeySerializers.participants.serializedSize(partialDeps.covering, version);
}
};
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/InformHomeDurableSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/InformHomeDurableSerializers.java
index 50f53a04f6..c4f2155577 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/InformHomeDurableSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/InformHomeDurableSerializers.java
@@ -25,13 +25,9 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import static org.apache.cassandra.utils.CollectionSerializers.deserializeSet;
-import static
org.apache.cassandra.utils.CollectionSerializers.serializeCollection;
-import static
org.apache.cassandra.utils.CollectionSerializers.serializedCollectionSize;
-
public class InformHomeDurableSerializers
{
- public static final IVersionedSerializer<InformHomeDurable> request = new
IVersionedSerializer<InformHomeDurable>()
+ public static final IVersionedSerializer<InformHomeDurable> request = new
IVersionedSerializer<>()
{
@Override
public void serialize(InformHomeDurable inform, DataOutputPlus out,
int version) throws IOException
@@ -40,7 +36,6 @@ public class InformHomeDurableSerializers
KeySerializers.route.serialize(inform.route, out, version);
CommandSerializers.timestamp.serialize(inform.executeAt, out,
version);
CommandSerializers.durability.serialize(inform.durability, out,
version);
- serializeCollection(inform.persistedOn, out, version,
TopologySerializers.nodeId);
}
@Override
@@ -49,8 +44,7 @@ public class InformHomeDurableSerializers
return new
InformHomeDurable(CommandSerializers.txnId.deserialize(in, version),
KeySerializers.route.deserialize(in,
version),
CommandSerializers.timestamp.deserialize(in, version),
-
CommandSerializers.durability.deserialize(in, version),
- deserializeSet(in, version,
TopologySerializers.nodeId));
+
CommandSerializers.durability.deserialize(in, version));
}
@Override
@@ -59,8 +53,7 @@ public class InformHomeDurableSerializers
return CommandSerializers.txnId.serializedSize(inform.txnId,
version)
+ KeySerializers.route.serializedSize(inform.route, version)
+
CommandSerializers.timestamp.serializedSize(inform.executeAt, version)
- +
CommandSerializers.durability.serializedSize(inform.durability, version)
- + serializedCollectionSize(inform.persistedOn, version,
TopologySerializers.nodeId);
+ +
CommandSerializers.durability.serializedSize(inform.durability, version);
}
};
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
index 6d2e86e8bb..1e105d8192 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
@@ -91,38 +91,31 @@ public class KeySerializers
{
@Override PartialKeyRoute deserialize(DataInputPlus in, int version,
RoutingKey[] keys) throws IOException
{
- Ranges covering = ranges.deserialize(in, version);
RoutingKey homeKey = routingKey.deserialize(in, version);
- boolean isParticipatingHomeKey = (in.readByte() & 0x1) != 0;
- return PartialKeyRoute.SerializationSupport.create(covering,
homeKey, isParticipatingHomeKey, keys);
+ return PartialKeyRoute.SerializationSupport.create(homeKey, keys);
}
@Override
public void serialize(PartialKeyRoute route, DataOutputPlus out, int
version) throws IOException
{
super.serialize(route, out, version);
- ranges.serialize(route.covering, out, version);
routingKey.serialize(route.homeKey, out, version);
- out.writeByte(route.isParticipatingHomeKey ? 0x1 : 0);
}
@Override
public long serializedSize(PartialKeyRoute keys, int version)
{
return super.serializedSize(keys, version)
- + ranges.serializedSize(keys.covering, version)
- + routingKey.serializedSize(keys.homeKey, version)
- + 1;
+ + routingKey.serializedSize(keys.homeKey, version);
}
};
- public static final IVersionedSerializer<FullKeyRoute> fullKeyRoute = new
AbstractKeysSerializer<RoutingKey, FullKeyRoute>(routingKey, RoutingKey[]::new)
+ public static final IVersionedSerializer<FullKeyRoute> fullKeyRoute = new
AbstractKeysSerializer<>(routingKey, RoutingKey[]::new)
{
@Override FullKeyRoute deserialize(DataInputPlus in, int version,
RoutingKey[] keys) throws IOException
{
RoutingKey homeKey = routingKey.deserialize(in, version);
- boolean isParticipatingHomeKey = (in.readByte() & 0x1) != 0;
- return FullKeyRoute.SerializationSupport.create(homeKey,
isParticipatingHomeKey, keys);
+ return FullKeyRoute.SerializationSupport.create(homeKey, keys);
}
@Override
@@ -130,15 +123,13 @@ public class KeySerializers
{
super.serialize(route, out, version);
routingKey.serialize(route.homeKey, out, version);
- out.writeByte(route.isParticipatingHomeKey ? 0x1 : 0);
}
@Override
public long serializedSize(FullKeyRoute route, int version)
{
return super.serializedSize(route, version)
- + routingKey.serializedSize(route.homeKey, version)
- + 1;
+ + routingKey.serializedSize(route.homeKey, version);
}
};
@@ -146,28 +137,22 @@ public class KeySerializers
{
@Override PartialRangeRoute deserialize(DataInputPlus in, int version,
Range[] rs) throws IOException
{
- Ranges covering = ranges.deserialize(in, version);
RoutingKey homeKey = routingKey.deserialize(in, version);
- boolean isParticipatingHomeKey = (in.readByte() & 0x1) != 0;
- return PartialRangeRoute.SerializationSupport.create(covering,
homeKey, isParticipatingHomeKey, rs);
+ return PartialRangeRoute.SerializationSupport.create(homeKey, rs);
}
@Override
public void serialize(PartialRangeRoute route, DataOutputPlus out, int
version) throws IOException
{
super.serialize(route, out, version);
- ranges.serialize(route.covering, out, version);
routingKey.serialize(route.homeKey, out, version);
- out.writeByte(route.isParticipatingHomeKey ? 0x1 : 0);
}
@Override
public long serializedSize(PartialRangeRoute rs, int version)
{
return super.serializedSize(rs, version)
- + ranges.serializedSize(rs.covering, version)
- + routingKey.serializedSize(rs.homeKey, version)
- + 1;
+ + routingKey.serializedSize(rs.homeKey, version);
}
};
@@ -177,8 +162,7 @@ public class KeySerializers
@Override FullRangeRoute deserialize(DataInputPlus in, int version,
Range[] Ranges) throws IOException
{
RoutingKey homeKey = routingKey.deserialize(in, version);
- boolean isParticipatingHomeKey = (in.readByte() & 0x1) != 0;
- return FullRangeRoute.SerializationSupport.create(homeKey,
isParticipatingHomeKey, Ranges);
+ return FullRangeRoute.SerializationSupport.create(homeKey, Ranges);
}
@Override
@@ -186,15 +170,13 @@ public class KeySerializers
{
super.serialize(route, out, version);
routingKey.serialize(route.homeKey, out, version);
- out.writeByte(route.isParticipatingHomeKey ? 0x1 : 0);
}
@Override
public long serializedSize(FullRangeRoute ranges, int version)
{
return super.serializedSize(ranges, version)
- + routingKey.serializedSize(ranges.homeKey(), version)
- + 1;
+ + routingKey.serializedSize(ranges.homeKey(), version);
}
};
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
index 4693c03c5c..782ecbf5ed 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.service.accord.serializers;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.List;
import java.util.Set;
import accord.local.Node;
import accord.primitives.Range;
import accord.topology.Shard;
import accord.topology.Topology;
+import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -135,7 +135,7 @@ public class TopologySerializers
public Shard deserialize(DataInputPlus in, int version) throws
IOException
{
Range range = TokenRange.serializer.deserialize(in, version);
- List<Node.Id> nodes = CollectionSerializers.deserializeList(in,
version, nodeId);
+ SortedArrayList<Node.Id> nodes =
CollectionSerializers.deserializeSortedArrayList(in, version, nodeId,
Node.Id[]::new);
Set<Node.Id> fastPathElectorate =
CollectionSerializers.deserializeSet(in, version, nodeId);
Set<Node.Id> joining = CollectionSerializers.deserializeSet(in,
version, nodeId);
return new Shard(range, nodes, fastPathElectorate, joining);
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index 62dfe68e2c..694c3f225a 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -32,6 +32,7 @@ import accord.api.DataStore;
import accord.api.Read;
import accord.local.SafeCommandStore;
import accord.primitives.Keys;
+import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Seekable;
import accord.primitives.Timestamp;
@@ -158,14 +159,25 @@ public class TxnRead extends
AbstractKeySorted<TxnNamedRead> implements Read
@Override
public Read slice(Ranges ranges)
{
- Keys keys = itemKeys.slice(ranges);
+ return intersecting(itemKeys.slice(ranges));
+ }
+
+ @Override
+ public Read intersecting(Participants<?> participants)
+ {
+ return intersecting(itemKeys.intersecting(participants));
+ }
+
+ private Read intersecting(Keys select)
+ {
+ Keys keys = itemKeys.intersecting(select);
List<TxnNamedRead> reads = new ArrayList<>(keys.size());
for (TxnNamedRead read : items)
if (keys.contains(read.key()))
reads.add(read);
- return createTxnRead(reads, txnKeys.slice(ranges),
cassandraConsistencyLevel);
+ return createTxnRead(reads, txnKeys.intersecting(select),
cassandraConsistencyLevel);
}
@Override
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index 5a32065e97..563af81675 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -32,6 +32,7 @@ import accord.api.Data;
import accord.api.Key;
import accord.api.Update;
import accord.primitives.Keys;
+import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.RoutableKey;
import accord.primitives.Timestamp;
@@ -148,6 +149,14 @@ public class TxnUpdate extends AccordUpdate
return new TxnUpdate(keys, select(this.keys, keys, fragments),
condition, cassandraCommitCL);
}
+ @Override
+ public Update intersecting(Participants<?> participants)
+ {
+ Keys keys = this.keys.intersecting(participants);
+ // TODO: Slice the condition.
+ return new TxnUpdate(keys, select(this.keys, keys, fragments),
condition, cassandraCommitCL);
+ }
+
private static ByteBuffer[] select(Keys in, Keys out, ByteBuffer[] from)
{
ByteBuffer[] result = new ByteBuffer[out.size()];
diff --git
a/src/java/org/apache/cassandra/service/accord/txn/UnrecoverableRepairUpdate.java
b/src/java/org/apache/cassandra/service/accord/txn/UnrecoverableRepairUpdate.java
index 310a3e6a58..0efcbf1ffd 100644
---
a/src/java/org/apache/cassandra/service/accord/txn/UnrecoverableRepairUpdate.java
+++
b/src/java/org/apache/cassandra/service/accord/txn/UnrecoverableRepairUpdate.java
@@ -29,6 +29,7 @@ import accord.api.Data;
import accord.api.Update;
import accord.api.Write;
import accord.local.Node;
+import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
@@ -147,6 +148,12 @@ public class UnrecoverableRepairUpdate<E extends
Endpoints<E>, P extends Replica
return this;
}
+ @Override
+ public Update intersecting(Participants<?> participants)
+ {
+ return this;
+ }
+
@Override
public Update merge(Update other)
{
diff --git a/src/java/org/apache/cassandra/utils/CollectionSerializers.java
b/src/java/org/apache/cassandra/utils/CollectionSerializers.java
index 1fcb7cc2f3..0cf7fce0d9 100644
--- a/src/java/org/apache/cassandra/utils/CollectionSerializers.java
+++ b/src/java/org/apache/cassandra/utils/CollectionSerializers.java
@@ -31,6 +31,7 @@ import javax.annotation.Nonnull;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.IPartitionerDependentSerializer;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -102,6 +103,15 @@ public class CollectionSerializers
}
}
+ public static <V extends Comparable<? super V>> SortedArrayList<V>
deserializeSortedArrayList(DataInputPlus in, int version,
IVersionedSerializer<V> serializer, IntFunction<V[]> allocator) throws
IOException
+ {
+ int size = in.readUnsignedVInt32();
+ V[] array = allocator.apply(size);
+ for (int i = 0 ; i < array.length ; ++i)
+ array[i] = serializer.deserialize(in, version);
+ return new SortedArrayList<>(array);
+ }
+
public static <V> List<V> deserializeList(DataInputPlus in, int version,
IVersionedSerializer<V> serializer) throws IOException
{
return deserializeCollection(in, version, serializer, newArrayList());
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index 493283bdae..3e1f75f62a 100644
---
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import accord.primitives.Routable;
+import accord.local.CommandStores;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.distributed.shared.WithProperties;
import org.apache.cassandra.service.accord.*;
@@ -418,7 +419,9 @@ public class CompactionAccordIteratorsTest
Int2ObjectHashMap<RedundantBefore> redundantBefores = new
Int2ObjectHashMap<>();
if (redundantBefore != null)
redundantBefores.put(commandStore.id(), redundantBefore);
-
when(mockAccordService.getRedundantBeforesAndDurableBefore()).thenReturn(Pair.create(redundantBefores,
durableBefore));
+ Int2ObjectHashMap<CommandStores.RangesForEpoch> rangesForEpochs = new
Int2ObjectHashMap<>();
+ rangesForEpochs.put(commandStore.id(),
commandStore.unsafeRangesForEpoch());
+ when(mockAccordService.getCompactionInfo()).thenReturn(new
IAccordService.CompactionInfo(redundantBefores, rangesForEpochs,
durableBefore));
return mockAccordService;
}
@@ -463,7 +466,7 @@ public class CompactionAccordIteratorsTest
for (TxnId txnId : txnIds)
{
Txn txn = txnId.kind().isWrite() ? writeTxn : readTxn;
- PartialDeps partialDeps =
Deps.NONE.slice(AccordTestUtils.fullRange(txn));
+ PartialDeps partialDeps =
Deps.NONE.intersecting(AccordTestUtils.fullRange(txn));
PartialTxn partialTxn =
txn.slice(commandStore.unsafeRangesForEpoch().currentRanges(), true);
PartialRoute<?> partialRoute =
route.slice(commandStore.unsafeRangesForEpoch().currentRanges());
getUninterruptibly(commandStore.execute(contextFor(txnId,
txn.keys(), COMMANDS), safe -> {
@@ -559,7 +562,7 @@ public class CompactionAccordIteratorsTest
scanners.add(random.nextInt(scanners.size()), new
Scanner(cfs.metadata(),
outputPartitions.stream().map(Partition::unfilteredIterator).collect(Collectors.toList())));
} while (!scanners.isEmpty());
- verify(mockAccordService, times(singleCompaction || numScanners == 1 ?
1 : numScanners - 1)).getRedundantBeforesAndDurableBefore();
+ verify(mockAccordService, times(singleCompaction || numScanners == 1 ?
1 : numScanners - 1)).getCompactionInfo();
return result;
}
}
diff --git
a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java
b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java
index c4085c6083..59a7cf437d 100644
--- a/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/AccordIndexStressTest.java
@@ -464,7 +464,7 @@ public class AccordIndexStressTest extends CQLTester
var token = new
Murmur3Partitioner.LongToken(rs.nextInt(minToken, maxToken));
keys.add(new TokenKey(table, token));
}
- return new FullKeyRoute(keys.first(), true,
keys.toArray(RoutingKey[]::new));
+ return new FullKeyRoute(keys.first(),
keys.toArray(RoutingKey[]::new));
}
case Range:
{
diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
index 5200316a2f..17de14e415 100644
--- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
@@ -259,7 +259,7 @@ public class RouteIndexTest extends CQLTester.InMemory
var token = new LongToken(state.tokenGen.nextInt(rs));
keys.add(new TokenKey(table, token));
}
- return new FullKeyRoute(keys.first(), true,
keys.toArray(RoutingKey[]::new));
+ return new FullKeyRoute(keys.first(),
keys.toArray(RoutingKey[]::new));
}
case Range:
{
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index bcc1f8bf11..dde88ca468 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -41,6 +41,7 @@ import accord.primitives.Ballot;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.Range;
+import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.Route;
import accord.primitives.RoutingKeys;
@@ -113,7 +114,7 @@ public class AccordCommandStoreTest
TxnId txnId = txnId(1, clock.incrementAndGet(), 1, Txn.Kind.Write,
Routable.Domain.Range);
PartialDeps dependencies;
- try (PartialDeps.Builder builder =
PartialDeps.builder(depTxn.covering()))
+ try (PartialDeps.Builder builder =
PartialDeps.builder(Ranges.of(range)))
{
builder.add(range, oldTxnId1);
builder.add(range, oldTxnId2);
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index a9bdd7aa52..70a66f2be4 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -101,7 +101,7 @@ public class AccordCommandTest
RoutingKey homeKey = key.toUnseekable();
FullRoute<?> fullRoute = txn.keys().toRoute(homeKey);
PartialRoute<?> route = fullRoute.slice(fullRange(txn));
- PartialTxn partialTxn = txn.slice(route.covering(), true);
+ PartialTxn partialTxn = txn.intersecting(route, true);
PreAccept preAccept = PreAccept.SerializerSupport.create(txnId, route,
1, 1, false, 1, partialTxn, fullRoute);
// Check preaccept
@@ -136,7 +136,7 @@ public class AccordCommandTest
TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1);
PartialDeps deps;
- try (PartialDeps.Builder builder =
PartialDeps.builder(route.covering()))
+ try (PartialDeps.Builder builder = PartialDeps.builder(route))
{
builder.add(key, txnId2);
deps = builder.build();
@@ -193,7 +193,7 @@ public class AccordCommandTest
RoutingKey homeKey = key.toUnseekable();
FullRoute<?> fullRoute = txn.keys().toRoute(homeKey);
PartialRoute<?> route = fullRoute.slice(fullRange(txn));
- PartialTxn partialTxn = txn.slice(route.covering(), true);
+ PartialTxn partialTxn = txn.intersecting(route, true);
PreAccept preAccept1 = PreAccept.SerializerSupport.create(txnId1,
route, 1, 1, false, 1, partialTxn, fullRoute);
getUninterruptibly(commandStore.execute(preAccept1, safeStore -> {
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
index 2f689187ac..94e181a853 100644
---
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.junit.Assert;
@@ -38,6 +37,7 @@ import accord.local.Node;
import accord.local.Node.Id;
import accord.topology.Shard;
import accord.topology.Topology;
+import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.concurrent.ScheduledExecutors;
@@ -71,7 +71,7 @@ public class AccordConfigurationServiceTest
private static final Id ID1 = new Id(1);
private static final Id ID2 = new Id(2);
private static final Id ID3 = new Id(3);
- private static final List<Id> ID_LIST = ImmutableList.of(ID1, ID2, ID3);
+ private static final SortedArrayList<Id> ID_LIST = new
SortedArrayList<>(new Id[] { ID1, ID2, ID3 });
private static final Set<Id> ID_SET = ImmutableSet.copyOf(ID_LIST);
private static final TableId TBL1 = TableId.fromUUID(new UUID(0, 1));
private static final TableId TBL2 = TableId.fromUUID(new UUID(0, 2));
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java
index d6d2dab96b..717cda1de3 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java
@@ -113,7 +113,7 @@ public class AccordKeyspaceTest extends CQLTester.InMemory
CommonAttributes.Mutable common = new CommonAttributes.Mutable(id);
common.partialTxn(partialTxn);
common.route(route);
- common.partialDeps(deps.slice(scope));
+ common.partialDeps(deps.intersecting(scope));
common.durability(Status.Durability.NotDurable);
Command.WaitingOn waitingOn = null;
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
index 47ba369a09..1ece95f164 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
@@ -88,8 +88,9 @@ public class AccordMessageSinkTest
long epoch = 42;
Txn txn = Utils.readTxn(Keys.of(IntKey.key(42)));
TxnId id = nextTxnId(epoch, txn);
- PartialTxn partialTxn = txn.slice(Ranges.of(IntKey.range(40, 50)),
true);
- Request request = new AbstractFetchCoordinator.FetchRequest(epoch, id,
partialTxn.covering(), PartialDeps.NONE, partialTxn);
+ Ranges ranges = Ranges.of(IntKey.range(40, 50));
+ PartialTxn partialTxn = txn.slice(ranges, true);
+ Request request = new AbstractFetchCoordinator.FetchRequest(epoch, id,
ranges, PartialDeps.NONE, partialTxn);
checkRequestReplies(request,
new AbstractFetchCoordinator.FetchResponse(null,
null, id),
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index 0f28c6c7c2..ffd0d31186 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -32,8 +32,9 @@ import java.util.stream.IntStream;
import javax.annotation.Nullable;
-import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
+import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.File;
@@ -157,7 +158,7 @@ public class AccordTestUtils
{
Seekable key = txn.keys().get(0);
RoutingKey routingKey = key.asKey().toUnseekable();
- return new FullKeyRoute(routingKey, true, new RoutingKey[]{
routingKey });
+ return new FullKeyRoute(routingKey, new RoutingKey[]{ routingKey
});
}
}
@@ -347,7 +348,7 @@ public class AccordTestUtils
{
Txn txn = createTxn(key, key);
Ranges ranges = fullRange(txn);
- return new PartialTxn.InMemory(ranges, txn.kind(), txn.keys(),
txn.read(), txn.query(), txn.update());
+ return new PartialTxn.InMemory(txn.kind(), txn.keys(), txn.read(),
txn.query(), txn.update());
}
private static class SingleEpochRanges extends
CommandStore.EpochUpdateHolder
@@ -370,7 +371,7 @@ public class AccordTestUtils
TableMetadata metadata = Schema.instance.getTableMetadata(keyspace,
table);
TokenRange range = TokenRange.fullRange(metadata.id);
Node.Id node = new Id(1);
- Topology topology = new Topology(1, new Shard(range,
Lists.newArrayList(node), Sets.newHashSet(node), Collections.emptySet()));
+ Topology topology = new Topology(1, new Shard(range, new
SortedArrayList<>(new Id[] { node }), Sets.newHashSet(node),
Collections.emptySet()));
NodeTimeService time = new NodeTimeService()
{
@Override public Id id() { return node;}
@@ -437,7 +438,7 @@ public class AccordTestUtils
TableMetadata metadata = Schema.instance.getTableMetadata(keyspace,
table);
TokenRange range = TokenRange.fullRange(metadata.id);
Node.Id node = new Id(1);
- Topology topology = new Topology(1, new Shard(range,
Lists.newArrayList(node), Sets.newHashSet(node), Collections.emptySet()));
+ Topology topology = new Topology(1, new Shard(range, new
SortedArrayList<>(new Id[] { node }), Sets.newHashSet(node),
Collections.emptySet()));
AccordCommandStore store = createAccordCommandStore(node, now,
topology, loadExecutor, saveExecutor);
store.execute(PreLoadContext.empty(), safeStore ->
((AccordCommandStore)safeStore.commandStore()).setCapacity(1 << 20));
return store;
@@ -480,9 +481,9 @@ public class AccordTestUtils
return new Node.Id(id);
}
- public static List<Node.Id> idList(int... ids)
+ public static SortedArrayList<Id> idList(int... ids)
{
- return
Arrays.stream(ids).mapToObj(AccordTestUtils::id).collect(Collectors.toList());
+ return new
SortedArrayList<>(Arrays.stream(ids).mapToObj(AccordTestUtils::id).toArray(Id[]::new));
}
public static Set<Id> idSet(int... ids)
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java
b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java
index c3c8e6b588..e4a2a4791c 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyUtils.java
@@ -24,10 +24,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import accord.local.Node;
+import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
@@ -53,7 +53,7 @@ public class AccordTopologyUtils
public static final Node.Id ID1 = new Node.Id(1);
public static final Node.Id ID2 = new Node.Id(2);
public static final Node.Id ID3 = new Node.Id(3);
- public static final List<Node.Id> NODE_LIST = ImmutableList.of(ID1, ID2,
ID3);
+ public static final SortedArrayList<Node.Id> NODE_LIST = new
SortedArrayList<>(new Node.Id[] { ID1, ID2, ID3 });
public static final Set<Node.Id> NODE_SET = ImmutableSet.copyOf(NODE_LIST);
private static final IPartitioner partitioner =
Murmur3Partitioner.instance;
diff --git
a/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java
b/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java
index 65650274fa..cda86eff8b 100644
---
a/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/fastpath/ParameterizedFastPathStrategyTest.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.service.accord.fastpath;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
@@ -27,6 +26,7 @@ import org.junit.Assert;
import org.junit.Test;
import accord.local.Node;
+import accord.utils.SortedArrays.SortedArrayList;
import org.apache.cassandra.exceptions.ConfigurationException;
import
org.apache.cassandra.service.accord.fastpath.ParameterizedFastPathStrategy.WeightedDc;
@@ -39,7 +39,7 @@ import static org.junit.Assert.assertEquals;
public class ParameterizedFastPathStrategyTest
{
- private static final List<Node.Id> NODES = idList(1, 2, 3, 4, 5, 6);
+ private static final SortedArrayList<Node.Id> NODES = idList(1, 2, 3, 4,
5, 6);
private static final Map<Node.Id, String> DCS_2;
private static final Map<Node.Id, String> DCS_3;
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java
index d5da34c8fb..d966757e95 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/CheckStatusSerializersTest.java
@@ -93,7 +93,7 @@ public class CheckStatusSerializersTest
List<AccordRoutingKey> forOrdering =
Gens.lists(keyGen).unique().ofSizeBetween(1, 10).next(rs);
forOrdering.sort(Comparator.naturalOrder());
// TODO (coverage): don't hard code keys type
- keysOrRanges = new FullKeyRoute(homeKey,
forOrdering.contains(homeKey), forOrdering.toArray(RoutingKey[]::new));
+ keysOrRanges = new FullKeyRoute(homeKey,
forOrdering.toArray(RoutingKey[]::new));
break;
case Range:
keysOrRanges =
AccordGenerators.ranges(Murmur3Partitioner.instance).next(rs);
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 33244f045b..81462c0e65 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -183,7 +183,7 @@ public class CommandsForKeySerializerTest
else return
Command.SerializerSupport.truncatedApply(attributes(), saveStatus, executeAt,
new Writes(txnId, executeAt, txn.keys(), new TxnWrite(Collections.emptyList(),
true)), new TxnData());
case Erased:
- case ErasedOrInvalidated:
+ case ErasedOrInvalidOrVestigial:
case Invalidated:
return Command.SerializerSupport.invalidated(txnId,
Listeners.Immutable.EMPTY);
}
diff --git a/test/unit/org/apache/cassandra/utils/RangeTreeTest.java
b/test/unit/org/apache/cassandra/utils/RangeTreeTest.java
index 6ef5325d1d..0f0208102c 100644
--- a/test/unit/org/apache/cassandra/utils/RangeTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/RangeTreeTest.java
@@ -544,7 +544,7 @@ public class RangeTreeTest
{
List<Map.Entry<Range, Integer>> matches = new ArrayList<>();
// find ranges, then add the values
- list.forEach(range, (a, b, c, d, idx) -> {
+ list.forEachRange(range, (a, b, c, d, idx) -> {
Range match = ranges[idx];
map.get(match).forEachInt(v -> matches.add(Map.entry(match,
v)));
}, (a, b, c, d, start, end) -> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]