This is an automated email from the ASF dual-hosted git repository. aleksey pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit dc2d113677e940e9a099b37e05887c66cc717e39 Author: Aleksey Yeschenko <[email protected]> AuthorDate: Thu Oct 24 13:02:52 2024 +0100 Implement missing virtual tables for Accord debugging patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith and David Capwell for CASSANDRA-20062 --- .../org/apache/cassandra/config/AccordSpec.java | 15 + .../cassandra/db/virtual/AccordDebugKeyspace.java | 634 +++++++++++++++++++++ .../cassandra/db/virtual/AccordVirtualTables.java | 316 ---------- .../cassandra/db/virtual/VirtualKeyspace.java | 20 +- .../apache/cassandra/db/virtual/VirtualTable.java | 8 - .../apache/cassandra/schema/SchemaConstants.java | 4 +- .../apache/cassandra/service/CassandraDaemon.java | 4 + .../cassandra/service/accord/AccordService.java | 6 + .../cassandra/distributed/impl/InstanceConfig.java | 1 + .../cassandra/distributed/shared/ClusterUtils.java | 7 +- .../distributed/test/QueriesTableTest.java | 8 - .../distributed/test/accord/AccordMetricsTest.java | 7 +- .../test/accord/AccordMigrationTest.java | 6 +- ...ablesTest.java => AccordDebugKeyspaceTest.java} | 26 +- 14 files changed, 693 insertions(+), 369 deletions(-) diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java b/src/java/org/apache/cassandra/config/AccordSpec.java index b445e4492e..f062c9a570 100644 --- a/src/java/org/apache/cassandra/config/AccordSpec.java +++ b/src/java/org/apache/cassandra/config/AccordSpec.java @@ -38,6 +38,21 @@ public class AccordSpec public volatile boolean enable_journal_compaction = true; + /** + * Enables the virtual Accord debug-only keyspace with tables + * that expose internal state to aid the developers working + * on Accord implementation. + * <p/> + * These tables can and will change and/or go away at any point, + * including in a minor release, are not to be considered part of the API, + * and are NOT to be relied on for anything. + * <p/> + * Only enable this keyspace if you are working on Accord and + * need to debug an issue with Accord implementation, or if an Accord + * developer asked you to. + */ + public boolean enable_virtual_debug_only_keyspace = false; + public enum QueueShardModel { /** diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java new file mode 100644 index 0000000000..77fda644a2 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java @@ -0,0 +1,634 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.api.RoutingKey; +import accord.impl.DurabilityScheduling; +import accord.impl.progresslog.DefaultProgressLog; +import accord.impl.progresslog.TxnStateKind; +import accord.local.CommandStores; +import accord.local.DurableBefore; +import accord.local.MaxConflicts; +import accord.local.RedundantBefore; +import accord.local.RejectBefore; +import accord.primitives.Status; +import accord.primitives.TxnId; +import accord.utils.Invariants; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.TupleType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.marshal.UUIDType; +import org.apache.cassandra.dht.NormalizedRanges; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.accord.AccordCache; +import org.apache.cassandra.service.accord.AccordCommandStores; +import org.apache.cassandra.service.accord.AccordExecutor; +import org.apache.cassandra.service.accord.AccordKeyspace; +import org.apache.cassandra.service.accord.AccordService; +import org.apache.cassandra.service.accord.CommandStoreTxnBlockedGraph; +import org.apache.cassandra.service.accord.api.AccordRoutingKey; +import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; +import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; +import org.apache.cassandra.service.consensus.migration.TableMigrationState; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.Pair; + +import static java.lang.String.format; +import static java.util.Comparator.comparing; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static accord.utils.async.AsyncChains.getBlockingAndRethrow; +import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; +import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; + +public class AccordDebugKeyspace extends VirtualKeyspace +{ + public static final String DURABILITY_SCHEDULING = "durability_scheduling"; + public static final String DURABLE_BEFORE = "durable_before"; + public static final String EXECUTOR_CACHE = "executor_cache"; + public static final String MAX_CONFLICTS = "max_conflicts"; + public static final String MIGRATION_STATE = "migration_state"; + public static final String PROGRESS_LOG = "progress_log"; + public static final String REDUNDANT_BEFORE = "redundant_before"; + public static final String REJECT_BEFORE = "reject_before"; + public static final String TXN_BLOCKED_BY = "txn_blocked_by"; + + // {table_id, token} or {table_id, +Inf/-Inf} + private static final TupleType ROUTING_KEY_TYPE = new TupleType(List.of(UUIDType.instance, UTF8Type.instance)); + private static final String ROUTING_KEY_TYPE_STRING = ROUTING_KEY_TYPE.asCQL3Type().toString(); + + public static final AccordDebugKeyspace instance = new AccordDebugKeyspace(); + + private AccordDebugKeyspace() + { + super(VIRTUAL_ACCORD_DEBUG, List.of( + new DurabilitySchedulingTable(), + new DurableBeforeTable(), + new ExecutorCacheTable(), + new MaxConflictsTable(), + new MigrationStateTable(), + new ProgressLogTable(), + new RedundantBeforeTable(), + new RejectBeforeTable(), + new TxnBlockedByTable() + )); + } + + // TODO (consider): use a different type for the three timestamps in micros + public static final class DurabilitySchedulingTable extends AbstractVirtualTable + { + private DurabilitySchedulingTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, DURABILITY_SCHEDULING, + "Accord per-Range Durability Scheduling State", + "CREATE TABLE %s (\n" + + format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) + + format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) + + " node_offset int,\n" + + " \"index\" int,\n" + + " number_of_splits int,\n" + + " range_started_at bigint,\n" + + " cycle_started_at bigint,\n" + + " retry_delay_micros bigint,\n" + + " is_defunct boolean,\n" + + " PRIMARY KEY ((range_start, range_end))" + + ')')); + } + + @Override + public DataSet data() + { + DurabilityScheduling.ImmutableView view = ((AccordService) AccordService.instance()).durabilityScheduling(); + + SimpleDataSet ds = new SimpleDataSet(metadata()); + while (view.advance()) + { + ds.row(decompose(view.range().start()), decompose(view.range().end())) + .column("node_offset", view.nodeOffset()) + .column("index", view.index()) + .column("number_of_splits", view.numberOfSplits()) + .column("range_started_at", view.rangeStartedAtMicros()) + .column("cycle_started_at", view.cycleStartedAtMicros()) + .column("retry_delay_micros", view.retryDelayMicros()) + .column("is_defunct", view.isDefunct()); + } + return ds; + } + } + + public static final class DurableBeforeTable extends AbstractVirtualTable + { + private DurableBeforeTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, DURABLE_BEFORE, + "Accord Node's DurableBefore State", + "CREATE TABLE %s (\n" + + format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) + + format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) + + " majority_before text,\n" + + " universal_before text,\n" + + " PRIMARY KEY ((range_start, range_end))" + + ')')); + } + + @Override + public DataSet data() + { + DurableBefore durableBefore = AccordService.instance().node().durableBefore(); + return durableBefore.foldlWithBounds( + (entry, ds, start, end) -> { + ds.row(decompose(start), decompose(end)) + .column("majority_before", entry.majorityBefore.toString()) + .column("universal_before", entry.universalBefore.toString()); + return ds; + }, + new SimpleDataSet(metadata()), + ignore -> false + ); + } + } + + public static final class ExecutorCacheTable extends AbstractVirtualTable + { + private ExecutorCacheTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, EXECUTOR_CACHE, + "Accord Executor Cache Metrics", + "CREATE TABLE %s (\n" + + " executor_id int,\n" + + " scope text,\n" + + " queries bigint,\n" + + " hits bigint,\n" + + " misses bigint,\n" + + " PRIMARY KEY (executor_id, scope)" + + ')')); + } + + @Override + public DataSet data() + { + AccordCommandStores stores = (AccordCommandStores) AccordService.instance().node().commandStores(); + SimpleDataSet ds = new SimpleDataSet(metadata()); + for (AccordExecutor executor : stores.executors()) + { + try (AccordExecutor.ExclusiveGlobalCaches cache = executor.lockCaches()) + { + addRow(ds, executor.executorId(), AccordKeyspace.COMMANDS, cache.commands.statsSnapshot()); + addRow(ds, executor.executorId(), AccordKeyspace.COMMANDS_FOR_KEY, cache.commandsForKey.statsSnapshot()); + addRow(ds, executor.executorId(), AccordKeyspace.TIMESTAMPS_FOR_KEY, cache.timestampsForKey.statsSnapshot()); + } + } + return ds; + } + + private static void addRow(SimpleDataSet ds, int executorId, String scope, AccordCache.ImmutableStats stats) + { + ds.row(executorId, scope) + .column("queries", stats.queries) + .column("hits", stats.hits) + .column("misses", stats.misses); + } + } + + + public static final class MaxConflictsTable extends AbstractVirtualTable + { + private MaxConflictsTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, MAX_CONFLICTS, + "Accord per-CommandStore MaxConflicts State", + "CREATE TABLE %s (\n" + + " command_store_id int,\n" + + format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) + + format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) + + " timestamp text,\n" + + " PRIMARY KEY (command_store_id, range_start, range_end)" + + ')')); + } + + @Override + public DataSet data() + { + CommandStores stores = AccordService.instance().node().commandStores(); + List<Pair<Integer, MaxConflicts>> rangeMaps = + getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(), store.commandStore().unsafeGetMaxConflicts()))); + rangeMaps.sort(comparing(p -> p.left)); + + SimpleDataSet dataSet = new SimpleDataSet(metadata()); + for (Pair<Integer, MaxConflicts> pair : rangeMaps) + { + int storeId = pair.left; + MaxConflicts maxConflicts = pair.right; + + maxConflicts.foldlWithBounds( + (timestamp, ds, start, end) -> ds.row(storeId, decompose(start), decompose(end)).column("timestamp", timestamp.toString()), + dataSet, + ignore -> false + ); + } + return dataSet; + } + } + + public static final class MigrationStateTable extends AbstractVirtualTable + { + private static final Logger logger = LoggerFactory.getLogger(MigrationStateTable.class); + + private MigrationStateTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, MIGRATION_STATE, + "Accord Consensus Migration State", + "CREATE TABLE %s (\n" + + " keyspace_name text,\n" + + " table_name text,\n" + + " table_id uuid,\n" + + " target_protocol text,\n" + + " transactional_mode text,\n" + + " transactional_migration_from text,\n" + + " migrated_ranges frozen<list<text>>,\n" + + " repair_pending_ranges frozen<list<text>>,\n" + + " migrating_ranges_by_epoch frozen<map<bigint, list<text>>>,\n" + + " PRIMARY KEY (keyspace_name, table_name)" + + ')')); + } + + @Override + public DataSet data() + { + ConsensusMigrationState snapshot = ClusterMetadata.current().consensusMigrationState; + Collection<TableMigrationState> tableStates = snapshot.tableStates(); + return data(tableStates); + } + + @Override + public DataSet data(DecoratedKey key) + { + String keyspaceName = UTF8Type.instance.compose(key.getKey()); + Keyspace keyspace = Schema.instance.getKeyspaceInstance(keyspaceName); + + if (keyspace == null) + throw new InvalidRequestException("Unknown keyspace: '" + keyspaceName + '\''); + + List<TableId> tableIDs = keyspace.getColumnFamilyStores() + .stream() + .map(ColumnFamilyStore::getTableId) + .collect(Collectors.toList()); + + ConsensusMigrationState snapshot = ClusterMetadata.current().consensusMigrationState; + Collection<TableMigrationState> tableStates = snapshot.tableStatesFor(tableIDs); + + return data(tableStates); + } + + private SimpleDataSet data(Collection<TableMigrationState> tableStates) + { + SimpleDataSet result = new SimpleDataSet(metadata()); + + for (TableMigrationState state : tableStates) + { + TableMetadata table = Schema.instance.getTableMetadata(state.tableId); + + if (table == null) + { + logger.warn("Table {}.{} (id: {}) no longer exists. It may have been dropped.", + state.keyspaceName, state.tableName, state.tableId); + continue; + } + + result.row(state.keyspaceName, state.tableName); + result.column("table_id", state.tableId.asUUID()); + result.column("target_protocol", state.targetProtocol.toString()); + result.column("transactional_mode", table.params.transactionalMode.toString()); + result.column("transactional_migration_from", table.params.transactionalMode.toString()); + + List<String> primitiveMigratedRanges = state.migratedRanges.stream().map(Objects::toString).collect(toImmutableList()); + result.column("migrated_ranges", primitiveMigratedRanges); + + List<String> primitiveRepairPendingRanges = state.repairPendingRanges.stream().map(Objects::toString).collect(toImmutableList()); + result.column("repair_pending_ranges", primitiveRepairPendingRanges); + + Map<Long, List<String>> primitiveRangesByEpoch = new LinkedHashMap<>(); + for (Map.Entry<org.apache.cassandra.tcm.Epoch, NormalizedRanges<Token>> entry : state.migratingRangesByEpoch.entrySet()) + primitiveRangesByEpoch.put(entry.getKey().getEpoch(), entry.getValue().stream().map(Objects::toString).collect(toImmutableList())); + + result.column("migrating_ranges_by_epoch", primitiveRangesByEpoch); + } + + return result; + } + } + + // TODO (desired): human readable packed key tracker (but requires loading Txn, so might be preferable to only do conditionally) + public static final class ProgressLogTable extends AbstractVirtualTable + { + private ProgressLogTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, PROGRESS_LOG, + "Accord per-CommandStore ProgressLog State", + "CREATE TABLE %s (\n" + + " command_store_id int,\n" + + " txn_id text,\n" + + // Timer + BaseTxnState + " contact_everyone boolean,\n" + + // WaitingState + " waiting_is_uninitialised boolean,\n" + + " waiting_blocked_until text,\n" + + " waiting_home_satisfies text,\n" + + " waiting_progress text,\n" + + " waiting_retry_counter int,\n" + + " waiting_packed_key_tracker_bits text,\n" + + " waiting_scheduled_at timestamp,\n" + + // HomeState/TxnState + " home_phase text,\n" + + " home_progress text,\n" + + " home_retry_counter int,\n" + + " home_scheduled_at timestamp,\n" + + " PRIMARY KEY (command_store_id, txn_id)" + + ')')); + } + + @Override + public DataSet data() + { + CommandStores stores = AccordService.instance().node().commandStores(); + List<DefaultProgressLog.ImmutableView> views = + getBlockingAndRethrow(stores.map(store -> ((DefaultProgressLog) store.progressLog()).immutableView())); + views.sort(comparing(DefaultProgressLog.ImmutableView::storeId)); + + SimpleDataSet ds = new SimpleDataSet(metadata()); + for (int i = 0, size = views.size(); i < size; ++i) + { + DefaultProgressLog.ImmutableView view = views.get(i); + while (view.advance()) + { + ds.row(view.storeId(), view.txnId().toString()) + .column("contact_everyone", view.contactEveryone()) + .column("waiting_is_uninitialised", view.isWaitingUninitialised()) + .column("waiting_blocked_until", view.waitingIsBlockedUntil().name()) + .column("waiting_home_satisfies", view.waitingHomeSatisfies().name()) + .column("waiting_progress", view.waitingProgress().name()) + .column("waiting_retry_counter", view.waitingRetryCounter()) + .column("waiting_packed_key_tracker_bits", Long.toBinaryString(view.waitingPackedKeyTrackerBits())) + .column("waiting_scheduled_at", toTimestamp(view.timerScheduledAt(TxnStateKind.Waiting))) + .column("home_phase", view.homePhase().name()) + .column("home_progress", view.homeProgress().name()) + .column("home_retry_counter", view.homeRetryCounter()) + .column("home_scheduled_at", toTimestamp(view.timerScheduledAt(TxnStateKind.Home))) + ; + } + } + return ds; + } + + private Date toTimestamp(Long deadline) + { + if (deadline == null) + return null; + + long millisSinceEpoch = approxTime.translate().toMillisSinceEpoch(deadline * 1000L); + return new Date(millisSinceEpoch); + } + } + + public static final class RedundantBeforeTable extends AbstractVirtualTable + { + private RedundantBeforeTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, REDUNDANT_BEFORE, + "Accord per-CommandStore RedundantBefore State", + "CREATE TABLE %s (\n" + + " command_store_id int,\n" + + format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) + + format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) + + " start_ownership_epoch bigint,\n" + + " end_ownership_epoch bigint,\n" + + " locally_applied_or_invalidated_before text,\n" + + " locally_decided_and_applied_or_invalidated_before text,\n" + + " shard_applied_or_invalidated_before text,\n" + + " gc_before text,\n" + + " shard_only_applied_or_invalidated_before text,\n" + + " bootstrapped_at text,\n" + + " stale_until_at_least text,\n" + + " PRIMARY KEY (command_store_id, range_start, range_end)" + + ')')); + } + + @Override + public DataSet data() + { + CommandStores stores = AccordService.instance().node().commandStores(); + List<Pair<Integer, RedundantBefore>> rangeMaps = + getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(), store.commandStore().unsafeGetRedundantBefore()))); + rangeMaps.sort(comparing(p -> p.left)); + + SimpleDataSet dataSet = new SimpleDataSet(metadata()); + for (Pair<Integer, RedundantBefore> pair : rangeMaps) + { + int storeId = pair.left; + RedundantBefore redundantBefore = pair.right; + + redundantBefore.foldlWithBounds( + (entry, ds, start, end) -> { + ds.row(storeId, decompose(start), decompose(end)) + .column("start_ownership_epoch", entry.startOwnershipEpoch) + .column("end_ownership_epoch", entry.endOwnershipEpoch) + .column("locally_applied_or_invalidated_before", entry.locallyAppliedOrInvalidatedBefore.toString()) + .column("locally_decided_and_applied_or_invalidated_before", entry.locallyDecidedAndAppliedOrInvalidatedBefore.toString()) + .column("shard_applied_or_invalidated_before", entry.shardAppliedOrInvalidatedBefore.toString()) + .column("gc_before", entry.gcBefore.toString()) + .column("shard_only_applied_or_invalidated_before", entry.shardOnlyAppliedOrInvalidatedBefore.toString()) + .column("bootstrapped_at", entry.bootstrappedAt.toString()) + .column("stale_until_at_least", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null); + return ds; + }, + dataSet, + ignore -> false + ); + } + return dataSet; + } + } + + public static final class RejectBeforeTable extends AbstractVirtualTable + { + private RejectBeforeTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, REJECT_BEFORE, + "Accord per-CommandStore RejectBefore State", + "CREATE TABLE %s (\n" + + " command_store_id int,\n" + + format("range_start %s,\n", ROUTING_KEY_TYPE_STRING) + + format("range_end %s,\n", ROUTING_KEY_TYPE_STRING) + + " txn_id text,\n" + + " PRIMARY KEY (command_store_id, range_start, range_end)" + + ')')); + } + + @Override + public DataSet data() + { + CommandStores stores = AccordService.instance().node().commandStores(); + List<Pair<Integer, RejectBefore>> rangeMaps = + getBlockingAndRethrow(stores.map(store -> Pair.create(store.commandStore().id(), store.commandStore().unsafeGetRejectBefore()))); + rangeMaps.sort(comparing(p -> p.left)); + + SimpleDataSet dataSet = new SimpleDataSet(metadata()); + for (Pair<Integer, RejectBefore> pair : rangeMaps) + { + int storeId = pair.left; + RejectBefore rejectBefore = pair.right; + + if (rejectBefore == null) + continue; + + rejectBefore.foldlWithBounds( + (txnId, ds, start, end) -> ds.row(storeId, decompose(start), decompose(end)).column("txn_id", txnId.toString()), + dataSet, + ignore -> false + ); + } + return dataSet; + } + } + + public static class TxnBlockedByTable extends AbstractVirtualTable + { + enum Reason { Self, Txn, Key } + + protected TxnBlockedByTable() + { + super(parse(VIRTUAL_ACCORD_DEBUG, TXN_BLOCKED_BY, + "Accord Transactions Blocked By Table" , + "CREATE TABLE %s (\n" + + " txn_id text,\n" + + " command_store_id int,\n" + + " depth int,\n" + + " blocked_by text,\n" + + " reason text,\n" + + " save_status text,\n" + + " execute_at text,\n" + + format("key %s,\n", ROUTING_KEY_TYPE_STRING) + + " PRIMARY KEY (txn_id, command_store_id, depth, blocked_by, reason)" + + ')')); + } + + @Override + public DataSet data(DecoratedKey partitionKey) + { + TxnId id = TxnId.parse(UTF8Type.instance.compose(partitionKey.getKey())); + List<CommandStoreTxnBlockedGraph> shards = AccordService.instance().debugTxnBlockedGraph(id); + + SimpleDataSet ds = new SimpleDataSet(metadata()); + for (CommandStoreTxnBlockedGraph shard : shards) + { + Set<TxnId> processed = new HashSet<>(); + process(ds, shard, processed, id, 0, id, Reason.Self, null); + // everything was processed right? + if (!shard.txns.isEmpty() && !shard.txns.keySet().containsAll(processed)) + throw new IllegalStateException("Skipped txns: " + Sets.difference(shard.txns.keySet(), processed)); + } + + return ds; + } + + private void process(SimpleDataSet ds, CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason reason, Runnable onDone) + { + if (!processed.add(txnId)) + throw new IllegalStateException("Double processed " + txnId); + CommandStoreTxnBlockedGraph.TxnState txn = shard.txns.get(txnId); + if (txn == null) + { + Invariants.checkState(reason == Reason.Self, "Txn %s unknown for reason %s", txnId, reason); + return; + } + // was it applied? If so ignore it + if (reason != Reason.Self && txn.saveStatus.hasBeen(Status.Applied)) + return; + ds.row(userTxn.toString(), shard.storeId, depth, reason == Reason.Self ? "" : txn.txnId.toString(), reason.name()); + ds.column("save_status", txn.saveStatus.name()); + if (txn.executeAt != null) + ds.column("execute_at", txn.executeAt.toString()); + if (onDone != null) + onDone.run(); + if (txn.isBlocked()) + { + for (TxnId blockedBy : txn.blockedBy) + { + if (!processed.contains(blockedBy)) + process(ds, shard, processed, userTxn, depth + 1, blockedBy, Reason.Txn, null); + } + + for (TokenKey blockedBy : txn.blockedByKey) + { + TxnId blocking = shard.keys.get(blockedBy); + if (!processed.contains(blocking)) + process(ds, shard, processed, userTxn, depth + 1, blocking, Reason.Key, () -> ds.column("key", decompose(blockedBy))); + } + } + } + + @Override + public DataSet data() + { + throw new InvalidRequestException("Must select a single txn_id"); + } + } + + private static ByteBuffer decompose(RoutingKey routingKey) + { + AccordRoutingKey key = (AccordRoutingKey) routingKey; + switch (key.kindOfRoutingKey()) + { + case SENTINEL: + case TOKEN: + return ROUTING_KEY_TYPE.pack(UUIDType.instance.decompose(key.table().asUUID()), bytes(key.suffix())); + default: + throw new IllegalStateException("Unhandled key Kind " + key.kindOfRoutingKey()); + } + } + + private static TableMetadata parse(String keyspace, String table, String comment, String schema) + { + return CreateTableStatement.parse(format(schema, table), keyspace) + .comment(comment) + .kind(TableMetadata.Kind.VIRTUAL) + .build(); + } +} diff --git a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java index 1ff8c2e82d..9df0a517c1 100644 --- a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java +++ b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java @@ -15,59 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.db.virtual; -import java.nio.ByteBuffer; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import accord.primitives.Status; -import accord.primitives.TxnId; -import accord.utils.Invariants; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.cql3.FieldIdentifier; import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.marshal.Int32Type; -import org.apache.cassandra.db.marshal.UTF8Type; -import org.apache.cassandra.db.marshal.UserType; -import org.apache.cassandra.dht.NormalizedRanges; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.service.accord.AccordCache; -import org.apache.cassandra.service.accord.AccordCommandStores; -import org.apache.cassandra.service.accord.AccordExecutor; -import org.apache.cassandra.service.accord.AccordKeyspace; -import org.apache.cassandra.service.accord.AccordService; -import org.apache.cassandra.service.accord.CommandStoreTxnBlockedGraph; -import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; -import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; -import org.apache.cassandra.service.consensus.migration.TableMigrationState; -import org.apache.cassandra.tcm.ClusterMetadata; -import org.apache.cassandra.utils.Clock; - -import static com.google.common.collect.ImmutableList.toImmutableList; -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; public class AccordVirtualTables { @@ -79,281 +35,9 @@ public class AccordVirtualTables return Collections.emptyList(); return List.of( - new ExecutorCache(keyspace), - new MigrationState(keyspace), - new CoordinationStatus(keyspace), - new TxnBlockedByTable(keyspace) ); } - public static final class ExecutorCache extends AbstractVirtualTable - { - private ExecutorCache(String keyspace) - { - super(parse(keyspace, - "Accord Executor Cache Metrics", - "CREATE TABLE accord_executor_cache(\n" + - " id int,\n" + - " scope text,\n" + - " queries bigint,\n" + - " hits bigint,\n" + - " misses bigint,\n" + - " PRIMARY KEY (id, scope)" + - ')')); - } - - @Override - public DataSet data() - { - AccordCommandStores stores = (AccordCommandStores) ((AccordService) AccordService.instance()).node().commandStores(); - SimpleDataSet result = new SimpleDataSet(metadata()); - for (AccordExecutor executor : stores.executors()) - { - Map<String, AccordCache.ImmutableStats> snapshots = new HashMap<>(3); - try (AccordExecutor.ExclusiveGlobalCaches cache = executor.lockCaches()) - { - addRow(cache.commands.statsSnapshot(), result, executor.executorId(), AccordKeyspace.COMMANDS); - addRow(cache.commandsForKey.statsSnapshot(), result, executor.executorId(), AccordKeyspace.COMMANDS_FOR_KEY); - addRow(cache.timestampsForKey.statsSnapshot(), result, executor.executorId(), AccordKeyspace.TIMESTAMPS_FOR_KEY); - } - } - return result; - } - - private static void addRow(AccordCache.ImmutableStats stats, SimpleDataSet result, int storeID, String scope) - { - result.row(storeID, scope); - result.column("queries", stats.queries); - result.column("hits", stats.hits); - result.column("misses", stats.misses); - } - } - - public static final class MigrationState extends AbstractVirtualTable - { - private static final Logger logger = LoggerFactory.getLogger(MigrationState.class); - - private MigrationState(String keyspace) - { - super(parse(keyspace, - "Consensus Migration State", - "CREATE TABLE consensus_migration_state(\n" + - " keyspace_name text,\n" + - " table_name text,\n" + - " table_id uuid,\n" + - " target_protocol text,\n" + - " transactional_mode text,\n" + - " transactional_migration_from text,\n" + - " migrated_ranges frozen<list<text>>,\n" + - " repair_pending_ranges frozen<list<text>>,\n" + - " migrating_ranges_by_epoch frozen<map<bigint, list<text>>>,\n" + - " PRIMARY KEY (keyspace_name, table_name)" + - ')')); - } - - @Override - public DataSet data() - { - ConsensusMigrationState snapshot = ClusterMetadata.current().consensusMigrationState; - Collection<TableMigrationState> tableStates = snapshot.tableStates(); - return data(tableStates); - } - - @Override - public DataSet data(DecoratedKey key) - { - String keyspaceName = UTF8Type.instance.compose(key.getKey()); - Keyspace keyspace = Schema.instance.getKeyspaceInstance(keyspaceName); - - if (keyspace == null) - throw new InvalidRequestException("Unknown keyspace: '" + keyspaceName + '\''); - - List<TableId> tableIDs = keyspace.getColumnFamilyStores() - .stream() - .map(ColumnFamilyStore::getTableId) - .collect(Collectors.toList()); - - ConsensusMigrationState snapshot = ClusterMetadata.current().consensusMigrationState; - Collection<TableMigrationState> tableStates = snapshot.tableStatesFor(tableIDs); - - return data(tableStates); - } - - private SimpleDataSet data(Collection<TableMigrationState> tableStates) - { - SimpleDataSet result = new SimpleDataSet(metadata()); - - for (TableMigrationState state : tableStates) - { - TableMetadata table = Schema.instance.getTableMetadata(state.tableId); - - if (table == null) - { - logger.warn("Table {}.{} (id: {}) no longer exists. It may have been dropped.", - state.keyspaceName, state.tableName, state.tableId); - continue; - } - - result.row(state.keyspaceName, state.tableName); - result.column("table_id", state.tableId.asUUID()); - result.column("target_protocol", state.targetProtocol.toString()); - result.column("transactional_mode", table.params.transactionalMode.toString()); - result.column("transactional_migration_from", table.params.transactionalMode.toString()); - - List<String> primitiveMigratedRanges = state.migratedRanges.stream().map(Objects::toString).collect(toImmutableList()); - result.column("migrated_ranges", primitiveMigratedRanges); - - List<String> primitiveRepairPendingRanges = state.repairPendingRanges.stream().map(Objects::toString).collect(toImmutableList()); - result.column("repair_pending_ranges", primitiveRepairPendingRanges); - - Map<Long, List<String>> primitiveRangesByEpoch = new LinkedHashMap<>(); - for (Map.Entry<org.apache.cassandra.tcm.Epoch, NormalizedRanges<Token>> entry : state.migratingRangesByEpoch.entrySet()) - primitiveRangesByEpoch.put(entry.getKey().getEpoch(), entry.getValue().stream().map(Objects::toString).collect(toImmutableList())); - - result.column("migrating_ranges_by_epoch", primitiveRangesByEpoch); - } - - return result; - } - } - - public static final class CoordinationStatus extends AbstractVirtualTable - { - private CoordinationStatus(String keyspace) - { - super(parse(keyspace, - "Accord Coordination Status", - "CREATE TABLE accord_coordination_status(\n" + - " node_id int,\n" + - " epoch bigint,\n" + - " start_time_micros bigint,\n" + - " duration_millis bigint,\n" + - " kind text,\n" + - " domain text,\n" + - " PRIMARY KEY (node_id, epoch, start_time_micros)" + - ')')); - } - - @Override - public DataSet data() - { - AccordService accord = (AccordService) AccordService.instance(); - SimpleDataSet result = new SimpleDataSet(metadata()); - - for (TxnId txn : accord.node().coordinating().keySet()) - { - result.row(txn.node.id, txn.epoch(), txn.hlc()); - result.column("duration_millis", Clock.Global.currentTimeMillis() - TimeUnit.MICROSECONDS.toMillis(txn.hlc())); - result.column("kind", txn.kind().toString()); - result.column("domain", txn.domain().toString()); - } - - return result; - } - } - - public static class TxnBlockedByTable extends AbstractVirtualTable - { - enum Reason { Self, Txn, Key } - private final UserType partitionKeyType; - - protected TxnBlockedByTable(String keyspace) - { - super(TableMetadata.builder(keyspace, "txn_blocked_by") - .kind(TableMetadata.Kind.VIRTUAL) - .addPartitionKeyColumn("txn_id", UTF8Type.instance) - .addClusteringColumn("store_id", Int32Type.instance) - .addClusteringColumn("depth", Int32Type.instance) - .addClusteringColumn("blocked_by", UTF8Type.instance) - .addClusteringColumn("reason", UTF8Type.instance) - .addRegularColumn("save_status", UTF8Type.instance) - .addRegularColumn("execute_at", UTF8Type.instance) - .addRegularColumn("key", pkType(keyspace)) - .build()); - partitionKeyType = pkType(keyspace); - } - - private static UserType pkType(String keyspace) - { - return new UserType(keyspace, bytes("partition_key"), - Arrays.asList(FieldIdentifier.forQuoted("table"), FieldIdentifier.forQuoted("token")), - Arrays.asList(UTF8Type.instance, UTF8Type.instance), false); - } - - private ByteBuffer pk(TokenKey pk) - { - var tm = Schema.instance.getTableMetadata(pk.table()); - return partitionKeyType.pack(UTF8Type.instance.decompose(tm.toString()), - UTF8Type.instance.decompose(pk.token().toString())); - } - - @Override - public Iterable<UserType> userTypes() - { - return Arrays.asList(partitionKeyType); - } - - @Override - public DataSet data(DecoratedKey partitionKey) - { - TxnId id = TxnId.parse(UTF8Type.instance.compose(partitionKey.getKey())); - List<CommandStoreTxnBlockedGraph> shards = AccordService.instance().debugTxnBlockedGraph(id); - - SimpleDataSet ds = new SimpleDataSet(metadata()); - for (CommandStoreTxnBlockedGraph shard : shards) - { - Set<TxnId> processed = new HashSet<>(); - process(ds, shard, processed, id, 0, id, Reason.Self, null); - // everything was processed right? - if (!shard.txns.isEmpty() && !shard.txns.keySet().containsAll(processed)) - throw new IllegalStateException("Skipped txns: " + Sets.difference(shard.txns.keySet(), processed)); - } - - return ds; - } - - private void process(SimpleDataSet ds, CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int depth, TxnId txnId, Reason reason, Runnable onDone) - { - if (!processed.add(txnId)) - throw new IllegalStateException("Double processed " + txnId); - CommandStoreTxnBlockedGraph.TxnState txn = shard.txns.get(txnId); - if (txn == null) - { - Invariants.checkState(reason == Reason.Self, "Txn %s unknown for reason %s", txnId, reason); - return; - } - // was it applied? If so ignore it - if (reason != Reason.Self && txn.saveStatus.hasBeen(Status.Applied)) - return; - ds.row(userTxn.toString(), shard.storeId, depth, reason == Reason.Self ? "" : txn.txnId.toString(), reason.name()); - ds.column("save_status", txn.saveStatus.name()); - if (txn.executeAt != null) - ds.column("execute_at", txn.executeAt.toString()); - if (onDone != null) - onDone.run(); - if (txn.isBlocked()) - { - for (TxnId blockedBy : txn.blockedBy) - { - if (processed.contains(blockedBy)) continue; // already listed - process(ds, shard, processed, userTxn, depth + 1, blockedBy, Reason.Txn, null); - } - for (TokenKey blockedBy : txn.blockedByKey) - { - TxnId blocking = shard.keys.get(blockedBy); - if (processed.contains(blocking)) continue; // already listed - process(ds, shard, processed, userTxn, depth + 1, blocking, Reason.Key, () -> ds.column("key", pk(blockedBy))); - } - } - } - - @Override - public DataSet data() - { - throw new InvalidRequestException("Must select a single txn_id"); - } - } - private static TableMetadata parse(String keyspace, String comment, String query) { return CreateTableStatement.parse(query, keyspace) diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java index 3316aa4fb3..a0585a2694 100644 --- a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java @@ -50,17 +50,7 @@ public class VirtualKeyspace if (!duplicates.isEmpty()) throw new IllegalArgumentException(String.format("Duplicate table names in virtual keyspace %s: %s", name, duplicates)); - KeyspaceMetadata metadata = KeyspaceMetadata.virtual(name, Tables.of(Iterables.transform(tables, VirtualTable::metadata))); - for (var t : tables) - { - for (var udt : t.userTypes()) - { - if (metadata.types.getNullable(udt.name) != null) - throw new IllegalStateException("UDT " + udt.getNameAsString() + " already exists"); - metadata = metadata.withUpdatedUserType(udt); - } - } - this.metadata = metadata; + this.metadata = KeyspaceMetadata.virtual(name, Tables.of(Iterables.transform(tables, VirtualTable::metadata))); } public String name() @@ -68,13 +58,13 @@ public class VirtualKeyspace return name; } - public KeyspaceMetadata metadata() + public ImmutableCollection<VirtualTable> tables() { - return metadata; + return tables; } - public ImmutableCollection<VirtualTable> tables() + public KeyspaceMetadata metadata() { - return tables; + return metadata; } } diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java index 93047faad8..53a9f2ac7f 100644 --- a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java +++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java @@ -17,13 +17,10 @@ */ package org.apache.cassandra.db.virtual; -import java.util.Collections; - import org.apache.cassandra.db.DataRange; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.schema.TableMetadata; @@ -90,9 +87,4 @@ public interface VirtualTable { return true; } - - default Iterable<UserType> userTypes() - { - return Collections.emptyList(); - } } diff --git a/src/java/org/apache/cassandra/schema/SchemaConstants.java b/src/java/org/apache/cassandra/schema/SchemaConstants.java index e9f71054e8..093425d544 100644 --- a/src/java/org/apache/cassandra/schema/SchemaConstants.java +++ b/src/java/org/apache/cassandra/schema/SchemaConstants.java @@ -50,9 +50,9 @@ public final class SchemaConstants public static final String DISTRIBUTED_KEYSPACE_NAME = "system_distributed"; public static final String VIRTUAL_SCHEMA = "system_virtual_schema"; - public static final String VIRTUAL_VIEWS = "system_views"; public static final String VIRTUAL_METRICS = "system_metrics"; + public static final String VIRTUAL_ACCORD_DEBUG = "system_accord_debug"; public static final String DUMMY_KEYSPACE_OR_TABLE_NAME = "--dummy--"; @@ -62,7 +62,7 @@ public final class SchemaConstants /* virtual table system keyspace names */ public static final Set<String> VIRTUAL_SYSTEM_KEYSPACE_NAMES = - ImmutableSet.of(VIRTUAL_VIEWS, VIRTUAL_SCHEMA); + ImmutableSet.of(VIRTUAL_SCHEMA, VIRTUAL_VIEWS, VIRTUAL_METRICS); /* replicate system keyspace names (the ones with a "true" replication strategy) */ public static final Set<String> REPLICATED_SYSTEM_KEYSPACE_NAMES = diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index b9cbb96ad0..8e172ba697 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -55,6 +55,7 @@ import org.apache.cassandra.db.SizeEstimatesRecorder; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.SystemKeyspaceMigrator41; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.virtual.AccordDebugKeyspace; import org.apache.cassandra.db.virtual.SystemViewsKeyspace; import org.apache.cassandra.db.virtual.VirtualKeyspace; import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; @@ -558,6 +559,9 @@ public class CassandraDaemon VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance); VirtualKeyspaceRegistry.instance.register(new VirtualKeyspace(VIRTUAL_METRICS, createMetricsKeyspaceTables())); + if (DatabaseDescriptor.getAccord().enable_virtual_debug_only_keyspace) + VirtualKeyspaceRegistry.instance.register(AccordDebugKeyspace.instance); + // flush log messages to system_views.system_logs virtual table as there were messages already logged // before that virtual table was instantiated LoggingSupportFactory.getLoggingSupport() diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 5b875dd1cd..db40c870df 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -69,6 +69,7 @@ import accord.coordinate.tracking.RequestStatus; import accord.impl.AbstractConfigurationService; import accord.impl.DefaultLocalListeners; import accord.impl.DefaultRemoteListeners; +import accord.impl.DurabilityScheduling; import accord.impl.RequestCallbacks; import accord.impl.SizeOfIntersectionSorter; import accord.impl.progresslog.DefaultProgressLogs; @@ -480,6 +481,11 @@ public class AccordService implements IAccordService, Shutdownable return responseHandler; } + public DurabilityScheduling.ImmutableView durabilityScheduling() + { + return node.durabilityScheduling().immutableView(); + } + private Seekables<?, ?> barrier(@Nonnull Seekables<?, ?> keysOrRanges, long epoch, Dispatcher.RequestTime requestTime, long timeoutNanos, BarrierType barrierType, boolean isForWrite, BiFunction<Node, FullRoute<?>, AsyncSyncPoint> syncPoint) { Stopwatch sw = Stopwatch.createStarted(); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index 815035dbb4..6be67c2384 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@ -101,6 +101,7 @@ public class InstanceConfig implements IInstanceConfig .set("accord.queue_shard_count", accord.queue_shard_count.toString()) .set("accord.command_store_shard_count", accord.command_store_shard_count.toString()) .set("accord.recover_delay", accord.recover_delay.toString()) + .set("accord.enable_virtual_debug_only_keyspace", "true") .set("partitioner", "org.apache.cassandra.dht.Murmur3Partitioner") .set("start_native_transport", true) .set("concurrent_writes", 2) diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java index 376d082cba..8b48dfe10c 100644 --- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java @@ -54,6 +54,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.primitives.TxnId; +import org.apache.cassandra.db.virtual.AccordDebugKeyspace; import org.apache.cassandra.dht.Token; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; @@ -80,6 +81,7 @@ import org.apache.cassandra.net.RequestCallback; import org.apache.cassandra.net.Verb; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.AccordService; @@ -101,7 +103,6 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.BROADCAST_ import static org.apache.cassandra.config.CassandraRelevantProperties.REPLACE_ADDRESS_FIRST_BOOT; import static org.apache.cassandra.config.CassandraRelevantProperties.RING_DELAY; import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort; -import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS; import static org.assertj.core.api.Assertions.assertThat; /** @@ -1531,9 +1532,9 @@ public class ClusterUtils public static <T extends IInstance> LinkedHashMap<String, SimpleQueryResult> queryTxnState(AbstractCluster<T> cluster, TxnId txnId, int... nodes) { - String cql = String.format("SELECT * FROM %s.txn_blocked_by WHERE txn_id=?", VIRTUAL_VIEWS); + String cql = String.format("SELECT * FROM %s.%s WHERE txn_id=?", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.TXN_BLOCKED_BY); LinkedHashMap<String, SimpleQueryResult> map = new LinkedHashMap<>(); - Iterable<T> it = nodes.length == 0 ? cluster::iterator : cluster.get(nodes); + Iterable<T> it = nodes.length == 0 ? cluster : cluster.get(nodes); for (T i : it) { if (i.isShutdown()) diff --git a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java index 3ccdad7997..0a4c575b5b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java @@ -15,7 +15,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.distributed.test; import java.io.IOException; @@ -48,7 +47,6 @@ import org.apache.cassandra.utils.Throwables; import static java.util.concurrent.TimeUnit.SECONDS; import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.takesArguments; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class QueriesTableTest extends TestBaseImpl @@ -221,12 +219,6 @@ public class QueriesTableTest extends TestBaseImpl assertTrue(readVisible); assertTrue(coordinatorTxnVisible); - - SimpleQueryResult txns = SHARED_CLUSTER.get(1).executeInternalWithResult("SELECT * FROM system_views.accord_coordination_status"); - assertTrue(txns.hasNext()); - Row txn = txns.next(); - assertEquals(1, txn.getInteger("node_id").intValue()); - assertEquals("Key", txn.getString("domain")); } private static void waitForQueriesToFinish() throws InterruptedException diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java index f08176046b..4f87534942 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java @@ -33,6 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.virtual.AccordDebugKeyspace; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IMessageFilters; import org.apache.cassandra.distributed.api.Row; @@ -43,6 +44,7 @@ import org.apache.cassandra.metrics.AccordMetrics; import org.apache.cassandra.metrics.DefaultNameFactory; import org.apache.cassandra.metrics.RatioGaugeSet; import org.apache.cassandra.net.Verb; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.exceptions.ReadPreemptedException; import org.apache.cassandra.service.accord.exceptions.WritePreemptedException; @@ -51,6 +53,7 @@ import org.apache.cassandra.utils.AssertionUtils; import org.assertj.core.api.Assertions; import org.assertj.core.data.Offset; +import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; @@ -281,8 +284,8 @@ public class AccordMetricsTest extends AccordTestBase } // Verify that per-store global cache stats are published to the appropriate virtual table: - SimpleQueryResult storeCacheResults = SHARED_CLUSTER.get(node + 1) - .executeInternalWithResult("SELECT * FROM system_views.accord_executor_cache"); + SimpleQueryResult storeCacheResults = + SHARED_CLUSTER.get(node + 1).executeInternalWithResult(format("SELECT * FROM %s.%s", SchemaConstants.VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.EXECUTOR_CACHE)); assertThat(storeCacheResults).hasNext(); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java index 0b011fe0b3..a7a2d406d4 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMigrationTest.java @@ -54,6 +54,7 @@ import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.Mutation.SimpleBuilder; import org.apache.cassandra.db.SimpleBuilders.PartitionUpdateBuilder; +import org.apache.cassandra.db.virtual.AccordDebugKeyspace; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; import org.apache.cassandra.dht.NormalizedRanges; @@ -109,6 +110,7 @@ import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY; import static org.apache.cassandra.distributed.api.ConsistencyLevel.SERIAL; import static org.apache.cassandra.schema.SchemaConstants.SYSTEM_KEYSPACE_NAME; +import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG; import static org.apache.cassandra.service.consensus.migration.ConsensusRequestRouter.ConsensusRoutingDecision.paxosV2; import static org.apache.cassandra.service.paxos.PaxosState.MaybePromise.Outcome.PROMISE; import static org.assertj.core.api.Fail.fail; @@ -751,8 +753,8 @@ public class AccordMigrationTest extends AccordTestBase assertNotNull(state); SimpleQueryResult vtableResult = - instance.executeInternalWithResult("SELECT * FROM system_views.consensus_migration_state WHERE keyspace_name = ? AND table_name = ? ", - state.keyspaceName, state.tableName); + instance.executeInternalWithResult(format("SELECT * FROM %s.%s WHERE keyspace_name = ? AND table_name = ? ", VIRTUAL_ACCORD_DEBUG, AccordDebugKeyspace.MIGRATION_STATE), + state.keyspaceName, state.tableName); assertTrue(vtableResult.hasNext()); assertEquals(KEYSPACE, state.keyspaceName); diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java similarity index 86% rename from test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java rename to test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java index d9bdc1d75a..d79a346c36 100644 --- a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java @@ -47,17 +47,17 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.concurrent.Condition; import org.awaitility.Awaitility; import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn; -public class AccordVirtualTablesTest extends CQLTester +public class AccordDebugKeyspaceTest extends CQLTester { - private static final Logger logger = LoggerFactory.getLogger(AccordVirtualTablesTest.class); + private static final Logger logger = LoggerFactory.getLogger(AccordDebugKeyspaceTest.class); - private static final String QUERY_TXN_BLOCKED_BY = "SELECT * FROM system_views.txn_blocked_by WHERE txn_id=?"; - private static final String QUERY_TXN_STATUS = "SELECT save_status FROM system_views.txn_blocked_by WHERE txn_id=? LIMIT 1"; + private static final String QUERY_TXN_BLOCKED_BY = "SELECT * FROM system_accord_debug.txn_blocked_by WHERE txn_id=?"; @BeforeClass public static void setUpClass() @@ -69,7 +69,7 @@ public class AccordVirtualTablesTest extends CQLTester CQLTester.setUpClass(); AccordService.startup(ClusterMetadata.current().myNodeId()); - addVirtualKeyspace(); + VirtualKeyspaceRegistry.instance.register(AccordDebugKeyspace.instance); requireNetwork(); } @@ -90,7 +90,7 @@ public class AccordVirtualTablesTest extends CQLTester AsyncChains.getBlocking(accord.node().coordinate(id, txn)); assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()), - row(id.toString(), anyInt(), 0, "", "Self", any(), null, anyOf(SaveStatus.Applying.name(), SaveStatus.Applied.name()))); + row(id.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.ReadyToExecute.name(), SaveStatus.Applying.name(), SaveStatus.Applied.name()))); } @Test @@ -109,11 +109,11 @@ public class AccordVirtualTablesTest extends CQLTester filter.preAccept.awaitThrowUncheckedOnInterrupt(); assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()), - row(id.toString(), anyInt(), 0, "", "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name()))); + row(id.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name()))); filter.apply.awaitThrowUncheckedOnInterrupt(); assertRows(execute(QUERY_TXN_BLOCKED_BY, id.toString()), - row(id.toString(), anyInt(), 0, "", "Self", any(), null, SaveStatus.ReadyToExecute.name())); + row(id.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, SaveStatus.ReadyToExecute.name())); } finally { @@ -135,11 +135,11 @@ public class AccordVirtualTablesTest extends CQLTester filter.preAccept.awaitThrowUncheckedOnInterrupt(); assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()), - row(first.toString(), anyInt(), 0, "", "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name()))); + row(first.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", any(), null, anyOf(SaveStatus.PreAccepted.name(), SaveStatus.ReadyToExecute.name()))); filter.apply.awaitThrowUncheckedOnInterrupt(); assertRows(execute(QUERY_TXN_BLOCKED_BY, first.toString()), - row(first.toString(), anyInt(), 0, "", "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name())); + row(first.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.ReadyToExecute.name())); filter.reset(); @@ -154,7 +154,7 @@ public class AccordVirtualTablesTest extends CQLTester return rs.size() == 2; }); assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()), - row(second.toString(), anyInt(), 0, "", "Self", anyNonNull(), null, SaveStatus.Stable.name()), + row(second.toString(), anyInt(), 0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null, SaveStatus.Stable.name()), row(second.toString(), anyInt(), 1, first.toString(), "Key", anyNonNull(), anyNonNull(), SaveStatus.ReadyToExecute.name())); } finally @@ -191,9 +191,9 @@ public class AccordVirtualTablesTest extends CQLTester TxnId txnId = null; if (msg.payload instanceof TxnRequest) { - txnId = ((TxnRequest) msg.payload).txnId; + txnId = ((TxnRequest<?>) msg.payload).txnId; } - Set<Verb> seen = null; + Set<Verb> seen; if (txnId != null) { seen = txnToVerbs.computeIfAbsent(txnId, ignore -> new ConcurrentSkipListSet<>()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
