This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new f47364697f Baseline Diagnostic vtables for Accord
f47364697f is described below
commit f47364697f3352e17422a43e29c8dd4a4cc44448
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Mon May 13 16:39:18 2024 -0500
Baseline Diagnostic vtables for Accord
patch by Caleb Rackliffe; reviewed by David Capwell and Ariel Weisberg for
CASSANDRA-18732
---
.../pages/managing/operating/virtualtables.adoc | 3 +
modules/accord | 2 +-
.../cassandra/db/virtual/AccordVirtualTables.java | 213 +++++++++++++++++++--
.../apache/cassandra/metrics/AccordMetrics.java | 4 +-
.../cassandra/metrics/AccordStateCacheMetrics.java | 10 +-
.../service/accord/AccordCommandStores.java | 2 +-
.../cassandra/service/accord/AccordService.java | 1 -
.../cassandra/service/accord/AccordStateCache.java | 24 +++
.../cassandra/service/accord/txn/TxnNamedRead.java | 52 ++++-
.../migration/ConsensusMigrationState.java | 10 +
.../distributed/test/QueriesTableTest.java | 132 ++++++++++---
.../distributed/test/accord/AccordMetricsTest.java | 33 +++-
.../test/accord/AccordMigrationTest.java | 43 ++++-
13 files changed, 463 insertions(+), 66 deletions(-)
diff --git a/doc/modules/cassandra/pages/managing/operating/virtualtables.adoc
b/doc/modules/cassandra/pages/managing/operating/virtualtables.adoc
index d3b948e3d1..6ab4917ef7 100644
--- a/doc/modules/cassandra/pages/managing/operating/virtualtables.adoc
+++ b/doc/modules/cassandra/pages/managing/operating/virtualtables.adoc
@@ -72,6 +72,8 @@ cqlsh> select * from system_metrics.all_groups ;
group_name | virtual_table
-------------------+---------------------------
+ AccordCoordinator | accord_coordinator_group
+ AccordReplica | accord_replica_group
Batch | batch_group
BufferPool | buffer_pool_group
CIDRAuthorizer | cidr_authorizer_group
@@ -98,6 +100,7 @@ cqlsh> select * from system_metrics.all_groups ;
Paxos | paxos_group
ReadRepair | read_repair_group
Repair | repair_group
+ RouteIndex | route_index_group
Storage | storage_group
StorageProxy | storage_proxy_group
Streaming | streaming_group
diff --git a/modules/accord b/modules/accord
index 21cdaf5d28..778c45cd97 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 21cdaf5d280965cfdc690d385375635b498bc9f9
+Subproject commit 778c45cd977576a901abf24a9759872d36fde056
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
index 0dba15a421..1b2e041c16 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
@@ -18,57 +18,230 @@
package org.apache.cassandra.db.virtual;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import accord.local.CommandStores;
+import accord.primitives.TxnId;
+import accord.utils.async.AsyncChain;
+import accord.utils.async.AsyncChains;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordCommandStore;
+import org.apache.cassandra.service.accord.AccordKeyspace;
import org.apache.cassandra.service.accord.AccordService;
-import org.apache.cassandra.service.accord.IAccordService;
+import org.apache.cassandra.service.accord.AccordStateCache;
+import
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
+import org.apache.cassandra.service.consensus.migration.TableMigrationState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.Clock;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
public class AccordVirtualTables
{
- private AccordVirtualTables()
- {
-
- }
+ private AccordVirtualTables() {}
public static Collection<VirtualTable> getAll(String keyspace)
{
if (!DatabaseDescriptor.getAccordTransactionsEnabled())
return Collections.emptyList();
- return Arrays.asList(
- new Epoch(keyspace)
+ return List.of(
+ new CommandStoreCache(keyspace),
+ new MigrationState(keyspace),
+ new CoordinationStatus(keyspace)
);
}
- @VisibleForTesting
- public static final class Epoch extends AbstractVirtualTable
+ public static final class CommandStoreCache extends AbstractVirtualTable
{
+ private CommandStoreCache(String keyspace)
+ {
+ super(parse(keyspace,
+ "Accord Command Store Cache Metrics",
+ "CREATE TABLE accord_command_store_cache(\n" +
+ " id int,\n" +
+ " scope text,\n" +
+ " queries bigint,\n" +
+ " hits bigint,\n" +
+ " misses bigint,\n" +
+ " PRIMARY KEY (id, scope)" +
+ ')'));
+ }
- protected Epoch(String keyspace)
+ @Override
+ public DataSet data()
{
- super(parse(keyspace, "Accord Epochs",
- "CREATE TABLE accord_epochs(\n" +
- " epoch bigint,\n" +
- " PRIMARY KEY ( (epoch) )" +
- ")"));
+ CommandStores stores = ((AccordService)
AccordService.instance()).node().commandStores();
+
+ AsyncChain<List<Map<String, AccordStateCache.ImmutableStats>>>
statsByStoreChain = stores.map(store -> {
+ Map<String, AccordStateCache.ImmutableStats> snapshots = new
HashMap<>(3);
+ AccordCommandStore accordStore = (AccordCommandStore)
store.commandStore();
+ snapshots.put(AccordKeyspace.COMMANDS,
accordStore.commandCache().statsSnapshot());
+ snapshots.put(AccordKeyspace.COMMANDS_FOR_KEY,
accordStore.commandsForKeyCache().statsSnapshot());
+ snapshots.put(AccordKeyspace.TIMESTAMPS_FOR_KEY,
accordStore.timestampsForKeyCache().statsSnapshot());
+ return snapshots;
+ });
+
+ List<Map<String, AccordStateCache.ImmutableStats>> statsByStore =
AsyncChains.getBlockingAndRethrow(statsByStoreChain);
+ SimpleDataSet result = new SimpleDataSet(metadata());
+
+ for (int storeID : stores.ids())
+ {
+ Map<String, AccordStateCache.ImmutableStats> storeStats =
statsByStore.get(storeID);
+ addRow(storeStats.get(AccordKeyspace.COMMANDS), result,
storeID, AccordKeyspace.COMMANDS);
+ addRow(storeStats.get(AccordKeyspace.COMMANDS_FOR_KEY),
result, storeID, AccordKeyspace.COMMANDS_FOR_KEY);
+ addRow(storeStats.get(AccordKeyspace.TIMESTAMPS_FOR_KEY),
result, storeID, AccordKeyspace.TIMESTAMPS_FOR_KEY);
+ }
+
+ return result;
+ }
+
+ private static void addRow(AccordStateCache.ImmutableStats stats,
SimpleDataSet result, int storeID, String scope)
+ {
+ result.row(storeID, scope);
+ result.column("queries", stats.queries);
+ result.column("hits", stats.hits);
+ result.column("misses", stats.misses);
+ }
+ }
+
+ public static final class MigrationState extends AbstractVirtualTable
+ {
+ private static final Logger logger =
LoggerFactory.getLogger(MigrationState.class);
+
+ private MigrationState(String keyspace)
+ {
+ super(parse(keyspace,
+ "Consensus Migration State",
+ "CREATE TABLE consensus_migration_state(\n" +
+ " keyspace_name text,\n" +
+ " table_name text,\n" +
+ " table_id uuid,\n" +
+ " target_protocol text,\n" +
+ " transactional_mode text,\n" +
+ " transactional_migration_from text,\n" +
+ " migrated_ranges frozen<list<text>>,\n" +
+ " migrating_ranges_by_epoch frozen<map<bigint,
list<text>>>,\n" +
+ " PRIMARY KEY (keyspace_name, table_name)" +
+ ')'));
}
@Override
public DataSet data()
{
- IAccordService accord = AccordService.instance();
+ ConsensusMigrationState snapshot =
ClusterMetadata.current().consensusMigrationState;
+ Collection<TableMigrationState> tableStates =
snapshot.tableStates();
+ return data(tableStates);
+ }
+
+ @Override
+ public DataSet data(DecoratedKey key)
+ {
+ String keyspaceName = UTF8Type.instance.compose(key.getKey());
+ Keyspace keyspace =
Schema.instance.getKeyspaceInstance(keyspaceName);
- long epoch = accord.currentEpoch();
+ if (keyspace == null)
+ throw new InvalidRequestException("Unknown keyspace: '" +
keyspaceName + '\'');
+ List<TableId> tableIDs = keyspace.getColumnFamilyStores()
+ .stream()
+
.map(ColumnFamilyStore::getTableId)
+ .collect(Collectors.toList());
+
+ ConsensusMigrationState snapshot =
ClusterMetadata.current().consensusMigrationState;
+ Collection<TableMigrationState> tableStates =
snapshot.tableStatesFor(tableIDs);
+
+ return data(tableStates);
+ }
+
+        /**
+         * Builds the result set for the given table migration states, one row per state,
+         * keyed by (keyspace_name, table_name).
+         *
+         * States whose table can no longer be resolved through {@code Schema.instance} are
+         * logged and skipped rather than reported.
+         *
+         * @param tableStates migration states to render
+         * @return a data set with one row for every state whose table still exists
+         */
+        private SimpleDataSet data(Collection<TableMigrationState> tableStates)
+        {
+            SimpleDataSet result = new SimpleDataSet(metadata());
+
+            for (TableMigrationState state : tableStates)
+            {
+                TableMetadata table = Schema.instance.getTableMetadata(state.tableId);
+
+                if (table == null)
+                {
+                    logger.warn("Table {}.{} (id: {}) no longer exists. It may have been dropped.",
+                                state.keyspaceName, state.tableName, state.tableId);
+                    continue;
+                }
+
+                result.row(state.keyspaceName, state.tableName);
+                result.column("table_id", state.tableId.asUUID());
+                result.column("target_protocol", state.targetProtocol.toString());
+                result.column("transactional_mode", table.params.transactionalMode.toString());
+                // Report the migration source, not a second copy of the transactional mode.
+                result.column("transactional_migration_from", table.params.transactionalMigrationFrom.toString());
+
+                // Ranges are rendered through toString() so they fit the text-typed columns.
+                List<String> primitiveMigratedRanges = state.migratedRanges.stream().map(Objects::toString).collect(toImmutableList());
+                result.column("migrated_ranges", primitiveMigratedRanges);
+
+                // LinkedHashMap keeps the iteration order of migratingRangesByEpoch.
+                Map<Long, List<String>> primitiveRangesByEpoch = new LinkedHashMap<>();
+                for (Map.Entry<org.apache.cassandra.tcm.Epoch, List<Range<Token>>> entry : state.migratingRangesByEpoch.entrySet())
+                    primitiveRangesByEpoch.put(entry.getKey().getEpoch(), entry.getValue().stream().map(Objects::toString).collect(toImmutableList()));
+
+                result.column("migrating_ranges_by_epoch", primitiveRangesByEpoch);
+            }
+
+            return result;
+        }
+ }
+
+ public static final class CoordinationStatus extends AbstractVirtualTable
+ {
+ private CoordinationStatus(String keyspace)
+ {
+ super(parse(keyspace,
+ "Accord Coordination Status",
+ "CREATE TABLE accord_coordination_status(\n" +
+ " node_id int,\n" +
+ " epoch bigint,\n" +
+ " start_time_micros bigint,\n" +
+ " duration_millis bigint,\n" +
+ " kind text,\n" +
+ " domain text,\n" +
+ " PRIMARY KEY (node_id, epoch, start_time_micros)" +
+ ')'));
+ }
+
+ @Override
+ public DataSet data()
+ {
+ AccordService accord = (AccordService) AccordService.instance();
SimpleDataSet result = new SimpleDataSet(metadata());
- result.row(epoch);
+
+ for (TxnId txn : accord.node().coordinating().keySet())
+ {
+ result.row(txn.node.id, txn.epoch(), txn.hlc());
+ result.column("duration_millis",
Clock.Global.currentTimeMillis() - TimeUnit.MICROSECONDS.toMillis(txn.hlc()));
+ result.column("kind", txn.kind().toString());
+ result.column("domain", txn.domain().toString());
+ }
+
return result;
}
}
diff --git a/src/java/org/apache/cassandra/metrics/AccordMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordMetrics.java
index 4dc053ccee..765fe154ee 100644
--- a/src/java/org/apache/cassandra/metrics/AccordMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordMetrics.java
@@ -57,8 +57,8 @@ public class AccordMetrics
public static final String RECOVERY_DELAY = "RecoveryDelay";
public static final String RECOVERY_TIME = "RecoveryTime";
public static final String FAST_PATH_TO_TOTAL = "FastPathToTotal";
- public static final String ACCORD_REPLICA = "accord-replica";
- public static final String ACCORD_COORDINATOR = "accord-coordinator";
+ public static final String ACCORD_REPLICA = "AccordReplica";
+ public static final String ACCORD_COORDINATOR = "AccordCoordinator";
/**
* The time between start on the coordinator and commit on this replica.
diff --git a/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java
index f63fedf282..b00793087e 100644
--- a/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordStateCacheMetrics.java
@@ -34,19 +34,19 @@ public class AccordStateCacheMetrics extends
CacheAccessMetrics
private final Map<String, CacheAccessMetrics> instanceMetrics = new
ConcurrentHashMap<>(2);
- private final String type;
+ private final String scope;
- public AccordStateCacheMetrics(String type)
+ public AccordStateCacheMetrics(String scope)
{
- super(new DefaultNameFactory(TYPE_NAME, type));
+ super(new DefaultNameFactory(TYPE_NAME, scope));
objectSize = Metrics.histogram(factory.createMetricName(OBJECT_SIZE),
false);
- this.type = type;
+ this.scope = scope;
}
public CacheAccessMetrics forInstance(Class<?> klass)
{
// cannot make Class<?> hashCode deterministic, as cannot rewrite - so
cannot safely use as Map key if want deterministic simulation
// (or we need to create extra hoops to catch this specific case in
method rewriting)
- return instanceMetrics.computeIfAbsent(klass.getSimpleName(), k -> new
CacheAccessMetrics(new DefaultNameFactory(TYPE_NAME, String.format("%s-%s",
type, k))));
+ return instanceMetrics.computeIfAbsent(klass.getSimpleName(), k -> new
CacheAccessMetrics(new DefaultNameFactory(TYPE_NAME, String.format("%s-%s",
scope, k))));
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 0fd719d5fb..7328a31de6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -39,7 +39,7 @@ import
org.apache.cassandra.service.accord.api.AccordRoutingKey;
public class AccordCommandStores extends CommandStores implements CacheSize
{
- public static final String ACCORD_STATE_CACHE = "accord-state-cache";
+ public static final String ACCORD_STATE_CACHE = "AccordStateCache";
private final CacheSizeMetrics cacheSizeMetrics;
private long cacheSize;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 01331bb2d6..f853c329c6 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -649,7 +649,6 @@ public class AccordService implements IAccordService,
Shutdownable
return node.id();
}
- @VisibleForTesting
public Node node()
{
return node;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
index b76d63b830..085504f092 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordStateCache.java
@@ -76,6 +76,20 @@ public class AccordStateCache extends
IntrusiveLinkedList<AccordCachingState<?,?
long misses;
}
+ public static final class ImmutableStats
+ {
+ public final long queries;
+ public final long hits;
+ public final long misses;
+
+ public ImmutableStats(Stats stats)
+ {
+ queries = stats.queries;
+ hits = stats.hits;
+ misses = stats.misses;
+ }
+ }
+
private ImmutableList<Instance<?, ?, ?>> instances = ImmutableList.of();
private final ExecutorPlus loadExecutor, saveExecutor;
@@ -215,6 +229,11 @@ public class AccordStateCache extends
IntrusiveLinkedList<AccordCachingState<?,?
}
}
+ public ImmutableStats stats()
+ {
+ return new ImmutableStats(stats);
+ }
+
private Instance<?, ?, ?> instanceForNode(AccordCachingState<?, ?> node)
{
return instances.get(node.index);
@@ -592,6 +611,11 @@ public class AccordStateCache extends
IntrusiveLinkedList<AccordCachingState<?,?
return stats;
}
+ public ImmutableStats statsSnapshot()
+ {
+ return new ImmutableStats(stats);
+ }
+
public Stats globalStats()
{
return AccordStateCache.this.stats;
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index bbb1076b23..4787e2105b 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord.txn;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
@@ -30,6 +31,7 @@ import accord.api.Data;
import accord.primitives.Timestamp;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncChains;
+import org.apache.cassandra.concurrent.DebuggableTask;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
@@ -43,6 +45,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.service.accord.api.PartitionKey;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.ObjectSizes;
import static org.apache.cassandra.utils.ByteBufferUtil.readWithVIntLength;
@@ -136,7 +139,7 @@ public class TxnNamedRead extends
AbstractSerialized<ReadCommand>
private AsyncChain<Data> performLocalRead(SinglePartitionReadCommand
command, int nowInSeconds)
{
- return AsyncChains.ofCallable(Stage.READ.executor(), () ->
+ Callable<Data> readCallable = () ->
{
SinglePartitionReadCommand read =
command.withNowInSec(nowInSeconds);
@@ -153,10 +156,53 @@ public class TxnNamedRead extends
AbstractSerialized<ReadCommand>
}
return result;
}
- });
+ };
+
+ return AsyncChains.ofCallable(Stage.READ.executor(), readCallable,
(callable, receiver) ->
+ new DebuggableTask.RunnableDebuggableTask()
+ {
+ private final long approxCreationTimeNanos =
MonotonicClock.Global.approxTime.now();
+ private volatile long approxStartTimeNanos;
+
+ @Override
+ public void run()
+ {
+ approxStartTimeNanos =
MonotonicClock.Global.approxTime.now();
+
+ try
+ {
+ Data call = callable.call();
+ receiver.accept(call, null);
+ }
+ catch (Throwable t)
+ {
+ logger.debug("AsyncChain Callable threw an Exception",
t);
+ receiver.accept(null, t);
+ }
+ }
+
+ @Override
+ public long creationTimeNanos()
+ {
+ return approxCreationTimeNanos;
+ }
+
+ @Override
+ public long startTimeNanos()
+ {
+ return approxStartTimeNanos;
+ }
+
+ @Override
+ public String description()
+ {
+ return command.toCQLString();
+ }
+ }
+ );
}
- static final IVersionedSerializer<TxnNamedRead> serializer = new
IVersionedSerializer<TxnNamedRead>()
+ static final IVersionedSerializer<TxnNamedRead> serializer = new
IVersionedSerializer<>()
{
@Override
public void serialize(TxnNamedRead read, DataOutputPlus out, int
version) throws IOException
diff --git
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
index 7364db38c0..fa6b146d77 100644
---
a/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
+++
b/src/java/org/apache/cassandra/service/consensus/migration/ConsensusMigrationState.java
@@ -78,6 +78,16 @@ public class ConsensusMigrationState implements
MetadataValue<ConsensusMigration
"version", PojoToString.CURRENT_VERSION);
}
+ public Collection<TableMigrationState> tableStates()
+ {
+ return tableStates.values();
+ }
+
+    /**
+     * Looks up the migration state of each of the given tables.
+     *
+     * Table IDs with no recorded migration state are omitted, so the returned list never
+     * contains {@code null} and may be shorter than {@code tableIDs}.
+     *
+     * @param tableIDs tables to look up
+     * @return the states found, in the order of {@code tableIDs}
+     */
+    public List<TableMigrationState> tableStatesFor(List<TableId> tableIDs)
+    {
+        return tableIDs.stream()
+                       .map(tableStates::get)
+                       // A lookup with no entry yields null (assuming tableStates is a Map; its
+                       // declaration is outside this hunk). Drop those so callers can dereference
+                       // every element safely.
+                       .filter(state -> state != null)
+                       .collect(Collectors.toList());
+    }
+
private List<Map<String, Object>> tableStatesAsMaps(@Nullable Set<String>
keyspaceNames,
@Nullable Set<String>
tableNames)
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
index b0c3902ad1..36220e3853 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
@@ -21,16 +21,17 @@ package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
+import accord.impl.SimpleProgressLog;
import com.datastax.driver.core.Session;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
@@ -39,12 +40,15 @@ import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.Row;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.utils.Throwables;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class QueriesTableTest extends TestBaseImpl
@@ -57,7 +61,10 @@ public class QueriesTableTest extends TestBaseImpl
public static void createCluster() throws IOException
{
SHARED_CLUSTER =
init(Cluster.build(1).withInstanceInitializer(QueryDelayHelper::install)
- .withConfig(c ->
c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP)).start());
+ .withConfig(c ->
c.with(Feature.NATIVE_PROTOCOL, Feature.GOSSIP)
+
.set("write_request_timeout", "10s")
+
.set("transaction_timeout", "15s")).start());
+
DRIVER_CLUSTER = JavaDriverUtils.create(SHARED_CLUSTER);
SESSION = DRIVER_CLUSTER.connect();
}
@@ -79,19 +86,31 @@ public class QueriesTableTest extends TestBaseImpl
public void shouldExposeReadsAndWrites() throws Throwable
{
SHARED_CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int
primary key, v int)");
-
- boolean readVisible = false;
- boolean coordinatorReadVisible = false;
- boolean writeVisible = false;
- boolean coordinatorWriteVisible = false;
-
SESSION.executeAsync("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES
(0, 0)");
SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = 0");
// Wait until the coordinator/local read and write are visible:
+ Awaitility.await()
+ .atMost(60, SECONDS)
+ .pollInterval(1, SECONDS)
+ .dontCatchUncaughtExceptions()
+
.untilAsserted(QueriesTableTest::assertReadsAndWritesVisible);
+
+ // Issue another read and write to unblock the original queries in
progress:
+ SESSION.execute("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (0,
0)");
+ SESSION.execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = 0");
+
+ waitForQueriesToFinish();
+ }
+
+ private static void assertReadsAndWritesVisible()
+ {
SimpleQueryResult result =
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM
system_views.queries");
- while (result.toObjectArrays().length < 4)
- result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT *
FROM system_views.queries");
+
+ boolean readVisible = false;
+ boolean coordinatorReadVisible = false;
+ boolean writeVisible = false;
+ boolean coordinatorWriteVisible = false;
while (result.hasNext())
{
@@ -105,32 +124,38 @@ public class QueriesTableTest extends TestBaseImpl
coordinatorWriteVisible |=
threadId.contains("Native-Transport-Requests") && task.contains("INSERT");
}
- // Issue another read and write to unblock the original queries in
progress:
- SESSION.execute("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (0,
0)");
- SESSION.execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = 0");
-
assertTrue(readVisible);
assertTrue(coordinatorReadVisible);
assertTrue(writeVisible);
assertTrue(coordinatorWriteVisible);
-
- waitForQueriesToFinish();
}
@Test
public void shouldExposeCAS() throws Throwable
{
SHARED_CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".cas_tbl (k
int primary key, v int)");
-
- boolean readVisible = false;
- boolean coordinatorUpdateVisible = false;
-
SESSION.executeAsync("UPDATE " + KEYSPACE + ".cas_tbl SET v = 10 WHERE
k = 0 IF v = 0");
// Wait until the coordinator update and local read required by the
CAS operation are visible:
+ Awaitility.await()
+ .atMost(60, SECONDS)
+ .pollInterval(1, SECONDS)
+ .dontCatchUncaughtExceptions()
+ .untilAsserted(QueriesTableTest::assertCasVisible);
+
+ // Issue a read to unblock the read generated by the original CAS
operation:
+ SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".cas_tbl WHERE k =
0");
+
+
+ waitForQueriesToFinish();
+ }
+
+ private static void assertCasVisible()
+ {
SimpleQueryResult result =
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM
system_views.queries");
- while (result.toObjectArrays().length < 2)
- result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT *
FROM system_views.queries");
+
+ boolean readVisible = false;
+ boolean coordinatorUpdateVisible = false;
while (result.hasNext())
{
@@ -142,26 +167,81 @@ public class QueriesTableTest extends TestBaseImpl
coordinatorUpdateVisible |=
threadId.contains("Native-Transport-Requests") && task.contains("UPDATE");
}
- // Issue a read to unblock the read generated by the original CAS
operation:
- SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".cas_tbl WHERE k =
0");
-
assertTrue(readVisible);
assertTrue(coordinatorUpdateVisible);
+ }
+
+ @Test
+ public void shouldExposeTransaction() throws Throwable
+ {
+ SHARED_CLUSTER.schemaChange("CREATE TABLE " + KEYSPACE + ".accord_tbl
(k int primary key, v int) WITH transactional_mode='full'");
+
+ // Disable recovery to make sure only one local read occurs:
+ for (IInvokableInstance instance : SHARED_CLUSTER)
+ instance.runOnInstance(() -> SimpleProgressLog.PAUSE_FOR_TEST =
true);
+
+ String update = "BEGIN TRANSACTION\n" +
+ " LET row1 = (SELECT * FROM " + KEYSPACE +
".accord_tbl WHERE k = 0);\n" +
+ " SELECT row1.k, row1.v;\n" +
+ " IF row1.v = 0 THEN\n" +
+ " UPDATE " + KEYSPACE + ".accord_tbl SET v = 10
WHERE k = 0;\n" +
+ " END IF\n" +
+ "COMMIT TRANSACTION";
+
+ SESSION.executeAsync(update);
+
+ // Wait until the coordinator transaction and the local read it requires are visible:
+ Awaitility.await()
+ .atMost(60, SECONDS)
+ .pollInterval(1, SECONDS)
+ .dontCatchUncaughtExceptions()
+ .untilAsserted(QueriesTableTest::assertTransactionVisible);
+
+ // Issue a read to unblock the read generated by the original transaction:
+ SESSION.executeAsync("SELECT * FROM " + KEYSPACE + ".accord_tbl WHERE
k = 0");
waitForQueriesToFinish();
}
+ private static void assertTransactionVisible()
+ {
+ SimpleQueryResult queries =
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM
system_views.queries");
+
+ boolean readVisible = false;
+ boolean coordinatorTxnVisible = false;
+
+ while (queries.hasNext())
+ {
+ Row row = queries.next();
+ String threadId = row.get("thread_id").toString();
+ String task = row.get("task").toString();
+
+ readVisible |= threadId.contains("Read") &&
task.contains("SELECT");
+ coordinatorTxnVisible |=
threadId.contains("Native-Transport-Requests") && task.contains("BEGIN
TRANSACTION");
+ }
+
+ assertTrue(readVisible);
+ assertTrue(coordinatorTxnVisible);
+
+ SimpleQueryResult txns =
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM
system_views.accord_coordination_status");
+ assertTrue(txns.hasNext());
+ Row txn = txns.next();
+ assertEquals(1, txn.getInteger("node_id").intValue());
+ assertEquals("Key", txn.getString("domain"));
+ }
+
private static void waitForQueriesToFinish() throws InterruptedException
{
// Continue to query the "queries" table until nothing is in
progress...
SimpleQueryResult result =
SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM
system_views.queries");
while (result.hasNext())
{
- TimeUnit.SECONDS.sleep(1);
+ SECONDS.sleep(1);
result = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT *
FROM system_views.queries");
}
}
+ @SuppressWarnings("resource")
public static class QueryDelayHelper
{
private static final CyclicBarrier readBarrier = new CyclicBarrier(2);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
index 235df7ebf7..0e9a47f71c 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
@@ -24,7 +24,7 @@ import java.util.Map;
import java.util.function.Function;
import com.google.common.base.Throwables;
-import org.apache.cassandra.service.consensus.TransactionalMode;
+
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -33,6 +33,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.metrics.AccordMetrics;
@@ -42,6 +44,7 @@ import org.apache.cassandra.net.Verb;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.service.accord.exceptions.ReadPreemptedException;
import org.apache.cassandra.service.accord.exceptions.WritePreemptedException;
+import org.apache.cassandra.service.consensus.TransactionalMode;
import org.assertj.core.data.Offset;
import static org.assertj.core.api.Assertions.assertThat;
@@ -225,6 +228,16 @@ public class AccordMetricsTest extends AccordTestBase
assertThat(metric.apply(AccordMetrics.RECOVERY_TIME)).isEqualTo(recoveries);
assertThat(metric.apply(AccordMetrics.DEPENDENCIES)).isEqualTo(fastPaths +
slowPaths);
+ // Verify that coordinator metrics are published to the appropriate
virtual table:
+ SimpleQueryResult res = SHARED_CLUSTER.get(node + 1)
+
.executeInternalWithResult("SELECT * FROM
system_metrics.accord_coordinator_group WHERE scope = ?", scope);
+ while (res.hasNext())
+ {
+ Row metricRow = res.next();
+ String name = metricRow.getString("name");
+ assertThat(metrics).containsKey(name);
+ }
+
if ((fastPaths + slowPaths) > 0)
{
String fastPathToTotalName =
nameFactory.createMetricName(AccordMetrics.FAST_PATH_TO_TOTAL + "." +
RatioGaugeSet.MEAN_RATIO).getMetricName();
@@ -242,13 +255,29 @@ public class AccordMetricsTest extends AccordTestBase
assertThat(metric.apply(AccordMetrics.APPLY_LATENCY)).isEqualTo(applications);
assertThat(metric.apply(AccordMetrics.APPLY_DURATION)).isEqualTo(applications);
assertThat(metric.apply(AccordMetrics.PARTIAL_DEPENDENCIES)).isEqualTo(executions);
+
+ // Verify that replica metrics are published to the appropriate virtual table:
+ SimpleQueryResult vtableResults = SHARED_CLUSTER.get(node + 1)
+
.executeInternalWithResult("SELECT * FROM system_metrics.accord_replica_group
WHERE scope = ?", scope);
+
+ while (vtableResults.hasNext())
+ {
+ Row metricRow = vtableResults.next();
+ String name = metricRow.getString("name");
+ assertThat(metrics).containsKey(name);
+ }
+
+ // Verify that per-store global cache stats are published to the appropriate virtual table:
+ SimpleQueryResult storeCacheResults = SHARED_CLUSTER.get(node + 1)
+
.executeInternalWithResult("SELECT * FROM
system_views.accord_command_store_cache");
+ assertThat(storeCacheResults).hasNext();
}
private Map<Integer, Map<String, Long>> getMetrics()
{
Map<Integer, Map<String, Long>> metrics = new HashMap<>();
for (int i = 0; i < SHARED_CLUSTER.size(); i++)
- metrics.put(i, SHARED_CLUSTER.get(i +
1).metrics().getCounters(name ->
name.startsWith("org.apache.cassandra.metrics.accord-")));
+ metrics.put(i, SHARED_CLUSTER.get(i +
1).metrics().getCounters(name ->
name.startsWith("org.apache.cassandra.metrics.Accord")));
return metrics;
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
index 519ea15f70..a63642f819 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java
@@ -28,8 +28,10 @@ import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableList;
import org.junit.After;
@@ -39,8 +41,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
-import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.Config.PaxosVariant;
@@ -57,6 +59,8 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -84,11 +88,14 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.JsonUtils;
import org.apache.cassandra.utils.PojoToString;
-import org.yaml.snakeyaml.Yaml;
-import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
import static org.apache.cassandra.Util.spinUntilSuccess;
import static org.apache.cassandra.cql3.QueryProcessor.executeInternal;
import static org.apache.cassandra.db.SystemKeyspace.CONSENSUS_MIGRATION_STATE;
@@ -101,7 +108,6 @@ import static
org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME;
import static
org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.ConsensusRoutingDecision.paxosV2;
import static
org.apache.cassandra.service.paxos.PaxosState.MaybePromise.Outcome.PROMISE;
import static org.assertj.core.api.Fail.fail;
-import static org.junit.Assert.*;
/*
* This test suite is intended to serve as an integration test with some
pretty good visibility into actual execution
@@ -136,7 +142,7 @@ public class AccordMigrationTest extends AccordTestBase
// To create a precise repair where the repaired range is fully contained in a locally replicated range
// we need to align with this token. The local ranges are (9223372036854775805,-1] and (-1,9223372036854775805]
// No idea why the partitioner creates such an
- private Token maxAlignedWithLocalRanges = new
LongToken(9223372036854775805L);
+ private final Token maxAlignedWithLocalRanges = new
LongToken(9223372036854775805L);
@Override
protected Logger logger()
@@ -676,10 +682,17 @@ public class AccordMigrationTest extends AccordTestBase
for (IInvokableInstance instance : SHARED_CLUSTER)
{
ConsensusMigrationState snapshot =
getMigrationStateSnapshot(instance);
+
for (String tableId : tableIds)
{
TableMigrationState state =
snapshot.tableStates.get(TableId.fromString(tableId));
assertNotNull(state);
+
+ SimpleQueryResult vtableResult =
+ instance.executeInternalWithResult("SELECT * FROM
system_views.consensus_migration_state WHERE keyspace_name = ? AND table_name =
? ",
+
state.keyspaceName, state.tableName);
+ assertTrue(vtableResult.hasNext());
+
assertEquals(KEYSPACE, state.keyspaceName);
assertEquals(tableName, state.tableName);
assertEquals(target, state.targetProtocol);
@@ -691,11 +704,31 @@ public class AccordMigrationTest extends AccordTestBase
assertEquals(0, state.migratingRangesByEpoch.size());
else
assertEquals(migratingRanges,
state.migratingRangesByEpoch.values().iterator().next());
+
+ Row vtableState = vtableResult.next();
+ assertVtableState(state, vtableState);
}
}
});
}
+ private static void assertVtableState(TableMigrationState expectedState,
Row vtableState)
+ { // Asserts that a vtable row reports the same migrated and migrating ranges as expectedState.
+ List<String> vtableMigratedRanges =
vtableState.getList("migrated_ranges");
+ assertEquals(expectedState.migratedRanges,
vtableMigratedRanges.stream().map(Range::fromString).collect(Collectors.toList()));
+ // The row holds ranges as strings: parse each map value back into ranges, keeping the map's iteration order.
+ Map<Long, List<String>> vtableMigratingByEpoch =
vtableState.get("migrating_ranges_by_epoch");
+ Map<Long, List<Range<Token>>> pojoMigratingByEpoch = new
LinkedHashMap<>();
+
+ for (Map.Entry<Long, List<String>> entry :
vtableMigratingByEpoch.entrySet())
+ pojoMigratingByEpoch.put(entry.getKey(),
entry.getValue().stream().map(Range::fromString).collect(toImmutableList()));
+ // With no expected migrating ranges the map must be empty; otherwise only its first entry is compared.
+ if (expectedState.migratingRanges.isEmpty())
+ assertEquals(0, pojoMigratingByEpoch.size());
+ else
+ assertEquals(expectedState.migratingRanges,
pojoMigratingByEpoch.values().iterator().next());
+ }
+
/**
* Save a promise that is after the committed one to make a subsequent
read not linearizable
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]