This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 985da9060f Expose epoch ready state as a vtable for operators to inspect things
985da9060f is described below
commit 985da9060f44dc33285f2935afeb162f5c44beae
Author: David Capwell <[email protected]>
AuthorDate: Wed Feb 12 10:06:57 2025 -0800
Expose epoch ready state as a vtable for operators to inspect things
patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-20302
---
.gitmodules | 4 +-
modules/accord | 2 +-
.../cassandra/db/virtual/AccordVirtualTables.java | 167 ++++++++++++++++++-
.../cassandra/distributed/api/QueryResults.java | 33 ++++
.../fuzz/topology/AccordTopologyMixupTest.java | 77 ++++++---
.../fuzz/topology/TopologyMixupTestBase.java | 24 ++-
.../db/virtual/AccordVirtualTablesTest.java | 176 +++++++++++++++++++++
.../cassandra/service/accord/EpochSyncTest.java | 2 +-
8 files changed, 438 insertions(+), 47 deletions(-)
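Before the diff itself, a quick operator-facing sketch of the two virtual tables this patch adds (accord_epochs and accord_table_epochs). The queries below assume Accord transactions are enabled and that the tables are registered under the standard system_views virtual keyspace (the keyspace is supplied by the caller of AccordVirtualTables.getAll, so that name is an assumption); the epoch literal is purely illustrative:

    -- per-epoch ready state reported by Accord's TopologyManager
    SELECT epoch, ready, ready_metadata, ready_coordinate, ready_data, ready_reads
    FROM system_views.accord_epochs;

    -- per-table added/removed/synced/closed/retired ranges for a single epoch
    SELECT keyspace_name, table_name, added, removed, synced, closed, retired
    FROM system_views.accord_table_epochs
    WHERE epoch = 5;

Both tables use a LocalPartitioner over a reversed long epoch, so full scans return the newest epochs first.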
diff --git a/.gitmodules b/.gitmodules
index 22d4a4fb41..616dacf610 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
- url = https://github.com/ifesdjeen/cassandra-accord.git
- branch = CASSANDRA-20297
+ url = https://github.com/apache/cassandra-accord.git
+ branch = trunk
diff --git a/modules/accord b/modules/accord
index a2ac02b4d2..74a3b81ca9 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit a2ac02b4d20bb6ab9078e70a0a43c17c00f4b0fc
+Subproject commit 74a3b81ca9d9e1ce7ddfd117682fc7c310f0cd99
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
index 9df0a517c1..19f061ad71 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
@@ -17,32 +17,187 @@
*/
package org.apache.cassandra.db.virtual;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.topology.TopologyManager.EpochsSnapshot;
+import accord.topology.TopologyManager.EpochsSnapshot.Epoch;
+import accord.topology.TopologyManager.EpochsSnapshot.EpochReady;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.TokenRange;
+
+import static accord.topology.TopologyManager.EpochsSnapshot.ResultStatus.SUCCESS;
public class AccordVirtualTables
{
- private AccordVirtualTables() {}
+ public static final String EPOCHS = "accord_epochs";
+ public static final String TABLE_EPOCHS = "accord_table_epochs";
+
+ private AccordVirtualTables()
+ {
+ }
public static Collection<VirtualTable> getAll(String keyspace)
{
if (!DatabaseDescriptor.getAccordTransactionsEnabled())
return Collections.emptyList();
- return List.of(
+ return List.of(new EpochReadyTable(keyspace),
+ new EpochSyncRanges(keyspace)
);
}
- private static TableMetadata parse(String keyspace, String comment, String query)
+ private static TableMetadata.Builder parse(String keyspace, String query)
{
return CreateTableStatement.parse(query, keyspace)
- .comment(comment)
- .kind(TableMetadata.Kind.VIRTUAL)
- .build();
+ .kind(TableMetadata.Kind.VIRTUAL);
+ }
+
+ public static class EpochReadyTable extends AbstractVirtualTable
+ {
+ public EpochReadyTable(String keyspace)
+ {
+ super(parse(keyspace, "CREATE TABLE " + EPOCHS + " (\n" +
+ " epoch bigint PRIMARY KEY,\n" +
+ " ready_metadata text,\n" +
+ " ready_coordinate text,\n" +
+ " ready_data text,\n" +
+ " ready_reads text,\n" +
+ " ready boolean,\n" +
+ ")")
+ .partitioner(new LocalPartitioner(ReversedType.getInstance(LongType.instance)))
+ .comment("Exposes the epoch ready state for received epochs in Accord")
+ .build());
+ }
+
+ @Override
+ public DataSet data()
+ {
+ SimpleDataSet ds = new SimpleDataSet(metadata());
+ EpochsSnapshot snapshot = epochsSnapshot();
+ for (Epoch epoch : snapshot)
+ {
+ ds.row(epoch.epoch);
+ EpochReady ready = epoch.ready;
+ ds.column("ready_metadata", ready.metadata.value);
+ ds.column("ready_coordinate", ready.coordinate.value);
+ ds.column("ready_data", ready.data.value);
+ ds.column("ready_reads", ready.reads.value);
+ ds.column("ready", ready.reads == SUCCESS);
+ }
+ return ds;
+ }
+ }
+
+ public static class EpochSyncRanges extends AbstractVirtualTable
+ {
+ protected EpochSyncRanges(String keyspace)
+ {
+ super(parse(keyspace, "CREATE TABLE " + TABLE_EPOCHS + " (\n" +
+ " epoch bigint,\n" +
+ " keyspace_name text,\n" +
+ " table_name text,\n" +
+ " added frozen<list<text>>,\n" +
+ " removed frozen<list<text>>,\n" +
+ " synced frozen<list<text>>,\n" +
+ " closed frozen<list<text>>,\n" +
+ " retired frozen<list<text>>,\n" +
+ " PRIMARY KEY (epoch, keyspace_name, table_name)\n" +
+ ")")
+ .partitioner(new LocalPartitioner(ReversedType.getInstance(LongType.instance)))
+ .comment("Shows details on a per-table basis about what ranges are synced per epoch")
+ .build());
+ }
+
+ @Override
+ public DataSet data()
+ {
+ SimpleDataSet ds = new SimpleDataSet(metadata());
+ EpochsSnapshot snapshot = epochsSnapshot();
+ for (Epoch state : snapshot)
+ {
+ Map<TableId, List<TokenRange>> addedRanges = groupByTable(state.addedRanges);
+ Map<TableId, List<TokenRange>> removedRanges = groupByTable(state.removedRanges);
+ Map<TableId, List<TokenRange>> synced = groupByTable(state.synced);
+ Map<TableId, List<TokenRange>> closed = groupByTable(state.closed);
+ Map<TableId, List<TokenRange>> retired = groupByTable(state.retired);
+
+ Set<TableId> allTables = union(addedRanges.keySet(), removedRanges.keySet(), synced.keySet(), closed.keySet(), retired.keySet());
+ for (TableId table : allTables)
+ {
+ TableMetadata metadata = Schema.instance.getTableMetadata(table);
+ if (metadata == null) continue; // table dropped, ignore
+ ds.row(state.epoch, metadata.keyspace, metadata.name);
+
+ ds.column("added", format(addedRanges.get(table)));
+ ds.column("removed", format(removedRanges.get(table)));
+ ds.column("synced", format(synced.get(table)));
+ ds.column("closed", format(closed.get(table)));
+ ds.column("retired", format(retired.get(table)));
+ }
+ }
+ return ds;
+ }
+
+ private static <T> Set<T> union(Set<T>... sets)
+ {
+ Preconditions.checkArgument(sets.length > 0);
+ if (sets.length == 1) return sets[0];
+ Sets.SetView accum = Sets.union(sets[0], sets[1]);
+ for (int i = 2; i < sets.length; i++)
+ accum = Sets.union(accum, sets[i]);
+ return accum;
+ }
+
+ private static List<String> format(@Nullable List<TokenRange> list)
+ {
+ if (list == null || list.isEmpty()) return Collections.emptyList();
+ List<String> result = new ArrayList<>(list.size());
+ for (TokenRange tr : list)
+ result.add(toStringNoTable(tr));
+ return result;
+ }
+ }
+
+ private static EpochsSnapshot epochsSnapshot()
+ {
+ return AccordService.instance().topology().epochsSnapshot();
+ }
+
+ private static String toStringNoTable(TokenRange tr)
+ {
+ // TokenRange extends Range.EndInclusive
+ return "(" + tr.start().suffix() + ", " + tr.end().suffix() + "]";
+ }
+
+ private static Map<TableId, List<TokenRange>> groupByTable(Ranges ranges)
+ {
+ Map<TableId, List<TokenRange>> map = new HashMap<>();
+ for (Range range : ranges)
+ {
+ TokenRange tr = (TokenRange) range;
+ map.computeIfAbsent(tr.table(), i -> new ArrayList<>()).add(tr);
+ }
+ return map;
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/api/QueryResults.java b/test/distributed/org/apache/cassandra/distributed/api/QueryResults.java
index 081d06a525..46ce489dd1 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/QueryResults.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/QueryResults.java
@@ -25,6 +25,11 @@ import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
public final class QueryResults
{
@@ -69,6 +74,34 @@ public final class QueryResults
return new FilterQueryResult(result, fn);
}
+ public static Iterable<List<String>> stringify(SimpleQueryResult qr)
+ {
+ return stringify(qr, -1);
+ }
+
+ public static Iterable<List<String>> stringify(SimpleQueryResult qr, int maxColumnSize)
+ {
+ Preconditions.checkArgument(maxColumnSize == -1 || maxColumnSize > 0, "max column size must be positive or -1 (disabled); given %s", maxColumnSize);
+ qr.mark();
+ return () -> {
+ qr.reset();
+ return new AbstractIterator<>()
+ {
+ @Override
+ protected List<String> computeNext()
+ {
+ if (!qr.hasNext())
+ return endOfData();
+ Row next = qr.next();
+ Stream<String> stream = Stream.of(next.toObjectArray()).map(Objects::toString);
+ if (maxColumnSize != -1)
+ stream = stream.map(s -> s.length() > maxColumnSize ? s.substring(0, maxColumnSize) + "..." : s);
+ return stream.collect(Collectors.toList());
+ }
+ };
+ };
+ }
+
public static Builder builder()
{
return new Builder();
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
index a1731004d2..f011f89847 100644
--- a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
@@ -56,25 +57,30 @@ import org.apache.cassandra.cql3.ast.Txn;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.virtual.AccordVirtualTables;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.QueryResults;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.accord.AccordTestBase;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.accord.api.AccordAgent;
import org.apache.cassandra.service.consensus.TransactionalMode;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
import org.apache.cassandra.utils.ASTGenerators;
import org.apache.cassandra.utils.AbstractTypeGenerators;
import org.apache.cassandra.utils.CassandraGenerators;
import org.apache.cassandra.utils.FastByteOperations;
import org.apache.cassandra.utils.Generators;
import org.apache.cassandra.utils.Isolated;
+import org.apache.cassandra.utils.Retry;
import org.apache.cassandra.utils.Shared;
import org.quicktheories.generators.SourceDSL;
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS;
import static org.apache.cassandra.utils.AbstractTypeGenerators.overridePrimitiveTypeSupport;
import static org.apache.cassandra.utils.AbstractTypeGenerators.stringComparator;
import static org.apache.cassandra.utils.AccordGenerators.fromQT;
@@ -121,7 +127,6 @@ public class AccordTopologyMixupTest extends TopologyMixupTestBase<AccordTopolog
private static Spec createSchemaSpec(RandomSource rs, Cluster cluster)
{
TransactionalMode mode = rs.pick(TRANSACTIONAL_MODES);
- boolean enableMigration = allowsMigration(mode) && rs.nextBoolean();
// This test puts a focus on topology / cluster operations, so schema "shouldn't matter"... limit the domain of the test to improve the ability to debug
AbstractTypeGenerators.TypeGenBuilder supportedTypes = AbstractTypeGenerators.withoutUnsafeEquality(AbstractTypeGenerators.builder()
.withTypeKinds(AbstractTypeGenerators.TypeKind.PRIMITIVE));
@@ -132,7 +137,7 @@ public class AccordTopologyMixupTest extends TopologyMixupTestBase<AccordTopolog
.withKnownMemtables()
.withSimpleColumnNames()
//TODO (coverage): include "fast_path = 'keyspace'" override
- .withTransactionalMode(enableMigration ? TransactionalMode.off : mode)
+ .withTransactionalMode(mode)
.withDefaultTypeGen(supportedTypes)
.build())
.next(rs);
@@ -140,12 +145,7 @@ public class AccordTopologyMixupTest extends TopologyMixupTestBase<AccordTopolog
String schemaCQL = metadata.toCqlString(false, false, false);
logger.info("Creating test table:\n{}", schemaCQL);
cluster.schemaChange(schemaCQL);
- if (enableMigration)
- {
- cluster.schemaChange("ALTER TABLE " + metadata + " WITH " + mode.asCqlParam());
- cluster.get(1).nodetoolResult("consensus_admin", "begin-migration", metadata.keyspace, metadata.name).asserts().success();
- }
- return new Spec(mode, enableMigration, metadata);
+ return new Spec(mode, metadata);
}
private static CommandGen<Spec> cqlOperations(Spec spec)
@@ -175,18 +175,6 @@ public class AccordTopologyMixupTest extends TopologyMixupTestBase<AccordTopolog
return new Property.SimpleCommand<>(node + ":" + msg + "; epoch=" + state.currentEpoch.get(), s2 -> executeTxn(s2.cluster, node, stmt.toCQL(), stmt.bindsEncoded()));
}
- private static boolean allowsMigration(TransactionalMode mode)
- {
- switch (mode)
- {
- case mixed_reads:
- case full:
- return true;
- default:
- return false;
- }
- }
-
private static SimpleQueryResult executeTxn(Cluster cluster, IInvokableInstance node, String stmt, ByteBuffer[] binds)
{
if (!AccordTestBase.isIdempotent(node, stmt))
@@ -209,13 +197,11 @@ public class AccordTopologyMixupTest extends TopologyMixupTestBase<AccordTopolog
public static class Spec implements Schema
{
private final TransactionalMode mode;
- private final boolean enableMigration;
private final TableMetadata metadata;
- public Spec(TransactionalMode mode, boolean enableMigration, TableMetadata metadata)
+ public Spec(TransactionalMode mode, TableMetadata metadata)
{
this.mode = mode;
- this.enableMigration = enableMigration;
this.metadata = metadata;
}
@@ -240,6 +226,8 @@ public class AccordTopologyMixupTest extends TopologyMixupTestBase<AccordTopolog
private static class AccordState extends State<Spec>
{
+ private final Map<Integer, String> instanceEpochReadyState = new TreeMap<>();
+ private final Map<Integer, String> instanceEpochSyncState = new TreeMap<>();
private final ListenerHolder listener;
public AccordState(RandomSource rs)
@@ -247,6 +235,36 @@ public class AccordTopologyMixupTest extends TopologyMixupTestBase<AccordTopolog
super(rs, AccordTopologyMixupTest::createSchemaSpec, AccordTopologyMixupTest::cqlOperations);
this.listener = new ListenerHolder(this);
+ this.preActions.add(this::populateEpochState);
+ }
+
+ private void populateEpochState()
+ {
+ updateMap(instanceEpochReadyState, "SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.EPOCHS);
+ updateMap(instanceEpochSyncState, "SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.TABLE_EPOCHS);
+ }
+
+ private void updateMap(Map<Integer, String> map, String cql)
+ {
+ for (var inst : cluster)
+ {
+ int num = inst.config().num();
+ if (inst.isShutdown())
+ {
+ map.put(num, "unknown");
+ continue;
+ }
+ try
+ {
+ SimpleQueryResult qr = Retry.retryWithBackoffBlocking(5, () -> cluster.get(num).executeInternalWithResult(cql));
+ map.put(num, TableBuilder.toStringPiped(qr.names(), QueryResults.stringify(qr)));
+ }
+ catch (Throwable t)
+ {
+ // Throwable.toString shows the type + msg but not the stack trace
+ map.put(num, "unknown due to failure: " + t);
+ }
+ }
}
@Override
@@ -264,6 +282,19 @@ public class AccordTopologyMixupTest extends TopologyMixupTestBase<AccordTopolog
ClusterUtils.awaitAccordEpochReady(cluster, tcmEpoch);
}
+ @Override
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder(super.toString());
+ sb.append("\nAccord Epoch State:");
+ for (var e : instanceEpochReadyState.entrySet())
+ sb.append("\nnode").append(e.getKey()).append(":\n").append(e.getValue());
+ sb.append("\nAccord Epoch Ranges:");
+ for (var e : instanceEpochSyncState.entrySet())
+ sb.append("\nnode").append(e.getKey()).append(":\n").append(e.getValue());
+ return sb.toString();
+ }
+
@Override
public void close() throws Exception
{
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
index 9c5544de8d..8c93b32455 100644
--- a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
@@ -51,7 +51,7 @@ import org.agrona.collections.IntArrayList;
import org.agrona.collections.IntHashSet;
import org.apache.cassandra.distributed.Constants;
import org.apache.cassandra.distributed.api.ICoordinator;
-import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.QueryResults;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.junit.Test;
@@ -375,7 +375,7 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche
preCheck(statefulBuilder);
statefulBuilder.check(commands(this::stateGen)
.preCommands(state -> state.preActions.forEach(Runnable::run))
- .add(2, (rs, state) -> {
+ .addIf(State::allowTopologyChanges, 2, (rs, state) -> {
EnumSet<TopologyChange> possibleTopologyChanges = possibleTopologyChanges(state);
if (possibleTopologyChanges.isEmpty())
return ignoreCommand();
return topologyCommand(state, possibleTopologyChanges).next(rs);
@@ -397,7 +397,7 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche
.build());
}
- private EnumSet<TopologyChange> possibleTopologyChanges(State<S> state)
+ private static EnumSet<TopologyChange> possibleTopologyChanges(State<?> state)
{
EnumSet<TopologyChange> possibleTopologyChanges = EnumSet.noneOf(TopologyChange.class);
// up or down is logically more correct, but since this runs sequentially and after the topology changes are complete, we don't have downed nodes at this point
@@ -683,6 +683,11 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche
onStartupComplete(waitForEpoch);
}
+ protected boolean allowTopologyChanges()
+ {
+ return !possibleTopologyChanges(this).isEmpty();
+ }
+
protected void onStartupComplete(long tcmEpoch)
{
@@ -753,17 +758,8 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche
try
{
SimpleQueryResult qr = Retry.retryWithBackoffBlocking(5, () -> cluster.get(cmsNode).executeInternalWithResult("SELECT epoch, kind, transformation FROM system_views.cluster_metadata_log"));
- TableBuilder builder = new TableBuilder(" | ");
- builder.add(qr.names());
- while (qr.hasNext())
- {
- Row next = qr.next();
- builder.add(Stream.of(next.toObjectArray())
- .map(Objects::toString)
- .map(s -> s.length() > 100 ? s.substring(0, 100) + "..." : s)
- .collect(Collectors.toList()));
- }
- epochHistory = "Epochs:\n" + builder;
+ String table = TableBuilder.toStringPiped(qr.names(), QueryResults.stringify(qr, 100));
+ epochHistory = "Epochs:\n" + table;
}
catch (Throwable t)
{
diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
new file mode 100644
index 0000000000..0eb510a354
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import accord.api.ConfigurationService;
+import accord.api.TopologySorter;
+import accord.local.Node;
+import accord.primitives.Ranges;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+import accord.topology.TopologyManager;
+import accord.utils.SortedArrays;
+import accord.utils.async.AsyncResults;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.IAccordService;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.mockito.Mockito;
+
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS;
+
+public class AccordVirtualTablesTest extends CQLTester
+{
+ public static final Node.Id N1 = new Node.Id(1);
+ public static final SortedArrays.SortedArrayList<Node.Id> ALL = SortedArrays.SortedArrayList.ofSorted(N1);
+ public static final Set<Node.Id> FP = Collections.singleton(N1);
+ public static final String SUCCESS = "success";
+ public static final String PENDING = "pending";
+ public static final List<String> FULL_RANGE = List.of("(-Inf, +Inf]");
+
+ public static TableId T1;
+ public static TableMetadata T1_META;
+
+ @BeforeClass
+ public static void setup()
+ {
+ addVirtualKeyspace();
+ }
+
+ @Before
+ public void setupTables()
+ {
+ if (T1_META != null) return;
+
+ String tbl1 = createTable("CREATE TABLE %s(pk int primary key)");
+ T1_META = Schema.instance.getTableMetadata(keyspace(), tbl1);
+ T1 = T1_META.id;
+
+ ServerTestUtils.markCMS();
+ }
+
+ @Test
+ public void emptyEpochs()
+ {
+ TopologyManager tm = empty();
+ assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.EPOCHS));
+ assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.TABLE_EPOCHS));
+ }
+
+ @Test
+ public void epochUpdates()
+ {
+ TopologyManager tm = empty();
+ long e1 = 1;
+ tm.onTopologyUpdate(topology(e1, T1), () -> ConfigurationService.EpochReady.done(e1));
+ assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.EPOCHS),
+ row(e1, true, SUCCESS, SUCCESS, SUCCESS, SUCCESS));
+
+ long e2 = 2;
+ tm.onTopologyUpdate(topology(e2, T1), () -> pendingReady(e1));
+ assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.EPOCHS),
+ row(e2, false, PENDING, PENDING, PENDING, PENDING),
+ row(e1, true, SUCCESS, SUCCESS, SUCCESS, SUCCESS));
+ }
+
+ @Test
+ public void tableUpdates()
+ {
+ TopologyManager tm = empty();
+ long e1 = 1;
+ tm.onTopologyUpdate(topology(e1, T1), () -> ConfigurationService.EpochReady.done(e1));
+
+ // the range was added in the first epoch, so it's fully synced
+ assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.TABLE_EPOCHS),
+ row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, List.of(), List.of(), List.of(), FULL_RANGE));
+
+ // the range is no longer "added", so it doesn't show up as synced!
+ long e2 = 2;
+ tm.onTopologyUpdate(topology(e2, T1), () -> ConfigurationService.EpochReady.done(e2));
+ assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.TABLE_EPOCHS),
+ row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, List.of(), List.of(), List.of(), FULL_RANGE));
+
+ // sync the range
+ tm.onEpochSyncComplete(N1, e2);
+ assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.TABLE_EPOCHS),
+ row(e2, T1_META.keyspace, T1_META.name, List.of(), List.of(), List.of(), List.of(), FULL_RANGE),
+ row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, List.of(), List.of(), List.of(), FULL_RANGE));
+
+ // let's close e2
+ tm.onEpochClosed(Ranges.single(TokenRange.fullRange(T1)), e2);
+ assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.TABLE_EPOCHS),
+ row(e2, T1_META.keyspace, T1_META.name, List.of(), FULL_RANGE, List.of(), List.of(), FULL_RANGE),
+ row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, FULL_RANGE, List.of(), List.of(), FULL_RANGE));
+
+ // enjoy retirement!
+ tm.onEpochRetired(Ranges.single(TokenRange.fullRange(T1)), e2);
+ assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + AccordVirtualTables.TABLE_EPOCHS),
+ row(e2, T1_META.keyspace, T1_META.name, List.of(), FULL_RANGE, List.of(), FULL_RANGE, FULL_RANGE),
+ row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, FULL_RANGE, List.of(), FULL_RANGE, FULL_RANGE));
+ }
+
+ private static ConfigurationService.EpochReady pendingReady(long epoch)
+ {
+ return new ConfigurationService.EpochReady(epoch, AsyncResults.settable(), AsyncResults.settable(), AsyncResults.settable(), AsyncResults.settable());
+ }
+
+ private static Topology topology(long epoch, TableId tableId)
+ {
+ TokenRange all = TokenRange.fullRange(tableId);
+ return new Topology(epoch, Shard.create(all, ALL, FP));
+ }
+
+ private static TopologyManager empty()
+ {
+ TopologySorter sorter = (TopologySorter.StaticSorter) (node1, node2, shards) -> 0;
+ TopologySorter.Supplier supplier = new TopologySorter.Supplier()
+ {
+ @Override
+ public TopologySorter get(Topology topologies)
+ {
+ return sorter;
+ }
+
+ @Override
+ public TopologySorter get(Topologies topologies)
+ {
+ return sorter;
+ }
+ };
+ TopologyManager tm = new TopologyManager(supplier, null, N1, null, null, null);
+
+ var mock = Mockito.mock(IAccordService.class);
+ Mockito.when(mock.topology()).thenReturn(tm);
+ AccordService.unsafeSetNewAccordService(mock);
+ return tm;
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
index 0efac1d081..d9b70a7068 100644
--- a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
@@ -720,7 +720,7 @@ public class EpochSyncTest
@Override
public void onEpochRetired(Ranges ranges, long epoch)
{
- topology.onEpochRedundant(ranges, epoch);
+ topology.onEpochRetired(ranges, epoch);
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]