This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 716002200c Fix: - Only use persisted RedundantBefore for compaction
- RouteIndex should index only touches, not Route - Flush RangesForEpoch
updates to journal immediately, so we do not rely on the command we are
processing succeeding - DurableBefore updates must wait for the epochs to be
known locally - Shard.mustWitnessEpoch to support guaranteeing to witness
relevant non-topology schema changes - We must propagate RedundantBefore RX
shard bounds along with epoch syncs - [...]
716002200c is described below
commit 716002200c3e90e53c4ce3fa1124bebabda6fbcd
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Wed Feb 19 11:52:31 2025 +0000
Fix:
- Only use persisted RedundantBefore for compaction
- RouteIndex should index only touches, not Route
- Flush RangesForEpoch updates to the journal immediately, so we do not rely
on the command currently being processed succeeding
- DurableBefore updates must wait for the epochs to be known locally
- Add Shard.mustWitnessEpoch to support guaranteeing that relevant
non-topology schema changes are witnessed
- We must propagate RedundantBefore RX shard bounds along with epoch syncs
- Prevent an infinite FetchData loop for truncated transactions
- GC_BEFORE status could be overwritten by bootstrappedAt, permitting old
transaction state to be resurrected
- Avoid CFK.maxUniqueHlc read race on bootstrap
- TopologyManager.awaitEpoch could wait for wrong epoch
- Journal fsync thread could miss notifications
Also improve:
- CommandStores uses SearchableRangeList for finding matching stores
- Refactor RedundantBefore to use a sorted array of TxnId/RedundantStatus
pairs (to more robustly fix the GC_BEFORE issue)
- Accord debug keyspace tables are keyed by keyspace/table, and sort
correctly by token
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20361
---
modules/accord | 2 +-
.../db/compaction/CompactionIterator.java | 2 +-
.../cassandra/db/virtual/AccordDebugKeyspace.java | 271 +++++++++++++--------
.../cassandra/index/accord/RangeMemoryIndex.java | 4 +-
.../cassandra/index/accord/RouteIndexFormat.java | 20 +-
.../cassandra/index/accord/RouteJournalIndex.java | 3 +-
src/java/org/apache/cassandra/journal/Flusher.java | 6 +-
src/java/org/apache/cassandra/journal/Journal.java | 2 +-
.../service/accord/AccordCommandStore.java | 48 +++-
.../service/accord/AccordFetchCoordinator.java | 6 +-
.../cassandra/service/accord/AccordKeyspace.java | 2 +-
.../service/accord/AccordObjectSizes.java | 2 +-
.../service/accord/AccordSafeCommandStore.java | 30 +++
.../cassandra/service/accord/AccordService.java | 25 +-
.../cassandra/service/accord/AccordTask.java | 6 +-
.../cassandra/service/accord/AccordTopology.java | 20 +-
.../accord/CommandStoreTxnBlockedGraph.java | 4 +-
.../service/accord/interop/AccordInteropRead.java | 54 ++--
.../accord/interop/AccordInteropReadRepair.java | 2 +-
.../accord/serializers/CommandSerializers.java | 17 +-
.../serializers/CommandStoreSerializers.java | 91 ++++---
.../accord/serializers/TopologySerializers.java | 11 +-
.../compaction/CompactionAccordIteratorsTest.java | 6 +-
.../db/virtual/AccordDebugKeyspaceTest.java | 14 +-
.../cassandra/dht/RandomPartitionerTest.java | 2 -
.../cassandra/index/accord/RouteIndexTest.java | 31 ++-
.../service/accord/AccordJournalOrderTest.java | 2 +-
.../service/accord/AccordSyncPropagatorTest.java | 2 +-
.../serializers/CommandStoreSerializersTest.java | 2 +-
.../apache/cassandra/utils/AccordGenerators.java | 60 +++--
30 files changed, 473 insertions(+), 274 deletions(-)
diff --git a/modules/accord b/modules/accord
index 58f107625a..a341979bd8 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 58f107625a183d77b154221efd2f6f0623214027
+Subproject commit a341979bd8fc1d26192cd6bc1edb145e7945e2e9
diff --git
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index 50e57ef935..1b8b347ce0 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -810,7 +810,7 @@ public class CompactionIterator extends
CompactionInfo.Holder implements Unfilte
return row;
RedundantBefore redundantBefore = info.redundantBefore;
- RedundantBefore.Entry redundantBeforeEntry =
redundantBefore.get(tokenKey.toUnseekable());
+ RedundantBefore.Bounds redundantBeforeEntry =
redundantBefore.get(tokenKey.toUnseekable());
if (redundantBeforeEntry == null)
return row;
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index ce71048a38..0c66a40d4c 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -36,10 +36,10 @@ import accord.api.RoutingKey;
import accord.impl.DurabilityScheduling;
import accord.impl.progresslog.DefaultProgressLog;
import accord.impl.progresslog.TxnStateKind;
+import accord.local.CommandStore;
import accord.local.CommandStores;
import accord.local.DurableBefore;
import accord.local.MaxConflicts;
-import accord.local.RedundantBefore;
import accord.local.RejectBefore;
import accord.primitives.Status;
import accord.primitives.TxnId;
@@ -49,11 +49,9 @@ import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.TupleType;
import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.dht.NormalizedRanges;
import org.apache.cassandra.dht.Token;
@@ -62,6 +60,7 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.AccordCache;
+import org.apache.cassandra.service.accord.AccordCommandStore;
import org.apache.cassandra.service.accord.AccordCommandStores;
import org.apache.cassandra.service.accord.AccordExecutor;
import org.apache.cassandra.service.accord.AccordKeyspace;
@@ -71,14 +70,17 @@ import org.apache.cassandra.service.accord.api.TokenKey;
import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
import org.apache.cassandra.service.consensus.migration.TableMigrationState;
import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.utils.Pair;
+import static accord.local.RedundantStatus.Property.GC_BEFORE;
+import static accord.local.RedundantStatus.Property.LOCALLY_APPLIED;
+import static accord.local.RedundantStatus.Property.LOCALLY_REDUNDANT;
+import static accord.local.RedundantStatus.Property.LOCALLY_SYNCED;
+import static accord.local.RedundantStatus.Property.LOCALLY_WITNESSED;
+import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
+import static accord.local.RedundantStatus.Property.SHARD_ONLY_APPLIED;
import static java.lang.String.format;
-import static java.util.Comparator.comparing;
import static com.google.common.collect.ImmutableList.toImmutableList;
-import static accord.utils.async.AsyncChains.getBlockingAndRethrow;
import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
public class AccordDebugKeyspace extends VirtualKeyspace
@@ -93,10 +95,6 @@ public class AccordDebugKeyspace extends VirtualKeyspace
public static final String REJECT_BEFORE = "reject_before";
public static final String TXN_BLOCKED_BY = "txn_blocked_by";
- // {table_id, token} or {table_id, +Inf/-Inf}
- private static final TupleType ROUTING_KEY_TYPE = new
TupleType(List.of(UUIDType.instance, UTF8Type.instance));
- private static final String ROUTING_KEY_TYPE_STRING =
ROUTING_KEY_TYPE.asCQL3Type().toString();
-
public static final AccordDebugKeyspace instance = new
AccordDebugKeyspace();
private AccordDebugKeyspace()
@@ -122,8 +120,11 @@ public class AccordDebugKeyspace extends VirtualKeyspace
super(parse(VIRTUAL_ACCORD_DEBUG, DURABILITY_SCHEDULING,
"Accord per-Range Durability Scheduling State",
"CREATE TABLE %s (\n" +
- format("range_start %s,\n",
ROUTING_KEY_TYPE_STRING) +
- format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+ " keyspace_name text,\n" +
+ " table_name text,\n" +
+ " token_sort blob,\n" +
+ " token_start text,\n" +
+ " token_end text,\n" +
" node_offset int,\n" +
" \"index\" int,\n" +
" number_of_splits int,\n" +
@@ -131,8 +132,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace
" cycle_started_at bigint,\n" +
" retry_delay_micros bigint,\n" +
" is_defunct boolean,\n" +
- " PRIMARY KEY ((range_start, range_end))" +
- ')', CompositeType.getInstance(ROUTING_KEY_TYPE,
ROUTING_KEY_TYPE)));
+ " PRIMARY KEY (keyspace_name, table_name,
token_start)" +
+ ')', UTF8Type.instance));
}
@Override
@@ -143,7 +144,11 @@ public class AccordDebugKeyspace extends VirtualKeyspace
SimpleDataSet ds = new SimpleDataSet(metadata());
while (view.advance())
{
- ds.row(decompose(view.range().start()),
decompose(view.range().end()))
+ TableId tableId = (TableId) view.range().start().prefix();
+ TableMetadata tableMetadata = tableMetadata(tableId);
+ ds.row(keyspace(tableMetadata), table(tableId, tableMetadata),
sortToken(view.range().start()))
+ .column("start_token", printToken(view.range().start()))
+ .column("end_token", printToken(view.range().end()))
.column("node_offset", view.nodeOffset())
.column("index", view.index())
.column("number_of_splits", view.numberOfSplits())
@@ -163,12 +168,15 @@ public class AccordDebugKeyspace extends VirtualKeyspace
super(parse(VIRTUAL_ACCORD_DEBUG, DURABLE_BEFORE,
"Accord Node's DurableBefore State",
"CREATE TABLE %s (\n" +
- format("range_start %s,\n",
ROUTING_KEY_TYPE_STRING) +
- format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+ " keyspace_name text,\n" +
+ " table_name text,\n" +
+ " token_sort blob,\n" +
+ " token_start text,\n" +
+ " token_end text,\n" +
" majority_before text,\n" +
" universal_before text,\n" +
- " PRIMARY KEY ((range_start, range_end))" +
- ')', CompositeType.getInstance(ROUTING_KEY_TYPE,
ROUTING_KEY_TYPE)));
+ " PRIMARY KEY (keyspace_name, table_name,
token_sort)" +
+ ')', UTF8Type.instance));
}
@Override
@@ -177,7 +185,11 @@ public class AccordDebugKeyspace extends VirtualKeyspace
DurableBefore durableBefore =
AccordService.instance().node().durableBefore();
return durableBefore.foldlWithBounds(
(entry, ds, start, end) -> {
- ds.row(decompose(start), decompose(end))
+ TableId tableId = (TableId) start.prefix();
+ TableMetadata tableMetadata = tableMetadata(tableId);
+ ds.row(keyspace(tableMetadata), table(tableId,
tableMetadata), sortToken(start))
+ .column("start_token", printToken(start))
+ .column("end_token", printToken(end))
.column("majority_before",
entry.majorityBefore.toString())
.column("universal_before",
entry.universalBefore.toString());
return ds;
@@ -237,30 +249,38 @@ public class AccordDebugKeyspace extends VirtualKeyspace
super(parse(VIRTUAL_ACCORD_DEBUG, MAX_CONFLICTS,
"Accord per-CommandStore MaxConflicts State",
"CREATE TABLE %s (\n" +
- " command_store_id int,\n" +
- format("range_start %s,\n",
ROUTING_KEY_TYPE_STRING) +
- format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+ " keyspace_name text,\n" +
+ " table_name text,\n" +
+ " token_sort blob,\n" +
+ " token_start text,\n" +
+ " token_end text,\n" +
+ " command_store_id bigint,\n" +
" timestamp text,\n" +
- " PRIMARY KEY (command_store_id, range_start,
range_end)" +
- ')', Int32Type.instance));
+ " PRIMARY KEY (keyspace_name, table_name, token_sort,
command_store_id)" +
+ ')', UTF8Type.instance));
}
@Override
public DataSet data()
{
- CommandStores stores =
AccordService.instance().node().commandStores();
- List<Pair<Integer, MaxConflicts>> rangeMaps =
- getBlockingAndRethrow(stores.map(store ->
Pair.create(store.commandStore().id(),
store.commandStore().unsafeGetMaxConflicts())));
- rangeMaps.sort(comparing(p -> p.left));
+ CommandStores commandStores =
AccordService.instance().node().commandStores();
SimpleDataSet dataSet = new SimpleDataSet(metadata());
- for (Pair<Integer, MaxConflicts> pair : rangeMaps)
+ for (CommandStore commandStore : commandStores.all())
{
- int storeId = pair.left;
- MaxConflicts maxConflicts = pair.right;
+ int commandStoreId = commandStore.id();
+ MaxConflicts maxConflicts =
commandStore.unsafeGetMaxConflicts();
+ TableId tableId = ((AccordCommandStore)
commandStore).tableId();
+ TableMetadata tableMetadata = tableMetadata(tableId);
maxConflicts.foldlWithBounds(
- (timestamp, ds, start, end) -> ds.row(storeId,
decompose(start), decompose(end)).column("timestamp", timestamp.toString()),
+ (timestamp, ds, start, end) -> {
+ return ds.row(keyspace(tableMetadata), table(tableId,
tableMetadata), sortToken(start), commandStoreId)
+ .column("start_token", printToken(start))
+ .column("end_token", printToken(end))
+ .column("timestamp", timestamp.toString())
+ ;
+ },
dataSet,
ignore -> false
);
@@ -365,6 +385,8 @@ public class AccordDebugKeyspace extends VirtualKeyspace
super(parse(VIRTUAL_ACCORD_DEBUG, PROGRESS_LOG,
"Accord per-CommandStore ProgressLog State",
"CREATE TABLE %s (\n" +
+ " keyspace_name text,\n" +
+ " table_name text,\n" +
" command_store_id int,\n" +
" txn_id text,\n" +
// Timer + BaseTxnState
@@ -382,25 +404,23 @@ public class AccordDebugKeyspace extends VirtualKeyspace
" home_progress text,\n" +
" home_retry_counter int,\n" +
" home_scheduled_at timestamp,\n" +
- " PRIMARY KEY (command_store_id, txn_id)" +
- ')', Int32Type.instance));
+ " PRIMARY KEY (keyspace_name, table_name,
command_store_id, txn_id)" +
+ ')', UTF8Type.instance));
}
@Override
public DataSet data()
{
- CommandStores stores =
AccordService.instance().node().commandStores();
- List<DefaultProgressLog.ImmutableView> views =
- getBlockingAndRethrow(stores.map(store ->
((DefaultProgressLog) store.progressLog()).immutableView()));
- views.sort(comparing(DefaultProgressLog.ImmutableView::storeId));
-
+ CommandStores commandStores =
AccordService.instance().node().commandStores();
SimpleDataSet ds = new SimpleDataSet(metadata());
- for (int i = 0, size = views.size(); i < size; ++i)
+ for (CommandStore commandStore : commandStores.all())
{
- DefaultProgressLog.ImmutableView view = views.get(i);
+ DefaultProgressLog.ImmutableView view =
(DefaultProgressLog.ImmutableView) commandStore.unsafeProgressLog();
+ TableId tableId = ((AccordCommandStore)commandStore).tableId();
+ TableMetadata tableMetadata = tableMetadata(tableId);
while (view.advance())
{
- ds.row(view.storeId(), view.txnId().toString())
+ ds.row(keyspace(tableMetadata), table(tableId,
tableMetadata), view.commandStoreId(), view.txnId().toString())
.column("contact_everyone", view.contactEveryone())
.column("waiting_is_uninitialised",
view.isWaitingUninitialised())
.column("waiting_blocked_until",
view.waitingIsBlockedUntil().name())
@@ -436,47 +456,53 @@ public class AccordDebugKeyspace extends VirtualKeyspace
super(parse(VIRTUAL_ACCORD_DEBUG, REDUNDANT_BEFORE,
"Accord per-CommandStore RedundantBefore State",
"CREATE TABLE %s (\n" +
- " command_store_id int,\n" +
- format("range_start %s,\n",
ROUTING_KEY_TYPE_STRING) +
- format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
- " start_ownership_epoch bigint,\n" +
- " end_ownership_epoch bigint,\n" +
- " locally_applied_or_invalidated_before text,\n" +
- " locally_decided_and_applied_or_invalidated_before
text,\n" +
- " shard_applied_or_invalidated_before text,\n" +
+ " keyspace_name text,\n" +
+ " table_name text,\n" +
+ " token_sort blob,\n" +
+ " token_start text,\n" +
+ " token_end text,\n" +
+ " command_store_id bigint,\n" +
+ " start_epoch bigint,\n" +
+ " end_epoch bigint,\n" +
" gc_before text,\n" +
- " shard_only_applied_or_invalidated_before text,\n" +
- " bootstrapped_at text,\n" +
+ " shard_only_applied text,\n" +
+ " locally_applied text,\n" +
+ " locally_synced text,\n" +
+ " locally_redundant text,\n" +
+ " locally_witnessed text,\n" +
+ " pre_bootstrap text,\n" +
" stale_until_at_least text,\n" +
- " PRIMARY KEY (command_store_id, range_start,
range_end)" +
- ')', Int32Type.instance));
+ " PRIMARY KEY (keyspace_name, table_name, token_sort,
command_store_id)" +
+ ')', UTF8Type.instance));
}
@Override
public DataSet data()
{
- CommandStores stores =
AccordService.instance().node().commandStores();
- List<Pair<Integer, RedundantBefore>> rangeMaps =
- getBlockingAndRethrow(stores.map(store ->
Pair.create(store.commandStore().id(),
store.commandStore().unsafeGetRedundantBefore())));
- rangeMaps.sort(comparing(p -> p.left));
+ CommandStores commandStores =
AccordService.instance().node().commandStores();
SimpleDataSet dataSet = new SimpleDataSet(metadata());
- for (Pair<Integer, RedundantBefore> pair : rangeMaps)
+ for (CommandStore commandStore : commandStores.all())
{
- int storeId = pair.left;
- RedundantBefore redundantBefore = pair.right;
-
- redundantBefore.foldlWithBounds(
- (entry, ds, start, end) -> {
- ds.row(storeId, decompose(start), decompose(end))
- .column("start_ownership_epoch",
entry.startOwnershipEpoch)
- .column("end_ownership_epoch",
entry.endOwnershipEpoch)
- .column("locally_applied_before",
entry.locallyAppliedBefore.toString())
- .column("locally_decided_and_applied_before",
entry.locallyDecidedAndAppliedBefore.toString())
- .column("shard_applied_before",
entry.shardAppliedBefore.toString())
- .column("gc_before", entry.gcBefore.toString())
- .column("shard_only_applied_before",
entry.shardOnlyAppliedBefore.toString())
- .column("bootstrapped_at",
entry.bootstrappedAt.toString())
+ int commandStoreId = commandStore.id();
+ TableId tableId = ((AccordCommandStore)commandStore).tableId();
+ TableMetadata tableMetadata = tableMetadata(tableId);
+ String keyspace = keyspace(tableMetadata);
+ String table = table(tableId, tableMetadata);
+ commandStore.unsafeGetRedundantBefore().foldl(
+ (entry, ds) -> {
+ ds.row(keyspace, table,
sortToken(entry.range.start()), commandStoreId)
+ .column("start_token",
printToken(entry.range.start()))
+ .column("end_token", printToken(entry.range.end()))
+ .column("start_epoch", entry.startEpoch)
+ .column("end_epoch", entry.endEpoch)
+ .column("gc_before",
entry.maxBound(GC_BEFORE).toString())
+ .column("shard_only_applied",
entry.maxBound(SHARD_ONLY_APPLIED).toString())
+ .column("locally_applied",
entry.maxBound(LOCALLY_APPLIED).toString())
+ .column("locally_synced",
entry.maxBound(LOCALLY_SYNCED).toString())
+ .column("locally_redundant",
entry.maxBound(LOCALLY_REDUNDANT).toString())
+ .column("locally_witnessed",
entry.maxBound(LOCALLY_WITNESSED).toString())
+ .column("pre_bootstrap",
entry.maxBound(PRE_BOOTSTRAP).toString())
.column("stale_until_at_least",
entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
return ds;
},
@@ -495,33 +521,38 @@ public class AccordDebugKeyspace extends VirtualKeyspace
super(parse(VIRTUAL_ACCORD_DEBUG, REJECT_BEFORE,
"Accord per-CommandStore RejectBefore State",
"CREATE TABLE %s (\n" +
+ " keyspace_name text,\n" +
+ " table_name text,\n" +
+ " token_sort blob,\n" +
+ " token_start text,\n" +
+ " token_end text,\n" +
" command_store_id int,\n" +
- format("range_start %s,\n",
ROUTING_KEY_TYPE_STRING) +
- format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
" txn_id text,\n" +
- " PRIMARY KEY (command_store_id, range_start,
range_end)" +
- ')', Int32Type.instance));
+ " PRIMARY KEY (keyspace_name, table_name, token_sort,
command_store_id)" +
+ ')', UTF8Type.instance));
}
@Override
public DataSet data()
{
- CommandStores stores =
AccordService.instance().node().commandStores();
- List<Pair<Integer, RejectBefore>> rangeMaps =
- getBlockingAndRethrow(stores.map(store ->
Pair.create(store.commandStore().id(),
store.commandStore().unsafeGetRejectBefore())));
- rangeMaps.sort(comparing(p -> p.left));
-
+ CommandStores commandStores =
AccordService.instance().node().commandStores();
SimpleDataSet dataSet = new SimpleDataSet(metadata());
- for (Pair<Integer, RejectBefore> pair : rangeMaps)
+ for (CommandStore commandStore : commandStores.all())
{
- int storeId = pair.left;
- RejectBefore rejectBefore = pair.right;
-
+ RejectBefore rejectBefore =
commandStore.unsafeGetRejectBefore();
if (rejectBefore == null)
continue;
+ TableId tableId = ((AccordCommandStore)commandStore).tableId();
+ TableMetadata tableMetadata = tableMetadata(tableId);
+ String keyspace = keyspace(tableMetadata);
+ String table = table(tableId, tableMetadata);
rejectBefore.foldlWithBounds(
- (txnId, ds, start, end) -> ds.row(storeId,
decompose(start), decompose(end)).column("txn_id", txnId.toString()),
+ (txnId, ds, start, end) -> ds.row(keyspace, table,
sortToken(start), commandStore.id())
+ .column("token_start",
printToken(start))
+ .column("token_end",
printToken(end))
+ .column("txn_id",
txnId.toString())
+ ,
dataSet,
ignore -> false
);
@@ -540,14 +571,16 @@ public class AccordDebugKeyspace extends VirtualKeyspace
"Accord Transactions Blocked By Table" ,
"CREATE TABLE %s (\n" +
" txn_id text,\n" +
+ " keyspace_name text,\n" +
+ " table_name text,\n" +
" command_store_id int,\n" +
" depth int,\n" +
" blocked_by text,\n" +
" reason text,\n" +
" save_status text,\n" +
" execute_at text,\n" +
- format("key %s,\n", ROUTING_KEY_TYPE_STRING) +
- " PRIMARY KEY (txn_id, command_store_id, depth,
blocked_by, reason)" +
+ " key text,\n" +
+ " PRIMARY KEY (txn_id, keyspace_name, table_name,
command_store_id, depth, blocked_by, reason)" +
')', UTF8Type.instance));
}
@@ -558,10 +591,11 @@ public class AccordDebugKeyspace extends VirtualKeyspace
List<CommandStoreTxnBlockedGraph> shards =
AccordService.instance().debugTxnBlockedGraph(id);
SimpleDataSet ds = new SimpleDataSet(metadata());
+ CommandStores commandStores =
AccordService.instance().node().commandStores();
for (CommandStoreTxnBlockedGraph shard : shards)
{
Set<TxnId> processed = new HashSet<>();
- process(ds, shard, processed, id, 0, id, Reason.Self, null);
+ process(ds, commandStores, shard, processed, id, 0, id,
Reason.Self, null);
// everything was processed right?
if (!shard.txns.isEmpty() &&
!shard.txns.keySet().containsAll(processed))
throw new IllegalStateException("Skipped txns: " +
Sets.difference(shard.txns.keySet(), processed));
@@ -570,7 +604,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
return ds;
}
- private void process(SimpleDataSet ds, CommandStoreTxnBlockedGraph
shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason
reason, Runnable onDone)
+ private void process(SimpleDataSet ds, CommandStores commandStores,
CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int
depth, TxnId txnId, Reason reason, Runnable onDone)
{
if (!processed.add(txnId))
throw new IllegalStateException("Double processed " + txnId);
@@ -583,7 +617,10 @@ public class AccordDebugKeyspace extends VirtualKeyspace
// was it applied? If so ignore it
if (reason != Reason.Self &&
txn.saveStatus.hasBeen(Status.Applied))
return;
- ds.row(userTxn.toString(), shard.storeId, depth, reason ==
Reason.Self ? "" : txn.txnId.toString(), reason.name());
+ TableId tableId = tableId(shard.commandStoreId, commandStores);
+ TableMetadata tableMetadata = tableMetadata(tableId);
+ ds.row(userTxn.toString(), keyspace(tableMetadata), table(tableId,
tableMetadata),
+ shard.commandStoreId, depth, reason == Reason.Self ? "" :
txn.txnId.toString(), reason.name());
ds.column("save_status", txn.saveStatus.name());
if (txn.executeAt != null)
ds.column("execute_at", txn.executeAt.toString());
@@ -594,14 +631,14 @@ public class AccordDebugKeyspace extends VirtualKeyspace
for (TxnId blockedBy : txn.blockedBy)
{
if (!processed.contains(blockedBy))
- process(ds, shard, processed, userTxn, depth + 1,
blockedBy, Reason.Txn, null);
+ process(ds, commandStores, shard, processed, userTxn,
depth + 1, blockedBy, Reason.Txn, null);
}
for (TokenKey blockedBy : txn.blockedByKey)
{
TxnId blocking = shard.keys.get(blockedBy);
if (!processed.contains(blocking))
- process(ds, shard, processed, userTxn, depth + 1,
blocking, Reason.Key, () -> ds.column("key", decompose(blockedBy)));
+ process(ds, commandStores, shard, processed, userTxn,
depth + 1, blocking, Reason.Key, () -> ds.column("key", printToken(blockedBy)));
}
}
}
@@ -613,10 +650,46 @@ public class AccordDebugKeyspace extends VirtualKeyspace
}
}
- private static ByteBuffer decompose(RoutingKey routingKey)
+ private static TableId tableId(int commandStoreId, CommandStores
commandStores)
+ {
+ AccordCommandStore commandStore = (AccordCommandStore)
commandStores.forId(commandStoreId);
+ if (commandStore == null)
+ return null;
+ return commandStore.tableId();
+ }
+
+ private static TableMetadata tableMetadata(TableId tableId)
+ {
+ if (tableId == null)
+ return null;
+ return Schema.instance.getTableMetadata(tableId);
+ }
+
+ private static String keyspace(TableMetadata metadata)
+ {
+ return metadata == null ? "Unknown" : metadata.keyspace;
+ }
+
+ private static String table(TableId tableId, TableMetadata metadata)
+ {
+ return metadata == null ? tableId.toString() : metadata.name;
+ }
+
+ private static String printToken(RoutingKey routingKey)
+ {
+ TokenKey key = (TokenKey) routingKey;
+ return
key.token().getPartitioner().getTokenFactory().toString(key.token());
+ }
+
+ private static ByteBuffer sortToken(RoutingKey routingKey)
{
TokenKey key = (TokenKey) routingKey;
- return
ROUTING_KEY_TYPE.pack(UUIDType.instance.decompose(key.table().asUUID()),
bytes(key.suffix().toString()));
+ Token token = key.token();
+ IPartitioner partitioner = token.getPartitioner();
+ ByteBuffer out =
ByteBuffer.allocate(partitioner.accordSerializedSize(token));
+ partitioner.accordSerialize(token, out);
+ out.flip();
+ return out;
}
private static TableMetadata parse(String keyspace, String table, String
comment, String schema, AbstractType<?> partitionKeyType)
diff --git a/src/java/org/apache/cassandra/index/accord/RangeMemoryIndex.java
b/src/java/org/apache/cassandra/index/accord/RangeMemoryIndex.java
index 604d9539d6..43eba0f806 100644
--- a/src/java/org/apache/cassandra/index/accord/RangeMemoryIndex.java
+++ b/src/java/org/apache/cassandra/index/accord/RangeMemoryIndex.java
@@ -52,7 +52,7 @@ import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.RTree;
import org.apache.cassandra.utils.RangeTree;
-import static
org.apache.cassandra.index.accord.RouteIndexFormat.deserializeRoute;
+import static
org.apache.cassandra.index.accord.RouteIndexFormat.deserializeParticipants;
public class RangeMemoryIndex
{
@@ -110,7 +110,7 @@ public class RangeMemoryIndex
Route<?> route;
try
{
- route = deserializeRoute(value);
+ route = deserializeParticipants(value);
}
catch (IOException e)
{
diff --git a/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java
b/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java
index ddec40e17f..1821e2c162 100644
--- a/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java
+++ b/src/java/org/apache/cassandra/index/accord/RouteIndexFormat.java
@@ -34,6 +34,7 @@ import java.util.zip.Checksum;
import com.google.common.collect.Maps;
import accord.local.StoreParticipants;
+import accord.primitives.Participants;
import accord.primitives.Route;
import accord.primitives.TxnId;
import org.apache.cassandra.db.DecoratedKey;
@@ -71,30 +72,30 @@ public class RouteIndexFormat
{
public static final Supplier<Checksum> CHECKSUM_SUPPLIER = CRC32C::new;
- static final LocalVersionedSerializer<Route<?>> route =
localSerializer(KeySerializers.route);
+ static final LocalVersionedSerializer<Participants<?>> participants =
localSerializer(KeySerializers.participants);
private static <T> LocalVersionedSerializer<T>
localSerializer(IVersionedSerializer<T> serializer)
{
return new LocalVersionedSerializer<>(AccordSerializerVersion.CURRENT,
AccordSerializerVersion.serializer, serializer);
}
- public static ByteBuffer serialize(Route<?> value) throws IOException
+ public static ByteBuffer serialize(Participants<?> value) throws
IOException
{
- int size = Math.toIntExact(route.serializedSize(value));
+ int size = Math.toIntExact(participants.serializedSize(value));
try (DataOutputBuffer buffer = new DataOutputBuffer(size))
{
- route.serialize(value, buffer);
+ participants.serialize(value, buffer);
return buffer.buffer(true);
}
}
- static Route<?> deserializeRoute(ByteBuffer bytes) throws IOException
+ static Route<?> deserializeParticipants(ByteBuffer bytes) throws
IOException
{
if (bytes == null || ByteBufferAccessor.instance.isEmpty(bytes))
return null;
try (DataInputBuffer in = new DataInputBuffer(bytes, true))
{
- MessageVersionProvider versionProvider =
route.deserializeVersion(in);
+ MessageVersionProvider versionProvider =
participants.deserializeVersion(in);
return KeySerializers.route.deserialize(in,
versionProvider.messageVersion());
}
}
@@ -163,13 +164,14 @@ public class RouteIndexFormat
StoreParticipants participants = builder.participants();
if (participants == null)
return null;
- Route<?> route = participants.route();
- if (route == null)
+
+ Participants<?> touches = participants.touches();
+ if (touches == null)
return null;
try
{
- return serialize(participants.route());
+ return serialize(touches);
}
catch (IOException e)
{
diff --git a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
index 144b1cddea..aa97dd981e 100644
--- a/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
+++ b/src/java/org/apache/cassandra/index/accord/RouteJournalIndex.java
@@ -94,8 +94,7 @@ import static accord.primitives.Routable.Domain.Range;
public class RouteJournalIndex implements Index, INotificationConsumer
{
- public enum RegisterStatus
- {PENDING, REGISTERED, UNREGISTERED}
+ public enum RegisterStatus { PENDING, REGISTERED, UNREGISTERED }
private static final Logger logger =
LoggerFactory.getLogger(RouteJournalIndex.class);
diff --git a/src/java/org/apache/cassandra/journal/Flusher.java
b/src/java/org/apache/cassandra/journal/Flusher.java
index 07c7516d48..6a2a387c32 100644
--- a/src/java/org/apache/cassandra/journal/Flusher.java
+++ b/src/java/org/apache/cassandra/journal/Flusher.java
@@ -149,7 +149,7 @@ final class Flusher<K, V>
return;
awaitingWork = Thread.currentThread();
- do
+ while (true)
{
if (Thread.interrupted())
{
@@ -157,9 +157,11 @@ final class Flusher<K, V>
throw new InterruptedException();
}
+ if (fsyncWaitingSince != lastStartedAt)
+ break;
+
LockSupport.park();
}
- while (fsyncWaitingSince == lastStartedAt);
awaitingWork = null;
}
diff --git a/src/java/org/apache/cassandra/journal/Journal.java
b/src/java/org/apache/cassandra/journal/Journal.java
index 08626b441f..98150f1cee 100644
--- a/src/java/org/apache/cassandra/journal/Journal.java
+++ b/src/java/org/apache/cassandra/journal/Journal.java
@@ -146,7 +146,7 @@ public class Journal<K, V> implements Shutdownable
@Override
public void onFlushFailed(Throwable cause)
{
- // TODO: panic
+ // TODO (required): panic
}
private void submit(RecordPointer pointer, Runnable runnable)
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 77f39f74bf..7355e7f2da 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -25,6 +25,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -61,6 +63,7 @@ import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import org.apache.cassandra.schema.TableId;
import
org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor;
+import org.apache.cassandra.service.accord.IAccordService.AccordCompactionInfo;
import org.apache.cassandra.service.accord.api.TokenKey;
import org.apache.cassandra.service.accord.txn.TxnRead;
import org.apache.cassandra.utils.Clock;
@@ -134,6 +137,10 @@ public class AccordCommandStore extends CommandStore
}
}
+ static final AtomicReferenceFieldUpdater<AccordCommandStore,
SafeRedundantBefore> safeRedundantBeforeUpdater
+ = AtomicReferenceFieldUpdater.newUpdater(AccordCommandStore.class,
SafeRedundantBefore.class, "safeRedundantBefore");
+ static final AtomicLong nextSafeRedundantBeforeTicket = new AtomicLong();
+
public final String loggingId;
private final Journal journal;
private final RangeSearcher rangeSearcher;
@@ -143,6 +150,7 @@ public class AccordCommandStore extends CommandStore
private long lastSystemTimestampMicros = Long.MIN_VALUE;
private final CommandsForRanges.Manager commandsForRanges;
private final TableId tableId;
+ volatile SafeRedundantBefore safeRedundantBefore;
private AccordSafeCommandStore current;
private Thread currentThread;
@@ -384,7 +392,6 @@ public class AccordCommandStore extends CommandStore
Invariants.require(thread == null ? currentThread == self :
currentThread == null);
currentThread = thread;
if (thread != null) CommandStore.register(this);
-
}
public boolean hasSafeStore()
@@ -484,6 +491,15 @@ public class AccordCommandStore extends CommandStore
return journal.loadMinimal(id, txnId, MINIMAL,
unsafeGetRedundantBefore(), durableBefore());
}
+ public AccordCompactionInfo getCompactionInfo()
+ {
+ SafeRedundantBefore safeRedundantBefore = this.safeRedundantBefore;
+ RedundantBefore redundantBefore;
+ if (safeRedundantBefore == null) redundantBefore =
RedundantBefore.EMPTY;
+ else redundantBefore = safeRedundantBefore.redundantBefore;
+ return new AccordCompactionInfo(id, redundantBefore, rangesForEpoch,
tableId);
+ }
+
public RangeSearcher rangeSearcher()
{
return rangeSearcher;
@@ -494,6 +510,12 @@ public class AccordCommandStore extends CommandStore
return loader;
}
+ @VisibleForTesting
+ public void unsafeUpsertRedundantBefore(RedundantBefore addRedundantBefore)
+ {
+ super.unsafeUpsertRedundantBefore(addRedundantBefore);
+ }
+
private static class CommandStoreLoader extends AbstractLoader
{
private final AccordCommandStore store;
@@ -522,7 +544,11 @@ public class AccordCommandStore extends CommandStore
void maybeLoadRedundantBefore(RedundantBefore redundantBefore)
{
if (redundantBefore != null)
+ {
loadRedundantBefore(redundantBefore);
+ Invariants.require(safeRedundantBefore == null);
+ safeRedundantBefore = new SafeRedundantBefore(0, redundantBefore);
+ }
}
void maybeLoadBootstrapBeganAt(NavigableMap<TxnId, Ranges>
bootstrapBeganAt)
@@ -542,4 +568,24 @@ public class AccordCommandStore extends CommandStore
if (rangesForEpoch != null)
loadRangesForEpoch(rangesForEpoch);
}
+
+ // TODO (expected): handle journal failures, and consider how we handle
partial failures.
+ // Very likely we will not be able to safely or cleanly handle partial
failures of this logic, but decide and document.
+ // TODO (desired): consider merging with PersistentField? This version is
cheaper to manage which may be preferable at the CommandStore level.
+ static class SafeRedundantBefore
+ {
+ final long ticket;
+ final RedundantBefore redundantBefore;
+
+ SafeRedundantBefore(long ticket, RedundantBefore redundantBefore)
+ {
+ this.ticket = ticket;
+ this.redundantBefore = redundantBefore;
+ }
+
+ static SafeRedundantBefore max(SafeRedundantBefore a,
SafeRedundantBefore b)
+ {
+ return a.ticket >= b.ticket ? a : b;
+ }
+ }
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java
b/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java
index f56c388012..b9ec97b68a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordFetchCoordinator.java
@@ -421,11 +421,11 @@ public class AccordFetchCoordinator extends
AbstractFetchCoordinator implements
}
@Override
- protected AsyncChain<Data> beginRead(SafeCommandStore safeStore,
Timestamp executeAt, PartialTxn txn, Ranges unavailable)
+ protected AsyncChain<Data> beginRead(SafeCommandStore safeStore,
Timestamp executeAt, PartialTxn txn, Participants<?> execute)
{
- AsyncChain<Data> result = super.beginRead(safeStore, executeAt,
txn, unavailable);
+ AsyncChain<Data> result = super.beginRead(safeStore, executeAt,
txn, execute);
// TODO (required): verify that streaming snapshots have all been
created by now, so we won't stream any data that arrives after this
- readStarted(safeStore, unavailable);
+ readStarted(safeStore);
return result;
}
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index 43ea8add3b..b676b110e3 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -313,7 +313,7 @@ public class AccordKeyspace
}
// TODO (expected): garbage-free filtering, reusing encoding
- public Row withoutRedundantCommands(TokenKey key, Row row,
RedundantBefore.Entry redundantBefore)
+ public Row withoutRedundantCommands(TokenKey key, Row row,
RedundantBefore.Bounds redundantBefore)
{
Invariants.require(row.columnCount() == 1);
Cell<?> cell = row.getCell(data);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index da27eca0f3..9036116067 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -293,7 +293,7 @@ public class AccordObjectSizes
FullKeyRoute route = new FullKeyRoute(EMPTY_KEY, new RoutingKey[]{
EMPTY_KEY });
Participants<?> empty = route.slice(0, 0);
ICommand.Builder builder = new ICommand.Builder(EMPTY_TXNID)
-
.setParticipants(StoreParticipants.create(route, empty, executes ? empty :
null, empty, route))
+
.setParticipants(StoreParticipants.create(route, empty, executes ? empty :
null, executes ? empty : null, empty, route))
.durability(Status.Durability.NotDurable)
.executeAt(EMPTY_TXNID)
.promised(Ballot.ZERO);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index 8e4948a49a..89f3214dc7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -28,6 +28,7 @@ import com.google.common.annotations.VisibleForTesting;
import accord.api.Agent;
import accord.api.DataStore;
+import accord.api.Journal.FieldUpdates;
import accord.api.ProgressLog;
import accord.api.RoutingKey;
import accord.impl.AbstractSafeCommandStore;
@@ -40,6 +41,7 @@ import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
import org.apache.cassandra.service.accord.AccordCommandStore.ExclusiveCaches;
+import
org.apache.cassandra.service.accord.AccordCommandStore.SafeRedundantBefore;
import static accord.utils.Invariants.illegalState;
@@ -115,6 +117,34 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
}
}
+ @Override
+ protected void persistFieldUpdates()
+ {
+ super.persistFieldUpdates();
+ }
+
+ protected void persistFieldUpdatesInternal(Runnable onDone)
+ {
+ FieldUpdates updates = fieldUpdates();
+ if (updates == null)
+ return;
+
+ if (updates.newRedundantBefore != null)
+ {
+ long ticket =
AccordCommandStore.nextSafeRedundantBeforeTicket.incrementAndGet();
+ SafeRedundantBefore update = new SafeRedundantBefore(ticket,
updates.newRedundantBefore);
+ Runnable reportRedundantBefore = () -> {
+
AccordCommandStore.safeRedundantBeforeUpdater.accumulateAndGet((AccordCommandStore)commandStore,
update, SafeRedundantBefore::max);
+ };
+ Runnable prevOnDone = onDone;
+ onDone = prevOnDone == null ? reportRedundantBefore : () -> {
+ try { reportRedundantBefore.run(); }
+ finally { prevOnDone.run(); }
+ };
+ }
+ commandStore.persistFieldUpdates(updates, onDone);
+ }
+
protected AccordSafeCommandsForKey add(AccordSafeCommandsForKey safeCfk,
ExclusiveCaches caches)
{
Object check = task.ensureCommandsForKey().putIfAbsent(safeCfk.key(),
safeCfk);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 26dfa7dc66..bd42e666b7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -360,6 +360,11 @@ public class AccordService implements IAccordService,
Shutdownable
@Override
public synchronized void startup()
+ {
+ unsafeStartupWithOverrides(null);
+ }
+
+ public synchronized void unsafeStartupWithOverrides(@Nullable
Journal.TopologyUpdate overrideNullTopologyUpdate)
{
if (state != State.INIT)
return;
@@ -380,6 +385,8 @@ public class AccordService implements IAccordService,
Shutdownable
Invariants.require(lastSeen == null || update.global.epoch() >
lastSeen.global.epoch());
lastSeen = update;
}
+ if (lastSeen == null)
+ lastSeen = overrideNullTopologyUpdate;
if (lastSeen != null)
{
@@ -1328,21 +1335,9 @@ public class AccordService implements IAccordService,
Shutdownable
public AccordCompactionInfos getCompactionInfo()
{
AccordCompactionInfos compactionInfos = new
AccordCompactionInfos(node.durableBefore());
- if (node.commandStores().all().length > 0)
- {
-
AsyncChains.getBlockingAndRethrow(node.commandStores().forEach(safeStore -> {
- synchronized (compactionInfos)
- {
- int id = safeStore.commandStore().id();
- compactionInfos.put(id, new AccordCompactionInfo(
- id,
- safeStore.redundantBefore(),
- safeStore.ranges(),
-
((AccordCommandStore)safeStore.commandStore()).tableId()
- ));
- }
- }));
- }
+ node.commandStores().forEachCommandStore(commandStore -> {
+ compactionInfos.put(commandStore.id(),
((AccordCommandStore)commandStore).getCompactionInfo());
+ });
return compactionInfos;
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java
b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index 209bff7cdd..38bf137f10 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -685,10 +685,8 @@ public abstract class AccordTask<R> extends Task
implements Runnable, Function<S
{
state(PERSISTING);
Runnable onFlush = () -> finish(result, null);
- if (safeStore.fieldUpdates() != null)
- commandStore.persistFieldUpdates(safeStore.fieldUpdates(),
changes == null ? onFlush : null);
- if (changes != null)
- save(changes, onFlush);
+ safeStore.persistFieldUpdatesInternal(changes == null ?
onFlush : null);
+ if (changes != null) save(changes, onFlush);
}
commandStore.complete(safeStore);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java
b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
index 140eb7395c..c78a787d99 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
@@ -41,6 +41,7 @@ import accord.topology.Shard;
import accord.topology.Topology;
import accord.utils.Invariants;
import accord.utils.SortedArrays.SortedArrayList;
+import accord.utils.TinyEnumSet;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.IPartitioner;
@@ -80,18 +81,18 @@ public class AccordTopology
private static class ShardLookup extends HashMap<accord.primitives.Range,
Shard>
{
- private Shard createOrReuse(boolean pendingRemoval,
accord.primitives.Range range, SortedArrayList<Id> nodes, SortedArrayList<Id>
fastPath, Set<Id> joining)
+ private Shard createOrReuse(TinyEnumSet<Shard.Flag> flags,
accord.primitives.Range range, SortedArrayList<Id> nodes, SortedArrayList<Id>
fastPath, Set<Id> joining)
{
Shard prev = get(range);
if (prev != null
- && prev.pendingRemoval == pendingRemoval
+ && prev.flags().bitset() == flags.bitset()
&& prev.nodes.equals(nodes)
&& prev.fastPathElectorateSize == fastPath.size()
&& prev.nodes.without(prev.notInFastPath).equals(fastPath)
&& joining.size() == prev.joining.size() &&
prev.joining.containsAll(joining))
return prev;
- return Shard.create(range, nodes, fastPath, joining,
pendingRemoval);
+ return Shard.create(range, nodes, fastPath, joining, flags);
}
}
@@ -120,7 +121,7 @@ public class AccordTopology
return strategy;
}
- List<Shard> createForTable(TableMetadata metadata, Set<Id>
unavailable, Map<Id, String> dcMap, ShardLookup lookup)
+ List<Shard> createForTable(Epoch epoch, TableMetadata metadata,
Set<Id> unavailable, Map<Id, String> dcMap, ShardLookup lookup)
{
Ranges ranges = this.ranges.stream()
.map(range ->
Ranges.single(AccordTopology.range(metadata.id, range)))
@@ -131,7 +132,14 @@ public class AccordTopology
List<Shard> shards = new ArrayList<>(ranges.size());
for (accord.primitives.Range range : ranges)
- shards.add(lookup.createOrReuse(metadata.params.pendingDrop,
range, nodes, electorate, pending));
+ {
+ TinyEnumSet<Shard.Flag> flags = Shard.NO_FLAGS;
+ if (metadata.params.pendingDrop)
+ flags = flags.with(Shard.Flag.PENDING_REMOVAL);
+ if (metadata.epoch.isEqualOrAfter(epoch))
+ flags = flags.with(Shard.Flag.MUST_WITNESS);
+ shards.add(lookup.createOrReuse(flags, range, nodes,
electorate, pending));
+ }
return shards;
}
@@ -296,7 +304,7 @@ public class AccordTopology
if (tables.isEmpty())
continue;
List<KeyspaceShard> ksShards = KeyspaceShard.forKeyspace(keyspace,
placements, directory);
- tables.forEach(table -> ksShards.forEach(shard ->
res.addAll(shard.createForTable(table, unavailable, dcMap, lookup))));
+ tables.forEach(table -> ksShards.forEach(shard ->
res.addAll(shard.createForTable(epoch, table, unavailable, dcMap, lookup))));
}
res.sort((a, b) -> a.range.compare(b.range));
diff --git
a/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java
b/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java
index bae7d6b479..40f2c3703e 100644
---
a/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java
+++
b/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java
@@ -36,13 +36,13 @@ import org.apache.cassandra.service.accord.api.TokenKey;
public class CommandStoreTxnBlockedGraph
{
- public final int storeId;
+ public final int commandStoreId;
public final Map<TxnId, TxnState> txns;
public final Map<TokenKey, TxnId> keys;
public CommandStoreTxnBlockedGraph(Builder builder)
{
- storeId = builder.storeId;
+ commandStoreId = builder.storeId;
txns = ImmutableMap.copyOf(builder.txns);
keys = ImmutableMap.copyOf(builder.keys);
}
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
index 6ccf8c4c03..0998c72634 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropRead.java
@@ -34,11 +34,11 @@ import accord.local.Node;
import accord.local.SafeCommandStore;
import accord.messages.MessageType;
import accord.messages.ReadData;
+import accord.primitives.AbstractRanges;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
import accord.primitives.Range;
import accord.primitives.Ranges;
-import accord.primitives.Routables.Slice;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.topology.Topologies;
@@ -241,38 +241,34 @@ public class AccordInteropRead extends ReadData
}
@Override
- protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp
executeAt, PartialTxn txn, Ranges unavailable)
+ protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp
executeAt, PartialTxn txn, Participants<?> execute)
{
TxnRead txnRead = (TxnRead)txn.read();
- Ranges ranges =
safeStore.ranges().allAt(executeAt).without(unavailable).intersecting(scope,
Slice.Minimal);
long nowInSeconds = TxnNamedRead.nowInSeconds(executeAt);
- List<AsyncChain<Data>> chains = new ArrayList<>(ranges.size());
- for (Range r : ranges)
+ if (!command.isRangeRequest())
{
- ReadCommand readCommand = this.command;
- TokenKey routingKey = null;
- final ReadCommand readCommandFinal;
- if (readCommand.isRangeRequest())
- {
- // This path can have a subrange we have never seen before
provided by short read protection or read repair so we need to
- // calculate the intersection with this instance of the
command store and the actual command if it is not empty we
- // will need to execute it
- TokenRange commandRange =
TxnNamedRead.boundsAsAccordRange(readCommand.dataRange().keyRange(),
readCommand.metadata().id);
- Range intersection = commandRange.intersection(r);
- if (intersection == null)
- continue;
- readCommandFinal =
TxnNamedRead.commandForSubrange((PartitionRangeReadCommand) readCommand,
intersection, txnRead.cassandraConsistencyLevel(), nowInSeconds);
- routingKey = ((TokenRange)r).start();
- }
- else
- {
- SinglePartitionReadCommand singlePartitionReadCommand =
((SinglePartitionReadCommand)readCommand);
- if (!r.contains(new
TokenKey(singlePartitionReadCommand.metadata().id,
singlePartitionReadCommand.partitionKey().getToken())))
- continue;
- readCommandFinal =
((SinglePartitionReadCommand)readCommand).withTransactionalSettings(TxnNamedRead.readsWithoutReconciliation(txnRead.cassandraConsistencyLevel()),
nowInSeconds);
- }
- TokenKey routingKeyFinal = routingKey;
- chains.add(AsyncChains.ofCallable(Stage.READ.executor(), () -> new
LocalReadData(routingKeyFinal,
ReadCommandVerbHandler.instance.doRead(readCommandFinal, false), readCommand)));
+ SinglePartitionReadCommand readCommand =
((SinglePartitionReadCommand)command);
+ TokenKey key = new TokenKey(readCommand.metadata().id,
readCommand.partitionKey().getToken());
+ if (!execute.contains(key))
+ return AsyncChains.success(null);
+
+ ReadCommand submit =
readCommand.withTransactionalSettings(TxnNamedRead.readsWithoutReconciliation(txnRead.cassandraConsistencyLevel()),
nowInSeconds);
+ return AsyncChains.ofCallable(Stage.READ.executor(), () -> new
LocalReadData(key, ReadCommandVerbHandler.instance.doRead(submit, false),
command));
+ }
+
+ // This path can have a subrange we have never seen before provided by
short read protection or read repair so we need to
+ // calculate the intersection with this instance of the command store
and the actual command if it is not empty we
+ // will need to execute it
+ TokenRange commandRange =
TxnNamedRead.boundsAsAccordRange(command.dataRange().keyRange(),
command.metadata().id);
+ List<AsyncChain<Data>> chains = new ArrayList<>(execute.size());
+ for (Range r : (AbstractRanges)execute)
+ {
+ Range intersection = commandRange.intersection(r);
+ if (intersection == null)
+ continue;
+ ReadCommand submit =
TxnNamedRead.commandForSubrange((PartitionRangeReadCommand) command,
intersection, txnRead.cassandraConsistencyLevel(), nowInSeconds);
+ TokenKey routingKey = ((TokenRange)r).start();
+ chains.add(AsyncChains.ofCallable(Stage.READ.executor(), () -> new
LocalReadData(routingKey, ReadCommandVerbHandler.instance.doRead(submit,
false), command)));
}
if (chains.isEmpty())
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
index 81f652351f..be71dc0953 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropReadRepair.java
@@ -146,7 +146,7 @@ public class AccordInteropReadRepair extends ReadData
}
@Override
- protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp
executeAt, PartialTxn txn, Ranges unavailable)
+ protected AsyncChain<Data> beginRead(SafeCommandStore safeStore, Timestamp
executeAt, PartialTxn txn, Participants<?> execute)
{
// TODO (required): subtract unavailable ranges, either from read or
from response (or on coordinator)
return AsyncChains.ofCallable(Verb.READ_REPAIR_REQ.stage.executor(),
() -> {
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
index bf349b988b..23d7dd33f1 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java
@@ -328,6 +328,7 @@ public class CommandSerializers
static final int OWNS_EQUALS_TOUCHES = 0x8;
static final int EXECUTES_IS_NULL = 0x10;
static final int EXECUTES_IS_OWNS = 0x20;
+ static final int WAITSON_IS_OWNS = 0x40;
@Override
public void serialize(StoreParticipants t, DataOutputPlus out, int
version) throws IOException
@@ -338,18 +339,21 @@ public class CommandSerializers
boolean ownsEqualsTouches = t.owns() == t.touches();
boolean executesIsNull = t.executes() == null;
boolean executesIsOwns = !executesIsNull && t.executes() ==
t.owns();
+ boolean waitsOnIsOwns = !executesIsNull && t.waitsOn() == t.owns();
out.writeByte((hasRoute ? HAS_ROUTE : 0)
| (hasTouchedEqualsRoute ? HAS_TOUCHED_EQUALS_ROUTE
: 0)
| (touchesEqualsHasTouched ?
TOUCHES_EQUALS_HAS_TOUCHED : 0)
| (ownsEqualsTouches ? OWNS_EQUALS_TOUCHES : 0)
| (executesIsNull ? EXECUTES_IS_NULL : 0)
| (executesIsOwns ? EXECUTES_IS_OWNS : 0)
+ | (waitsOnIsOwns ? WAITSON_IS_OWNS : 0)
);
if (hasRoute) KeySerializers.route.serialize(t.route(), out,
version);
if (!hasTouchedEqualsRoute)
KeySerializers.participants.serialize(t.hasTouched(), out, version);
if (!touchesEqualsHasTouched)
KeySerializers.participants.serialize(t.touches(), out, version);
if (!ownsEqualsTouches)
KeySerializers.participants.serialize(t.owns(), out, version);
if (!executesIsNull && !executesIsOwns)
KeySerializers.participants.serialize(t.executes(), out, version);
+ if (!executesIsNull && !waitsOnIsOwns)
KeySerializers.participants.serialize(t.waitsOn(), out, version);
}
public void skip(DataInputPlus in, int version) throws IOException
@@ -360,6 +364,7 @@ public class CommandSerializers
if (0 == (flags & TOUCHES_EQUALS_HAS_TOUCHED))
KeySerializers.participants.skip(in, version);
if (0 == (flags & OWNS_EQUALS_TOUCHES))
KeySerializers.participants.skip(in, version);
if (0 == (flags & (EXECUTES_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in, version);
+ if (0 == (flags & (WAITSON_IS_OWNS | EXECUTES_IS_NULL)))
KeySerializers.participants.skip(in, version);
}
@Override
@@ -371,16 +376,8 @@ public class CommandSerializers
Participants<?> touches = 0 != (flags &
TOUCHES_EQUALS_HAS_TOUCHED) ? hasTouched :
KeySerializers.participants.deserialize(in, version);
Participants<?> owns = 0 != (flags & OWNS_EQUALS_TOUCHES) ?
touches : KeySerializers.participants.deserialize(in, version);
Participants<?> executes = 0 != (flags & EXECUTES_IS_NULL) ? null
: 0 != (flags & EXECUTES_IS_OWNS) ? owns :
KeySerializers.participants.deserialize(in, version);
- return StoreParticipants.create(route, owns, executes, touches,
hasTouched);
- }
-
- public Route<?> deserializeRouteOnly(DataInputPlus in, int version)
throws IOException
- {
- int flags = in.readByte();
- if (0 == (flags & HAS_ROUTE))
- return null;
-
- return KeySerializers.route.deserialize(in, version);
+ Participants<?> waitsOn = 0 != (flags & EXECUTES_IS_NULL) ? null :
0 != (flags & WAITSON_IS_OWNS) ? owns :
KeySerializers.participants.deserialize(in, version);
+ return StoreParticipants.create(route, owns, executes, waitsOn,
touches, hasTouched);
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
index 9fa9333def..e2a9f0dfa6 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java
@@ -41,6 +41,10 @@ import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.CollectionSerializers;
import org.apache.cassandra.utils.NullableSerializer;
+import static
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer.deserializeNullable;
+import static
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer.serializeNullable;
+import static
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer.serializedNullableSize;
+
public class CommandStoreSerializers
{
private CommandStoreSerializers() {}
@@ -132,63 +136,74 @@ public class CommandStoreSerializers
}
}), DurableBefore.Entry[]::new, DurableBefore.SerializerSupport::create);
- public static final IVersionedSerializer<RedundantBefore.Entry>
redundantBeforeEntry = new IVersionedSerializer<>()
+ public static final IVersionedSerializer<RedundantBefore.Bounds>
redundantBeforeEntry = new IVersionedSerializer<>()
{
@Override
- public void serialize(RedundantBefore.Entry t, DataOutputPlus out, int
version) throws IOException
+ public void serialize(RedundantBefore.Bounds b, DataOutputPlus out,
int version) throws IOException
{
- KeySerializers.range.serialize(t.range, out, version);
- Invariants.require(t.startOwnershipEpoch <= t.endOwnershipEpoch);
- out.writeUnsignedVInt(t.startOwnershipEpoch);
- if (t.endOwnershipEpoch == Long.MAX_VALUE)
out.writeUnsignedVInt(0L);
- else out.writeUnsignedVInt(1 + t.endOwnershipEpoch -
t.startOwnershipEpoch);
- CommandSerializers.txnId.serialize(t.locallyWitnessedBefore, out,
version);
- CommandSerializers.txnId.serialize(t.locallyAppliedBefore, out,
version);
-
CommandSerializers.txnId.serialize(t.locallyDecidedAndAppliedBefore, out,
version);
- CommandSerializers.txnId.serialize(t.shardOnlyAppliedBefore, out,
version);
- CommandSerializers.txnId.serialize(t.shardAppliedBefore, out,
version);
- CommandSerializers.txnId.serialize(t.gcBefore, out, version);
- CommandSerializers.txnId.serialize(t.bootstrappedAt, out, version);
-
CommandSerializers.nullableTimestamp.serialize(t.staleUntilAtLeast, out,
version);
+ KeySerializers.range.serialize(b.range, out, version);
+ Invariants.require(b.startEpoch <= b.endEpoch);
+ out.writeUnsignedVInt(b.startEpoch);
+ if (b.endEpoch == Long.MAX_VALUE) out.writeUnsignedVInt(0L);
+ else out.writeUnsignedVInt(1 + b.endEpoch - b.startEpoch);
+ serializeNullable(b.staleUntilAtLeast, out);
+ out.writeUnsignedVInt32(b.bounds.length);
+ for (TxnId bound : b.bounds)
+ {
+ CommandSerializers.txnId.serialize(bound, out, version);
+ }
+ int prev = 0;
+ for (int status : b.statuses)
+ {
+ out.writeUnsignedVInt32(status ^ prev);
+ prev = status;
+ }
}
@Override
- public RedundantBefore.Entry deserialize(DataInputPlus in, int
version) throws IOException
+ public RedundantBefore.Bounds deserialize(DataInputPlus in, int
version) throws IOException
{
Range range = KeySerializers.range.deserialize(in, version);
long startEpoch = in.readUnsignedVInt();
long endEpoch = in.readUnsignedVInt();
if (endEpoch == 0) endEpoch = Long.MAX_VALUE;
else endEpoch = endEpoch - 1 + startEpoch;
- TxnId locallyWitnessedOrInvalidatedBefore =
CommandSerializers.txnId.deserialize(in, version);
- TxnId locallyAppliedOrInvalidatedBefore =
CommandSerializers.txnId.deserialize(in, version);
- TxnId locallyDecidedAndAppliedOrInvalidatedBefore =
CommandSerializers.txnId.deserialize(in, version);
- TxnId shardOnlyAppliedOrInvalidatedBefore =
CommandSerializers.txnId.deserialize(in, version);
- TxnId shardAppliedOrInvalidatedBefore =
CommandSerializers.txnId.deserialize(in, version);
- TxnId gcBefore = CommandSerializers.txnId.deserialize(in, version);
- TxnId bootstrappedAt = CommandSerializers.txnId.deserialize(in,
version);
- Timestamp staleUntilAtLeast =
CommandSerializers.nullableTimestamp.deserialize(in, version);
- return new RedundantBefore.Entry(range, startEpoch, endEpoch,
locallyWitnessedOrInvalidatedBefore, locallyAppliedOrInvalidatedBefore,
locallyDecidedAndAppliedOrInvalidatedBefore,
shardOnlyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore,
bootstrappedAt, staleUntilAtLeast);
+ Timestamp staleUntilAtLeast = deserializeNullable(in);
+ int count = in.readUnsignedVInt32();
+
+ TxnId[] bounds = new TxnId[count];
+ for (int i = 0 ; i < bounds.length ; ++i)
+ bounds[i] = CommandSerializers.txnId.deserialize(in);
+ int[] statuses = new int[count * 2];
+ int prev = 0;
+ for (int i = 0 ; i < statuses.length ; ++i)
+ statuses[i] = prev = in.readUnsignedVInt32() ^ prev;
+
+ return new RedundantBefore.Bounds(range, startEpoch, endEpoch,
bounds, statuses, staleUntilAtLeast);
}
@Override
- public long serializedSize(RedundantBefore.Entry t, int version)
+ public long serializedSize(RedundantBefore.Bounds b, int version)
{
- long size = KeySerializers.range.serializedSize(t.range, version);
- size += TypeSizes.sizeofUnsignedVInt(t.startOwnershipEpoch);
- size += TypeSizes.sizeofUnsignedVInt(t.endOwnershipEpoch ==
Long.MAX_VALUE ? 0 : 1 + t.endOwnershipEpoch - t.startOwnershipEpoch);
- size +=
CommandSerializers.txnId.serializedSize(t.locallyWitnessedBefore, version);
- size +=
CommandSerializers.txnId.serializedSize(t.locallyAppliedBefore, version);
- size +=
CommandSerializers.txnId.serializedSize(t.locallyDecidedAndAppliedBefore,
version);
- size +=
CommandSerializers.txnId.serializedSize(t.shardOnlyAppliedBefore, version);
- size +=
CommandSerializers.txnId.serializedSize(t.shardAppliedBefore, version);
- size += CommandSerializers.txnId.serializedSize(t.gcBefore,
version);
- size += CommandSerializers.txnId.serializedSize(t.bootstrappedAt,
version);
- size +=
CommandSerializers.nullableTimestamp.serializedSize(t.staleUntilAtLeast,
version);
+ long size = KeySerializers.range.serializedSize(b.range, version);
+ size += TypeSizes.sizeofUnsignedVInt(b.startEpoch);
+ size += TypeSizes.sizeofUnsignedVInt(b.endEpoch == Long.MAX_VALUE
? 0 : 1 + b.endEpoch - b.startEpoch);
+ size += serializedNullableSize(b.staleUntilAtLeast);
+ size += TypeSizes.sizeofUnsignedVInt(b.bounds.length);
+ for (TxnId bound : b.bounds)
+ {
+ size += CommandSerializers.txnId.serializedSize(bound,
version);
+ }
+ int prev = 0;
+ for (int status : b.statuses)
+ {
+ size += TypeSizes.sizeofUnsignedVInt(status ^ prev);
+ prev = status;
+ }
return size;
}
};
- public static IVersionedSerializer<RedundantBefore> redundantBefore = new
ReducingRangeMapSerializer<>(NullableSerializer.wrap(redundantBeforeEntry),
RedundantBefore.Entry[]::new, RedundantBefore.SerializerSupport::create);
+ public static IVersionedSerializer<RedundantBefore> redundantBefore = new
ReducingRangeMapSerializer<>(NullableSerializer.wrap(redundantBeforeEntry),
RedundantBefore.Bounds[]::new, RedundantBefore.SerializerSupport::create);
private static class TimestampToRangesSerializer<T extends Timestamp>
implements IVersionedSerializer<NavigableMap<T, Ranges>>
{
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
index 2e91542f1a..fca97c5d45 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
@@ -26,6 +26,7 @@ import accord.primitives.Range;
import accord.topology.Shard;
import accord.topology.Topology;
import accord.utils.SortedArrays.SortedArrayList;
+import accord.utils.TinyEnumSet;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.marshal.ValueAccessor;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -124,6 +125,8 @@ public class TopologySerializers
public static class ShardSerializer implements IVersionedSerializer<Shard>
{
+ private static final int PENDING_REMOVAL = 1;
+ private static final int MUST_WITNESS = 2;
protected IVersionedSerializer<Range> range;
public ShardSerializer(IVersionedSerializer<Range> range)
@@ -138,7 +141,7 @@ public class TopologySerializers
CollectionSerializers.serializeList(shard.nodes, out, version,
nodeId);
CollectionSerializers.serializeList(shard.notInFastPath, out,
version, nodeId);
CollectionSerializers.serializeList(shard.joining, out, version,
nodeId);
- out.writeBoolean(shard.pendingRemoval);
+ out.writeUnsignedVInt32(shard.flags().bitset());
}
@Override
@@ -148,8 +151,8 @@ public class TopologySerializers
SortedArrayList<Node.Id> nodes =
CollectionSerializers.deserializeSortedArrayList(in, version, nodeId,
Node.Id[]::new);
SortedArrayList<Node.Id> notInFastPath =
CollectionSerializers.deserializeSortedArrayList(in, version, nodeId,
Node.Id[]::new);
SortedArrayList<Node.Id> joining =
CollectionSerializers.deserializeSortedArrayList(in, version, nodeId,
Node.Id[]::new);
- boolean pendingRemoval = in.readBoolean();
- return Shard.SerializerSupport.create(range, nodes, notInFastPath,
joining, pendingRemoval);
+ int flags = in.readUnsignedVInt32();
+ return Shard.SerializerSupport.create(range, nodes, notInFastPath,
joining, new TinyEnumSet<>(flags));
}
@Override
@@ -159,7 +162,7 @@ public class TopologySerializers
size += CollectionSerializers.serializedListSize(shard.nodes,
version, nodeId);
size +=
CollectionSerializers.serializedListSize(shard.notInFastPath, version, nodeId);
size += CollectionSerializers.serializedListSize(shard.joining,
version, nodeId);
- size += TypeSizes.BOOL_SIZE;
+ size += TypeSizes.sizeofUnsignedVInt(shard.flags().bitset());
return size;
}
};
diff --git
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
index d4e9343fc9..f3137c8c51 100644
---
a/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
+++
b/test/unit/org/apache/cassandra/db/compaction/CompactionAccordIteratorsTest.java
@@ -43,6 +43,7 @@ import accord.local.Command;
import accord.local.CommandStore;
import accord.local.DurableBefore;
import accord.local.RedundantBefore;
+import accord.local.RedundantStatus;
import accord.local.StoreParticipants;
import accord.local.cfk.CommandsForKey;
import accord.local.cfk.Serialize;
@@ -92,6 +93,7 @@ import static accord.local.KeyHistory.SYNC;
import static accord.local.PreLoadContext.contextFor;
import static accord.primitives.Routable.Domain.Range;
import static accord.primitives.Timestamp.Flag.HLC_BOUND;
+import static accord.primitives.Timestamp.Flag.SHARD_BOUND;
import static accord.utils.async.AsyncChains.getUninterruptibly;
import static org.apache.cassandra.Util.spinAssertEquals;
import static
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
@@ -227,8 +229,8 @@ public class CompactionAccordIteratorsTest
private static RedundantBefore redundantBefore(TxnId txnId)
{
Ranges ranges = AccordTestUtils.fullRange(AccordTestUtils.keys(table,
42));
- txnId = txnId.as(Kind.ExclusiveSyncPoint, Range);
- return RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE,
txnId, txnId, txnId, txnId, LT_TXN_ID.as(Range));
+ txnId = txnId.as(Kind.ExclusiveSyncPoint, Range).addFlag(SHARD_BOUND);
+ return RedundantBefore.create(ranges, Long.MIN_VALUE, Long.MAX_VALUE,
txnId, RedundantStatus.GC_BEFORE_AND_LOCALLY_APPLIED, LT_TXN_ID.as(Range));
}
enum DurableBeforeType
diff --git
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 877ba587c4..ea03f381c0 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -95,7 +95,7 @@ public class AccordDebugKeyspaceTest extends CQLTester
AsyncChains.getBlocking(accord.node().coordinate(id, txn));
spinUntilSuccess(() -> assertRows(execute(QUERY_TXN_BLOCKED_BY,
id.toString()),
- row(id.toString(), anyInt(), 0,
ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null,
anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(),
SaveStatus.Applied.name()))));
+ row(id.toString(), KEYSPACE,
tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null,
anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(),
SaveStatus.Applied.name()))));
}
@Test
@@ -120,11 +120,11 @@ public class AccordDebugKeyspaceTest extends CQLTester
filter.preAccept.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
- row(id.toString(), anyInt(), 0,
ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null,
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+ row(id.toString(), KEYSPACE, tableName, anyInt(), 0,
ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null,
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
filter.apply.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
- row(id.toString(), anyInt(), 0,
ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null,
SaveStatus.ReadyToExecute.name()));
+ row(id.toString(), KEYSPACE, tableName, anyInt(), 0,
ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null,
SaveStatus.ReadyToExecute.name()));
}
finally
{
@@ -154,11 +154,11 @@ public class AccordDebugKeyspaceTest extends CQLTester
filter.preAccept.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
- row(first.toString(), anyInt(), 0,
ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null,
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+ row(first.toString(), KEYSPACE, tableName, anyInt(), 0,
ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null,
anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
filter.apply.awaitThrowUncheckedOnInterrupt();
assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
- row(first.toString(), anyInt(), 0,
ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null,
SaveStatus.ReadyToExecute.name()));
+ row(first.toString(), KEYSPACE, tableName, anyInt(), 0,
ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null,
SaveStatus.ReadyToExecute.name()));
filter.reset();
@@ -173,8 +173,8 @@ public class AccordDebugKeyspaceTest extends CQLTester
return rs.size() == 2;
});
assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()),
- row(second.toString(), anyInt(), 0,
ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null,
SaveStatus.Stable.name()),
- row(second.toString(), anyInt(), 1, first.toString(),
"Key", anyNonNull(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+ row(second.toString(), KEYSPACE, tableName, anyInt(),
0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null,
SaveStatus.Stable.name()),
+ row(second.toString(), KEYSPACE, tableName, anyInt(),
1, first.toString(), "Key", anyNonNull(), anyNonNull(),
SaveStatus.ReadyToExecute.name()));
}
finally
{
diff --git a/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java
b/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java
index fa3ca0fa61..1fefa1d382 100644
--- a/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java
+++ b/test/unit/org/apache/cassandra/dht/RandomPartitionerTest.java
@@ -25,8 +25,6 @@ import org.junit.Test;
import org.apache.cassandra.cql3.functions.types.utils.Bytes;
import org.apache.cassandra.harry.checker.TestHelper;
-import org.apache.cassandra.harry.gen.EntropySource;
-import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource;
public class RandomPartitionerTest extends PartitionerTestCase
{
diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
index 118ecbb6cd..2af230b466 100644
--- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
+++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
@@ -38,7 +38,9 @@ import org.junit.Test;
import accord.api.Journal;
import accord.api.RoutingKey;
+import accord.local.CommandStore;
import accord.local.CommandStores;
+import accord.local.CommandStores.RangesForEpoch;
import accord.local.DurableBefore;
import accord.local.Node;
import accord.local.RedundantBefore;
@@ -48,7 +50,6 @@ import accord.primitives.Ballot;
import accord.primitives.Deps;
import accord.primitives.FullKeyRoute;
import accord.primitives.PartialDeps;
-import accord.primitives.Participants;
import accord.primitives.Range;
import accord.primitives.Ranges;
import accord.primitives.Routable.Domain;
@@ -57,6 +58,8 @@ import accord.primitives.SaveStatus;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
+import accord.topology.Shard;
+import accord.topology.Topology;
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.Property.Command;
@@ -80,6 +83,7 @@ import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.AccordCommandStore;
import org.apache.cassandra.service.accord.AccordJournal;
import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordService;
@@ -102,6 +106,8 @@ import org.mockito.Mockito;
import static accord.utils.Property.commands;
import static accord.utils.Property.stateful;
+import static accord.utils.SortedArrays.SortedArrayList.ofSorted;
+import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
import static org.apache.cassandra.schema.SchemaConstants.ACCORD_KEYSPACE_NAME;
public class RouteIndexTest extends CQLTester.InMemory
@@ -449,6 +455,8 @@ public class RouteIndexTest extends CQLTester.InMemory
{
private final Int2ObjectHashMap<Map<TableId,
Long2ObjectHashMap<List<TxnId>>>> storeToTableToRoutingKeysToTxns = new
Int2ObjectHashMap<>();
private final Int2ObjectHashMap<Map<TableId, RangeTree<TokenKey,
TokenRange, TxnId>>> storeToTableToRangesToTxns = new Int2ObjectHashMap<>();
+ private final Int2ObjectHashMap<RangesForEpoch> storeRangesForEpochs =
new Int2ObjectHashMap<>();
+ private final RedundantBefore emptyRedundantBefore =
RedundantBefore.create(Ranges.of(TokenRange.fullRange(tableId,
getPartitioner())), TxnId.NONE, RedundantStatus.NONE);
private final int numStores;
private final List<TableId> tables;
@@ -468,8 +476,10 @@ public class RouteIndexTest extends CQLTester.InMemory
tokenGen = TOKEN_DISTRIBUTION.next(rs);
rangeGen = rangeGen(rs, tables);
domainGen = DOMAIN_DISTRIBUTION.next(rs);
+ journalTable =
Keyspace.open(ACCORD_KEYSPACE_NAME).getColumnFamilyStore(AccordKeyspace.JOURNAL);
- this.journalTable =
Keyspace.open(ACCORD_KEYSPACE_NAME).getColumnFamilyStore(AccordKeyspace.JOURNAL);
+ for (int i = 0 ; i < numStores ; ++i)
+ storeRangesForEpochs.put(i, new RangesForEpoch(1,
Ranges.of(TokenRange.fullRange(tableId, getPartitioner()))));
accordService = startAccord();
accordService.configurationService().listener.notifyPostCommit(null,
ClusterMetadata.current(), false);
@@ -480,11 +490,13 @@ public class RouteIndexTest extends CQLTester.InMemory
{
NodeId tcmNodeId = ClusterMetadata.current().myNodeId();
AccordService as = new
AccordService(AccordTopology.tcmIdToAccord(tcmNodeId));
- as.startup();
-
+ Topology topology = new Topology(1,
Shard.create(TokenRange.fullRange(tableId, getPartitioner()), ofSorted(new
Node.Id(1)), ofSorted(new Node.Id(1))));
+ as.unsafeStartupWithOverrides(new
Journal.TopologyUpdate(storeRangesForEpochs, topology, topology));
+ for (CommandStore commandStore : as.node().commandStores().all())
+
((AccordCommandStore)commandStore).unsafeUpsertRedundantBefore(emptyRedundantBefore);
// the reason for the mocking is to speed up compaction.
Collecting the info from the stores has been slow and its always empty in this
test... so stub it out to speed up the test
AccordService mock = Mockito.spy(as);
-
Mockito.doReturn(emptyCompactionInfo(tableId)).when(mock).getCompactionInfo();
+ Mockito.doReturn(emptyCompactionInfo(tableId,
emptyRedundantBefore, storeRangesForEpochs)).when(mock).getCompactionInfo();
AccordService.unsafeSetNewAccordService(mock);
AccordService.replayJournal(as);
@@ -628,14 +640,11 @@ public class RouteIndexTest extends CQLTester.InMemory
}
};
- private static IAccordService.AccordCompactionInfos
emptyCompactionInfo(TableId tableId)
+ private static IAccordService.AccordCompactionInfos
emptyCompactionInfo(TableId tableId, RedundantBefore redundantBefore,
Int2ObjectHashMap<RangesForEpoch> storeRangesForEpoch)
{
IAccordService.AccordCompactionInfos compactionInfos = new
IAccordService.AccordCompactionInfos(DurableBefore.EMPTY);
- RedundantBefore redundantBefore = Mockito.spy(RedundantBefore.EMPTY);
-
Mockito.doReturn(RedundantStatus.NONE).when(redundantBefore).status(Mockito.any(),
Mockito.any(), (Participants<?>) Mockito.any());
-
Mockito.doReturn(RedundantStatus.NONE).when(redundantBefore).status(Mockito.any(),
Mockito.any(), (RoutingKey) Mockito.any());
- for (int i = 0; i < MAX_STORES; i++)
- compactionInfos.put(i, new AccordCompactionInfo(i,
redundantBefore, new CommandStores.RangesForEpoch(1, Ranges.EMPTY), tableId));
+ for (int i = 0; i < storeRangesForEpoch.size(); i++)
+ compactionInfos.put(i, new AccordCompactionInfo(i,
redundantBefore, storeRangesForEpoch.get(i), tableId));
return compactionInfos;
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
index f1fa4ac57c..14e162955d 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java
@@ -88,7 +88,7 @@ public class AccordJournalOrderTest
JournalKey key = new JournalKey(txnId,
JournalKey.Type.COMMAND_DIFF, randomSource.nextInt(5));
res.compute(key, (k, prev) -> prev == null ? 1 : prev + 1);
Participants<?> participants = RoutingKeys.of(new
TokenKey(TableId.generate(), new ByteOrderedPartitioner.BytesToken(new
byte[1])));
- Command command = Command.NotDefined.notDefined(txnId,
SaveStatus.NotDefined, Status.Durability.NotDurable,
StoreParticipants.create(null, participants, null, participants, participants),
Ballot.ZERO);
+ Command command = Command.NotDefined.notDefined(txnId,
SaveStatus.NotDefined, Status.Durability.NotDurable,
StoreParticipants.create(null, participants, null, null, participants,
participants), Ballot.ZERO);
accordJournal.saveCommand(key.commandStoreId,
new Journal.CommandUpdate(null, command),
() -> {});
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
index bbdaf617e8..1385edc1e7 100644
---
a/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/AccordSyncPropagatorTest.java
@@ -106,7 +106,7 @@ public class AccordSyncPropagatorTest
RandomDelayQueue delayQueue = new
RandomDelayQueue.Factory(rs).get();
PendingQueue queue = new MonitoredPendingQueue(failures,
delayQueue);
Agent agent = new TestAgent.RethrowAgent();
- SimulatedDelayedExecutorService globalExecutor = new
SimulatedDelayedExecutorService(queue, agent);
+ SimulatedDelayedExecutorService globalExecutor = new
SimulatedDelayedExecutorService(queue, agent, null);
ScheduledExecutorPlus scheduler = new
AdaptingScheduledExecutorPlus(globalExecutor);
Cluster cluster = new Cluster(nodes, rs, scheduler);
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandStoreSerializersTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandStoreSerializersTest.java
index 738de4809e..35b0cee6a5 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandStoreSerializersTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandStoreSerializersTest.java
@@ -50,7 +50,7 @@ public class CommandStoreSerializersTest
DataOutputBuffer buffer = new DataOutputBuffer();
qt().forAll(Gens.random(), AccordGenerators.partitioner()).check((rs,
partitioner) -> {
DatabaseDescriptor.setPartitionerUnsafe(partitioner);
- RedundantBefore.Entry entry =
AccordGenerators.redundantBeforeEntry(partitioner).next(rs);
+ RedundantBefore.Bounds entry =
AccordGenerators.redundantBeforeEntry(partitioner).next(rs);
for (Version version : SUPPORTED_VERSIONS)
IVersionedSerializers.testSerde(buffer,
CommandStoreSerializers.redundantBeforeEntry, entry, version.value);
});
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index 6c5638e42b..19a922450a 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.function.BiFunction;
-import java.util.stream.Stream;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
@@ -37,6 +36,7 @@ import accord.local.Command.Truncated;
import accord.local.ICommand;
import accord.local.DurableBefore;
import accord.local.RedundantBefore;
+import accord.local.RedundantBefore.Bounds;
import accord.local.StoreParticipants;
import accord.primitives.Ballot;
import accord.primitives.Deps;
@@ -75,7 +75,14 @@ import org.apache.cassandra.service.accord.txn.TxnWrite;
import org.quicktheories.impl.JavaRandom;
import static accord.local.CommandStores.RangesForEpoch;
+import static accord.local.RedundantStatus.LOCALLY_APPLIED_ONLY;
+import static accord.local.RedundantStatus.LOCALLY_WITNESSED_ONLY;
+import static accord.local.RedundantStatus.Property.GC_BEFORE;
+import static accord.local.RedundantStatus.Property.PRE_BOOTSTRAP;
+import static accord.local.RedundantStatus.SHARD_ONLY_APPLIED_ONLY;
+import static accord.local.RedundantStatus.oneSlow;
import static accord.primitives.Status.Durability.NotDurable;
+import static accord.primitives.Timestamp.Flag.SHARD_BOUND;
import static accord.primitives.Txn.Kind.Write;
import static org.apache.cassandra.service.accord.AccordTestUtils.TABLE_ID1;
import static
org.apache.cassandra.service.accord.AccordTestUtils.createPartialTxn;
@@ -442,28 +449,47 @@ public class AccordGenerators
return AccordGens.deps(keyDepsGen(partitioner),
rangeDepsGen(partitioner), directKeyDepsGen(partitioner));
}
- public static Gen<RedundantBefore.Entry> redundantBeforeEntry(IPartitioner
partitioner)
+ public static Gen<Bounds> redundantBeforeEntry(IPartitioner partitioner)
{
return redundantBeforeEntry(Gens.bools().all(), range(partitioner),
AccordGens.txnIds(Gens.pick(Txn.Kind.ExclusiveSyncPoint), ignore ->
Routable.Domain.Range));
}
- public static Gen<RedundantBefore.Entry> redundantBeforeEntry(Gen<Boolean>
emptyGen, Gen<Range> rangeGen, Gen<TxnId> txnIdGen)
+ public static Gen<Bounds> redundantBeforeEntry(Gen<Boolean> emptyGen,
Gen<Range> rangeGen, Gen<TxnId> txnIdGen)
{
return rs -> {
Range range = rangeGen.next(rs);
- TxnId locallyWitnessedOrInvalidatedBefore = emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs); // emptyable or range
- TxnId locallyAppliedOrInvalidatedBefore =
TxnId.nonNullOrMin(locallyWitnessedOrInvalidatedBefore, emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs)); // emptyable or range
- TxnId locallyDecidedAndAppliedOrInvalidatedBefore =
TxnId.nonNullOrMin(locallyAppliedOrInvalidatedBefore, emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs)); // emptyable or range
- TxnId shardOnlyAppliedOrInvalidatedBefore = emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs); // emptyable or range
- TxnId shardAppliedOrInvalidatedBefore =
TxnId.nonNullOrMin(locallyAppliedOrInvalidatedBefore,
TxnId.nonNullOrMin(shardOnlyAppliedOrInvalidatedBefore, emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs))); // emptyable or range
- TxnId gcBefore =
TxnId.nonNullOrMin(shardAppliedOrInvalidatedBefore, emptyGen.next(rs) ?
TxnId.NONE : txnIdGen.next(rs)); // emptyable or range
- TxnId bootstrappedAt = txnIdGen.next(rs);
- Timestamp staleUntilAtLeast = emptyGen.next(rs) ? null :
txnIdGen.next(rs); // nullable
-
- long maxEpoch = Stream.of(locallyAppliedOrInvalidatedBefore,
shardAppliedOrInvalidatedBefore, bootstrappedAt, staleUntilAtLeast).filter(t ->
t != null).mapToLong(Timestamp::epoch).max().getAsLong();
- long startEpoch = rs.nextLong(maxEpoch);
- long endEpoch = emptyGen.next(rs) ? Long.MAX_VALUE : 1 +
rs.nextLong(startEpoch, Long.MAX_VALUE);
- return new RedundantBefore.Entry(range, startEpoch, endEpoch,
locallyWitnessedOrInvalidatedBefore, locallyAppliedOrInvalidatedBefore,
locallyDecidedAndAppliedOrInvalidatedBefore,
shardOnlyAppliedOrInvalidatedBefore, shardAppliedOrInvalidatedBefore, gcBefore,
bootstrappedAt, staleUntilAtLeast);
+
+ List<Bounds> bounds = new ArrayList<>();
+ if (rs.nextBoolean())
+ bounds.add(Bounds.create(range, txnIdGen.next(rs),
LOCALLY_WITNESSED_ONLY, null ));
+ if (rs.nextBoolean())
+ bounds.add(Bounds.create(range, txnIdGen.next(rs),
LOCALLY_APPLIED_ONLY, null ));
+ if (rs.nextBoolean())
+ bounds.add(Bounds.create(range, txnIdGen.next(rs),
SHARD_ONLY_APPLIED_ONLY, null ));
+ if (rs.nextBoolean())
+ bounds.add(Bounds.create(range,
txnIdGen.next(rs).addFlag(SHARD_BOUND), oneSlow(GC_BEFORE), null ));
+ if (rs.nextBoolean())
+ bounds.add(Bounds.create(range, txnIdGen.next(rs),
oneSlow(PRE_BOOTSTRAP), null ));
+ if (rs.nextBoolean())
+ bounds.add(new Bounds(range, Long.MIN_VALUE, Long.MAX_VALUE,
new TxnId[0], new int[0], txnIdGen.next(rs)));
+
+ Collections.shuffle(bounds);
+ long endEpoch = emptyGen.next(rs) ? Long.MAX_VALUE :
rs.nextLong(0, Long.MAX_VALUE);
+ long minEpoch = Long.MAX_VALUE;
+ Bounds result = null;
+ for (Bounds b : bounds)
+ {
+ if (b.bounds.length > 0)
+ minEpoch = Math.min(minEpoch, b.bounds[0].epoch());
+ if (result == null) result = b;
+ else result = Bounds.reduce(result, b);
+ }
+
+ long startEpoch = rs.nextLong(Math.min(minEpoch, endEpoch));
+ Bounds epochBounds = new Bounds(range, startEpoch, endEpoch, new
TxnId[0], new int[0], null);
+ if (result == null)
+ return epochBounds;
+ return Bounds.reduce(result, epochBounds);
};
}
@@ -471,7 +497,7 @@ public class AccordGenerators
{
Gen<Ranges> rangeGen = rangesArbitrary(partitioner);
Gen<TxnId> txnIdGen =
AccordGens.txnIds(Gens.pick(Txn.Kind.ExclusiveSyncPoint), ignore ->
Routable.Domain.Range);
- BiFunction<RandomSource, Range, RedundantBefore.Entry> entryGen = (rs,
range) -> redundantBeforeEntry(Gens.bools().all(), i -> range,
txnIdGen).next(rs);
+ BiFunction<RandomSource, Range, Bounds> entryGen = (rs, range) ->
redundantBeforeEntry(Gens.bools().all(), i -> range, txnIdGen).next(rs);
return AccordGens.redundantBefore(rangeGen, entryGen);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]