This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 59b45eb8bf Add max depth to blocked txn vtable 59b45eb8bf is described below commit 59b45eb8bf753bc75fa37ab327329ef7e88301f2 Author: Alex Petrov <alexpet...@apple.com> AuthorDate: Wed Jul 30 15:31:59 2025 +0200 Add max depth to blocked txn vtable Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20839 --- .../cassandra/db/virtual/AbstractVirtualTable.java | 2 +- .../cassandra/db/virtual/AccordDebugKeyspace.java | 48 ++++++++++++++++++---- .../db/virtual/AccordDebugKeyspaceTest.java | 4 ++ 3 files changed, 46 insertions(+), 8 deletions(-) diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java index df2e4bc7cc..a32ea67ab6 100644 --- a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java +++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java @@ -76,7 +76,7 @@ public abstract class AbstractVirtualTable implements VirtualTable } @Override - public final UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter, RowFilter rowFilter) + public UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter, RowFilter rowFilter) { Partition partition = data(partitionKey).getPartition(partitionKey); diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java index efe683af71..f7c89754e3 100644 --- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java @@ -41,6 +41,14 @@ import javax.annotation.Nullable; import com.google.common.collect.BoundType; import com.google.common.collect.Range; import com.google.common.collect.Sets; +import 
org.apache.cassandra.cql3.Operator; +import org.apache.cassandra.db.EmptyIterators; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,6 +146,7 @@ import static java.lang.String.format; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG; +import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; public class AccordDebugKeyspace extends VirtualKeyspace @@ -1417,8 +1426,33 @@ public class AccordDebugKeyspace extends VirtualKeyspace } @Override - public DataSet data(DecoratedKey partitionKey) + public UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter, RowFilter rowFilter) { + Partition partition = data(partitionKey, rowFilter).getPartition(partitionKey); + + if (null == partition) + return EmptyIterators.unfilteredPartition(metadata); + + long now = currentTimeMillis(); + UnfilteredRowIterator rowIterator = partition.toRowIterator(metadata(), clusteringIndexFilter, columnFilter, now); + return new SingletonUnfilteredPartitionIterator(rowIterator); + } + + public DataSet data(DecoratedKey partitionKey, RowFilter rowFilter) + { + int maxDepth = Integer.MAX_VALUE; + if (rowFilter != null && rowFilter.getExpressions().size() > 0) + { + Invariants.require(rowFilter.getExpressions().size() == 1, "Only depth filter is supported"); + RowFilter.Expression 
expression = rowFilter.getExpressions().get(0); + Invariants.require(expression.column().name.toString().equals("depth"), "Only depth filter is supported, but got: %s", expression.column().name); + Invariants.require(expression.operator() == Operator.LT || expression.operator() == Operator.LTE, "Only < and <= queries are supported"); + if (expression.operator() == Operator.LT) + maxDepth = expression.getIndexValue().getInt(0); + else + maxDepth = expression.getIndexValue().getInt(0) + 1; + } + TxnId id = TxnId.parse(UTF8Type.instance.compose(partitionKey.getKey())); List<CommandStoreTxnBlockedGraph> shards = AccordService.instance().debugTxnBlockedGraph(id); @@ -1427,7 +1461,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace for (CommandStoreTxnBlockedGraph shard : shards) { Set<TxnId> processed = new HashSet<>(); - process(ds, commandStores, shard, processed, id, 0, id, Reason.Self, null); + process(ds, commandStores, shard, processed, id, 0, maxDepth, id, Reason.Self, null); // everything was processed right? 
if (!shard.txns.isEmpty() && !shard.txns.keySet().containsAll(processed)) Invariants.expect(false, "Skipped txns: " + Sets.difference(shard.txns.keySet(), processed)); @@ -1436,7 +1470,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace return ds; } - private void process(SimpleDataSet ds, CommandStores commandStores, CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason reason, Runnable onDone) + private void process(SimpleDataSet ds, CommandStores commandStores, CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int depth, int maxDepth, TxnId txnId, Reason reason, Runnable onDone) { if (!processed.add(txnId)) throw new IllegalStateException("Double processed " + txnId); @@ -1462,15 +1496,15 @@ public class AccordDebugKeyspace extends VirtualKeyspace { for (TxnId blockedBy : txn.blockedBy) { - if (!processed.contains(blockedBy)) - process(ds, commandStores, shard, processed, userTxn, depth + 1, blockedBy, Reason.Txn, null); + if (!processed.contains(blockedBy) && depth < maxDepth) + process(ds, commandStores, shard, processed, userTxn, depth + 1, maxDepth, blockedBy, Reason.Txn, null); } for (TokenKey blockedBy : txn.blockedByKey) { TxnId blocking = shard.keys.get(blockedBy); - if (!processed.contains(blocking)) - process(ds, commandStores, shard, processed, userTxn, depth + 1, blocking, Reason.Key, () -> ds.column("key", printToken(blockedBy))); + if (!processed.contains(blocking) && depth < maxDepth) + process(ds, commandStores, shard, processed, userTxn, depth + 1, maxDepth, blocking, Reason.Key, () -> ds.column("key", printToken(blockedBy))); } } } diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java index 1d500f497a..948087e2f9 100644 --- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java +++ 
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java @@ -305,6 +305,10 @@ public class AccordDebugKeyspaceTest extends CQLTester assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()), row(second.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.Stable.name()), row(second.toString(), KEYSPACE, tableName, anyInt(), 1, first.toString(), "Key", anyNonNull(), anyNonNull(), SaveStatus.ReadyToExecute.name())); + + assertRows(execute(QUERY_TXN_BLOCKED_BY + " AND depth < 1", second.toString()), + row(second.toString(), KEYSPACE, tableName, anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.Stable.name())); + } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org