This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit ec23e3cc4b82a4ef0fe7ec157ba7261578c9f02f Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Mon Jan 9 12:28:26 2023 +0000 Refactor Timestamp/TxnId - Combine real and logical into a single 64-bit HLC - Introduce 16 flag bits - Pack epoch (48-bits), HLC (64-bits) and flags (16-bits) into two longs in memory patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-18172 --- .build/include-accord.sh | 2 +- .../cassandra/service/accord/AccordCommand.java | 14 +-------- .../service/accord/AccordCommandStore.java | 6 ++-- .../service/accord/AccordCommandsForKey.java | 8 ++--- .../cassandra/service/accord/AccordKeyspace.java | 36 +++++++++++----------- .../service/accord/AccordObjectSizes.java | 2 +- .../service/accord/AccordPartialCommand.java | 20 +++--------- .../cassandra/service/accord/api/AccordAgent.java | 2 +- .../service/accord/async/AsyncWriter.java | 5 +-- .../accord/serializers/CommandSerializers.java | 32 ++++++++----------- .../service/accord/serializers/DepsSerializer.java | 3 ++ .../service/accord/AccordCommandStoreTest.java | 34 ++++++++++---------- .../service/accord/AccordCommandTest.java | 10 +++--- .../cassandra/service/accord/AccordTestUtils.java | 19 ++++++------ .../service/accord/async/AsyncLoaderTest.java | 18 +++++------ .../service/accord/async/AsyncOperationTest.java | 4 +-- .../service/accord/async/AsyncWriterTest.java | 12 ++++---- 17 files changed, 101 insertions(+), 126 deletions(-) diff --git a/.build/include-accord.sh b/.build/include-accord.sh index 37bdcbe079..2144e80b17 100755 --- a/.build/include-accord.sh +++ b/.build/include-accord.sh @@ -25,7 +25,7 @@ set -o nounset bin="$(cd "$(dirname "$0")" > /dev/null; pwd)" accord_repo='https://github.com/apache/cassandra-accord.git' -accord_branch='804a77d32c8ae45751a3a7f450b372560f08cacc' +accord_branch='ad326d5df8d99d4799fa87de81482e3cb1fb92de' accord_src="$bin/cassandra-accord" checkout() { diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java index 2003e77ae1..5b4a8c2e9b 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java @@ -532,18 +532,6 @@ public class AccordCommand extends Command implements AccordState<TxnId> return executeAt.get(); } - @Override - public Txn.Kind kind() - { - return kind.get(); - } - - @Override - public void setKind(Txn.Kind kind) - { - this.kind.set(kind); - } - @Override public void setExecuteAt(Timestamp timestamp) { @@ -636,7 +624,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> private boolean canApplyWithCurrentScope(SafeCommandStore safeStore) { - Ranges ranges = safeStore.ranges().at(executeAt().epoch); + Ranges ranges = safeStore.ranges().at(executeAt().epoch()); Seekables<?, ?> keys = partialTxn().keys(); for (int i=0,mi=keys.size(); i<mi; i++) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index f2b54492fa..96c2a26c35 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -184,7 +184,7 @@ public class AccordCommandStore extends CommandStore public void forEach(Routable keyOrRange, Consumer<CommandsForKey> forEach) { - switch (keyOrRange.kind()) + switch (keyOrRange.domain()) { default: throw new AssertionError(); case Key: @@ -217,7 +217,7 @@ public class AccordCommandStore extends CommandStore @Override public void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach) { - switch (keyOrRange.kind()) + switch (keyOrRange.domain()) { default: throw new AssertionError(); case Key: @@ -272,7 +272,7 @@ public class AccordCommandStore extends CommandStore { Timestamp max = maxConflict(keys); 
long epoch = latestEpoch(); - if (txnId.compareTo(max) > 0 && txnId.epoch >= epoch && !agent.isExpired(txnId, time.now())) + if (txnId.compareTo(max) > 0 && txnId.epoch() >= epoch && !agent.isExpired(txnId, time.now())) return txnId; return time.uniqueNow(max); diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java index 8a1715384e..437c3b5f66 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java @@ -53,7 +53,7 @@ import org.assertj.core.util.VisibleForTesting; import static accord.local.CommandsForKey.CommandTimeseries.TestDep.ANY_DEPS; import static accord.local.CommandsForKey.CommandTimeseries.TestDep.WITHOUT; import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs; -import static accord.primitives.Txn.Kind.WRITE; +import static accord.primitives.Txn.Kind.Write; import static org.apache.cassandra.service.accord.AccordState.WriteOnly.applyMapChanges; import static org.apache.cassandra.service.accord.AccordState.WriteOnly.applySetChanges; @@ -158,7 +158,7 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState< public Stream<T> before(Timestamp timestamp, TestKind testKind, TestDep testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status status) { return idsToCommands(map.getView().headMap(timestamp, false).values()) - .filter(cmd -> testKind == RorWs || cmd.kind() == WRITE) + .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite()) .filter(cmd -> testDep == ANY_DEPS || (cmd.hasDep(depId) ^ (testDep == WITHOUT))) .filter(cmd -> TestStatus.test(cmd.status(), testStatus, status)) .map(translate); @@ -168,7 +168,7 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState< public Stream<T> after(Timestamp timestamp, TestKind testKind, TestDep testDep, @Nullable TxnId depId, 
TestStatus testStatus, @Nullable Status status) { return idsToCommands(map.getView().tailMap(timestamp, false).values()) - .filter(cmd -> testKind == RorWs || cmd.kind() == WRITE) + .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite()) .filter(cmd -> testDep == ANY_DEPS || (cmd.hasDep(depId) ^ (testDep == WITHOUT))) .filter(cmd -> TestStatus.test(cmd.status(), testStatus, status)) .map(translate); @@ -386,7 +386,7 @@ public class AccordCommandsForKey extends CommandsForKey implements AccordState< private static long getTimestampMicros(Timestamp timestamp) { - return timestamp.real + timestamp.logical; + return timestamp.hlc(); } private void maybeUpdatelastTimestamp(Timestamp executeAt, boolean isForWriteTxn) diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 12da1ffc10..e17172de66 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -122,8 +122,8 @@ public class AccordKeyspace public static final String COMMANDS = "commands"; public static final String COMMANDS_FOR_KEY = "commands_for_key"; - private static final String TIMESTAMP_TUPLE = "tuple<bigint, bigint, int, bigint>"; - private static final TupleType TIMESTAMP_TYPE = new TupleType(Lists.newArrayList(LongType.instance, LongType.instance, Int32Type.instance, LongType.instance)); + private static final String TIMESTAMP_TUPLE = "tuple<bigint, bigint, bigint>"; + private static final TupleType TIMESTAMP_TYPE = new TupleType(Lists.newArrayList(LongType.instance, LongType.instance, LongType.instance)); private static final String KEY_TUPLE = "tuple<uuid, blob>"; private static final ClusteringIndexFilter FULL_PARTITION = new ClusteringIndexSliceFilter(Slices.ALL, false); @@ -328,7 +328,7 @@ public class AccordKeyspace NavigableMap<Timestamp, TxnId> result = new TreeMap<>(); for (Map.Entry<ByteBuffer, ByteBuffer> 
entry : serialized.entrySet()) - result.put(deserializeTimestampOrNull(entry.getKey(), Timestamp::new), deserializeTimestampOrNull(entry.getValue(), TxnId::new)); + result.put(deserializeTimestampOrNull(entry.getKey(), Timestamp::fromBits), deserializeTimestampOrNull(entry.getValue(), TxnId::fromBits)); return result; } @@ -346,7 +346,7 @@ public class AccordKeyspace private static NavigableSet<TxnId> deserializeTxnIdNavigableSet(UntypedResultSet.Row row, String name) { - return deserializeTimestampSet(row.getSet(name, BytesType.instance), TreeSet::new, TxnId::new); + return deserializeTimestampSet(row.getSet(name, BytesType.instance), TreeSet::new, TxnId::fromBits); } private static DeterministicIdentitySet<ListenerProxy> deserializeListeners(Set<ByteBuffer> serialized) throws IOException @@ -521,12 +521,12 @@ public class AccordKeyspace private static ByteBuffer serializeTimestamp(Timestamp timestamp) { - return TupleType.buildValue(new ByteBuffer[]{bytes(timestamp.epoch), bytes(timestamp.real), bytes(timestamp.logical), bytes(timestamp.node.id)}); + return TupleType.buildValue(new ByteBuffer[]{bytes(timestamp.msb), bytes(timestamp.lsb), bytes(timestamp.node.id)}); } public interface TimestampFactory<T extends Timestamp> { - T create(long epoch, long real, int logical, Node.Id node); + T create(long msb, long lsb, Node.Id node); } public static <T extends Timestamp> T deserializeTimestampOrNull(ByteBuffer bytes, TimestampFactory<T> factory) @@ -534,7 +534,7 @@ public class AccordKeyspace if (bytes == null || ByteBufferAccessor.instance.isEmpty(bytes)) return null; ByteBuffer[] split = TIMESTAMP_TYPE.split(ByteBufferAccessor.instance, bytes); - return factory.create(split[0].getLong(), split[1].getLong(), split[2].getInt(), new Node.Id(split[3].getLong())); + return factory.create(split[0].getLong(), split[1].getLong(), new Node.Id(split[2].getLong())); } private static <T extends Timestamp> T deserializeTimestampOrNull(UntypedResultSet.Row row, String name, 
TimestampFactory<T> factory) @@ -561,11 +561,11 @@ public class AccordKeyspace { String cql = "SELECT * FROM %s.%s " + "WHERE store_id = ? " + - "AND txn_id=(?, ?, ?, ?)"; + "AND txn_id=(?, ?, ?)"; return executeOnceInternal(String.format(cql, ACCORD_KEYSPACE_NAME, COMMANDS), commandStore.id(), - txnId.epoch, txnId.real, txnId.logical, txnId.node.id); + txnId.msb, txnId.lsb, txnId.node.id); } public static void loadCommand(AccordCommandStore commandStore, AccordCommand command) @@ -585,7 +585,7 @@ public class AccordKeyspace try { UntypedResultSet.Row row = result.one(); - Preconditions.checkState(deserializeTimestampOrNull(row, "txn_id", TxnId::new).equals(txnId)); + Preconditions.checkState(deserializeTimestampOrNull(row, "txn_id", TxnId::fromBits).equals(txnId)); command.status.load(SaveStatus.values()[row.getInt("status")]); command.homeKey.load(deserializeOrNull(row.getBlob("home_key"), CommandsSerializers.routingKey)); command.progressKey.load(deserializeOrNull(row.getBlob("progress_key"), CommandsSerializers.routingKey)); @@ -594,9 +594,9 @@ public class AccordKeyspace command.durability.load(Status.Durability.values()[row.getInt("durability", 0)]); command.partialTxn.load(deserializeOrNull(row.getBlob("txn"), CommandsSerializers.partialTxn)); command.kind.load(row.has("kind") ? 
Txn.Kind.values()[row.getInt("kind")] : null); - command.executeAt.load(deserializeTimestampOrNull(row, "execute_at", Timestamp::new)); - command.promised.load(deserializeTimestampOrNull(row, "promised_ballot", Ballot::new)); - command.accepted.load(deserializeTimestampOrNull(row, "accepted_ballot", Ballot::new)); + command.executeAt.load(deserializeTimestampOrNull(row, "execute_at", Timestamp::fromBits)); + command.promised.load(deserializeTimestampOrNull(row, "promised_ballot", Ballot::fromBits)); + command.accepted.load(deserializeTimestampOrNull(row, "accepted_ballot", Ballot::fromBits)); command.partialDeps.load(deserializeOrNull(row.getBlob("dependencies"), CommandsSerializers.partialDeps)); command.writes.load(deserializeWithVersionOr(row, "writes", CommandsSerializers.writes, () -> null)); command.result.load(deserializeWithVersionOr(row, "result", CommandsSerializers.result, () -> null)); @@ -769,11 +769,11 @@ public class AccordKeyspace // empty static row will be interpreted as all null cells which will cause everything to be initialized Row staticRow = partition.staticRow(); Cell<?> cell = staticRow.getCell(CommandsForKeyColumns.max_timestamp); - cfk.maxTimestamp.load(cell != null && !cell.isTombstone() ? deserializeTimestampOrNull(cellValue(cell), Timestamp::new) + cfk.maxTimestamp.load(cell != null && !cell.isTombstone() ? deserializeTimestampOrNull(cellValue(cell), Timestamp::fromBits) : AccordCommandsForKey.Defaults.maxTimestamp); cell = staticRow.getCell(CommandsForKeyColumns.last_executed_timestamp); - cfk.lastExecutedTimestamp.load(cell != null && !cell.isTombstone() ? deserializeTimestampOrNull(cellValue(cell), Timestamp::new) + cfk.lastExecutedTimestamp.load(cell != null && !cell.isTombstone() ? 
deserializeTimestampOrNull(cellValue(cell), Timestamp::fromBits) : AccordCommandsForKey.Defaults.lastExecutedTimestamp); cell = staticRow.getCell(CommandsForKeyColumns.last_executed_micros); @@ -782,13 +782,13 @@ public class AccordKeyspace : AccordCommandsForKey.Defaults.lastExecutedMicros); cell = staticRow.getCell(CommandsForKeyColumns.last_write_timestamp); - cfk.lastWriteTimestamp.load(cell != null && !cell.isTombstone() ? deserializeTimestampOrNull(cellValue(cell), Timestamp::new) + cfk.lastWriteTimestamp.load(cell != null && !cell.isTombstone() ? deserializeTimestampOrNull(cellValue(cell), Timestamp::fromBits) : AccordCommandsForKey.Defaults.lastWriteTimestamp); TreeSet<Timestamp> blindWitnessed = new TreeSet<>(); ComplexColumnData cmplx = staticRow.getComplexColumnData(CommandsForKeyColumns.blind_witnessed); if (cmplx != null) - cmplx.forEach(c -> blindWitnessed.add(deserializeTimestampOrNull(c.path().get(0), Timestamp::new))); + cmplx.forEach(c -> blindWitnessed.add(deserializeTimestampOrNull(c.path().get(0), Timestamp::fromBits))); cfk.blindWitnessed.load(blindWitnessed); while (partition.hasNext()) @@ -796,7 +796,7 @@ public class AccordKeyspace Row row = partition.next(); Clustering<?> clustering = row.clustering(); int ordinal = Int32Type.instance.compose(clusteringValue(clustering, 0)); - Timestamp timestamp = deserializeTimestampOrNull(clusteringValue(clustering, 1), Timestamp::new); + Timestamp timestamp = deserializeTimestampOrNull(clusteringValue(clustering, 1), Timestamp::fromBits); ByteBuffer data = cellValue(row, CommandsForKeyColumns.data); if (data == null) continue; diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java index 7f7a8a86e3..0246ee6780 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java +++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java @@ -182,7 +182,7 @@ public class 
AccordObjectSizes return size; } - private static final long TIMESTAMP_SIZE = ObjectSizes.measureDeep(new Timestamp(0, 0, 0, new Node.Id(0))); + private static final long TIMESTAMP_SIZE = ObjectSizes.measureDeep(Timestamp.fromBits(0, 0, new Node.Id(0))); public static long timestamp() { diff --git a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java index 5036e2b57c..b3ef93d89b 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java +++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java @@ -29,7 +29,6 @@ import accord.local.Command; import accord.local.CommandsForKey; import accord.local.Status; import accord.primitives.Timestamp; -import accord.primitives.Txn; import accord.primitives.TxnId; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; @@ -53,21 +52,19 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt private final List<TxnId> deps; // TODO (soon): we only require this for Accepted; perhaps more tightly couple query API for efficiency private final Status status; - private final Txn.Kind kind; - AccordPartialCommand(TxnId txnId, Timestamp executeAt, List<TxnId> deps, Status status, Txn.Kind kind) + AccordPartialCommand(TxnId txnId, Timestamp executeAt, List<TxnId> deps, Status status) { super(txnId, executeAt); this.deps = deps; this.status = status; - this.kind = kind; } public AccordPartialCommand(Key key, Command command) { this(command.txnId(), command.executeAt(), command.partialDeps() == null ? 
Collections.emptyList() : command.partialDeps().txnIds(key), - command.status(), command.kind()); + command.status()); } public TxnId txnId() @@ -95,11 +92,6 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt return status; } - public Txn.Kind kind() - { - return kind; - } - @Override public boolean equals(Object obj) { @@ -109,8 +101,7 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt return txnId.equals(that.txnId) && Objects.equals(executeAt, that.executeAt) && Objects.equals(deps, that.deps) - && status == that.status - && kind == that.kind; + && status == that.status; } public static class PartialCommandSerializer @@ -121,7 +112,6 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt CommandSerializers.txnId.serialize(command.txnId(), out, version.msgVersion); serializeNullable(command.executeAt(), out, version.msgVersion, CommandSerializers.timestamp); CommandSerializers.status.serialize(command.status(), out, version.msgVersion); - CommandSerializers.kind.serialize(command.kind(), out, version.msgVersion); serializeCollection(command.deps, out, version.msgVersion, CommandSerializers.txnId); } @@ -157,9 +147,8 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt Timestamp executeAt = deserializeNullable(in, version.msgVersion, CommandSerializers.timestamp); Status status = CommandSerializers.status.deserialize(in, version.msgVersion); - Txn.Kind kind = CommandSerializers.kind.deserialize(in, version.msgVersion); List<TxnId> deps = deserializeList(in, version.msgVersion, CommandSerializers.txnId); - AccordPartialCommand partial = new AccordPartialCommand(txnId, executeAt, deps, status, kind); + AccordPartialCommand partial = new AccordPartialCommand(txnId, executeAt, deps, status); addToContext(partial, context); return partial; } @@ -182,7 +171,6 @@ public class AccordPartialCommand extends CommandsForKey.TxnIdWithExecuteAt size += 
CommandSerializers.txnId.serializedSize(); size += serializedSizeNullable(command.executeAt(), version.msgVersion, CommandSerializers.timestamp); size += CommandSerializers.status.serializedSize(command.status(), version.msgVersion); - size += CommandSerializers.kind.serializedSize(command.kind(), version.msgVersion); size += serializedCollectionSize(command.deps, version.msgVersion, CommandSerializers.txnId); return size; } diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java index f8a8832a5d..31db8b9646 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java @@ -60,6 +60,6 @@ public class AccordAgent implements Agent public boolean isExpired(TxnId initiated, long now) { // TODO: should distinguish between reads and writes - return now - initiated.real > getReadRpcTimeout(MICROSECONDS); + return now - initiated.hlc() > getReadRpcTimeout(MICROSECONDS); } } diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java index cd8aca5ab5..c920a0f7bb 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java @@ -32,7 +32,6 @@ import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.primitives.Routable; import accord.primitives.Seekable; import accord.primitives.Timestamp; import accord.primitives.TxnId; @@ -50,6 +49,8 @@ import org.apache.cassandra.service.accord.store.StoredSet; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.FutureCombiner; +import static accord.primitives.Routable.Domain.Range; + public class AsyncWriter { private static final Logger logger = 
LoggerFactory.getLogger(AsyncWriter.class); @@ -263,7 +264,7 @@ public class AsyncWriter for (Seekable key : command.partialTxn().keys()) { // TODO: implement - if (key.kind() == Routable.Kind.Range) + if (key.domain() == Range) throw new UnsupportedOperationException(); PartitionKey partitionKey = (PartitionKey) key; AccordCommandsForKey cfk = cfkForDenormalization(partitionKey, context); diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java index e4e589d87b..98c1f93eaa 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java @@ -48,16 +48,16 @@ public class CommandSerializers { private CommandSerializers() {} - public static final TimestampSerializer<TxnId> txnId = new TimestampSerializer<>(TxnId::new); - public static final TimestampSerializer<Timestamp> timestamp = new TimestampSerializer<>(Timestamp::new); - public static final TimestampSerializer<Ballot> ballot = new TimestampSerializer<>(Ballot::new); + public static final TimestampSerializer<TxnId> txnId = new TimestampSerializer<>(TxnId::fromBits); + public static final TimestampSerializer<Timestamp> timestamp = new TimestampSerializer<>(Timestamp::fromBits); + public static final TimestampSerializer<Ballot> ballot = new TimestampSerializer<>(Ballot::fromBits); public static final EnumSerializer<Txn.Kind> kind = new EnumSerializer<>(Txn.Kind.class); public static class TimestampSerializer<T extends Timestamp> implements IVersionedSerializer<T> { interface Factory<T extends Timestamp> { - T create(long epoch, long real, int logical, Node.Id node); + T create(long msb, long lsb, Node.Id node); } private final TimestampSerializer.Factory<T> factory; @@ -70,18 +70,16 @@ public class CommandSerializers @Override public void serialize(T ts, DataOutputPlus out, int 
version) throws IOException { - out.writeLong(ts.epoch); - out.writeLong(ts.real); - out.writeInt(ts.logical); + out.writeLong(ts.msb); + out.writeLong(ts.lsb); TopologySerializers.nodeId.serialize(ts.node, out, version); } public <V> int serialize(T ts, V dst, ValueAccessor<V> accessor, int offset) { int position = offset; - position += accessor.putLong(dst, position, ts.epoch); - position += accessor.putLong(dst, position, ts.real); - position += accessor.putInt(dst, position, ts.logical); + position += accessor.putLong(dst, position, ts.msb); + position += accessor.putLong(dst, position, ts.lsb); position += TopologySerializers.nodeId.serialize(ts.node, dst, accessor, position); int size = position - offset; Preconditions.checkState(size == serializedSize()); @@ -93,20 +91,17 @@ public class CommandSerializers { return factory.create(in.readLong(), in.readLong(), - in.readInt(), TopologySerializers.nodeId.deserialize(in, version)); } public <V> T deserialize(V src, ValueAccessor<V> accessor, int offset) { - long epoch = accessor.getLong(src, offset); + long msb = accessor.getLong(src, offset); offset += TypeSizes.LONG_SIZE; - long real = accessor.getLong(src, offset); + long lsb = accessor.getLong(src, offset); offset += TypeSizes.LONG_SIZE; - int logical = accessor.getInt(src, offset); - offset += TypeSizes.INT_SIZE; Node.Id node = TopologySerializers.nodeId.deserialize(src, accessor, offset); - return factory.create(epoch, real, logical, node); + return factory.create(msb, lsb, node); } @Override @@ -117,9 +112,8 @@ public class CommandSerializers public int serializedSize() { - return TypeSizes.LONG_SIZE + // ts.epoch - TypeSizes.LONG_SIZE + // ts.real - TypeSizes.INT_SIZE + // ts.logical + return TypeSizes.LONG_SIZE + // ts.msb + TypeSizes.LONG_SIZE + // ts.lsb TopologySerializers.nodeId.serializedSize(); // ts.node } } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java 
b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java index 22b80554f1..083315bc7c 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java @@ -89,12 +89,15 @@ public abstract class DepsSerializer<D extends Deps> implements IVersionedSerial public D deserialize(DataInputPlus in, int version) throws IOException { Keys keys = KeySerializers.keys.deserialize(in, version); + TxnId[] txnIds = new TxnId[(int) in.readUnsignedVInt()]; for (int i=0; i<txnIds.length; i++) txnIds[i] = CommandSerializers.txnId.deserialize(in, version); + int[] keyToTxnIds = new int[(int) in.readUnsignedVInt()]; for (int i=0; i<keyToTxnIds.length; i++) keyToTxnIds[i] = (int) in.readUnsignedVInt(); + return deserialize(keys, txnIds, keyToTxnIds, in, version); } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java index 2cd933c7b4..9f49138c8b 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java @@ -82,22 +82,22 @@ public class AccordCommandStoreTest AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); PartialDeps.OrderedBuilder builder = PartialDeps.orderedBuilder(depTxn.covering(), false); - builder.add(key, txnId(1, clock.incrementAndGet(), 0, 1)); + builder.add(key, txnId(1, clock.incrementAndGet(), 1)); PartialDeps dependencies = builder.build(); QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES (0, 0, 1)"); - TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 0, 1); - TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 0, 1); - TxnId oldTimestamp = txnId(1, clock.incrementAndGet(), 0, 1); - TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId oldTxnId1 = 
txnId(1, clock.incrementAndGet(), 1); + TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1); + TxnId oldTimestamp = txnId(1, clock.incrementAndGet(), 1); + TxnId txnId = txnId(1, clock.incrementAndGet(), 1); AccordCommand command = new AccordCommand(txnId).initialize(); command.setPartialTxn(createPartialTxn(0)); command.homeKey(key.toUnseekable()); command.progressKey(key.toUnseekable()); command.setDurability(Durable); - command.setPromised(ballot(1, clock.incrementAndGet(), 0, 1)); - command.setAccepted(ballot(1, clock.incrementAndGet(), 0, 1)); - command.setExecuteAt(timestamp(1, clock.incrementAndGet(), 0, 1)); + command.setPromised(ballot(1, clock.incrementAndGet(), 1)); + command.setAccepted(ballot(1, clock.incrementAndGet(), 1)); + command.setExecuteAt(timestamp(1, clock.incrementAndGet(), 1)); command.setPartialDeps(dependencies); command.setStatus(Status.Accepted); command.addWaitingOnCommit(oldTxnId1); @@ -119,26 +119,26 @@ public class AccordCommandStoreTest { AtomicLong clock = new AtomicLong(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - Timestamp maxTimestamp = timestamp(1, clock.incrementAndGet(), 0, 1); + Timestamp maxTimestamp = timestamp(1, clock.incrementAndGet(), 1); PartialTxn txn = createPartialTxn(1); PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys()); - TxnId txnId1 = txnId(1, clock.incrementAndGet(), 0, 1); - TxnId txnId2 = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1); + TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1); AccordCommand command1 = new AccordCommand(txnId1).initialize(); AccordCommand command2 = new AccordCommand(txnId2).initialize(); command1.setPartialTxn(txn); command2.setPartialTxn(txn); - command1.setExecuteAt(timestamp(1, clock.incrementAndGet(), 0, 1)); - command2.setExecuteAt(timestamp(1, clock.incrementAndGet(), 0, 1)); + command1.setExecuteAt(timestamp(1, clock.incrementAndGet(), 1)); + 
command2.setExecuteAt(timestamp(1, clock.incrementAndGet(), 1)); AccordCommandsForKey cfk = new AccordCommandsForKey(commandStore, key).initialize(); cfk.updateMax(maxTimestamp); - Assert.assertEquals(txnId1.real, cfk.timestampMicrosFor(txnId1, true)); - Assert.assertEquals(txnId2.real, cfk.timestampMicrosFor(txnId2, true)); + Assert.assertEquals(txnId1.hlc(), cfk.timestampMicrosFor(txnId1, true)); + Assert.assertEquals(txnId2.hlc(), cfk.timestampMicrosFor(txnId2, true)); Assert.assertEquals(txnId2, cfk.lastExecutedTimestamp.get()); - Assert.assertEquals(txnId2.real, cfk.lastExecutedMicros.get()); + Assert.assertEquals(txnId2.hlc(), cfk.lastExecutedMicros.get()); cfk.register(command1); cfk.register(command2); @@ -165,7 +165,7 @@ public class AccordCommandStoreTest for (int i=0; i<4; i++) { - maxTimestamp = timestamp(1, clock.incrementAndGet(), 0, 1); + maxTimestamp = timestamp(1, clock.incrementAndGet(), 1); expected.add(maxTimestamp); writeOnlyCfk.updateMax(maxTimestamp); } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java index fa16a474f8..112d666550 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java @@ -89,7 +89,7 @@ public class AccordCommandTest commandStore.execute(PreLoadContext.empty(), instance -> { ((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get(); - TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId = txnId(1, clock.incrementAndGet(), 1); Txn txn = createTxn(1); Key key = (Key)txn.keys().get(0); RoutingKey homeKey = key.toUnseekable(); @@ -121,8 +121,8 @@ public class AccordCommandTest }).get(); // check accept - TxnId txnId2 = txnId(1, clock.incrementAndGet(), 0, 1); - Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 0, 1); + TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1); + Timestamp 
executeAt = timestamp(1, clock.incrementAndGet(), 1); PartialDeps.OrderedBuilder builder = PartialDeps.orderedBuilder(route.covering(), false); builder.add(key, txnId2); PartialDeps deps = builder.build(); @@ -170,7 +170,7 @@ public class AccordCommandTest AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); commandStore.execute(PreLoadContext.empty(), instance -> { ((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get(); - TxnId txnId1 = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1); Txn txn = createTxn(2); Key key = (Key)txn.keys().get(0); RoutingKey homeKey = key.toUnseekable(); @@ -182,7 +182,7 @@ public class AccordCommandTest commandStore.execute(preAccept1, preAccept1::apply).get(); // second preaccept should identify txnId1 as a dependency - TxnId txnId2 = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1); PreAccept preAccept2 = PreAccept.SerializerSupport.create(txnId2, route, 1, 1, false, 1, partialTxn, fullRoute); commandStore.execute(preAccept2, instance -> { PreAccept.PreAcceptReply reply = preAccept2.apply(instance); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index b835e0fbf5..f61205d374 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -66,6 +66,7 @@ import org.apache.cassandra.service.accord.txn.TxnRead; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; +import static accord.primitives.Routable.Domain.Key; import static java.lang.String.format; public class AccordTestUtils @@ -90,19 +91,19 @@ public class AccordTestUtils @Override public void waiting(TxnId blockedBy, Known blockedUntil, Unseekables<?, ?> 
blockedOn) {} }; - public static TxnId txnId(long epoch, long real, int logical, long node) + public static TxnId txnId(long epoch, long hlc, long node) { - return new TxnId(epoch, real, logical, new Node.Id(node)); + return new TxnId(epoch, hlc, Txn.Kind.Write, Key, new Node.Id(node)); } - public static Timestamp timestamp(long epoch, long real, int logical, long node) + public static Timestamp timestamp(long epoch, long hlc, long node) { - return new Timestamp(epoch, real, logical, new Node.Id(node)); + return Timestamp.fromValues(epoch, hlc, new Node.Id(node)); } - public static Ballot ballot(long epoch, long real, int logical, long node) + public static Ballot ballot(long epoch, long hlc, long node) { - return new Ballot(epoch, real, logical, new Node.Id(node)); + return Ballot.fromValues(epoch, hlc, new Node.Id(node)); } /** @@ -119,7 +120,7 @@ public class AccordTestUtils .map(key -> { try { - return read.read(key, command.kind(), instance, command.executeAt(), null).get(); + return read.read(key, command.txnId().rw(), instance, command.executeAt(), null).get(); } catch (InterruptedException e) { @@ -211,7 +212,7 @@ public class AccordTestUtils @Override public Id id() { return node;} @Override public long epoch() {return 1; } @Override public long now() {return now.getAsLong(); } - @Override public Timestamp uniqueNow(Timestamp atLeast) { return new Timestamp(1, now.getAsLong(), 0, node); } + @Override public Timestamp uniqueNow(Timestamp atLeast) { return Timestamp.fromValues(1, now.getAsLong(), node); } }; return new InMemoryCommandStore.Synchronized(0, time, @@ -228,7 +229,7 @@ public class AccordTestUtils @Override public Id id() { return node;} @Override public long epoch() {return 1; } @Override public long now() {return now.getAsLong(); } - @Override public Timestamp uniqueNow(Timestamp atLeast) { return new Timestamp(1, now.getAsLong(), 0, node); } + @Override public Timestamp uniqueNow(Timestamp atLeast) { return Timestamp.fromValues(1, 
now.getAsLong(), node); } }; return new AccordCommandStore(0, time, diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java index fbbb53ee31..c5bc97984d 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java @@ -74,7 +74,7 @@ public class AsyncLoaderTest AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); AccordStateCache.Instance<TxnId, AccordCommand> commandCache = commandStore.commandCache(); AccordStateCache.Instance<PartitionKey, AccordCommandsForKey> cfkCacche = commandStore.commandsForKeyCache(); - TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId = txnId(1, clock.incrementAndGet(), 1); PartialTxn txn = createPartialTxn(0); PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys()); @@ -108,7 +108,7 @@ public class AsyncLoaderTest AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); AccordStateCache.Instance<TxnId, AccordCommand> commandCache = commandStore.commandCache(); AccordStateCache.Instance<PartitionKey, AccordCommandsForKey> cfkCacche = commandStore.commandsForKeyCache(); - TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId = txnId(1, clock.incrementAndGet(), 1); PartialTxn txn = createPartialTxn(0); PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys()); @@ -150,7 +150,7 @@ public class AsyncLoaderTest AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); AccordStateCache.Instance<TxnId, AccordCommand> commandCache = commandStore.commandCache(); AccordStateCache.Instance<PartitionKey, AccordCommandsForKey> cfkCacche = commandStore.commandsForKeyCache(); - TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId = txnId(1, 
clock.incrementAndGet(), 1); PartialTxn txn = createPartialTxn(0); PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys()); @@ -192,7 +192,7 @@ public class AsyncLoaderTest AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); AccordStateCache.Instance<TxnId, AccordCommand> commandCache = commandStore.commandCache(); AccordStateCache.Instance<PartitionKey, AccordCommandsForKey> cfkCacche = commandStore.commandsForKeyCache(); - TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId = txnId(1, clock.incrementAndGet(), 1); PartialTxn txn = createPartialTxn(0); PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys()); @@ -237,9 +237,9 @@ public class AsyncLoaderTest AtomicLong clock = new AtomicLong(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); - TxnId blockApply = txnId(1, clock.incrementAndGet(), 0, 1); - TxnId blockCommit = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId = txnId(1, clock.incrementAndGet(), 1); + TxnId blockApply = txnId(1, clock.incrementAndGet(), 1); + TxnId blockCommit = txnId(1, clock.incrementAndGet(), 1); PartialTxn txn = createPartialTxn(0); PartitionKey key = (PartitionKey) getOnlyElement(txn.keys()); @@ -283,8 +283,8 @@ public class AsyncLoaderTest { AtomicLong clock = new AtomicLong(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - TxnId txnId1 = txnId(1, clock.incrementAndGet(), 0, 1); - TxnId txnId2 = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1); + TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1); AsyncPromise<Void> promise1 = new AsyncPromise<>(); AsyncPromise<Void> promise2 = new AsyncPromise<>(); diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java index f12943bbb5..87a00fce02 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java @@ -89,7 +89,7 @@ public class AsyncOperationTest public void optionalCommandTest() throws Throwable { AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId = txnId(1, clock.incrementAndGet(), 1); Txn txn = createTxn((int)clock.incrementAndGet()); PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys()); @@ -158,7 +158,7 @@ public class AsyncOperationTest { AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId txnId = txnId(1, clock.incrementAndGet(), 1); AccordCommand command = createCommittedAndPersist(commandStore, txnId); diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java index 3e148498ce..5c8d72f5ba 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java @@ -83,8 +83,8 @@ public class AsyncWriterTest AtomicLong clock = new AtomicLong(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - TxnId blockingId = txnId(1, clock.incrementAndGet(), 0, 1); - TxnId waitingId = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId blockingId = txnId(1, clock.incrementAndGet(), 1); + TxnId waitingId = txnId(1, clock.incrementAndGet(), 1); Txn txn = createTxn(0); Ranges ranges = fullRange(txn); AccordCommand blocking = new AccordCommand(blockingId).initialize(); @@ -133,8 +133,8 @@ public class AsyncWriterTest 
AtomicLong clock = new AtomicLong(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); - Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 0, 1); + TxnId txnId = txnId(1, clock.incrementAndGet(), 1); + Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1); Txn txn = createTxn(0); Ranges ranges = fullRange(txn); PartitionKey key = (PartitionKey) getOnlyElement(txn.keys()); @@ -198,8 +198,8 @@ public class AsyncWriterTest AtomicLong clock = new AtomicLong(0); AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - TxnId blockingId = txnId(1, clock.incrementAndGet(), 0, 1); - TxnId waitingId = txnId(1, clock.incrementAndGet(), 0, 1); + TxnId blockingId = txnId(1, clock.incrementAndGet(), 1); + TxnId waitingId = txnId(1, clock.incrementAndGet(), 1); Txn txn = createTxn(0); Ranges ranges = fullRange(txn); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org