This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 59b45eb8bf Add max depth to blocked txn vtable
59b45eb8bf is described below

commit 59b45eb8bf753bc75fa37ab327329ef7e88301f2
Author: Alex Petrov <alexpet...@apple.com>
AuthorDate: Wed Jul 30 15:31:59 2025 +0200

    Add max depth to blocked txn vtable
    
    Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20839
---
 .../cassandra/db/virtual/AbstractVirtualTable.java |  2 +-
 .../cassandra/db/virtual/AccordDebugKeyspace.java  | 48 ++++++++++++++++++----
 .../db/virtual/AccordDebugKeyspaceTest.java        |  4 ++
 3 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
index df2e4bc7cc..a32ea67ab6 100644
--- a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
@@ -76,7 +76,7 @@ public abstract class AbstractVirtualTable implements VirtualTable
     }
 
     @Override
-    public final UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter, RowFilter rowFilter)
+    public UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter, RowFilter rowFilter)
     {
         Partition partition = data(partitionKey).getPartition(partitionKey);
 
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index efe683af71..f7c89754e3 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -41,6 +41,14 @@ import javax.annotation.Nullable;
 import com.google.common.collect.BoundType;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -138,6 +146,7 @@ import static java.lang.String.format;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 public class AccordDebugKeyspace extends VirtualKeyspace
@@ -1417,8 +1426,33 @@ public class AccordDebugKeyspace extends VirtualKeyspace
         }
 
         @Override
-        public DataSet data(DecoratedKey partitionKey)
+        public UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter, RowFilter rowFilter)
         {
+            Partition partition = data(partitionKey, rowFilter).getPartition(partitionKey);
+
+            if (null == partition)
+                return EmptyIterators.unfilteredPartition(metadata);
+
+            long now = currentTimeMillis();
+            UnfilteredRowIterator rowIterator = partition.toRowIterator(metadata(), clusteringIndexFilter, columnFilter, now);
+            return new SingletonUnfilteredPartitionIterator(rowIterator);
+        }
+
+        public DataSet data(DecoratedKey partitionKey, RowFilter rowFilter)
+        {
+            int maxDepth = Integer.MAX_VALUE;
+            if (rowFilter != null && rowFilter.getExpressions().size() > 0)
+            {
+                Invariants.require(rowFilter.getExpressions().size() == 1, "Only depth filter is supported");
+                RowFilter.Expression expression = rowFilter.getExpressions().get(0);
+                Invariants.require(expression.column().name.toString().equals("depth"), "Only depth filter is supported, but got: %s", expression.column().name);
+                Invariants.require(expression.operator() == Operator.LT || expression.operator() == Operator.LTE, "Only < and <= queries are supported");
+                if (expression.operator() == Operator.LT)
+                    maxDepth = expression.getIndexValue().getInt(0);
+                else
+                    maxDepth = expression.getIndexValue().getInt(0) + 1;
+            }
+
             TxnId id = TxnId.parse(UTF8Type.instance.compose(partitionKey.getKey()));
             List<CommandStoreTxnBlockedGraph> shards = AccordService.instance().debugTxnBlockedGraph(id);
 
@@ -1427,7 +1461,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
             for (CommandStoreTxnBlockedGraph shard : shards)
             {
                 Set<TxnId> processed = new HashSet<>();
-                process(ds, commandStores, shard, processed, id, 0, id, Reason.Self, null);
+                process(ds, commandStores, shard, processed, id, 0, maxDepth, id, Reason.Self, null);
                 // everything was processed right?
                 if (!shard.txns.isEmpty() && !shard.txns.keySet().containsAll(processed))
                     Invariants.expect(false, "Skipped txns: " + Sets.difference(shard.txns.keySet(), processed));
@@ -1436,7 +1470,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
             return ds;
         }
 
-        private void process(SimpleDataSet ds, CommandStores commandStores, CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason reason, Runnable onDone)
+        private void process(SimpleDataSet ds, CommandStores commandStores, CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int depth, int maxDepth, TxnId txnId, Reason reason, Runnable onDone)
         {
             if (!processed.add(txnId))
                 throw new IllegalStateException("Double processed " + txnId);
@@ -1462,15 +1496,15 @@ public class AccordDebugKeyspace extends VirtualKeyspace
             {
                 for (TxnId blockedBy : txn.blockedBy)
                 {
-                    if (!processed.contains(blockedBy))
-                        process(ds, commandStores, shard, processed, userTxn, depth + 1, blockedBy, Reason.Txn, null);
+                    if (!processed.contains(blockedBy) && depth < maxDepth)
+                        process(ds, commandStores, shard, processed, userTxn, depth + 1, maxDepth, blockedBy, Reason.Txn, null);
                 }
 
                 for (TokenKey blockedBy : txn.blockedByKey)
                 {
                     TxnId blocking = shard.keys.get(blockedBy);
-                    if (!processed.contains(blocking))
-                        process(ds, commandStores, shard, processed, userTxn, depth + 1, blocking, Reason.Key, () -> ds.column("key", printToken(blockedBy)));
+                    if (!processed.contains(blocking) && depth < maxDepth)
+                        process(ds, commandStores, shard, processed, userTxn, depth + 1, maxDepth, blocking, Reason.Key, () -> ds.column("key", printToken(blockedBy)));
                 }
             }
         }
diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 1d500f497a..948087e2f9 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -305,6 +305,10 @@ public class AccordDebugKeyspaceTest extends CQLTester
             assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()),
                        row(second.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.Stable.name()),
                        row(second.toString(), KEYSPACE, tableName, anyInt(), 1, first.toString(), "Key", anyNonNull(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
+
+            assertRows(execute(QUERY_TXN_BLOCKED_BY + " AND depth < 1", second.toString()),
+                       row(second.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.Stable.name()));
+
         }
         finally
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to