This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new f47364697f Baseline Diagnostic vtables for Accord
f47364697f is described below

commit f47364697f3352e17422a43e29c8dd4a4cc44448
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Mon May 13 16:39:18 2024 -0500

    Baseline Diagnostic vtables for Accord
    
    patch by Caleb Rackliffe; reviewed by David Capwell and Ariel Weisberg for 
CASSANDRA-18732
---
 .../pages/managing/operating/virtualtables.adoc    |   3 +
 modules/accord                                     |   2 +-
 .../cassandra/db/virtual/AccordVirtualTables.java  | 213 +++++++++++++++++++--
 .../apache/cassandra/metrics/AccordMetrics.java    |   4 +-
 .../cassandra/metrics/AccordStateCacheMetrics.java |  10 +-
 .../service/accord/AccordCommandStores.java        |   2 +-
 .../cassandra/service/accord/AccordService.java    |   1 -
 .../cassandra/service/accord/AccordStateCache.java |  24 +++
 .../cassandra/service/accord/txn/TxnNamedRead.java |  52 ++++-
 .../migration/ConsensusMigrationState.java         |  10 +
 .../distributed/test/QueriesTableTest.java         | 132 ++++++++++---
 .../distributed/test/accord/AccordMetricsTest.java |  33 +++-
 .../test/accord/AccordMigrationTest.java           |  43 ++++-
 13 files changed, 463 insertions(+), 66 deletions(-)

diff --git a/doc/modules/cassandra/pages/managing/operating/virtualtables.adoc 
b/doc/modules/cassandra/pages/managing/operating/virtualtables.adoc
index d3b948e3d1..6ab4917ef7 100644
--- a/doc/modules/cassandra/pages/managing/operating/virtualtables.adoc
+++ b/doc/modules/cassandra/pages/managing/operating/virtualtables.adoc
@@ -72,6 +72,8 @@ cqlsh> select * from system_metrics.all_groups  ;
 
  group_name        | virtual_table
 -------------------+---------------------------
+ AccordCoordinator |  accord_coordinator_group
+     AccordReplica |      accord_replica_group
              Batch |               batch_group
         BufferPool |         buffer_pool_group
     CIDRAuthorizer |     cidr_authorizer_group
@@ -98,6 +100,7 @@ cqlsh> select * from system_metrics.all_groups  ;
              Paxos |               paxos_group
         ReadRepair |         read_repair_group
             Repair |              repair_group
+        RouteIndex |         route_index_group
            Storage |             storage_group
       StorageProxy |       storage_proxy_group
          Streaming |           streaming_group
diff --git a/modules/accord b/modules/accord
index 21cdaf5d28..778c45cd97 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 21cdaf5d280965cfdc690d385375635b498bc9f9
+Subproject commit 778c45cd977576a901abf24a9759872d36fde056
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java 
b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
index 0dba15a421..1b2e041c16 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
@@ -18,57 +18,230 @@
 
 package org.apache.cassandra.db.virtual;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import accord.local.CommandStores;
+import accord.primitives.TxnId;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordCommandStore;
+import org.apache.cassandra.service.accord.AccordKeyspace;
 import org.apache.cassandra.service.accord.AccordService;
-import org.apache.cassandra.service.accord.IAccordService;
+import org.apache.cassandra.service.accord.AccordStateCache;
+import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
+import org.apache.cassandra.service.consensus.migration.TableMigrationState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.Clock;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
 
 public class AccordVirtualTables
 {
-    private AccordVirtualTables()
-    {
-
-    }
+    private AccordVirtualTables() {}
 
     public static Collection<VirtualTable> getAll(String keyspace)
     {
         if (!DatabaseDescriptor.getAccordTransactionsEnabled())
             return Collections.emptyList();
 
-        return Arrays.asList(
-        new Epoch(keyspace)
+        return List.of(
+            new CommandStoreCache(keyspace),
+            new MigrationState(keyspace),
+            new CoordinationStatus(keyspace)
         );
     }
 
-    @VisibleForTesting
-    public static final class Epoch extends AbstractVirtualTable
+    public static final class CommandStoreCache extends AbstractVirtualTable
     {
+        private CommandStoreCache(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord Command Store Cache Metrics",
+                        "CREATE TABLE accord_command_store_cache(\n" +
+                        "  id int,\n" +
+                        "  scope text,\n" +
+                        "  queries bigint,\n" +
+                        "  hits bigint,\n" +
+                        "  misses bigint,\n" +
+                        "  PRIMARY KEY (id, scope)" +
+                        ')'));
+        }
 
-        protected Epoch(String keyspace)
+        @Override
+        public DataSet data()
         {
-            super(parse(keyspace, "Accord Epochs",
-                        "CREATE TABLE accord_epochs(\n" +
-                        "  epoch bigint,\n" +
-                        "  PRIMARY KEY ( (epoch) )" +
-                        ")"));
+            CommandStores stores = ((AccordService) 
AccordService.instance()).node().commandStores();
+
+            AsyncChain<List<Map<String, AccordStateCache.ImmutableStats>>> 
statsByStoreChain = stores.map(store -> {
+                Map<String, AccordStateCache.ImmutableStats> snapshots = new 
HashMap<>(3);
+                AccordCommandStore accordStore = (AccordCommandStore) 
store.commandStore();
+                snapshots.put(AccordKeyspace.COMMANDS, 
accordStore.commandCache().statsSnapshot());
+                snapshots.put(AccordKeyspace.COMMANDS_FOR_KEY, 
accordStore.commandsForKeyCache().statsSnapshot());
+                snapshots.put(AccordKeyspace.TIMESTAMPS_FOR_KEY, 
accordStore.timestampsForKeyCache().statsSnapshot());
+                return snapshots;
+            });
+
+            List<Map<String, AccordStateCache.ImmutableStats>> statsByStore = 
AsyncChains.getBlockingAndRethrow(statsByStoreChain);
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (int storeID : stores.ids())
+            {
+                Map<String, AccordStateCache.ImmutableStats> storeStats = 
statsByStore.get(storeID);
+                addRow(storeStats.get(AccordKeyspace.COMMANDS), result, 
storeID, AccordKeyspace.COMMANDS);
+                addRow(storeStats.get(AccordKeyspace.COMMANDS_FOR_KEY), 
result, storeID, AccordKeyspace.COMMANDS_FOR_KEY);
+                addRow(storeStats.get(AccordKeyspace.TIMESTAMPS_FOR_KEY), 
result, storeID, AccordKeyspace.TIMESTAMPS_FOR_KEY);
+            }
+
+            return result;
+        }
+
+        private static void addRow(AccordStateCache.ImmutableStats stats, 
SimpleDataSet result, int storeID, String scope)
+        {
+            result.row(storeID, scope);
+            result.column("queries", stats.queries);
+            result.column("hits", stats.hits);
+            result.column("misses", stats.misses);
+        }
+    }
+
+    public static final class MigrationState extends AbstractVirtualTable
+    {
+        private static final Logger logger = 
LoggerFactory.getLogger(MigrationState.class);
+        
+        private MigrationState(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Consensus Migration State",
+                        "CREATE TABLE consensus_migration_state(\n" +
+                        "  keyspace_name text,\n" +
+                        "  table_name text,\n" +
+                        "  table_id uuid,\n" +
+                        "  target_protocol text,\n" +
+                        "  transactional_mode text,\n" +
+                        "  transactional_migration_from text,\n" +
+                        "  migrated_ranges frozen<list<text>>,\n" +
+                        "  migrating_ranges_by_epoch frozen<map<bigint, 
list<text>>>,\n" +
+                        "  PRIMARY KEY (keyspace_name, table_name)" +
+                        ')'));
         }
 
         @Override
         public DataSet data()
         {
-            IAccordService accord = AccordService.instance();
+            ConsensusMigrationState snapshot = 
ClusterMetadata.current().consensusMigrationState;
+            Collection<TableMigrationState> tableStates = 
snapshot.tableStates();
+            return data(tableStates);
+        }
+
+        @Override
+        public DataSet data(DecoratedKey key)
+        {
+            String keyspaceName = UTF8Type.instance.compose(key.getKey());
+            Keyspace keyspace = 
Schema.instance.getKeyspaceInstance(keyspaceName);
 
-            long epoch = accord.currentEpoch();
+            if (keyspace == null)
+                throw new InvalidRequestException("Unknown keyspace: '" + 
keyspaceName + '\'');
 
+            List<TableId> tableIDs = keyspace.getColumnFamilyStores()
+                                             .stream()
+                                             
.map(ColumnFamilyStore::getTableId)
+                                             .collect(Collectors.toList());
+
+            ConsensusMigrationState snapshot = 
ClusterMetadata.current().consensusMigrationState;
+            Collection<TableMigrationState> tableStates = 
snapshot.tableStatesFor(tableIDs);
+
+            return data(tableStates);
+        }
+
+        private SimpleDataSet data(Collection<TableMigrationState> tableStates)
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (TableMigrationState state : tableStates)
+            {
+                TableMetadata table = 
Schema.instance.getTableMetadata(state.tableId);
+
+                if (table == null)
+                {
+                    logger.warn("Table {}.{} (id: {}) no longer exists. It may 
have been dropped.",
+                                state.keyspaceName, state.tableName, 
state.tableId);
+                    continue;
+                }
+
+                result.row(state.keyspaceName, state.tableName);
+                result.column("table_id", state.tableId.asUUID());
+                result.column("target_protocol", 
state.targetProtocol.toString());
+                result.column("transactional_mode", 
table.params.transactionalMode.toString());
+                result.column("transactional_migration_from", 
table.params.transactionalMode.toString());
+
+                List<String> primitiveMigratedRanges = 
state.migratedRanges.stream().map(Objects::toString).collect(toImmutableList());
+                result.column("migrated_ranges", primitiveMigratedRanges);
+        
+                Map<Long, List<String>> primitiveRangesByEpoch = new 
LinkedHashMap<>();
+                for (Map.Entry<org.apache.cassandra.tcm.Epoch, 
List<Range<Token>>> entry : state.migratingRangesByEpoch.entrySet())
+                    primitiveRangesByEpoch.put(entry.getKey().getEpoch(), 
entry.getValue().stream().map(Objects::toString).collect(toImmutableList()));
+
+                result.column("migrating_ranges_by_epoch", 
primitiveRangesByEpoch);
+            }
+
+            return result;
+        }
+    }
+
+    public static final class CoordinationStatus extends AbstractVirtualTable
+    {
+        private CoordinationStatus(String keyspace)
+        {
+            super(parse(keyspace,
+                        "Accord Coordination Status",
+                        "CREATE TABLE accord_coordination_status(\n" +
+                        "  node_id int,\n" +
+                        "  epoch bigint,\n" +
+                        "  start_time_micros bigint,\n" +
+                        "  duration_millis bigint,\n" +
+                        "  kind text,\n" +
+                        "  domain text,\n" +
+                        "  PRIMARY KEY (node_id, epoch, start_time_micros)" +
+                        ')'));
+        }
+
+        @Override
+        public DataSet data()
+        {
+            AccordService accord = (AccordService) AccordService.instance();
             SimpleDataSet result = new SimpleDataSet(metadata());
-            result.row(epoch);
+
+            for (TxnId txn : accord.node().coordinating().keySet())
+            {
+                result.row(txn.node.id, txn.epoch(), txn.hlc());
+                result.column("duration_millis", 
Clock.Global.currentTimeMillis() - TimeUnit.MICROSECONDS.toMillis(txn.hlc()));
+                result.column("kind", txn.kind().toString());
+                result.column("domain", txn.domain().toString());
+            }
+
             return result;
         }
     }
diff --git a/src/java/org/apache/cassandra/metrics/AccordMetrics.java 
b/src/java/org/apache/cassandra/metrics/AccordMetrics.java
index 4dc053ccee..765fe154ee 100644
--- a/src/java/org/apache/cassandra/metrics/AccordMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordMetrics.java
@@ -57,8 +57,8 @@ public class AccordMetrics
     public static final String RECOVERY_DELAY = "RecoveryDelay";
     public static final String RECOVERY_TIME = "RecoveryTime";
     public static final String FAST_PATH_TO_TOTAL = "FastPathToTotal";
-    public static final String ACCORD_REPLICA = "accord-replica";
-    public static final String ACCORD_COORDINATOR = "accord-coordinator";
+    public static final String ACCORD_REPLICA = "AccordReplica";
+    public static final String ACCORD_COORDINATOR = "AccordCoordinator";
 
     /**
      * The time between start on the coordinator and commit on this replica.
diff --git a/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java 
b/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java
index f63fedf282..b00793087e 100644
--- a/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java
@@ -34,19 +34,19 @@ public class AccordStateCacheMetrics extends 
CacheAccessMetrics
 
     private final Map<String, CacheAccessMetrics> instanceMetrics = new 
ConcurrentHashMap<>(2);
 
-    private final String type;
+    private final String scope;
 
-    public AccordStateCacheMetrics(String type)
+    public AccordStateCacheMetrics(String scope)
     {
-        super(new DefaultNameFactory(TYPE_NAME, type));
+        super(new DefaultNameFactory(TYPE_NAME, scope));
         objectSize = Metrics.histogram(factory.createMetricName(OBJECT_SIZE), 
false);
-        this.type = type;
+        this.scope = scope;
     }
 
     public CacheAccessMetrics forInstance(Class<?> klass)
     {
         // cannot make Class<?> hashCode deterministic, as cannot rewrite - so 
cannot safely use as Map key if want deterministic simulation
         // (or we need to create extra hoops to catch this specific case in 
method rewriting)
-        return instanceMetrics.computeIfAbsent(klass.getSimpleName(), k -> new 
CacheAccessMetrics(new DefaultNameFactory(TYPE_NAME, String.format("%s-%s", 
type, k))));
+        return instanceMetrics.computeIfAbsent(klass.getSimpleName(), k -> new 
CacheAccessMetrics(new DefaultNameFactory(TYPE_NAME, String.format("%s-%s", 
scope, k))));
     }
 }
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 0fd719d5fb..7328a31de6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -39,7 +39,7 @@ import 
org.apache.cassandra.service.accord.api.AccordRoutingKey;
 
 public class AccordCommandStores extends CommandStores implements CacheSize
 {
-    public static final String ACCORD_STATE_CACHE = "accord-state-cache";
+    public static final String ACCORD_STATE_CACHE = "AccordStateCache";
 
     private final CacheSizeMetrics cacheSizeMetrics;
     private long cacheSize;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 01331bb2d6..f853c329c6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -649,7 +649,6 @@ public class AccordService implements IAccordService, 
Shutdownable
         return node.id();
     }
 
-    @VisibleForTesting
     public Node node()
     {
         return node;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java 
b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
index b76d63b830..085504f092 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
@@ -76,6 +76,20 @@ public class AccordStateCache extends 
IntrusiveLinkedList<AccordCachingState<?,?
         long misses;
     }
 
+    public static final class ImmutableStats
+    {
+        public final long queries;
+        public final long hits;
+        public final long misses;
+        
+        public ImmutableStats(Stats stats)
+        {
+            queries = stats.queries;
+            hits = stats.hits;
+            misses = stats.misses;
+        }
+    }
+
     private ImmutableList<Instance<?, ?, ?>> instances = ImmutableList.of();
 
     private final ExecutorPlus loadExecutor, saveExecutor;
@@ -215,6 +229,11 @@ public class AccordStateCache extends 
IntrusiveLinkedList<AccordCachingState<?,?
         }
     }
 
+    public ImmutableStats stats()
+    {
+        return new ImmutableStats(stats);
+    }
+
     private Instance<?, ?, ?> instanceForNode(AccordCachingState<?, ?> node)
     {
         return instances.get(node.index);
@@ -592,6 +611,11 @@ public class AccordStateCache extends 
IntrusiveLinkedList<AccordCachingState<?,?
             return stats;
         }
 
+        public ImmutableStats statsSnapshot()
+        {
+            return new ImmutableStats(stats);
+        }
+
         public Stats globalStats()
         {
             return AccordStateCache.this.stats;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index bbb1076b23..4787e2105b 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord.txn;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -30,6 +31,7 @@ import accord.api.Data;
 import accord.primitives.Timestamp;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
+import org.apache.cassandra.concurrent.DebuggableTask;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
@@ -43,6 +45,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.api.PartitionKey;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MonotonicClock;
 import org.apache.cassandra.utils.ObjectSizes;
 
 import static org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength;
@@ -136,7 +139,7 @@ public class TxnNamedRead extends 
AbstractSerialized<ReadCommand>
 
     private AsyncChain<Data> performLocalRead(SinglePartitionReadCommand 
command, int nowInSeconds)
     {
-        return AsyncChains.ofCallable(Stage.READ.executor(), () ->
+        Callable<Data> readCallable = () ->
         {
             SinglePartitionReadCommand read = 
command.withNowInSec(nowInSeconds);
 
@@ -153,10 +156,53 @@ public class TxnNamedRead extends 
AbstractSerialized<ReadCommand>
                 }
                 return result;
             }
-        });
+        };
+
+        return AsyncChains.ofCallable(Stage.READ.executor(), readCallable, 
(callable, receiver) ->
+            new DebuggableTask.RunnableDebuggableTask()
+            {
+                private final long approxCreationTimeNanos = 
MonotonicClock.Global.approxTime.now();
+                private volatile long approxStartTimeNanos;
+
+                @Override
+                public void run()
+                {
+                    approxStartTimeNanos = 
MonotonicClock.Global.approxTime.now();
+
+                    try
+                    {
+                        Data call = callable.call();
+                        receiver.accept(call, null);
+                    }
+                    catch (Throwable t)
+                    {
+                        logger.debug("AsyncChain Callable threw an Exception", 
t);
+                        receiver.accept(null, t);
+                    }
+                }
+
+                @Override
+                public long creationTimeNanos()
+                {
+                    return approxCreationTimeNanos;
+                }
+
+                @Override
+                public long startTimeNanos()
+                {
+                    return approxStartTimeNanos;
+                }
+
+                @Override
+                public String description()
+                {
+                    return command.toCQLString();
+                }
+            }
+        );
     }
 
-    static final IVersionedSerializer<TxnNamedRead> serializer = new 
IVersionedSerializer<TxnNamedRead>()
+    static final IVersionedSerializer<TxnNamedRead> serializer = new 
IVersionedSerializer<>()
     {
         @Override
         public void serialize(TxnNamedRead read, DataOutputPlus out, int 
version) throws IOException
diff --git 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
index 7364db38c0..fa6b146d77 100644
--- 
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
+++ 
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
@@ -78,6 +78,16 @@ public class ConsensusMigrationState implements 
MetadataValue<ConsensusMigration
                                "version", PojoToString.CURRENT_VERSION);
     }
 
+    public Collection<TableMigrationState> tableStates()
+    {
+        return tableStates.values();
+    }
+
+    public List<TableMigrationState> tableStatesFor(List<TableId> tableIDs)
+    {
+        return 
tableIDs.stream().map(tableStates::get).collect(Collectors.toList());
+    }
+
     private List<Map<String, Object>> tableStatesAsMaps(@Nullable Set<String> 
keyspaceNames,
                                                         @Nullable Set<String> 
tableNames)
     {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
index b0c3902ad1..36220e3853 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
@@ -21,16 +21,17 @@ package org.apache.cassandra.distributed.test;
 import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
 
 import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.awaitility.Awaitility;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import accord.impl.SimpleProgressLog;
 import com.datastax.driver.core.Session;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
@@ -39,12 +40,15 @@ import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.Row;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.utils.Throwables;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class QueriesTableTest extends TestBaseImpl
@@ -57,7 +61,10 @@ public class QueriesTableTest extends TestBaseImpl
     public static void createCluster() throws IOException
     {
         SHARED_CLUSTER = 
init(Cluster.build(1).withInstanceInitializer(QueryDelayHelper::install)
-                                              .withConfig(c -> 
c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP)).start());
+                                              .withConfig(c -> 
c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP)
+                                                                
.set("write_request_timeout", "10s")
+                                                                
.set("transaction_timeout", "15s")).start());
+
         DRIVER_CLUSTER = JavaDriverUtils.create(SHARED_CLUSTER);
         SESSION = DRIVER_CLUSTER.connect();
     }
@@ -79,19 +86,31 @@ public class QueriesTableTest extends TestBaseImpl
     public void shouldExposeReadsAndWrites() throws Throwable
     {
         SHARED_CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int 
primary key, v int)");
-
-        boolean readVisible = false;
-        boolean coordinatorReadVisible = false;
-        boolean writeVisible = false;
-        boolean coordinatorWriteVisible = false;
-        
         SESSION.executeAsync("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES 
(0, 0)");
         SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = 0");
 
         // Wait until the coordinator/local read and write are visible:
+        Awaitility.await()
+                  .atMost(60, SECONDS)
+                  .pollInterval(1, SECONDS)
+                  .dontCatchUncaughtExceptions()
+                  
.untilAsserted(QueriesTableTest::assertReadsAndWritesVisible);
+
+        // Issue another read and write to unblock the original queries in 
progress:
+        SESSION.execute("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (0, 
0)");
+        SESSION.execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = 0");
+
+        waitForQueriesToFinish();
+    }
+
+    private static void assertReadsAndWritesVisible()
+    {
         SimpleQueryResult result = 
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM 
system_views.queries");
-        while (result.toObjectArrays().length < 4)
-            result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * 
FROM system_views.queries");
+
+        boolean readVisible = false;
+        boolean coordinatorReadVisible = false;
+        boolean writeVisible = false;
+        boolean coordinatorWriteVisible = false;
 
         while (result.hasNext())
         {
@@ -105,32 +124,38 @@ public class QueriesTableTest extends TestBaseImpl
             coordinatorWriteVisible |= 
threadId.contains("Native-Transport-Requests") && task.contains("INSERT");
         }
 
-        // Issue another read and write to unblock the original queries in 
progress:
-        SESSION.execute("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (0, 
0)");
-        SESSION.execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = 0");
-
         assertTrue(readVisible);
         assertTrue(coordinatorReadVisible);
         assertTrue(writeVisible);
         assertTrue(coordinatorWriteVisible);
-
-        waitForQueriesToFinish();
     }
 
     @Test
     public void shouldExposeCAS() throws Throwable
     {
         SHARED_CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".cas_tbl (k 
int primary key, v int)");
-
-        boolean readVisible = false;
-        boolean coordinatorUpdateVisible = false;
-
         SESSION.executeAsync("UPDATE " + KEYSPACE + ".cas_tbl SET v = 10 WHERE 
k = 0 IF v = 0");
 
         // Wait until the coordinator update and local read required by the 
CAS operation are visible:
+        Awaitility.await()
+                  .atMost(60, SECONDS)
+                  .pollInterval(1, SECONDS)
+                  .dontCatchUncaughtExceptions()
+                  .untilAsserted(QueriesTableTest::assertCasVisible);
+
+        // Issue a read to unblock the read generated by the original CAS 
operation:
+        SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".cas_tbl WHERE k = 
0");
+
+
+        waitForQueriesToFinish();
+    }
+
+    private static void assertCasVisible()
+    {
         SimpleQueryResult result = 
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM 
system_views.queries");
-        while (result.toObjectArrays().length < 2)
-            result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * 
FROM system_views.queries");
+
+        boolean readVisible = false;
+        boolean coordinatorUpdateVisible = false;
 
         while (result.hasNext())
         {
@@ -142,26 +167,81 @@ public class QueriesTableTest extends TestBaseImpl
             coordinatorUpdateVisible |= 
threadId.contains("Native-Transport-Requests") && task.contains("UPDATE");
         }
 
-        // Issue a read to unblock the read generated by the original CAS 
operation:
-        SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".cas_tbl WHERE k = 
0");
-
         assertTrue(readVisible);
         assertTrue(coordinatorUpdateVisible);
+    }
+
+    @Test
+    public void shouldExposeTransaction() throws Throwable
+    {
+        SHARED_CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".accord_tbl 
(k int primary key, v int)  WITH transactional_mode='full'");
+
+        // Disable recovery to make sure only one local read occurs:
+        for (IInvokableInstance instance : SHARED_CLUSTER)
+            instance.runOnInstance(() -> SimpleProgressLog.PAUSE_FOR_TEST = 
true);
+
+        String update = "BEGIN TRANSACTION\n" +
+                        "  LET row1 = (SELECT * FROM " + KEYSPACE + 
".accord_tbl WHERE k = 0);\n" +
+                        "  SELECT row1.k, row1.v;\n" +
+                        "  IF row1.v = 0 THEN\n" +
+                        "    UPDATE " + KEYSPACE + ".accord_tbl SET v = 10 
WHERE k = 0;\n" +
+                        "  END IF\n" +
+                        "COMMIT TRANSACTION";
+        
+        SESSION.executeAsync(update);
+
+        // Wait until the coordinator transaction and the local read it 
requires are visible:
+        Awaitility.await()
+                  .atMost(60, SECONDS)
+                  .pollInterval(1, SECONDS)
+                  .dontCatchUncaughtExceptions()
+                  .untilAsserted(QueriesTableTest::assertTransactionVisible);
+
+        // Issue a read to unblock the read generated by the original 
transaction:
+        SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".accord_tbl WHERE 
k = 0");
 
         waitForQueriesToFinish();
     }
 
+    private static void assertTransactionVisible()
+    {
+        SimpleQueryResult queries = 
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM 
system_views.queries");
+
+        boolean readVisible = false;
+        boolean coordinatorTxnVisible = false;
+
+        while (queries.hasNext())
+        {
+            Row row = queries.next();
+            String threadId = row.get("thread_id").toString();
+            String task = row.get("task").toString();
+
+            readVisible |= threadId.contains("Read") && 
task.contains("SELECT");
+            coordinatorTxnVisible |= 
threadId.contains("Native-Transport-Requests") && task.contains("BEGIN 
TRANSACTION");
+        }
+
+        assertTrue(readVisible);
+        assertTrue(coordinatorTxnVisible);
+
+        SimpleQueryResult txns = 
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM 
system_views.accord_coordination_status");
+        assertTrue(txns.hasNext());
+        Row txn = txns.next();
+        assertEquals(1, txn.getInteger("node_id").intValue());
+        assertEquals("Key", txn.getString("domain"));
+    }
+
     private static void waitForQueriesToFinish() throws InterruptedException
     {
         // Continue to query the "queries" table until nothing is in 
progress...
         SimpleQueryResult result = 
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM 
system_views.queries");
         while (result.hasNext())
         {
-            TimeUnit.SECONDS.sleep(1);
+            SECONDS.sleep(1);
             result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * 
FROM system_views.queries");
         }
     }
 
+    @SuppressWarnings("resource")
     public static class QueryDelayHelper
     {
         private static final CyclicBarrier readBarrier = new CyclicBarrier(2);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
index 235df7ebf7..0e9a47f71c 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
@@ -24,7 +24,7 @@ import java.util.Map;
 import java.util.function.Function;
 
 import com.google.common.base.Throwables;
-import org.apache.cassandra.service.consensus.TransactionalMode;
+
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.metrics.AccordMetrics;
@@ -42,6 +44,7 @@ import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.exceptions.ReadPreemptedException;
 import org.apache.cassandra.service.accord.exceptions.WritePreemptedException;
+import org.apache.cassandra.service.consensus.TransactionalMode;
 import org.assertj.core.data.Offset;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -225,6 +228,16 @@ public class AccordMetricsTest extends AccordTestBase
         
assertThat(metric.apply(AccordMetrics.RECOVERY_TIME)).isEqualTo(recoveries);
         
assertThat(metric.apply(AccordMetrics.DEPENDENCIES)).isEqualTo(fastPaths + 
slowPaths);
 
+        // Verify that coordinator metrics are published to the appropriate 
virtual table:
+        SimpleQueryResult res = SHARED_CLUSTER.get(node + 1)
+                                              
.executeInternalWithResult("SELECT * FROM 
system_metrics.accord_coordinator_group WHERE scope = ?", scope);
+        while (res.hasNext())
+        {
+            Row metricRow = res.next();
+            String name = metricRow.getString("name");
+            assertThat(metrics).containsKey(name);
+        }
+
         if ((fastPaths + slowPaths) > 0)
         {
             String fastPathToTotalName = 
nameFactory.createMetricName(AccordMetrics.FAST_PATH_TO_TOTAL + "." + 
RatioGaugeSet.MEAN_RATIO).getMetricName();
@@ -242,13 +255,29 @@ public class AccordMetricsTest extends AccordTestBase
         
assertThat(metric.apply(AccordMetrics.APPLY_LATENCY)).isEqualTo(applications);
         
assertThat(metric.apply(AccordMetrics.APPLY_DURATION)).isEqualTo(applications);
         
assertThat(metric.apply(AccordMetrics.PARTIAL_DEPENDENCIES)).isEqualTo(executions);
+
+        // Verify that replica metrics are published to the appropriate 
virtual table:
+        SimpleQueryResult vtableResults = SHARED_CLUSTER.get(node + 1)
+                                              
.executeInternalWithResult("SELECT * FROM system_metrics.accord_replica_group 
WHERE scope = ?", scope);
+
+        while (vtableResults.hasNext())
+        {
+            Row metricRow = vtableResults.next();
+            String name = metricRow.getString("name");
+            assertThat(metrics).containsKey(name);
+        }
+
+        // Verify that per-store global cache stats are published to the 
appropriate virtual table:
+        SimpleQueryResult storeCacheResults = SHARED_CLUSTER.get(node + 1)
+                                                   
.executeInternalWithResult("SELECT * FROM 
system_views.accord_command_store_cache");
+        assertThat(storeCacheResults).hasNext();
     }
 
     private Map<Integer, Map<String, Long>> getMetrics()
     {
         Map<Integer, Map<String, Long>> metrics = new HashMap<>();
         for (int i = 0; i < SHARED_CLUSTER.size(); i++)
-            metrics.put(i, SHARED_CLUSTER.get(i + 
1).metrics().getCounters(name -> 
name.startsWith("org.apache.cassandra.metrics.accord-")));
+            metrics.put(i, SHARED_CLUSTER.get(i + 
1).metrics().getCounters(name -> 
name.startsWith("org.apache.cassandra.metrics.Accord")));
         return metrics;
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
index 519ea15f70..a63642f819 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
@@ -28,8 +28,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.ImmutableList;
 
 import org.junit.After;
@@ -39,8 +41,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
 
-import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.cassandra.ServerTestUtils;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.Config.PaxosVariant;
@@ -57,6 +59,8 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -84,11 +88,14 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.JsonUtils;
 import org.apache.cassandra.utils.PojoToString;
-import org.yaml.snakeyaml.Yaml;
 
-import static com.google.common.collect.ImmutableList.toImmutableList;
 import static java.lang.String.format;
 import static java.util.Collections.emptyList;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 import static org.apache.cassandra.Util.spinUntilSuccess;
 import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 import static org.apache.cassandra.db.SystemKeyspace.CONSENSUS_MIGRATION_STATE;
@@ -101,7 +108,6 @@ import static 
org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
 import static 
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.ConsensusRoutingDecision.paxosV2;
 import static 
org.apache.cassandra.service.paxos.PaxosState.MaybePromise.Outcome.PROMISE;
 import static org.assertj.core.api.Fail.fail;
-import static org.junit.Assert.*;
 
 /*
  * This test suite is intended to serve as an integration test with some 
pretty good visibility into actual execution
@@ -136,7 +142,7 @@ public class AccordMigrationTest extends AccordTestBase
     // To create a precise repair where the repaired range is fully contained 
in a locally replicated range
     // we need to align with this token. The local ranges are 
(9223372036854775805,-1] and (-1,9223372036854775805]
     // No idea why the partitioner creates such an
-    private Token maxAlignedWithLocalRanges = new 
LongToken(9223372036854775805L);
+    private final Token maxAlignedWithLocalRanges = new 
LongToken(9223372036854775805L);
 
     @Override
     protected Logger logger()
@@ -676,10 +682,17 @@ public class AccordMigrationTest extends AccordTestBase
             for (IInvokableInstance instance : SHARED_CLUSTER)
             {
                 ConsensusMigrationState snapshot = 
getMigrationStateSnapshot(instance);
+
                 for (String tableId : tableIds)
                 {
                     TableMigrationState state = 
snapshot.tableStates.get(TableId.fromString(tableId));
                     assertNotNull(state);
+
+                    SimpleQueryResult vtableResult =
+                            instance.executeInternalWithResult("SELECT * FROM 
system_views.consensus_migration_state WHERE keyspace_name = ? AND table_name = 
? ",
+                                                               
state.keyspaceName, state.tableName);
+                    assertTrue(vtableResult.hasNext());
+
                     assertEquals(KEYSPACE, state.keyspaceName);
                     assertEquals(tableName, state.tableName);
                     assertEquals(target, state.targetProtocol);
@@ -691,11 +704,31 @@ public class AccordMigrationTest extends AccordTestBase
                         assertEquals(0, state.migratingRangesByEpoch.size());
                     else
                         assertEquals(migratingRanges, 
state.migratingRangesByEpoch.values().iterator().next());
+
+                    Row vtableState = vtableResult.next();
+                    assertVtableState(state, vtableState);
                 }
             }
         });
     }
 
+    private static void assertVtableState(TableMigrationState expectedState, 
Row vtableState)
+    {
+        List<String> vtableMigratedRanges = 
vtableState.getList("migrated_ranges");
+        assertEquals(expectedState.migratedRanges, 
vtableMigratedRanges.stream().map(Range::fromString).collect(Collectors.toList()));
+
+        Map<Long, List<String>> vtableMigratingByEpoch = 
vtableState.get("migrating_ranges_by_epoch");
+        Map<Long, List<Range<Token>>> pojoMigratingByEpoch = new 
LinkedHashMap<>();
+
+        for (Map.Entry<Long, List<String>> entry : 
vtableMigratingByEpoch.entrySet())
+            pojoMigratingByEpoch.put(entry.getKey(), 
entry.getValue().stream().map(Range::fromString).collect(toImmutableList()));
+
+        if (expectedState.migratingRanges.isEmpty())
+            assertEquals(0, pojoMigratingByEpoch.size());
+        else
+            assertEquals(expectedState.migratingRanges, 
pojoMigratingByEpoch.values().iterator().next());
+    }
+
     /**
      * Save a promise that is after the committed one to make a subsequent 
read not linearizable
      */


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to