This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 722ba2e7c6 Refactor Timestamp/TxnId - Combine real and logical into a
single 64-bit HLC - Introduce 16 flag bits - Pack epoch (48-bits), HLC
(64-bits) and flags (16-bits) into two longs in memory
722ba2e7c6 is described below
commit 722ba2e7c6e7f1f0a09710ea9dacbc036e11479b
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Mon Jan 9 12:28:26 2023 +0000
Refactor Timestamp/TxnId
- Combine real and logical into a single 64-bit HLC
- Introduce 16 flag bits
- Pack epoch (48-bits), HLC (64-bits) and flags (16-bits) into two longs
in memory
patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-18172
---
.build/include-accord.sh | 2 +-
.../cassandra/service/accord/AccordCommand.java | 14 +--------
.../service/accord/AccordCommandStore.java | 6 ++--
.../service/accord/AccordCommandsForKey.java | 8 ++---
.../cassandra/service/accord/AccordKeyspace.java | 36 +++++++++++-----------
.../service/accord/AccordObjectSizes.java | 2 +-
.../service/accord/AccordPartialCommand.java | 20 +++---------
.../cassandra/service/accord/api/AccordAgent.java | 2 +-
.../service/accord/async/AsyncWriter.java | 5 +--
.../accord/serializers/CommandSerializers.java | 32 ++++++++-----------
.../service/accord/serializers/DepsSerializer.java | 3 ++
.../service/accord/AccordCommandStoreTest.java | 34 ++++++++++----------
.../service/accord/AccordCommandTest.java | 10 +++---
.../cassandra/service/accord/AccordTestUtils.java | 19 ++++++------
.../service/accord/async/AsyncLoaderTest.java | 18 +++++------
.../service/accord/async/AsyncOperationTest.java | 4 +--
.../service/accord/async/AsyncWriterTest.java | 12 ++++----
17 files changed, 101 insertions(+), 126 deletions(-)
diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index 37bdcbe079..2144e80b17 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -25,7 +25,7 @@ set -o nounset
bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
accord_repo='https://github.com/apache/cassandra-accord.git'
-accord_branch='804a77d32c8ae45751a3a7f450b372560f08cacc'
+accord_branch='ad326d5df8d99d4799fa87de81482e3cb1fb92de'
accord_src="$bin/cassandra-accord"
checkout() {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index 2003e77ae1..5b4a8c2e9b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -532,18 +532,6 @@ public class AccordCommand extends Command implements
AccordState<TxnId>
return executeAt.get();
}
- @Override
- public Txn.Kind kind()
- {
- return kind.get();
- }
-
- @Override
- public void setKind(Txn.Kind kind)
- {
- this.kind.set(kind);
- }
-
@Override
public void setExecuteAt(Timestamp timestamp)
{
@@ -636,7 +624,7 @@ public class AccordCommand extends Command implements
AccordState<TxnId>
private boolean canApplyWithCurrentScope(SafeCommandStore safeStore)
{
- Ranges ranges = safeStore.ranges().at(executeAt().epoch);
+ Ranges ranges = safeStore.ranges().at(executeAt().epoch());
Seekables<?, ?> keys = partialTxn().keys();
for (int i=0,mi=keys.size(); i<mi; i++)
{
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index f2b54492fa..96c2a26c35 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -184,7 +184,7 @@ public class AccordCommandStore extends CommandStore
public void forEach(Routable keyOrRange, Consumer<CommandsForKey>
forEach)
{
- switch (keyOrRange.kind())
+ switch (keyOrRange.domain())
{
default: throw new AssertionError();
case Key:
@@ -217,7 +217,7 @@ public class AccordCommandStore extends CommandStore
@Override
public void forEach(Routable keyOrRange, Ranges slice,
Consumer<CommandsForKey> forEach)
{
- switch (keyOrRange.kind())
+ switch (keyOrRange.domain())
{
default: throw new AssertionError();
case Key:
@@ -272,7 +272,7 @@ public class AccordCommandStore extends CommandStore
{
Timestamp max = maxConflict(keys);
long epoch = latestEpoch();
- if (txnId.compareTo(max) > 0 && txnId.epoch >= epoch &&
!agent.isExpired(txnId, time.now()))
+ if (txnId.compareTo(max) > 0 && txnId.epoch() >= epoch &&
!agent.isExpired(txnId, time.now()))
return txnId;
return time.uniqueNow(max);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
index 8a1715384e..437c3b5f66 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandsForKey.java
@@ -53,7 +53,7 @@ import org.assertj.core.util.VisibleForTesting;
import static accord.local.CommandsForKey.CommandTimeseries.TestDep.ANY_DEPS;
import static accord.local.CommandsForKey.CommandTimeseries.TestDep.WITHOUT;
import static accord.local.CommandsForKey.CommandTimeseries.TestKind.RorWs;
-import static accord.primitives.Txn.Kind.WRITE;
+import static accord.primitives.Txn.Kind.Write;
import static
org.apache.cassandra.service.accord.AccordState.WriteOnly.applyMapChanges;
import static
org.apache.cassandra.service.accord.AccordState.WriteOnly.applySetChanges;
@@ -158,7 +158,7 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
public Stream<T> before(Timestamp timestamp, TestKind testKind,
TestDep testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status
status)
{
return idsToCommands(map.getView().headMap(timestamp,
false).values())
- .filter(cmd -> testKind == RorWs || cmd.kind() == WRITE)
+ .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
.filter(cmd -> testDep == ANY_DEPS || (cmd.hasDep(depId) ^
(testDep == WITHOUT)))
.filter(cmd -> TestStatus.test(cmd.status(), testStatus,
status))
.map(translate);
@@ -168,7 +168,7 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
public Stream<T> after(Timestamp timestamp, TestKind testKind, TestDep
testDep, @Nullable TxnId depId, TestStatus testStatus, @Nullable Status status)
{
return idsToCommands(map.getView().tailMap(timestamp,
false).values())
- .filter(cmd -> testKind == RorWs || cmd.kind() == WRITE)
+ .filter(cmd -> testKind == RorWs || cmd.txnId().isWrite())
.filter(cmd -> testDep == ANY_DEPS || (cmd.hasDep(depId) ^
(testDep == WITHOUT)))
.filter(cmd -> TestStatus.test(cmd.status(), testStatus,
status))
.map(translate);
@@ -386,7 +386,7 @@ public class AccordCommandsForKey extends CommandsForKey
implements AccordState<
private static long getTimestampMicros(Timestamp timestamp)
{
- return timestamp.real + timestamp.logical;
+ return timestamp.hlc();
}
private void maybeUpdatelastTimestamp(Timestamp executeAt, boolean
isForWriteTxn)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 12da1ffc10..e17172de66 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -122,8 +122,8 @@ public class AccordKeyspace
public static final String COMMANDS = "commands";
public static final String COMMANDS_FOR_KEY = "commands_for_key";
- private static final String TIMESTAMP_TUPLE = "tuple<bigint, bigint, int,
bigint>";
- private static final TupleType TIMESTAMP_TYPE = new
TupleType(Lists.newArrayList(LongType.instance, LongType.instance,
Int32Type.instance, LongType.instance));
+ private static final String TIMESTAMP_TUPLE = "tuple<bigint, bigint,
bigint>";
+ private static final TupleType TIMESTAMP_TYPE = new
TupleType(Lists.newArrayList(LongType.instance, LongType.instance,
LongType.instance));
private static final String KEY_TUPLE = "tuple<uuid, blob>";
private static final ClusteringIndexFilter FULL_PARTITION = new
ClusteringIndexSliceFilter(Slices.ALL, false);
@@ -328,7 +328,7 @@ public class AccordKeyspace
NavigableMap<Timestamp, TxnId> result = new TreeMap<>();
for (Map.Entry<ByteBuffer, ByteBuffer> entry : serialized.entrySet())
- result.put(deserializeTimestampOrNull(entry.getKey(),
Timestamp::new), deserializeTimestampOrNull(entry.getValue(), TxnId::new));
+ result.put(deserializeTimestampOrNull(entry.getKey(),
Timestamp::fromBits), deserializeTimestampOrNull(entry.getValue(),
TxnId::fromBits));
return result;
}
@@ -346,7 +346,7 @@ public class AccordKeyspace
private static NavigableSet<TxnId>
deserializeTxnIdNavigableSet(UntypedResultSet.Row row, String name)
{
- return deserializeTimestampSet(row.getSet(name, BytesType.instance),
TreeSet::new, TxnId::new);
+ return deserializeTimestampSet(row.getSet(name, BytesType.instance),
TreeSet::new, TxnId::fromBits);
}
private static DeterministicIdentitySet<ListenerProxy>
deserializeListeners(Set<ByteBuffer> serialized) throws IOException
@@ -521,12 +521,12 @@ public class AccordKeyspace
private static ByteBuffer serializeTimestamp(Timestamp timestamp)
{
- return TupleType.buildValue(new ByteBuffer[]{bytes(timestamp.epoch),
bytes(timestamp.real), bytes(timestamp.logical), bytes(timestamp.node.id)});
+ return TupleType.buildValue(new ByteBuffer[]{bytes(timestamp.msb),
bytes(timestamp.lsb), bytes(timestamp.node.id)});
}
public interface TimestampFactory<T extends Timestamp>
{
- T create(long epoch, long real, int logical, Node.Id node);
+ T create(long msb, long lsb, Node.Id node);
}
public static <T extends Timestamp> T
deserializeTimestampOrNull(ByteBuffer bytes, TimestampFactory<T> factory)
@@ -534,7 +534,7 @@ public class AccordKeyspace
if (bytes == null || ByteBufferAccessor.instance.isEmpty(bytes))
return null;
ByteBuffer[] split = TIMESTAMP_TYPE.split(ByteBufferAccessor.instance,
bytes);
- return factory.create(split[0].getLong(), split[1].getLong(),
split[2].getInt(), new Node.Id(split[3].getLong()));
+ return factory.create(split[0].getLong(), split[1].getLong(), new
Node.Id(split[2].getLong()));
}
private static <T extends Timestamp> T
deserializeTimestampOrNull(UntypedResultSet.Row row, String name,
TimestampFactory<T> factory)
@@ -561,11 +561,11 @@ public class AccordKeyspace
{
String cql = "SELECT * FROM %s.%s " +
"WHERE store_id = ? " +
- "AND txn_id=(?, ?, ?, ?)";
+ "AND txn_id=(?, ?, ?)";
return executeOnceInternal(String.format(cql, ACCORD_KEYSPACE_NAME,
COMMANDS),
commandStore.id(),
- txnId.epoch, txnId.real, txnId.logical,
txnId.node.id);
+ txnId.msb, txnId.lsb, txnId.node.id);
}
public static void loadCommand(AccordCommandStore commandStore,
AccordCommand command)
@@ -585,7 +585,7 @@ public class AccordKeyspace
try
{
UntypedResultSet.Row row = result.one();
- Preconditions.checkState(deserializeTimestampOrNull(row, "txn_id",
TxnId::new).equals(txnId));
+ Preconditions.checkState(deserializeTimestampOrNull(row, "txn_id",
TxnId::fromBits).equals(txnId));
command.status.load(SaveStatus.values()[row.getInt("status")]);
command.homeKey.load(deserializeOrNull(row.getBlob("home_key"),
CommandsSerializers.routingKey));
command.progressKey.load(deserializeOrNull(row.getBlob("progress_key"),
CommandsSerializers.routingKey));
@@ -594,9 +594,9 @@ public class AccordKeyspace
command.durability.load(Status.Durability.values()[row.getInt("durability",
0)]);
command.partialTxn.load(deserializeOrNull(row.getBlob("txn"),
CommandsSerializers.partialTxn));
command.kind.load(row.has("kind") ?
Txn.Kind.values()[row.getInt("kind")] : null);
- command.executeAt.load(deserializeTimestampOrNull(row,
"execute_at", Timestamp::new));
- command.promised.load(deserializeTimestampOrNull(row,
"promised_ballot", Ballot::new));
- command.accepted.load(deserializeTimestampOrNull(row,
"accepted_ballot", Ballot::new));
+ command.executeAt.load(deserializeTimestampOrNull(row,
"execute_at", Timestamp::fromBits));
+ command.promised.load(deserializeTimestampOrNull(row,
"promised_ballot", Ballot::fromBits));
+ command.accepted.load(deserializeTimestampOrNull(row,
"accepted_ballot", Ballot::fromBits));
command.partialDeps.load(deserializeOrNull(row.getBlob("dependencies"),
CommandsSerializers.partialDeps));
command.writes.load(deserializeWithVersionOr(row, "writes",
CommandsSerializers.writes, () -> null));
command.result.load(deserializeWithVersionOr(row, "result",
CommandsSerializers.result, () -> null));
@@ -769,11 +769,11 @@ public class AccordKeyspace
// empty static row will be interpreted as all null cells
which will cause everything to be initialized
Row staticRow = partition.staticRow();
Cell<?> cell =
staticRow.getCell(CommandsForKeyColumns.max_timestamp);
- cfk.maxTimestamp.load(cell != null && !cell.isTombstone() ?
deserializeTimestampOrNull(cellValue(cell), Timestamp::new)
+ cfk.maxTimestamp.load(cell != null && !cell.isTombstone() ?
deserializeTimestampOrNull(cellValue(cell), Timestamp::fromBits)
:
AccordCommandsForKey.Defaults.maxTimestamp);
cell =
staticRow.getCell(CommandsForKeyColumns.last_executed_timestamp);
- cfk.lastExecutedTimestamp.load(cell != null &&
!cell.isTombstone() ? deserializeTimestampOrNull(cellValue(cell),
Timestamp::new)
+ cfk.lastExecutedTimestamp.load(cell != null &&
!cell.isTombstone() ? deserializeTimestampOrNull(cellValue(cell),
Timestamp::fromBits)
: AccordCommandsForKey.Defaults.lastExecutedTimestamp);
cell =
staticRow.getCell(CommandsForKeyColumns.last_executed_micros);
@@ -782,13 +782,13 @@ public class AccordKeyspace
:
AccordCommandsForKey.Defaults.lastExecutedMicros);
cell =
staticRow.getCell(CommandsForKeyColumns.last_write_timestamp);
- cfk.lastWriteTimestamp.load(cell != null &&
!cell.isTombstone() ? deserializeTimestampOrNull(cellValue(cell),
Timestamp::new)
+ cfk.lastWriteTimestamp.load(cell != null &&
!cell.isTombstone() ? deserializeTimestampOrNull(cellValue(cell),
Timestamp::fromBits)
: AccordCommandsForKey.Defaults.lastWriteTimestamp);
TreeSet<Timestamp> blindWitnessed = new TreeSet<>();
ComplexColumnData cmplx =
staticRow.getComplexColumnData(CommandsForKeyColumns.blind_witnessed);
if (cmplx != null)
- cmplx.forEach(c ->
blindWitnessed.add(deserializeTimestampOrNull(c.path().get(0),
Timestamp::new)));
+ cmplx.forEach(c ->
blindWitnessed.add(deserializeTimestampOrNull(c.path().get(0),
Timestamp::fromBits)));
cfk.blindWitnessed.load(blindWitnessed);
while (partition.hasNext())
@@ -796,7 +796,7 @@ public class AccordKeyspace
Row row = partition.next();
Clustering<?> clustering = row.clustering();
int ordinal =
Int32Type.instance.compose(clusteringValue(clustering, 0));
- Timestamp timestamp =
deserializeTimestampOrNull(clusteringValue(clustering, 1), Timestamp::new);
+ Timestamp timestamp =
deserializeTimestampOrNull(clusteringValue(clustering, 1), Timestamp::fromBits);
ByteBuffer data = cellValue(row,
CommandsForKeyColumns.data);
if (data == null)
continue;
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 7f7a8a86e3..0246ee6780 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -182,7 +182,7 @@ public class AccordObjectSizes
return size;
}
- private static final long TIMESTAMP_SIZE = ObjectSizes.measureDeep(new
Timestamp(0, 0, 0, new Node.Id(0)));
+ private static final long TIMESTAMP_SIZE =
ObjectSizes.measureDeep(Timestamp.fromBits(0, 0, new Node.Id(0)));
public static long timestamp()
{
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
index 5036e2b57c..b3ef93d89b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordPartialCommand.java
@@ -29,7 +29,6 @@ import accord.local.Command;
import accord.local.CommandsForKey;
import accord.local.Status;
import accord.primitives.Timestamp;
-import accord.primitives.Txn;
import accord.primitives.TxnId;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataInputPlus;
@@ -53,21 +52,19 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
private final List<TxnId> deps;
// TODO (soon): we only require this for Accepted; perhaps more tightly
couple query API for efficiency
private final Status status;
- private final Txn.Kind kind;
- AccordPartialCommand(TxnId txnId, Timestamp executeAt, List<TxnId> deps,
Status status, Txn.Kind kind)
+ AccordPartialCommand(TxnId txnId, Timestamp executeAt, List<TxnId> deps,
Status status)
{
super(txnId, executeAt);
this.deps = deps;
this.status = status;
- this.kind = kind;
}
public AccordPartialCommand(Key key, Command command)
{
this(command.txnId(), command.executeAt(),
command.partialDeps() == null ? Collections.emptyList() :
command.partialDeps().txnIds(key),
- command.status(), command.kind());
+ command.status());
}
public TxnId txnId()
@@ -95,11 +92,6 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
return status;
}
- public Txn.Kind kind()
- {
- return kind;
- }
-
@Override
public boolean equals(Object obj)
{
@@ -109,8 +101,7 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
return txnId.equals(that.txnId)
&& Objects.equals(executeAt, that.executeAt)
&& Objects.equals(deps, that.deps)
- && status == that.status
- && kind == that.kind;
+ && status == that.status;
}
public static class PartialCommandSerializer
@@ -121,7 +112,6 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
CommandSerializers.txnId.serialize(command.txnId(), out,
version.msgVersion);
serializeNullable(command.executeAt(), out, version.msgVersion,
CommandSerializers.timestamp);
CommandSerializers.status.serialize(command.status(), out,
version.msgVersion);
- CommandSerializers.kind.serialize(command.kind(), out,
version.msgVersion);
serializeCollection(command.deps, out, version.msgVersion,
CommandSerializers.txnId);
}
@@ -157,9 +147,8 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
Timestamp executeAt = deserializeNullable(in, version.msgVersion,
CommandSerializers.timestamp);
Status status = CommandSerializers.status.deserialize(in,
version.msgVersion);
- Txn.Kind kind = CommandSerializers.kind.deserialize(in,
version.msgVersion);
List<TxnId> deps = deserializeList(in, version.msgVersion,
CommandSerializers.txnId);
- AccordPartialCommand partial = new AccordPartialCommand(txnId,
executeAt, deps, status, kind);
+ AccordPartialCommand partial = new AccordPartialCommand(txnId,
executeAt, deps, status);
addToContext(partial, context);
return partial;
}
@@ -182,7 +171,6 @@ public class AccordPartialCommand extends
CommandsForKey.TxnIdWithExecuteAt
size += CommandSerializers.txnId.serializedSize();
size += serializedSizeNullable(command.executeAt(),
version.msgVersion, CommandSerializers.timestamp);
size += CommandSerializers.status.serializedSize(command.status(),
version.msgVersion);
- size += CommandSerializers.kind.serializedSize(command.kind(),
version.msgVersion);
size += serializedCollectionSize(command.deps, version.msgVersion,
CommandSerializers.txnId);
return size;
}
diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
index f8a8832a5d..31db8b9646 100644
--- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
+++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java
@@ -60,6 +60,6 @@ public class AccordAgent implements Agent
public boolean isExpired(TxnId initiated, long now)
{
// TODO: should distinguish between reads and writes
- return now - initiated.real > getReadRpcTimeout(MICROSECONDS);
+ return now - initiated.hlc() > getReadRpcTimeout(MICROSECONDS);
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
index cd8aca5ab5..c920a0f7bb 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncWriter.java
@@ -32,7 +32,6 @@ import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import accord.primitives.Routable;
import accord.primitives.Seekable;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
@@ -50,6 +49,8 @@ import org.apache.cassandra.service.accord.store.StoredSet;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.utils.concurrent.FutureCombiner;
+import static accord.primitives.Routable.Domain.Range;
+
public class AsyncWriter
{
private static final Logger logger =
LoggerFactory.getLogger(AsyncWriter.class);
@@ -263,7 +264,7 @@ public class AsyncWriter
for (Seekable key : command.partialTxn().keys())
{
// TODO: implement
- if (key.kind() == Routable.Kind.Range)
+ if (key.domain() == Range)
throw new UnsupportedOperationException();
PartitionKey partitionKey = (PartitionKey) key;
AccordCommandsForKey cfk = cfkForDenormalization(partitionKey,
context);
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index e4e589d87b..98c1f93eaa 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -48,16 +48,16 @@ public class CommandSerializers
{
private CommandSerializers() {}
- public static final TimestampSerializer<TxnId> txnId = new
TimestampSerializer<>(TxnId::new);
- public static final TimestampSerializer<Timestamp> timestamp = new
TimestampSerializer<>(Timestamp::new);
- public static final TimestampSerializer<Ballot> ballot = new
TimestampSerializer<>(Ballot::new);
+ public static final TimestampSerializer<TxnId> txnId = new
TimestampSerializer<>(TxnId::fromBits);
+ public static final TimestampSerializer<Timestamp> timestamp = new
TimestampSerializer<>(Timestamp::fromBits);
+ public static final TimestampSerializer<Ballot> ballot = new
TimestampSerializer<>(Ballot::fromBits);
public static final EnumSerializer<Txn.Kind> kind = new
EnumSerializer<>(Txn.Kind.class);
public static class TimestampSerializer<T extends Timestamp> implements
IVersionedSerializer<T>
{
interface Factory<T extends Timestamp>
{
- T create(long epoch, long real, int logical, Node.Id node);
+ T create(long msb, long lsb, Node.Id node);
}
private final TimestampSerializer.Factory<T> factory;
@@ -70,18 +70,16 @@ public class CommandSerializers
@Override
public void serialize(T ts, DataOutputPlus out, int version) throws
IOException
{
- out.writeLong(ts.epoch);
- out.writeLong(ts.real);
- out.writeInt(ts.logical);
+ out.writeLong(ts.msb);
+ out.writeLong(ts.lsb);
TopologySerializers.nodeId.serialize(ts.node, out, version);
}
public <V> int serialize(T ts, V dst, ValueAccessor<V> accessor, int
offset)
{
int position = offset;
- position += accessor.putLong(dst, position, ts.epoch);
- position += accessor.putLong(dst, position, ts.real);
- position += accessor.putInt(dst, position, ts.logical);
+ position += accessor.putLong(dst, position, ts.msb);
+ position += accessor.putLong(dst, position, ts.lsb);
position += TopologySerializers.nodeId.serialize(ts.node, dst,
accessor, position);
int size = position - offset;
Preconditions.checkState(size == serializedSize());
@@ -93,20 +91,17 @@ public class CommandSerializers
{
return factory.create(in.readLong(),
in.readLong(),
- in.readInt(),
TopologySerializers.nodeId.deserialize(in,
version));
}
public <V> T deserialize(V src, ValueAccessor<V> accessor, int offset)
{
- long epoch = accessor.getLong(src, offset);
+ long msb = accessor.getLong(src, offset);
offset += TypeSizes.LONG_SIZE;
- long real = accessor.getLong(src, offset);
+ long lsb = accessor.getLong(src, offset);
offset += TypeSizes.LONG_SIZE;
- int logical = accessor.getInt(src, offset);
- offset += TypeSizes.INT_SIZE;
Node.Id node = TopologySerializers.nodeId.deserialize(src,
accessor, offset);
- return factory.create(epoch, real, logical, node);
+ return factory.create(msb, lsb, node);
}
@Override
@@ -117,9 +112,8 @@ public class CommandSerializers
public int serializedSize()
{
- return TypeSizes.LONG_SIZE + // ts.epoch
- TypeSizes.LONG_SIZE + // ts.real
- TypeSizes.INT_SIZE + // ts.logical
+ return TypeSizes.LONG_SIZE + // ts.msb
+ TypeSizes.LONG_SIZE + // ts.lsb
TopologySerializers.nodeId.serializedSize(); // ts.node
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
index 22b80554f1..083315bc7c 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/DepsSerializer.java
@@ -89,12 +89,15 @@ public abstract class DepsSerializer<D extends Deps>
implements IVersionedSerial
public D deserialize(DataInputPlus in, int version) throws IOException
{
Keys keys = KeySerializers.keys.deserialize(in, version);
+
TxnId[] txnIds = new TxnId[(int) in.readUnsignedVInt()];
for (int i=0; i<txnIds.length; i++)
txnIds[i] = CommandSerializers.txnId.deserialize(in, version);
+
int[] keyToTxnIds = new int[(int) in.readUnsignedVInt()];
for (int i=0; i<keyToTxnIds.length; i++)
keyToTxnIds[i] = (int) in.readUnsignedVInt();
+
return deserialize(keys, txnIds, keyToTxnIds, in, version);
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index 2cd933c7b4..9f49138c8b 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -82,22 +82,22 @@ public class AccordCommandStoreTest
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
PartialDeps.OrderedBuilder builder =
PartialDeps.orderedBuilder(depTxn.covering(), false);
- builder.add(key, txnId(1, clock.incrementAndGet(), 0, 1));
+ builder.add(key, txnId(1, clock.incrementAndGet(), 1));
PartialDeps dependencies = builder.build();
QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, c, v) VALUES
(0, 0, 1)");
- TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 0, 1);
- TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 0, 1);
- TxnId oldTimestamp = txnId(1, clock.incrementAndGet(), 0, 1);
- TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId oldTxnId1 = txnId(1, clock.incrementAndGet(), 1);
+ TxnId oldTxnId2 = txnId(1, clock.incrementAndGet(), 1);
+ TxnId oldTimestamp = txnId(1, clock.incrementAndGet(), 1);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
AccordCommand command = new AccordCommand(txnId).initialize();
command.setPartialTxn(createPartialTxn(0));
command.homeKey(key.toUnseekable());
command.progressKey(key.toUnseekable());
command.setDurability(Durable);
- command.setPromised(ballot(1, clock.incrementAndGet(), 0, 1));
- command.setAccepted(ballot(1, clock.incrementAndGet(), 0, 1));
- command.setExecuteAt(timestamp(1, clock.incrementAndGet(), 0, 1));
+ command.setPromised(ballot(1, clock.incrementAndGet(), 1));
+ command.setAccepted(ballot(1, clock.incrementAndGet(), 1));
+ command.setExecuteAt(timestamp(1, clock.incrementAndGet(), 1));
command.setPartialDeps(dependencies);
command.setStatus(Status.Accepted);
command.addWaitingOnCommit(oldTxnId1);
@@ -119,26 +119,26 @@ public class AccordCommandStoreTest
{
AtomicLong clock = new AtomicLong(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- Timestamp maxTimestamp = timestamp(1, clock.incrementAndGet(), 0, 1);
+ Timestamp maxTimestamp = timestamp(1, clock.incrementAndGet(), 1);
PartialTxn txn = createPartialTxn(1);
PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
- TxnId txnId1 = txnId(1, clock.incrementAndGet(), 0, 1);
- TxnId txnId2 = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1);
+ TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
AccordCommand command1 = new AccordCommand(txnId1).initialize();
AccordCommand command2 = new AccordCommand(txnId2).initialize();
command1.setPartialTxn(txn);
command2.setPartialTxn(txn);
- command1.setExecuteAt(timestamp(1, clock.incrementAndGet(), 0, 1));
- command2.setExecuteAt(timestamp(1, clock.incrementAndGet(), 0, 1));
+ command1.setExecuteAt(timestamp(1, clock.incrementAndGet(), 1));
+ command2.setExecuteAt(timestamp(1, clock.incrementAndGet(), 1));
AccordCommandsForKey cfk = new AccordCommandsForKey(commandStore,
key).initialize();
cfk.updateMax(maxTimestamp);
- Assert.assertEquals(txnId1.real, cfk.timestampMicrosFor(txnId1, true));
- Assert.assertEquals(txnId2.real, cfk.timestampMicrosFor(txnId2, true));
+ Assert.assertEquals(txnId1.hlc(), cfk.timestampMicrosFor(txnId1,
true));
+ Assert.assertEquals(txnId2.hlc(), cfk.timestampMicrosFor(txnId2,
true));
Assert.assertEquals(txnId2, cfk.lastExecutedTimestamp.get());
- Assert.assertEquals(txnId2.real, cfk.lastExecutedMicros.get());
+ Assert.assertEquals(txnId2.hlc(), cfk.lastExecutedMicros.get());
cfk.register(command1);
cfk.register(command2);
@@ -165,7 +165,7 @@ public class AccordCommandStoreTest
for (int i=0; i<4; i++)
{
- maxTimestamp = timestamp(1, clock.incrementAndGet(), 0, 1);
+ maxTimestamp = timestamp(1, clock.incrementAndGet(), 1);
expected.add(maxTimestamp);
writeOnlyCfk.updateMax(maxTimestamp);
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index fa16a474f8..112d666550 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -89,7 +89,7 @@ public class AccordCommandTest
commandStore.execute(PreLoadContext.empty(), instance -> {
((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get();
- TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
Txn txn = createTxn(1);
Key key = (Key)txn.keys().get(0);
RoutingKey homeKey = key.toUnseekable();
@@ -121,8 +121,8 @@ public class AccordCommandTest
}).get();
// check accept
- TxnId txnId2 = txnId(1, clock.incrementAndGet(), 0, 1);
- Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
+ Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1);
PartialDeps.OrderedBuilder builder =
PartialDeps.orderedBuilder(route.covering(), false);
builder.add(key, txnId2);
PartialDeps deps = builder.build();
@@ -170,7 +170,7 @@ public class AccordCommandTest
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
commandStore.execute(PreLoadContext.empty(), instance -> {
((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get();
- TxnId txnId1 = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1);
Txn txn = createTxn(2);
Key key = (Key)txn.keys().get(0);
RoutingKey homeKey = key.toUnseekable();
@@ -182,7 +182,7 @@ public class AccordCommandTest
commandStore.execute(preAccept1, preAccept1::apply).get();
// second preaccept should identify txnId1 as a dependency
- TxnId txnId2 = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
PreAccept preAccept2 = PreAccept.SerializerSupport.create(txnId2,
route, 1, 1, false, 1, partialTxn, fullRoute);
commandStore.execute(preAccept2, instance -> {
PreAccept.PreAcceptReply reply = preAccept2.apply(instance);
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index b835e0fbf5..f61205d374 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -66,6 +66,7 @@ import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
+import static accord.primitives.Routable.Domain.Key;
import static java.lang.String.format;
public class AccordTestUtils
@@ -90,19 +91,19 @@ public class AccordTestUtils
@Override public void waiting(TxnId blockedBy, Known blockedUntil,
Unseekables<?, ?> blockedOn) {}
};
- public static TxnId txnId(long epoch, long real, int logical, long node)
+ public static TxnId txnId(long epoch, long hlc, long node)
{
- return new TxnId(epoch, real, logical, new Node.Id(node));
+ return new TxnId(epoch, hlc, Txn.Kind.Write, Key, new Node.Id(node));
}
- public static Timestamp timestamp(long epoch, long real, int logical, long node)
+ public static Timestamp timestamp(long epoch, long hlc, long node)
{
- return new Timestamp(epoch, real, logical, new Node.Id(node));
+ return Timestamp.fromValues(epoch, hlc, new Node.Id(node));
}
- public static Ballot ballot(long epoch, long real, int logical, long node)
+ public static Ballot ballot(long epoch, long hlc, long node)
{
- return new Ballot(epoch, real, logical, new Node.Id(node));
+ return Ballot.fromValues(epoch, hlc, new Node.Id(node));
}
/**
@@ -119,7 +120,7 @@ public class AccordTestUtils
.map(key -> {
try
{
- return read.read(key, command.kind(), instance, command.executeAt(), null).get();
+ return read.read(key, command.txnId().rw(), instance, command.executeAt(), null).get();
}
catch (InterruptedException e)
{
@@ -211,7 +212,7 @@ public class AccordTestUtils
@Override public Id id() { return node;}
@Override public long epoch() {return 1; }
@Override public long now() {return now.getAsLong(); }
- @Override public Timestamp uniqueNow(Timestamp atLeast) { return new Timestamp(1, now.getAsLong(), 0, node); }
+ @Override public Timestamp uniqueNow(Timestamp atLeast) { return Timestamp.fromValues(1, now.getAsLong(), node); }
};
return new InMemoryCommandStore.Synchronized(0,
time,
@@ -228,7 +229,7 @@ public class AccordTestUtils
@Override public Id id() { return node;}
@Override public long epoch() {return 1; }
@Override public long now() {return now.getAsLong(); }
- @Override public Timestamp uniqueNow(Timestamp atLeast) { return new Timestamp(1, now.getAsLong(), 0, node); }
+ @Override public Timestamp uniqueNow(Timestamp atLeast) { return Timestamp.fromValues(1, now.getAsLong(), node); }
};
return new AccordCommandStore(0,
time,
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
index fbbb53ee31..c5bc97984d 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncLoaderTest.java
@@ -74,7 +74,7 @@ public class AsyncLoaderTest
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
AccordStateCache.Instance<TxnId, AccordCommand> commandCache =
commandStore.commandCache();
AccordStateCache.Instance<PartitionKey, AccordCommandsForKey>
cfkCacche = commandStore.commandsForKeyCache();
- TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
PartialTxn txn = createPartialTxn(0);
PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
@@ -108,7 +108,7 @@ public class AsyncLoaderTest
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
AccordStateCache.Instance<TxnId, AccordCommand> commandCache =
commandStore.commandCache();
AccordStateCache.Instance<PartitionKey, AccordCommandsForKey>
cfkCacche = commandStore.commandsForKeyCache();
- TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
PartialTxn txn = createPartialTxn(0);
PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
@@ -150,7 +150,7 @@ public class AsyncLoaderTest
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
AccordStateCache.Instance<TxnId, AccordCommand> commandCache =
commandStore.commandCache();
AccordStateCache.Instance<PartitionKey, AccordCommandsForKey>
cfkCacche = commandStore.commandsForKeyCache();
- TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
PartialTxn txn = createPartialTxn(0);
PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
@@ -192,7 +192,7 @@ public class AsyncLoaderTest
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
AccordStateCache.Instance<TxnId, AccordCommand> commandCache =
commandStore.commandCache();
AccordStateCache.Instance<PartitionKey, AccordCommandsForKey>
cfkCacche = commandStore.commandsForKeyCache();
- TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
PartialTxn txn = createPartialTxn(0);
PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
@@ -237,9 +237,9 @@ public class AsyncLoaderTest
AtomicLong clock = new AtomicLong(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
- TxnId blockApply = txnId(1, clock.incrementAndGet(), 0, 1);
- TxnId blockCommit = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
+ TxnId blockApply = txnId(1, clock.incrementAndGet(), 1);
+ TxnId blockCommit = txnId(1, clock.incrementAndGet(), 1);
PartialTxn txn = createPartialTxn(0);
PartitionKey key = (PartitionKey) getOnlyElement(txn.keys());
@@ -283,8 +283,8 @@ public class AsyncLoaderTest
{
AtomicLong clock = new AtomicLong(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- TxnId txnId1 = txnId(1, clock.incrementAndGet(), 0, 1);
- TxnId txnId2 = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId1 = txnId(1, clock.incrementAndGet(), 1);
+ TxnId txnId2 = txnId(1, clock.incrementAndGet(), 1);
AsyncPromise<Void> promise1 = new AsyncPromise<>();
AsyncPromise<Void> promise2 = new AsyncPromise<>();
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
index f12943bbb5..87a00fce02 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java
@@ -89,7 +89,7 @@ public class AsyncOperationTest
public void optionalCommandTest() throws Throwable
{
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
Txn txn = createTxn((int)clock.incrementAndGet());
PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys());
@@ -158,7 +158,7 @@ public class AsyncOperationTest
{
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
AccordCommand command = createCommittedAndPersist(commandStore, txnId);
diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
index 3e148498ce..5c8d72f5ba 100644
--- a/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncWriterTest.java
@@ -83,8 +83,8 @@ public class AsyncWriterTest
AtomicLong clock = new AtomicLong(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- TxnId blockingId = txnId(1, clock.incrementAndGet(), 0, 1);
- TxnId waitingId = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId blockingId = txnId(1, clock.incrementAndGet(), 1);
+ TxnId waitingId = txnId(1, clock.incrementAndGet(), 1);
Txn txn = createTxn(0);
Ranges ranges = fullRange(txn);
AccordCommand blocking = new AccordCommand(blockingId).initialize();
@@ -133,8 +133,8 @@ public class AsyncWriterTest
AtomicLong clock = new AtomicLong(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1);
- Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 0, 1);
+ TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
+ Timestamp executeAt = timestamp(1, clock.incrementAndGet(), 1);
Txn txn = createTxn(0);
Ranges ranges = fullRange(txn);
PartitionKey key = (PartitionKey) getOnlyElement(txn.keys());
@@ -198,8 +198,8 @@ public class AsyncWriterTest
AtomicLong clock = new AtomicLong(0);
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
- TxnId blockingId = txnId(1, clock.incrementAndGet(), 0, 1);
- TxnId waitingId = txnId(1, clock.incrementAndGet(), 0, 1);
+ TxnId blockingId = txnId(1, clock.incrementAndGet(), 1);
+ TxnId waitingId = txnId(1, clock.incrementAndGet(), 1);
Txn txn = createTxn(0);
Ranges ranges = fullRange(txn);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]