This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 4827516764 CEP-15: (C*) Add notion of CommandsForRanges and make this durable in C*
4827516764 is described below
commit 4827516764152441677032e6ef0b025b7912e111
Author: David Capwell <[email protected]>
AuthorDate: Thu May 25 13:20:39 2023 -0700
CEP-15: (C*) Add notion of CommandsForRanges and make this durable in C*
patch by David Capwell; reviewed by Ariel Weisberg for CASSANDRA-18519
---
modules/accord | 2 +-
src/java/org/apache/cassandra/dht/Token.java | 11 +
.../service/accord/AccordCommandStore.java | 134 +++++-
.../service/accord/AccordCommandStores.java | 22 +
.../service/accord/AccordConfigurationService.java | 13 +-
...AccordVerbHandler.java => AccordDataStore.java} | 35 +-
.../cassandra/service/accord/AccordKeyspace.java | 337 +++++++++++++-
.../service/accord/AccordObjectSizes.java | 3 +
.../service/accord/AccordSafeCommandStore.java | 152 ++++--
.../cassandra/service/accord/AccordService.java | 8 +-
.../cassandra/service/accord/AccordStateCache.java | 8 +
.../service/accord/AccordVerbHandler.java | 15 +-
.../service/accord/CommandsForRanges.java | 508 +++++++++++++++++++++
.../service/accord/api/AccordRoutingKey.java | 17 +-
.../service/accord/async/AsyncLoader.java | 113 ++++-
.../service/accord/async/AsyncOperation.java | 20 +-
.../serializers/CommandsForKeySerializer.java | 2 +-
.../accord/serializers/ListenerSerializers.java | 35 +-
.../org/apache/cassandra/utils/IntervalTree.java | 89 +++-
.../distributed/impl/AbstractCluster.java | 7 +-
.../distributed/test/accord/AccordCQLTest.java | 18 +-
.../distributed/test/accord/AccordTestBase.java | 39 +-
.../distributed/test/accord/NewSchemaTest.java | 85 ++++
.../service/accord/async/AsyncLoaderTest.java | 12 +-
.../serializers/CommandsForKeySerializerTest.java | 4 +-
25 files changed, 1542 insertions(+), 147 deletions(-)
diff --git a/modules/accord b/modules/accord
index b99c4671fa..3d0ff07cd5 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit b99c4671fa0b22bed7f5a37fc5acaa2d2579e5b2
+Subproject commit 3d0ff07cd5c7db43390b85afa593e6f76471d886
diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java
index bd327f96b9..00bbe66406 100644
--- a/src/java/org/apache/cassandra/dht/Token.java
+++ b/src/java/org/apache/cassandra/dht/Token.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import org.apache.cassandra.utils.bytecomparable.ByteSource;
+import org.apache.cassandra.utils.bytecomparable.ByteSourceInverse;
public abstract class Token implements RingPosition<Token>, Serializable
{
@@ -41,6 +42,16 @@ public abstract class Token implements RingPosition<Token>, Serializable
public abstract ByteBuffer toByteArray(Token token);
public abstract Token fromByteArray(ByteBuffer bytes);
+    public byte[] toOrderedByteArray(Token token, ByteComparable.Version version)
+    {
+        return ByteSourceInverse.readBytes(asComparableBytes(token, version));
+    }
+
+    public Token fromOrderedByteArray(byte[] bytes, ByteComparable.Version version)
+    {
+        return fromComparableBytes(ByteSource.peekable(ByteSource.fixedLength(bytes)), version);
+    }
+
/**
* Produce a byte-comparable representation of the token.
* See {@link Token#asComparableBytes}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index b3db3f3822..d2f627fb9b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -18,19 +18,29 @@
package org.apache.cassandra.service.accord;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import accord.api.Agent;
import accord.api.DataStore;
+import accord.api.Key;
import accord.api.ProgressLog;
+import accord.impl.CommandTimeseriesHolder;
import accord.impl.CommandsForKey;
import accord.local.Command;
import accord.local.CommandStore;
@@ -39,20 +49,35 @@ import accord.local.CommandStores.RangesForEpochHolder;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
-import accord.primitives.Deps;
+import accord.local.SaveStatus;
+import accord.primitives.AbstractKeys;
+import accord.primitives.AbstractRanges;
+import accord.primitives.PartialDeps;
+import accord.primitives.PartialTxn;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
import accord.primitives.RoutableKey;
+import accord.primitives.Routables;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
+import accord.utils.async.Observable;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.service.accord.async.AsyncOperation;
import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
public class AccordCommandStore extends CommandStore
{
+    private static final Logger logger = LoggerFactory.getLogger(AccordCommandStore.class);
+
private static long getThreadId(ExecutorService executor)
{
try
@@ -78,6 +103,7 @@ public class AccordCommandStore extends CommandStore
private AsyncOperation<?> currentOperation = null;
private AccordSafeCommandStore current = null;
private long lastSystemTimestampMicros = Long.MIN_VALUE;
+ private CommandsForRanges commandsForRanges = new CommandsForRanges();
public AccordCommandStore(int id,
NodeTimeService time,
@@ -94,17 +120,67 @@ public class AccordCommandStore extends CommandStore
        this.commandCache = stateCache.instance(TxnId.class, accord.local.Command.class, AccordSafeCommand::new, AccordObjectSizes::command);
        this.commandsForKeyCache = stateCache.instance(RoutableKey.class, CommandsForKey.class, AccordSafeCommandsForKey::new, AccordObjectSizes::commandsForKey);
executor.execute(() -> CommandStore.register(this));
+ executor.execute(this::loadRangesToCommands);
}
- @Override
- public boolean inStore()
+ private void loadRangesToCommands()
{
- return Thread.currentThread().getId() == threadId;
+ AsyncPromise<CommandsForRanges> future = new AsyncPromise<>();
+        AccordKeyspace.findAllCommandsByDomain(id, Routable.Domain.Range, ImmutableSet.of("txn_id", "status", "txn", "execute_at", "dependencies"), new Observable<UntypedResultSet.Row>()
+        {
+            private CommandsForRanges.Builder builder = new CommandsForRanges.Builder();
+ @Override
+ public void onNext(UntypedResultSet.Row row) throws Exception
+ {
+ TxnId txnId = AccordKeyspace.deserializeTxnId(row);
+ SaveStatus status = AccordKeyspace.deserializeStatus(row);
+ Timestamp executeAt = AccordKeyspace.deserializeExecuteAt(row);
+
+ PartialTxn txn = AccordKeyspace.deserializeTxn(row);
+ Seekables<?, ?> keys = txn.keys();
+ if (keys.domain() != Routable.Domain.Range)
+                    throw new AssertionError(String.format("Txn keys are not a range: %s", txn));
+ Ranges ranges = (Ranges) keys;
+
+ PartialDeps deps = AccordKeyspace.deserializeDependencies(row);
+                List<TxnId> dependsOn = deps == null ? Collections.emptyList() : deps.txnIds();
+ builder.put(txnId, ranges, status, executeAt, dependsOn);
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ builder = null;
+ future.tryFailure(t);
+ }
+
+ @Override
+ public void onCompleted()
+ {
+ CommandsForRanges result = this.builder.build();
+ builder = null;
+ future.trySuccess(result);
+ }
+ });
+ try
+ {
+ commandsForRanges = future.get();
+ logger.debug("Loaded {} intervals", commandsForRanges.size());
+ }
+ catch (InterruptedException e)
+ {
+ throw new UncheckedInterruptedException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e.getCause());
+ }
}
@Override
- protected void registerHistoricalTransactions(Deps deps)
+ public boolean inStore()
{
+ return Thread.currentThread().getId() == threadId;
}
public void setCacheSize(long bytes)
@@ -234,7 +310,7 @@ public class AccordCommandStore extends CommandStore
public AccordSafeCommandStore beginOperation(PreLoadContext preLoadContext,
                                                 Map<TxnId, AccordSafeCommand> commands,
-                                                 Map<RoutableKey, AccordSafeCommandsForKey> commandsForKeys)
+                                                 NavigableMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys)
{
Invariants.checkState(current == null);
commands.values().forEach(AccordSafeState::preExecute);
@@ -252,6 +328,52 @@ public class AccordCommandStore extends CommandStore
current = null;
}
+    <O> O mapReduceForRange(Routables<?, ?> keysOrRanges, Ranges slice, BiFunction<CommandTimeseriesHolder, O, O> map, O accumulate, O terminalValue)
+ {
+ keysOrRanges = keysOrRanges.slice(slice, Routables.Slice.Minimal);
+ switch (keysOrRanges.domain())
+ {
+ case Key:
+ {
+                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
+                for (CommandTimeseriesHolder summary : commandsForRanges.search(keys))
+ {
+ accumulate = map.apply(summary, accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ }
+ break;
+ case Range:
+ {
+ AbstractRanges<?> ranges = (AbstractRanges<?>) keysOrRanges;
+ for (Range range : ranges)
+ {
+                    CommandTimeseriesHolder summary = commandsForRanges.search(range);
+ if (summary == null)
+ continue;
+ accumulate = map.apply(summary, accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ }
+ break;
+ default:
+                throw new AssertionError("Unknown domain: " + keysOrRanges.domain());
+ }
+ return accumulate;
+ }
+
+ CommandsForRanges commandsForRanges()
+ {
+ return commandsForRanges;
+ }
+
+ CommandsForRanges.Updater updateRanges()
+ {
+ return commandsForRanges.update();
+ }
+
public void abortCurrentOperation()
{
current = null;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 3bad135715..38040b466b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -29,12 +29,14 @@ import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
import accord.local.ShardDistributor;
+import accord.primitives.Range;
import accord.primitives.Routables;
import accord.topology.Topology;
import accord.utils.MapReduceConsume;
import accord.utils.RandomSource;
import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.journal.AsyncWriteCallback;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
public class AccordCommandStores extends CommandStores
{
@@ -96,6 +98,26 @@ public class AccordCommandStores extends CommandStores
});
}
+ @Override
+    protected boolean shouldBootstrap(Node node, Topology previous, Topology updated, Range range)
+    {
+        if (!super.shouldBootstrap(node, previous, updated, range))
+            return false;
+        // we see new ranges when a new keyspace is added, so avoid bootstrap in these cases
+        return contains(previous, ((AccordRoutingKey) range.start()).keyspace());
+ }
+
+ private static boolean contains(Topology previous, String searchKeyspace)
+ {
+ for (Range range : previous.ranges())
+ {
+ String keyspace = ((AccordRoutingKey) range.start()).keyspace();
+ if (keyspace.equals(searchKeyspace))
+ return true;
+ }
+ return false;
+ }
+
private long cacheSize;
synchronized void setCacheSize(long bytes)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index e8f1033664..3ab64c9bd0 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -20,8 +20,9 @@ package org.apache.cassandra.service.accord;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
-import com.google.common.base.Preconditions;
+import javax.annotation.concurrent.GuardedBy;
import accord.api.ConfigurationService;
import accord.local.Node;
@@ -33,7 +34,8 @@ import accord.topology.Topology;
public class AccordConfigurationService implements ConfigurationService
{
private final Node.Id localId;
- private final List<Listener> listeners = new ArrayList<>();
+ private final List<Listener> listeners = new CopyOnWriteArrayList<>();
+ @GuardedBy("this")
private final List<Topology> epochs = new ArrayList<>();
public AccordConfigurationService(Node.Id localId)
@@ -43,7 +45,7 @@ public class AccordConfigurationService implements ConfigurationService
}
@Override
- public synchronized void registerListener(Listener listener)
+ public void registerListener(Listener listener)
{
listeners.add(listener);
}
@@ -55,7 +57,7 @@ public class AccordConfigurationService implements ConfigurationService
}
@Override
- public Topology getTopologyForEpoch(long epoch)
+ public synchronized Topology getTopologyForEpoch(long epoch)
{
return epochs.get((int) epoch);
}
@@ -64,7 +66,8 @@ public class AccordConfigurationService implements ConfigurationService
public synchronized void fetchTopologyForEpoch(long epoch)
{
Topology current = currentTopology();
-        Preconditions.checkArgument(epoch > current.epoch(), "Requested to fetch epoch %d which is <= %d (current epoch)", epoch, current.epoch());
+ if (epoch < current.epoch())
+ return;
while (current.epoch() < epoch)
{
current = AccordTopologyUtils.createTopology(epochs.size());
diff --git a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java
similarity index 53%
copy from src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
copy to src/java/org/apache/cassandra/service/accord/AccordDataStore.java
index 216d64df42..b1f191a396 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordDataStore.java
@@ -18,31 +18,30 @@
package org.apache.cassandra.service.accord;
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import accord.api.DataStore;
import accord.local.Node;
-import accord.messages.Request;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
+import accord.local.SafeCommandStore;
+import accord.primitives.Ranges;
+import accord.primitives.SyncPoint;
+import accord.primitives.Timestamp;
+import accord.utils.async.AsyncResults;
-public class AccordVerbHandler<T extends Request> implements IVerbHandler<T>
+public enum AccordDataStore implements DataStore
{
-    private static final Logger logger = LoggerFactory.getLogger(AccordVerbHandler.class);
-
- private final Node node;
+ INSTANCE;
- public AccordVerbHandler(Node node)
+ @Override
+    public FetchResult fetch(Node node, SafeCommandStore safeStore, Ranges ranges, SyncPoint syncPoint, FetchRanges callback)
{
- this.node = node;
+ //TODO (implement): do real work
+ callback.starting(ranges).started(Timestamp.NONE);
+ callback.fetched(ranges);
+ return new ImmediateFetchFuture(ranges);
}
- @Override
- public void doVerb(Message<T> message) throws IOException
+    private static class ImmediateFetchFuture extends AsyncResults.SettableResult<Ranges> implements FetchResult
{
- logger.debug("Receiving {} from {}", message.payload, message.from());
-        message.payload.process(node, EndpointMapping.getId(message.from()), message);
+ ImmediateFetchFuture(Ranges ranges) { setSuccess(ranges); }
+ @Override public void abort(Ranges abort) { }
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index cdf257f61f..7cab857ff6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumMap;
@@ -29,10 +30,12 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.ImmutableSortedSet;
import com.google.common.collect.Lists;
@@ -42,7 +45,7 @@ import org.slf4j.LoggerFactory;
import accord.api.Result;
import accord.impl.CommandsForKey;
-import accord.impl.CommandsForKey.CommandTimeseries;
+import accord.impl.CommandTimeseries;
import accord.local.Command;
import accord.local.CommandStore;
import accord.local.CommonAttributes;
@@ -53,11 +56,16 @@ import accord.local.Status;
import accord.primitives.Ballot;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
+import accord.primitives.Routable;
import accord.primitives.Route;
import accord.primitives.Timestamp;
+import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Writes;
import accord.utils.Invariants;
+import accord.utils.async.Observable;
+import org.apache.cassandra.concurrent.DebuggableTask;
+import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
@@ -76,11 +84,14 @@ import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.marshal.ByteArrayAccessor;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.BTreeRow;
@@ -89,6 +100,10 @@ import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.transform.FilteredPartitions;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.LocalVersionedSerializer;
import org.apache.cassandra.io.util.DataInputBuffer;
@@ -96,6 +111,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.Tables;
@@ -112,6 +128,7 @@ import org.apache.cassandra.service.accord.serializers.KeySerializers;
import org.apache.cassandra.service.accord.serializers.ListenerSerializers;
import org.apache.cassandra.service.accord.txn.TxnData;
import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
import static java.lang.String.format;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
@@ -128,9 +145,10 @@ public class AccordKeyspace
public static final String COMMANDS = "commands";
public static final String COMMANDS_FOR_KEY = "commands_for_key";
- private static final String TIMESTAMP_TUPLE = "tuple<bigint, bigint, int>";
    private static final TupleType TIMESTAMP_TYPE = new TupleType(Lists.newArrayList(LongType.instance, LongType.instance, Int32Type.instance));
-    private static final String KEY_TUPLE = "tuple<uuid, blob>";
+    private static final String TIMESTAMP_TUPLE = TIMESTAMP_TYPE.asCQL3Type().toString();
+    private static final TupleType KEY_TYPE = new TupleType(Arrays.asList(UUIDType.instance, BytesType.instance));
+    private static final String KEY_TUPLE = KEY_TYPE.asCQL3Type().toString();
    private static final ClusteringIndexFilter FULL_PARTITION = new ClusteringIndexSliceFilter(Slices.ALL, false);
@@ -156,12 +174,36 @@ public class AccordKeyspace
}
}
+ private enum TokenType
+ {
+ Murmur3((byte) 1),
+ ByteOrdered((byte) 2),
+ ;
+
+ private final byte value;
+
+ TokenType(byte b)
+ {
+ this.value = b;
+ }
+
+ static TokenType valueOf(Token token)
+ {
+ if (token instanceof Murmur3Partitioner.LongToken)
+ return Murmur3;
+ if (token instanceof ByteOrderedPartitioner.BytesToken)
+ return ByteOrdered;
+            throw new IllegalArgumentException("Unexpected token type: " + token.getClass());
+ }
+ }
+
    // TODO: store timestamps as blobs (confirm there are no negative numbers, or offset)
private static final TableMetadata Commands =
parse(COMMANDS,
"accord commands",
"CREATE TABLE %s ("
+ "store_id int,"
+            + "domain int," // this is stored as part of txn_id, and is currently used for cheaper scans of the table
+ format("txn_id %s,", TIMESTAMP_TUPLE)
+ "status int,"
+ "home_key blob,"
@@ -178,8 +220,10 @@ public class AccordKeyspace
+ format("waiting_on_commit set<%s>,", TIMESTAMP_TUPLE)
+ format("waiting_on_apply map<%s, blob>,", TIMESTAMP_TUPLE)
+ "listeners set<blob>, "
- + "PRIMARY KEY((store_id, txn_id))"
- + ')');
+ + "PRIMARY KEY((store_id, domain, txn_id))"
+ + ')')
+              .partitioner(new LocalPartitioner(CompositeType.getInstance(Int32Type.instance, Int32Type.instance, TIMESTAMP_TYPE)))
+ .build();
// TODO: naming is not very clearly distinct from the base serializers
private static class CommandsSerializers
@@ -231,6 +275,7 @@ public class AccordKeyspace
"accord commands per key",
"CREATE TABLE %s ("
+ "store_id int, "
+            + "key_token blob, " // can't use "token" as this is a restricted word in CQL
+ format("key %s, ", KEY_TUPLE)
+ format("max_timestamp %s static, ", TIMESTAMP_TUPLE)
+ format("last_executed_timestamp %s static, ", TIMESTAMP_TUPLE)
@@ -240,8 +285,10 @@ public class AccordKeyspace
+ "series int, "
+ format("timestamp %s, ", TIMESTAMP_TUPLE)
+ "data blob, "
- + "PRIMARY KEY((store_id, key), series, timestamp)"
- + ')');
+ + "PRIMARY KEY((store_id, key_token, key), series, timestamp)"
+ + ')')
+              .partitioner(new LocalPartitioner(CompositeType.getInstance(Int32Type.instance, BytesType.instance, KEY_TYPE)))
+ .build();
private static class CommandsForKeyColumns
{
@@ -292,13 +339,12 @@ public class AccordKeyspace
}
}
-    private static TableMetadata parse(String name, String description, String cql)
+    private static TableMetadata.Builder parse(String name, String description, String cql)
{
        return CreateTableStatement.parse(format(cql, name), ACCORD_KEYSPACE_NAME)
                                   .id(TableId.forSystemTable(ACCORD_KEYSPACE_NAME, name))
                                   .comment(description)
-                                   .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(90))
-                                   .build();
+                                   .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(90));
}
public static KeyspaceMetadata metadata()
@@ -338,7 +384,7 @@ public class AccordKeyspace
    private static <T> T deserializeOrNull(ByteBuffer bytes, LocalVersionedSerializer<T> serializer) throws IOException
    {
-        return bytes != null && ! ByteBufferAccessor.instance.isEmpty(bytes) ? deserialize(bytes, serializer) : null;
+        return bytes != null && !ByteBufferAccessor.instance.isEmpty(bytes) ? deserialize(bytes, serializer) : null;
}
    private static ImmutableSortedMap<Timestamp, TxnId> deserializeWaitingOnApply(Map<ByteBuffer, ByteBuffer> serialized)
@@ -524,6 +570,7 @@ public class AccordKeyspace
}
        ByteBuffer key = CommandsColumns.keyComparator.make(commandStore.id(),
+                                                            command.txnId().domain().ordinal(),
                                                             serializeTimestamp(command.txnId())).serializeAsPartitionKey();
Row row = builder.build();
if (row.isEmpty())
@@ -537,6 +584,20 @@ public class AccordKeyspace
}
}
+ public static ByteBuffer serializeToken(Token token)
+ {
+ return serializeToken(token, ByteBufferAccessor.instance);
+ }
+
+ private static <V> V serializeToken(Token token, ValueAccessor<V> accessor)
+ {
+ TokenType type = TokenType.valueOf(token);
+        byte[] ordered = token.getPartitioner().getTokenFactory().toOrderedByteArray(token, ByteComparable.Version.OSS42);
+        V value = accessor.allocate(ordered.length + 1);
+        accessor.putByte(value, 0, type.value);
+        ByteArrayAccessor.instance.copyTo(ordered, 0, value, accessor, 1, ordered.length);
+ return value;
+ }
private static ByteBuffer serializeKey(PartitionKey key)
{
@@ -595,13 +656,218 @@ public class AccordKeyspace
{
String cql = "SELECT * FROM %s.%s " +
"WHERE store_id = ? " +
+ "AND domain = ? " +
"AND txn_id=(?, ?, ?)";
        return executeInternal(String.format(cql, ACCORD_KEYSPACE_NAME, COMMANDS),
commandStore.id(),
+ txnId.domain().ordinal(),
txnId.msb, txnId.lsb, txnId.node.id);
}
+    public static void findAllCommandsByDomain(int commandStore, Routable.Domain domain, Set<String> columns, Observable<UntypedResultSet.Row> callback)
+    {
+        WalkCommandsForDomain work = new WalkCommandsForDomain(commandStore, domain, columns, Stage.READ.executor(), callback);
+ work.schedule();
+ }
+
+ private static abstract class TableWalk implements Runnable, DebuggableTask
+ {
+ private final long creationTimeNanos = Clock.Global.nanoTime();
+ private final Executor executor;
+ private final Observable<UntypedResultSet.Row> callback;
+ private long startTimeNanos = -1;
+ private int numQueries = 0;
+ private UntypedResultSet.Row lastSeen = null;
+
+        private TableWalk(Executor executor, Observable<UntypedResultSet.Row> callback)
+        {
+            this.executor = executor;
+            this.callback = callback;
+        }
+
+        protected abstract UntypedResultSet query(UntypedResultSet.Row lastSeen);
+
+ public final void schedule()
+ {
+ executor.execute(this);
+ }
+
+ @Override
+ public final void run()
+ {
+ try
+ {
+ if (startTimeNanos == -1)
+ startTimeNanos = Clock.Global.nanoTime();
+ numQueries++;
+ UntypedResultSet result = query(lastSeen);
+ if (result.isEmpty())
+ {
+ callback.onCompleted();
+ return;
+ }
+ UntypedResultSet.Row lastRow = null;
+ for (UntypedResultSet.Row row : result)
+ {
+ callback.onNext(row);
+ lastRow = row;
+ }
+ lastSeen = lastRow;
+ schedule();
+ }
+ catch (Throwable t)
+ {
+ callback.onError(t);
+ }
+ }
+
+ @Override
+ public long creationTimeNanos()
+ {
+ return creationTimeNanos;
+ }
+
+ @Override
+ public long startTimeNanos()
+ {
+ return startTimeNanos;
+ }
+
+ @Override
+ public String description()
+ {
+            return String.format("Table Walker for %s; queries = %d", getClass().getSimpleName(), numQueries);
+ }
+ }
+
+    private static String selection(TableMetadata metadata, Set<String> requiredColumns, Set<String> forIteration)
+ {
+ StringBuilder selection = new StringBuilder();
+ if (requiredColumns.isEmpty())
+ selection.append("*");
+ else
+ {
+            Sets.SetView<String> other = Sets.difference(requiredColumns, forIteration);
+            for (String name : other)
+            {
+                ColumnMetadata meta = metadata.getColumn(new ColumnIdentifier(name, true));
+                if (meta == null)
+                    throw new IllegalArgumentException("Unknown column: " + name);
+            }
+            List<String> names = new ArrayList<>(forIteration.size() + other.size());
+            names.addAll(forIteration);
+            names.addAll(other);
+            // this sort is to make sure the CQL is deterministic
+ Collections.sort(names);
+ for (int i = 0; i < names.size(); i++)
+ {
+ if (i > 0)
+ selection.append(", ");
+ selection.append(names.get(i));
+ }
+ }
+ return selection.toString();
+ }
+
+ private static class WalkCommandsForDomain extends TableWalk
+ {
+        private static final Set<String> COLUMNS_FOR_ITERATION = ImmutableSet.of("txn_id", "store_id", "domain");
+ private final String cql;
+ private final int storeId, domain;
+
+        private WalkCommandsForDomain(int commandStore, Routable.Domain domain, Set<String> requiredColumns, Executor executor, Observable<UntypedResultSet.Row> callback)
+ {
+ super(executor, callback);
+ this.storeId = commandStore;
+ this.domain = domain.ordinal();
+ cql = String.format("SELECT %s " +
+ "FROM %s " +
+ "WHERE store_id = ? " +
+ " AND domain = ? " +
+                                "      AND token(store_id, domain, txn_id) > token(?, ?, (?, ?, ?)) " +
+                                "ALLOW FILTERING", selection(Commands, requiredColumns, COLUMNS_FOR_ITERATION), Commands);
+ }
+
+ @Override
+ protected UntypedResultSet query(UntypedResultSet.Row lastSeen)
+ {
+            TxnId lastTxnId = lastSeen == null ?
+                              new TxnId(0, 0, Txn.Kind.Read, Routable.Domain.Key, Node.Id.NONE)
+                              : deserializeTxnId(lastSeen);
+            return executeInternal(cql, storeId, domain, storeId, domain, lastTxnId.msb, lastTxnId.lsb, lastTxnId.node.id);
+ }
+ }
+
+ public static void findAllKeysBetween(int commandStore,
+ Token start, boolean startInclusive,
+ Token end, boolean endInclusive,
+ Observable<PartitionKey> callback)
+ {
+        //TODO (optimize) : CQL doesn't look smart enough to only walk Index.db, and ends up walking the Data.db file for each row in the partitions found (for frequent keys, this cost adds up)
+        // it would be possible to find all SSTables that "could" intersect this range, then have a merge iterator over the Index.db (filtered to the range; index stores partition liveness)...
+        KeysBetween work = new KeysBetween(commandStore,
+                                           AccordKeyspace.serializeToken(start), startInclusive,
+                                           AccordKeyspace.serializeToken(end), endInclusive,
+                                           ImmutableSet.of("key"),
+                                           Stage.READ.executor(), Observable.distinct(callback).map(value -> AccordKeyspace.deserializeKey(value)));
+ work.schedule();
+ }
+
+ private static class KeysBetween extends TableWalk
+ {
+        private static final Set<String> COLUMNS_FOR_ITERATION = ImmutableSet.of("store_id", "key_token");
+
+ private final int storeId;
+ private final ByteBuffer start, end;
+ private final String cqlFirst;
+ private final String cqlContinue;
+
+ private KeysBetween(int storeId,
+ ByteBuffer start, boolean startInclusive,
+ ByteBuffer end, boolean endInclusive,
+ Set<String> requiredColumns,
+                            Executor executor, Observable<UntypedResultSet.Row> callback)
+ {
+ super(executor, callback);
+ this.storeId = storeId;
+ this.start = start;
+ this.end = end;
+
+            String selection = selection(CommandsForKeys, requiredColumns, COLUMNS_FOR_ITERATION);
+ this.cqlFirst = String.format("SELECT DISTINCT %s\n" +
+ "FROM %s\n" +
+ "WHERE store_id = ?\n" +
+                                          (startInclusive ? "  AND key_token >= ?\n" : "  AND key_token > ?\n") +
+                                          (endInclusive ? "  AND key_token <= ?\n" : "  AND key_token < ?\n") +
+ "ALLOW FILTERING",
+ selection, CommandsForKeys);
+ this.cqlContinue = String.format("SELECT DISTINCT %s\n" +
+ "FROM %s\n" +
+ "WHERE store_id = ?\n" +
+ " AND key_token > ?\n" +
+ " AND key > ?\n" +
+                                             (endInclusive ? "  AND key_token <= ?\n" : "  AND key_token < ?\n") +
+ "ALLOW FILTERING",
+ selection, CommandsForKeys);
+ }
+
+ @Override
+ protected UntypedResultSet query(UntypedResultSet.Row lastSeen)
+ {
+ if (lastSeen == null)
+ {
+ return executeInternal(cqlFirst, storeId, start, end);
+ }
+ else
+ {
+ ByteBuffer previousToken = lastSeen.getBytes("key_token");
+ ByteBuffer previousKey = lastSeen.getBytes("key");
+                return executeInternal(cqlContinue, storeId, previousToken, previousKey, end);
+ }
+ }
+ }
+
    public static Command loadCommand(AccordCommandStore commandStore, TxnId txnId)
{
commandStore.checkNotInStoreThread();
@@ -616,19 +882,19 @@ public class AccordKeyspace
try
{
UntypedResultSet.Row row = rows.one();
-            Invariants.checkState(deserializeTimestampOrNull(row, "txn_id", TxnId::fromBits).equals(txnId));
- SaveStatus status = SaveStatus.values()[row.getInt("status")];
+ Invariants.checkState(deserializeTxnId(row).equals(txnId));
+ SaveStatus status = deserializeStatus(row);
            CommonAttributes.Mutable attributes = new CommonAttributes.Mutable(txnId);
            // TODO: something less brittle than ordinal, more efficient than values()
attributes.durability(Status.Durability.values()[row.getInt("durability", 0)]);
            attributes.homeKey(deserializeOrNull(row.getBlob("home_key"), CommandsSerializers.routingKey));
            attributes.progressKey(deserializeOrNull(row.getBlob("progress_key"), CommandsSerializers.routingKey));
            attributes.route(deserializeOrNull(row.getBlob("route"), CommandsSerializers.route));
-            attributes.partialTxn(deserializeOrNull(row.getBlob("txn"), CommandsSerializers.partialTxn));
-            attributes.partialDeps(deserializeOrNull(row.getBlob("dependencies"), CommandsSerializers.partialDeps));
+ attributes.partialTxn(deserializeTxn(row));
+ attributes.partialDeps(deserializeDependencies(row));
attributes.setListeners(deserializeListeners(row, "listeners"));
-            Timestamp executeAt = deserializeTimestampOrNull(row, "execute_at", Timestamp::fromBits);
+            Timestamp executeAt = deserializeExecuteAt(row);
            Ballot promised = deserializeTimestampOrNull(row, "promised_ballot", Ballot::fromBits);
            Ballot accepted = deserializeTimestampOrNull(row, "accepted_ballot", Ballot::fromBits);
            ImmutableSortedSet<TxnId> waitingOnCommit = deserializeTxnIdNavigableSet(row, "waiting_on_commit");
@@ -669,6 +935,43 @@ public class AccordKeyspace
}
}
+    public static PartialDeps deserializeDependencies(UntypedResultSet.Row row) throws IOException
+    {
+        return deserializeOrNull(row.getBlob("dependencies"), CommandsSerializers.partialDeps);
+ }
+
+ public static Timestamp deserializeExecuteAt(UntypedResultSet.Row row)
+ {
+        return deserializeTimestampOrNull(row, "execute_at", Timestamp::fromBits);
+ }
+
+ public static SaveStatus deserializeStatus(UntypedResultSet.Row row)
+ {
+ return SaveStatus.values()[row.getInt("status")];
+ }
+
+ public static TxnId deserializeTxnId(UntypedResultSet.Row row)
+ {
+ return deserializeTimestampOrNull(row, "txn_id", TxnId::fromBits);
+ }
+
+    public static PartialTxn deserializeTxn(UntypedResultSet.Row row) throws IOException
+    {
+        return deserializeOrNull(row.getBlob("txn"), CommandsSerializers.partialTxn);
+ }
+
+ public static PartitionKey deserializeKey(UntypedResultSet.Row row)
+ {
+        ByteBuffer[] split = KEY_TYPE.split(ByteBufferAccessor.instance, row.getBytes("key"));
+        TableId tableId = TableId.fromUUID(UUIDSerializer.instance.deserialize(split[0]));
+ ByteBuffer key = split[1];
+
+ TableMetadata metadata = Schema.instance.getTableMetadata(tableId);
+ if (metadata == null)
+            throw new IllegalStateException("Table with id " + tableId + " could not be found; was it deleted?");
+        return new PartitionKey(metadata.keyspace, tableId, metadata.partitioner.decorateKey(key));
+ }
+
    private static void addSeriesMutations(ImmutableSortedMap<Timestamp, ByteBuffer> prev,
                                           ImmutableSortedMap<Timestamp, ByteBuffer> value,
                                           SeriesKind kind,
@@ -713,7 +1016,9 @@ public class AccordKeyspace
    private static DecoratedKey makeKey(CommandStore commandStore, PartitionKey key)
{
+ Token token = key.token();
        ByteBuffer pk = CommandsForKeyColumns.keyComparator.make(commandStore.id(),
+                                                                 serializeToken(token),
                                                                  serializeKey(key)).serializeAsPartitionKey();
return CommandsForKeys.partitioner.decorateKey(pk);
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index c79d1ba5b9..eac47bfe4e 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -245,12 +245,15 @@ public class AccordObjectSizes
    private static final long EMPTY_COMMAND_LISTENER = measure(new Command.ProxyListener(null));
    private static final long EMPTY_CFK_LISTENER = measure(new CommandsForKey.Listener((Key) null));
+    private static final long EMPTY_CFR_LISTENER = measure(new CommandsForRanges.Listener(null));
public static long listener(Command.DurableAndIdempotentListener listener)
{
if (listener instanceof Command.ProxyListener)
            return EMPTY_COMMAND_LISTENER + timestamp(((Command.ProxyListener) listener).txnId());
        if (listener instanceof CommandsForKey.Listener)
            return EMPTY_CFK_LISTENER + key(((CommandsForKey.Listener) listener).key());
+        if (listener instanceof CommandsForRanges.Listener)
+            return EMPTY_CFR_LISTENER + timestamp(((CommandsForRanges.Listener) listener).txnId);
        throw new IllegalArgumentException("Unhandled listener type: " + listener.getClass());
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index e1bf80895b..5c6ac654f6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -18,9 +18,8 @@
package org.apache.cassandra.service.accord;
-import java.util.Comparator;
import java.util.Map;
-import java.util.Objects;
+import java.util.NavigableMap;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
@@ -30,15 +29,20 @@ import accord.api.DataStore;
import accord.api.Key;
import accord.api.ProgressLog;
import accord.impl.AbstractSafeCommandStore;
+import accord.impl.CommandTimeseries;
+import accord.impl.CommandTimeseries.CommandLoader;
+import accord.impl.CommandTimeseriesHolder;
import accord.impl.CommandsForKey;
import accord.impl.SafeCommandsForKey;
+import accord.local.CommandStores;
import accord.local.CommandStores.RangesForEpoch;
import accord.local.CommonAttributes;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.Status;
import accord.primitives.AbstractKeys;
-import accord.primitives.Keys;
+import accord.primitives.Deps;
+import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.RoutableKey;
import accord.primitives.Routables;
@@ -51,12 +55,13 @@ import org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeCommand, AccordSafeCommandsForKey>
{
private final Map<TxnId, AccordSafeCommand> commands;
- private final Map<RoutableKey, AccordSafeCommandsForKey> commandsForKeys;
+    private final NavigableMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys;
private final AccordCommandStore commandStore;
+ CommandsForRanges.Updater rangeUpdates = null;
public AccordSafeCommandStore(PreLoadContext context,
Map<TxnId, AccordSafeCommand> commands,
-                                  Map<RoutableKey, AccordSafeCommandsForKey> commandsForKey,
+                                  NavigableMap<RoutableKey, AccordSafeCommandsForKey> commandsForKey,
AccordCommandStore commandStore)
{
super(context);
@@ -146,24 +151,59 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC
@Override
public Timestamp maxConflict(Seekables<?, ?> keysOrRanges, Ranges slice)
{
- // TODO: Seekables
- // TODO: efficiency
- return ((Keys)keysOrRanges).stream()
- .map(this::maybeCommandsForKey)
- .filter(Objects::nonNull)
- .map(SafeCommandsForKey::current)
- .filter(Objects::nonNull)
- .map(CommandsForKey::max)
- .max(Comparator.naturalOrder())
- .orElse(Timestamp.NONE);
+        return mapReduce(keysOrRanges, slice, (ts, accum) -> Timestamp.max(ts.max(), accum), Timestamp.NONE, null);
}
-    private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
+ @Override
+ public void registerHistoricalTransactions(Deps deps)
+ {
+ // used in places such as accord.local.CommandStore.fetchMajorityDeps
+        // We find a set of dependencies for a range then update CommandsFor to know about them
+        CommandStores.RangesForEpochHolder rangesForEpochHolder = commandStore.rangesForEpochHolder();
+ Ranges allRanges = rangesForEpochHolder.get().all();
+ deps.keyDeps.keys().forEach(allRanges, key -> {
+ SafeCommandsForKey cfk = commandsForKey(key);
+ deps.keyDeps.forEach(key, txnId -> {
+                // TODO (desired, efficiency): this can be made more efficient by batching by epoch
+                if (rangesForEpochHolder.get().coordinates(txnId).contains(key))
+                    return; // already coordinates, no need to replicate
+                if (!rangesForEpochHolder.get().allBefore(txnId.epoch()).contains(key))
+ return;
+
+ cfk.registerNotWitnessed(txnId);
+ });
+ });
+ CommandsForRanges commandsForRanges = commandStore.commandsForRanges();
+ deps.rangeDeps.forEachUniqueTxnId(allRanges, txnId -> {
+ if (commandsForRanges.containsLocally(txnId))
+ return;
+
+ Ranges ranges = deps.rangeDeps.ranges(txnId);
+            if (rangesForEpochHolder.get().coordinates(txnId).intersects(ranges))
+                return; // already coordinates, no need to replicate
+            if (!rangesForEpochHolder.get().allBefore(txnId.epoch()).intersects(ranges))
+                return;
+
+            updateRanges().mergeRemote(txnId, ranges.slice(allRanges), Ranges::with);
+ });
+ }
+
+    private <O> O mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, BiFunction<CommandTimeseriesHolder, O, O> map, O accumulate, O terminalValue)
+    {
+        accumulate = commandStore.mapReduceForRange(keysOrRanges, slice, map, accumulate, terminalValue);
+        if (accumulate.equals(terminalValue))
+            return accumulate;
+        return mapReduceForKey(keysOrRanges, slice, map, accumulate, terminalValue);
+ }
+
+    private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges slice, BiFunction<CommandTimeseriesHolder, O, O> map, O accumulate, O terminalValue)
{
- switch (keysOrRanges.domain()) {
+ switch (keysOrRanges.domain())
+ {
default:
- throw new AssertionError();
+                throw new AssertionError("Unknown domain: " + keysOrRanges.domain());
case Key:
+ {
// TODO: efficiency
                AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges;
for (Key key : keys)
@@ -174,10 +214,26 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC
if (accumulate.equals(terminalValue))
return accumulate;
}
- break;
+ }
+ break;
case Range:
- // TODO (required): implement
- throw new UnsupportedOperationException();
+ {
+                // Assuming the range provided is in the PreLoadContext, then AsyncLoader has populated commandsForKeys with keys that
+                // are contained within the ranges... so walk all keys found in commandsForKeys
+                Routables<?, ?> sliced = keysOrRanges.slice(slice, Routables.Slice.Minimal);
+                if (!context.keys().slice(slice, Routables.Slice.Minimal).containsAll(sliced))
+                    throw new AssertionError("Range(s) detected not present in the PreLoadContext: expected " + context.keys() + " but given " + keysOrRanges);
+                for (RoutableKey key : commandsForKeys.keySet())
+                {
+                    //TODO (duplicate code): this is a repeat of Key... only change is checking contains in range
+ if (!sliced.contains(key)) continue;
+ SafeCommandsForKey forKey = commandsForKey(key);
+ accumulate = map.apply(forKey.current(), accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ }
+ break;
}
return accumulate;
}
@@ -185,8 +241,8 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC
@Override
    public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice, TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
{
- accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev) -> {
- CommandsForKey.CommandTimeseries<?> timeseries;
+ accumulate = mapReduce(keysOrRanges, slice, (forKey, prev) -> {
+ CommandTimeseries<?> timeseries;
switch (testTimestamp)
{
default: throw new AssertionError();
@@ -198,17 +254,17 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC
case MAY_EXECUTE_BEFORE:
timeseries = forKey.byExecuteAt();
}
- CommandsForKey.CommandTimeseries.TestTimestamp remapTestTimestamp;
+ CommandTimeseries.TestTimestamp remapTestTimestamp;
switch (testTimestamp)
{
default: throw new AssertionError();
case STARTED_AFTER:
case EXECUTES_AFTER:
-                    remapTestTimestamp = CommandsForKey.CommandTimeseries.TestTimestamp.AFTER;
+                    remapTestTimestamp = CommandTimeseries.TestTimestamp.AFTER;
                    break;
                case STARTED_BEFORE:
                case MAY_EXECUTE_BEFORE:
-                    remapTestTimestamp = CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE;
+                    remapTestTimestamp = CommandTimeseries.TestTimestamp.BEFORE;
}
            return timeseries.mapReduce(testKind, remapTestTimestamp, timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
}, accumulate, terminalValue);
@@ -227,16 +283,46 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC
@Override
    public CommonAttributes completeRegistration(Seekable seekable, Ranges ranges, AccordSafeCommand liveCommand, CommonAttributes attrs)
{
- Key key = (Key) seekable;
- if (ranges.contains(key))
+ switch (seekable.domain())
{
- AccordSafeCommandsForKey cfk = commandsForKey(key);
- cfk.register(liveCommand.current());
-            attrs = attrs.mutable().addListener(new CommandsForKey.Listener(key));
+ case Key:
+ {
+ Key key = seekable.asKey();
+ if (ranges.contains(key))
+ {
+ AccordSafeCommandsForKey cfk = commandsForKey(key);
+ cfk.register(liveCommand.current());
+                    attrs = attrs.mutable().addListener(new CommandsForKey.Listener(key));
+ }
+ }
+ break;
+ case Range:
+ Range range = seekable.asRange();
+ if (!ranges.intersects(range))
+ return attrs;
+                // TODO (api) : cleaner way to deal with this? This is tracked at the Ranges level and not Range level
+                // but we register at the Range level...
+                if (!attrs.durableListeners().stream().anyMatch(l -> l instanceof CommandsForRanges.Listener))
+                {
+                    CommandsForRanges.Listener listener = new CommandsForRanges.Listener(liveCommand.txnId());
+ attrs = attrs.mutable().addListener(listener);
+ // trigger to allow it to run right away
+ listener.onChange(this, liveCommand);
+ }
+ break;
+ default:
+                throw new UnsupportedOperationException("Unknown domain: " + seekable.domain());
}
return attrs;
}
+ protected CommandsForRanges.Updater updateRanges()
+ {
+ if (rangeUpdates == null)
+ rangeUpdates = commandStore.updateRanges();
+ return rangeUpdates;
+ }
+
@Override
protected void invalidateSafeState()
{
@@ -245,7 +331,7 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC
}
@Override
- public CommandsForKey.CommandLoader<?> cfkLoader()
+ public CommandLoader<?> cfkLoader()
{
return CommandsForKeySerializer.loader;
}
@@ -256,5 +342,7 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC
postExecute();
commands.values().forEach(AccordSafeState::postExecute);
commandsForKeys.values().forEach(AccordSafeState::postExecute);
+ if (rangeUpdates != null)
+ rangeUpdates.apply();
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 4de78375bd..3a979c1332 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -141,7 +141,7 @@ public class AccordService implements IAccordService, Shutdownable
messageSink,
configService,
AccordService::uniqueNow,
- () -> null,
+ () -> AccordDataStore.INSTANCE,
                           new KeyspaceSplitter(new EvenSplit<>(DatabaseDescriptor.getAccordShardCount(), getPartitioner().accordSplitter())),
agent,
new DefaultRandom(),
@@ -302,6 +302,12 @@ public class AccordService implements IAccordService, Shutdownable
ExecutorUtils.shutdownAndWait(timeout, unit, this);
}
+ @VisibleForTesting
+ public Node node()
+ {
+ return node;
+ }
+
private static Shutdownable toShutdownable(Node node)
{
return new Shutdownable() {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
index 2bb852eb54..1cb4fb5a45 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
@@ -25,6 +25,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.function.ToLongFunction;
+import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -352,6 +353,13 @@ public class AccordStateCache
this.heapEstimator = heapEstimator;
}
+ public Stream<Node<K, V>> stream()
+ {
+ return cache.entrySet().stream()
+                    .filter(e -> keyClass.isAssignableFrom(e.getKey().getClass()))
+ .map(e -> (Node<K, V>) e.getValue());
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
index 216d64df42..a35737040d 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordVerbHandler.java
@@ -43,6 +43,19 @@ public class AccordVerbHandler<T extends Request> implements IVerbHandler<T>
public void doVerb(Message<T> message) throws IOException
{
logger.debug("Receiving {} from {}", message.payload, message.from());
-        message.payload.process(node, EndpointMapping.getId(message.from()), message);
+ T request = message.payload;
+ Node.Id from = EndpointMapping.getId(message.from());
+ long knownEpoch = request.knownEpoch();
+ if (!node.topology().hasEpoch(knownEpoch))
+ {
+ node.configService().fetchTopologyForEpoch(knownEpoch);
+ long waitForEpoch = request.waitForEpoch();
+ if (!node.topology().hasEpoch(waitForEpoch))
+ {
+                node.withEpoch(waitForEpoch, () -> request.process(node, from, message));
+ return;
+ }
+ }
+ request.process(node, from, message);
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
new file mode 100644
index 0000000000..24f595c5da
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/CommandsForRanges.java
@@ -0,0 +1,508 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableSortedMap;
+
+import accord.api.Key;
+import accord.api.RoutingKey;
+import accord.impl.CommandTimeseries;
+import accord.impl.CommandTimeseriesHolder;
+import accord.local.Command;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
+import accord.local.SaveStatus;
+import accord.primitives.AbstractKeys;
+import accord.primitives.PartialDeps;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.primitives.Routable;
+import accord.primitives.RoutableKey;
+import accord.primitives.Seekable;
+import accord.primitives.Seekables;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+import org.apache.cassandra.utils.Interval;
+import org.apache.cassandra.utils.IntervalTree;
+
+public class CommandsForRanges
+{
+ public enum TxnType
+ {
+ UNKNOWN, LOCAL, REMOTE;
+
+ private boolean isSafeToMix(TxnType other)
+ {
+ if (this == UNKNOWN || other == UNKNOWN) return true;
+ return this == other;
+ }
+ }
+
+ public static final class RangeCommandSummary
+ {
+ public final TxnId txnId;
+ public final Ranges ranges;
+ public final SaveStatus status;
+ public final @Nullable Timestamp executeAt;
+ public final List<TxnId> deps;
+
+        RangeCommandSummary(TxnId txnId, Ranges ranges, SaveStatus status, @Nullable Timestamp executeAt, List<TxnId> deps)
+ {
+ this.txnId = txnId;
+ this.ranges = ranges;
+ this.status = status;
+ this.executeAt = executeAt;
+ this.deps = deps;
+ }
+
+ public boolean equalsDeep(RangeCommandSummary other)
+ {
+ return Objects.equals(txnId, other.txnId)
+ && Objects.equals(ranges, other.ranges)
+ && Objects.equals(status, other.status)
+ && Objects.equals(executeAt, other.executeAt)
+ && Objects.equals(deps, other.deps);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ RangeCommandSummary that = (RangeCommandSummary) o;
+ return txnId.equals(that.txnId);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(txnId);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "RangeCommandSummary{" +
+ "txnId=" + txnId +
+ ", status=" + status +
+ ", ranges=" + ranges +
+ '}';
+ }
+
+        public RangeCommandSummary withRanges(Ranges ranges, BiFunction<? super Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
+        {
+            return new RangeCommandSummary(txnId, remappingFunction.apply(this.ranges, ranges), status, executeAt, deps);
+ }
+ }
+
+    private enum RangeCommandSummaryLoader implements CommandTimeseries.CommandLoader<RangeCommandSummary>
+ {
+ INSTANCE;
+
+ @Override
+ public RangeCommandSummary saveForCFK(Command command)
+ {
+ //TODO split write from read?
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TxnId txnId(RangeCommandSummary data)
+ {
+ return data.txnId;
+ }
+
+ @Override
+ public Timestamp executeAt(RangeCommandSummary data)
+ {
+ return data.executeAt;
+ }
+
+ @Override
+ public SaveStatus saveStatus(RangeCommandSummary data)
+ {
+ return data.status;
+ }
+
+ @Override
+ public List<TxnId> depsIds(RangeCommandSummary data)
+ {
+ return data.deps;
+ }
+ }
+
+ public static abstract class AbstractBuilder<T extends AbstractBuilder<T>>
+ {
+ protected final Set<TxnId> localTxns = new HashSet<>();
+        protected final TreeMap<TxnId, RangeCommandSummary> txnToRange = new TreeMap<>();
+        protected final IntervalTree.Builder<RoutableKey, RangeCommandSummary, Interval<RoutableKey, RangeCommandSummary>> rangeToTxn = new IntervalTree.Builder<>();
+
+ public TxnType type(TxnId txnId)
+ {
+ if (!txnToRange.containsKey(txnId)) return TxnType.UNKNOWN;
+ return localTxns.contains(txnId) ? TxnType.LOCAL : TxnType.REMOTE;
+ }
+
+        public T put(TxnId txnId, Ranges ranges, SaveStatus status, Timestamp executeAt, List<TxnId> dependsOn)
+        {
+            remove(txnId);
+            RangeCommandSummary summary = new RangeCommandSummary(txnId, ranges, status, executeAt, dependsOn);
+ localTxns.add(txnId);
+ txnToRange.put(txnId, summary);
+ addRanges(summary);
+ return (T) this;
+ }
+
+ private void addRanges(RangeCommandSummary summary)
+ {
+ for (Range range : summary.ranges)
+ {
+                rangeToTxn.add(Interval.create(normalize(range.start(), range.startInclusive(), true),
+                                               normalize(range.end(), range.endInclusive(), false),
+                                               summary));
+ }
+ }
+
+ public T putAll(CommandsForRanges other)
+ {
+ for (TxnId id : other.localCommands)
+ {
+ TxnType thisType = type(id);
+ TxnType otherType = other.type(id);
+ Invariants.checkArgument(thisType.isSafeToMix(otherType), "Attempted to add %s; expected %s but was %s", id, thisType, otherType);
+ }
+ localTxns.addAll(other.localCommands);
+ txnToRange.putAll(other.commandsToRanges);
+ // If "put" was called before for a txn present in "other", to
respect the "put" semantics that update must
+ // be removed from "rangeToTxn" (as it got removed from
"txnToRange").
+ // The expected common case is that this method is called on an
empty builder, so the removeIf is off an
+ // empty list (aka no-op)
+ rangeToTxn.removeIf(data ->
other.commandsToRanges.containsKey(data.txnId));
+ rangeToTxn.addAll(other.rangesToCommands);
+ return (T) this;
+ }
+
+ public T mergeRemote(TxnId txnId, Ranges ranges, BiFunction<? super Ranges, ? super Ranges, ? extends Ranges> remappingFunction)
+ {
+ // TODO (durability) : remote ranges are not made durable for now. If this command is stored in the commands table,
+ // then we have a NotWitnessed command with Ranges, which is not expected in accord.local.Command.NotWitnessed.
+ // To properly handle this, the long-term storage looks like it will need to store these as well.
+ Invariants.checkArgument(!localTxns.contains(txnId), "Attempted to merge remote txn %s, but this is a local txn", txnId);
+ // accord.impl.CommandTimeseries.mapReduce checks status and deps type, and NotWitnessed should match the semantics hard-coded in InMemorySafeStore...
+ // in that store, the remote history is only ever included when minStatus == null and deps == ANY... but mapReduce sees accord.local.Status.KnownDeps.hasProposedOrDecidedDeps == false
+ // as a mismatch, so it will be excluded... since NotWitnessed will return false it will only be included IFF deps == ANY.
+ // When it comes to the minStatus check, the current usage is "null", "Committed", "Accepted"... so NotWitnessed will only be included in the null case;
+ // the only subtle difference is if minStatus == NotWitnessed: this API will include these but InMemoryStore won't.
+ RangeCommandSummary oldValue = txnToRange.get(txnId);
+ RangeCommandSummary newValue = oldValue == null ?
+ new RangeCommandSummary(txnId, ranges, SaveStatus.NotWitnessed, null, Collections.emptyList())
+ : oldValue.withRanges(ranges, remappingFunction);
+ if (newValue == null)
+ {
+ remove(txnId);
+ }
+ else if (oldValue == null || !oldValue.equalsDeep(newValue))
+ {
+ // changes detected... have to update range index
+ rangeToTxn.removeIf(data -> data.txnId.equals(txnId));
+ addRanges(newValue);
+ }
+ return (T) this;
+ }
+
+ public T remove(TxnId txnId)
+ {
+ if (txnToRange.containsKey(txnId))
+ {
+ localTxns.remove(txnId);
+ txnToRange.remove(txnId);
+ rangeToTxn.removeIf(data -> data.txnId.equals(txnId));
+ }
+ return (T) this;
+ }
+ }
+
+ public static class Builder extends AbstractBuilder<Builder>
+ {
+ public CommandsForRanges build()
+ {
+ CommandsForRanges cfr = new CommandsForRanges();
+ cfr.set(this);
+ return cfr;
+ }
+ }
+
+ public class Updater extends AbstractBuilder<Updater>
+ {
+ private Updater()
+ {
+ putAll(CommandsForRanges.this);
+ }
+
+ public void apply()
+ {
+ CommandsForRanges.this.set(this);
+ }
+ }
+
+ public static class Listener implements Command.DurableAndIdempotentListener
+ {
+ public final TxnId txnId;
+ private transient SaveStatus saveStatus;
+
+ public Listener(TxnId txnId)
+ {
+ this.txnId = txnId;
+ }
+
+ @Override
+ public void onChange(SafeCommandStore safeStore, SafeCommand safeCommand)
+ {
+ Command current = safeCommand.current();
+ if (current.saveStatus() == saveStatus)
+ return;
+ saveStatus = current.saveStatus();
+ PartialDeps deps = current.partialDeps();
+ if (deps == null)
+ return;
+ Seekables<?, ?> keysOrRanges = current.partialTxn().keys();
+ Invariants.checkArgument(keysOrRanges.domain() == Routable.Domain.Range, "Expected txn %s to be a Range txn, but was a %s", txnId, keysOrRanges.domain());
+
+ List<TxnId> dependsOn = deps.txnIds();
+ ((AccordSafeCommandStore) safeStore).updateRanges()
+ .put(txnId, (Ranges) keysOrRanges, current.saveStatus(), current.executeAt(), dependsOn);
+ }
+
+ @Override
+ public PreLoadContext listenerPreLoadContext(TxnId caller)
+ {
+ return caller.equals(txnId) ? PreLoadContext.contextFor(txnId) : PreLoadContext.contextFor(txnId, Collections.singletonList(caller));
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Listener{" +
+ "txnId=" + txnId +
+ ", saveStatus=" + saveStatus +
+ '}';
+ }
+ }
+
+ private ImmutableSet<TxnId> localCommands;
+ private ImmutableSortedMap<TxnId, RangeCommandSummary> commandsToRanges;
+ private IntervalTree<RoutableKey, RangeCommandSummary, Interval<RoutableKey, RangeCommandSummary>> rangesToCommands;
+
+ public CommandsForRanges()
+ {
+ localCommands = ImmutableSet.of();
+ commandsToRanges = ImmutableSortedMap.of();
+ rangesToCommands = IntervalTree.emptyTree();
+ }
+
+ private void set(AbstractBuilder<?> builder)
+ {
+ this.localCommands = ImmutableSet.copyOf(builder.localTxns);
+ this.commandsToRanges = ImmutableSortedMap.copyOf(builder.txnToRange);
+ this.rangesToCommands = builder.rangeToTxn.build();
+ }
+
+ public TxnType type(TxnId txnId)
+ {
+ if (!commandsToRanges.containsKey(txnId)) return TxnType.UNKNOWN;
+ return localCommands.contains(txnId) ? TxnType.LOCAL : TxnType.REMOTE;
+ }
+
+ public boolean containsLocally(TxnId txnId)
+ {
+ return localCommands.contains(txnId);
+ }
+
+ public Iterable<CommandTimeseriesHolder> search(AbstractKeys<Key, ?> keys)
+ {
+ // group by keyspace, as ranges are based on TokenKey, which is scoped to a keyspace
+ Map<String, List<Key>> groupByKeyspace = new TreeMap<>();
+ for (Key key : keys)
+ groupByKeyspace.computeIfAbsent(((PartitionKey) key).keyspace(), ignore -> new ArrayList<>()).add(key);
+ return () -> new AbstractIterator<CommandTimeseriesHolder>()
+ {
+ Iterator<String> ksIt = groupByKeyspace.keySet().iterator();
+ Iterator<Map.Entry<Range, Set<RangeCommandSummary>>> rangeIt;
+
+ @Override
+ protected CommandTimeseriesHolder computeNext()
+ {
+ while (true)
+ {
+ if (rangeIt != null && rangeIt.hasNext())
+ {
+ Map.Entry<Range, Set<RangeCommandSummary>> next = rangeIt.next();
+ return result(next.getKey(), next.getValue());
+ }
+ rangeIt = null;
+ if (!ksIt.hasNext())
+ {
+ ksIt = null;
+ return endOfData();
+ }
+ String ks = ksIt.next();
+ List<Key> keys = groupByKeyspace.get(ks);
+ Map<Range, Set<RangeCommandSummary>> groupByRange = new TreeMap<>(Range::compare);
+ for (Key key : keys)
+ {
+ List<Interval<RoutableKey, RangeCommandSummary>> matches = rangesToCommands.matches(key);
+ if (matches.isEmpty())
+ continue;
+ for (Interval<RoutableKey, RangeCommandSummary> interval : matches)
+ groupByRange.computeIfAbsent(toRange(interval), ignore -> new HashSet<>()).add(interval.data);
+ }
+ rangeIt = groupByRange.entrySet().iterator();
+ }
+ }
+ };
+ }
+
+ private static Range toRange(Interval<RoutableKey, RangeCommandSummary> interval)
+ {
+ TokenKey start = (TokenKey) interval.min;
+ TokenKey end = (TokenKey) interval.max;
+ // TODO (correctness) : accord doesn't support wrap around, so decreaseSlightly may fail in some cases
+ // TODO (correctness) : this logic is mostly used for testing, so is it actually safe for all partitioners?
+ return new TokenRange(start.withToken(start.token().decreaseSlightly()), end);
+ }
+
+ @Nullable
+ public CommandTimeseriesHolder search(Range range)
+ {
+ List<RangeCommandSummary> matches = rangesToCommands.search(Interval.create(normalize(range.start(), range.startInclusive(), true),
+ normalize(range.end(), range.endInclusive(), false)));
+ return result(range, matches);
+ }
+
+ private CommandTimeseriesHolder result(Seekable seekable, Collection<RangeCommandSummary> matches)
+ {
+ if (matches.isEmpty())
+ return null;
+ return new Holder(seekable, matches);
+ }
+
+ public int size()
+ {
+ return rangesToCommands.intervalCount();
+ }
+
+ public Updater update()
+ {
+ return new Updater();
+ }
+
+ @Override
+ public String toString()
+ {
+ return rangesToCommands.unbuild().toString();
+ }
+
+ private static RoutingKey normalize(RoutingKey key, boolean inclusive, boolean upOrDown)
+ {
+ if (inclusive) return key;
+ AccordRoutingKey ak = (AccordRoutingKey) key;
+ switch (ak.kindOfRoutingKey())
+ {
+ case SENTINEL:
+ return normalize(ak.asSentinelKey().toTokenKey(), inclusive, upOrDown);
+ case TOKEN:
+ TokenKey tk = ak.asTokenKey();
+ return tk.withToken(upOrDown ? tk.token().increaseSlightly() : tk.token().decreaseSlightly());
+ default:
+ throw new IllegalArgumentException("Unknown kind: " + ak.kindOfRoutingKey());
+ }
+ }
+
+ private static class Holder implements CommandTimeseriesHolder
+ {
+ private final Seekable keyOrRange;
+ private final Collection<RangeCommandSummary> matches;
+
+ private Holder(Seekable keyOrRange, Collection<RangeCommandSummary> matches)
+ {
+ this.keyOrRange = keyOrRange;
+ this.matches = matches;
+ }
+
+ @Override
+ public CommandTimeseries<?> byId()
+ {
+ return build(m -> m.txnId);
+ }
+
+ @Override
+ public CommandTimeseries<?> byExecuteAt()
+ {
+ return build(m -> m.executeAt != null ? m.executeAt : m.txnId);
+ }
+
+ @Override
+ public Timestamp max()
+ {
+ return byExecuteAt().maxTimestamp();
+ }
+
+ private CommandTimeseries<?> build(Function<RangeCommandSummary, Timestamp> fn)
+ {
+ CommandTimeseries.Update<RangeCommandSummary> builder = new CommandTimeseries.Update<>(keyOrRange, RangeCommandSummaryLoader.INSTANCE);
+ for (RangeCommandSummary m : matches)
+ {
+ if (m.status == SaveStatus.Invalidated)
+ continue;
+ builder.add(fn.apply(m), m);
+ }
+ return builder.build();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Holder{" +
+ "keyOrRange=" + keyOrRange +
+ ", matches=" + matches +
+ '}';
+ }
+ }
+}
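
For readers skimming the new class above: CommandsForRanges keeps two indexes that must stay in sync, a txn -> RangeCommandSummary map and a range -> summary interval index; put() replaces any previous entry in both, remove() clears both, and search() answers point/range queries from the interval index. Below is a minimal, self-contained sketch of that bookkeeping in plain Java, with String txn ids and int token bounds standing in for TxnId/Ranges. It is an analogue of the idea only, not this class's API.

    import java.util.*;

    public class DualIndexSketch
    {
        // summary of one txn: id plus the [start, end) spans it covers
        static final class Summary
        {
            final String txnId;
            final List<int[]> spans;
            Summary(String txnId, List<int[]> spans) { this.txnId = txnId; this.spans = spans; }
        }

        private final TreeMap<String, Summary> byTxn = new TreeMap<>();            // txnToRange analogue
        private final List<Map.Entry<int[], Summary>> byRange = new ArrayList<>(); // rangeToTxn analogue

        // put(): drop any previous entry for the txn, then index every span
        void put(String txnId, List<int[]> spans)
        {
            remove(txnId);
            Summary s = new Summary(txnId, spans);
            byTxn.put(txnId, s);
            for (int[] span : spans)
                byRange.add(new AbstractMap.SimpleEntry<>(span, s));
        }

        // remove(): both indexes must be updated together
        void remove(String txnId)
        {
            if (byTxn.remove(txnId) != null)
                byRange.removeIf(e -> e.getValue().txnId.equals(txnId));
        }

        // search(): every txn whose span contains the point (IntervalTree.matches analogue)
        List<String> search(int point)
        {
            List<String> hits = new ArrayList<>();
            for (Map.Entry<int[], Summary> e : byRange)
                if (e.getKey()[0] <= point && point < e.getKey()[1])
                    hits.add(e.getValue().txnId);
            return hits;
        }

        public static void main(String[] args)
        {
            DualIndexSketch sketch = new DualIndexSketch();
            sketch.put("txn-1", List.of(new int[]{ 0, 10 }));
            sketch.put("txn-2", List.of(new int[]{ 5, 15 }));
            System.out.println(sketch.search(7));  // [txn-1, txn-2]
            sketch.remove("txn-1");
            System.out.println(sketch.search(7));  // [txn-2]
        }
    }
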
diff --git
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index a0c361a1db..51b8796755 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -45,7 +45,7 @@ import static
org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
public abstract class AccordRoutingKey extends AccordRoutableKey implements
RoutingKey
{
- enum RoutingKeyKind
+ public enum RoutingKeyKind
{
TOKEN, SENTINEL
}
@@ -58,6 +58,16 @@ public abstract class AccordRoutingKey extends
AccordRoutableKey implements Rout
public abstract RoutingKeyKind kindOfRoutingKey();
public abstract long estimatedSizeOnHeap();
+ public SentinelKey asSentinelKey()
+ {
+ return (SentinelKey) this;
+ }
+
+ public TokenKey asTokenKey()
+ {
+ return (TokenKey) this;
+ }
+
public static AccordRoutingKey of(Key key)
{
return (AccordRoutingKey) key;
@@ -191,6 +201,11 @@ public abstract class AccordRoutingKey extends
AccordRoutableKey implements Rout
this.token = token;
}
+ public TokenKey withToken(Token token)
+ {
+ return new TokenKey(keyspace, token);
+ }
+
@Override
public Token token()
{
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
index 0a486ee402..9d66e75e6f 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncLoader.java
@@ -23,31 +23,41 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
+import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterables;
+import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import accord.api.RoutingKey;
import accord.impl.CommandsForKey;
import accord.local.Command;
import accord.local.PreLoadContext;
+import accord.primitives.Range;
+import accord.primitives.Ranges;
import accord.primitives.RoutableKey;
+import accord.primitives.Seekables;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResult;
import accord.utils.async.AsyncResults;
+import accord.utils.async.Observable;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.service.accord.AccordCommandStore;
import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordLoadingState;
import org.apache.cassandra.service.accord.AccordSafeState;
import org.apache.cassandra.service.accord.AccordStateCache;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.service.accord.api.PartitionKey;
public class AsyncLoader
@@ -65,15 +75,21 @@ public class AsyncLoader
private final AccordCommandStore commandStore;
private final Iterable<TxnId> txnIds;
- private final Iterable<RoutableKey> keys;
+ private final Seekables<?, ?> keysOrRanges;
protected AsyncResult<?> readResult;
- public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId>
txnIds, Iterable<RoutableKey> keys)
+ @Deprecated
+ public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Iterable<RoutableKey> keysOrRanges)
+ {
+ this(commandStore, txnIds, (Seekables<?, ?>) keysOrRanges);
+ }
+
+ public AsyncLoader(AccordCommandStore commandStore, Iterable<TxnId> txnIds, Seekables<?, ?> keysOrRanges)
{
this.commandStore = commandStore;
this.txnIds = txnIds;
- this.keys = keys;
+ this.keysOrRanges = keysOrRanges;
}
protected static Iterable<TxnId> txnIds(PreLoadContext context)
@@ -85,7 +101,7 @@ public class AsyncLoader
return Iterables.concat(Collections.singleton(primaryid),
additionalIds);
}
- private <K, V, S extends AccordSafeState<K, V>> void
referenceAndAssembleReads(Iterable<K> keys,
+ private <K, V, S extends AccordSafeState<K, V>> void referenceAndAssembleReads(Iterable<? extends K> keys,
Map<K, S> context,
AccordStateCache.Instance<K, V, S> cache,
Function<K, V> loadFunction,
@@ -140,13 +156,24 @@ public class AsyncLoader
loadCommandFunction(),
readRunnables,
chains);
-
- referenceAndAssembleReads(keys,
- context.commandsForKeys,
- commandStore.commandsForKeyCache(),
- loadCommandsPerKeyFunction(),
- readRunnables,
- chains);
+ switch (keysOrRanges.domain())
+ {
+ case Key:
+ // cast to Keys fails...
+ Iterable<RoutableKey> keys = (Iterable<RoutableKey>) keysOrRanges;
+ referenceAndAssembleReads(keys,
+ context.commandsForKeys,
+ commandStore.commandsForKeyCache(),
+ loadCommandsPerKeyFunction(),
+ readRunnables,
+ chains);
+ break;
+ case Range:
+ chains.add(referenceAndDispatchReadsForRange(context));
+ break;
+ default:
+ throw new UnsupportedOperationException("Unable to process keys of " + keysOrRanges.domain());
+ }
if (chains.isEmpty())
{
@@ -161,6 +188,66 @@ public class AsyncLoader
return !chains.isEmpty() ? AsyncChains.reduce(chains, (a, b) ->
null).beginAsResult() : null;
}
+ private AsyncChain<?> referenceAndDispatchReadsForRange(AsyncOperation.Context context)
+ {
+ AsyncChain<Set<? extends RoutableKey>> overlappingKeys = findOverlappingKeys((Ranges) keysOrRanges);
+ return overlappingKeys.flatMap(keys -> {
+ if (keys.isEmpty())
+ return AsyncChains.success(null);
+ // TODO (duplicate code): repeat of referenceAndDispatchReads
+ List<Runnable> readRunnables = new ArrayList<>();
+ List<AsyncChain<?>> chains = new ArrayList<>();
+ referenceAndAssembleReads(keys,
+ context.commandsForKeys,
+ commandStore.commandsForKeyCache(),
+ loadCommandsPerKeyFunction(),
+ readRunnables,
+ chains);
+ // all keys are already loaded
+ if (chains.isEmpty())
+ return AsyncChains.success(null);
+ // runnable results are already contained in the chains collection
+ if (!readRunnables.isEmpty())
+ AsyncChains.ofRunnables(Stage.READ.executor(), readRunnables).begin(commandStore.agent());
+ return AsyncChains.reduce(chains, (a, b) -> null);
+ }, commandStore);
+ }
+
+ private AsyncChain<Set<? extends RoutableKey>> findOverlappingKeys(Ranges ranges)
+ {
+ assert !ranges.isEmpty();
+
+ List<AsyncChain<Set<PartitionKey>>> chains = new ArrayList<>(ranges.size());
+ for (Range range : ranges)
+ chains.add(findOverlappingKeys(range));
+ return AsyncChains.reduce(chains, (a, b) -> ImmutableSet.<RoutableKey>builder().addAll(a).addAll(b).build());
+ }
+
+ private AsyncChain<Set<PartitionKey>> findOverlappingKeys(Range range)
+ {
+ Set<PartitionKey> cached = commandStore.commandsForKeyCache().stream()
+ .map(n -> (PartitionKey) n.key())
+ .filter(range::contains)
+ .collect(Collectors.toSet());
+ // save to a variable as java gets confused when `.map` is called on the result of asChain
+ AsyncChain<Set<PartitionKey>> map = Observable.asChain(callback -> AccordKeyspace.findAllKeysBetween(commandStore.id(),
+ toTokenKey(range.start()).token(), range.startInclusive(),
+ toTokenKey(range.end()).token(), range.endInclusive(),
+ callback),
+ Collectors.toSet());
+ return map.map(s -> ImmutableSet.<PartitionKey>builder().addAll(s).addAll(cached).build());
+ }
+
+ private static TokenKey toTokenKey(RoutingKey start)
+ {
+ if (start instanceof TokenKey)
+ return (TokenKey) start;
+ if (start instanceof AccordRoutingKey.SentinelKey)
+ return ((AccordRoutingKey.SentinelKey) start).toTokenKey();
+ throw new IllegalArgumentException(String.format("Unable to convert RoutingKey %s (type %s) to TokenKey", start, start.getClass()));
+ }
+
@VisibleForTesting
void state(State state)
{
@@ -169,7 +256,7 @@ public class AsyncLoader
public boolean load(AsyncOperation.Context context, BiConsumer<Object,
Throwable> callback)
{
- logger.trace("Running load for {} with state {}: {} {}", callback,
state, txnIds, keys);
+ logger.trace("Running load for {} with state {}: {} {}", callback,
state, txnIds, keysOrRanges);
commandStore.checkInStoreThread();
switch (state)
{
@@ -200,7 +287,7 @@ public class AsyncLoader
throw new IllegalStateException("Unexpected state: " + state);
}
- logger.trace("Exiting load for {} with state {}: {} {}", callback,
state, txnIds, keys);
+ logger.trace("Exiting load for {} with state {}: {} {}", callback,
state, txnIds, keysOrRanges);
return state == State.FINISHED;
}
}
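
The new range branch in AsyncLoader unions the keys already held in the commands-for-key cache with keys found on disk for each range, then reduces the per-range results into one key set before dispatching the usual per-key loads. Below is a rough, self-contained analogue of that shape, using CompletableFuture in place of accord's AsyncChain/Observable; every name in it is an illustrative stand-in, not the Cassandra or accord API.

    import java.util.*;
    import java.util.concurrent.*;
    import java.util.stream.*;

    public class OverlapSketch
    {
        // stand-in for the on-disk scan (AccordKeyspace.findAllKeysBetween analogue)
        static CompletableFuture<Set<Integer>> keysOnDisk(int start, int end)
        {
            return CompletableFuture.supplyAsync(
                () -> IntStream.range(start, end).boxed().collect(Collectors.toSet()));
        }

        // per range: cached keys that fall inside the range, unioned with the disk scan
        static CompletableFuture<Set<Integer>> findOverlappingKeys(Set<Integer> cache, int start, int end)
        {
            Set<Integer> cached = cache.stream()
                                       .filter(k -> k >= start && k < end)
                                       .collect(Collectors.toSet());
            return keysOnDisk(start, end).thenApply(disk -> {
                Set<Integer> union = new HashSet<>(disk);
                union.addAll(cached);
                return union;
            });
        }

        public static void main(String[] args)
        {
            Set<Integer> cache = Set.of(2, 42);
            List<int[]> ranges = List.of(new int[]{ 0, 5 }, new int[]{ 10, 12 });
            // reduce the per-range futures into a single overlapping-key set,
            // mirroring the AsyncChains.reduce over per-range chains
            Set<Integer> all = ranges.stream()
                                     .map(r -> findOverlappingKeys(cache, r[0], r[1]))
                                     .map(CompletableFuture::join)
                                     .flatMap(Set::stream)
                                     .collect(Collectors.toSet());
            System.out.println(all); // keys 0..4 plus 10, 11 (42 is outside both ranges)
        }
    }
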
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index bf86f6777b..04bd716c1b 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.service.accord.async;
import java.util.HashMap;
+import java.util.TreeMap;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -32,7 +33,6 @@ import accord.local.CommandStore;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
import accord.primitives.RoutableKey;
-import accord.primitives.Seekables;
import accord.primitives.TxnId;
import accord.utils.Invariants;
import accord.utils.async.AsyncChains;
@@ -57,7 +57,7 @@ public abstract class AsyncOperation<R> extends
AsyncChains.Head<R> implements R
static class Context
{
final HashMap<TxnId, AccordSafeCommand> commands = new HashMap<>();
- final HashMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys =
new HashMap<>();
+ final TreeMap<RoutableKey, AccordSafeCommandsForKey> commandsForKeys = new TreeMap<>();
void releaseResources(AccordCommandStore commandStore)
{
@@ -145,7 +145,7 @@ public abstract class AsyncOperation<R> extends
AsyncChains.Head<R> implements R
AsyncLoader createAsyncLoader(AccordCommandStore commandStore,
PreLoadContext preLoadContext)
{
- return new AsyncLoader(commandStore, txnIds(preLoadContext),
toRoutableKeys(preLoadContext.keys()));
+ return new AsyncLoader(commandStore, txnIds(preLoadContext), preLoadContext.keys());
}
@VisibleForTesting
@@ -268,7 +268,6 @@ public abstract class AsyncOperation<R> extends
AsyncChains.Head<R> implements R
}
}
-
@Override
public void run()
{
@@ -307,19 +306,6 @@ public abstract class AsyncOperation<R> extends
AsyncChains.Head<R> implements R
commandStore.executor().execute(this);
}
- private static Iterable<RoutableKey> toRoutableKeys(Seekables<?, ?> keys)
- {
- switch (keys.domain())
- {
- default: throw new AssertionError("Unexpected domain: " +
keys.domain());
- case Key:
- return (Iterable<RoutableKey>) keys;
- case Range:
- // TODO (required): implement
- throw new UnsupportedOperationException();
- }
- }
-
static class ForFunction<R> extends AsyncOperation<R>
{
private final Function<? super SafeCommandStore, R> function;
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
index f65b1fb437..48ec6fae6c 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializer.java
@@ -25,7 +25,7 @@ import java.util.List;
import com.google.common.annotations.VisibleForTesting;
-import accord.impl.CommandsForKey.CommandLoader;
+import accord.impl.CommandTimeseries.CommandLoader;
import accord.local.Command;
import accord.local.SaveStatus;
import accord.primitives.PartialDeps;
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/ListenerSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/ListenerSerializers.java
index 4581ab8cad..f649a3b325 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/ListenerSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/ListenerSerializers.java
@@ -26,13 +26,14 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.CommandsForRanges;
import org.apache.cassandra.service.accord.api.PartitionKey;
public class ListenerSerializers
{
public enum Kind
{
- COMMAND, COMMANDS_FOR_KEY;
+ COMMAND, COMMANDS_FOR_KEY, COMMANDS_FOR_RANGE;
private static Kind of(Command.DurableAndIdempotentListener listener)
{
@@ -42,6 +43,9 @@ public class ListenerSerializers
if (listener instanceof CommandsForKey.Listener)
return COMMANDS_FOR_KEY;
+ if (listener instanceof CommandsForRanges.Listener)
+ return COMMANDS_FOR_RANGE;
+
throw new IllegalArgumentException("Unsupported listener type: " +
listener.getClass().getName());
}
}
@@ -68,6 +72,27 @@ public class ListenerSerializers
}
};
+ private static final IVersionedSerializer<CommandsForRanges.Listener> cfrListener = new IVersionedSerializer<CommandsForRanges.Listener>()
+ {
+ @Override
+ public void serialize(CommandsForRanges.Listener listener, DataOutputPlus out, int version) throws IOException
+ {
+ CommandSerializers.txnId.serialize(listener.txnId, out, version);
+ }
+
+ @Override
+ public CommandsForRanges.Listener deserialize(DataInputPlus in, int version) throws IOException
+ {
+ return new CommandsForRanges.Listener(CommandSerializers.txnId.deserialize(in, version));
+ }
+
+ @Override
+ public long serializedSize(CommandsForRanges.Listener listener, int version)
+ {
+ return CommandSerializers.txnId.serializedSize(listener.txnId, version);
+ }
+ };
+
private static final IVersionedSerializer<CommandsForKey.Listener>
cfkListener = new IVersionedSerializer<CommandsForKey.Listener>()
{
@Override
@@ -104,6 +129,9 @@ public class ListenerSerializers
case COMMANDS_FOR_KEY:
cfkListener.serialize((CommandsForKey.Listener) listener,
out, version);
break;
+ case COMMANDS_FOR_RANGE:
+ cfrListener.serialize((CommandsForRanges.Listener) listener, out, version);
+ break;
default:
throw new IllegalArgumentException();
}
@@ -119,6 +147,8 @@ public class ListenerSerializers
return commandListener.deserialize(in, version);
case COMMANDS_FOR_KEY:
return cfkListener.deserialize(in, version);
+ case COMMANDS_FOR_RANGE:
+ return cfrListener.deserialize(in, version);
default:
throw new IllegalArgumentException();
}
@@ -137,6 +167,9 @@ public class ListenerSerializers
case COMMANDS_FOR_KEY:
size +=
cfkListener.serializedSize((CommandsForKey.Listener) listener, version);
break;
+ case COMMANDS_FOR_RANGE:
+ size += cfrListener.serializedSize((CommandsForRanges.Listener) listener, version);
+ break;
default:
throw new IllegalArgumentException();
}
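
The listener serializer above follows the existing kind-dispatch pattern: the listener's Kind selects which payload serializer runs, and for CommandsForRanges.Listener the payload is just the TxnId. How the Kind itself is encoded is not shown in this hunk, but it is presumably a small tag written ahead of the payload. A tiny self-contained illustration of that tag-then-payload idea, using plain java.io streams and a long as a stand-in payload (not the IVersionedSerializer API):

    import java.io.*;

    public class KindTagSketch
    {
        enum Kind { COMMAND, COMMANDS_FOR_KEY, COMMANDS_FOR_RANGE }

        static void serialize(Kind kind, long payload, DataOutput out) throws IOException
        {
            out.writeByte(kind.ordinal()); // tag first...
            out.writeLong(payload);        // ...then the kind-specific payload
        }

        static long deserialize(DataInput in) throws IOException
        {
            Kind kind = Kind.values()[in.readByte()];
            switch (kind)
            {
                case COMMAND:
                case COMMANDS_FOR_KEY:
                case COMMANDS_FOR_RANGE:
                    return in.readLong();
                default:
                    throw new IllegalArgumentException();
            }
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serialize(Kind.COMMANDS_FOR_RANGE, 42L, new DataOutputStream(bytes));
            System.out.println(deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))));
        }
    }
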
diff --git a/src/java/org/apache/cassandra/utils/IntervalTree.java
b/src/java/org/apache/cassandra/utils/IntervalTree.java
index 35ec614a81..97044d165b 100644
--- a/src/java/org/apache/cassandra/utils/IntervalTree.java
+++ b/src/java/org/apache/cassandra/utils/IntervalTree.java
@@ -21,6 +21,9 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
+import java.util.function.BiPredicate;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterators;
@@ -69,6 +72,16 @@ public class IntervalTree<C extends Comparable<? super C>,
D, I extends Interval
return EMPTY_TREE;
}
+ public static <C extends Comparable<? super C>, D, I extends Interval<C, D>> Builder<C, D, I> builder()
+ {
+ return new Builder<>();
+ }
+
+ public Builder<C, D, I> unbuild()
+ {
+ return new Builder<C, D, I>().addAll(this);
+ }
+
public int intervalCount()
{
return count;
@@ -95,13 +108,28 @@ public class IntervalTree<C extends Comparable<? super C>,
D, I extends Interval
return head.low;
}
+ public List<Interval<C, D>> matches(Interval<C, D> searchInterval)
+ {
+ if (head == null)
+ return Collections.emptyList();
+
+ List<Interval<C, D>> results = new ArrayList<>();
+ head.searchInternal(searchInterval, i -> results.add(i));
+ return results;
+ }
+
+ public List<Interval<C, D>> matches(C point)
+ {
+ return matches(Interval.<C, D>create(point, point, null));
+ }
+
public List<D> search(Interval<C, D> searchInterval)
{
if (head == null)
return Collections.<D>emptyList();
List<D> results = new ArrayList<D>();
- head.searchInternal(searchInterval, results);
+ head.searchInternal(searchInterval, i -> results.add(i.data));
return results;
}
@@ -217,7 +245,7 @@ public class IntervalTree<C extends Comparable<? super C>,
D, I extends Interval
}
}
- void searchInternal(Interval<C, D> searchInterval, List<D> results)
+ void searchInternal(Interval<C, D> searchInterval, Consumer<Interval<C, D>> results)
{
if (center.compareTo(searchInterval.min) < 0)
{
@@ -226,7 +254,7 @@ public class IntervalTree<C extends Comparable<? super C>,
D, I extends Interval
return;
while (i < intersectsRight.size())
- results.add(intersectsRight.get(i++).data);
+ results.accept(intersectsRight.get(i++));
if (right != null)
right.searchInternal(searchInterval, results);
@@ -238,7 +266,7 @@ public class IntervalTree<C extends Comparable<? super C>,
D, I extends Interval
return;
for (int i = 0 ; i < j ; i++)
- results.add(intersectsLeft.get(i).data);
+ results.accept(intersectsLeft.get(i));
if (left != null)
left.searchInternal(searchInterval, results);
@@ -248,7 +276,7 @@ public class IntervalTree<C extends Comparable<? super C>,
D, I extends Interval
// Adds every interval contained in this node to the result
set then search left and right for further
// overlapping intervals
for (Interval<C, D> interval : intersectsLeft)
- results.add(interval.data);
+ results.accept(interval);
if (left != null)
left.searchInternal(searchInterval, results);
@@ -367,4 +395,55 @@ public class IntervalTree<C extends Comparable<? super C>,
D, I extends Interval
return size;
}
}
+
+ public static class Builder<C extends Comparable<? super C>, D, I extends Interval<C, D>>
+ {
+ private final List<I> intervals = new ArrayList<>();
+
+ public Builder<C, D, I> addAll(IntervalTree<C, D, I> other)
+ {
+ other.forEach(intervals::add);
+ return this;
+ }
+
+ public Builder<C, D, I> add(I interval)
+ {
+ intervals.add(interval);
+ return this;
+ }
+
+ public interface TriPredicate<A, B, C>
+ {
+ boolean test(A a, B b, C c);
+ }
+
+ public Builder<C, D, I> removeIf(TriPredicate<C, C, D> predicate)
+ {
+ intervals.removeIf(i -> predicate.test(i.min, i.max, i.data));
+ return this;
+ }
+
+ public Builder<C, D, I> removeIf(BiPredicate<C, C> predicate)
+ {
+ intervals.removeIf(i -> predicate.test(i.min, i.max));
+ return this;
+ }
+
+ public Builder<C, D, I> removeIf(Predicate<D> predicate)
+ {
+ intervals.removeIf(i -> predicate.test(i.data));
+ return this;
+ }
+
+ public IntervalTree<C, D, I> build()
+ {
+ return IntervalTree.build(intervals);
+ }
+
+ @Override
+ public String toString()
+ {
+ return intervals.toString();
+ }
+ }
}
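
A short usage sketch of the IntervalTree builder/matches additions above, written against the signatures introduced in this patch (builder(), unbuild(), matches(point), the removeIf overloads) together with the pre-existing Interval.create, search and intervalCount; the concrete types are arbitrary and the snippet is illustrative rather than taken from a test.

    import java.util.List;

    import org.apache.cassandra.utils.Interval;
    import org.apache.cassandra.utils.IntervalTree;

    public class IntervalTreeBuilderSketch
    {
        public static void main(String[] args)
        {
            IntervalTree.Builder<Integer, String, Interval<Integer, String>> builder = IntervalTree.builder();
            builder.add(Interval.create(0, 10, "a"));
            builder.add(Interval.create(5, 15, "b"));
            builder.removeIf((String data) -> data.equals("a"));            // Predicate<D> overload

            IntervalTree<Integer, String, Interval<Integer, String>> tree = builder.build();
            List<Interval<Integer, String>> hits = tree.matches(7);         // point query keeps the interval bounds
            List<String> data = tree.search(Interval.create(0, 20, null));  // data-only query, as before

            // unbuild() round-trips the tree back into a mutable Builder
            IntervalTree<Integer, String, Interval<Integer, String>> copy = tree.unbuild().build();
            System.out.println(hits + " " + data + " " + copy.intervalCount());
        }
    }
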
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 7d333726e5..3bdce4479c 100644
---
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -1061,7 +1061,12 @@ public abstract class AbstractCluster<I extends
IInstance> implements ICluster<I
return true;
});
if (!drain.isEmpty())
- throw new ShutdownException(drain);
+ {
+ ShutdownException shutdownException = new ShutdownException(drain);
+ // also log as java will truncate log lists
+ logger.error("Unexpected errors", shutdownException);
+ throw shutdownException;
+ }
}
private void checkForThreadLeaks()
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
index b0d61723e8..c43227cc65 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTest.java
@@ -26,15 +26,14 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.cassandra.distributed.Cluster;
import org.assertj.core.api.Assertions;
+
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@@ -50,7 +49,6 @@ import org.apache.cassandra.db.marshal.ListType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.QueryResults;
@@ -142,14 +140,8 @@ public class AccordCQLTest extends AccordTestBase
String currentTable = keyspace + ".tbl";
List<String> ddls = Arrays.asList("CREATE KEYSPACE " + keyspace + "
WITH REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}",
"CREATE TABLE " + currentTable + "
(k blob, c int, v int, primary key (k, c))");
- List<String> tokens = SHARED_CLUSTER.stream()
- .flatMap(i ->
StreamSupport.stream(Splitter.on(",").split(i.config().getString("initial_token")).spliterator(),
false))
- .collect(Collectors.toList());
-
- List<ByteBuffer> keys = tokens.stream()
- .map(t -> (Murmur3Partitioner.LongToken)
Murmur3Partitioner.instance.getTokenFactory().fromString(t))
-
.map(Murmur3Partitioner.LongToken::keyForToken)
- .collect(Collectors.toList());
+ List<String> tokens = tokens();
+ List<ByteBuffer> keys = tokensToKeys(tokens);
List<String> keyStrings = keys.stream().map(bb -> "0x" +
ByteBufferUtil.bytesToHex(bb)).collect(Collectors.toList());
StringBuilder query = new StringBuilder("BEGIN TRANSACTION\n");
@@ -159,12 +151,12 @@ public class AccordCQLTest extends AccordTestBase
query.append(" SELECT row0.v;\n")
.append(" IF ");
- for (int i = 0; i < keys.size(); i++)
+ for (int i = 0; i < keyStrings.size(); i++)
query.append((i > 0 ? " AND row" : "row") + i + " IS NULL");
query.append(" THEN\n");
- for (int i = 0; i < keys.size(); i++)
+ for (int i = 0; i < keyStrings.size(); i++)
query.append(" INSERT INTO " + currentTable + " (k, c, v)
VALUES (" + keyStrings.get(i) + ", 0, " + i +");\n");
query.append(" END IF\n");
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 58bb314cd6..7736c2ec90 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.distributed.test.accord;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -26,7 +27,9 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import com.google.common.base.Splitter;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -42,6 +45,7 @@ import net.bytebuddy.implementation.bind.annotation.This;
import org.apache.cassandra.cql3.statements.ModificationStatement;
import org.apache.cassandra.cql3.statements.TransactionStatement;
import org.apache.cassandra.cql3.transactions.ReferenceValue;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
@@ -56,6 +60,7 @@ import
org.apache.cassandra.service.accord.exceptions.WritePreemptedException;
import org.apache.cassandra.service.accord.txn.TxnData;
import org.apache.cassandra.utils.AssertionUtils;
import org.apache.cassandra.utils.FailingConsumer;
+import org.apache.cassandra.utils.Shared;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
@@ -66,6 +71,12 @@ public abstract class AccordTestBase extends TestBaseImpl
private static final Logger logger =
LoggerFactory.getLogger(AccordTestBase.class);
private static final int MAX_RETRIES = 10;
+ @Shared
+ public static class State
+ {
+ public static AtomicInteger coordinateCounts = new AtomicInteger();
+ }
+
protected static final AtomicInteger COUNTER = new AtomicInteger(0);
protected static Cluster SHARED_CLUSTER;
@@ -128,7 +139,7 @@ public abstract class AccordTestBase extends TestBaseImpl
protected int getAccordCoordinateCount()
{
- return SHARED_CLUSTER.get(1).callOnInstance(() ->
BBAccordCoordinateCountHelper.count.get());
+ return State.coordinateCounts.get();
}
private static Cluster createCluster() throws IOException
@@ -137,7 +148,7 @@ public abstract class AccordTestBase extends TestBaseImpl
// disable vnode for now, but should enable before trunk
return init(Cluster.build(2)
.withoutVNodes()
- .withConfig(c ->
c.with(Feature.NETWORK).set("write_request_timeout", "10s")
+ .withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("write_request_timeout", "10s")
.set("transaction_timeout", "15s")
.set("legacy_paxos_strategy", "accord"))
.withInstanceInitializer(EnforceUpdateDoesNotPerformRead::install)
@@ -188,7 +199,7 @@ public abstract class AccordTestBase extends TestBaseImpl
return result;
}
- private SimpleQueryResult executeWithRetry0(int count, Cluster cluster,
String check, Object... boundValues)
+ private static SimpleQueryResult executeWithRetry0(int count, Cluster cluster, String check, Object... boundValues)
{
try
{
@@ -206,7 +217,7 @@ public abstract class AccordTestBase extends TestBaseImpl
}
}
- protected SimpleQueryResult executeWithRetry(Cluster cluster, String
check, Object... boundValues)
+ protected static SimpleQueryResult executeWithRetry(Cluster cluster, String check, Object... boundValues)
{
check = wrapInTxn(check);
@@ -218,7 +229,7 @@ public abstract class AccordTestBase extends TestBaseImpl
return executeWithRetry0(0, cluster, check, boundValues);
}
- private boolean isIdempotent(Cluster cluster, String cql)
+ private static boolean isIdempotent(Cluster cluster, String cql)
{
return cluster.get(1).callOnInstance(() -> {
TransactionStatement stmt = AccordTestUtils.parse(cql);
@@ -257,6 +268,21 @@ public abstract class AccordTestBase extends TestBaseImpl
return numConstants == 0;
}
+ static List<String> tokens()
+ {
+ return SHARED_CLUSTER.stream()
+ .flatMap(i -> StreamSupport.stream(Splitter.on(",").split(i.config().getString("initial_token")).spliterator(), false))
+ .collect(Collectors.toList());
+ }
+
+ static List<ByteBuffer> tokensToKeys(List<String> tokens)
+ {
+ return tokens.stream()
+ .map(t -> (Murmur3Partitioner.LongToken) Murmur3Partitioner.instance.getTokenFactory().fromString(t))
+ .map(Murmur3Partitioner.LongToken::keyForToken)
+ .collect(Collectors.toList());
+ }
+
public static class EnforceUpdateDoesNotPerformRead
{
public static void install(ClassLoader classLoader, Integer num)
@@ -285,7 +311,6 @@ public abstract class AccordTestBase extends TestBaseImpl
public static class BBAccordCoordinateCountHelper
{
- static AtomicInteger count = new AtomicInteger();
static void install(ClassLoader cl, int nodeNumber)
{
if (nodeNumber != 1)
@@ -299,7 +324,7 @@ public abstract class AccordTestBase extends TestBaseImpl
public static TxnData coordinate(Txn txn, @SuperCall Callable<TxnData>
actual) throws Exception
{
- count.incrementAndGet();
+ State.coordinateCounts.incrementAndGet();
return actual.call();
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/NewSchemaTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/NewSchemaTest.java
new file mode 100644
index 0000000000..196638eb86
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/NewSchemaTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.service.accord.AccordService;
+
+public class NewSchemaTest extends AccordTestBase
+{
+ private static final Logger logger = LoggerFactory.getLogger(NewSchemaTest.class);
+
+ @Override
+ protected Logger logger()
+ {
+ return logger;
+ }
+
+ @Test
+ public void test()
+ {
+ for (int i = 0; i < 20; i++)
+ {
+ String ks = "ks" + i;
+ String table = ks + ".tbl" + i;
+ SHARED_CLUSTER.schemaChange("CREATE KEYSPACE " + ks + " WITH
REPLICATION={'class':'SimpleStrategy', 'replication_factor': 1}");
+ SHARED_CLUSTER.schemaChange(String.format("CREATE TABLE %s (pk
blob primary key)", table));
+ SHARED_CLUSTER.forEach(node -> node.runOnInstance(() ->
AccordService.instance().createEpochFromConfigUnsafe()));
+ SHARED_CLUSTER.forEach(node -> node.runOnInstance(() ->
AccordService.instance().setCacheSize(0)));
+
+ List<ByteBuffer> keys = tokensToKeys(tokens());
+
+ read(table, keys).exec();
+ }
+ }
+
+ private static Query read(String table, List<ByteBuffer> keys)
+ {
+ assert !keys.isEmpty();
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < keys.size(); i++)
+ sb.append("let row").append(i).append(" = (select * from
").append(table).append(" where pk = ?);\n");
+ sb.append("SELECT row0.pk;");
+ return new Query(sb.toString(), keys.toArray());
+ }
+
+ private static class Query
+ {
+ final String cql;
+ final Object[] binds;
+
+ private Query(String cql, Object[] binds)
+ {
+ this.cql = cql;
+ this.binds = binds;
+ }
+
+ SimpleQueryResult exec()
+ {
+ return executeWithRetry(SHARED_CLUSTER, cql, binds);
+ }
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index db0dbd6af5..4ffdf8db7b 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service.accord.async;
-import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -34,6 +33,7 @@ import org.junit.Test;
import accord.impl.CommandsForKey;
import accord.local.Command;
+import accord.primitives.Keys;
import accord.primitives.PartialTxn;
import accord.primitives.RoutableKey;
import accord.primitives.TxnId;
@@ -102,7 +102,7 @@ public class AsyncLoaderTest
testLoad(safeCfk, commandsForKey(key));
cfkCache.release(safeCfk);
- AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId),
singleton(key));
+ AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), Keys.of(key));
// everything is cached, so the loader should return immediately
commandStore.executeBlocking(() -> {
@@ -143,7 +143,7 @@ public class AsyncLoaderTest
AccordKeyspace.getCommandsForKeyMutation(commandStore, cfk,
commandStore.nextSystemTimestampMicros()).apply();
// resources are on disk only, so the loader should suspend...
- AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId),
singleton(key));
+ AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), Keys.of(key));
AsyncPromise<Void> cbFired = new AsyncPromise<>();
Context context = new Context();
commandStore.executeBlocking(() -> {
@@ -192,7 +192,7 @@ public class AsyncLoaderTest
AccordKeyspace.getCommandsForKeyMutation(commandStore, safeCfk,
commandStore.nextSystemTimestampMicros()).apply();
// resources are on disk only, so the loader should suspend...
- AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId),
singleton(key));
+ AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), Keys.of(key));
AsyncPromise<Void> cbFired = new AsyncPromise<>();
Context context = new Context();
commandStore.executeBlocking(() -> {
@@ -244,7 +244,7 @@ public class AsyncLoaderTest
testLoad(safeCfk, commandsForKey(key));
cfkCache.release(safeCfk);
- AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId),
singleton(key));
+ AsyncLoader loader = new AsyncLoader(commandStore, singleton(txnId), Keys.of(key));
// since there's a read future associated with the txnId, we'll wait
for it to load
AsyncPromise<Void> cbFired = new AsyncPromise<>();
@@ -291,7 +291,7 @@ public class AsyncLoaderTest
execute(commandStore, () -> {
AtomicInteger loadCalls = new AtomicInteger();
- AsyncLoader loader = new AsyncLoader(commandStore,
ImmutableList.of(txnId1, txnId2), Collections.emptyList()){
+ AsyncLoader loader = new AsyncLoader(commandStore, ImmutableList.of(txnId1, txnId2), Keys.EMPTY){
@Override
Function<TxnId, Command> loadCommandFunction()
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index 547b03c10c..13485f90b3 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.junit.BeforeClass;
import org.junit.Test;
-import accord.impl.CommandsForKey;
+import accord.impl.CommandTimeseries;
import accord.primitives.TxnId;
import accord.utils.AccordGens;
import accord.utils.Gens;
@@ -75,7 +75,7 @@ public class CommandsForKeySerializerTest
@Test
public void serde()
{
- CommandsForKey.CommandLoader<ByteBuffer> loader =
CommandsForKeySerializer.loader;
+ CommandTimeseries.CommandLoader<ByteBuffer> loader = CommandsForKeySerializer.loader;
qt().forAll(AccordGenerators.commands()).check(cmd -> {
ByteBuffer bb = loader.saveForCFK(cmd);
int size = bb.remaining();