This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new b8057f0921 Add a table to inspect the current state of a txn
b8057f0921 is described below
commit b8057f092130e866ca4f34fe9770c003581a36d9
Author: David Capwell <[email protected]>
AuthorDate: Fri Aug 16 09:53:00 2024 -0700
Add a table to inspect the current state of a txn
patch by David Capwell; reviewed by Benedict Elliott Smith for
CASSANDRA-19838
---
modules/accord | 2 +-
.../cassandra/db/virtual/AccordVirtualTables.java | 118 ++++++++++-
.../cassandra/db/virtual/VirtualKeyspace.java | 12 +-
.../apache/cassandra/db/virtual/VirtualTable.java | 8 +
.../cassandra/service/accord/AccordService.java | 216 +++++++++++++++++---
.../accord/CommandStoreTxnBlockedGraph.java | 127 ++++++++++++
.../cassandra/service/accord/IAccordService.java | 3 +
.../accord/exceptions/ReadExhaustedException.java | 13 +-
test/unit/org/apache/cassandra/cql3/CQLTester.java | 196 +++++++++++++++++-
.../db/virtual/AccordVirtualTablesTest.java | 227 +++++++++++++++++++++
.../service/accord/AccordServiceTest.java | 19 +-
.../cassandra/service/accord/AccordTestUtils.java | 5 +
12 files changed, 882 insertions(+), 64 deletions(-)
diff --git a/modules/accord b/modules/accord
index 81c02769f9..e2ccee4f51 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 81c02769f9ad73ef3aba0675c2217fc74b8a4a4c
+Subproject commit e2ccee4f51fe4c7c7f3ea8911897135ed7e37114
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
index 1b2e041c16..ac89e45133 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
@@ -18,29 +18,39 @@
package org.apache.cassandra.db.virtual;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import accord.local.CommandStores;
+import accord.local.Status;
import accord.primitives.TxnId;
+import accord.utils.Invariants;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.FieldIdentifier;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -51,12 +61,15 @@ import
org.apache.cassandra.service.accord.AccordCommandStore;
import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.AccordStateCache;
+import org.apache.cassandra.service.accord.CommandStoreTxnBlockedGraph;
+import org.apache.cassandra.service.accord.api.PartitionKey;
import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
import org.apache.cassandra.service.consensus.migration.TableMigrationState;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.utils.Clock;
import static com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
public class AccordVirtualTables
{
@@ -70,7 +83,8 @@ public class AccordVirtualTables
return List.of(
new CommandStoreCache(keyspace),
new MigrationState(keyspace),
- new CoordinationStatus(keyspace)
+ new CoordinationStatus(keyspace),
+ new TxnBlockedByTable(keyspace)
);
}
@@ -246,6 +260,108 @@ public class AccordVirtualTables
}
}
+    /**
+     * Virtual table {@code txn_blocked_by} that shows, per command store, what a single transaction is
+     * waiting on.
+     *
+     * The partition key is the textual {@code txn_id}; rows are clustered by {@code store_id}, {@code depth},
+     * {@code blocked_by} and {@code reason}. The table can only be queried for a single {@code txn_id}.
+     */
+    public static class TxnBlockedByTable extends AbstractVirtualTable
+    {
+        /**
+         * Why a row exists: {@code Self} is the row for the queried txn itself, {@code Txn} is a txn it is
+         * (transitively) blocked by directly, {@code Key} is a txn reached through a key it is blocked on.
+         */
+        enum Reason { Self, Txn, Key }
+
+        /** UDT ({@code table}, {@code token}) used for the {@code key} column. */
+        private final UserType partitionKeyType;
+
+        protected TxnBlockedByTable(String keyspace)
+        {
+            super(TableMetadata.builder(keyspace, "txn_blocked_by")
+                               .kind(TableMetadata.Kind.VIRTUAL)
+                               .addPartitionKeyColumn("txn_id", UTF8Type.instance)
+                               .addClusteringColumn("store_id", Int32Type.instance)
+                               .addClusteringColumn("depth", Int32Type.instance)
+                               .addClusteringColumn("blocked_by", UTF8Type.instance)
+                               .addClusteringColumn("reason", UTF8Type.instance)
+                               .addRegularColumn("save_status", UTF8Type.instance)
+                               .addRegularColumn("execute_at", UTF8Type.instance)
+                               .addRegularColumn("key", pkType(keyspace))
+                               .build());
+            partitionKeyType = pkType(keyspace);
+        }
+
+        /** Builds the non-multi-cell {@code partition_key} UDT with two text fields: {@code table} and {@code token}. */
+        private static UserType pkType(String keyspace)
+        {
+            return new UserType(keyspace, bytes("partition_key"),
+                                Arrays.asList(FieldIdentifier.forQuoted("table"), FieldIdentifier.forQuoted("token")),
+                                Arrays.asList(UTF8Type.instance, UTF8Type.instance), false);
+        }
+
+        /** Serializes an Accord partition key as a {@code partition_key} UDT value. */
+        private ByteBuffer pk(PartitionKey pk)
+        {
+            var tm = Schema.instance.getTableMetadata(pk.table());
+            // NOTE(review): assumes the lookup yields null when the table is unknown (e.g. dropped);
+            // fall back to the table id so a debug query does not fail with a NullPointerException.
+            String table = tm != null ? tm.toString() : pk.table().toString();
+            return partitionKeyType.pack(UTF8Type.instance.decompose(table),
+                                         UTF8Type.instance.decompose(pk.token().toString()));
+        }
+
+        @Override
+        public Iterable<UserType> userTypes()
+        {
+            return Arrays.asList(partitionKeyType);
+        }
+
+        /**
+         * Produces the rows for one txn: parses the txn id from the partition key, fetches the blocked graph
+         * of every command store and flattens each graph into rows.
+         *
+         * @throws IllegalStateException if a txn captured in a store's graph was never reached while
+         *                               flattening it, or was reached twice
+         */
+        @Override
+        public DataSet data(DecoratedKey partitionKey)
+        {
+            TxnId id = TxnId.parse(UTF8Type.instance.compose(partitionKey.getKey()));
+            List<CommandStoreTxnBlockedGraph> shards = AccordService.instance().debugTxnBlockedGraph(id);
+
+            SimpleDataSet ds = new SimpleDataSet(metadata());
+            for (CommandStoreTxnBlockedGraph shard : shards)
+            {
+                Set<TxnId> processed = new HashSet<>();
+                process(ds, shard, processed, id, 0, id, Reason.Self, null);
+                // Everything in the graph must have been visited: "processed" has to cover the graph's txns.
+                // (The reverse containment would never report a skipped txn; an empty graph passes trivially.)
+                if (!processed.containsAll(shard.txns.keySet()))
+                    throw new IllegalStateException("Skipped txns: " + Sets.difference(shard.txns.keySet(), processed));
+            }
+
+            return ds;
+        }
+
+        /**
+         * Emits the row for {@code txnId} and then recurses, one level deeper, into whatever it is blocked by.
+         *
+         * @param ds        data set receiving the rows
+         * @param shard     blocked graph of the command store being flattened
+         * @param processed txn ids already visited for this store; updated in place
+         * @param userTxn   the txn the user queried (used as the partition key of every row)
+         * @param depth     distance from {@code userTxn} in the blocked-by graph
+         * @param txnId     txn to emit at this step
+         * @param reason    how {@code txnId} was reached
+         * @param onDone    optional hook run after the row's regular columns are written (used to add {@code key})
+         */
+        private void process(SimpleDataSet ds, CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason reason, Runnable onDone)
+        {
+            if (!processed.add(txnId))
+                throw new IllegalStateException("Double processed " + txnId);
+            CommandStoreTxnBlockedGraph.TxnState txn = shard.txns.get(txnId);
+            if (txn == null)
+            {
+                // only the queried txn may be unknown to a store; anything it references must be in the graph
+                Invariants.checkState(reason == Reason.Self, "Txn %s unknown for reason %s", txnId, reason);
+                return;
+            }
+            // was it applied? If so ignore it
+            if (reason != Reason.Self && txn.saveStatus.hasBeen(Status.Applied))
+                return;
+            ds.row(userTxn.toString(), shard.storeId, depth, reason == Reason.Self ? "" : txn.txnId.toString(), reason.name());
+            ds.column("save_status", txn.saveStatus.name());
+            if (txn.executeAt != null)
+                ds.column("execute_at", txn.executeAt.toString());
+            if (onDone != null)
+                onDone.run();
+            if (txn.isBlocked())
+            {
+                for (TxnId blockedBy : txn.blockedBy)
+                {
+                    if (processed.contains(blockedBy)) continue; // already listed
+                    process(ds, shard, processed, userTxn, depth + 1, blockedBy, Reason.Txn, null);
+                }
+                for (PartitionKey blockedBy : txn.blockedByKey)
+                {
+                    TxnId blocking = shard.keys.get(blockedBy);
+                    if (processed.contains(blocking)) continue; // already listed
+                    process(ds, shard, processed, userTxn, depth + 1, blocking, Reason.Key, () -> ds.column("key", pk(blockedBy)));
+                }
+            }
+        }
+
+        /** Full scans are not supported; a single {@code txn_id} must be selected. */
+        @Override
+        public DataSet data()
+        {
+            throw new InvalidRequestException("Must select a single txn_id");
+        }
+    }
+
private static TableMetadata parse(String keyspace, String comment, String
query)
{
return CreateTableStatement.parse(query, keyspace)
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
index 044c11476b..3316aa4fb3 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
@@ -50,7 +50,17 @@ public class VirtualKeyspace
if (!duplicates.isEmpty())
throw new IllegalArgumentException(String.format("Duplicate table
names in virtual keyspace %s: %s", name, duplicates));
- metadata = KeyspaceMetadata.virtual(name,
Tables.of(Iterables.transform(tables, VirtualTable::metadata)));
+ KeyspaceMetadata metadata = KeyspaceMetadata.virtual(name,
Tables.of(Iterables.transform(tables, VirtualTable::metadata)));
+ for (var t : tables)
+ {
+ for (var udt : t.userTypes())
+ {
+ if (metadata.types.getNullable(udt.name) != null)
+ throw new IllegalStateException("UDT " +
udt.getNameAsString() + " already exists");
+ metadata = metadata.withUpdatedUserType(udt);
+ }
+ }
+ this.metadata = metadata;
}
public String name()
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
index 53a9f2ac7f..93047faad8 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
@@ -17,10 +17,13 @@
*/
package org.apache.cassandra.db.virtual;
+import java.util.Collections;
+
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.UserType;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.schema.TableMetadata;
@@ -87,4 +90,9 @@ public interface VirtualTable
{
return true;
}
+
+    /**
+     * User defined types this table's columns depend on; none unless a table overrides this.
+     *
+     * @return the UDTs declared by this table, empty by default
+     */
+    default Iterable<UserType> userTypes()
+    {
+        return Collections.<UserType>emptyList();
+    }
}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index dab05b621e..6a46c24371 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -18,7 +18,9 @@
package org.apache.cassandra.service.accord;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -27,6 +29,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@@ -37,7 +40,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,13 +59,20 @@ import accord.impl.AbstractConfigurationService;
import accord.impl.CoordinateDurabilityScheduling;
import accord.impl.SimpleProgressLog;
import accord.impl.SizeOfIntersectionSorter;
+import accord.local.Command;
+import accord.local.CommandStore;
import accord.local.CommandStores;
import accord.local.DurableBefore;
+import accord.local.KeyHistory;
import accord.local.Node;
import accord.local.Node.Id;
import accord.local.NodeTimeService;
+import accord.local.PreLoadContext;
import accord.local.RedundantBefore;
+import accord.local.SaveStatus;
import accord.local.ShardDistributor.EvenSplit;
+import accord.local.Status;
+import accord.local.cfk.CommandsForKey;
import accord.messages.LocalRequest;
import accord.messages.Request;
import accord.primitives.Keys;
@@ -147,10 +156,8 @@ public class AccordService implements IAccordService,
Shutdownable
{
private static final Logger logger =
LoggerFactory.getLogger(AccordService.class);
- private enum State { INIT, STARTED, SHUTDOWN}
+ private enum State {INIT, STARTED, SHUTDOWN}
- public static final AccordClientRequestMetrics readMetrics = new
AccordClientRequestMetrics("AccordRead");
- public static final AccordClientRequestMetrics writeMetrics = new
AccordClientRequestMetrics("AccordWrite");
private static final Future<Void> BOOTSTRAP_SUCCESS =
ImmediateFuture.success(null);
private final Node node;
@@ -262,6 +269,12 @@ public class AccordService implements IAccordService,
Shutdownable
{
return new CompactionInfo(new Int2ObjectHashMap<>(), new
Int2ObjectHashMap<>(), DurableBefore.EMPTY);
}
+
+        /** This implementation has no command stores to inspect, so the blocked graph is always empty. */
+        @Override
+        public List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId)
+        {
+            return Collections.<CommandStoreTxnBlockedGraph>emptyList();
+        }
};
private static volatile IAccordService instance = null;
@@ -379,7 +392,7 @@ public class AccordService implements IAccordService,
Shutdownable
return;
journal.start(node);
configService.start();
- ClusterMetadataService.instance().log().addListener(configService);
+
fastPathCoordinator.start();
ClusterMetadataService.instance().log().addListener(fastPathCoordinator);
durabilityScheduling.setGlobalCycleTime(Ints.checkedCast(DatabaseDescriptor.getAccordGlobalDurabilityCycle(SECONDS)),
SECONDS);
@@ -399,7 +412,7 @@ public class AccordService implements IAccordService,
Shutdownable
private <S extends Seekables<?, ?>> Seekables barrier(@Nonnull S
keysOrRanges, long epoch, long queryStartNanos, long timeoutNanos, BarrierType
barrierType, boolean isForWrite, BiFunction<Node, S, AsyncResult<SyncPoint<S>>>
syncPoint)
{
Stopwatch sw = Stopwatch.createStarted();
- keysOrRanges = (S)intersectionWithAccordManagedRanges(keysOrRanges);
+ keysOrRanges = (S) intersectionWithAccordManagedRanges(keysOrRanges);
// It's possible none of them were Accord managed and we aren't going
to treat that as an error
if (keysOrRanges.isEmpty())
{
@@ -408,14 +421,12 @@ public class AccordService implements IAccordService,
Shutdownable
}
AccordClientRequestMetrics metrics = isForWrite ? accordWriteMetrics :
accordReadMetrics;
- TxnId txnId = null;
try
{
logger.debug("Starting barrier key: {} epoch: {} barrierType: {}
isForWrite {}", keysOrRanges, epoch, barrierType, isForWrite);
- txnId = node.nextTxnId(Kind.SyncPoint, keysOrRanges.domain());
AsyncResult<TxnId> asyncResult = syncPoint == null
- ? Barrier.barrier(node,
keysOrRanges, epoch, barrierType)
- : Barrier.barrier(node,
keysOrRanges, epoch, barrierType, syncPoint);
+ ? Barrier.barrier(node,
keysOrRanges, epoch, barrierType)
+ : Barrier.barrier(node,
keysOrRanges, epoch, barrierType, syncPoint);
long deadlineNanos = queryStartNanos + timeoutNanos;
Timestamp barrierExecuteAt = AsyncChains.getBlocking(asyncResult,
deadlineNanos - nanoTime(), NANOSECONDS);
logger.debug("Completed barrier attempt in {}ms, {}ms since
attempts start, barrier key: {} epoch: {} barrierType: {} isForWrite {}",
@@ -429,21 +440,24 @@ public class AccordService implements IAccordService,
Shutdownable
Throwable cause = Throwables.getRootCause(e);
if (cause instanceof Timeout)
{
+ TxnId txnId = ((Timeout) cause).txnId();
metrics.timeouts.mark();
- throw newBarrierTimeout(txnId, barrierType.global);
+ throw newBarrierTimeout(txnId, barrierType, isForWrite,
keysOrRanges);
}
if (cause instanceof Preempted)
{
+ TxnId txnId = ((Preempted) cause).txnId();
//TODO need to improve
// Coordinator "could" query the accord state to see whats
going on but that doesn't exist yet.
// Protocol also doesn't have a way to denote "unknown"
outcome, so using a timeout as the closest match
- throw newBarrierPreempted(txnId, barrierType.global);
+ throw newBarrierPreempted(txnId, barrierType, isForWrite,
keysOrRanges);
}
if (cause instanceof Exhausted)
{
+ TxnId txnId = ((Exhausted) cause).txnId();
// this case happens when a non-timeout exception is seen, and
we are unable to move forward
metrics.failures.mark();
- throw newBarrierExhausted(txnId, barrierType.global);
+ throw newBarrierExhausted(txnId, barrierType, isForWrite,
keysOrRanges);
}
// unknown error
metrics.failures.mark();
@@ -457,7 +471,7 @@ public class AccordService implements IAccordService,
Shutdownable
catch (TimeoutException e)
{
metrics.timeouts.mark();
- throw newBarrierTimeout(txnId, barrierType.global);
+ throw newBarrierTimeout(null, barrierType, isForWrite,
keysOrRanges);
}
finally
{
@@ -484,16 +498,16 @@ public class AccordService implements IAccordService,
Shutdownable
return barrier(keysOrRanges, epoch, queryStartNanos, timeoutNanos,
barrierType, isForWrite, repairSyncPoint(allNodes));
}
- private static <S extends Seekables<?,?>> Seekables
intersectionWithAccordManagedRanges(Seekables<?, ?> keysOrRanges)
+ private static <S extends Seekables<?, ?>> Seekables
intersectionWithAccordManagedRanges(Seekables<?, ?> keysOrRanges)
{
TableId tableId = null;
for (Seekable seekable : keysOrRanges)
{
TableId newTableId;
if (keysOrRanges.domain() == Key)
- newTableId = ((PartitionKey)seekable).table();
+ newTableId = ((PartitionKey) seekable).table();
else if (keysOrRanges.domain() == Range)
- newTableId = ((TokenRange)seekable).table();
+ newTableId = ((TokenRange) seekable).table();
else
throw new IllegalStateException("Unexpected domain " +
keysOrRanges.domain());
@@ -532,22 +546,21 @@ public class AccordService implements IAccordService,
Shutdownable
}
@VisibleForTesting
- static ReadTimeoutException newBarrierTimeout(TxnId txnId, boolean global)
+ static ReadTimeoutException newBarrierTimeout(TxnId txnId, BarrierType
barrierType, boolean isForWrite, Seekables<?, ?> keysOrRanges)
{
- return new ReadTimeoutException(global ? ConsistencyLevel.ANY :
ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString());
+ return new ReadTimeoutException(barrierType.global ?
ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false,
String.format("Timeout waiting on barrier %s / %s / %s; impacted ranges %s",
txnId, barrierType, isForWrite ? "write" : "not write", keysOrRanges));
}
@VisibleForTesting
- static ReadTimeoutException newBarrierPreempted(TxnId txnId, boolean
global)
+ static ReadTimeoutException newBarrierPreempted(TxnId txnId, BarrierType
barrierType, boolean isForWrite, Seekables<?, ?> keysOrRanges)
{
- return new ReadPreemptedException(global ? ConsistencyLevel.ANY :
ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString());
+ return new ReadPreemptedException(barrierType.global ?
ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false,
String.format("Preempted waiting on barrier %s / %s / %s; impacted ranges %s",
txnId, barrierType, isForWrite ? "write" : "not write", keysOrRanges));
}
@VisibleForTesting
- static ReadExhaustedException newBarrierExhausted(TxnId txnId, boolean
global)
+ static ReadExhaustedException newBarrierExhausted(TxnId txnId, BarrierType
barrierType, boolean isForWrite, Seekables<?, ?> keysOrRanges)
{
- //TODO (usability): not being able to show the txn is a bad UX, this
becomes harder to trace back in logs
- return new ReadExhaustedException(global ? ConsistencyLevel.ANY :
ConsistencyLevel.QUORUM, 0, 0, false, ImmutableMap.of());
+ return new ReadExhaustedException(barrierType.global ?
ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false,
String.format("Exhausted (too many failures from peers) waiting on barrier %s /
%s / %s; impacted ranges %s", txnId, barrierType, isForWrite ? "write" : "not
write", keysOrRanges));
}
@VisibleForTesting
@@ -615,9 +628,9 @@ public class AccordService implements IAccordService,
Shutdownable
public Seekables barrierWithRetries(Seekables keysOrRanges, long minEpoch,
BarrierType barrierType, boolean isForWrite) throws InterruptedException
{
return doWithRetries(Blocking.Default.instance, () ->
AccordService.instance().barrier(keysOrRanges, minEpoch,
Clock.Global.nanoTime(),
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType,
isForWrite),
- DatabaseDescriptor.getAccordBarrierRetryAttempts(),
-
DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(),
-
DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis());
+
DatabaseDescriptor.getAccordBarrierRetryAttempts(),
+
DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(),
+
DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis());
}
@Override
@@ -666,12 +679,12 @@ public class AccordService implements IAccordService,
Shutdownable
Throwable cause = failure != null ?
Throwables.getRootCause(failure) : null;
if (success != null)
{
- if (((TxnResult)success).kind() ==
TxnResult.Kind.retry_new_protocol)
+ if (((TxnResult) success).kind() ==
TxnResult.Kind.retry_new_protocol)
{
metrics.retryDifferentSystem.mark();
Tracing.trace("Got retry different system error from
Accord, will retry");
}
- asyncTxnResult.trySuccess((TxnResult)success);
+ asyncTxnResult.trySuccess((TxnResult) success);
return;
}
@@ -725,7 +738,7 @@ public class AccordService implements IAccordService,
Shutdownable
throw (RequestTimeoutException) cause;
}
else if (cause instanceof RuntimeException)
- throw (RuntimeException)cause;
+ throw (RuntimeException) cause;
else
throw new RuntimeException(cause);
}
@@ -754,7 +767,7 @@ public class AccordService implements IAccordService,
Shutdownable
if (consistencyLevel == null)
consistencyLevel = ConsistencyLevel.ANY;
return isWrite ? new WriteTimeoutException(WriteType.CAS,
consistencyLevel, 0, 0, txnId.toString())
- : new ReadTimeoutException(consistencyLevel, 0, 0,
false, txnId.toString());
+ : new ReadTimeoutException(consistencyLevel, 0, 0,
false, txnId.toString());
}
private static RuntimeException newPreempted(TxnId txnId, boolean isWrite,
ConsistencyLevel consistencyLevel)
@@ -762,7 +775,7 @@ public class AccordService implements IAccordService,
Shutdownable
if (consistencyLevel == null)
consistencyLevel = ConsistencyLevel.ANY;
return isWrite ? new WritePreemptedException(WriteType.CAS,
consistencyLevel, 0, 0, txnId.toString())
- : new ReadPreemptedException(consistencyLevel, 0,
0, false, txnId.toString());
+ : new ReadPreemptedException(consistencyLevel, 0, 0,
false, txnId.toString());
}
@Override
@@ -833,6 +846,143 @@ public class AccordService implements IAccordService,
Shutdownable
return node.id();
}
+    /**
+     * Blocking variant of {@link #loadDebug(TxnId)}: waits for every command store to report its blocked graph.
+     *
+     * @param txnId txn to inspect
+     * @return one graph per command store
+     */
+    @Override
+    public List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId)
+    {
+        try
+        {
+            return AsyncChains.getBlocking(loadDebug(txnId));
+        }
+        catch (InterruptedException interrupted)
+        {
+            throw new UncheckedInterruptedException(interrupted);
+        }
+        catch (ExecutionException failed)
+        {
+            // surface the underlying failure rather than the ExecutionException wrapper
+            throw new RuntimeException(failed.getCause());
+        }
+    }
+
+    /**
+     * Asynchronously collects the blocked graph of {@code original} from every command store of this node.
+     *
+     * @param original txn to inspect
+     * @return chain completing with one graph per command store; an empty list when there are no stores
+     */
+    public AsyncChain<List<CommandStoreTxnBlockedGraph>> loadDebug(TxnId original)
+    {
+        CommandStores stores = node.commandStores();
+        if (stores.count() == 0)
+            return AsyncChains.success(Collections.emptyList());
+
+        int[] storeIds = stores.ids();
+        List<AsyncChain<CommandStoreTxnBlockedGraph>> perStore = new ArrayList<>(storeIds.length);
+        for (int i = 0; i < storeIds.length; i++)
+        {
+            CommandStore store = stores.forId(storeIds[i]);
+            perStore.add(loadDebug(original, store));
+        }
+        return AsyncChains.all(perStore);
+    }
+
+    /** Builds the blocked graph of {@code txnId} for a single command store. */
+    private AsyncChain<CommandStoreTxnBlockedGraph> loadDebug(TxnId txnId, CommandStore store)
+    {
+        CommandStoreTxnBlockedGraph.Builder builder = new CommandStoreTxnBlockedGraph.Builder(store.id());
+        AsyncChain<Void> populated = populate(builder, store, txnId);
+        // the builder is filled in as a side effect of the chain; snapshot it once the chain completes
+        return populated.map(ignore -> builder.build());
+    }
+
+    /**
+     * Submits a task to {@code store} with {@code txnId} in its pre-load context and records that txn
+     * (and, transitively, what blocks it) into {@code state}.
+     */
+    private static AsyncChain<Void> populate(CommandStoreTxnBlockedGraph.Builder state, CommandStore store, TxnId txnId)
+    {
+        PreLoadContext context = PreLoadContext.contextFor(txnId);
+        AsyncChain<AsyncChain<Void>> nested = store.submit(context, in -> {
+            AsyncChain<Void> followUp = populate(state, (AccordSafeCommandStore) in, txnId);
+            // null means "nothing further to wait for"; normalise so the chain can be flattened
+            if (followUp == null)
+                return AsyncChains.success(null);
+            return followUp;
+        });
+        return nested.flatMap(Function.identity());
+    }
+
+    /**
+     * Submits a task to {@code commandStore} with {@code txnId} and the key {@code blockedBy} in its
+     * pre-load context, then records into {@code state} which txn blocks {@code txnId} on that key.
+     */
+    private static AsyncChain<Void> populate(CommandStoreTxnBlockedGraph.Builder state, CommandStore commandStore, PartitionKey blockedBy, TxnId txnId, Timestamp executeAt)
+    {
+        PreLoadContext context = PreLoadContext.contextFor(txnId, Keys.of(blockedBy), KeyHistory.COMMANDS);
+        AsyncChain<AsyncChain<Void>> nested = commandStore.submit(context, in -> {
+            AsyncChain<Void> followUp = populate(state, (AccordSafeCommandStore) in, blockedBy, txnId, executeAt);
+            // null means "nothing further to wait for"; normalise so the chain can be flattened
+            if (followUp == null)
+                return AsyncChains.success(null);
+            return followUp;
+        });
+        return nested.flatMap(Function.identity());
+    }
+
+    /**
+     * Records {@code txnId} into {@code state} and follows everything it is blocked by. Dependencies that
+     * are already loaded in {@code safeStore} are handled inline; the others are fetched through a new
+     * task on the command store.
+     *
+     * @return a chain completing once all outstanding fetches are done, or {@code null} when the txn is
+     *         unknown/uninitialised, is not blocked, or nothing needed to be waited for
+     */
+    @Nullable
+    private static AsyncChain<Void> populate(CommandStoreTxnBlockedGraph.Builder state, AccordSafeCommandStore safeStore, TxnId txnId)
+    {
+        AccordSafeCommand safeCommand = safeStore.getIfLoaded(txnId);
+        Invariants.nonNull(safeCommand, "Txn %s is not in the cache", txnId);
+        Command current = safeCommand.current();
+        if (current == null || current.saveStatus() == SaveStatus.Uninitialised)
+            return null;
+
+        CommandStoreTxnBlockedGraph.TxnState txnState = populate(state, current);
+        if (txnState.notBlocked())
+            return null;
+
+        //TODO (safety): check depth
+        List<AsyncChain<Void>> pending = new ArrayList<>();
+        for (TxnId dependency : txnState.blockedBy)
+        {
+            if (state.knows(dependency))
+                continue;
+            if (safeStore.getIfLoaded(dependency) == null)
+            {
+                // not loaded in this task: go fetch it
+                pending.add(populate(state, safeStore.commandStore(), dependency));
+                continue;
+            }
+            AsyncChain<Void> chain = populate(state, safeStore, dependency);
+            if (chain != null)
+                pending.add(chain);
+        }
+        for (PartitionKey key : txnState.blockedByKey)
+        {
+            if (state.keys.containsKey(key))
+                continue;
+            if (safeStore.getCommandsForKeyIfLoaded(key) == null)
+            {
+                // not loaded in this task: go fetch it
+                pending.add(populate(state, safeStore.commandStore(), key, txnId, current.executeAt()));
+                continue;
+            }
+            AsyncChain<Void> chain = populate(state, safeStore, key, txnId, current.executeAt());
+            if (chain != null)
+                pending.add(chain);
+        }
+
+        if (pending.isEmpty())
+            return null;
+        return AsyncChains.all(pending).map(ignore -> null);
+    }
+
+    /**
+     * Resolves which txn blocks {@code txnId} on key {@code pk}, records the key -> txn mapping in
+     * {@code state}, and then records that blocking txn unless the graph already knows it.
+     *
+     * @return a chain to wait on, or {@code null} when nothing further needs to be awaited
+     */
+    private static AsyncChain<Void> populate(CommandStoreTxnBlockedGraph.Builder state, AccordSafeCommandStore safeStore, PartitionKey pk, TxnId txnId, Timestamp executeAt)
+    {
+        AccordSafeCommandsForKey safeCfk = safeStore.getCommandsForKeyIfLoaded(pk);
+        TxnId blocker = safeCfk.current().blockedOnTxnId(txnId, executeAt);
+        // store a plain TxnId rather than the CommandsForKey-internal TxnInfo subtype
+        if (blocker instanceof CommandsForKey.TxnInfo)
+        {
+            CommandsForKey.TxnInfo info = (CommandsForKey.TxnInfo) blocker;
+            blocker = info.plainTxnId();
+        }
+        state.keys.put(pk, blocker);
+
+        if (state.txns.containsKey(blocker))
+            return null;
+        boolean loaded = safeStore.getIfLoaded(blocker) != null;
+        if (loaded)
+            return populate(state, safeStore, blocker);
+        return populate(state, safeStore.commandStore(), blocker);
+    }
+
+    /**
+     * Records {@code cmd} into {@code state}. For a command that is committed but has not been applied,
+     * its waiting-on set is split into the txns and the keys it is waiting on.
+     *
+     * @return the state registered in the graph for {@code cmd}
+     */
+    private static CommandStoreTxnBlockedGraph.TxnState populate(CommandStoreTxnBlockedGraph.Builder state, Command cmd)
+    {
+        CommandStoreTxnBlockedGraph.Builder.TxnBuilder txnBuilder = state.txn(cmd.txnId(), cmd.executeAt(), cmd.saveStatus());
+        if (!cmd.hasBeen(Status.Applied) && cmd.isCommitted())
+        {
+            // check blocking state
+            Command.WaitingOn waitingOn = cmd.asCommitted().waitingOn();
+            waitingOn.waitingOn.reverseForEach(null, null, null, null, (p1, p2, p3, p4, index) -> {
+                if (index >= waitingOn.txnIdCount())
+                {
+                    // indexes past the txn ids refer to keys: blocked on key
+                    int keyIndex = index - waitingOn.txnIdCount();
+                    txnBuilder.blockedByKey.add((PartitionKey) waitingOn.keys.get(keyIndex));
+                }
+                else
+                {
+                    // blocked on txn
+                    txnBuilder.blockedBy.add(waitingOn.txnId(index));
+                }
+            });
+        }
+        return txnBuilder.build();
+    }
+
public Node node()
{
return node;
@@ -921,7 +1071,7 @@ public class AccordService implements IAccordService,
Shutdownable
public CompactionInfo getCompactionInfo()
{
Int2ObjectHashMap<RedundantBefore> redundantBefores = new
Int2ObjectHashMap<>();
- Int2ObjectHashMap<CommandStores.RangesForEpoch>ranges = new
Int2ObjectHashMap<>();
+ Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges = new
Int2ObjectHashMap<>();
AtomicReference<DurableBefore> durableBefore = new
AtomicReference<>(DurableBefore.EMPTY);
AsyncChains.getBlockingAndRethrow(node.commandStores().forEach(safeStore -> {
synchronized (redundantBefores)
diff --git
a/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java
b/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java
new file mode 100644
index 0000000000..c7d0147add
--- /dev/null
+++
b/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import accord.local.SaveStatus;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+
+/**
+ * Immutable snapshot, for one command store, of the txns reachable from a txn through "blocked by"
+ * edges, together with the txn blocking each key. Instances are created through {@link Builder}.
+ */
+public class CommandStoreTxnBlockedGraph
+{
+    /** Id of the command store this graph was captured from. */
+    public final int storeId;
+    /** State of every txn captured in the graph, keyed by txn id. */
+    public final Map<TxnId, TxnState> txns;
+    /** For each key some txn is blocked on, the txn blocking it. */
+    public final Map<PartitionKey, TxnId> keys;
+
+    public CommandStoreTxnBlockedGraph(Builder builder)
+    {
+        this.storeId = builder.storeId;
+        this.txns = ImmutableMap.copyOf(builder.txns);
+        this.keys = ImmutableMap.copyOf(builder.keys);
+    }
+
+    /** Immutable view of a single txn: its status and the txns / keys it is waiting on. */
+    public static class TxnState
+    {
+        public final TxnId txnId;
+        public final Timestamp executeAt;
+        public final SaveStatus saveStatus;
+        public final List<TxnId> blockedBy;
+        public final Set<PartitionKey> blockedByKey;
+
+        public TxnState(Builder.TxnBuilder builder)
+        {
+            this.txnId = builder.txnId;
+            this.executeAt = builder.executeAt;
+            this.saveStatus = builder.saveStatus;
+            this.blockedBy = ImmutableList.copyOf(builder.blockedBy);
+            this.blockedByKey = ImmutableSet.copyOf(builder.blockedByKey);
+        }
+
+        /** @return true when this txn waits on at least one txn or key */
+        public boolean isBlocked()
+        {
+            return !notBlocked();
+        }
+
+        /** @return true when this txn waits on neither a txn nor a key */
+        public boolean notBlocked()
+        {
+            if (!blockedBy.isEmpty())
+                return false;
+            return blockedByKey.isEmpty();
+        }
+    }
+
+    /** Mutable accumulator for a graph; insertion order of txns and keys is retained while building. */
+    public static class Builder
+    {
+        final int storeId;
+        final Map<TxnId, TxnState> txns = new LinkedHashMap<>();
+        final Map<PartitionKey, TxnId> keys = new LinkedHashMap<>();
+
+        public Builder(int storeId)
+        {
+            this.storeId = storeId;
+        }
+
+        /** @return true when a state for {@code id} has already been registered */
+        boolean knows(TxnId id)
+        {
+            return txns.containsKey(id);
+        }
+
+        public CommandStoreTxnBlockedGraph build()
+        {
+            return new CommandStoreTxnBlockedGraph(this);
+        }
+
+        /** Starts a per-txn builder; the txn only becomes part of the graph once {@link TxnBuilder#build()} runs. */
+        public TxnBuilder txn(TxnId txnId, Timestamp executeAt, SaveStatus saveStatus)
+        {
+            return new TxnBuilder(txnId, executeAt, saveStatus);
+        }
+
+        /** Collects what one txn is blocked by; tied to the enclosing {@link Builder} so it can register itself. */
+        public class TxnBuilder
+        {
+            final TxnId txnId;
+            final Timestamp executeAt;
+            final SaveStatus saveStatus;
+            List<TxnId> blockedBy = new ArrayList<>();
+            Set<PartitionKey> blockedByKey = new LinkedHashSet<>();
+
+            public TxnBuilder(TxnId txnId, Timestamp executeAt, SaveStatus saveStatus)
+            {
+                this.txnId = txnId;
+                this.executeAt = executeAt;
+                this.saveStatus = saveStatus;
+            }
+
+            /** Freezes the collected state and registers it in the enclosing builder under this txn id. */
+            public TxnState build()
+            {
+                TxnState built = new TxnState(this);
+                Builder.this.txns.put(txnId, built);
+                return built;
+            }
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java
b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index c79bbbfcd3..9e49cf3c47 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.utils.concurrent.Future;
import static com.google.common.base.Preconditions.checkNotNull;
+
public interface IAccordService
{
Set<ConsistencyLevel> SUPPORTED_COMMIT_CONSISTENCY_LEVELS =
ImmutableSet.of(ConsistencyLevel.ANY, ConsistencyLevel.ONE,
ConsistencyLevel.LOCAL_ONE, ConsistencyLevel.QUORUM, ConsistencyLevel.SERIAL,
ConsistencyLevel.ALL);
@@ -143,4 +144,6 @@ public interface IAccordService
CompactionInfo getCompactionInfo();
default Id nodeId() { throw new UnsupportedOperationException(); }
+
+ List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId);
}
diff --git
a/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
b/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
index 4ebfc8fdb0..c9fc1bd14b 100644
---
a/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
+++
b/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
@@ -18,22 +18,15 @@
package org.apache.cassandra.service.accord.exceptions;
-import java.util.Map;
+import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.exceptions.ReadFailureException;
-import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.locator.InetAddressAndPort;
public class ReadExhaustedException extends ReadFailureException
{
- public ReadExhaustedException(ConsistencyLevel consistency, int received,
int blockFor, boolean dataPresent, Map<InetAddressAndPort,
RequestFailureReason> failureReasonByEndpoint)
+ public ReadExhaustedException(ConsistencyLevel consistency, int received,
int blockFor, boolean dataPresent, String msg)
{
- super(consistency, received, blockFor, dataPresent,
failureReasonByEndpoint);
- }
-
- protected ReadExhaustedException(String msg, ConsistencyLevel consistency,
int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort,
RequestFailureReason> failureReasonByEndpoint)
- {
- super(msg, consistency, received, blockFor, dataPresent,
failureReasonByEndpoint);
+ super(msg, consistency, received, blockFor, dataPresent,
ImmutableMap.of());
}
}
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index dfb65e07f2..4c93a78e4d 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -69,6 +69,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.apache.commons.lang3.ArrayUtils;
@@ -499,6 +500,11 @@ public abstract class CQLTester
VirtualKeyspaceRegistry.instance.register(new
VirtualKeyspace(VIRTUAL_METRICS, createMetricsKeyspaceTables()));
}
+ /** Registers {@code SystemViewsKeyspace.instance} with the virtual keyspace registry. */
+ protected static void addVirtualKeyspace()
+ {
+     VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
+ }
+
protected void resetSchema() throws Throwable
{
for (TableMetadata table : SchemaKeyspace.metadata().tables)
@@ -1943,6 +1949,133 @@ public abstract class CQLTester
Assert.assertEquals(String.format("expected %d rows but received %d",
expectedCount, actualRowCount), expectedCount, actualRowCount);
}
+ /**
+  * Matcher that can stand in for an expected cell value in {@code assertRows}: instead of
+  * comparing against one serialized value, the row check delegates to {@link #equals(ByteBuffer)}.
+  */
+ public abstract static class CellValidator
+ {
+     /** @return a serialized value representing this validator */
+     public abstract ByteBuffer expected();
+
+     /** @return whether the given serialized cell is accepted by this validator */
+     public abstract boolean equals(ByteBuffer bb);
+
+     /** @return text describing what this validator accepts, used in failure messages */
+     public abstract String describe();
+
+     /** Delegates to {@link #equals(ByteBuffer)} for byte buffers; anything else is not equal. */
+     @Override
+     public boolean equals(Object obj)
+     {
+         return obj instanceof ByteBuffer && equals((ByteBuffer) obj);
+     }
+ }
+
+ /** @return a validator that accepts every cell value, {@code null} included */
+ protected static CellValidator any()
+ {
+     return new CellValidator()
+     {
+         @Override
+         public String describe()
+         {
+             return "any";
+         }
+
+         @Override
+         public boolean equals(ByteBuffer bb)
+         {
+             // no constraint at all on the cell
+             return true;
+         }
+
+         @Override
+         public ByteBuffer expected()
+         {
+             return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+         }
+     };
+ }
+
+ /** @return a validator that accepts any cell that is non-null and has at least one remaining byte */
+ protected static CellValidator anyNonNull()
+ {
+     return new CellValidator()
+     {
+         @Override
+         public String describe()
+         {
+             return "any non-null";
+         }
+
+         @Override
+         public boolean equals(ByteBuffer bb)
+         {
+             return bb != null && bb.hasRemaining();
+         }
+
+         @Override
+         public ByteBuffer expected()
+         {
+             return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+         }
+     };
+ }
+
+ /**
+  * @return a validator that accepts a non-null, non-empty cell after running it through
+  *         {@code Int32Type.instance.validate}
+  */
+ protected static CellValidator anyInt()
+ {
+     return new CellValidator()
+     {
+         @Override
+         public String describe()
+         {
+             return "any non-null int";
+         }
+
+         @Override
+         public boolean equals(ByteBuffer bb)
+         {
+             if (bb != null)
+             {
+                 // validation runs before the emptiness check, exactly as a null-guarded sequence
+                 Int32Type.instance.validate(bb);
+                 return bb.hasRemaining();
+             }
+             return false;
+         }
+
+         @Override
+         public ByteBuffer expected()
+         {
+             return ByteBufferUtil.bytes(0);
+         }
+     };
+ }
+
+ /** String convenience overload: the values are serialized with {@code UTF8Type.instance}. */
+ protected static CellValidator anyOf(String... values)
+ {
+     String[] accepted = values;
+     return anyOf(UTF8Type.instance, accepted);
+ }
+
+ /**
+  * Builds a validator that accepts a cell equal to the serialized form of any of the given values.
+  *
+  * @param type   type used to serialize each value and to format it for messages
+  * @param values accepted values; at least one is required (checked with an {@code assert})
+  * @return validator whose {@code expected()} is the first serialized value and whose
+  *         {@code describe()} names every accepted value
+  */
+ @SafeVarargs // the varargs array is only read, never stored or written to
+ protected static <T> CellValidator anyOf(AbstractType<T> type, T... values)
+ {
+     assert values.length > 0;
+     ByteBuffer[] bbs = new ByteBuffer[values.length];
+     for (int i = 0; i < values.length; i++)
+         bbs[i] = type.decompose(values[i]);
+     return new CellValidator()
+     {
+         @Override
+         public ByteBuffer expected()
+         {
+             return bbs[0];
+         }
+
+         @Override
+         public boolean equals(ByteBuffer bb)
+         {
+             for (ByteBuffer candidate : bbs)
+             {
+                 if (Objects.equal(candidate, bb)) return true;
+             }
+             return false;
+         }
+
+         @Override
+         public String describe()
+         {
+             // a single alternative keeps the plain value format
+             if (bbs.length == 1)
+                 return formatValue(bbs[0], type);
+             // otherwise list every alternative so a failure message does not suggest
+             // that only the first value would have been accepted
+             StringBuilder sb = new StringBuilder("any of [");
+             for (int i = 0; i < bbs.length; i++)
+             {
+                 if (i > 0)
+                     sb.append(", ");
+                 sb.append(formatValue(bbs[i], type));
+             }
+             return sb.append(']').toString();
+         }
+     };
+ }
+
public static void assertRows(UntypedResultSet result, Object[]... rows)
{
if (result == null)
@@ -1966,24 +2099,22 @@ public abstract class CQLTester
for (int j = 0; j < meta.size(); j++)
{
ColumnSpecification column = meta.get(j);
- ByteBuffer expectedByteValue = makeByteBuffer(expected == null
? null : expected[j], column.type);
+ CellValidator cellValidator = makeCellValidator(expected ==
null ? null : expected[j], column.type);
ByteBuffer actualValue =
actual.getBytes(column.name.toString());
- if (expectedByteValue != null)
- expectedByteValue = expectedByteValue.duplicate();
- if (!Objects.equal(expectedByteValue, actualValue))
+ if (!((cellValidator == null && actualValue == null) ||
(cellValidator != null && cellValidator.equals(actualValue))))
{
Object actualValueDecoded = actualValue == null ? null :
column.type.getSerializer().deserialize(actualValue);
if (!Objects.equal(expected != null ? expected[j] : null,
actualValueDecoded))
{
- if (isEmptyContainerNull(column.type,
expectedByteValue, actualValue))
+ if (isEmptyContainerNull(column.type, cellValidator !=
null ? cellValidator.expected() : null, actualValue))
continue;
error.append(String.format("Invalid value for row %d
column %d (%s of type %s), expected <%s> but got <%s>",
i,
j,
column.name,
column.type.asCQL3Type(),
-
formatValue(expectedByteValue != null ? expectedByteValue.duplicate() : null,
column.type),
+ cellValidator != null ?
cellValidator.describe() : "null",
formatValue(actualValue,
column.type))).append("\n");
}
}
@@ -2007,14 +2138,30 @@ public abstract class CQLTester
ByteBuffer actualValue =
actual.getBytes(column.name.toString());
str.append(String.format("%s=%s ", column.name,
formatValue(actualValue, column.type)));
}
- logger.info("Extra row num {}: {}", i, str.toString());
+ logger.info("Extra row num {}: {}", i, str);
}
- Assert.fail(String.format("Got more rows than expected. Expected
%d but got %d.", rows.length, i));
+ Assert.fail(String.format("Got more rows than expected. Expected
%d but got %d.\nExpected: %s\nActual: %s", rows.length, i, toString(rows),
result.toStringUnsafe()));
}
Assert.assertTrue(String.format("Got %s rows than expected. Expected
%d but got %d", rows.length>i ? "less" : "more", rows.length, i), i ==
rows.length);
}
+ /**
+  * Renders an expected value for failure messages: validators via {@code describe()},
+  * arrays element by element, everything else (including {@code null}) via its string form.
+  */
+ private static String toString(Object o)
+ {
+     if (o instanceof CellValidator)
+         return ((CellValidator) o).describe();
+     if (o instanceof Object[])
+         return toString((Object[]) o);
+     // String.valueOf yields "null" for a null reference and o.toString() otherwise
+     return String.valueOf(o);
+ }
+
+ /** Renders the array as {@code [a, b, ...]}, formatting each element with {@code toString(Object)}. */
+ private static String toString(Object[] array)
+ {
+     return Stream.of(array)
+                  .map(element -> toString(element))
+                  .collect(Collectors.joining(", ", "[", "]"));
+ }
+
/**
* Like assertRows(), but ignores the ordering of rows.
*/
@@ -2615,11 +2762,42 @@ public abstract class CQLTester
return ((TupleValue)value).toByteBuffer();
if (value instanceof ByteBuffer)
- return (ByteBuffer)value;
+ return ((ByteBuffer)value);
return type.decomposeUntyped(serializeTuples(value));
}
+ /**
+  * Turns an expected cell value into a {@link CellValidator}.
+  *
+  * @param value expected value; a {@code CellValidator} is returned as is, {@code null} yields {@code null}
+  * @param type  column type used to serialize and to format the value
+  * @return a validator requiring exact equality with the serialized value, or {@code null}
+  */
+ public static CellValidator makeCellValidator(Object value, AbstractType<?> type)
+ {
+     if (value instanceof CellValidator)
+         return (CellValidator) value;
+     if (value == null)
+         return null;
+
+     ByteBuffer serialized = makeByteBuffer(value, type);
+     return new CellValidator()
+     {
+         @Override
+         public String describe()
+         {
+             return formatValue(serialized, type);
+         }
+
+         @Override
+         public boolean equals(ByteBuffer bb)
+         {
+             return bb != null && serialized.equals(bb);
+         }
+
+         @Override
+         public ByteBuffer expected()
+         {
+             return serialized;
+         }
+     };
+ }
+
private static String formatValue(ByteBuffer bb, AbstractType<?> type)
{
if (bb == null)
diff --git
a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
new file mode 100644
index 0000000000..59eae7ab0c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.local.SaveStatus;
+import accord.messages.TxnRequest;
+import accord.primitives.Routable;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.utils.async.AsyncChains;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.OptionaldPositiveInt;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.concurrent.Condition;
+import org.awaitility.Awaitility;
+
+import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
+
+public class AccordVirtualTablesTest extends CQLTester
+{
+ private static final Logger logger =
LoggerFactory.getLogger(AccordVirtualTablesTest.class);
+
+ private static final String QUERY_TXN_BLOCKED_BY = "SELECT * FROM
system_views.txn_blocked_by WHERE txn_id=?";
+ private static final String QUERY_TXN_STATUS = "SELECT save_status FROM
system_views.txn_blocked_by WHERE txn_id=? LIMIT 1";
+
+ /**
+  * One-time setup: initializes the daemon, sets the Accord shard count to 1, runs the
+  * {@code CQLTester} class setup, starts Accord for the local node id, registers the
+  * virtual keyspace and enables networking.
+  */
+ @BeforeClass
+ public static void setUpClass()
+ {
+     daemonInitialization();
+     // shard count is set before the CQLTester setup and the Accord startup below
+     DatabaseDescriptor.getAccord().shard_count = new OptionaldPositiveInt(1);
+
+     CQLTester.setUpClass();
+
+     AccordService.startup(ClusterMetadata.current().myNodeId());
+     addVirtualKeyspace();
+     requireNetwork();
+ }
+
+ @Test
+ public void unknownIsEmpty()
+ {
+ createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c))
WITH transactional_mode = 'full'");
+ assertRows(execute(QUERY_TXN_BLOCKED_BY, TxnId.NONE.toString()));
+ }
+
+ /**
+  * Coordinates a single write, blocks until coordination returns, and expects exactly one
+  * row for the transaction whose status is {@code Applying} or {@code Applied}.
+  */
+ @Test
+ public void completedTxn() throws ExecutionException, InterruptedException
+ {
+     String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+     AccordService service = accord();
+     TxnId id = service.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
+     String insert = String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName);
+     Txn txn = createTxn(wrapInTxn(insert), 0, 0, 0);
+     AsyncChains.getBlocking(service.node().coordinate(id, txn));
+
+     UntypedResultSet rs = execute(QUERY_TXN_BLOCKED_BY, id.toString());
+     assertRows(rs,
+                row(id.toString(), anyInt(), 0, "", "Self", any(), null,
+                    anyOf(SaveStatus.Applying.name(), SaveStatus.Applied.name())));
+ }
+
+ /**
+  * Starts a write while an {@link AccordMsgFilter} is installed on the outbound sink, then checks
+  * the table once the filter has seen a pre-accept response and again once it has seen an apply
+  * request. The filter is always removed afterwards.
+  */
+ @Test
+ public void inflight() throws ExecutionException, InterruptedException
+ {
+     AccordMsgFilter filter = new AccordMsgFilter();
+     MessagingService.instance().outboundSink.add(filter);
+     try
+     {
+         String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+         AccordService service = accord();
+         TxnId id = service.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
+         String insert = String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName);
+         Txn txn = createTxn(wrapInTxn(insert), 0, 0, 0);
+         // fire and forget: the result is deliberately not awaited
+         service.node().coordinate(id, txn);
+
+         filter.preAccept.awaitThrowUncheckedOnInterrupt();
+
+         UntypedResultSet afterPreAccept = execute(QUERY_TXN_BLOCKED_BY, id.toString());
+         assertRows(afterPreAccept,
+                    row(id.toString(), anyInt(), 0, "", "Self", any(), null,
+                        anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+
+         filter.apply.awaitThrowUncheckedOnInterrupt();
+
+         UntypedResultSet afterApply = execute(QUERY_TXN_BLOCKED_BY, id.toString());
+         assertRows(afterApply,
+                    row(id.toString(), anyInt(), 0, "", "Self", any(), null,
+                        SaveStatus.ReadyToExecute.name()));
+     }
+     finally
+     {
+         MessagingService.instance().outboundSink.remove(filter);
+     }
+ }
+
+ /**
+  * Starts a first write and checks its rows as in {@code inflight}, then starts a second write
+  * with the same bind values. Once the filter has seen a commit request for it, the test polls
+  * until the table returns two rows for the second transaction and then checks both rows: its
+  * own ({@code Stable}) and one referencing the first transaction ({@code ReadyToExecute}).
+  */
+ @Test
+ public void blocked() throws ExecutionException, InterruptedException
+ {
+     AccordMsgFilter filter = new AccordMsgFilter();
+     MessagingService.instance().outboundSink.add(filter);
+     try
+     {
+         String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+         AccordService service = accord();
+
+         TxnId first = service.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
+         Txn firstTxn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
+         service.node().coordinate(first, firstTxn);
+
+         filter.preAccept.awaitThrowUncheckedOnInterrupt();
+         UntypedResultSet firstAfterPreAccept = execute(QUERY_TXN_BLOCKED_BY, first.toString());
+         assertRows(firstAfterPreAccept,
+                    row(first.toString(), anyInt(), 0, "", "Self", any(), null,
+                        anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+
+         filter.apply.awaitThrowUncheckedOnInterrupt();
+         UntypedResultSet firstAfterApply = execute(QUERY_TXN_BLOCKED_BY, first.toString());
+         assertRows(firstAfterApply,
+                    row(first.toString(), anyInt(), 0, "", "Self", anyNonNull(), null,
+                        SaveStatus.ReadyToExecute.name()));
+
+         // fresh conditions so the waits below observe messages of the second transaction
+         filter.reset();
+
+         TxnId second = service.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
+         Txn secondTxn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
+         service.node().coordinate(second, secondTxn);
+
+         filter.commit.awaitThrowUncheckedOnInterrupt();
+
+         Awaitility.await("waiting on key").atMost(1, TimeUnit.MINUTES)
+                   .until(() -> execute(QUERY_TXN_BLOCKED_BY, second.toString()).size() == 2);
+
+         UntypedResultSet secondRows = execute(QUERY_TXN_BLOCKED_BY, second.toString());
+         assertRows(secondRows,
+                    row(second.toString(), anyInt(), 0, "", "Self", anyNonNull(), null,
+                        SaveStatus.Stable.name()),
+                    row(second.toString(), anyInt(), 1, first.toString(), "Key", anyNonNull(), anyNonNull(),
+                        SaveStatus.ReadyToExecute.name()));
+     }
+     finally
+     {
+         MessagingService.instance().outboundSink.remove(filter);
+     }
+ }
+
+ /** @return {@code AccordService.instance()} cast to the concrete {@link AccordService} type */
+ private static AccordService accord()
+ {
+     return (AccordService) AccordService.instance();
+ }
+
+ private static class AccordMsgFilter implements BiPredicate<Message<?>,
InetAddressAndPort>
+ {
+ volatile Condition preAccept = Condition.newOneTimeCondition();
+ volatile Condition commit = Condition.newOneTimeCondition();
+ volatile Condition apply = Condition.newOneTimeCondition();
+
+ void reset()
+ {
+ preAccept = Condition.newOneTimeCondition();
+ commit = Condition.newOneTimeCondition();
+ apply = Condition.newOneTimeCondition();
+ }
+
+ ConcurrentMap<TxnId, ConcurrentSkipListSet<Verb>> txnToVerbs = new
ConcurrentHashMap<>();
+
+ @Override
+ public boolean test(Message<?> msg, InetAddressAndPort to)
+ {
+ if (!msg.verb().name().startsWith("ACCORD_"))
+ return true;
+ TxnId txnId = null;
+ if (msg.payload instanceof TxnRequest)
+ {
+ txnId = ((TxnRequest) msg.payload).txnId;
+ }
+ Set<Verb> seen = null;
+ if (txnId != null)
+ {
+ seen = txnToVerbs.computeIfAbsent(txnId, ignore -> new
ConcurrentSkipListSet<>());
+ seen.add(msg.verb());
+ }
+ switch (msg.verb())
+ {
+ case ACCORD_APPLY_REQ:
+ case ACCORD_APPLY_AND_WAIT_REQ:
+ apply.signalAll();
+ case ACCORD_BEGIN_RECOVER_REQ:
+ return false;
+ case ACCORD_PRE_ACCEPT_RSP:
+ preAccept.signalAll();
+ return true;
+ case ACCORD_COMMIT_REQ:
+ commit.signalAll();
+ return true;
+ case ACCORD_PRE_ACCEPT_REQ:
+ case ACCORD_CHECK_STATUS_REQ:
+ case ACCORD_CHECK_STATUS_RSP:
+ case ACCORD_READ_RSP:
+ return true;
+ default:
+ // many code paths don't log the error...
+ UnsupportedOperationException e = new
UnsupportedOperationException(msg.verb().name());
+ logger.error("Unexpected verb {}", msg.verb(), e);
+ throw e;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java
index 3abdcf8080..f6918422e9 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java
@@ -26,6 +26,7 @@ import java.util.function.Supplier;
import org.junit.Test;
+import accord.api.BarrierType;
import accord.coordinate.Exhausted;
import accord.coordinate.Preempted;
import accord.coordinate.Timeout;
@@ -64,19 +65,19 @@ public class AccordServiceTest
throw new Timeout(null, null);
case 1:
attempts++;
- throw AccordService.newBarrierTimeout(TxnId.NONE,
true);
+ throw AccordService.newBarrierTimeout(TxnId.NONE,
BarrierType.local, true, Ranges.EMPTY);
case 2:
attempts++;
throw new Preempted(null, null);
case 3:
attempts++;
- throw AccordService.newBarrierPreempted(TxnId.NONE,
true);
+ throw AccordService.newBarrierPreempted(TxnId.NONE,
BarrierType.local, true, Ranges.EMPTY);
case 4:
attempts++;
throw new Exhausted(null, null);
case 5:
attempts++;
- throw AccordService.newBarrierExhausted(TxnId.NONE,
true);
+ throw AccordService.newBarrierExhausted(TxnId.NONE,
BarrierType.local, true, Ranges.EMPTY);
default:
return Ranges.of(IntKey.range(1, 2));
}
@@ -98,9 +99,9 @@ public class AccordServiceTest
qt().check(rs -> {
List<Runnable> timeoutFailures = new ArrayList<>(4);
timeoutFailures.add(() -> {throw new Timeout(null, null);});
- timeoutFailures.add(() -> {throw
AccordService.newBarrierTimeout(TxnId.NONE, true);});
+ timeoutFailures.add(() -> {throw
AccordService.newBarrierTimeout(TxnId.NONE, BarrierType.local, true,
Ranges.EMPTY);});
timeoutFailures.add(() -> {throw new Preempted(null, null);});
- timeoutFailures.add(() -> {throw
AccordService.newBarrierPreempted(TxnId.NONE, true);});
+ timeoutFailures.add(() -> {throw
AccordService.newBarrierPreempted(TxnId.NONE, BarrierType.local, true,
Ranges.EMPTY);});
Collections.shuffle(timeoutFailures, rs.asJdkRandom());
Iterator<Runnable> it = timeoutFailures.iterator();
Supplier<Seekables> failing = () -> {
@@ -120,9 +121,9 @@ public class AccordServiceTest
qt().check(rs -> {
List<Runnable> timeoutFailures = new ArrayList<>(5);
timeoutFailures.add(() -> {throw new Timeout(null, null);});
- timeoutFailures.add(() -> {throw
AccordService.newBarrierTimeout(TxnId.NONE, true);});
+ timeoutFailures.add(() -> {throw
AccordService.newBarrierTimeout(TxnId.NONE, BarrierType.local, true,
Ranges.EMPTY);});
timeoutFailures.add(() -> {throw new Preempted(null, null);});
- timeoutFailures.add(() -> {throw
AccordService.newBarrierPreempted(TxnId.NONE, true);});
+ timeoutFailures.add(() -> {throw
AccordService.newBarrierPreempted(TxnId.NONE, BarrierType.local, true,
Ranges.EMPTY);});
timeoutFailures.add(() -> {throw new Exhausted(null, null);});
Collections.shuffle(timeoutFailures, rs.asJdkRandom());
Iterator<Runnable> it = timeoutFailures.iterator();
@@ -158,9 +159,9 @@ public class AccordServiceTest
qt().check(rs -> {
List<Runnable> failures = new ArrayList<>(6);
failures.add(() -> {throw new Timeout(null, null);});
- failures.add(() -> {throw
AccordService.newBarrierTimeout(TxnId.NONE, true);});
+ failures.add(() -> {throw
AccordService.newBarrierTimeout(TxnId.NONE, BarrierType.local, true,
Ranges.EMPTY);});
failures.add(() -> {throw new Preempted(null, null);});
- failures.add(() -> {throw
AccordService.newBarrierPreempted(TxnId.NONE, true);});
+ failures.add(() -> {throw
AccordService.newBarrierPreempted(TxnId.NONE, BarrierType.local, true,
Ranges.EMPTY);});
failures.add(() -> {throw new Exhausted(null, null);});
boolean isError = rs.nextBoolean();
failures.add(new Unexpected(isError));
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index bfbf4e5095..54a311a67b 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -287,6 +287,11 @@ public class AccordTestUtils
return createTxn(query, QueryOptions.DEFAULT);
}
+ /** Varargs convenience overload: wraps the bind values in a list and delegates to the list overload. */
+ public static Txn createTxn(String query, Object... binds)
+ {
+     List<Object> bindList = Arrays.asList(binds);
+     return createTxn(query, bindList);
+ }
+
public static Txn createTxn(String query, List<Object> binds)
{
TransactionStatement statement = parse(query);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]