This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 985da9060f Expose epoch ready state as a vtable for operators to inspect things
985da9060f is described below

commit 985da9060f44dc33285f2935afeb162f5c44beae
Author: David Capwell <[email protected]>
AuthorDate: Wed Feb 12 10:06:57 2025 -0800

    Expose epoch ready state as a vtable for operators to inspect things
    
    patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-20302
---
 .gitmodules                                        |   4 +-
 modules/accord                                     |   2 +-
 .../cassandra/db/virtual/AccordVirtualTables.java  | 167 ++++++++++++++++++-
 .../cassandra/distributed/api/QueryResults.java    |  33 ++++
 .../fuzz/topology/AccordTopologyMixupTest.java     |  77 ++++++---
 .../fuzz/topology/TopologyMixupTestBase.java       |  24 ++-
 .../db/virtual/AccordVirtualTablesTest.java        | 176 +++++++++++++++++++++
 .../cassandra/service/accord/EpochSyncTest.java    |   2 +-
 8 files changed, 438 insertions(+), 47 deletions(-)

diff --git a/.gitmodules b/.gitmodules
index 22d4a4fb41..616dacf610 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,4 +1,4 @@
 [submodule "modules/accord"]
        path = modules/accord
-       url = https://github.com/ifesdjeen/cassandra-accord.git
-       branch = CASSANDRA-20297
+       url = https://github.com/apache/cassandra-accord.git
+       branch = trunk
diff --git a/modules/accord b/modules/accord
index a2ac02b4d2..74a3b81ca9 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit a2ac02b4d20bb6ab9078e70a0a43c17c00f4b0fc
+Subproject commit 74a3b81ca9d9e1ce7ddfd117682fc7c310f0cd99
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
index 9df0a517c1..19f061ad71 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordVirtualTables.java
@@ -17,32 +17,187 @@
  */
 package org.apache.cassandra.db.virtual;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
+import javax.annotation.Nullable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+import accord.primitives.Range;
+import accord.primitives.Ranges;
+import accord.topology.TopologyManager.EpochsSnapshot;
+import accord.topology.TopologyManager.EpochsSnapshot.Epoch;
+import accord.topology.TopologyManager.EpochsSnapshot.EpochReady;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.ReversedType;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.TokenRange;
+
+import static 
accord.topology.TopologyManager.EpochsSnapshot.ResultStatus.SUCCESS;
 
 public class AccordVirtualTables
 {
-    private AccordVirtualTables() {}
+    public static final String EPOCHS = "accord_epochs";
+    public static final String TABLE_EPOCHS = "accord_table_epochs";
+
+    private AccordVirtualTables()
+    {
+    }
 
     public static Collection<VirtualTable> getAll(String keyspace)
     {
         if (!DatabaseDescriptor.getAccordTransactionsEnabled())
             return Collections.emptyList();
 
-        return List.of(
+        return List.of(new EpochReadyTable(keyspace),
+                       new EpochSyncRanges(keyspace)
         );
     }
 
-    private static TableMetadata parse(String keyspace, String comment, String 
query)
+    private static TableMetadata.Builder parse(String keyspace, String query)
     {
         return CreateTableStatement.parse(query, keyspace)
-                                   .comment(comment)
-                                   .kind(TableMetadata.Kind.VIRTUAL)
-                                   .build();
+                                   .kind(TableMetadata.Kind.VIRTUAL);
+    }
+
+    public static class EpochReadyTable extends AbstractVirtualTable
+    {
+        public EpochReadyTable(String keyspace)
+        {
+            super(parse(keyspace, "CREATE TABLE " + EPOCHS + " (\n" +
+                                  "  epoch bigint PRIMARY KEY,\n" +
+                                  "  ready_metadata text,\n" +
+                                  "  ready_coordinate text,\n" +
+                                  "  ready_data text,\n" +
+                                  "  ready_reads text,\n" +
+                                  "  ready boolean,\n" +
+                                  ")")
+                  .partitioner(new 
LocalPartitioner(ReversedType.getInstance(LongType.instance)))
+                  .comment("Exposes the epoch ready state for recieved epochs 
in Accord")
+                  .build());
+        }
+
+        @Override
+        public DataSet data()
+        {
+            SimpleDataSet ds = new SimpleDataSet(metadata());
+            EpochsSnapshot snapshot = epochsSnapshot();
+            for (Epoch epoch : snapshot)
+            {
+                ds.row(epoch.epoch);
+                EpochReady ready = epoch.ready;
+                ds.column("ready_metadata", ready.metadata.value);
+                ds.column("ready_coordinate", ready.coordinate.value);
+                ds.column("ready_data", ready.data.value);
+                ds.column("ready_reads", ready.reads.value);
+                ds.column("ready", ready.reads == SUCCESS);
+            }
+            return ds;
+        }
+    }
+
+    public static class EpochSyncRanges extends AbstractVirtualTable
+    {
+        protected EpochSyncRanges(String keyspace)
+        {
+            super(parse(keyspace, "CREATE TABLE " + TABLE_EPOCHS + " (\n" +
+                                  "  epoch bigint,\n" +
+                                  "  keyspace_name text,\n" +
+                                  "  table_name text,\n" +
+                                  "  added frozen<list<text>>,\n" +
+                                  "  removed frozen<list<text>>,\n" +
+                                  "  synced frozen<list<text>>,\n" +
+                                  "  closed frozen<list<text>>,\n" +
+                                  "  retired frozen<list<text>>,\n" +
+                                  "  PRIMARY KEY (epoch, keyspace_name, 
table_name)\n" +
+                                  ")")
+                  .partitioner(new 
LocalPartitioner(ReversedType.getInstance(LongType.instance)))
+                  .comment("Shows details on a per-table basis about what 
ranges are synced per epoch")
+                  .build());
+        }
+
+        @Override
+        public DataSet data()
+        {
+            SimpleDataSet ds = new SimpleDataSet(metadata());
+            EpochsSnapshot snapshot = epochsSnapshot();
+            for (Epoch state : snapshot)
+            {
+                Map<TableId, List<TokenRange>> addedRanges = 
groupByTable(state.addedRanges);
+                Map<TableId, List<TokenRange>> removedRanges = 
groupByTable(state.removedRanges);
+                Map<TableId, List<TokenRange>> synced = 
groupByTable(state.synced);
+                Map<TableId, List<TokenRange>> closed = 
groupByTable(state.closed);
+                Map<TableId, List<TokenRange>> retired = 
groupByTable(state.retired);
+
+                Set<TableId> allTables = union(addedRanges.keySet(), 
removedRanges.keySet(), synced.keySet(), closed.keySet(), retired.keySet());
+                for (TableId table : allTables)
+                {
+                    TableMetadata metadata = 
Schema.instance.getTableMetadata(table);
+                    if (metadata == null) continue; // table dropped, ignore
+                    ds.row(state.epoch, metadata.keyspace, metadata.name);
+
+                    ds.column("added", format(addedRanges.get(table)));
+                    ds.column("removed", format(removedRanges.get(table)));
+                    ds.column("synced", format(synced.get(table)));
+                    ds.column("closed", format(closed.get(table)));
+                    ds.column("retired", format(retired.get(table)));
+                }
+            }
+            return ds;
+        }
+
+        private static <T> Set<T> union(Set<T>... sets)
+        {
+            Preconditions.checkArgument(sets.length > 0);
+            if (sets.length == 1) return sets[0];
+            Sets.SetView accum = Sets.union(sets[0], sets[1]);
+            for (int i = 2; i < sets.length; i++)
+                accum = Sets.union(accum, sets[i]);
+            return accum;
+        }
+
+        private static List<String> format(@Nullable List<TokenRange> list)
+        {
+            if (list == null || list.isEmpty()) return Collections.emptyList();
+            List<String> result = new ArrayList<>(list.size());
+            for (TokenRange tr : list)
+                result.add(toStringNoTable(tr));
+            return result;
+        }
+    }
+
+    private static EpochsSnapshot epochsSnapshot()
+    {
+        return AccordService.instance().topology().epochsSnapshot();
+    }
+
+    private static String toStringNoTable(TokenRange tr)
+    {
+        // TokenRange extends Range.EndInclusive
+        return "(" + tr.start().suffix() + ", " + tr.end().suffix() + "]";
+    }
+
+    private static Map<TableId, List<TokenRange>> groupByTable(Ranges ranges)
+    {
+        Map<TableId, List<TokenRange>> map = new HashMap<>();
+        for (Range range : ranges)
+        {
+            TokenRange tr = (TokenRange) range;
+            map.computeIfAbsent(tr.table(), i -> new ArrayList<>()).add(tr);
+        }
+        return map;
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/api/QueryResults.java b/test/distributed/org/apache/cassandra/distributed/api/QueryResults.java
index 081d06a525..46ce489dd1 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/QueryResults.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/QueryResults.java
@@ -25,6 +25,11 @@ import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.AbstractIterator;
 
 public final class QueryResults
 {
@@ -69,6 +74,34 @@ public final class QueryResults
         return new FilterQueryResult(result, fn);
     }
 
+    public static Iterable<List<String>> stringify(SimpleQueryResult qr)
+    {
+        return stringify(qr, -1);
+    }
+
+    public static Iterable<List<String>> stringify(SimpleQueryResult qr, int 
maxColumnSize)
+    {
+        Preconditions.checkArgument(maxColumnSize == -1 || maxColumnSize > 0, 
"max column size must be positive or -1 (disabled); given %s", maxColumnSize);
+        qr.mark();
+        return () -> {
+            qr.reset();
+            return new AbstractIterator<>()
+            {
+                @Override
+                protected List<String> computeNext()
+                {
+                    if (!qr.hasNext())
+                        return endOfData();
+                    Row next = qr.next();
+                    Stream<String> stream = 
Stream.of(next.toObjectArray()).map(Objects::toString);
+                    if (maxColumnSize != -1)
+                        stream = stream.map(s -> s.length() > maxColumnSize ? 
s.substring(0, maxColumnSize) + "..." : s);
+                    return stream.collect(Collectors.toList());
+                }
+            };
+        };
+    }
+
     public static Builder builder()
     {
         return new Builder();
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
index a1731004d2..f011f89847 100644
--- a/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/AccordTopologyMixupTest.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
@@ -56,25 +57,30 @@ import org.apache.cassandra.cql3.ast.Txn;
 import org.apache.cassandra.db.marshal.AsciiType;
 import org.apache.cassandra.db.marshal.BytesType;
 import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.virtual.AccordVirtualTables;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.QueryResults;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.test.accord.AccordTestBase;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.accord.api.AccordAgent;
 import org.apache.cassandra.service.consensus.TransactionalMode;
+import org.apache.cassandra.tools.nodetool.formatter.TableBuilder;
 import org.apache.cassandra.utils.ASTGenerators;
 import org.apache.cassandra.utils.AbstractTypeGenerators;
 import org.apache.cassandra.utils.CassandraGenerators;
 import org.apache.cassandra.utils.FastByteOperations;
 import org.apache.cassandra.utils.Generators;
 import org.apache.cassandra.utils.Isolated;
+import org.apache.cassandra.utils.Retry;
 import org.apache.cassandra.utils.Shared;
 import org.quicktheories.generators.SourceDSL;
 
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS;
 import static 
org.apache.cassandra.utils.AbstractTypeGenerators.overridePrimitiveTypeSupport;
 import static 
org.apache.cassandra.utils.AbstractTypeGenerators.stringComparator;
 import static org.apache.cassandra.utils.AccordGenerators.fromQT;
@@ -121,7 +127,6 @@ public class AccordTopologyMixupTest extends 
TopologyMixupTestBase<AccordTopolog
     private static Spec createSchemaSpec(RandomSource rs, Cluster cluster)
     {
         TransactionalMode mode = rs.pick(TRANSACTIONAL_MODES);
-        boolean enableMigration = allowsMigration(mode) && rs.nextBoolean();
         // This test puts a focus on topology / cluster operations, so schema 
"shouldn't matter"... limit the domain of the test to improve the ability to 
debug
         AbstractTypeGenerators.TypeGenBuilder supportedTypes = 
AbstractTypeGenerators.withoutUnsafeEquality(AbstractTypeGenerators.builder()
                                                                                
                                                   
.withTypeKinds(AbstractTypeGenerators.TypeKind.PRIMITIVE));
@@ -132,7 +137,7 @@ public class AccordTopologyMixupTest extends 
TopologyMixupTestBase<AccordTopolog
                                         .withKnownMemtables()
                                         .withSimpleColumnNames()
                                         //TODO (coverage): include "fast_path 
= 'keyspace'" override
-                                        .withTransactionalMode(enableMigration 
? TransactionalMode.off : mode)
+                                        .withTransactionalMode(mode)
                                         .withDefaultTypeGen(supportedTypes)
                                         .build())
                                  .next(rs);
@@ -140,12 +145,7 @@ public class AccordTopologyMixupTest extends 
TopologyMixupTestBase<AccordTopolog
         String schemaCQL = metadata.toCqlString(false, false, false);
         logger.info("Creating test table:\n{}", schemaCQL);
         cluster.schemaChange(schemaCQL);
-        if (enableMigration)
-        {
-            cluster.schemaChange("ALTER TABLE " + metadata + " WITH " + 
mode.asCqlParam());
-            cluster.get(1).nodetoolResult("consensus_admin", 
"begin-migration", metadata.keyspace, metadata.name).asserts().success();
-        }
-        return new Spec(mode, enableMigration, metadata);
+        return new Spec(mode, metadata);
     }
 
     private static CommandGen<Spec> cqlOperations(Spec spec)
@@ -175,18 +175,6 @@ public class AccordTopologyMixupTest extends 
TopologyMixupTestBase<AccordTopolog
         return new Property.SimpleCommand<>(node + ":" + msg + "; epoch=" + 
state.currentEpoch.get(), s2 -> executeTxn(s2.cluster, node, stmt.toCQL(), 
stmt.bindsEncoded()));
     }
 
-    private static boolean allowsMigration(TransactionalMode mode)
-    {
-        switch (mode)
-        {
-            case mixed_reads:
-            case full:
-                return true;
-            default:
-                return false;
-        }
-    }
-
     private static SimpleQueryResult executeTxn(Cluster cluster, 
IInvokableInstance node, String stmt, ByteBuffer[] binds)
     {
         if (!AccordTestBase.isIdempotent(node, stmt))
@@ -209,13 +197,11 @@ public class AccordTopologyMixupTest extends 
TopologyMixupTestBase<AccordTopolog
     public static class Spec implements Schema
     {
         private final TransactionalMode mode;
-        private final boolean enableMigration;
         private final TableMetadata metadata;
 
-        public Spec(TransactionalMode mode, boolean enableMigration, 
TableMetadata metadata)
+        public Spec(TransactionalMode mode, TableMetadata metadata)
         {
             this.mode = mode;
-            this.enableMigration = enableMigration;
             this.metadata = metadata;
         }
 
@@ -240,6 +226,8 @@ public class AccordTopologyMixupTest extends 
TopologyMixupTestBase<AccordTopolog
 
     private static class AccordState extends State<Spec>
     {
+        private final Map<Integer, String> instanceEpochReadyState = new 
TreeMap<>();
+        private final Map<Integer, String> instanceEpochSyncState = new 
TreeMap<>();
         private final ListenerHolder listener;
 
         public AccordState(RandomSource rs)
@@ -247,6 +235,36 @@ public class AccordTopologyMixupTest extends 
TopologyMixupTestBase<AccordTopolog
             super(rs, AccordTopologyMixupTest::createSchemaSpec, 
AccordTopologyMixupTest::cqlOperations);
 
             this.listener = new ListenerHolder(this);
+            this.preActions.add(this::populateEpochState);
+        }
+
+        private void populateEpochState()
+        {
+            updateMap(instanceEpochReadyState, "SELECT * FROM " + 
VIRTUAL_VIEWS + "." + AccordVirtualTables.EPOCHS);
+            updateMap(instanceEpochSyncState, "SELECT * FROM " + VIRTUAL_VIEWS 
+ "." + AccordVirtualTables.TABLE_EPOCHS);
+        }
+
+        private void updateMap(Map<Integer, String> map, String cql)
+        {
+            for (var inst : cluster)
+            {
+                int num = inst.config().num();
+                if (inst.isShutdown())
+                {
+                    map.put(num, "unknown");
+                    continue;
+                }
+                try
+                {
+                    SimpleQueryResult qr = Retry.retryWithBackoffBlocking(5, 
() -> cluster.get(num).executeInternalWithResult(cql));
+                    map.put(num, TableBuilder.toStringPiped(qr.names(), 
QueryResults.stringify(qr)));
+                }
+                catch (Throwable t)
+                {
+                    // Throwable.toString shows the type + msg but not the 
stack trace
+                    map.put(num, "unknown due to failure: " + t);
+                }
+            }
         }
 
         @Override
@@ -264,6 +282,19 @@ public class AccordTopologyMixupTest extends 
TopologyMixupTestBase<AccordTopolog
             ClusterUtils.awaitAccordEpochReady(cluster, tcmEpoch);
         }
 
+        @Override
+        public String toString()
+        {
+            StringBuilder sb = new StringBuilder(super.toString());
+            sb.append("\nAccord Epoch State:");
+            for (var e : instanceEpochReadyState.entrySet())
+                
sb.append("\nnode").append(e.getKey()).append(":\n").append(e.getValue());
+            sb.append("\nAccord Epoch Ranges:");
+            for (var e : instanceEpochSyncState.entrySet())
+                
sb.append("\nnode").append(e.getKey()).append(":\n").append(e.getValue());
+            return sb.toString();
+        }
+
         @Override
         public void close() throws Exception
         {
diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
index 9c5544de8d..8c93b32455 100644
--- a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
+++ b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
@@ -51,7 +51,7 @@ import org.agrona.collections.IntArrayList;
 import org.agrona.collections.IntHashSet;
 import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.api.ICoordinator;
-import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.QueryResults;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 
 import org.junit.Test;
@@ -375,7 +375,7 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
         preCheck(statefulBuilder);
         statefulBuilder.check(commands(this::stateGen)
                               .preCommands(state -> 
state.preActions.forEach(Runnable::run))
-                              .add(2, (rs, state) -> {
+                              .addIf(State::allowTopologyChanges, 2, (rs, 
state) -> {
                                   EnumSet<TopologyChange> 
possibleTopologyChanges = possibleTopologyChanges(state);
                                   if (possibleTopologyChanges.isEmpty()) 
return ignoreCommand();
                                   return topologyCommand(state, 
possibleTopologyChanges).next(rs);
@@ -397,7 +397,7 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
                               .build());
     }
 
-    private EnumSet<TopologyChange> possibleTopologyChanges(State<S> state)
+    private static EnumSet<TopologyChange> possibleTopologyChanges(State<?> 
state)
     {
         EnumSet<TopologyChange> possibleTopologyChanges = 
EnumSet.noneOf(TopologyChange.class);
         // up or down is logically more correct, but since this runs 
sequentially and after the topology changes are complete, we don't have downed 
nodes at this point
@@ -683,6 +683,11 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
             onStartupComplete(waitForEpoch);
         }
 
+        protected boolean allowTopologyChanges()
+        {
+            return !possibleTopologyChanges(this).isEmpty();
+        }
+
         protected void onStartupComplete(long tcmEpoch)
         {
 
@@ -753,17 +758,8 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
             try
             {
                 SimpleQueryResult qr = Retry.retryWithBackoffBlocking(5, () -> 
cluster.get(cmsNode).executeInternalWithResult("SELECT epoch, kind, 
transformation FROM system_views.cluster_metadata_log"));
-                TableBuilder builder = new TableBuilder(" | ");
-                builder.add(qr.names());
-                while (qr.hasNext())
-                {
-                    Row next = qr.next();
-                    builder.add(Stream.of(next.toObjectArray())
-                                      .map(Objects::toString)
-                                      .map(s -> s.length() > 100 ? 
s.substring(0, 100) + "..." : s)
-                                      .collect(Collectors.toList()));
-                }
-                epochHistory = "Epochs:\n" + builder;
+                String table = TableBuilder.toStringPiped(qr.names(), 
QueryResults.stringify(qr, 100));
+                epochHistory = "Epochs:\n" + table;
             }
             catch (Throwable t)
             {
diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
new file mode 100644
index 0000000000..0eb510a354
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordVirtualTablesTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.virtual;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import accord.api.ConfigurationService;
+import accord.api.TopologySorter;
+import accord.local.Node;
+import accord.primitives.Ranges;
+import accord.topology.Shard;
+import accord.topology.Topologies;
+import accord.topology.Topology;
+import accord.topology.TopologyManager;
+import accord.utils.SortedArrays;
+import accord.utils.async.AsyncResults;
+import org.apache.cassandra.ServerTestUtils;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.accord.AccordService;
+import org.apache.cassandra.service.accord.IAccordService;
+import org.apache.cassandra.service.accord.TokenRange;
+import org.mockito.Mockito;
+
+import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_VIEWS;
+
+public class AccordVirtualTablesTest extends CQLTester
+{
+    public static final Node.Id N1 = new Node.Id(1);
+    public static final SortedArrays.SortedArrayList<Node.Id> ALL = 
SortedArrays.SortedArrayList.ofSorted(N1);
+    public static final Set<Node.Id> FP = Collections.singleton(N1);
+    public static final String SUCCESS = "success";
+    public static final String PENDING = "pending";
+    public static final List<String> FULL_RANGE = List.of("(-Inf, +Inf]");
+
+    public static TableId T1;
+    public static TableMetadata T1_META;
+
+    @BeforeClass
+    public static void setup()
+    {
+        addVirtualKeyspace();
+    }
+
+    @Before
+    public void setupTables()
+    {
+        if (T1_META != null) return;
+
+        String tbl1 = createTable("CREATE TABLE %s(pk int primary key)");
+        T1_META = Schema.instance.getTableMetadata(keyspace(), tbl1);
+        T1 = T1_META.id;
+
+        ServerTestUtils.markCMS();
+    }
+
+    @Test
+    public void emptyEpochs()
+    {
+        TopologyManager tm = empty();
+        assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + 
AccordVirtualTables.EPOCHS));
+        assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + 
AccordVirtualTables.TABLE_EPOCHS));
+    }
+
+    @Test
+    public void epochUpdates()
+    {
+        TopologyManager tm = empty();
+        long e1 = 1;
+        tm.onTopologyUpdate(topology(e1, T1), () -> 
ConfigurationService.EpochReady.done(e1));
+        assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + 
AccordVirtualTables.EPOCHS),
+                   row(e1, true, SUCCESS, SUCCESS, SUCCESS, SUCCESS));
+
+        long e2 = 2;
+        tm.onTopologyUpdate(topology(e2, T1), () -> pendingReady(e1));
+        assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + 
AccordVirtualTables.EPOCHS),
+                   row(e2, false, PENDING, PENDING, PENDING, PENDING),
+                   row(e1, true, SUCCESS, SUCCESS, SUCCESS, SUCCESS));
+    }
+
+    @Test
+    public void tableUpdates()
+    {
+        TopologyManager tm = empty();
+        long e1 = 1;
+        tm.onTopologyUpdate(topology(e1, T1), () -> 
ConfigurationService.EpochReady.done(e1));
+
+        // the range was added in the first epoch, so its fully synced
+        assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + 
AccordVirtualTables.TABLE_EPOCHS),
+                   row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, 
List.of(), List.of(), List.of(), FULL_RANGE));
+
+        // range is no longer "added" so doesn't show up as synced!
+        long e2 = 2;
+        tm.onTopologyUpdate(topology(e2, T1), () -> 
ConfigurationService.EpochReady.done(e2));
+        assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + 
AccordVirtualTables.TABLE_EPOCHS),
+                   row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, 
List.of(), List.of(), List.of(), FULL_RANGE));
+
+        // sync the range
+        tm.onEpochSyncComplete(N1, e2);
+        assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + 
AccordVirtualTables.TABLE_EPOCHS),
+                   row(e2, T1_META.keyspace, T1_META.name, List.of(), 
List.of(), List.of(), List.of(), FULL_RANGE),
+                   row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, 
List.of(), List.of(), List.of(), FULL_RANGE));
+
+        // lets close e2
+        tm.onEpochClosed(Ranges.single(TokenRange.fullRange(T1)), e2);
+        assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + 
AccordVirtualTables.TABLE_EPOCHS),
+                   row(e2, T1_META.keyspace, T1_META.name, List.of(), 
FULL_RANGE, List.of(), List.of(), FULL_RANGE),
+                   row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, 
FULL_RANGE, List.of(), List.of(), FULL_RANGE));
+
+        // enjoy retirement!
+        tm.onEpochRetired(Ranges.single(TokenRange.fullRange(T1)), e2);
+        assertRows(execute("SELECT * FROM " + VIRTUAL_VIEWS + "." + 
AccordVirtualTables.TABLE_EPOCHS),
+                   row(e2, T1_META.keyspace, T1_META.name, List.of(), 
FULL_RANGE, List.of(), FULL_RANGE, FULL_RANGE),
+                   row(e1, T1_META.keyspace, T1_META.name, FULL_RANGE, 
FULL_RANGE, List.of(), FULL_RANGE, FULL_RANGE));
+    }
+
+    private static ConfigurationService.EpochReady pendingReady(long epoch)
+    {
+        return new ConfigurationService.EpochReady(epoch, 
AsyncResults.settable(), AsyncResults.settable(), AsyncResults.settable(), 
AsyncResults.settable());
+    }
+
+    private static Topology topology(long epoch, TableId tableId)
+    {
+        TokenRange all = TokenRange.fullRange(tableId);
+        return new Topology(epoch, Shard.create(all, ALL, FP));
+    }
+
+    private static TopologyManager empty()
+    {
+        TopologySorter sorter = (TopologySorter.StaticSorter) (node1, node2, 
shards) -> 0;
+        TopologySorter.Supplier supplier = new TopologySorter.Supplier()
+        {
+            @Override
+            public TopologySorter get(Topology topologies)
+            {
+                return sorter;
+            }
+
+            @Override
+            public TopologySorter get(Topologies topologies)
+            {
+                return sorter;
+            }
+        };
+        TopologyManager tm = new TopologyManager(supplier, null, N1, null, 
null, null);
+
+        var mock = Mockito.mock(IAccordService.class);
+        Mockito.when(mock.topology()).thenReturn(tm);
+        AccordService.unsafeSetNewAccordService(mock);
+        return tm;
+    }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
index 0efac1d081..d9b70a7068 100644
--- a/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/EpochSyncTest.java
@@ -720,7 +720,7 @@ public class EpochSyncTest
                     @Override
                     public void onEpochRetired(Ranges ranges, long epoch)
                     {
-                        topology.onEpochRedundant(ranges, epoch);
+                        topology.onEpochRetired(ranges, epoch);
                     }
                 });
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to