This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit dc2d113677e940e9a099b37e05887c66cc717e39
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Thu Oct 24 13:02:52 2024 +0100

    Implement missing virtual tables for Accord debugging
    
    patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith and David
    Capwell for CASSANDRA-20062
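
    Usage sketch (option, keyspace, and table names as introduced by this
    patch; assumes the accord options nest under "accord" in cassandra.yaml):

        accord:
            enable_virtual_debug_only_keyspace: true

        SELECT * FROM system_accord_debug.txn_blocked_by WHERE txn_id = '<txn id>';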
---
 .../org/apache/cassandra/config/AccordSpec.java    |  15 +
 .../cassandra/db/virtual/AccordDebugKeyspace.java  | 634 +++++++++++++++++++++
 .../cassandra/db/virtual/AccordVirtualTables.java  | 316 ----------
 .../cassandra/db/virtual/VirtualKeyspace.java      |  20 +-
 .../apache/cassandra/db/virtual/VirtualTable.java  |   8 -
 .../apache/cassandra/schema/SchemaConstants.java   |   4 +-
 .../apache/cassandra/service/CassandraDaemon.java  |   4 +
 .../cassandra/service/accord/AccordService.java    |   6 +
 .../cassandra/distributed/impl/InstanceConfig.java |   1 +
 .../cassandra/distributed/shared/ClusterUtils.java |   7 +-
 .../distributed/test/QueriesTableTest.java         |   8 -
 .../distributed/test/accord/AccordMetricsTest.java |   7 +-
 .../test/accord/AccordMigrationTest.java           |   6 +-
 ...ablesTest.java => AccordDebugKeyspaceTest.java} |  26 +-
 14 files changed, 693 insertions(+), 369 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java
index b445e4492e..f062c9a570 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -38,6 +38,21 @@ public class AccordSpec
 
     public volatile boolean enable_journal_compaction = true;
 
+    /**
+     * Enables the virtual Accord debug-only keyspace, whose tables
+     * expose internal state to aid developers working on the
+     * Accord implementation.
+     * <p/>
+     * These tables can and will change and/or go away at any point,
+     * including in a minor release; they are not to be considered part of
+     * the API, and are NOT to be relied on for anything.
+     * <p/>
+     * Only enable this keyspace if you are working on Accord and
+     * need to debug an issue with the Accord implementation, or if an
+     * Accord developer has asked you to.
+     */
+    public boolean enable_virtual_debug_only_keyspace = false;
+
     public enum QueueShardModel
     {
         /**
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
new file mode 100644
index 0000000000..77fda644a2
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.RoutingKey;
+import accord.impl.DurabilityScheduling;
+import accord.impl.progresslog.DefaultProgressLog;
+import accord.impl.progresslog.TxnStateKind;
+import accord.local.CommandStores;
+import accord.local.DurableBefore;
+import accord.local.MaxConflicts;
+import accord.local.RedundantBefore;
+import accord.local.RejectBefore;
+import accord.primitives.Status;
+import accord.primitives.TxnId;
+import accord.utils.Invariants;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.TupleType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.UUIDType;
+import org.apache.cassandra.dht.NormalizedRanges;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordCache;
+import org.apache.cassandra.service.accord.AccordCommandStores;
+import org.apache.cassandra.service.accord.AccordExecutor;
+import org.apache.cassandra.service.accord.AccordKeyspace;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.CommandStoreTxnBlockedGraph;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
+import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
+import org.apache.cassandra.service.consensus.migration.TableMigrationState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.Pair;
+
+import static java.lang.String.format;
+import static java.util.Comparator.comparing;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static accord.utils.async.AsyncChains.getBlockingAndRethrow;
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
+
+public class AccordDebugKeyspace extends VirtualKeyspace
+{
+    public static final String DURABILITY_SCHEDULING = "durability_scheduling";
+    public static final String DURABLE_BEFORE = "durable_before";
+    public static final String EXECUTOR_CACHE = "executor_cache";
+    public static final String MAX_CONFLICTS = "max_conflicts";
+    public static final String MIGRATION_STATE = "migration_state";
+    public static final String PROGRESS_LOG = "progress_log";
+    public static final String REDUNDANT_BEFORE = "redundant_before";
+    public static final String REJECT_BEFORE = "reject_before";
+    public static final String TXN_BLOCKED_BY = "txn_blocked_by";
+
+    // {table_id, token} or {table_id, +Inf/-Inf}
+    private static final TupleType ROUTING_KEY_TYPE = new TupleType(List.of(UUIDType.instance, UTF8Type.instance));
+    private static final String ROUTING_KEY_TYPE_STRING = ROUTING_KEY_TYPE.asCQL3Type().toString();
+
+    public static final AccordDebugKeyspace instance = new AccordDebugKeyspace();
+
+    private AccordDebugKeyspace()
+    {
+        super(VIRTUAL_ACCORD_DEBUG, List.of(
+            new DurabilitySchedulingTable(),
+            new DurableBeforeTable(),
+            new ExecutorCacheTable(),
+            new MaxConflictsTable(),
+            new MigrationStateTable(),
+            new ProgressLogTable(),
+            new RedundantBeforeTable(),
+            new RejectBeforeTable(),
+            new TxnBlockedByTable()
+        ));
+    }
+
+    // TODO (consider): use a different type for the three timestamps in micros
+    public static final class DurabilitySchedulingTable extends AbstractVirtualTable
+    {
+        private DurabilitySchedulingTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, DURABILITY_SCHEDULING,
+                        "Accord per-Range Durability Scheduling State",
+                        "CREATE TABLE %s (\n" +
+                           format("range_start %s,\n", 
ROUTING_KEY_TYPE_STRING) +
+                           format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+                        "  node_offset int,\n" +
+                        "  \"index\" int,\n" +
+                        "  number_of_splits int,\n" +
+                        "  range_started_at bigint,\n" +
+                        "  cycle_started_at bigint,\n" +
+                        "  retry_delay_micros bigint,\n" +
+                        "  is_defunct boolean,\n" +
+                        "  PRIMARY KEY ((range_start, range_end))" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data()
+        {
+            DurabilityScheduling.ImmutableView view = ((AccordService) AccordService.instance()).durabilityScheduling();
+
+            SimpleDataSet ds = new SimpleDataSet(metadata());
+            while (view.advance())
+            {
+                ds.row(decompose(view.range().start()), decompose(view.range().end()))
+                  .column("node_offset", view.nodeOffset())
+                  .column("index", view.index())
+                  .column("number_of_splits", view.numberOfSplits())
+                  .column("range_started_at", view.rangeStartedAtMicros())
+                  .column("cycle_started_at", view.cycleStartedAtMicros())
+                  .column("retry_delay_micros", view.retryDelayMicros())
+                  .column("is_defunct", view.isDefunct());
+            }
+            return ds;
+        }
+    }
+
+    public static final class DurableBeforeTable extends AbstractVirtualTable
+    {
+        private DurableBeforeTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, DURABLE_BEFORE,
+                        "Accord Node's DurableBefore State",
+                        "CREATE TABLE %s (\n" +
+                           format("range_start %s,\n", 
ROUTING_KEY_TYPE_STRING) +
+                           format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+                        "  majority_before text,\n" +
+                        "  universal_before text,\n" +
+                        "  PRIMARY KEY ((range_start, range_end))" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data()
+        {
+            DurableBefore durableBefore = AccordService.instance().node().durableBefore();
+            return durableBefore.foldlWithBounds(
+                (entry, ds, start, end) -> {
+                    ds.row(decompose(start), decompose(end))
+                      .column("majority_before", entry.majorityBefore.toString())
+                      .column("universal_before", entry.universalBefore.toString());
+                    return ds;
+                },
+                new SimpleDataSet(metadata()),
+                ignore -> false
+            );
+        }
+    }
+
+    public static final class ExecutorCacheTable extends AbstractVirtualTable
+    {
+        private ExecutorCacheTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, EXECUTOR_CACHE,
+                        "Accord Executor Cache Metrics",
+                        "CREATE TABLE %s (\n" +
+                        "  executor_id int,\n" +
+                        "  scope text,\n" +
+                        "  queries bigint,\n" +
+                        "  hits bigint,\n" +
+                        "  misses bigint,\n" +
+                        "  PRIMARY KEY (executor_id, scope)" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data()
+        {
+            AccordCommandStores stores = (AccordCommandStores) AccordService.instance().node().commandStores();
+            SimpleDataSet ds = new SimpleDataSet(metadata());
+            for (AccordExecutor executor : stores.executors())
+            {
+                try (AccordExecutor.ExclusiveGlobalCaches cache = executor.lockCaches())
+                {
+                    addRow(ds, executor.executorId(), AccordKeyspace.COMMANDS, cache.commands.statsSnapshot());
+                    addRow(ds, executor.executorId(), AccordKeyspace.COMMANDS_FOR_KEY, cache.commandsForKey.statsSnapshot());
+                    addRow(ds, executor.executorId(), AccordKeyspace.TIMESTAMPS_FOR_KEY, cache.timestampsForKey.statsSnapshot());
+                }
+            }
+            return ds;
+        }
+
+        private static void addRow(SimpleDataSet ds, int executorId, String scope, AccordCache.ImmutableStats stats)
+        {
+            ds.row(executorId, scope)
+              .column("queries", stats.queries)
+              .column("hits", stats.hits)
+              .column("misses", stats.misses);
+        }
+    }
+
+
+    public static final class MaxConflictsTable extends AbstractVirtualTable
+    {
+        private MaxConflictsTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, MAX_CONFLICTS,
+                        "Accord per-CommandStore MaxConflicts State",
+                        "CREATE TABLE %s (\n" +
+                        "  command_store_id int,\n" +
+                           format("range_start %s,\n", 
ROUTING_KEY_TYPE_STRING) +
+                           format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+                        "  timestamp text,\n" +
+                        "  PRIMARY KEY (command_store_id, range_start, 
range_end)" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data()
+        {
+            CommandStores stores = AccordService.instance().node().commandStores();
+            List<Pair<Integer, MaxConflicts>> rangeMaps =
+                getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(), store.commandStore().unsafeGetMaxConflicts())));
+            rangeMaps.sort(comparing(p -> p.left));
+
+            SimpleDataSet dataSet = new SimpleDataSet(metadata());
+            for (Pair<Integer, MaxConflicts> pair : rangeMaps)
+            {
+                int storeId = pair.left;
+                MaxConflicts maxConflicts = pair.right;
+
+                maxConflicts.foldlWithBounds(
+                    (timestamp, ds, start, end) -> ds.row(storeId, decompose(start), decompose(end)).column("timestamp", timestamp.toString()),
+                    dataSet,
+                    ignore -> false
+                );
+            }
+            return dataSet;
+        }
+    }
+
+    public static final class MigrationStateTable extends AbstractVirtualTable
+    {
+        private static final Logger logger = LoggerFactory.getLogger(MigrationStateTable.class);
+
+        private MigrationStateTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, MIGRATION_STATE,
+                        "Accord Consensus Migration State",
+                        "CREATE TABLE %s (\n" +
+                        "  keyspace_name text,\n" +
+                        "  table_name text,\n" +
+                        "  table_id uuid,\n" +
+                        "  target_protocol text,\n" +
+                        "  transactional_mode text,\n" +
+                        "  transactional_migration_from text,\n" +
+                        "  migrated_ranges frozen<list<text>>,\n" +
+                        "  repair_pending_ranges frozen<list<text>>,\n" +
+                        "  migrating_ranges_by_epoch frozen<map<bigint, 
list<text>>>,\n" +
+                        "  PRIMARY KEY (keyspace_name, table_name)" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data()
+        {
+            ConsensusMigrationState snapshot = ClusterMetadata.current().consensusMigrationState;
+            Collection<TableMigrationState> tableStates = snapshot.tableStates();
+            return data(tableStates);
+        }
+
+        @Override
+        public DataSet data(DecoratedKey key)
+        {
+            String keyspaceName = UTF8Type.instance.compose(key.getKey());
+            Keyspace keyspace = Schema.instance.getKeyspaceInstance(keyspaceName);
+
+            if (keyspace == null)
+                throw new InvalidRequestException("Unknown keyspace: '" + keyspaceName + '\'');
+
+            List<TableId> tableIDs = keyspace.getColumnFamilyStores()
+                                             .stream()
+                                             .map(ColumnFamilyStore::getTableId)
+                                             .collect(Collectors.toList());
+
+            ConsensusMigrationState snapshot = ClusterMetadata.current().consensusMigrationState;
+            Collection<TableMigrationState> tableStates = snapshot.tableStatesFor(tableIDs);
+
+            return data(tableStates);
+        }
+
+        private SimpleDataSet data(Collection<TableMigrationState> tableStates)
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (TableMigrationState state : tableStates)
+            {
+                TableMetadata table = Schema.instance.getTableMetadata(state.tableId);
+
+                if (table == null)
+                {
+                    logger.warn("Table {}.{} (id: {}) no longer exists. It may have been dropped.",
+                                state.keyspaceName, state.tableName, state.tableId);
+                    continue;
+                }
+
+                result.row(state.keyspaceName, state.tableName);
+                result.column("table_id", state.tableId.asUUID());
+                result.column("target_protocol", 
state.targetProtocol.toString());
+                result.column("transactional_mode", 
table.params.transactionalMode.toString());
+                result.column("transactional_migration_from", 
table.params.transactionalMode.toString());
+
+                List<String> primitiveMigratedRanges = 
state.migratedRanges.stream().map(Objects::toString).collect(toImmutableList());
+                result.column("migrated_ranges", primitiveMigratedRanges);
+
+                List<String> primitiveRepairPendingRanges = 
state.repairPendingRanges.stream().map(Objects::toString).collect(toImmutableList());
+                result.column("repair_pending_ranges", 
primitiveRepairPendingRanges);
+        
+                Map<Long, List<String>> primitiveRangesByEpoch = new 
LinkedHashMap<>();
+                for (Map.Entry<org.apache.cassandra.tcm.Epoch, 
NormalizedRanges<Token>> entry : state.migratingRangesByEpoch.entrySet())
+                    primitiveRangesByEpoch.put(entry.getKey().getEpoch(), 
entry.getValue().stream().map(Objects::toString).collect(toImmutableList()));
+
+                result.column("migrating_ranges_by_epoch", 
primitiveRangesByEpoch);
+            }
+
+            return result;
+        }
+    }
+
+    // TODO (desired): human readable packed key tracker (but requires loading Txn, so might be preferable to only do conditionally)
+    public static final class ProgressLogTable extends AbstractVirtualTable
+    {
+        private ProgressLogTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, PROGRESS_LOG,
+                        "Accord per-CommandStore ProgressLog State",
+                        "CREATE TABLE %s (\n" +
+                        "  command_store_id int,\n" +
+                        "  txn_id text,\n" +
+                        // Timer + BaseTxnState
+                        "  contact_everyone boolean,\n" +
+                        // WaitingState
+                        "  waiting_is_uninitialised boolean,\n" +
+                        "  waiting_blocked_until text,\n" +
+                        "  waiting_home_satisfies text,\n" +
+                        "  waiting_progress text,\n" +
+                        "  waiting_retry_counter int,\n" +
+                        "  waiting_packed_key_tracker_bits text,\n" +
+                        "  waiting_scheduled_at timestamp,\n" +
+                        // HomeState/TxnState
+                        "  home_phase text,\n" +
+                        "  home_progress text,\n" +
+                        "  home_retry_counter int,\n" +
+                        "  home_scheduled_at timestamp,\n" +
+                        "  PRIMARY KEY (command_store_id, txn_id)" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data()
+        {
+            CommandStores stores = AccordService.instance().node().commandStores();
+            List<DefaultProgressLog.ImmutableView> views =
+                getBlockingAndRethrow(stores.map(store -> ((DefaultProgressLog) store.progressLog()).immutableView()));
+            views.sort(comparing(DefaultProgressLog.ImmutableView::storeId));
+
+            SimpleDataSet ds = new SimpleDataSet(metadata());
+            for (int i = 0, size = views.size(); i < size; ++i)
+            {
+                DefaultProgressLog.ImmutableView view = views.get(i);
+                while (view.advance())
+                {
+                    ds.row(view.storeId(), view.txnId().toString())
+                      .column("contact_everyone", view.contactEveryone())
+                      .column("waiting_is_uninitialised", 
view.isWaitingUninitialised())
+                      .column("waiting_blocked_until", 
view.waitingIsBlockedUntil().name())
+                      .column("waiting_home_satisfies", 
view.waitingHomeSatisfies().name())
+                      .column("waiting_progress", 
view.waitingProgress().name())
+                      .column("waiting_retry_counter", 
view.waitingRetryCounter())
+                      .column("waiting_packed_key_tracker_bits", 
Long.toBinaryString(view.waitingPackedKeyTrackerBits()))
+                      .column("waiting_scheduled_at", 
toTimestamp(view.timerScheduledAt(TxnStateKind.Waiting)))
+                      .column("home_phase", view.homePhase().name())
+                      .column("home_progress", view.homeProgress().name())
+                      .column("home_retry_counter", view.homeRetryCounter())
+                      .column("home_scheduled_at", 
toTimestamp(view.timerScheduledAt(TxnStateKind.Home)))
+                    ;
+                }
+            }
+            return ds;
+        }
+
+        private Date toTimestamp(Long deadline)
+        {
+            if (deadline == null)
+                return null;
+
+            long millisSinceEpoch = approxTime.translate().toMillisSinceEpoch(deadline * 1000L);
+            return new Date(millisSinceEpoch);
+        }
+    }
+
+    public static final class RedundantBeforeTable extends AbstractVirtualTable
+    {
+        private RedundantBeforeTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, REDUNDANT_BEFORE,
+                        "Accord per-CommandStore RedundantBefore State",
+                        "CREATE TABLE %s (\n" +
+                        "  command_store_id int,\n" +
+                           format("range_start %s,\n", 
ROUTING_KEY_TYPE_STRING) +
+                           format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+                        "  start_ownership_epoch bigint,\n" +
+                        "  end_ownership_epoch bigint,\n" +
+                        "  locally_applied_or_invalidated_before text,\n" +
+                        "  locally_decided_and_applied_or_invalidated_before 
text,\n" +
+                        "  shard_applied_or_invalidated_before text,\n" +
+                        "  gc_before text,\n" +
+                        "  shard_only_applied_or_invalidated_before text,\n" +
+                        "  bootstrapped_at text,\n" +
+                        "  stale_until_at_least text,\n" +
+                        "  PRIMARY KEY (command_store_id, range_start, 
range_end)" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data()
+        {
+            CommandStores stores = AccordService.instance().node().commandStores();
+            List<Pair<Integer, RedundantBefore>> rangeMaps =
+                getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(), store.commandStore().unsafeGetRedundantBefore())));
+            rangeMaps.sort(comparing(p -> p.left));
+
+            SimpleDataSet dataSet = new SimpleDataSet(metadata());
+            for (Pair<Integer, RedundantBefore> pair : rangeMaps)
+            {
+                int storeId = pair.left;
+                RedundantBefore redundantBefore = pair.right;
+
+                redundantBefore.foldlWithBounds(
+                    (entry, ds, start, end) -> {
+                        ds.row(storeId, decompose(start), decompose(end))
+                          .column("start_ownership_epoch", 
entry.startOwnershipEpoch)
+                          .column("end_ownership_epoch", 
entry.endOwnershipEpoch)
+                          .column("locally_applied_or_invalidated_before", 
entry.locallyAppliedOrInvalidatedBefore.toString())
+                          
.column("locally_decided_and_applied_or_invalidated_before", 
entry.locallyDecidedAndAppliedOrInvalidatedBefore.toString())
+                          .column("shard_applied_or_invalidated_before", 
entry.shardAppliedOrInvalidatedBefore.toString())
+                          .column("gc_before", entry.gcBefore.toString())
+                          .column("shard_only_applied_or_invalidated_before", 
entry.shardOnlyAppliedOrInvalidatedBefore.toString())
+                          .column("bootstrapped_at", 
entry.bootstrappedAt.toString())
+                          .column("stale_until_at_least", 
entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
+                        return ds;
+                    },
+                    dataSet,
+                    ignore -> false
+                );
+            }
+            return dataSet;
+        }
+    }
+
+    public static final class RejectBeforeTable extends AbstractVirtualTable
+    {
+        private RejectBeforeTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, REJECT_BEFORE,
+                        "Accord per-CommandStore RejectBefore State",
+                        "CREATE TABLE %s (\n" +
+                        "  command_store_id int,\n" +
+                           format("range_start %s,\n", 
ROUTING_KEY_TYPE_STRING) +
+                           format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) +
+                        "  txn_id text,\n" +
+                        "  PRIMARY KEY (command_store_id, range_start, 
range_end)" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data()
+        {
+            CommandStores stores = AccordService.instance().node().commandStores();
+            List<Pair<Integer, RejectBefore>> rangeMaps =
+                getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(), store.commandStore().unsafeGetRejectBefore())));
+            rangeMaps.sort(comparing(p -> p.left));
+
+            SimpleDataSet dataSet = new SimpleDataSet(metadata());
+            for (Pair<Integer, RejectBefore> pair : rangeMaps)
+            {
+                int storeId = pair.left;
+                RejectBefore rejectBefore = pair.right;
+
+                if (rejectBefore == null)
+                    continue;
+
+                rejectBefore.foldlWithBounds(
+                    (txnId, ds, start, end) -> ds.row(storeId, decompose(start), decompose(end)).column("txn_id", txnId.toString()),
+                    dataSet,
+                    ignore -> false
+                );
+            }
+            return dataSet;
+        }
+    }
+
+    public static class TxnBlockedByTable extends AbstractVirtualTable
+    {
+        enum Reason { Self, Txn, Key }
+
+        protected TxnBlockedByTable()
+        {
+            super(parse(VIRTUAL_ACCORD_DEBUG, TXN_BLOCKED_BY,
+                        "Accord Transactions Blocked By Table" ,
+                        "CREATE TABLE %s (\n" +
+                        "  txn_id text,\n" +
+                        "  command_store_id int,\n" +
+                        "  depth int,\n" +
+                        "  blocked_by text,\n" +
+                        "  reason text,\n" +
+                        "  save_status text,\n" +
+                        "  execute_at text,\n" +
+                           format("key %s,\n", ROUTING_KEY_TYPE_STRING) +
+                        "  PRIMARY KEY (txn_id, command_store_id, depth, 
blocked_by, reason)" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data(DecoratedKey partitionKey)
+        {
+            TxnId id = TxnId.parse(UTF8Type.instance.compose(partitionKey.getKey()));
+            List<CommandStoreTxnBlockedGraph> shards = AccordService.instance().debugTxnBlockedGraph(id);
+
+            SimpleDataSet ds = new SimpleDataSet(metadata());
+            for (CommandStoreTxnBlockedGraph shard : shards)
+            {
+                Set<TxnId> processed = new HashSet<>();
+                process(ds, shard, processed, id, 0, id, Reason.Self, null);
+                // everything was processed right?
+                if (!shard.txns.isEmpty() && !shard.txns.keySet().containsAll(processed))
+                    throw new IllegalStateException("Skipped txns: " + Sets.difference(shard.txns.keySet(), processed));
+            }
+
+            return ds;
+        }
+
+        private void process(SimpleDataSet ds, CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason reason, Runnable onDone)
+        {
+            if (!processed.add(txnId))
+                throw new IllegalStateException("Double processed " + txnId);
+            CommandStoreTxnBlockedGraph.TxnState txn = shard.txns.get(txnId);
+            if (txn == null)
+            {
+                Invariants.checkState(reason == Reason.Self, "Txn %s unknown for reason %s", txnId, reason);
+                return;
+            }
+            // was it applied?  If so ignore it
+            if (reason != Reason.Self && txn.saveStatus.hasBeen(Status.Applied))
+                return;
+            ds.row(userTxn.toString(), shard.storeId, depth, reason == Reason.Self ? "" : txn.txnId.toString(), reason.name());
+            ds.column("save_status", txn.saveStatus.name());
+            if (txn.executeAt != null)
+                ds.column("execute_at", txn.executeAt.toString());
+            if (onDone != null)
+                onDone.run();
+            if (txn.isBlocked())
+            {
+                for (TxnId blockedBy : txn.blockedBy)
+                {
+                    if (!processed.contains(blockedBy))
+                        process(ds, shard, processed, userTxn, depth + 1, blockedBy, Reason.Txn, null);
+                }
+
+                for (TokenKey blockedBy : txn.blockedByKey)
+                {
+                    TxnId blocking = shard.keys.get(blockedBy);
+                    if (!processed.contains(blocking))
+                        process(ds, shard, processed, userTxn, depth + 1, blocking, Reason.Key, () -> ds.column("key", decompose(blockedBy)));
+                }
+            }
+        }
+
+        @Override
+        public DataSet data()
+        {
+            throw new InvalidRequestException("Must select a single txn_id");
+        }
+    }
+
+    private static ByteBuffer decompose(RoutingKey routingKey)
+    {
+        AccordRoutingKey key = (AccordRoutingKey) routingKey;
+        switch (key.kindOfRoutingKey())
+        {
+            case SENTINEL:
+            case TOKEN:
+                return ROUTING_KEY_TYPE.pack(UUIDType.instance.decompose(key.table().asUUID()), bytes(key.suffix()));
+            default:
+                throw new IllegalStateException("Unhandled key Kind " + 
key.kindOfRoutingKey());
+        }
+    }
+
+    private static TableMetadata parse(String keyspace, String table, String comment, String schema)
+    {
+        return CreateTableStatement.parse(format(schema, table), keyspace)
+                                   .comment(comment)
+                                   .kind(TableMetadata.Kind.VIRTUAL)
+                                   .build();
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
index 1ff8c2e82d..9df0a517c1 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
@@ -15,59 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.db.virtual;
 
-import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import accord.primitives.Status;
-import accord.primitives.TxnId;
-import accord.utils.Invariants;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.cql3.FieldIdentifier;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.db.marshal.UserType;
-import org.apache.cassandra.dht.NormalizedRanges;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.schema.Schema;
-import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.accord.AccordCache;
-import org.apache.cassandra.service.accord.AccordCommandStores;
-import org.apache.cassandra.service.accord.AccordExecutor;
-import org.apache.cassandra.service.accord.AccordKeyspace;
-import org.apache.cassandra.service.accord.AccordService;
-import org.apache.cassandra.service.accord.CommandStoreTxnBlockedGraph;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
-import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
-import org.apache.cassandra.service.consensus.migration.TableMigrationState;
-import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.utils.Clock;
-
-import static com.google.common.collect.ImmutableList.toImmutableList;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 
 public class AccordVirtualTables
 {
@@ -79,281 +35,9 @@ public class AccordVirtualTables
             return Collections.emptyList();
 
         return List.of(
-            new ExecutorCache(keyspace),
-            new MigrationState(keyspace),
-            new CoordinationStatus(keyspace),
-            new TxnBlockedByTable(keyspace)
         );
     }
 
-    public static final class ExecutorCache extends AbstractVirtualTable
-    {
-        private ExecutorCache(String keyspace)
-        {
-            super(parse(keyspace,
-                        "Accord Executor Cache Metrics",
-                        "CREATE TABLE accord_executor_cache(\n" +
-                        "  id int,\n" +
-                        "  scope text,\n" +
-                        "  queries bigint,\n" +
-                        "  hits bigint,\n" +
-                        "  misses bigint,\n" +
-                        "  PRIMARY KEY (id, scope)" +
-                        ')'));
-        }
-
-        @Override
-        public DataSet data()
-        {
-            AccordCommandStores stores = (AccordCommandStores) 
((AccordService) AccordService.instance()).node().commandStores();
-            SimpleDataSet result = new SimpleDataSet(metadata());
-            for (AccordExecutor executor : stores.executors())
-            {
-                Map<String, AccordCache.ImmutableStats> snapshots = new 
HashMap<>(3);
-                try (AccordExecutor.ExclusiveGlobalCaches cache = 
executor.lockCaches())
-                {
-                    addRow(cache.commands.statsSnapshot(), result, 
executor.executorId(), AccordKeyspace.COMMANDS);
-                    addRow(cache.commandsForKey.statsSnapshot(), result, 
executor.executorId(), AccordKeyspace.COMMANDS_FOR_KEY);
-                    addRow(cache.timestampsForKey.statsSnapshot(), result, 
executor.executorId(), AccordKeyspace.TIMESTAMPS_FOR_KEY);
-                }
-            }
-            return result;
-        }
-
-        private static void addRow(AccordCache.ImmutableStats stats, 
SimpleDataSet result, int storeID, String scope)
-        {
-            result.row(storeID, scope);
-            result.column("queries", stats.queries);
-            result.column("hits", stats.hits);
-            result.column("misses", stats.misses);
-        }
-    }
-
-    public static final class MigrationState extends AbstractVirtualTable
-    {
-        private static final Logger logger = 
LoggerFactory.getLogger(MigrationState.class);
-        
-        private MigrationState(String keyspace)
-        {
-            super(parse(keyspace,
-                        "Consensus Migration State",
-                        "CREATE TABLE consensus_migration_state(\n" +
-                        "  keyspace_name text,\n" +
-                        "  table_name text,\n" +
-                        "  table_id uuid,\n" +
-                        "  target_protocol text,\n" +
-                        "  transactional_mode text,\n" +
-                        "  transactional_migration_from text,\n" +
-                        "  migrated_ranges frozen<list<text>>,\n" +
-                        "  repair_pending_ranges frozen<list<text>>,\n" +
-                        "  migrating_ranges_by_epoch frozen<map<bigint, 
list<text>>>,\n" +
-                        "  PRIMARY KEY (keyspace_name, table_name)" +
-                        ')'));
-        }
-
-        @Override
-        public DataSet data()
-        {
-            ConsensusMigrationState snapshot = 
ClusterMetadata.current().consensusMigrationState;
-            Collection<TableMigrationState> tableStates = 
snapshot.tableStates();
-            return data(tableStates);
-        }
-
-        @Override
-        public DataSet data(DecoratedKey key)
-        {
-            String keyspaceName = UTF8Type.instance.compose(key.getKey());
-            Keyspace keyspace = 
Schema.instance.getKeyspaceInstance(keyspaceName);
-
-            if (keyspace == null)
-                throw new InvalidRequestException("Unknown keyspace: '" + 
keyspaceName + '\'');
-
-            List<TableId> tableIDs = keyspace.getColumnFamilyStores()
-                                             .stream()
-                                             
.map(ColumnFamilyStore::getTableId)
-                                             .collect(Collectors.toList());
-
-            ConsensusMigrationState snapshot = 
ClusterMetadata.current().consensusMigrationState;
-            Collection<TableMigrationState> tableStates = 
snapshot.tableStatesFor(tableIDs);
-
-            return data(tableStates);
-        }
-
-        private SimpleDataSet data(Collection<TableMigrationState> tableStates)
-        {
-            SimpleDataSet result = new SimpleDataSet(metadata());
-
-            for (TableMigrationState state : tableStates)
-            {
-                TableMetadata table = 
Schema.instance.getTableMetadata(state.tableId);
-
-                if (table == null)
-                {
-                    logger.warn("Table {}.{} (id: {}) no longer exists. It may 
have been dropped.",
-                                state.keyspaceName, state.tableName, 
state.tableId);
-                    continue;
-                }
-
-                result.row(state.keyspaceName, state.tableName);
-                result.column("table_id", state.tableId.asUUID());
-                result.column("target_protocol", 
state.targetProtocol.toString());
-                result.column("transactional_mode", 
table.params.transactionalMode.toString());
-                result.column("transactional_migration_from", 
table.params.transactionalMode.toString());
-
-                List<String> primitiveMigratedRanges = 
state.migratedRanges.stream().map(Objects::toString).collect(toImmutableList());
-                result.column("migrated_ranges", primitiveMigratedRanges);
-
-                List<String> primitiveRepairPendingRanges = 
state.repairPendingRanges.stream().map(Objects::toString).collect(toImmutableList());
-                result.column("repair_pending_ranges", 
primitiveRepairPendingRanges);
-
-                Map<Long, List<String>> primitiveRangesByEpoch = new 
LinkedHashMap<>();
-                for (Map.Entry<org.apache.cassandra.tcm.Epoch, 
NormalizedRanges<Token>> entry : state.migratingRangesByEpoch.entrySet())
-                    primitiveRangesByEpoch.put(entry.getKey().getEpoch(), 
entry.getValue().stream().map(Objects::toString).collect(toImmutableList()));
-
-                result.column("migrating_ranges_by_epoch", 
primitiveRangesByEpoch);
-            }
-
-            return result;
-        }
-    }
-
-    public static final class CoordinationStatus extends AbstractVirtualTable
-    {
-        private CoordinationStatus(String keyspace)
-        {
-            super(parse(keyspace,
-                        "Accord Coordination Status",
-                        "CREATE TABLE accord_coordination_status(\n" +
-                        "  node_id int,\n" +
-                        "  epoch bigint,\n" +
-                        "  start_time_micros bigint,\n" +
-                        "  duration_millis bigint,\n" +
-                        "  kind text,\n" +
-                        "  domain text,\n" +
-                        "  PRIMARY KEY (node_id, epoch, start_time_micros)" +
-                        ')'));
-        }
-
-        @Override
-        public DataSet data()
-        {
-            AccordService accord = (AccordService) AccordService.instance();
-            SimpleDataSet result = new SimpleDataSet(metadata());
-
-            for (TxnId txn : accord.node().coordinating().keySet())
-            {
-                result.row(txn.node.id, txn.epoch(), txn.hlc());
-                result.column("duration_millis", 
Clock.Global.currentTimeMillis() - TimeUnit.MICROSECONDS.toMillis(txn.hlc()));
-                result.column("kind", txn.kind().toString());
-                result.column("domain", txn.domain().toString());
-            }
-
-            return result;
-        }
-    }
-
-    public static class TxnBlockedByTable extends AbstractVirtualTable
-    {
-        enum Reason { Self, Txn, Key }
-        private final UserType partitionKeyType;
-
-        protected TxnBlockedByTable(String keyspace)
-        {
-            super(TableMetadata.builder(keyspace, "txn_blocked_by")
-                               .kind(TableMetadata.Kind.VIRTUAL)
-                               .addPartitionKeyColumn("txn_id", 
UTF8Type.instance)
-                               .addClusteringColumn("store_id", 
Int32Type.instance)
-                               .addClusteringColumn("depth", 
Int32Type.instance)
-                               .addClusteringColumn("blocked_by", 
UTF8Type.instance)
-                               .addClusteringColumn("reason", 
UTF8Type.instance)
-                               .addRegularColumn("save_status", 
UTF8Type.instance)
-                               .addRegularColumn("execute_at", 
UTF8Type.instance)
-                               .addRegularColumn("key", pkType(keyspace))
-                               .build());
-            partitionKeyType = pkType(keyspace);
-        }
-
-        private static UserType pkType(String keyspace)
-        {
-            return new UserType(keyspace, bytes("partition_key"),
-                                
Arrays.asList(FieldIdentifier.forQuoted("table"), 
FieldIdentifier.forQuoted("token")),
-                                Arrays.asList(UTF8Type.instance, 
UTF8Type.instance), false);
-        }
-
-        private ByteBuffer pk(TokenKey pk)
-        {
-            var tm = Schema.instance.getTableMetadata(pk.table());
-            return 
partitionKeyType.pack(UTF8Type.instance.decompose(tm.toString()),
-                                         
UTF8Type.instance.decompose(pk.token().toString()));
-        }
-
-        @Override
-        public Iterable<UserType> userTypes()
-        {
-            return Arrays.asList(partitionKeyType);
-        }
-
-        @Override
-        public DataSet data(DecoratedKey partitionKey)
-        {
-            TxnId id = 
TxnId.parse(UTF8Type.instance.compose(partitionKey.getKey()));
-            List<CommandStoreTxnBlockedGraph> shards = 
AccordService.instance().debugTxnBlockedGraph(id);
-
-            SimpleDataSet ds = new SimpleDataSet(metadata());
-            for (CommandStoreTxnBlockedGraph shard : shards)
-            {
-                Set<TxnId> processed = new HashSet<>();
-                process(ds, shard, processed, id, 0, id, Reason.Self, null);
-                // everything was processed right?
-                if (!shard.txns.isEmpty() && 
!shard.txns.keySet().containsAll(processed))
-                    throw new IllegalStateException("Skipped txns: " + 
Sets.difference(shard.txns.keySet(), processed));
-            }
-
-            return ds;
-        }
-
-        private void process(SimpleDataSet ds, CommandStoreTxnBlockedGraph 
shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason 
reason, Runnable onDone)
-        {
-            if (!processed.add(txnId))
-                throw new IllegalStateException("Double processed " + txnId);
-            CommandStoreTxnBlockedGraph.TxnState txn = shard.txns.get(txnId);
-            if (txn == null)
-            {
-                Invariants.checkState(reason == Reason.Self, "Txn %s unknown 
for reason %s", txnId, reason);
-                return;
-            }
-            // was it applied?  If so ignore it
-            if (reason != Reason.Self && 
txn.saveStatus.hasBeen(Status.Applied))
-                return;
-            ds.row(userTxn.toString(), shard.storeId, depth, reason == 
Reason.Self ? "" : txn.txnId.toString(), reason.name());
-            ds.column("save_status", txn.saveStatus.name());
-            if (txn.executeAt != null)
-                ds.column("execute_at", txn.executeAt.toString());
-            if (onDone != null)
-                onDone.run();
-            if (txn.isBlocked())
-            {
-                for (TxnId blockedBy : txn.blockedBy)
-                {
-                    if (processed.contains(blockedBy)) continue; // already 
listed
-                    process(ds, shard, processed, userTxn, depth + 1, 
blockedBy, Reason.Txn, null);
-                }
-                for (TokenKey blockedBy : txn.blockedByKey)
-                {
-                    TxnId blocking = shard.keys.get(blockedBy);
-                    if (processed.contains(blocking)) continue; // already 
listed
-                    process(ds, shard, processed, userTxn, depth + 1, 
blocking, Reason.Key, () -> ds.column("key", pk(blockedBy)));
-                }
-            }
-        }
-
-        @Override
-        public DataSet data()
-        {
-            throw new InvalidRequestException("Must select a single txn_id");
-        }
-    }
-
     private static TableMetadata parse(String keyspace, String comment, String query)
     {
         return CreateTableStatement.parse(query, keyspace)
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
index 3316aa4fb3..a0585a2694 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java
@@ -50,17 +50,7 @@ public class VirtualKeyspace
         if (!duplicates.isEmpty())
             throw new IllegalArgumentException(String.format("Duplicate table 
names in virtual keyspace %s: %s", name, duplicates));
 
-        KeyspaceMetadata metadata = KeyspaceMetadata.virtual(name, Tables.of(Iterables.transform(tables, VirtualTable::metadata)));
-        for (var t : tables)
-        {
-            for (var udt : t.userTypes())
-            {
-                if (metadata.types.getNullable(udt.name) != null)
-                    throw new IllegalStateException("UDT " + udt.getNameAsString() + " already exists");
-                metadata = metadata.withUpdatedUserType(udt);
-            }
-        }
-        this.metadata = metadata;
+        this.metadata = KeyspaceMetadata.virtual(name, Tables.of(Iterables.transform(tables, VirtualTable::metadata)));
     }
 
     public String name()
@@ -68,13 +58,13 @@ public class VirtualKeyspace
         return name;
     }
 
-    public KeyspaceMetadata metadata()
+    public ImmutableCollection<VirtualTable> tables()
     {
-        return metadata;
+        return tables;
     }
 
-    public ImmutableCollection<VirtualTable> tables()
+    public KeyspaceMetadata metadata()
     {
-        return tables;
+        return metadata;
     }
 }
diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
index 93047faad8..53a9f2ac7f 100644
--- a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java
@@ -17,13 +17,10 @@
  */
 package org.apache.cassandra.db.virtual;
 
-import java.util.Collections;
-
 import org.apache.cassandra.db.DataRange;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.marshal.UserType;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.schema.TableMetadata;
@@ -90,9 +87,4 @@ public interface VirtualTable
     {
         return true;
     }
-
-    default Iterable<UserType> userTypes()
-    {
-        return Collections.emptyList();
-    }
 }
diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java b/src/java/org/apache/cassandra/schema/SchemaConstants.java
index e9f71054e8..093425d544 100644
--- a/src/java/org/apache/cassandra/schema/SchemaConstants.java
+++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java
@@ -50,9 +50,9 @@ public final class SchemaConstants
     public static final String DISTRIBUTED_KEYSPACE_NAME = "system_distributed";
 
     public static final String VIRTUAL_SCHEMA = "system_virtual_schema";
-
     public static final String VIRTUAL_VIEWS = "system_views";
     public static final String VIRTUAL_METRICS = "system_metrics";
+    public static final String VIRTUAL_ACCORD_DEBUG = "system_accord_debug";
 
     public static final String DUMMY_KEYSPACE_OR_TABLE_NAME = "--dummy--";
 
@@ -62,7 +62,7 @@ public final class SchemaConstants
 
     /* virtual table system keyspace names */
     public static final Set<String> VIRTUAL_SYSTEM_KEYSPACE_NAMES =
-        ImmutableSet.of(VIRTUAL_VIEWS, VIRTUAL_SCHEMA);
+        ImmutableSet.of(VIRTUAL_SCHEMA, VIRTUAL_VIEWS, VIRTUAL_METRICS);
 
     /* replicate system keyspace names (the ones with a "true" replication strategy) */
     public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES =
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index b9cbb96ad0..8e172ba697 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -55,6 +55,7 @@ import org.apache.cassandra.db.SizeEstimatesRecorder;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.SystemKeyspaceMigrator41;
 import org.apache.cassandra.db.commitlog.CommitLog;
+import org.apache.cassandra.db.virtual.AccordDebugKeyspace;
 import org.apache.cassandra.db.virtual.SystemViewsKeyspace;
 import org.apache.cassandra.db.virtual.VirtualKeyspace;
 import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
@@ -558,6 +559,9 @@ public class CassandraDaemon
         VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance);
         VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(VIRTUAL_METRICS, createMetricsKeyspaceTables()));
 
+        if (DatabaseDescriptor.getAccord().enable_virtual_debug_only_keyspace)
+            VirtualKeyspaceRegistry.instance.register(AccordDebugKeyspace.instance);
+
         // flush log messages to system_views.system_logs virtual table as there were messages already logged
         // before that virtual table was instantiated
         LoggingSupportFactory.getLoggingSupport()
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 5b875dd1cd..db40c870df 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -69,6 +69,7 @@ import accord.coordinate.tracking.RequestStatus;
 import accord.impl.AbstractConfigurationService;
 import accord.impl.DefaultLocalListeners;
 import accord.impl.DefaultRemoteListeners;
+import accord.impl.DurabilityScheduling;
 import accord.impl.RequestCallbacks;
 import accord.impl.SizeOfIntersectionSorter;
 import accord.impl.progresslog.DefaultProgressLogs;
@@ -480,6 +481,11 @@ public class AccordService implements IAccordService, Shutdownable
         return responseHandler;
     }
 
+    public DurabilityScheduling.ImmutableView durabilityScheduling()
+    {
+        return node.durabilityScheduling().immutableView();
+    }
+
     private Seekables<?, ?> barrier(@Nonnull Seekables<?, ?> keysOrRanges, long epoch, Dispatcher.RequestTime requestTime, long timeoutNanos, BarrierType barrierType, boolean isForWrite, BiFunction<Node, FullRoute<?>, AsyncSyncPoint> syncPoint)
     {
         Stopwatch sw = Stopwatch.createStarted();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index 815035dbb4..6be67c2384 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@ -101,6 +101,7 @@ public class InstanceConfig implements IInstanceConfig
                 .set("accord.queue_shard_count", 
accord.queue_shard_count.toString())
                 .set("accord.command_store_shard_count", 
accord.command_store_shard_count.toString())
                 .set("accord.recover_delay", accord.recover_delay.toString())
+                .set("accord.enable_virtual_debug_only_keyspace", "true")
                 .set("partitioner", 
"org.apache.cassandra.dht.Murmur3Partitioner")
                 .set("start_native_transport", true)
                 .set("concurrent_writes", 2)
diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index 376d082cba..8b48dfe10c 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -54,6 +54,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.primitives.TxnId;
+import org.apache.cassandra.db.virtual.AccordDebugKeyspace;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
@@ -80,6 +81,7 @@ import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.accord.AccordService;
@@ -101,7 +103,6 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.BROADCAST_
 import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACE_ADDRESS_FIRST_BOOT;
 import static org.apache.cassandra.config.CassandraRelevantProperties.RING_DELAY;
 import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
-import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -1531,9 +1532,9 @@ public class ClusterUtils
 
     public static <T extends IInstance> LinkedHashMap<String, SimpleQueryResult> queryTxnState(AbstractCluster<T> cluster, TxnId txnId, int... nodes)
     {
-        String cql = String.format("SELECT * FROM %s.txn_blocked_by WHERE txn_id=?", VIRTUAL_VIEWS);
+        String cql = String.format("SELECT * FROM %s.%s WHERE txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_BLOCKED_BY);
         LinkedHashMap<String, SimpleQueryResult> map = new LinkedHashMap<>();
-        Iterable<T> it = nodes.length == 0 ? cluster::iterator : cluster.get(nodes);
+        Iterable<T> it = nodes.length == 0 ? cluster : cluster.get(nodes);
         for (T i : it)
         {
             if (i.isShutdown())
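
A usage sketch for the updated helper; txnId is assumed to come from a previously coordinated
Accord transaction, and the logging is illustrative only.

    LinkedHashMap<String, SimpleQueryResult> byNode = ClusterUtils.queryTxnState(cluster, txnId);
    byNode.forEach((node, rows) -> {
        while (rows.hasNext())
            logger.info("{}: {}", node, rows.next());
    });
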
diff --git a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
index 3ccdad7997..0a4c575b5b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.cassandra.distributed.test;
 
 import java.io.IOException;
@@ -48,7 +47,6 @@ import org.apache.cassandra.utils.Throwables;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class QueriesTableTest extends TestBaseImpl
@@ -221,12 +219,6 @@ public class QueriesTableTest extends TestBaseImpl
 
         assertTrue(readVisible);
         assertTrue(coordinatorTxnVisible);
-
-        SimpleQueryResult txns = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM system_views.accord_coordination_status");
-        assertTrue(txns.hasNext());
-        Row txn = txns.next();
-        assertEquals(1, txn.getInteger("node_id").intValue());
-        assertEquals("Key", txn.getString("domain"));
     }
 
     private static void waitForQueriesToFinish() throws InterruptedException
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
index f08176046b..4f87534942 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
@@ -33,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.virtual.AccordDebugKeyspace;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IMessageFilters;
 import org.apache.cassandra.distributed.api.Row;
@@ -43,6 +44,7 @@ import org.apache.cassandra.metrics.AccordMetrics;
 import org.apache.cassandra.metrics.DefaultNameFactory;
 import org.apache.cassandra.metrics.RatioGaugeSet;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.exceptions.ReadPreemptedException;
 import org.apache.cassandra.service.accord.exceptions.WritePreemptedException;
@@ -51,6 +53,7 @@ import org.apache.cassandra.utils.AssertionUtils;
 import org.assertj.core.api.Assertions;
 import org.assertj.core.data.Offset;
 
+import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
@@ -281,8 +284,8 @@ public class AccordMetricsTest extends AccordTestBase
         }
 
         // Verify that per-store global cache stats are published to the appropriate virtual table:
-        SimpleQueryResult storeCacheResults = SHARED_CLUSTER.get(node + 1)
-                                                   .executeInternalWithResult("SELECT * FROM system_views.accord_executor_cache");
+        SimpleQueryResult storeCacheResults =
+            SHARED_CLUSTER.get(node + 1).executeInternalWithResult(format("SELECT * FROM %s.%s", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.EXECUTOR_CACHE));
         assertThat(storeCacheResults).hasNext();
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
index 0b011fe0b3..a7a2d406d4 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
@@ -54,6 +54,7 @@ import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.Mutation.SimpleBuilder;
 import org.apache.cassandra.db.SimpleBuilders.PartitionUpdateBuilder;
+import org.apache.cassandra.db.virtual.AccordDebugKeyspace;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
 import org.apache.cassandra.dht.NormalizedRanges;
@@ -109,6 +110,7 @@ import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.SERIAL;
 import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG;
 import static org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.ConsensusRoutingDecision.paxosV2;
 import static org.apache.cassandra.service.paxos.PaxosState.MaybePromise.Outcome.PROMISE;
 import static org.assertj.core.api.Fail.fail;
@@ -751,8 +753,8 @@ public class AccordMigrationTest extends AccordTestBase
                     assertNotNull(state);
 
                     SimpleQueryResult vtableResult =
-                            instance.executeInternalWithResult("SELECT * FROM system_views.consensus_migration_state WHERE keyspace_name = ? AND table_name = ? ",
-                                                               state.keyspaceName, state.tableName);
+                            instance.executeInternalWithResult(format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ? ", VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.MIGRATION_STATE),
+                                                                      state.keyspaceName, state.tableName);
                     assertTrue(vtableResult.hasNext());
 
                     assertEquals(KEYSPACE, state.keyspaceName);
diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
similarity index 86%
rename from test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
rename to test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index d9bdc1d75a..d79a346c36 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -47,17 +47,17 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.concurrent.Condition;
 import org.awaitility.Awaitility;
 
 import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;
 
-public class AccordVirtualTablesTest extends CQLTester
+public class AccordDebugKeyspaceTest extends CQLTester
 {
-    private static final Logger logger = LoggerFactory.getLogger(AccordVirtualTablesTest.class);
+    private static final Logger logger = LoggerFactory.getLogger(AccordDebugKeyspaceTest.class);
 
-    private static final String QUERY_TXN_BLOCKED_BY = "SELECT * FROM system_views.txn_blocked_by WHERE txn_id=?";
-    private static final String QUERY_TXN_STATUS = "SELECT save_status FROM system_views.txn_blocked_by WHERE txn_id=? LIMIT 1";
+    private static final String QUERY_TXN_BLOCKED_BY = "SELECT * FROM system_accord_debug.txn_blocked_by WHERE txn_id=?";
 
     @BeforeClass
     public static void setUpClass()
@@ -69,7 +69,7 @@ public class AccordVirtualTablesTest extends CQLTester
         CQLTester.setUpClass();
 
         AccordService.startup(ClusterMetadata.current().myNodeId());
-        addVirtualKeyspace();
+        VirtualKeyspaceRegistry.instance.register(AccordDebugKeyspace.instance);
         requireNetwork();
     }
 
@@ -90,7 +90,7 @@ public class AccordVirtualTablesTest extends CQLTester
         AsyncChains.getBlocking(accord.node().coordinate(id, txn));
 
         assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
-                   row(id.toString(), anyInt(), 0, "", "Self", any(), null, anyOf(SaveStatus.Applying.name(), SaveStatus.Applied.name())));
+                   row(id.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(), SaveStatus.Applied.name())));
     }
 
     @Test
@@ -109,11 +109,11 @@ public class AccordVirtualTablesTest extends CQLTester
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
 
             assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
-                       row(id.toString(), anyInt(), 0, "", "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+                       row(id.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
 
             filter.apply.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()),
-                       row(id.toString(), anyInt(), 0, "", "Self", any(), null, SaveStatus.ReadyToExecute.name()));
+                       row(id.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, SaveStatus.ReadyToExecute.name()));
         }
         finally
         {
@@ -135,11 +135,11 @@ public class AccordVirtualTablesTest extends CQLTester
 
             filter.preAccept.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
-                       row(first.toString(), anyInt(), 0, "", "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
+                       row(first.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name())));
 
             filter.apply.awaitThrowUncheckedOnInterrupt();
             assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()),
-                       row(first.toString(), anyInt(), 0, "", "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name()));
+                       row(first.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name()));
 
             filter.reset();
 
@@ -154,7 +154,7 @@ public class AccordVirtualTablesTest extends CQLTester
                                               return rs.size() == 2;
                                           });
             assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()),
-                       row(second.toString(), anyInt(), 0, "", "Self", anyNonNull(), null, SaveStatus.Stable.name()),
+                       row(second.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.Stable.name()),
                        row(second.toString(), anyInt(), 1, first.toString(), "Key", anyNonNull(), anyNonNull(), SaveStatus.ReadyToExecute.name()));
         }
         finally
@@ -191,9 +191,9 @@ public class AccordVirtualTablesTest extends CQLTester
             TxnId txnId = null;
             if (msg.payload instanceof TxnRequest)
             {
-                txnId = ((TxnRequest) msg.payload).txnId;
+                txnId = ((TxnRequest<?>) msg.payload).txnId;
             }
-            Set<Verb> seen = null;
+            Set<Verb> seen;
             if (txnId != null)
             {
                 seen = txnToVerbs.computeIfAbsent(txnId, ignore -> new ConcurrentSkipListSet<>());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
