This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new b8057f0921 Add a table to inspect the current state of a txn
b8057f0921 is described below

commit b8057f092130e866ca4f34fe9770c003581a36d9
Author: David Capwell <[email protected]>
AuthorDate: Fri Aug 16 09:53:00 2024 -0700

    Add a table to inspect the current state of a txn
    
    patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-19838
---
 modules/accord                                     |   2 +-
 .../cassandra/db/virtual/AccordVirtualTables.java  | 118 ++++++++++-
 .../cassandra/db/virtual/VirtualKeyspace.java      |  12 +-
 .../apache/cassandra/db/virtual/VirtualTable.java  |   8 +
 .../cassandra/service/accord/AccordService.java    | 216 +++++++++++++++++---
 .../accord/CommandStoreTxnBlockedGraph.java        | 127 ++++++++++++
 .../cassandra/service/accord/IAccordService.java   |   3 +
 .../accord/exceptions/ReadExhaustedException.java  |  13 +-
 test/unit/org/apache/cassandra/cql3/CQLTester.java | 196 +++++++++++++++++-
 .../db/virtual/AccordVirtualTablesTest.java        | 227 +++++++++++++++++++++
 .../service/accord/AccordServiceTest.java          |  19 +-
 .../cassandra/service/accord/AccordTestUtils.java  |   5 +
 12 files changed, 882 insertions(+), 64 deletions(-)

diff --git a/modules/accord b/modules/accord
index 81c02769f9..e2ccee4f51 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 81c02769f9ad73ef3aba0675c2217fc74b8a4a4c
+Subproject commit e2ccee4f51fe4c7c7f3ea8911897135ed7e37114
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
index 1b2e041c16..ac89e45133 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
@@ -18,29 +18,39 @@
 
 package org.apache.cassandra.db.virtual;
 
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.local.CommandStores;
+import accord.local.Status;
 import accord.primitives.TxnId;
+import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.FieldIdentifier;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -51,12 +61,15 @@ import org.apache.cassandra.service.accord.AccordCommandStore;
 import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.AccordStateCache;
+import org.apache.cassandra.service.accord.CommandStoreTxnBlockedGraph;
+import org.apache.cassandra.service.accord.api.PartitionKey;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import org.apache.cassandra.service.consensus.migration.TableMigrationState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.utils.Clock;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public class AccordVirtualTables
 {
@@ -70,7 +83,8 @@ public class AccordVirtualTables
         return List.of(
             new CommandStoreCache(keyspace),
             new MigrationState(keyspace),
-            new CoordinationStatus(keyspace)
+            new CoordinationStatus(keyspace),
+            new TxnBlockedByTable(keyspace)
         );
     }
 
@@ -246,6 +260,108 @@ public class AccordVirtualTables
         }
     }
 
+    public static class TxnBlockedByTable extends AbstractVirtualTable
+    {
+        enum Reason { Self, Txn, Key }
+        private final UserType partitionKeyType;
+
+        protected TxnBlockedByTable(String keyspace)
+        {
+            super(TableMetadata.builder(keyspace, "txn_blocked_by")
+                               .kind(TableMetadata.Kind.VIRTUAL)
+                               .addPartitionKeyColumn("txn_id", 
UTF8Type.instance)
+                               .addClusteringColumn("store_id", 
Int32Type.instance)
+                               .addClusteringColumn("depth", 
Int32Type.instance)
+                               .addClusteringColumn("blocked_by", 
UTF8Type.instance)
+                               .addClusteringColumn("reason", 
UTF8Type.instance)
+                               .addRegularColumn("save_status", 
UTF8Type.instance)
+                               .addRegularColumn("execute_at", 
UTF8Type.instance)
+                               .addRegularColumn("key", pkType(keyspace))
+                               .build());
+            partitionKeyType = pkType(keyspace);
+        }
+
+        private static UserType pkType(String keyspace)
+        {
+            return new UserType(keyspace, bytes("partition_key"),
+                                
Arrays.asList(FieldIdentifier.forQuoted("table"), 
FieldIdentifier.forQuoted("token")),
+                                Arrays.asList(UTF8Type.instance, 
UTF8Type.instance), false);
+        }
+
+        private ByteBuffer pk(PartitionKey pk)
+        {
+            var tm = Schema.instance.getTableMetadata(pk.table());
+            return 
partitionKeyType.pack(UTF8Type.instance.decompose(tm.toString()),
+                                         
UTF8Type.instance.decompose(pk.token().toString()));
+        }
+
+        @Override
+        public Iterable<UserType> userTypes()
+        {
+            return Arrays.asList(partitionKeyType);
+        }
+
+        @Override
+        public DataSet data(DecoratedKey partitionKey)
+        {
+            TxnId id = 
TxnId.parse(UTF8Type.instance.compose(partitionKey.getKey()));
+            List<CommandStoreTxnBlockedGraph> shards = 
AccordService.instance().debugTxnBlockedGraph(id);
+
+            SimpleDataSet ds = new SimpleDataSet(metadata());
+            for (CommandStoreTxnBlockedGraph shard : shards)
+            {
+                Set<TxnId> processed = new HashSet<>();
+                process(ds, shard, processed, id, 0, id, Reason.Self, null);
+                // everything was processed right?
+                if (!shard.txns.isEmpty() && 
!shard.txns.keySet().containsAll(processed))
+                    throw new IllegalStateException("Skipped txns: " + 
Sets.difference(shard.txns.keySet(), processed));
+            }
+
+            return ds;
+        }
+
+        private void process(SimpleDataSet ds, CommandStoreTxnBlockedGraph 
shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason 
reason, Runnable onDone)
+        {
+            if (!processed.add(txnId))
+                throw new IllegalStateException("Double processed " + txnId);
+            CommandStoreTxnBlockedGraph.TxnState txn = shard.txns.get(txnId);
+            if (txn == null)
+            {
+                Invariants.checkState(reason == Reason.Self, "Txn %s unknown 
for reason %s", txnId, reason);
+                return;
+            }
+            // was it applied?  If so ignore it
+            if (reason != Reason.Self && 
txn.saveStatus.hasBeen(Status.Applied))
+                return;
+            ds.row(userTxn.toString(), shard.storeId, depth, reason == 
Reason.Self ? "" : txn.txnId.toString(), reason.name());
+            ds.column("save_status", txn.saveStatus.name());
+            if (txn.executeAt != null)
+                ds.column("execute_at", txn.executeAt.toString());
+            if (onDone != null)
+                onDone.run();
+            if (txn.isBlocked())
+            {
+                for (TxnId blockedBy : txn.blockedBy)
+                {
+                    if (processed.contains(blockedBy)) continue; // already 
listed
+                    process(ds, shard, processed, userTxn, depth + 1, 
blockedBy, Reason.Txn, null);
+                }
+                for (PartitionKey blockedBy : txn.blockedByKey)
+                {
+                    TxnId blocking = shard.keys.get(blockedBy);
+                    if (processed.contains(blocking)) continue; // already 
listed
+                    process(ds, shard, processed, userTxn, depth + 1, 
blocking, Reason.Key, () -> ds.column("key", pk(blockedBy)));
+                }
+            }
+        }
+
+        @Override
+        public DataSet data()
+        {
+            throw new InvalidRequestException("Must select a single txn_id");
+        }
+    }
+
     private static TableMetadata parse(String keyspace, String comment, String 
query)
     {
         return CreateTableStatement.parse(query, keyspace)
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
index 044c11476b..3316aa4fb3 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
@@ -50,7 +50,17 @@ public class VirtualKeyspace
         if (!duplicates.isEmpty())
             throw new IllegalArgumentException(String.format("Duplicate table 
names in virtual keyspace %s: %s", name, duplicates));
 
-        metadata = KeyspaceMetadata.virtual(name, 
Tables.of(Iterables.transform(tables, VirtualTable::metadata)));
+        KeyspaceMetadata metadata = KeyspaceMetadata.virtual(name, 
Tables.of(Iterables.transform(tables, VirtualTable::metadata)));
+        for (var t : tables)
+        {
+            for (var udt : t.userTypes())
+            {
+                if (metadata.types.getNullable(udt.name) != null)
+                    throw new IllegalStateException("UDT " + 
udt.getNameAsString() + " already exists");
+                metadata = metadata.withUpdatedUserType(udt);
+            }
+        }
+        this.metadata = metadata;
     }
 
     public String name()
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
index 53a9f2ac7f..93047faad8 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
@@ -17,10 +17,13 @@
  */
 package org.apache.cassandra.db.virtual;
 
+import java.util.Collections;
+
 import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.schema.TableMetadata;
@@ -87,4 +90,9 @@ public interface VirtualTable
     {
         return true;
     }
+
+    default Iterable<UserType> userTypes()
+    {
+        return Collections.emptyList();
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index dab05b621e..6a46c24371 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -18,7 +18,9 @@
 
 package org.apache.cassandra.service.accord;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -27,6 +29,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
@@ -37,7 +40,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,13 +59,20 @@ import accord.impl.AbstractConfigurationService;
 import accord.impl.CoordinateDurabilityScheduling;
 import accord.impl.SimpleProgressLog;
 import accord.impl.SizeOfIntersectionSorter;
+import accord.local.Command;
+import accord.local.CommandStore;
 import accord.local.CommandStores;
 import accord.local.DurableBefore;
+import accord.local.KeyHistory;
 import accord.local.Node;
 import accord.local.Node.Id;
 import accord.local.NodeTimeService;
+import accord.local.PreLoadContext;
 import accord.local.RedundantBefore;
+import accord.local.SaveStatus;
 import accord.local.ShardDistributor.EvenSplit;
+import accord.local.Status;
+import accord.local.cfk.CommandsForKey;
 import accord.messages.LocalRequest;
 import accord.messages.Request;
 import accord.primitives.Keys;
@@ -147,10 +156,8 @@ public class AccordService implements IAccordService, Shutdownable
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AccordService.class);
 
-    private enum State { INIT, STARTED, SHUTDOWN}
+    private enum State {INIT, STARTED, SHUTDOWN}
 
-    public static final AccordClientRequestMetrics readMetrics = new 
AccordClientRequestMetrics("AccordRead");
-    public static final AccordClientRequestMetrics writeMetrics = new 
AccordClientRequestMetrics("AccordWrite");
     private static final Future<Void> BOOTSTRAP_SUCCESS = 
ImmediateFuture.success(null);
 
     private final Node node;
@@ -262,6 +269,12 @@ public class AccordService implements IAccordService, Shutdownable
         {
             return new CompactionInfo(new Int2ObjectHashMap<>(), new 
Int2ObjectHashMap<>(), DurableBefore.EMPTY);
         }
+
+        @Override
+        public List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId 
txnId)
+        {
+            return Collections.emptyList();
+        }
     };
 
     private static volatile IAccordService instance = null;
@@ -379,7 +392,7 @@ public class AccordService implements IAccordService, Shutdownable
             return;
         journal.start(node);
         configService.start();
-        ClusterMetadataService.instance().log().addListener(configService);
+
         fastPathCoordinator.start();
         
ClusterMetadataService.instance().log().addListener(fastPathCoordinator);
         
durabilityScheduling.setGlobalCycleTime(Ints.checkedCast(DatabaseDescriptor.getAccordGlobalDurabilityCycle(SECONDS)),
 SECONDS);
@@ -399,7 +412,7 @@ public class AccordService implements IAccordService, Shutdownable
     private <S extends Seekables<?, ?>> Seekables barrier(@Nonnull S 
keysOrRanges, long epoch, long queryStartNanos, long timeoutNanos, BarrierType 
barrierType, boolean isForWrite, BiFunction<Node, S, AsyncResult<SyncPoint<S>>> 
syncPoint)
     {
         Stopwatch sw = Stopwatch.createStarted();
-        keysOrRanges = (S)intersectionWithAccordManagedRanges(keysOrRanges);
+        keysOrRanges = (S) intersectionWithAccordManagedRanges(keysOrRanges);
         // It's possible none of them were Accord managed and we aren't going 
to treat that as an error
         if (keysOrRanges.isEmpty())
         {
@@ -408,14 +421,12 @@ public class AccordService implements IAccordService, Shutdownable
         }
 
         AccordClientRequestMetrics metrics = isForWrite ? accordWriteMetrics : 
accordReadMetrics;
-        TxnId txnId = null;
         try
         {
             logger.debug("Starting barrier key: {} epoch: {} barrierType: {} 
isForWrite {}", keysOrRanges, epoch, barrierType, isForWrite);
-            txnId = node.nextTxnId(Kind.SyncPoint, keysOrRanges.domain());
             AsyncResult<TxnId> asyncResult = syncPoint == null
-                                                 ? Barrier.barrier(node, 
keysOrRanges, epoch, barrierType)
-                                                 : Barrier.barrier(node, 
keysOrRanges, epoch, barrierType, syncPoint);
+                                             ? Barrier.barrier(node, 
keysOrRanges, epoch, barrierType)
+                                             : Barrier.barrier(node, 
keysOrRanges, epoch, barrierType, syncPoint);
             long deadlineNanos = queryStartNanos + timeoutNanos;
             Timestamp barrierExecuteAt = AsyncChains.getBlocking(asyncResult, 
deadlineNanos - nanoTime(), NANOSECONDS);
             logger.debug("Completed barrier attempt in {}ms, {}ms since 
attempts start, barrier key: {} epoch: {} barrierType: {} isForWrite {}",
@@ -429,21 +440,24 @@ public class AccordService implements IAccordService, Shutdownable
             Throwable cause = Throwables.getRootCause(e);
             if (cause instanceof Timeout)
             {
+                TxnId txnId = ((Timeout) cause).txnId();
                 metrics.timeouts.mark();
-                throw newBarrierTimeout(txnId, barrierType.global);
+                throw newBarrierTimeout(txnId, barrierType, isForWrite, 
keysOrRanges);
             }
             if (cause instanceof Preempted)
             {
+                TxnId txnId = ((Preempted) cause).txnId();
                 //TODO need to improve
                 // Coordinator "could" query the accord state to see whats 
going on but that doesn't exist yet.
                 // Protocol also doesn't have a way to denote "unknown" 
outcome, so using a timeout as the closest match
-                throw newBarrierPreempted(txnId, barrierType.global);
+                throw newBarrierPreempted(txnId, barrierType, isForWrite, 
keysOrRanges);
             }
             if (cause instanceof Exhausted)
             {
+                TxnId txnId = ((Exhausted) cause).txnId();
                 // this case happens when a non-timeout exception is seen, and 
we are unable to move forward
                 metrics.failures.mark();
-                throw newBarrierExhausted(txnId, barrierType.global);
+                throw newBarrierExhausted(txnId, barrierType, isForWrite, 
keysOrRanges);
             }
             // unknown error
             metrics.failures.mark();
@@ -457,7 +471,7 @@ public class AccordService implements IAccordService, Shutdownable
         catch (TimeoutException e)
         {
             metrics.timeouts.mark();
-            throw newBarrierTimeout(txnId, barrierType.global);
+            throw newBarrierTimeout(null, barrierType, isForWrite, 
keysOrRanges);
         }
         finally
         {
@@ -484,16 +498,16 @@ public class AccordService implements IAccordService, Shutdownable
         return barrier(keysOrRanges, epoch, queryStartNanos, timeoutNanos, 
barrierType, isForWrite, repairSyncPoint(allNodes));
     }
 
-    private static <S extends Seekables<?,?>> Seekables 
intersectionWithAccordManagedRanges(Seekables<?, ?> keysOrRanges)
+    private static <S extends Seekables<?, ?>> Seekables 
intersectionWithAccordManagedRanges(Seekables<?, ?> keysOrRanges)
     {
         TableId tableId = null;
         for (Seekable seekable : keysOrRanges)
         {
             TableId newTableId;
             if (keysOrRanges.domain() == Key)
-                newTableId = ((PartitionKey)seekable).table();
+                newTableId = ((PartitionKey) seekable).table();
             else if (keysOrRanges.domain() == Range)
-                newTableId = ((TokenRange)seekable).table();
+                newTableId = ((TokenRange) seekable).table();
             else
                 throw new IllegalStateException("Unexpected domain " + 
keysOrRanges.domain());
 
@@ -532,22 +546,21 @@ public class AccordService implements IAccordService, Shutdownable
     }
 
     @VisibleForTesting
-    static ReadTimeoutException newBarrierTimeout(TxnId txnId, boolean global)
+    static ReadTimeoutException newBarrierTimeout(TxnId txnId, BarrierType 
barrierType, boolean isForWrite, Seekables<?, ?> keysOrRanges)
     {
-        return new ReadTimeoutException(global ? ConsistencyLevel.ANY : 
ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString());
+        return new ReadTimeoutException(barrierType.global ? 
ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false, 
String.format("Timeout waiting on barrier %s / %s / %s; impacted ranges %s", 
txnId, barrierType, isForWrite ? "write" : "not write", keysOrRanges));
     }
 
     @VisibleForTesting
-    static ReadTimeoutException newBarrierPreempted(TxnId txnId, boolean 
global)
+    static ReadTimeoutException newBarrierPreempted(TxnId txnId, BarrierType 
barrierType, boolean isForWrite, Seekables<?, ?> keysOrRanges)
     {
-        return new ReadPreemptedException(global ? ConsistencyLevel.ANY : 
ConsistencyLevel.QUORUM, 0, 0, false, txnId.toString());
+        return new ReadPreemptedException(barrierType.global ? 
ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false, 
String.format("Preempted waiting on barrier %s / %s / %s; impacted ranges %s", 
txnId, barrierType, isForWrite ? "write" : "not write", keysOrRanges));
     }
 
     @VisibleForTesting
-    static ReadExhaustedException newBarrierExhausted(TxnId txnId, boolean 
global)
+    static ReadExhaustedException newBarrierExhausted(TxnId txnId, BarrierType 
barrierType, boolean isForWrite, Seekables<?, ?> keysOrRanges)
     {
-        //TODO (usability): not being able to show the txn is a bad UX, this 
becomes harder to trace back in logs
-        return new ReadExhaustedException(global ? ConsistencyLevel.ANY : 
ConsistencyLevel.QUORUM, 0, 0, false, ImmutableMap.of());
+        return new ReadExhaustedException(barrierType.global ? 
ConsistencyLevel.ANY : ConsistencyLevel.QUORUM, 0, 0, false, 
String.format("Exhausted (too many failures from peers) waiting on barrier %s / 
%s / %s; impacted ranges %s", txnId, barrierType, isForWrite ? "write" : "not 
write", keysOrRanges));
     }
 
     @VisibleForTesting
@@ -615,9 +628,9 @@ public class AccordService implements IAccordService, Shutdownable
     public Seekables barrierWithRetries(Seekables keysOrRanges, long minEpoch, 
BarrierType barrierType, boolean isForWrite) throws InterruptedException
     {
         return doWithRetries(Blocking.Default.instance, () -> 
AccordService.instance().barrier(keysOrRanges, minEpoch, 
Clock.Global.nanoTime(), 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos(), barrierType, 
isForWrite),
-                      DatabaseDescriptor.getAccordBarrierRetryAttempts(),
-                      
DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(),
-                      
DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis());
+                             
DatabaseDescriptor.getAccordBarrierRetryAttempts(),
+                             
DatabaseDescriptor.getAccordBarrierRetryInitialBackoffMillis(),
+                             
DatabaseDescriptor.getAccordBarrierRetryMaxBackoffMillis());
     }
 
     @Override
@@ -666,12 +679,12 @@ public class AccordService implements IAccordService, Shutdownable
             Throwable cause = failure != null ? 
Throwables.getRootCause(failure) : null;
             if (success != null)
             {
-                if (((TxnResult)success).kind() == 
TxnResult.Kind.retry_new_protocol)
+                if (((TxnResult) success).kind() == 
TxnResult.Kind.retry_new_protocol)
                 {
                     metrics.retryDifferentSystem.mark();
                     Tracing.trace("Got retry different system error from 
Accord, will retry");
                 }
-                asyncTxnResult.trySuccess((TxnResult)success);
+                asyncTxnResult.trySuccess((TxnResult) success);
                 return;
             }
 
@@ -725,7 +738,7 @@ public class AccordService implements IAccordService, Shutdownable
                 throw (RequestTimeoutException) cause;
             }
             else if (cause instanceof RuntimeException)
-                throw (RuntimeException)cause;
+                throw (RuntimeException) cause;
             else
                 throw new RuntimeException(cause);
         }
@@ -754,7 +767,7 @@ public class AccordService implements IAccordService, Shutdownable
         if (consistencyLevel == null)
             consistencyLevel = ConsistencyLevel.ANY;
         return isWrite ? new WriteTimeoutException(WriteType.CAS, 
consistencyLevel, 0, 0, txnId.toString())
-                            : new ReadTimeoutException(consistencyLevel, 0, 0, 
false, txnId.toString());
+                       : new ReadTimeoutException(consistencyLevel, 0, 0, 
false, txnId.toString());
     }
 
     private static RuntimeException newPreempted(TxnId txnId, boolean isWrite, 
ConsistencyLevel consistencyLevel)
@@ -762,7 +775,7 @@ public class AccordService implements IAccordService, Shutdownable
         if (consistencyLevel == null)
             consistencyLevel = ConsistencyLevel.ANY;
         return isWrite ? new WritePreemptedException(WriteType.CAS, 
consistencyLevel, 0, 0, txnId.toString())
-                            : new ReadPreemptedException(consistencyLevel, 0, 
0, false, txnId.toString());
+                       : new ReadPreemptedException(consistencyLevel, 0, 0, 
false, txnId.toString());
     }
 
     @Override
@@ -833,6 +846,143 @@ public class AccordService implements IAccordService, Shutdownable
         return node.id();
     }
 
+    @Override
+    public List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId)
+    {
+        AsyncChain<List<CommandStoreTxnBlockedGraph>> states = 
loadDebug(txnId);
+        try
+        {
+            return AsyncChains.getBlocking(states);
+        }
+        catch (InterruptedException e)
+        {
+            throw new UncheckedInterruptedException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e.getCause());
+        }
+    }
+
+    public AsyncChain<List<CommandStoreTxnBlockedGraph>> loadDebug(TxnId 
original)
+    {
+        CommandStores commandStores = node.commandStores();
+        if (commandStores.count() == 0)
+            return AsyncChains.success(Collections.emptyList());
+        int[] ids = commandStores.ids();
+        List<AsyncChain<CommandStoreTxnBlockedGraph>> chains = new 
ArrayList<>(ids.length);
+        for (int id : ids)
+            chains.add(loadDebug(original, commandStores.forId(id)));
+        return AsyncChains.all(chains);
+    }
+
+    private AsyncChain<CommandStoreTxnBlockedGraph> loadDebug(TxnId txnId, 
CommandStore store)
+    {
+        CommandStoreTxnBlockedGraph.Builder state = new 
CommandStoreTxnBlockedGraph.Builder(store.id());
+        return populate(state, store, txnId).map(ignore -> state.build());
+    }
+
+    private static AsyncChain<Void> 
populate(CommandStoreTxnBlockedGraph.Builder state, CommandStore store, TxnId 
txnId)
+    {
+        AsyncChain<AsyncChain<Void>> submit = 
store.submit(PreLoadContext.contextFor(txnId), in -> {
+            AsyncChain<Void> chain = populate(state, (AccordSafeCommandStore) 
in, txnId);
+            return chain == null ? AsyncChains.success(null) : chain;
+        });
+        return submit.flatMap(Function.identity());
+    }
+
+    private static AsyncChain<Void> 
populate(CommandStoreTxnBlockedGraph.Builder state, CommandStore commandStore, 
PartitionKey blockedBy, TxnId txnId, Timestamp executeAt)
+    {
+        AsyncChain<AsyncChain<Void>> submit = 
commandStore.submit(PreLoadContext.contextFor(txnId, Keys.of(blockedBy), 
KeyHistory.COMMANDS), in -> {
+            AsyncChain<Void> chain = populate(state, (AccordSafeCommandStore) 
in, blockedBy, txnId, executeAt);
+            return chain == null ? AsyncChains.success(null) : chain;
+        });
+        return submit.flatMap(Function.identity());
+    }
+
+    @Nullable
+    private static AsyncChain<Void> 
populate(CommandStoreTxnBlockedGraph.Builder state, AccordSafeCommandStore 
safeStore, TxnId txnId)
+    {
+        AccordSafeCommand safeCommand = safeStore.getIfLoaded(txnId);
+        Invariants.nonNull(safeCommand, "Txn %s is not in the cache", txnId);
+        if (safeCommand.current() == null || 
safeCommand.current().saveStatus() == SaveStatus.Uninitialised)
+            return null;
+        CommandStoreTxnBlockedGraph.TxnState cmdTxnState = populate(state, 
safeCommand.current());
+        if (cmdTxnState.notBlocked())
+            return null;
+        //TODO (safety): check depth
+        List<AsyncChain<Void>> chains = new ArrayList<>();
+        for (TxnId blockedBy : cmdTxnState.blockedBy)
+        {
+            if (state.knows(blockedBy)) continue;
+            // need to fetch the state
+            if (safeStore.getIfLoaded(blockedBy) != null)
+            {
+                AsyncChain<Void> chain = populate(state, safeStore, blockedBy);
+                if (chain != null)
+                    chains.add(chain);
+            }
+            else
+            {
+                // go fetch it
+                chains.add(populate(state, safeStore.commandStore(), 
blockedBy));
+            }
+        }
+        for (PartitionKey blockedBy : cmdTxnState.blockedByKey)
+        {
+            if (state.keys.containsKey(blockedBy)) continue;
+            if (safeStore.getCommandsForKeyIfLoaded(blockedBy) != null)
+            {
+                AsyncChain<Void> chain = populate(state, safeStore, blockedBy, 
txnId, safeCommand.current().executeAt());
+                if (chain != null)
+                    chains.add(chain);
+            }
+            else
+            {
+                // go fetch it
+                chains.add(populate(state, safeStore.commandStore(), 
blockedBy, txnId, safeCommand.current().executeAt()));
+            }
+        }
+        if (chains.isEmpty())
+            return null;
+        return AsyncChains.all(chains).map(ignore -> null);
+    }
+
+    private static AsyncChain<Void> 
populate(CommandStoreTxnBlockedGraph.Builder state, AccordSafeCommandStore 
safeStore, PartitionKey pk, TxnId txnId, Timestamp executeAt)
+    {
+        AccordSafeCommandsForKey commandsForKey = 
safeStore.getCommandsForKeyIfLoaded(pk);
+        TxnId blocking = commandsForKey.current().blockedOnTxnId(txnId, 
executeAt);
+        if (blocking instanceof CommandsForKey.TxnInfo)
+            blocking = ((CommandsForKey.TxnInfo) blocking).plainTxnId();
+        state.keys.put(pk, blocking);
+        if (state.txns.containsKey(blocking)) return null;
+        if (safeStore.getIfLoaded(blocking) != null) return populate(state, 
safeStore, blocking);
+        return populate(state, safeStore.commandStore(), blocking);
+    }
+
+    private static CommandStoreTxnBlockedGraph.TxnState 
populate(CommandStoreTxnBlockedGraph.Builder state, Command cmd)
+    {
+        CommandStoreTxnBlockedGraph.Builder.TxnBuilder cmdTxnState = 
state.txn(cmd.txnId(), cmd.executeAt(), cmd.saveStatus());
+        if (!cmd.hasBeen(Status.Applied) && cmd.isCommitted())
+        {
+            // check blocking state
+            Command.WaitingOn waitingOn = cmd.asCommitted().waitingOn();
+            waitingOn.waitingOn.reverseForEach(null, null, null, null, (i1, 
i2, i3, i4, i) -> {
+                if (i < waitingOn.txnIdCount())
+                {
+                    // blocked on txn
+                    cmdTxnState.blockedBy.add(waitingOn.txnId(i));
+                }
+                else
+                {
+                    // blocked on key
+                    cmdTxnState.blockedByKey.add((PartitionKey) 
waitingOn.keys.get(i - waitingOn.txnIdCount()));
+                }
+            });
+        }
+        return cmdTxnState.build();
+    }
+
     public Node node()
     {
         return node;
@@ -921,7 +1071,7 @@ public class AccordService implements IAccordService, 
Shutdownable
     public CompactionInfo getCompactionInfo()
     {
         Int2ObjectHashMap<RedundantBefore> redundantBefores = new 
Int2ObjectHashMap<>();
-        Int2ObjectHashMap<CommandStores.RangesForEpoch>ranges = new 
Int2ObjectHashMap<>();
+        Int2ObjectHashMap<CommandStores.RangesForEpoch> ranges = new 
Int2ObjectHashMap<>();
         AtomicReference<DurableBefore> durableBefore = new 
AtomicReference<>(DurableBefore.EMPTY);
         
AsyncChains.getBlockingAndRethrow(node.commandStores().forEach(safeStore -> {
             synchronized (redundantBefores)
diff --git 
a/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java 
b/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java
new file mode 100644
index 0000000000..c7d0147add
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/service/accord/CommandStoreTxnBlockedGraph.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import accord.local.SaveStatus;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import org.apache.cassandra.service.accord.api.PartitionKey;
+
+/**
+ * Debug snapshot of one command store's view of blocked transactions. It holds every transaction visited
+ * ({@link #txns}) and, per key, the transaction found to be blocking on it ({@link #keys}). Instances are
+ * created through {@link Builder} and are immutable once constructed.
+ */
+public class CommandStoreTxnBlockedGraph
+{
+    public final int storeId;
+    /** Visited transactions, in the order they were recorded in the builder. */
+    public final Map<TxnId, TxnState> txns;
+    /** Blocking transaction recorded for each visited key, in insertion order. */
+    public final Map<PartitionKey, TxnId> keys;
+
+    public CommandStoreTxnBlockedGraph(Builder builder)
+    {
+        storeId = builder.storeId;
+        // ImmutableMap.copyOf keeps the builder's iteration order, but rejects null keys and values
+        txns = ImmutableMap.copyOf(builder.txns);
+        keys = ImmutableMap.copyOf(builder.keys);
+    }
+
+    /** Immutable state of a single transaction and what it is waiting on. */
+    public static class TxnState
+    {
+        public final TxnId txnId;
+        public final Timestamp executeAt;
+        public final SaveStatus saveStatus;
+        /** Transactions this one is waiting on. */
+        public final List<TxnId> blockedBy;
+        /** Keys this one is waiting on. */
+        public final Set<PartitionKey> blockedByKey;
+
+        public TxnState(Builder.TxnBuilder builder)
+        {
+            txnId = builder.txnId;
+            executeAt = builder.executeAt;
+            saveStatus = builder.saveStatus;
+            blockedBy = ImmutableList.copyOf(builder.blockedBy);
+            blockedByKey = ImmutableSet.copyOf(builder.blockedByKey);
+        }
+
+        /** @return true if this transaction is waiting on at least one transaction or key */
+        public boolean isBlocked()
+        {
+            return !notBlocked();
+        }
+
+        /** @return true if this transaction is waiting on neither transactions nor keys */
+        public boolean notBlocked()
+        {
+            return blockedBy.isEmpty() && blockedByKey.isEmpty();
+        }
+
+        @Override
+        public String toString()
+        {
+            return "TxnState{txnId=" + txnId +
+                   ", executeAt=" + executeAt +
+                   ", saveStatus=" + saveStatus +
+                   ", blockedBy=" + blockedBy +
+                   ", blockedByKey=" + blockedByKey +
+                   '}';
+        }
+    }
+
+    /** Mutable accumulator for a {@link CommandStoreTxnBlockedGraph}; backed by plain LinkedHashMaps, so not thread-safe. */
+    public static class Builder
+    {
+        final int storeId;
+        final Map<TxnId, TxnState> txns = new LinkedHashMap<>();
+        final Map<PartitionKey, TxnId> keys = new LinkedHashMap<>();
+
+        public Builder(int storeId)
+        {
+            this.storeId = storeId;
+        }
+
+        /** @return whether a {@link TxnState} has already been recorded for {@code id} */
+        boolean knows(TxnId id)
+        {
+            return txns.containsKey(id);
+        }
+
+        public CommandStoreTxnBlockedGraph build()
+        {
+            return new CommandStoreTxnBlockedGraph(this);
+        }
+
+        /**
+         * Starts describing a transaction. The transaction is recorded in this builder only when
+         * {@link TxnBuilder#build()} is called.
+         */
+        public TxnBuilder txn(TxnId txnId, Timestamp executeAt, SaveStatus saveStatus)
+        {
+            return new TxnBuilder(txnId, executeAt, saveStatus);
+        }
+
+        /**
+         * Accumulates what one transaction is blocked by. The class is non-static so that {@link #build()} can
+         * register the result in the enclosing {@link Builder}.
+         */
+        public class TxnBuilder
+        {
+            final TxnId txnId;
+            final Timestamp executeAt;
+            final SaveStatus saveStatus;
+            final List<TxnId> blockedBy = new ArrayList<>();
+            final Set<PartitionKey> blockedByKey = new LinkedHashSet<>();
+
+            public TxnBuilder(TxnId txnId, Timestamp executeAt, SaveStatus saveStatus)
+            {
+                this.txnId = txnId;
+                this.executeAt = executeAt;
+                this.saveStatus = saveStatus;
+            }
+
+            /**
+             * Freezes the accumulated state and records it in the enclosing builder. Any earlier entry for the
+             * same txn is replaced.
+             */
+            public TxnState build()
+            {
+                TxnState state = new TxnState(this);
+                txns.put(txnId, state);
+                return state;
+            }
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java 
b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index c79bbbfcd3..9e49cf3c47 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.utils.concurrent.Future;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+
 public interface IAccordService
 {
     Set<ConsistencyLevel> SUPPORTED_COMMIT_CONSISTENCY_LEVELS = 
ImmutableSet.of(ConsistencyLevel.ANY, ConsistencyLevel.ONE, 
ConsistencyLevel.LOCAL_ONE, ConsistencyLevel.QUORUM, ConsistencyLevel.SERIAL, 
ConsistencyLevel.ALL);
@@ -143,4 +144,6 @@ public interface IAccordService
     CompactionInfo getCompactionInfo();
 
     default Id nodeId() { throw new UnsupportedOperationException(); }
+
+    List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId);
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
 
b/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
index 4ebfc8fdb0..c9fc1bd14b 100644
--- 
a/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
+++ 
b/src/java/org/apache/cassandra/service/accord/exceptions/ReadExhaustedException.java
@@ -18,22 +18,15 @@
 
 package org.apache.cassandra.service.accord.exceptions;
 
-import java.util.Map;
+import com.google.common.collect.ImmutableMap;
 
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.exceptions.ReadFailureException;
-import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.locator.InetAddressAndPort;
 
 public class ReadExhaustedException extends ReadFailureException
 {
-    public ReadExhaustedException(ConsistencyLevel consistency, int received, 
int blockFor, boolean dataPresent, Map<InetAddressAndPort, 
RequestFailureReason> failureReasonByEndpoint)
+    /**
+     * Creates a read failure that carries only {@code msg}. The per-endpoint failure reasons passed on to
+     * {@link ReadFailureException} are always an empty map.
+     */
+    public ReadExhaustedException(ConsistencyLevel consistency, int received, 
int blockFor, boolean dataPresent, String msg)
     {
-        super(consistency, received, blockFor, dataPresent, 
failureReasonByEndpoint);
-    }
-
-    protected ReadExhaustedException(String msg, ConsistencyLevel consistency, 
int received, int blockFor, boolean dataPresent, Map<InetAddressAndPort, 
RequestFailureReason> failureReasonByEndpoint)
-    {
-        super(msg, consistency, received, blockFor, dataPresent, 
failureReasonByEndpoint);
+        super(msg, consistency, received, blockFor, dataPresent, 
ImmutableMap.of());
     }
 }
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java 
b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index dfb65e07f2..4c93a78e4d 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -69,6 +69,7 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
 import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
 import org.assertj.core.api.Assertions;
 import org.awaitility.Awaitility;
 import org.apache.commons.lang3.ArrayUtils;
@@ -499,6 +500,11 @@ public abstract class CQLTester
         VirtualKeyspaceRegistry.instance.register(new 
VirtualKeyspace(VIRTUAL_METRICS, createMetricsKeyspaceTables()));
     }
 
+    /**
+     * Registers {@link SystemViewsKeyspace} with the virtual keyspace registry so tests can query its tables
+     * (e.g. {@code system_views.*}).
+     */
+    protected static void addVirtualKeyspace()
+    {
+        VirtualKeyspace systemViews = SystemViewsKeyspace.instance;
+        VirtualKeyspaceRegistry.instance.register(systemViews);
+    }
+
     protected void resetSchema() throws Throwable
     {
         for (TableMetadata table : SchemaKeyspace.metadata().tables)
@@ -1943,6 +1949,133 @@ public abstract class CQLTester
         Assert.assertEquals(String.format("expected %d rows but received %d", 
expectedCount, actualRowCount), expectedCount, actualRowCount);
     }
 
+    /**
+     * An expected cell value for {@code assertRows} that matches the actual cell bytes by a predicate rather than
+     * by exact equality (see {@code any()}, {@code anyNonNull()}, {@code anyInt()} and {@code anyOf(...)}).
+     * <p>
+     * {@link #equals(Object)} deliberately compares a validator against raw {@link ByteBuffer}s. It is therefore
+     * not symmetric, and for wildcard validators it cannot be consistent with {@link #hashCode()}. Do not put
+     * validators in hash-based collections.
+     */
+    public abstract static class CellValidator
+    {
+        /** A representative expected value, used where a concrete buffer is needed (e.g. the empty-container check). */
+        public abstract ByteBuffer expected();
+
+        /** @return whether the actual cell bytes {@code bb} (which may be {@code null}) are acceptable */
+        public abstract boolean equals(ByteBuffer bb);
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (obj instanceof ByteBuffer)
+                return equals((ByteBuffer) obj);
+            return false;
+        }
+
+        /**
+         * Hashes {@link #expected()}. This agrees with {@link #equals(Object)} only for exact-value validators;
+         * it is provided to keep equals and hashCode overridden together.
+         */
+        @Override
+        public int hashCode()
+        {
+            ByteBuffer expected = expected();
+            return expected == null ? 0 : expected.hashCode();
+        }
+
+        /** Human-readable description of the accepted values, used in assertion failure messages. */
+        public abstract String describe();
+
+        /** Delegates to {@link #describe()} so validators print meaningfully wherever they are stringified. */
+        @Override
+        public String toString()
+        {
+            return describe();
+        }
+    }
+
+    protected static CellValidator any()
+    {
+        return new CellValidator()
+        {
+            @Override
+            public ByteBuffer expected()
+            {
+                return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+            }
+
+            @Override
+            public boolean equals(ByteBuffer bb)
+            {
+                return true;
+            }
+
+            @Override
+            public String describe()
+            {
+                return "any";
+            }
+        };
+    }
+
+    /** @return a validator that accepts any present cell value with at least one byte remaining */
+    protected static CellValidator anyNonNull()
+    {
+        return new CellValidator()
+        {
+            @Override
+            public boolean equals(ByteBuffer actual)
+            {
+                return actual != null && actual.hasRemaining();
+            }
+
+            @Override
+            public ByteBuffer expected()
+            {
+                return ByteBufferUtil.EMPTY_BYTE_BUFFER;
+            }
+
+            @Override
+            public String describe()
+            {
+                return "any non-null";
+            }
+        };
+    }
+
+    /**
+     * @return a validator that accepts any non-empty cell after running it through
+     *         {@code Int32Type.instance.validate}. NOTE(review): an invalid encoding presumably surfaces as an
+     *         exception from validate rather than as a mismatch.
+     */
+    protected static CellValidator anyInt()
+    {
+        return new CellValidator()
+        {
+            @Override
+            public String describe()
+            {
+                return "any non-null int";
+            }
+
+            @Override
+            public ByteBuffer expected()
+            {
+                return ByteBufferUtil.bytes(0);
+            }
+
+            @Override
+            public boolean equals(ByteBuffer actual)
+            {
+                if (actual == null)
+                    return false;
+                Int32Type.instance.validate(actual);
+                return actual.hasRemaining();
+            }
+        };
+    }
+
+    /** @return a validator accepting a UTF-8 text cell equal to any of {@code values} */
+    protected static CellValidator anyOf(String... values)
+    {
+        AbstractType<String> text = UTF8Type.instance;
+        return anyOf(text, values);
+    }
+
+    /**
+     * @return a validator accepting a cell whose bytes equal {@code type.decompose(v)} for any {@code v} in
+     *         {@code values}; the first value serves as the representative {@link CellValidator#expected()}
+     */
+    @SafeVarargs // values is only read, never stored or exposed
+    protected static <T> CellValidator anyOf(AbstractType<T> type, T... values)
+    {
+        assert values.length > 0;
+        ByteBuffer[] bbs = new ByteBuffer[values.length];
+        for (int i = 0; i < values.length; i++)
+            bbs[i] = type.decompose(values[i]);
+        return new CellValidator()
+        {
+            @Override
+            public ByteBuffer expected()
+            {
+                return bbs[0];
+            }
+
+            @Override
+            public boolean equals(ByteBuffer bb)
+            {
+                for (ByteBuffer candidate : bbs)
+                {
+                    if (Objects.equal(candidate, bb))
+                        return true;
+                }
+                return false;
+            }
+
+            @Override
+            public String describe()
+            {
+                // list every accepted value so a failure message does not suggest only the first one was allowed
+                return Stream.of(bbs)
+                             .map(candidate -> formatValue(candidate, type))
+                             .collect(Collectors.joining(", ", "any of [", "]"));
+            }
+        };
+    }
+
     public static void assertRows(UntypedResultSet result, Object[]... rows)
     {
         if (result == null)
@@ -1966,24 +2099,22 @@ public abstract class CQLTester
             for (int j = 0; j < meta.size(); j++)
             {
                 ColumnSpecification column = meta.get(j);
-                ByteBuffer expectedByteValue = makeByteBuffer(expected == null 
? null : expected[j], column.type);
+                CellValidator cellValidator = makeCellValidator(expected == 
null ? null : expected[j], column.type);
                 ByteBuffer actualValue = 
actual.getBytes(column.name.toString());
 
-                if (expectedByteValue != null)
-                    expectedByteValue = expectedByteValue.duplicate();
-                if (!Objects.equal(expectedByteValue, actualValue))
+                if (!((cellValidator == null && actualValue == null) || 
(cellValidator != null && cellValidator.equals(actualValue))))
                 {
                     Object actualValueDecoded = actualValue == null ? null : 
column.type.getSerializer().deserialize(actualValue);
                     if (!Objects.equal(expected != null ? expected[j] : null, 
actualValueDecoded))
                     {
-                        if (isEmptyContainerNull(column.type, 
expectedByteValue, actualValue))
+                        if (isEmptyContainerNull(column.type, cellValidator != 
null ? cellValidator.expected() : null, actualValue))
                             continue;
                         error.append(String.format("Invalid value for row %d 
column %d (%s of type %s), expected <%s> but got <%s>",
                                                    i,
                                                    j,
                                                    column.name,
                                                    column.type.asCQL3Type(),
-                                                   
formatValue(expectedByteValue != null ? expectedByteValue.duplicate() : null, 
column.type),
+                                                   cellValidator != null ? 
cellValidator.describe() : "null",
                                                    formatValue(actualValue, 
column.type))).append("\n");
                     }
                 }
@@ -2007,14 +2138,30 @@ public abstract class CQLTester
                     ByteBuffer actualValue = 
actual.getBytes(column.name.toString());
                     str.append(String.format("%s=%s ", column.name, 
formatValue(actualValue, column.type)));
                 }
-                logger.info("Extra row num {}: {}", i, str.toString());
+                logger.info("Extra row num {}: {}", i, str);
             }
-            Assert.fail(String.format("Got more rows than expected. Expected 
%d but got %d.", rows.length, i));
+            Assert.fail(String.format("Got more rows than expected. Expected 
%d but got %d.\nExpected: %s\nActual: %s", rows.length, i, toString(rows), 
result.toStringUnsafe()));
         }
 
         Assert.assertTrue(String.format("Got %s rows than expected. Expected 
%d but got %d", rows.length>i ? "less" : "more", rows.length, i), i == 
rows.length);
     }
 
+    /** Renders an expected value for failure messages, describing {@link CellValidator}s and expanding nested arrays. */
+    private static String toString(Object o)
+    {
+        if (o instanceof CellValidator)
+            return ((CellValidator) o).describe();
+        if (o instanceof Object[])
+            return toString((Object[]) o);
+        return String.valueOf(o); // also yields "null" for a null value
+    }
+
+    /** Renders {@code array} as {@code [a, b, ...]}, formatting each element with {@code toString(Object)}. */
+    private static String toString(Object[] array)
+    {
+        StringBuilder sb = new StringBuilder("[");
+        for (int i = 0; i < array.length; i++)
+        {
+            if (i > 0)
+                sb.append(", ");
+            sb.append(toString(array[i]));
+        }
+        return sb.append(']').toString();
+    }
+
     /**
      * Like assertRows(), but ignores the ordering of rows.
      */
@@ -2615,11 +2762,42 @@ public abstract class CQLTester
             return ((TupleValue)value).toByteBuffer();
 
         if (value instanceof ByteBuffer)
-            return (ByteBuffer)value;
+            return ((ByteBuffer)value);
 
         return type.decomposeUntyped(serializeTuples(value));
     }
 
+    /**
+     * Adapts an expected cell value for {@code assertRows}:
+     * <ul>
+     *   <li>{@code null} stays {@code null}, meaning "expect a null cell";</li>
+     *   <li>a {@link CellValidator} is used as-is;</li>
+     *   <li>anything else is serialized with {@code makeByteBuffer} and matched by exact byte equality.</li>
+     * </ul>
+     */
+    public static CellValidator makeCellValidator(Object value, AbstractType<?> type)
+    {
+        if (value == null)
+            return null;
+        if (value instanceof CellValidator)
+            return (CellValidator) value;
+
+        // may be the caller's own ByteBuffer (makeByteBuffer returns ByteBuffer values unchanged)
+        ByteBuffer byteBuffer = makeByteBuffer(value, type);
+        return new CellValidator()
+        {
+            @Override
+            public ByteBuffer expected()
+            {
+                // defensive duplicate so consumers cannot move the position/limit of the shared buffer
+                return byteBuffer == null ? null : byteBuffer.duplicate();
+            }
+
+            @Override
+            public boolean equals(ByteBuffer bb)
+            {
+                if (bb == null) return false;
+                return byteBuffer.equals(bb);
+            }
+
+            @Override
+            public String describe()
+            {
+                return formatValue(byteBuffer == null ? null : byteBuffer.duplicate(), type);
+            }
+        };
+    }
+
     private static String formatValue(ByteBuffer bb, AbstractType<?> type)
     {
         if (bb == null)
diff --git 
a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java 
b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
new file mode 100644
index 0000000000..59eae7ab0c
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.local.SaveStatus;
+import accord.messages.TxnRequest;
+import accord.primitives.Routable;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.utils.async.AsyncChains;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.OptionaldPositiveInt;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.concurrent.Condition;
+import org.awaitility.Awaitility;
+
+import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
+
+/**
+ * Tests for the {@code system_views.txn_blocked_by} virtual table, which reports an Accord transaction's save status
+ * and what it is blocked on.
+ */
+public class AccordVirtualTablesTest extends CQLTester
+{
+    private static final Logger logger = LoggerFactory.getLogger(AccordVirtualTablesTest.class);
+
+    private static final String QUERY_TXN_BLOCKED_BY = "SELECT * FROM system_views.txn_blocked_by WHERE txn_id=?";
+    private static final String QUERY_TXN_STATUS = "SELECT save_status FROM system_views.txn_blocked_by WHERE txn_id=? LIMIT 1";
+
+    @BeforeClass
+    public static void setUpClass()
+    {
+        daemonInitialization();
+        DatabaseDescriptor.getAccord().shard_count = new OptionaldPositiveInt(1);
+
+        CQLTester.setUpClass();
+
+        AccordService.startup(ClusterMetadata.current().myNodeId());
+        addVirtualKeyspace();
+        requireNetwork();
+    }
+
+    @Test
+    public void unknownIsEmpty()
+    {
+        createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+        assertRows(execute(QUERY_TXN_BLOCKED_BY, TxnId.NONE.toString()));
+    }
+
+    @Test
+    public void completedTxn() throws ExecutionException, InterruptedException
+    {
+        String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+        var accord = accord();
+        TxnId id = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
+        Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
+        AsyncChains.getBlocking(accord.node().coordinate(id, txn));
+
+        assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
+                   row(id.toString(), anyInt(), 0, "", "Self", any(), null, anyOf(SaveStatus.Applying.name(), SaveStatus.Applied.name())));
+    }
+
+    @Test
+    public void inflight() throws ExecutionException, InterruptedException
+    {
+        AccordMsgFilter filter = new AccordMsgFilter();
+        MessagingService.instance().outboundSink.add(filter);
+        try
+        {
+            String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+            var accord = accord();
+            TxnId id = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
+            Txn txn = createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0);
+            accord.node().coordinate(id, txn);
+
+            filter.preAccept.awaitThrowUncheckedOnInterrupt();
+
+            assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
+                       row(id.toString(), anyInt(), 0, "", "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+
+            filter.apply.awaitThrowUncheckedOnInterrupt();
+            assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
+                       row(id.toString(), anyInt(), 0, "", "Self", any(), null, SaveStatus.ReadyToExecute.name()));
+        }
+        finally
+        {
+            MessagingService.instance().outboundSink.remove(filter);
+        }
+    }
+
+    @Test
+    public void blocked() throws ExecutionException, InterruptedException
+    {
+        AccordMsgFilter filter = new AccordMsgFilter();
+        MessagingService.instance().outboundSink.add(filter);
+        try
+        {
+            String tableName = createTable("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY (k, c)) WITH transactional_mode = 'full'");
+            var accord = accord();
+            TxnId first = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
+            accord.node().coordinate(first, createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0));
+
+            filter.preAccept.awaitThrowUncheckedOnInterrupt();
+            assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
+                       row(first.toString(), anyInt(), 0, "", "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+
+            filter.apply.awaitThrowUncheckedOnInterrupt();
+            assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
+                       row(first.toString(), anyInt(), 0, "", "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name()));
+
+            filter.reset();
+
+            TxnId second = accord.node().nextTxnId(Txn.Kind.Write, Routable.Domain.Key);
+            accord.node().coordinate(second, createTxn(wrapInTxn(String.format("INSERT INTO %s.%s(k, c, v) VALUES (?, ?, ?)", KEYSPACE, tableName)), 0, 0, 0));
+
+            filter.commit.awaitThrowUncheckedOnInterrupt();
+
+            Awaitility.await("waiting on key").atMost(1, TimeUnit.MINUTES)
+                      .until(() -> {
+                          UntypedResultSet rs = execute(QUERY_TXN_BLOCKED_BY, second.toString());
+                          return rs.size() == 2;
+                      });
+            assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()),
+                       row(second.toString(), anyInt(), 0, "", "Self", anyNonNull(), null, SaveStatus.Stable.name()),
+                       row(second.toString(), anyInt(), 1, first.toString(), "Key", anyNonNull(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+        }
+        finally
+        {
+            MessagingService.instance().outboundSink.remove(filter);
+        }
+    }
+
+    private static AccordService accord()
+    {
+        return (AccordService) AccordService.instance();
+    }
+
+    /**
+     * Outbound sink for Accord verbs. It behaves as follows:
+     * <ul>
+     *   <li>signals {@code preAccept}, {@code commit} and {@code apply} as those messages are sent;</li>
+     *   <li>drops apply and begin-recover requests, so a transaction can be observed before it is applied;</li>
+     *   <li>lets the other expected Accord verbs through;</li>
+     *   <li>fails on any Accord verb it does not expect.</li>
+     * </ul>
+     * Non-Accord traffic is passed through untouched.
+     */
+    private static class AccordMsgFilter implements BiPredicate<Message<?>, InetAddressAndPort>
+    {
+        volatile Condition preAccept = Condition.newOneTimeCondition();
+        volatile Condition commit = Condition.newOneTimeCondition();
+        volatile Condition apply = Condition.newOneTimeCondition();
+        // verbs observed per transaction; recorded but not read by these tests (presumably a debugging aid)
+        final ConcurrentMap<TxnId, ConcurrentSkipListSet<Verb>> txnToVerbs = new ConcurrentHashMap<>();
+
+        /** Replaces every condition with a fresh, unsignalled one so the next transaction can be tracked. */
+        void reset()
+        {
+            preAccept = Condition.newOneTimeCondition();
+            commit = Condition.newOneTimeCondition();
+            apply = Condition.newOneTimeCondition();
+        }
+
+        @Override
+        public boolean test(Message<?> msg, InetAddressAndPort to)
+        {
+            if (!msg.verb().name().startsWith("ACCORD_"))
+                return true;
+            if (msg.payload instanceof TxnRequest)
+            {
+                TxnId txnId = ((TxnRequest) msg.payload).txnId;
+                if (txnId != null)
+                {
+                    Set<Verb> seen = txnToVerbs.computeIfAbsent(txnId, ignore -> new ConcurrentSkipListSet<>());
+                    seen.add(msg.verb());
+                }
+            }
+            switch (msg.verb())
+            {
+                case ACCORD_APPLY_REQ:
+                case ACCORD_APPLY_AND_WAIT_REQ:
+                    apply.signalAll();
+                    // deliberate fall through: apply requests are signalled AND dropped
+                case ACCORD_BEGIN_RECOVER_REQ:
+                    return false;
+                case ACCORD_PRE_ACCEPT_RSP:
+                    preAccept.signalAll();
+                    return true;
+                case ACCORD_COMMIT_REQ:
+                    commit.signalAll();
+                    return true;
+                case ACCORD_PRE_ACCEPT_REQ:
+                case ACCORD_CHECK_STATUS_REQ:
+                case ACCORD_CHECK_STATUS_RSP:
+                case ACCORD_READ_RSP:
+                    return true;
+                default:
+                    // many code paths don't log the error...
+                    UnsupportedOperationException e = new UnsupportedOperationException(msg.verb().name());
+                    logger.error("Unexpected verb {}", msg.verb(), e);
+                    throw e;
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java
index 3abdcf8080..f6918422e9 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordServiceTest.java
@@ -26,6 +26,7 @@ import java.util.function.Supplier;
 
 import org.junit.Test;
 
+import accord.api.BarrierType;
 import accord.coordinate.Exhausted;
 import accord.coordinate.Preempted;
 import accord.coordinate.Timeout;
@@ -64,19 +65,19 @@ public class AccordServiceTest
                         throw new Timeout(null, null);
                     case 1:
                         attempts++;
-                        throw AccordService.newBarrierTimeout(TxnId.NONE, 
true);
+                        throw AccordService.newBarrierTimeout(TxnId.NONE, 
BarrierType.local, true, Ranges.EMPTY);
                     case 2:
                         attempts++;
                         throw new Preempted(null, null);
                     case 3:
                         attempts++;
-                        throw AccordService.newBarrierPreempted(TxnId.NONE, 
true);
+                        throw AccordService.newBarrierPreempted(TxnId.NONE, 
BarrierType.local, true, Ranges.EMPTY);
                     case 4:
                         attempts++;
                         throw new Exhausted(null, null);
                     case 5:
                         attempts++;
-                        throw AccordService.newBarrierExhausted(TxnId.NONE, 
true);
+                        throw AccordService.newBarrierExhausted(TxnId.NONE, 
BarrierType.local, true, Ranges.EMPTY);
                     default:
                         return Ranges.of(IntKey.range(1, 2));
                 }
@@ -98,9 +99,9 @@ public class AccordServiceTest
         qt().check(rs -> {
             List<Runnable> timeoutFailures = new ArrayList<>(4);
             timeoutFailures.add(() -> {throw new Timeout(null, null);});
-            timeoutFailures.add(() -> {throw 
AccordService.newBarrierTimeout(TxnId.NONE, true);});
+            timeoutFailures.add(() -> {throw 
AccordService.newBarrierTimeout(TxnId.NONE, BarrierType.local, true, 
Ranges.EMPTY);});
             timeoutFailures.add(() -> {throw new Preempted(null, null);});
-            timeoutFailures.add(() -> {throw 
AccordService.newBarrierPreempted(TxnId.NONE, true);});
+            timeoutFailures.add(() -> {throw 
AccordService.newBarrierPreempted(TxnId.NONE, BarrierType.local, true, 
Ranges.EMPTY);});
             Collections.shuffle(timeoutFailures, rs.asJdkRandom());
             Iterator<Runnable> it = timeoutFailures.iterator();
             Supplier<Seekables> failing = () -> {
@@ -120,9 +121,9 @@ public class AccordServiceTest
         qt().check(rs -> {
             List<Runnable> timeoutFailures = new ArrayList<>(5);
             timeoutFailures.add(() -> {throw new Timeout(null, null);});
-            timeoutFailures.add(() -> {throw 
AccordService.newBarrierTimeout(TxnId.NONE, true);});
+            timeoutFailures.add(() -> {throw 
AccordService.newBarrierTimeout(TxnId.NONE, BarrierType.local, true, 
Ranges.EMPTY);});
             timeoutFailures.add(() -> {throw new Preempted(null, null);});
-            timeoutFailures.add(() -> {throw 
AccordService.newBarrierPreempted(TxnId.NONE, true);});
+            timeoutFailures.add(() -> {throw 
AccordService.newBarrierPreempted(TxnId.NONE, BarrierType.local, true, 
Ranges.EMPTY);});
             timeoutFailures.add(() -> {throw new Exhausted(null, null);});
             Collections.shuffle(timeoutFailures, rs.asJdkRandom());
             Iterator<Runnable> it = timeoutFailures.iterator();
@@ -158,9 +159,9 @@ public class AccordServiceTest
         qt().check(rs -> {
             List<Runnable> failures = new ArrayList<>(6);
             failures.add(() -> {throw new Timeout(null, null);});
-            failures.add(() -> {throw 
AccordService.newBarrierTimeout(TxnId.NONE, true);});
+            failures.add(() -> {throw 
AccordService.newBarrierTimeout(TxnId.NONE, BarrierType.local, true, 
Ranges.EMPTY);});
             failures.add(() -> {throw new Preempted(null, null);});
-            failures.add(() -> {throw 
AccordService.newBarrierPreempted(TxnId.NONE, true);});
+            failures.add(() -> {throw 
AccordService.newBarrierPreempted(TxnId.NONE, BarrierType.local, true, 
Ranges.EMPTY);});
             failures.add(() -> {throw new Exhausted(null, null);});
             boolean isError = rs.nextBoolean();
             failures.add(new Unexpected(isError));
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index bfbf4e5095..54a311a67b 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -287,6 +287,11 @@ public class AccordTestUtils
         return createTxn(query, QueryOptions.DEFAULT);
     }
 
+    /** Varargs convenience overload: forwards {@code binds} as a fixed-size list to {@code createTxn(String, List)}. */
+    public static Txn createTxn(String query, Object... binds)
+    {
+        List<Object> bindValues = Arrays.asList(binds);
+        return createTxn(query, bindValues);
+    }
+
     public static Txn createTxn(String query, List<Object> binds)
     {
         TransactionStatement statement = parse(query);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to