This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 76f0fb5e98 Introduce Range transactions
76f0fb5e98 is described below
commit 76f0fb5e98cd5f9bc82e7c7f0373b374cb1fa608
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Jan 9 15:40:43 2023 +0000
Introduce Range transactions
patch by Benedict; reviewed by Ariel Weisberg for CASSANDRA-18174
---
.build/include-accord.sh | 2 +-
.../service/accord/AccordCommandStore.java | 172 ++++++++++-----------
.../service/accord/AccordCommandsForKey.java | 153 ++++++++----------
.../cassandra/service/accord/AccordKeyspace.java | 22 ++-
.../service/accord/AccordObjectSizes.java | 4 +-
.../service/accord/AccordPartialCommand.java | 30 ++--
.../cassandra/service/accord/TokenRange.java | 7 +-
.../service/accord/api/AccordRoutingKey.java | 19 ++-
.../service/accord/async/AsyncOperation.java | 4 +-
.../accord/serializers/AcceptSerializers.java | 7 +-
.../accord/serializers/CommandSerializers.java | 6 +-
.../service/accord/serializers/KeySerializers.java | 4 +-
.../accord/serializers/PreacceptSerializers.java | 4 +-
.../accord/serializers/TxnRequestSerializer.java | 4 +-
.../cassandra/service/accord/txn/TxnNamedRead.java | 11 +-
.../cassandra/service/accord/txn/TxnRead.java | 6 +-
.../cassandra/service/accord/txn/TxnUpdate.java | 7 +-
.../cassandra/service/accord/txn/TxnWrite.java | 10 +-
.../service/accord/AccordCommandTest.java | 24 ++-
.../service/accord/async/AsyncOperationTest.java | 7 +-
.../service/accord/async/AsyncWriterTest.java | 23 ++-
21 files changed, 256 insertions(+), 270 deletions(-)
diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index e7cedab46e..2f55c1d18c 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -25,7 +25,7 @@ set -o nounset
bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
accord_repo='https://github.com/apache/cassandra-accord.git'
-accord_branch='686326eedc8d2553c98a30abc81a925be3942b8c'
+accord_branch='1230eceb077c928123e6bf848a103964fe90c9f7'
accord_src="$bin/cassandra-accord"
checkout() {
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 96c2a26c35..87c9c752d9 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -23,27 +23,31 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
+import javax.annotation.Nullable;
+
import accord.api.Agent;
import accord.api.DataStore;
import accord.api.Key;
import accord.api.ProgressLog;
+import accord.impl.CommandsForKey;
import accord.local.Command;
import accord.local.CommandListener;
import accord.local.CommandStore;
import accord.local.CommandStores.RangesForEpoch;
import accord.local.CommandStores.RangesForEpochHolder;
-import accord.local.CommandsForKey;
import accord.local.NodeTimeService;
import accord.local.PreLoadContext;
import accord.local.SafeCommandStore;
+import accord.local.Status;
import accord.primitives.Keys;
import accord.primitives.Ranges;
-import accord.primitives.Routable;
import accord.primitives.Routables;
+import accord.primitives.Seekable;
import accord.primitives.Seekables;
import accord.primitives.Timestamp;
import accord.primitives.AbstractKeys;
@@ -104,43 +108,14 @@ public class AccordCommandStore extends CommandStore
return null;
}
- @Override
- public CommandsForKey commandsForKey(Key key)
- {
- AccordCommandsForKey commandsForKey =
getCommandsForKeyInternal(key);
- if (commandsForKey.isEmpty())
- commandsForKey.initialize();
- return commandsForKey;
- }
-
- @Override
- public CommandsForKey maybeCommandsForKey(Key key)
- {
- AccordCommandsForKey commandsForKey =
getCommandsForKeyInternal(key);
- return !commandsForKey.isEmpty() ? commandsForKey : null;
- }
-
- @Override
- public void addAndInvokeListener(TxnId txnId, CommandListener listener)
- {
- AccordCommand.WriteOnly command = (AccordCommand.WriteOnly)
getContext().commands.getOrCreateWriteOnly(txnId, (ignore, id) -> new
AccordCommand.WriteOnly(id), commandStore());
- command.addListener(listener);
- execute(listener.listenerPreLoadContext(txnId), store -> {
- listener.onChange(store, store.command(txnId));
- });
- }
-
- @Override
- public <T> T mapReduce(Routables<?, ?> keysOrRanges, Ranges slice,
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+ public <T> T mapReduce(Routables<?, ?> keysOrRanges,
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
{
- switch (keysOrRanges.kindOfContents()) {
+ switch (keysOrRanges.domain()) {
default:
throw new AssertionError();
case Key:
- // TODO: efficiency
AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>)
keysOrRanges;
return keys.stream()
- .filter(slice::contains)
.map(this::commandsForKey)
.map(map)
.reduce(initialValue, reduce);
@@ -150,85 +125,102 @@ public class AccordCommandStore extends CommandStore
}
}
- public <T> T mapReduce(Routables<?, ?> keysOrRanges,
Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue)
+ private <O> O mapReduceForKey(Routables<?, ?> keysOrRanges, Ranges
slice, BiFunction<CommandsForKey, O, O> map, O accumulate, O terminalValue)
{
- switch (keysOrRanges.kindOfContents()) {
+ switch (keysOrRanges.domain()) {
default:
throw new AssertionError();
case Key:
+ // TODO: efficiency
AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>)
keysOrRanges;
- return keys.stream()
- .map(this::commandsForKey)
- .map(map)
- .reduce(initialValue, reduce);
+ for (Key key : keys)
+ {
+ if (!slice.contains(key)) continue;
+ CommandsForKey forKey = commandsForKey(key);
+ accumulate = map.apply(forKey, accumulate);
+ if (accumulate.equals(terminalValue))
+ return accumulate;
+ }
+ break;
case Range:
- // TODO: implement
+ // TODO (required): implement
throw new UnsupportedOperationException();
}
+ return accumulate;
}
- public void forEach(Routables<?, ?> keysOrRanges,
Consumer<CommandsForKey> forEach)
+ @Override
+ public <T> T mapReduce(Seekables<?, ?> keysOrRanges, Ranges slice,
TestKind testKind, TestTimestamp testTimestamp, Timestamp timestamp, TestDep
testDep, @Nullable TxnId depId, @Nullable Status minStatus, @Nullable Status
maxStatus, CommandFunction<T, T> map, T accumulate, T terminalValue)
{
- switch (keysOrRanges.kindOfContents()) {
- default:
- throw new AssertionError();
- case Key:
- AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>)
keysOrRanges;
- keys.forEach(key -> forEach.accept(commandsForKey(key)));
- break;
- case Range:
- // TODO: implement
- throw new UnsupportedOperationException();
- }
+ accumulate = mapReduceForKey(keysOrRanges, slice, (forKey, prev)
-> {
+ CommandsForKey.CommandTimeseries timeseries;
+ switch (testTimestamp)
+ {
+ default: throw new AssertionError();
+ case STARTED_AFTER:
+ case STARTED_BEFORE:
+ timeseries = forKey.byId();
+ break;
+ case EXECUTES_AFTER:
+ case MAY_EXECUTE_BEFORE:
+ timeseries = forKey.byExecuteAt();
+ }
+ CommandsForKey.CommandTimeseries.TestTimestamp
remapTestTimestamp;
+ switch (testTimestamp)
+ {
+ default: throw new AssertionError();
+ case STARTED_AFTER:
+ case EXECUTES_AFTER:
+ remapTestTimestamp =
CommandsForKey.CommandTimeseries.TestTimestamp.AFTER;
+ break;
+ case STARTED_BEFORE:
+ case MAY_EXECUTE_BEFORE:
+ remapTestTimestamp =
CommandsForKey.CommandTimeseries.TestTimestamp.BEFORE;
+ }
+ return timeseries.mapReduce(testKind, remapTestTimestamp,
timestamp, testDep, depId, minStatus, maxStatus, map, prev, terminalValue);
+ }, accumulate, terminalValue);
+
+ return accumulate;
}
- public void forEach(Routable keyOrRange, Consumer<CommandsForKey>
forEach)
+ @Override
+ public void register(Seekables<?, ?> keysOrRanges, Ranges slice,
Command command)
{
- switch (keyOrRange.domain())
- {
- default: throw new AssertionError();
- case Key:
- forEach.accept(commandsForKey((Key) keyOrRange));
- break;
- case Range:
- // TODO: implement
- throw new UnsupportedOperationException();
- }
+ // TODO (required): support ranges
+ Routables.foldl((Keys)keysOrRanges, slice, (k, v, i) -> {
commandsForKey(k).register(command); return v; }, null);
}
@Override
- public void forEach(Routables<?, ?> keysOrRanges, Ranges slice,
Consumer<CommandsForKey> forEach)
+ public void register(Seekable keyOrRange, Ranges slice, Command
command)
{
- switch (keysOrRanges.kindOfContents()) {
- default:
- throw new AssertionError();
- case Key:
- AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>)
keysOrRanges;
- keys.forEach(slice, key -> {
- forEach.accept(commandsForKey(key));
- });
- break;
- case Range:
- // TODO: implement
- throw new UnsupportedOperationException();
- }
+ // TODO (required): support ranges
+ Key key = (Key) keyOrRange;
+ if (slice.contains(key))
+ commandsForKey(key).register(command);
+ }
+
+ public AccordCommandsForKey commandsForKey(Key key)
+ {
+ AccordCommandsForKey commandsForKey =
getCommandsForKeyInternal(key);
+ if (commandsForKey.isEmpty())
+ commandsForKey.initialize();
+ return commandsForKey;
+ }
+
+ public AccordCommandsForKey maybeCommandsForKey(Key key)
+ {
+ AccordCommandsForKey commandsForKey =
getCommandsForKeyInternal(key);
+ return !commandsForKey.isEmpty() ? commandsForKey : null;
}
@Override
- public void forEach(Routable keyOrRange, Ranges slice,
Consumer<CommandsForKey> forEach)
+ public void addAndInvokeListener(TxnId txnId, CommandListener listener)
{
- switch (keyOrRange.domain())
- {
- default: throw new AssertionError();
- case Key:
- Key key = (Key) keyOrRange;
- if (slice.contains(key))
- forEach.accept(commandsForKey(key));
- break;
- case Range:
- // TODO: implement
- throw new UnsupportedOperationException();
- }
+ AccordCommand.WriteOnly command = (AccordCommand.WriteOnly)
getContext().commands.getOrCreateWriteOnly(txnId, (ignore, id) -> new
AccordCommand.WriteOnly(id), commandStore());
+ command.addListener(listener);
+ execute(listener.listenerPreLoadContext(txnId), store -> {
+ listener.onChange(store, store.command(txnId));
+ });
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
index ada3ee0742..b93dc27763 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
@@ -24,7 +24,6 @@ import java.util.Objects;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
import java.util.stream.Stream;
import javax.annotation.Nullable;
@@ -36,7 +35,10 @@ import org.slf4j.LoggerFactory;
import accord.local.Command;
import accord.local.CommandStore;
-import accord.local.CommandsForKey;
+import accord.impl.CommandsForKey;
+import accord.local.SafeCommandStore;
+import accord.local.SafeCommandStore.TestDep;
+import accord.local.SafeCommandStore.TestKind;
import accord.local.Status;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
@@ -50,9 +52,10 @@ import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.concurrent.Future;
import org.assertj.core.util.VisibleForTesting;
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.ANY_DEPS;
-import static accord.local.CommandsForKey.CommandTimeseries.TestDep.WITHOUT;
-import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs;
+import static accord.local.SafeCommandStore.TestDep.ANY_DEPS;
+import static accord.local.SafeCommandStore.TestDep.WITH;
+import static accord.local.SafeCommandStore.TestKind.Ws;
+import static accord.local.Status.KnownDeps.DepsUnknown;
import static
org.apache.cassandra.service.accord.AccordState.WriteOnly.applyMapChanges;
import static
org.apache.cassandra.service.accord.AccordState.WriteOnly.applySetChanges;
@@ -97,37 +100,25 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
public void applyChanges(AccordCommandsForKey instance)
{
applySetChanges(this, instance, cfk -> cfk.blindWitnessed);
- applyMapChanges(this, instance, cfk -> cfk.uncommitted.map);
- applyMapChanges(this, instance, cfk -> cfk.committedById.map);
- applyMapChanges(this, instance, cfk ->
cfk.committedByExecuteAt.map);
+ applyMapChanges(this, instance, cfk -> cfk.byId.map);
+ applyMapChanges(this, instance, cfk -> cfk.byExecuteAt.map);
}
}
public enum SeriesKind
{
- UNCOMMITTED(Command::txnId),
- COMMITTED_BY_ID(Command::txnId),
- COMMITTED_BY_EXECUTE_AT(Command::executeAt);
-
- private final Function<Command, Timestamp> getTimestamp;
-
- SeriesKind(Function<Command, Timestamp> timestampFunction)
- {
- this.getTimestamp = timestampFunction;
- }
+ BY_ID, BY_EXECUTE_AT;
}
- public class Series<T> implements CommandTimeseries<T>
+ public class Series implements CommandTimeseries
{
public final SeriesKind kind;
public final StoredNavigableMap<Timestamp, ByteBuffer> map;
- private final Function<AccordPartialCommand, T> translate;
- public Series(ReadWrite readWrite, SeriesKind kind,
Function<AccordPartialCommand, T> translate)
+ public Series(ReadWrite readWrite, SeriesKind kind)
{
this.kind = kind;
map = new StoredNavigableMap<>(readWrite);
- this.translate = translate;
}
@Override
@@ -153,24 +144,28 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
return map.getView().isEmpty();
}
- @Override
- public Stream<T> before(Timestamp timestamp, TestKind testKind,
TestDep testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status
status)
+ public <T> T mapReduce(TestKind testKind, TestTimestamp testTimestamp,
Timestamp timestamp,
+ TestDep testDep, @Nullable TxnId depId,
+ @Nullable Status minStatus, @Nullable Status maxStatus,
+ SafeCommandStore.CommandFunction<T, T> map, T
initialValue, T terminalValue)
{
- return idsToCommands(map.getView().headMap(timestamp,
false).values())
- .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
- .filter(cmd -> testDep == ANY_DEPS || (cmd.hasDep(depId) ^
(testDep == WITHOUT)))
- .filter(cmd -> TestStatus.test(cmd.status(), testStatus,
status))
- .map(translate);
- }
- @Override
- public Stream<T> after(Timestamp timestamp, TestKind testKind, TestDep
testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status status)
- {
- return idsToCommands(map.getView().tailMap(timestamp,
false).values())
- .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
- .filter(cmd -> testDep == ANY_DEPS || (cmd.hasDep(depId) ^
(testDep == WITHOUT)))
- .filter(cmd -> TestStatus.test(cmd.status(), testStatus,
status))
- .map(translate);
+ for (ByteBuffer buffer : (testTimestamp == TestTimestamp.BEFORE ?
this.map.getView().headMap(timestamp, false) :
this.map.getView().tailMap(timestamp, false)).values())
+ {
+ AccordPartialCommand cmd =
AccordPartialCommand.serializer.deserialize(AccordCommandsForKey.this,
commandStore, buffer);
+ if (testKind == Ws && cmd.txnId().isRead()) continue;
+ // If we don't have any dependencies, we treat a dependency
filter as a mismatch
+ if (testDep != ANY_DEPS && (cmd.known().deps == DepsUnknown ||
(cmd.deps().contains(depId) != (testDep == WITH))))
+ continue;
+ if (minStatus != null && minStatus.compareTo(cmd.status()) > 0)
+ continue;
+ if (maxStatus != null && maxStatus.compareTo(cmd.status()) < 0)
+ continue;
+ initialValue = map.apply(key, cmd.txnId(), cmd.executeAt(),
initialValue);
+ if (initialValue.equals(terminalValue))
+ break;
+ }
+ return initialValue;
}
@VisibleForTesting
@@ -195,9 +190,8 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
public final StoredLong lastExecutedMicros;
public final StoredValue<Timestamp> lastWriteTimestamp;
public final StoredSet.Navigable<Timestamp> blindWitnessed;
- public final Series<TxnIdWithExecuteAt> uncommitted;
- public final Series<TxnId> committedById;
- public final Series<TxnId> committedByExecuteAt;
+ public final Series byId;
+ public final Series byExecuteAt;
public AccordCommandsForKey(AccordCommandStore commandStore, PartitionKey
key)
{
@@ -208,9 +202,8 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
lastExecutedMicros = new StoredLong(rw());
lastWriteTimestamp = new StoredValue<>(rw());
blindWitnessed = new StoredSet.Navigable<>(rw());
- uncommitted = new Series<>(rw(), SeriesKind.UNCOMMITTED, x -> x);
- committedById = new Series<>(rw(), SeriesKind.COMMITTED_BY_ID,
AccordPartialCommand::txnId);
- committedByExecuteAt = new Series<>(rw(),
SeriesKind.COMMITTED_BY_EXECUTE_AT, AccordPartialCommand::txnId);
+ byId = new Series(rw(), SeriesKind.BY_ID);
+ byExecuteAt = new Series(rw(), SeriesKind.BY_EXECUTE_AT);
}
@Override
@@ -221,9 +214,8 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
&& lastExecutedMicros.isEmpty()
&& lastWriteTimestamp.isEmpty()
&& blindWitnessed.isEmpty()
- && uncommitted.map.isEmpty()
- && committedById.map.isEmpty()
- && committedByExecuteAt.map.isEmpty();
+ && byId.map.isEmpty()
+ && byExecuteAt.map.isEmpty();
}
public void setEmpty()
@@ -233,9 +225,8 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
lastExecutedMicros.setEmpty();
lastWriteTimestamp.setEmpty();
blindWitnessed.setEmpty();
- uncommitted.map.setEmpty();
- committedById.map.setEmpty();
- committedByExecuteAt.map.setEmpty();
+ byId.map.setEmpty();
+ byExecuteAt.map.setEmpty();
}
public AccordCommandsForKey initialize()
@@ -245,9 +236,8 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
lastExecutedMicros.load(Defaults.lastExecutedMicros);
lastWriteTimestamp.load(Defaults.lastWriteTimestamp);
blindWitnessed.load(new TreeSet<>());
- uncommitted.map.load(new TreeMap<>());
- committedById.map.load(new TreeMap<>());
- committedByExecuteAt.map.load(new TreeMap<>());
+ byId.map.load(new TreeMap<>());
+ byExecuteAt.map.load(new TreeMap<>());
return this;
}
@@ -259,9 +249,8 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
|| lastExecutedMicros.hasModifications()
|| lastWriteTimestamp.hasModifications()
|| blindWitnessed.hasModifications()
- || uncommitted.map.hasModifications()
- || committedById.map.hasModifications()
- || committedByExecuteAt.map.hasModifications();
+ || byId.map.hasModifications()
+ || byExecuteAt.map.hasModifications();
}
@Override
@@ -272,9 +261,8 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
lastExecutedMicros.clearModifiedFlag();
lastWriteTimestamp.clearModifiedFlag();
blindWitnessed.clearModifiedFlag();
- uncommitted.map.clearModifiedFlag();
- committedById.map.clearModifiedFlag();
- committedByExecuteAt.map.clearModifiedFlag();
+ byId.map.clearModifiedFlag();
+ byExecuteAt.map.clearModifiedFlag();
}
@Override
@@ -285,9 +273,8 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
&& lastExecutedMicros.isLoaded()
&& lastWriteTimestamp.isLoaded()
&& blindWitnessed.isLoaded()
- && uncommitted.map.isLoaded()
- && committedById.map.isLoaded()
- && committedByExecuteAt.map.isLoaded();
+ && byId.map.isLoaded()
+ && byExecuteAt.map.isLoaded();
}
public CommandStore commandStore()
@@ -310,28 +297,21 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
size += lastExecutedMicros.estimatedSizeOnHeap();
size +=
lastWriteTimestamp.estimatedSizeOnHeap(AccordObjectSizes::timestamp);
size +=
blindWitnessed.estimatedSizeOnHeap(AccordObjectSizes::timestamp);
- size +=
uncommitted.map.estimatedSizeOnHeap(AccordObjectSizes::timestamp,
ByteBufferUtil::estimatedSizeOnHeap);
- size +=
committedById.map.estimatedSizeOnHeap(AccordObjectSizes::timestamp,
ByteBufferUtil::estimatedSizeOnHeap);
- size +=
committedByExecuteAt.map.estimatedSizeOnHeap(AccordObjectSizes::timestamp,
ByteBufferUtil::estimatedSizeOnHeap);
+ size += byId.map.estimatedSizeOnHeap(AccordObjectSizes::timestamp,
ByteBufferUtil::estimatedSizeOnHeap);
+ size +=
byExecuteAt.map.estimatedSizeOnHeap(AccordObjectSizes::timestamp,
ByteBufferUtil::estimatedSizeOnHeap);
return size;
}
@Override
- public Series<TxnIdWithExecuteAt> uncommitted()
+ public Series byId()
{
- return uncommitted;
+ return byId;
}
@Override
- public Series<TxnId> committedById()
+ public Series byExecuteAt()
{
- return committedById;
- }
-
- @Override
- public Series<TxnId> committedByExecuteAt()
- {
- return committedByExecuteAt;
+ return byExecuteAt;
}
@Override
@@ -368,19 +348,9 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
public void updateSummaries(AccordCommand command)
{
- if (command.status().hasBeen(Status.Committed))
- {
- if (command.status.previous() == null ||
!command.status.previous().status.hasBeen(Status.Committed))
- uncommitted.map.blindRemove(command.txnId());
-
- ByteBuffer partialCommand =
AccordPartialCommand.serializer.serialize(new AccordPartialCommand(key,
command));
- committedById.map.blindPut(command.txnId(), partialCommand);
- committedByExecuteAt.map.blindPut(command.executeAt(),
partialCommand);
- }
- else
- { // TODO: somebody is inserting large buffers into this map
(presumably from loading from disk)
- uncommitted.map.blindPut(command.txnId(),
AccordPartialCommand.serializer.serialize(new AccordPartialCommand(key,
command)));
- }
+ ByteBuffer partialCommand =
AccordPartialCommand.serializer.serialize(new AccordPartialCommand(key,
command));
+ byId.map.blindPut(command.txnId(), partialCommand);
+ byExecuteAt.map.blindPut(command.executeAt(), partialCommand);
}
private static long getTimestampMicros(Timestamp timestamp)
@@ -439,15 +409,14 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
&& lastExecutedMicros.equals(that.lastExecutedMicros)
&& lastWriteTimestamp.equals(that.lastWriteTimestamp)
&& blindWitnessed.equals(that.blindWitnessed)
- && uncommitted.map.equals(that.uncommitted.map)
- && committedById.map.equals(that.committedById.map)
- &&
committedByExecuteAt.map.equals(that.committedByExecuteAt.map);
+ && byId.map.equals(that.byId.map)
+ && byExecuteAt.map.equals(that.byExecuteAt.map);
}
@Override
public int hashCode()
{
- return Objects.hash(commandStore, key, blindWitnessed, maxTimestamp,
lastExecutedTimestamp, lastExecutedMicros, lastWriteTimestamp, uncommitted,
committedById, committedByExecuteAt);
+ return Objects.hash(commandStore, key, blindWitnessed, maxTimestamp,
lastExecutedTimestamp, lastExecutedMicros, lastWriteTimestamp, byId,
byExecuteAt);
}
@Override
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index bee00a965b..9148bb4311 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -251,9 +251,8 @@ public class AccordKeyspace
private static boolean hasRegularChanges(AccordCommandsForKey
commandsForKey)
{
- return commandsForKey.uncommitted.map.hasModifications()
- || commandsForKey.committedById.map.hasModifications()
- ||
commandsForKey.committedByExecuteAt.map.hasModifications();
+ return commandsForKey.byId.map.hasModifications()
+ || commandsForKey.byExecuteAt.map.hasModifications();
}
static RegularAndStaticColumns columnsFor(AccordCommandsForKey
commandsForKey)
@@ -617,7 +616,7 @@ public class AccordKeyspace
}
}
- private static void addSeriesMutations(AccordCommandsForKey.Series<?>
series,
+ private static void addSeriesMutations(AccordCommandsForKey.Series series,
PartitionUpdate.Builder
partitionBuilder,
Row.Builder rowBuilder,
long timestampMicros,
@@ -661,9 +660,8 @@ public class AccordKeyspace
int nowInSeconds = (int)
TimeUnit.MICROSECONDS.toSeconds(timestampMicros);
int expectedRows = (CommandsForKeyColumns.hasStaticChanges(cfk) ? 1 :
0)
- + cfk.uncommitted.map.totalModifications()
- + cfk.committedById.map.totalModifications()
- + cfk.committedByExecuteAt.map.totalModifications();
+ + cfk.byId.map.totalModifications()
+ + cfk.byExecuteAt.map.totalModifications();
PartitionUpdate.Builder partitionBuilder = new
PartitionUpdate.Builder(CommandsForKey,
makeKey(cfk),
@@ -700,9 +698,8 @@ public class AccordKeyspace
partitionBuilder.add(rowBuilder.build());
}
- addSeriesMutations(cfk.uncommitted, partitionBuilder, rowBuilder,
timestampMicros, nowInSeconds);
- addSeriesMutations(cfk.committedById, partitionBuilder, rowBuilder,
timestampMicros, nowInSeconds);
- addSeriesMutations(cfk.committedByExecuteAt, partitionBuilder,
rowBuilder, timestampMicros, nowInSeconds);
+ addSeriesMutations(cfk.byId, partitionBuilder, rowBuilder,
timestampMicros, nowInSeconds);
+ addSeriesMutations(cfk.byExecuteAt, partitionBuilder, rowBuilder,
timestampMicros, nowInSeconds);
return new Mutation(partitionBuilder.build());
}
@@ -804,9 +801,8 @@ public class AccordKeyspace
}
Preconditions.checkState(!partitions.hasNext());
- cfk.uncommitted.map.load(seriesMaps.get(SeriesKind.UNCOMMITTED));
-
cfk.committedById.map.load(seriesMaps.get(SeriesKind.COMMITTED_BY_ID));
-
cfk.committedByExecuteAt.map.load(seriesMaps.get(SeriesKind.COMMITTED_BY_EXECUTE_AT));
+ cfk.byId.map.load(seriesMaps.get(SeriesKind.BY_ID));
+ cfk.byExecuteAt.map.load(seriesMaps.get(SeriesKind.BY_EXECUTE_AT));
}
catch (Throwable t)
{
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index ca18f67c75..bef7b43ad2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -89,7 +89,7 @@ public class AccordObjectSizes
public static long seekables(Seekables<?, ?> seekables)
{
- switch (seekables.kindOfContents())
+ switch (seekables.domain())
{
default: throw new AssertionError();
case Key: return keys((Keys) seekables);
@@ -219,7 +219,7 @@ public class AccordObjectSizes
{
long size = EMPTY_WRITES_SIZE;
size += timestamp(writes.executeAt);
- size += keys(writes.keys);
+ size += seekables(writes.keys);
if (writes.write != null)
size += ((TxnWrite) writes.write).estimatedSizeOnHeap();
return size;
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
index 2f708c53b4..19f71da75c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
@@ -26,8 +26,9 @@ import java.util.Objects;
import accord.api.Key;
import accord.local.Command;
-import accord.local.CommandsForKey;
+import accord.local.SaveStatus;
import accord.local.Status;
+import accord.local.Status.Known;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import org.apache.cassandra.io.util.DataInputBuffer;
@@ -44,18 +45,22 @@ import static
org.apache.cassandra.utils.NullableSerializer.deserializeNullable;
import static org.apache.cassandra.utils.NullableSerializer.serializeNullable;
import static
org.apache.cassandra.utils.NullableSerializer.serializedNullableSize;
-public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt
+public class AccordPartialCommand
{
public static final PartialCommandSerializer serializer = new
PartialCommandSerializer();
+ private final TxnId txnId;
+ private final Timestamp executeAt;
+
// TODO (soon): this should only be a list of TxnId (the deps for the key
we are persisted against); but should also be stored separately and not brought
into memory
private final List<TxnId> deps;
// TODO (soon): we only require this for Accepted; perhaps more tightly
couple query API for efficiency
- private final Status status;
+ private final SaveStatus status;
- AccordPartialCommand(TxnId txnId, Timestamp executeAt, List<TxnId> deps,
Status status)
+ AccordPartialCommand(TxnId txnId, Timestamp executeAt, List<TxnId> deps,
SaveStatus status)
{
- super(txnId, executeAt);
+ this.txnId = txnId;
+ this.executeAt = executeAt;
this.deps = deps;
this.status = status;
}
@@ -64,7 +69,7 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
{
this(command.txnId(), command.executeAt(),
command.partialDeps() == null ? Collections.emptyList() :
command.partialDeps().txnIds(key),
- command.status());
+ command.saveStatus());
}
public TxnId txnId()
@@ -89,7 +94,12 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
public Status status()
{
- return status;
+ return status.status;
+ }
+
+ public Known known()
+ {
+ return status.known;
}
@Override
@@ -111,7 +121,7 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
out.write(version.version);
CommandSerializers.txnId.serialize(command.txnId(), out,
version.msgVersion);
serializeNullable(command.executeAt(), out, version.msgVersion,
CommandSerializers.timestamp);
- CommandSerializers.status.serialize(command.status(), out,
version.msgVersion);
+ CommandSerializers.saveStatus.serialize(command.status, out,
version.msgVersion);
serializeCollection(command.deps, out, version.msgVersion,
CommandSerializers.txnId);
}
@@ -146,7 +156,7 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
return command;
Timestamp executeAt = deserializeNullable(in, version.msgVersion,
CommandSerializers.timestamp);
- Status status = CommandSerializers.status.deserialize(in,
version.msgVersion);
+ SaveStatus status = CommandSerializers.saveStatus.deserialize(in,
version.msgVersion);
List<TxnId> deps = deserializeList(in, version.msgVersion,
CommandSerializers.txnId);
AccordPartialCommand partial = new AccordPartialCommand(txnId,
executeAt, deps, status);
addToContext(partial, context);
@@ -170,7 +180,7 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
int size =
Math.toIntExact(AccordSerializerVersion.serializer.serializedSize(version));
size += CommandSerializers.txnId.serializedSize();
size += serializedNullableSize(command.executeAt(),
version.msgVersion, CommandSerializers.timestamp);
- size += CommandSerializers.status.serializedSize(command.status(),
version.msgVersion);
+ size +=
CommandSerializers.saveStatus.serializedSize(command.status,
version.msgVersion);
size += serializedCollectionSize(command.deps, version.msgVersion,
CommandSerializers.txnId);
return size;
}
diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java
b/src/java/org/apache/cassandra/service/accord/TokenRange.java
index 81cb329f59..a0b6f67ccc 100644
--- a/src/java/org/apache/cassandra/service/accord/TokenRange.java
+++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import accord.api.RoutingKey;
import accord.primitives.Range;
+import accord.primitives.Ranges;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -41,15 +42,15 @@ public class TokenRange extends Range.EndInclusive
}
@Override
- public TokenRange subRange(RoutingKey start, RoutingKey end)
+ public TokenRange newRange(RoutingKey start, RoutingKey end)
{
return new TokenRange((AccordRoutingKey) start, (AccordRoutingKey)
end);
}
@Override
- public RoutingKey someIntersectingRoutingKey()
+ public RoutingKey someIntersectingRoutingKey(Ranges ranges)
{
- RoutingKey pick = startInclusive() ? start() : end();
+ RoutingKey pick = super.someIntersectingRoutingKey(ranges);
if (pick instanceof SentinelKey)
pick = ((SentinelKey) pick).toTokenKey();
return pick;
diff --git
a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
index 8a763f5e61..a0c361a1db 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.TokenRange;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
@@ -154,6 +155,12 @@ public abstract class AccordRoutingKey extends
AccordRoutableKey implements Rout
return TypeSizes.BOOL_SIZE + TypeSizes.sizeof(key.keyspace);
}
};
+
+ @Override
+ public Range asRange()
+ {
+ throw new UnsupportedOperationException();
+ }
}
// final in part because we refer to its class directly in
AccordRoutableKey.compareTo
@@ -161,6 +168,16 @@ public abstract class AccordRoutingKey extends
AccordRoutableKey implements Rout
{
private static final long EMPTY_SIZE;
+ @Override
+ public Range asRange()
+ {
+ AccordRoutingKey before = token.isMinimum()
+ ? new SentinelKey(keyspace, true)
+ : new TokenKey(keyspace,
token.decreaseSlightly());
+
+ return new TokenRange(before, this);
+ }
+
static
{
Token key =
getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER).getToken();
@@ -308,7 +325,7 @@ public abstract class AccordRoutingKey extends
AccordRoutableKey implements Rout
for (int i = 0; i < splits.size(); i++)
{
if (i == results.size()) results.add(Ranges.EMPTY);
- results.set(i, results.get(i).union(splits.get(i)));
+ results.set(i, results.get(i).with(splits.get(i)));
}
}
return results;
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index f03eab2f0d..e302b42489 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -212,13 +212,13 @@ public abstract class AsyncOperation<R> extends
AsyncPromise<R> implements Runna
private static Iterable<PartitionKey> toPartitionKeys(Seekables<?, ?> keys)
{
- switch (keys.kindOfContents())
+ switch (keys.domain())
{
default: throw new AssertionError();
case Key:
return (Iterable<PartitionKey>) keys;
case Range:
- // TODO: implement
+ // TODO (required): implement
throw new UnsupportedOperationException();
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
index 91bf1fed10..609e7ee080 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/AcceptSerializers.java
@@ -45,7 +45,6 @@ public class AcceptSerializers
CommandSerializers.timestamp.serialize(accept.executeAt, out,
version);
KeySerializers.seekables.serialize(accept.keys, out, version);
DepsSerializer.partialDeps.serialize(accept.partialDeps, out,
version);
- CommandSerializers.kind.serialize(accept.kind, out, version);
}
@Override
@@ -55,8 +54,7 @@ public class AcceptSerializers
CommandSerializers.ballot.deserialize(in, version),
CommandSerializers.timestamp.deserialize(in,
version),
KeySerializers.seekables.deserialize(in, version),
- DepsSerializer.partialDeps.deserialize(in, version),
- CommandSerializers.kind.deserialize(in, version));
+ DepsSerializer.partialDeps.deserialize(in, version));
}
@Override
@@ -65,8 +63,7 @@ public class AcceptSerializers
return CommandSerializers.ballot.serializedSize(accept.ballot,
version)
+
CommandSerializers.timestamp.serializedSize(accept.executeAt, version)
+ KeySerializers.seekables.serializedSize(accept.keys,
version)
- +
DepsSerializer.partialDeps.serializedSize(accept.partialDeps, version)
- + CommandSerializers.kind.serializedSize(accept.kind,
version);
+ +
DepsSerializer.partialDeps.serializedSize(accept.partialDeps, version);
}
};
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index 98c1f93eaa..0441c640c8 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -170,7 +170,7 @@ public class CommandSerializers
public void serialize(Writes writes, DataOutputPlus out, int version)
throws IOException
{
timestamp.serialize(writes.executeAt, out, version);
- KeySerializers.keys.serialize(writes.keys, out, version);
+ KeySerializers.seekables.serialize(writes.keys, out, version);
boolean hasWrites = writes.write != null;
out.writeBoolean(hasWrites);
if (hasWrites)
@@ -181,7 +181,7 @@ public class CommandSerializers
public Writes deserialize(DataInputPlus in, int version) throws
IOException
{
return new Writes(timestamp.deserialize(in, version),
- KeySerializers.keys.deserialize(in, version),
+ KeySerializers.seekables.deserialize(in,
version),
in.readBoolean() ?
TxnWrite.serializer.deserialize(in, version) : null);
}
@@ -189,7 +189,7 @@ public class CommandSerializers
public long serializedSize(Writes writes, int version)
{
long size = timestamp.serializedSize(writes.executeAt, version);
- size += KeySerializers.keys.serializedSize(writes.keys, version);
+ size += KeySerializers.seekables.serializedSize(writes.keys,
version);
boolean hasWrites = writes.write != null;
size += TypeSizes.sizeof(hasWrites);
if (hasWrites)
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
index 5d6453f807..078051bfe1 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/KeySerializers.java
@@ -288,7 +288,7 @@ public class KeySerializers
@Override
public void serialize(Seekables<?, ?> t, DataOutputPlus out, int
version) throws IOException
{
- switch (t.kindOfContents())
+ switch (t.domain())
{
default: throw new AssertionError();
case Key:
@@ -317,7 +317,7 @@ public class KeySerializers
@Override
public long serializedSize(Seekables<?, ?> t, int version)
{
- switch (t.kindOfContents())
+ switch (t.domain())
{
default: throw new AssertionError();
case Key:
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
index cce1942a07..c12e92e18b 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/PreacceptSerializers.java
@@ -49,7 +49,7 @@ public class PreacceptSerializers
{
CommandSerializers.partialTxn.serialize(msg.partialTxn, out,
version);
serializeNullable(msg.route, out, version,
KeySerializers.fullRoute);
- out.writeUnsignedVInt(msg.maxEpoch - msg.minEpoch);
+ out.writeUnsignedVInt(msg.maxEpoch - msg.minUnsyncedEpoch);
}
@Override
@@ -67,7 +67,7 @@ public class PreacceptSerializers
{
return
CommandSerializers.partialTxn.serializedSize(msg.partialTxn, version)
+ serializedNullableSize(msg.route, version,
KeySerializers.fullRoute)
- + TypeSizes.sizeofUnsignedVInt(msg.maxEpoch - msg.minEpoch);
+ + TypeSizes.sizeofUnsignedVInt(msg.maxEpoch -
msg.minUnsyncedEpoch);
}
};
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
index 1db73e38ab..c9da12801e 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/TxnRequestSerializer.java
@@ -79,7 +79,7 @@ public abstract class TxnRequestSerializer<T extends
TxnRequest<?>> implements I
void serializeHeader(T msg, DataOutputPlus out, int version) throws
IOException
{
super.serializeHeader(msg, out, version);
- out.writeUnsignedVInt(msg.minEpoch);
+ out.writeUnsignedVInt(msg.minUnsyncedEpoch);
out.writeBoolean(msg.doNotComputeProgressKey);
}
@@ -97,7 +97,7 @@ public abstract class TxnRequestSerializer<T extends
TxnRequest<?>> implements I
long serializedHeaderSize(T msg, int version)
{
long size = super.serializedHeaderSize(msg, version);
- size += TypeSizes.sizeofUnsignedVInt(msg.minEpoch);
+ size += TypeSizes.sizeofUnsignedVInt(msg.minUnsyncedEpoch);
size += TypeSizes.BOOL_SIZE;
return size;
}
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index 8eda32ad82..ea11312d72 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import accord.api.Data;
import accord.local.SafeCommandStore;
@@ -37,7 +38,6 @@ import
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.service.accord.AccordCommandsForKey;
import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
@@ -114,8 +114,13 @@ public class TxnNamedRead extends
AbstractSerialized<ReadCommand>
public Future<Data> read(boolean isForWriteTxn, SafeCommandStore
safeStore, Timestamp executeAt)
{
SinglePartitionReadCommand command = (SinglePartitionReadCommand)
get();
- AccordCommandsForKey cfk = (AccordCommandsForKey)
safeStore.commandsForKey(key);
- int nowInSeconds = cfk.nowInSecondsFor(executeAt, isForWriteTxn);
+ // TODO (required, safety): before release, double check reasoning
that this is safe
+// AccordCommandsForKey cfk =
((SafeAccordCommandStore)safeStore).commandsForKey(key);
+// int nowInSeconds = cfk.nowInSecondsFor(executeAt, isForWriteTxn);
+ // It's fine for our nowInSeconds to lag slightly behind our insertion
timestamp, as to the user
+ // this simply looks like the transaction witnessed TTL'd data and the
data then expired
+ // immediately after the transaction executed, and this simplifies
things a great deal
+ int nowInSeconds = (int)
TimeUnit.MICROSECONDS.toSeconds(executeAt.hlc());
return Stage.READ.submit(() ->
{
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index 1b42635021..eeb57f13f5 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -28,11 +28,11 @@ import com.google.common.collect.ImmutableList;
import accord.api.Data;
import accord.api.DataStore;
-import accord.api.Key;
import accord.api.Read;
import accord.local.SafeCommandStore;
import accord.primitives.Keys;
import accord.primitives.Ranges;
+import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import org.apache.cassandra.db.SinglePartitionReadCommand;
@@ -139,11 +139,11 @@ public class TxnRead extends
AbstractKeySorted<TxnNamedRead> implements Read
if (!reads.contains(namedRead))
reads.add(namedRead);
- return new TxnRead(reads, txnKeys.union(read.keys()));
+ return new TxnRead(reads, txnKeys.with((Keys)read.keys()));
}
@Override
- public Future<Data> read(Key key, Txn.Kind kind, SafeCommandStore
safeStore, Timestamp executeAt, DataStore store)
+ public Future<Data> read(Seekable key, Txn.Kind kind, SafeCommandStore
safeStore, Timestamp executeAt, DataStore store)
{
List<Future<Data>> futures = new ArrayList<>();
forEachWithKey((PartitionKey) key, read ->
futures.add(read.read(kind.isWrite(), safeStore, executeAt)));
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
index ddd57eaf72..f7b6565dbe 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.ObjectSizes;
+import static accord.utils.SortedArrays.Search.CEIL;
import static org.apache.cassandra.service.accord.AccordSerializers.serialize;
import static org.apache.cassandra.utils.ArraySerializers.deserializeArray;
import static org.apache.cassandra.utils.ArraySerializers.serializeArray;
@@ -135,7 +136,7 @@ public class TxnUpdate implements Update
int j = 0;
for (int i = 0 ; i < out.size() ; ++i)
{
- j = in.findNext(out.get(i), j);
+ j = in.findNext(j, out.get(i), CEIL);
result[i] = from[j];
}
return result;
@@ -146,7 +147,7 @@ public class TxnUpdate implements Update
{
// TODO: special method for linear merging keyed and non-keyed lists
simultaneously
TxnUpdate that = (TxnUpdate) update;
- Keys mergedKeys = this.keys.union(that.keys);
+ Keys mergedKeys = this.keys.with(that.keys);
ByteBuffer[] mergedFragments = merge(this.keys, that.keys,
this.fragments, that.fragments, mergedKeys.size());
return new TxnUpdate(mergedKeys, mergedFragments, condition);
}
@@ -226,7 +227,7 @@ public class TxnUpdate implements Update
while (j < mi && toKey.apply(items.get(j)).equals(key))
++j;
- int nextki = keys.findNext(key, ki);
+ int nextki = keys.findNext(ki, key, CEIL);
Arrays.fill(result, ki, nextki, ByteBufferUtil.EMPTY_BYTE_BUFFER);
ki = nextki;
result[ki++] = toSerializedValues(items, i, j, serializer,
MessagingService.current_version);
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
index 42ec0143e3..94593ffc52 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnWrite.java
@@ -34,6 +34,7 @@ import accord.api.DataStore;
import accord.api.Key;
import accord.api.Write;
import accord.local.SafeCommandStore;
+import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.primitives.Writes;
import org.apache.cassandra.concurrent.Stage;
@@ -50,6 +51,7 @@ import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.schema.ColumnMetadata;
+import
org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
import org.apache.cassandra.service.accord.AccordCommandsForKey;
import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -340,10 +342,14 @@ public class TxnWrite extends
AbstractKeySorted<TxnWrite.Update> implements Writ
}
@Override
- public Future<Void> apply(Key key, SafeCommandStore safeStore, Timestamp
executeAt, DataStore store)
+ public Future<Void> apply(Seekable key, SafeCommandStore safeStore,
Timestamp executeAt, DataStore store)
{
- AccordCommandsForKey cfk = (AccordCommandsForKey)
safeStore.commandsForKey(key);
+ AccordCommandsForKey cfk = ((SafeAccordCommandStore)
safeStore).commandsForKey((Key)key);
+ // TODO (expected, efficiency): 99.9999% of the time we can just use
executeAt.hlc(), so can avoid bringing
+ // cfk into memory by retaining at all times in memory key ranges
that are dirty and must use this logic;
+ // any that aren't can just use executeAt.hlc
long timestamp = cfk.timestampMicrosFor(executeAt, true);
+ // TODO (low priority - do we need to compute nowInSeconds, or can we
just use executeAt?)
int nowInSeconds = cfk.nowInSecondsFor(executeAt, true);
List<Future<?>> futures = new ArrayList<>();
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index 1f6a3b2b6f..6d89246593 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -28,7 +28,6 @@ import org.junit.Test;
import accord.api.Key;
import accord.api.RoutingKey;
import accord.local.Command;
-import accord.local.CommandsForKey;
import accord.local.Node;
import accord.local.PreLoadContext;
import accord.local.Status;
@@ -113,11 +112,10 @@ public class AccordCommandTest
Assert.assertEquals(Status.PreAccepted, command.status());
Assert.assertTrue(command.partialDeps().isEmpty());
- CommandsForKey cfk = instance.commandsForKey(key(1));
+ AccordCommandsForKey cfk =
((SafeAccordCommandStore)instance).commandsForKey(key(1));
Assert.assertEquals(txnId, cfk.max());
-
Assert.assertNotNull(((AccordCommandsForKey.Series<?>)cfk.uncommitted()).get(txnId));
-
Assert.assertNull(((AccordCommandsForKey.Series<?>)cfk.committedById()).get(txnId));
-
Assert.assertNull(((AccordCommandsForKey.Series<?>)cfk.committedByExecuteAt()).get(txnId));
+ Assert.assertNotNull((cfk.byId()).get(txnId));
+ Assert.assertNotNull((cfk.byExecuteAt()).get(txnId));
}).get();
// check accept
@@ -129,7 +127,7 @@ public class AccordCommandTest
builder.add(key, txnId2);
deps = builder.build();
}
- Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1,
false, Ballot.ZERO, executeAt, partialTxn.keys(), deps, partialTxn.kind());
+ Accept accept = Accept.SerializerSupport.create(txnId, route, 1, 1,
false, Ballot.ZERO, executeAt, partialTxn.keys(), deps);
commandStore.execute(accept, instance -> {
Accept.AcceptReply reply = accept.apply(instance);
@@ -143,11 +141,10 @@ public class AccordCommandTest
Assert.assertEquals(Status.Accepted, command.status());
Assert.assertEquals(deps, command.partialDeps());
- CommandsForKey cfk = instance.commandsForKey(key(1));
+ AccordCommandsForKey cfk =
((SafeAccordCommandStore)instance).commandsForKey(key(1));
Assert.assertEquals(executeAt, cfk.max());
-
Assert.assertNotNull(((AccordCommandsForKey.Series<?>)cfk.uncommitted()).get(txnId));
-
Assert.assertNull(((AccordCommandsForKey.Series<?>)cfk.committedById()).get(txnId));
-
Assert.assertNull(((AccordCommandsForKey.Series<?>)cfk.committedByExecuteAt()).get(txnId));
+ Assert.assertNotNull((cfk.byId()).get(txnId));
+ Assert.assertNotNull((cfk.byExecuteAt()).get(txnId));
}).get();
// check commit
@@ -160,10 +157,9 @@ public class AccordCommandTest
Assert.assertTrue(command.hasBeen(Status.Committed));
Assert.assertEquals(commit.partialDeps, command.partialDeps());
- CommandsForKey cfk = instance.commandsForKey(key(1));
-
Assert.assertNull(((AccordCommandsForKey.Series<?>)cfk.uncommitted()).get(txnId));
-
Assert.assertNotNull(((AccordCommandsForKey.Series<?>)cfk.committedById()).get(txnId));
-
Assert.assertNotNull(((AccordCommandsForKey.Series<?>)cfk.committedByExecuteAt()).get(commit.executeAt));
+ AccordCommandsForKey cfk =
((SafeAccordCommandStore)instance).commandsForKey(key(1));
+ Assert.assertNotNull((cfk.byId()).get(txnId));
+ Assert.assertNotNull((cfk.byExecuteAt()).get(commit.executeAt));
}).get();
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index 87a00fce02..470df82832 100644
---
a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -30,7 +30,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import accord.local.Command;
-import accord.local.CommandsForKey;
import accord.local.SafeCommandStore;
import accord.local.Status;
import accord.primitives.Keys;
@@ -47,6 +46,8 @@ import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.accord.AccordCommand;
import org.apache.cassandra.service.accord.AccordCommandStore;
+import
org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore;
+import org.apache.cassandra.service.accord.AccordCommandsForKey;
import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordStateCache;
import org.apache.cassandra.service.accord.api.PartitionKey;
@@ -109,8 +110,8 @@ public class AsyncOperationTest
Txn txn = createTxn((int)clock.incrementAndGet());
PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
- commandStore.execute(contextFor(Collections.emptyList(),
Keys.of(key)), instance -> {
- CommandsForKey cfk = instance.maybeCommandsForKey(key);
+ commandStore.execute(contextFor(Collections.emptyList(),
Keys.of(key)),instance -> {
+ AccordCommandsForKey cfk =
((SafeAccordCommandStore)instance).maybeCommandsForKey(key);
Assert.assertNull(cfk);
}).get();
diff --git
a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
index 5c8d72f5ba..8526daa675 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
@@ -141,9 +141,8 @@ public class AsyncWriterTest
AccordCommandsForKey cfk = new AccordCommandsForKey(commandStore,
key).initialize();
AccordKeyspace.getCommandsForKeyMutation(commandStore, cfk,
commandStore.nextSystemTimestampMicros()).apply();
- Assert.assertTrue(cfk.uncommitted.isEmpty());
- Assert.assertTrue(cfk.committedByExecuteAt.isEmpty());
- Assert.assertTrue(cfk.committedById.isEmpty());
+ Assert.assertTrue(cfk.byExecuteAt.isEmpty());
+ Assert.assertTrue(cfk.byId.isEmpty());
AccordCommand command = new AccordCommand(txnId).initialize();
command.setPartialTxn(txn.slice(ranges, true));
@@ -157,13 +156,11 @@ public class AsyncWriterTest
execute(commandStore, () -> {
AsyncContext ctx = new AsyncContext();
commandStore.setContext(ctx);
- AccordPartialCommand summary =
getOnlyElement(cfkUncommitted.uncommitted().all().collect(Collectors.toList()));
-
Assert.assertTrue(cfkUncommitted.uncommitted.map.getView().containsKey(txnId));
+ AccordPartialCommand summary =
getOnlyElement(cfkUncommitted.byId().all().collect(Collectors.toList()));
+
Assert.assertTrue(cfkUncommitted.byId.map.getView().containsKey(txnId));
+
Assert.assertTrue(cfkUncommitted.byExecuteAt.map.getView().containsKey(executeAt));
Assert.assertEquals(Status.Accepted, summary.status());
Assert.assertEquals(executeAt, summary.executeAt());
-
- Assert.assertTrue(cfkUncommitted.committedByExecuteAt.isEmpty());
- Assert.assertTrue(cfkUncommitted.committedById.isEmpty());
commandStore.unsetContext(ctx);
});
@@ -177,17 +174,15 @@ public class AsyncWriterTest
execute(commandStore, () -> {
AsyncContext ctx = new AsyncContext();
commandStore.setContext(ctx);
- AccordPartialCommand idSummary =
getOnlyElement(cfkCommitted.committedById().all().collect(Collectors.toList()));
- AccordPartialCommand executeSummary =
getOnlyElement(cfkCommitted.committedByExecuteAt().all().collect(Collectors.toList()));
+ AccordPartialCommand idSummary =
getOnlyElement(cfkCommitted.byId().all().collect(Collectors.toList()));
+ AccordPartialCommand executeSummary =
getOnlyElement(cfkCommitted.byExecuteAt().all().collect(Collectors.toList()));
-
Assert.assertTrue(cfkCommitted.committedById.map.getView().containsKey(txnId));
-
Assert.assertTrue(cfkCommitted.committedByExecuteAt.map.getView().containsKey(executeAt));
+
Assert.assertTrue(cfkCommitted.byId.map.getView().containsKey(txnId));
+
Assert.assertTrue(cfkCommitted.byExecuteAt.map.getView().containsKey(executeAt));
Assert.assertEquals(idSummary, executeSummary);
Assert.assertEquals(Status.Committed, idSummary.status());
Assert.assertEquals(executeAt, idSummary.executeAt());
-
- Assert.assertTrue(cfkCommitted.uncommitted.isEmpty());
commandStore.unsetContext(ctx);
});
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]