This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit fe8f473e4257987ea4dd10e5e96e7801df85b6d6
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Thu May 25 15:56:24 2023 -0700

    Accord TCM integration
    
    Patch by Blake Eggleston; Reviewed by David Capwell for CASSANDRA-18444
---
 .../main/java/accord/api/ConfigurationService.java |   6 +
 .../accord/impl/AbstractConfigurationService.java  | 279 +++++++++++++++++++
 accord-core/src/main/java/accord/local/Node.java   |   7 +-
 .../src/main/java/accord/topology/Topology.java    |   2 +-
 .../main/java/accord/topology/TopologyManager.java |  35 ++-
 accord-core/src/test/java/accord/Utils.java        |  22 +-
 .../accord/burn/BurnTestConfigurationService.java  | 167 ++----------
 .../impl/AbstractConfigurationServiceTest.java     | 302 +++++++++++++++++++++
 .../test/java/accord/impl/mock/MockCluster.java    |   1 +
 .../java/accord/local/ImmutableCommandTest.java    |   1 +
 .../test/java/accord/messages/PreAcceptTest.java   |   3 +-
 .../java/accord/topology/TopologyManagerTest.java  |  63 +++++
 12 files changed, 721 insertions(+), 167 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ConfigurationService.java 
b/accord-core/src/main/java/accord/api/ConfigurationService.java
index 7b868acd..f7ecfece 100644
--- a/accord-core/src/main/java/accord/api/ConfigurationService.java
+++ b/accord-core/src/main/java/accord/api/ConfigurationService.java
@@ -118,6 +118,12 @@ public interface ConfigurationService
          * This should be invoked on each replica once EpochReady.coordination 
has returned on a replica.
          */
         void onEpochSyncComplete(Node.Id node, long epoch);
+
+        /**
+         * Called when the configuration service is meant to truncate its 
topology data up to (but not including)
+         * the given epoch.
+         */
+        void truncateTopologyUntil(long epoch);
     }
 
     void registerListener(Listener listener);
diff --git 
a/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java 
b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
new file mode 100644
index 00000000..6d2aaf39
--- /dev/null
+++ b/accord-core/src/main/java/accord/impl/AbstractConfigurationService.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.ConfigurationService;
+import accord.local.Node;
+import accord.topology.Topology;
+import accord.utils.Invariants;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+public abstract class AbstractConfigurationService implements 
ConfigurationService
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AbstractConfigurationService.class);
+
+    protected final Node.Id node;
+
+    protected final EpochHistory epochs = new EpochHistory();
+
+    protected final List<Listener> listeners = new ArrayList<>();
+
+    static class EpochState
+    {
+        private final long epoch;
+        private final AsyncResult.Settable<Topology> received = 
AsyncResults.settable();
+        private final AsyncResult.Settable<Void> acknowledged = 
AsyncResults.settable();
+
+        private Topology topology = null;
+
+        public EpochState(long epoch)
+        {
+            this.epoch = epoch;
+        }
+
+        public long epoch()
+        {
+            return epoch;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "EpochState{" + epoch + '}';
+        }
+    }
+
+    @VisibleForTesting
+    protected static class EpochHistory
+    {
+        // TODO (low priority): move pendingEpochs / FetchTopology into here?
+        private List<EpochState> epochs = new ArrayList<>();
+
+        protected long lastReceived = 0;
+        private long lastAcknowledged = 0;
+
+        long minEpoch()
+        {
+            return epochs.isEmpty() ? 0L : epochs.get(0).epoch;
+        }
+
+        long maxEpoch()
+        {
+            int size = epochs.size();
+            return size == 0 ? 0L : epochs.get(size - 1).epoch;
+        }
+
+        @VisibleForTesting
+        EpochState atIndex(int idx)
+        {
+            return epochs.get(idx);
+        }
+
+        @VisibleForTesting
+        int size()
+        {
+            return epochs.size();
+        }
+
+        EpochState getOrCreate(long epoch)
+        {
+            Invariants.checkArgument(epoch > 0);
+            if (epochs.isEmpty())
+            {
+                EpochState state = new EpochState(epoch);
+                epochs.add(state);
+                return state;
+            }
+
+            long minEpoch = minEpoch();
+            if (epoch < minEpoch)
+            {
+                int prepend = Ints.checkedCast(minEpoch - epoch);
+                List<EpochState> next = new ArrayList<>(epochs.size() + 
prepend);
+                for (long addEpoch=epoch; addEpoch<minEpoch; addEpoch++)
+                    next.add(new EpochState(addEpoch));
+                next.addAll(epochs);
+                epochs = next;
+                minEpoch = minEpoch();
+                Invariants.checkState(minEpoch == epoch);
+            }
+            long maxEpoch = maxEpoch();
+            int idx = Ints.checkedCast(epoch - minEpoch);
+
+            // add any missing epochs
+            for (long addEpoch = maxEpoch + 1; addEpoch <= epoch; addEpoch++)
+                epochs.add(new EpochState(addEpoch));
+
+            return epochs.get(idx);
+        }
+
+        public EpochHistory receive(Topology topology)
+        {
+            long epoch = topology.epoch();
+            Invariants.checkState(lastReceived == epoch - 1 || epoch == 0 || 
lastReceived == 0);
+            lastReceived = epoch;
+            EpochState state = getOrCreate(epoch);
+            if (state != null)
+            {
+                state.topology = topology;
+                state.received.setSuccess(topology);
+            }
+            return this;
+        }
+
+        AsyncResult<Topology> receiveFuture(long epoch)
+        {
+            return getOrCreate(epoch).received;
+        }
+
+        Topology topologyFor(long epoch)
+        {
+            return getOrCreate(epoch).topology;
+        }
+
+        public EpochHistory acknowledge(long epoch)
+        {
+            Invariants.checkState(lastAcknowledged == epoch - 1 || epoch == 0 
|| lastAcknowledged == 0);
+            lastAcknowledged = epoch;
+            getOrCreate(epoch).acknowledged.setSuccess(null);
+            return this;
+        }
+
+        AsyncResult<Void> acknowledgeFuture(long epoch)
+        {
+            return getOrCreate(epoch).acknowledged;
+        }
+
+        void truncateUntil(long epoch)
+        {
+            Invariants.checkArgument(epoch <= maxEpoch());
+            long minEpoch = minEpoch();
+            int toTrim = Ints.checkedCast(epoch - minEpoch);
+            if (toTrim <=0)
+                return;
+
+            epochs = new ArrayList<>(epochs.subList(toTrim, epochs.size()));
+        }
+    }
+
+    public AbstractConfigurationService(Node.Id node)
+    {
+        this.node = node;
+    }
+
+    @Override
+    public synchronized void registerListener(Listener listener)
+    {
+        listeners.add(listener);
+    }
+
+    @Override
+    public synchronized Topology currentTopology()
+    {
+        return epochs.topologyFor(epochs.lastReceived);
+    }
+
+    @Override
+    public synchronized Topology getTopologyForEpoch(long epoch)
+    {
+        return epochs.topologyFor(epoch);
+    }
+
+    protected abstract void fetchTopologyInternal(long epoch);
+
+    @Override
+    public synchronized void fetchTopologyForEpoch(long epoch)
+    {
+        if (epoch <= epochs.lastReceived)
+            return;
+
+        fetchTopologyInternal(epoch);
+    }
+
+    protected abstract void epochSyncComplete(Topology topology );
+
+    @Override
+    public synchronized void acknowledgeEpoch(EpochReady ready)
+    {
+        ready.metadata.addCallback(() -> epochs.acknowledge(ready.epoch));
+        ready.coordination.addCallback(() ->  
epochSyncComplete(epochs.getOrCreate(ready.epoch).topology));
+    }
+
+    protected void topologyUpdatePreListenerNotify(Topology topology) {}
+    protected void topologyUpdatePostListenerNotify(Topology topology) {}
+
+    public synchronized AsyncResult<Void> reportTopology(Topology topology)
+    {
+        long lastReceived = epochs.lastReceived;
+        if (topology.epoch() <= lastReceived)
+            return AsyncResults.success(null);
+
+        if (lastReceived > 0 && topology.epoch() > lastReceived + 1)
+        {
+            fetchTopologyForEpoch(lastReceived + 1);
+            epochs.receiveFuture(lastReceived + 1).addCallback(() -> 
reportTopology(topology));
+            return AsyncResults.success(null);
+        }
+
+        long lastAcked = epochs.lastAcknowledged;
+        if (lastAcked > 0 && topology.epoch() > lastAcked + 1)
+        {
+            epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> 
reportTopology(topology));
+            return AsyncResults.success(null);
+        }
+        logger.trace("Epoch {} received by {}", topology.epoch(), node);
+
+        epochs.receive(topology);
+        topologyUpdatePreListenerNotify(topology);
+        for (Listener listener : listeners)
+            listener.onTopologyUpdate(topology);
+        topologyUpdatePostListenerNotify(topology);
+        return AsyncResults.success(null);
+    }
+
+    protected void epochSyncCompletePreListenerNotify(Node.Id node, long 
epoch) {}
+
+    public synchronized void epochSyncComplete(Node.Id node, long epoch)
+    {
+        epochSyncCompletePreListenerNotify(node, epoch);
+        for (Listener listener : listeners)
+            listener.onEpochSyncComplete(node, epoch);
+    }
+
+    protected void truncateTopologiesPreListenerNotify(long epoch) {}
+    protected void truncateTopologiesPostListenerNotify(long epoch) {}
+
+    public synchronized void truncateTopologiesUntil(long epoch)
+    {
+        truncateTopologiesPreListenerNotify(epoch);
+        for (Listener listener : listeners)
+            listener.truncateTopologyUntil(epoch);
+        truncateTopologiesPostListenerNotify(epoch);
+        epochs.truncateUntil(epoch);
+    }
+}
diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 6e97c042..01e83ebb 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -141,7 +141,6 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
         this.configService = configService;
         this.topology = new TopologyManager(topologySorter, id);
         this.nowSupplier = nowSupplier;
-        Topology topology = configService.currentTopology();
         this.now = new 
AtomicReference<>(Timestamp.fromValues(topology.epoch(), 
nowSupplier.getAsLong(), id));
         this.agent = agent;
         this.random = random;
@@ -199,6 +198,12 @@ public class Node implements 
ConfigurationService.Listener, NodeTimeService
         topology.onEpochSyncComplete(node, epoch);
     }
 
+    @Override
+    public void truncateTopologyUntil(long epoch)
+    {
+        topology.truncateTopologyUntil(epoch);
+    }
+
     public void withEpoch(long epoch, Runnable runnable)
     {
         if (topology.hasEpoch(epoch))
diff --git a/accord-core/src/main/java/accord/topology/Topology.java 
b/accord-core/src/main/java/accord/topology/Topology.java
index 728553c1..56b251e9 100644
--- a/accord-core/src/main/java/accord/topology/Topology.java
+++ b/accord-core/src/main/java/accord/topology/Topology.java
@@ -103,7 +103,7 @@ public class Topology
     @Override
     public String toString()
     {
-        return "Topology{" + "epoch=" + epoch + ", " + super.toString() + '}';
+        return "Topology{" + "epoch=" + epoch + ", " + Arrays.toString(shards) 
+ '}';
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index aea8c535..9c083a5c 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -68,6 +68,7 @@ public class TopologyManager
             this.global = checkArgument(global, !global.isSubset());
             this.local = global.forNode(node).trim();
             Invariants.checkArgument(!global().isSubset());
+            // TODO: can we just track sync for local ranges here?
             this.syncTracker = new QuorumTracker(new Single(sorter, global()));
             this.syncComplete = syncComplete;
             this.prevSynced = prevSynced;
@@ -129,6 +130,7 @@ public class TopologyManager
 
     private static class Epochs
     {
+        private static final Epochs EMPTY = new Epochs(new EpochState[0]);
         private final long currentEpoch;
         private final EpochState[] epochs;
         // nodes we've received sync complete notifications from, for epochs 
we do not yet have topologies for.
@@ -174,6 +176,16 @@ public class TopologyManager
             return current().epoch + 1;
         }
 
+        public long minEpoch()
+        {
+            return currentEpoch - epochs.length + 1;
+        }
+
+        public long epoch()
+        {
+            return currentEpoch;
+        }
+
         public Topology current()
         {
             return epochs.length > 0 ? epochs[0].global() : Topology.EMPTY;
@@ -197,6 +209,8 @@ public class TopologyManager
             else
             {
                 EpochState state = get(epoch);
+                if (state == null)
+                    return;
                 state.recordSyncComplete(node);
                 for (epoch++ ; state.syncComplete() && epoch <= currentEpoch; 
epoch++)
                 {
@@ -223,14 +237,15 @@ public class TopologyManager
     {
         this.sorter = sorter;
         this.node = node;
-        this.epochs = new Epochs(new EpochState[0]);
+        this.epochs = Epochs.EMPTY;
     }
 
     public synchronized void onTopologyUpdate(Topology topology)
     {
         Epochs current = epochs;
 
-        checkArgument(topology.epoch == current.nextEpoch(), "Expected 
topology update %d to be %d", topology.epoch, current.nextEpoch());
+        checkArgument(topology.epoch == current.nextEpoch() || epochs == 
Epochs.EMPTY,
+                      "Expected topology update %d to be %d", topology.epoch, 
current.nextEpoch());
         EpochState[] nextEpochs = new EpochState[current.epochs.length + 1];
         List<Set<Id>> pendingSync = new 
ArrayList<>(current.pendingSyncComplete);
         Set<Id> alreadySyncd = Collections.emptySet();
@@ -274,6 +289,22 @@ public class TopologyManager
         epochs.syncComplete(node, epoch);
     }
 
+    public synchronized void truncateTopologyUntil(long epoch)
+    {
+        Epochs current = epochs;
+        checkArgument(current.epoch() >= epoch);
+
+        if (current.minEpoch() >= epoch)
+            return;
+
+        int newLen = current.epochs.length - (int) (epoch - 
current.minEpoch());
+        Invariants.checkState(current.epochs[newLen - 1].syncComplete());
+
+        EpochState[] nextEpochs = new EpochState[newLen];
+        System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
+        epochs = new Epochs(nextEpochs, current.pendingSyncComplete, 
current.futureEpochFutures);
+    }
+
     public TopologySorter.Supplier sorter()
     {
         return sorter;
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index b5f38637..3b554161 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -138,17 +138,17 @@ public class Utils
         MockStore store = new MockStore();
         Scheduler scheduler = new ThreadPoolScheduler();
         Node node = new Node(nodeId,
-                        messageSink,
-                        new MockConfigurationService(messageSink, 
EpochFunction.noop(), topology),
-                        clock,
-                        () -> store,
-                        new ShardDistributor.EvenSplit(8, ignore -> new 
IntKey.Splitter()),
-                        new TestAgent(),
-                        new DefaultRandom(),
-                        scheduler,
-                        SizeOfIntersectionSorter.SUPPLIER,
-                        SimpleProgressLog::new,
-                        InMemoryCommandStores.Synchronized::new);
+                             messageSink,
+                             new MockConfigurationService(messageSink, 
EpochFunction.noop(), topology),
+                             clock,
+                             () -> store,
+                             new ShardDistributor.EvenSplit(8, ignore -> new 
IntKey.Splitter()),
+                             new TestAgent(),
+                             new DefaultRandom(),
+                             scheduler,
+                             SizeOfIntersectionSorter.SUPPLIER,
+                             SimpleProgressLog::new,
+                             InMemoryCommandStores.Synchronized::new);
         awaitUninterruptibly(node.start());
         return node;
     }
diff --git 
a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java 
b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
index 0c5aa2a6..855366f6 100644
--- a/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
+++ b/accord-core/src/test/java/accord/burn/BurnTestConfigurationService.java
@@ -20,17 +20,13 @@ package accord.burn;
 
 import accord.api.TestableConfigurationService;
 import accord.local.AgentExecutor;
+import accord.impl.AbstractConfigurationService;
 import accord.utils.RandomSource;
 import accord.local.Node;
 import accord.messages.*;
 import accord.topology.Topology;
-import accord.utils.Invariants;
-import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -38,124 +34,22 @@ import java.util.Map;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-public class BurnTestConfigurationService implements 
TestableConfigurationService
+public class BurnTestConfigurationService extends AbstractConfigurationService 
implements TestableConfigurationService
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(BurnTestConfigurationService.class);
-
-    private final Node.Id node;
     private final AgentExecutor executor;
     private final Function<Node.Id, Node> lookup;
     private final Supplier<RandomSource> randomSupplier;
-    private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>();
-
-    private final EpochHistory epochs = new EpochHistory();
-    private final List<Listener> listeners = new ArrayList<>();
     private final TopologyUpdates topologyUpdates;
-
-    private static class EpochState
-    {
-        private final long epoch;
-        private final AsyncResult.Settable<Topology> received = 
AsyncResults.settable();
-        private final AsyncResult.Settable<Void> acknowledged = 
AsyncResults.settable();
-        private final AsyncResult.Settable<Void> synced = 
AsyncResults.settable();
-
-        private Topology topology = null;
-
-        public EpochState(long epoch)
-        {
-            this.epoch = epoch;
-        }
-    }
-
-    private static class EpochHistory
-    {
-        // TODO (low priority): move pendingEpochs / FetchTopology into here?
-        private final List<EpochState> epochs = new ArrayList<>();
-
-        private long lastReceived = 0;
-        private long lastAcknowledged = 0;
-        private long lastSyncd = 0;
-
-        private EpochState get(long epoch)
-        {
-            for (long addEpoch = epochs.size() - 1; addEpoch <= epoch; 
addEpoch++)
-                epochs.add(new EpochState(addEpoch));
-            return epochs.get((int) epoch);
-        }
-
-        EpochHistory receive(Topology topology)
-        {
-            long epoch = topology.epoch();
-            Invariants.checkState(epoch == 0 || lastReceived == epoch - 1);
-            lastReceived = epoch;
-            EpochState state = get(epoch);
-            state.topology = topology;
-            state.received.setSuccess(topology);
-            return this;
-        }
-
-        AsyncResult<Topology> receiveFuture(long epoch)
-        {
-            return get(epoch).received;
-        }
-
-        Topology topologyFor(long epoch)
-        {
-            return get(epoch).topology;
-        }
-
-        EpochHistory acknowledge(long epoch)
-        {
-            Invariants.checkState(epoch == 0 || lastAcknowledged == epoch - 1);
-            lastAcknowledged = epoch;
-            get(epoch).acknowledged.setSuccess(null);
-            return this;
-        }
-
-        AsyncResult<Void> acknowledgeFuture(long epoch)
-        {
-            return get(epoch).acknowledged;
-        }
-
-        EpochHistory syncComplete(long epoch)
-        {
-            Invariants.checkState(epoch == 0 || lastSyncd == epoch - 1);
-            EpochState state = get(epoch);
-            Invariants.checkState(state.received.isDone());
-            Invariants.checkState(state.acknowledged.isDone());
-            lastSyncd = epoch;
-            get(epoch).synced.setSuccess(null);
-            return this;
-        }
-    }
+    private final Map<Long, FetchTopology> pendingEpochs = new HashMap<>();
 
     public BurnTestConfigurationService(Node.Id node, AgentExecutor executor, 
Supplier<RandomSource> randomSupplier, Topology topology, Function<Node.Id, 
Node> lookup, TopologyUpdates topologyUpdates)
     {
-        this.node = node;
+        super(node);
         this.executor = executor;
         this.randomSupplier = randomSupplier;
         this.lookup = lookup;
         this.topologyUpdates = topologyUpdates;
-        epochs.receive(Topology.EMPTY).acknowledge(0).syncComplete(0);
-        epochs.receive(topology).acknowledge(1).syncComplete(1);
-    }
-
-    @Override
-    public synchronized void registerListener(Listener listener)
-    {
-        listeners.add(listener);
-    }
-
-    @Override
-    public synchronized Topology currentTopology()
-    {
-        return epochs.topologyFor(epochs.lastReceived);
-    }
-
-    @Override
-    public synchronized Topology getTopologyForEpoch(long epoch)
-    {
-        return epochs.topologyFor(epoch);
+        reportTopology(topology);
     }
 
     private static class FetchTopologyRequest implements Request
@@ -257,58 +151,29 @@ public class BurnTestConfigurationService implements 
TestableConfigurationServic
     }
 
     @Override
-    public synchronized void fetchTopologyForEpoch(long epoch)
+    protected void fetchTopologyInternal(long epoch)
     {
-        if (epoch <= epochs.lastReceived)
-            return;
-
-        for (long e = epochs.lastReceived + 1; e < epoch ; ++e)
-            pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
+        pendingEpochs.computeIfAbsent(epoch, FetchTopology::new);
     }
 
     @Override
-    public synchronized void acknowledgeEpoch(EpochReady ready)
-    {
-        ready.metadata.addCallback(() -> epochs.acknowledge(ready.epoch));
-        ready.coordination.addCallback(() ->  
topologyUpdates.syncComplete(lookup.apply(node), 
epochs.get(ready.epoch).topology.nodes(), ready.epoch));
-    }
-
-    private Node originator()
+    protected void epochSyncComplete(Topology topology)
     {
-        return lookup.apply(node);
+        topologyUpdates.syncComplete(lookup.apply(node), topology.nodes(), 
topology.epoch());
     }
 
     @Override
-    public synchronized AsyncResult<Void> reportTopology(Topology topology)
+    protected void topologyUpdatePostListenerNotify(Topology topology)
     {
-        long lastReceived = epochs.lastReceived;
-        if (topology.epoch() <= lastReceived)
-            return AsyncResults.success(null);
-
-        if (topology.epoch() > lastReceived + 1)
-        {
-            fetchTopologyForEpoch(lastReceived + 1);
-            epochs.receiveFuture(lastReceived + 1).addCallback(() -> 
reportTopology(topology));
-            return AsyncResults.success(null);
-        }
-
-        long lastAcked = epochs.lastAcknowledged;
-        if (topology.epoch() > lastAcked + 1)
-        {
-            epochs.acknowledgeFuture(lastAcked + 1).addCallback(() -> 
reportTopology(topology));
-            return AsyncResults.success(null);
-        }
-        logger.trace("Epoch {} received by {}", topology.epoch(), node);
-
-        epochs.receive(topology);
-        for (Listener listener : listeners)
-            listener.onTopologyUpdate(topology);
-
         FetchTopology fetch = pendingEpochs.remove(topology.epoch());
         if (fetch == null)
-            return AsyncResults.success(null);
+            return;
 
         fetch.setSuccess(null);
-        return AsyncResults.success(null);
+    }
+
+    private Node originator()
+    {
+        return lookup.apply(node);
     }
 }
diff --git 
a/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java 
b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
new file mode 100644
index 00000000..d0e6ee03
--- /dev/null
+++ 
b/accord-core/src/test/java/accord/impl/AbstractConfigurationServiceTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.impl;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import accord.api.ConfigurationService.EpochReady;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import accord.api.ConfigurationService;
+import accord.impl.AbstractConfigurationService.EpochHistory;
+import accord.local.Node.Id;
+import accord.primitives.Range;
+import accord.topology.Shard;
+import accord.topology.Topology;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class AbstractConfigurationServiceTest
+{
+    public static class TestListener implements ConfigurationService.Listener
+    {
+        private final ConfigurationService parent;
+        private final boolean ackTopologies;
+        final Map<Long, Topology> topologies = new HashMap<>();
+        final Map<Long, Set<Id>> syncCompletes = new HashMap<>();
+        final Set<Long> truncates = new HashSet<>();
+
+        public TestListener(ConfigurationService parent, boolean ackTopologies)
+        {
+            this.parent = parent;
+            this.ackTopologies = ackTopologies;
+        }
+
+        @Override
+        public AsyncResult<Void> onTopologyUpdate(Topology topology)
+        {
+            if (topologies.put(topology.epoch(), topology) != null)
+                Assertions.fail("Received topology twice for epoch " + 
topology.epoch());
+            if (ackTopologies)
+                parent.acknowledgeEpoch(EpochReady.done(topology.epoch()));
+            return AsyncResults.success(null);
+        }
+
+        @Override
+        public void onEpochSyncComplete(Id node, long epoch)
+        {
+            Set<Id> synced = syncCompletes.computeIfAbsent(epoch, e -> new HashSet<>());
+            if (!synced.add(node)) // add() returns false when this node was already recorded for the epoch
+                Assertions.fail(String.format("Received multiple syncs for epoch %s from %s", epoch, node));
+        }
+
+        @Override
+        public void truncateTopologyUntil(long epoch)
+        {
+            if (!truncates.add(epoch)) // add() returns false when a truncate was already recorded for this epoch
+                Assertions.fail(String.format("Received multiple truncates for epoch %s", epoch));
+        }
+
+        public void assertNoTruncates()
+        {
+            Assertions.assertTrue(truncates.isEmpty());
+        }
+
+        public void assertTruncates(Long... epochs)
+        {
+            Assertions.assertEquals(ImmutableSet.copyOf(epochs), truncates);
+        }
+
+        public void assertSyncsFor(Long... epochs)
+        {
+            Assertions.assertEquals(ImmutableSet.copyOf(epochs), 
syncCompletes.keySet());
+        }
+
+        public void assertSyncsForEpoch(long epoch, Id... nodes)
+        {
+            Assertions.assertEquals(ImmutableSet.copyOf(nodes), 
syncCompletes.get(epoch));
+        }
+
+        public void assertTopologiesFor(Long... epochs)
+        {
+            Assertions.assertEquals(ImmutableSet.copyOf(epochs), 
topologies.keySet());
+        }
+
+        public void assertTopologyForEpoch(long epoch, Topology topology)
+        {
+            Assertions.assertEquals(topology, topologies.get(epoch));
+        }
+    }
+
+    private static class TestableConfigurationService extends 
AbstractConfigurationService
+    {
+        final Set<Long> syncStarted = new HashSet<>();
+        final Set<Long> epochsFetched = new HashSet<>();
+
+        public TestableConfigurationService(Id node)
+        {
+            super(node);
+        }
+
+        @Override
+        protected void fetchTopologyInternal(long epoch)
+        {
+            epochsFetched.add(epoch);
+        }
+
+        @Override
+        protected void epochSyncComplete(Topology topology)
+        {
+            if (!syncStarted.add(topology.epoch()))
+                Assertions.fail("Sync started multiple times for " + 
topology.epoch());
+        }
+
+        @Override
+        protected void topologyUpdatePostListenerNotify(Topology topology)
+        {
+            acknowledgeEpoch(EpochReady.done(topology.epoch()));
+        }
+    }
+
+    private static final Id ID1 = new Id(1);
+    private static final Id ID2 = new Id(2);
+    private static final Id ID3 = new Id(3);
+    private static final List<Id> NODES = ImmutableList.of(ID1, ID2, ID3);
+    private static final Range RANGE = IntKey.range(0, 100);
+
+    private static Shard shard(Range range, List<Id> nodes, Set<Id> fastPath)
+    {
+        return new Shard(range, nodes, fastPath);
+    }
+
+    private static Topology topology(long epoch, Range range, List<Id> nodes, 
Set<Id> fastPath)
+    {
+        return new Topology(epoch, shard(range, nodes, fastPath));
+    }
+
+    private static Topology topology(long epoch, Id... fastPath)
+    {
+        return topology(epoch, RANGE, NODES, ImmutableSet.copyOf(fastPath));
+    }
+
+    private static Topology topology(long epoch, int... fastPath)
+    {
+        Set<Id> fpSet = 
Arrays.stream(fastPath).mapToObj(Id::new).collect(Collectors.toSet());
+        return topology(epoch, RANGE, NODES, fpSet);
+    }
+
+    private static final Topology TOPOLOGY1 = topology(1, 1, 2, 3);
+    private static final Topology TOPOLOGY2 = topology(2, 1, 2);
+    private static final Topology TOPOLOGY3 = topology(3, 1, 3);
+    private static final Topology TOPOLOGY4 = topology(4, 2, 3);
+
+    @Test
+    public void getTopologyTest()
+    {
+        TestableConfigurationService service = new 
TestableConfigurationService(ID1);
+        TestListener listener = new TestListener(service, false);
+        service.registerListener(listener);
+        service.reportTopology(TOPOLOGY1);
+        service.reportTopology(TOPOLOGY2);
+        service.reportTopology(TOPOLOGY3);
+        service.reportTopology(TOPOLOGY4);
+
+        listener.assertNoTruncates();
+        listener.assertTopologiesFor(1L, 2L, 3L, 4L);
+        Assertions.assertSame(TOPOLOGY1, service.getTopologyForEpoch(1));
+        Assertions.assertSame(TOPOLOGY2, service.getTopologyForEpoch(2));
+        Assertions.assertSame(TOPOLOGY3, service.getTopologyForEpoch(3));
+        Assertions.assertSame(TOPOLOGY4, service.getTopologyForEpoch(4));
+    }
+
+    /**
+     * check everything works properly if we start loading after epoch 1 has
+     * been removed
+     */
+    @Test
+    public void loadAfterTruncate()
+    {
+        TestableConfigurationService service = new 
TestableConfigurationService(ID1);
+        TestListener listener = new TestListener(service, false);
+        service.registerListener(listener);
+        service.reportTopology(TOPOLOGY3);
+        service.reportTopology(TOPOLOGY4);
+
+        listener.assertNoTruncates();
+        listener.assertTopologiesFor(3L, 4L);
+        Assertions.assertSame(TOPOLOGY3, service.getTopologyForEpoch(3));
+        Assertions.assertSame(TOPOLOGY4, service.getTopologyForEpoch(4));
+    }
+
+    /**
+     * If we receive topology epochs out of order for some reason, we should
+     * reorder with callbacks
+     */
+    @Test
+    public void awaitOutOfOrderTopologies()
+    {
+        TestableConfigurationService service = new 
TestableConfigurationService(ID1);
+
+        TestListener listener = new TestListener(service, false);
+        service.registerListener(listener);
+
+        service.reportTopology(TOPOLOGY1);
+        service.reportTopology(TOPOLOGY3);
+        listener.assertTopologiesFor(1L);
+        Assertions.assertEquals(ImmutableSet.of(2L), service.epochsFetched);
+
+        service.reportTopology(TOPOLOGY2);
+        listener.assertTopologiesFor(1L, 2L, 3L);
+
+    }
+
+    private static void assertHistoryEpochs(EpochHistory history, long... expected)
+    {
+        Assertions.assertEquals(expected.length, history.size()); // (expected, actual) order so a mismatch is reported the right way round
+        if (expected.length == 0)
+            return;
+        // the history must begin and end at the first and last expected epochs...
+        Assertions.assertEquals(expected[0], history.minEpoch());
+        Assertions.assertEquals(expected[expected.length - 1], history.maxEpoch());
+        // ...and hold exactly the expected epoch at every index
+        for (int i=0; i<expected.length; i++)
+            Assertions.assertEquals(expected[i], history.atIndex(i).epoch());
+    }
+
+    @Test
+    public void epochHistoryAppend()
+    {
+        EpochHistory history = new EpochHistory();
+        Assertions.assertEquals(0, history.size());
+
+        history.getOrCreate(5);
+        assertHistoryEpochs(history, 5);
+
+        history.getOrCreate(6);
+        assertHistoryEpochs(history, 5, 6);
+
+        history.getOrCreate(8);
+        assertHistoryEpochs(history, 5, 6, 7, 8);
+    }
+
+    @Test
+    public void epochHistoryPrepend()
+    {
+        EpochHistory history = new EpochHistory();
+        Assertions.assertEquals(0, history.size());
+
+        history.getOrCreate(5);
+        history.getOrCreate(6);
+        assertHistoryEpochs(history, 5, 6);
+
+        history.getOrCreate(3);
+        assertHistoryEpochs(history, 3, 4, 5, 6);
+    }
+
+    @Test
+    public void epochHistoryTruncate()
+    {
+        EpochHistory history = new EpochHistory();
+        Assertions.assertEquals(0, history.size());
+
+        history.getOrCreate(1);
+        history.getOrCreate(2);
+        history.getOrCreate(3);
+        history.getOrCreate(4);
+        history.getOrCreate(5);
+        history.getOrCreate(6);
+
+        assertHistoryEpochs(history, 1, 2, 3, 4, 5, 6);
+
+        history.truncateUntil(4);
+        assertHistoryEpochs(history, 4, 5, 6);
+
+        history.getOrCreate(7);
+        assertHistoryEpochs(history, 4, 5, 6, 7);
+    }
+}
diff --git a/accord-core/src/test/java/accord/impl/mock/MockCluster.java 
b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
index d6a244d9..d2bc2337 100644
--- a/accord-core/src/test/java/accord/impl/mock/MockCluster.java
+++ b/accord-core/src/test/java/accord/impl/mock/MockCluster.java
@@ -119,6 +119,7 @@ public class MockCluster implements Network, AutoCloseable, 
Iterable<Node>
                         SimpleProgressLog::new,
                         InMemoryCommandStores.SingleThread::new);
         awaitUninterruptibly(node.start());
+        node.onTopologyUpdate(topology);
         return node;
     }
 
diff --git a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java 
b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
index 18bb226d..c39f46a1 100644
--- a/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
+++ b/accord-core/src/test/java/accord/local/ImmutableCommandTest.java
@@ -100,6 +100,7 @@ public class ImmutableCommandTest
                         new MockCluster.Clock(100), () -> storeSupport.data, 
new ShardDistributor.EvenSplit(8, ignore -> new IntKey.Splitter()), new 
TestAgent(), new DefaultRandom(), null,
                         SizeOfIntersectionSorter.SUPPLIER, ignore -> ignore2 
-> new NoOpProgressLog(), InMemoryCommandStores.Synchronized::new);
         awaitUninterruptibly(node.start());
+        node.onTopologyUpdate(storeSupport.local.get());
         return node;
     }
 
diff --git a/accord-core/src/test/java/accord/messages/PreAcceptTest.java 
b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
index b3a6994e..ec4f58d9 100644
--- a/accord-core/src/test/java/accord/messages/PreAcceptTest.java
+++ b/accord-core/src/test/java/accord/messages/PreAcceptTest.java
@@ -196,7 +196,8 @@ public class PreAcceptTest
             messageSink.assertHistorySizes(0, 1);
             Assertions.assertEquals(ID3, messageSink.responses.get(0).to);
             PartialDeps expectedDeps = new PartialDeps(Ranges.of(range(0, 
12)), KeyDeps.NONE, RangeDeps.NONE);
-            Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2, 
Timestamp.fromValues(1, 110, ID1), expectedDeps),
+            Timestamp expectedTs = Timestamp.fromValues(1, 110, 
ID1).withExtraFlags(txnId2.flags());
+            Assertions.assertEquals(new PreAccept.PreAcceptOk(txnId2, 
expectedTs, expectedDeps),
                                     messageSink.responses.get(0).payload);
         }
         finally
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java 
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index ec85d157..bec84683 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -200,4 +200,67 @@ public class TopologyManagerTest
         Assertions.assertEquals(topologies(topology2, topology(1, 
shard(range(200, 300), idList(4, 5, 6), idSet(4, 5)))),
                                 actual);
     }
+
+    @Test
+    void incompleteTopologyHistory()
+    {
+        Topology topology5 = topology(5,
+                                      shard(range(100, 200), idList(1, 2, 3), 
idSet(1, 2)),
+                                      shard(range(200, 300), idList(4, 5, 6), 
idSet(4, 5)));
+        Topology topology6 = topology(6,
+                                      shard(range(100, 200), idList(1, 2, 3), 
idSet(1, 2)),
+                                      shard(range(200, 300), idList(4, 5, 6), 
idSet(5, 6)));
+
+        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        service.onTopologyUpdate(topology5);
+        service.onTopologyUpdate(topology6);
+
+        Assertions.assertSame(topology6, 
service.getEpochStateUnsafe(6).global());
+        Assertions.assertSame(topology5, 
service.getEpochStateUnsafe(5).global());
+        for (int i=1; i<=6; i++) service.onEpochSyncComplete(id(i), 5);
+        Assertions.assertTrue(service.getEpochStateUnsafe(5).syncComplete());
+        Assertions.assertNull(service.getEpochStateUnsafe(4));
+
+        service.onEpochSyncComplete(id(1), 4);
+    }
+
+    private static void markTopologySynced(TopologyManager service, long epoch)
+    {
+        service.getEpochStateUnsafe(epoch).global().nodes().forEach(id -> 
service.onEpochSyncComplete(id, epoch));
+    }
+
+    private static void addAndMarkSynced(TopologyManager service, Topology 
topology)
+    {
+        service.onTopologyUpdate(topology);
+        markTopologySynced(service, topology.epoch());
+    }
+
+    @Test
+    void truncateTopologyHistory()
+    {
+        Range range = range(100, 200);
+        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        addAndMarkSynced(service, topology(1, shard(range, idList(1, 2, 3), 
idSet(1, 2))));
+        addAndMarkSynced(service, topology(2, shard(range, idList(1, 2, 3), 
idSet(2, 3))));
+        addAndMarkSynced(service, topology(3, shard(range, idList(1, 2, 3), 
idSet(1, 2))));
+        addAndMarkSynced(service, topology(4, shard(range, idList(1, 2, 3), 
idSet(1, 3))));
+
+        Assertions.assertTrue(service.hasEpoch(1));
+        Assertions.assertTrue(service.hasEpoch(2));
+        Assertions.assertTrue(service.hasEpoch(3));
+        Assertions.assertTrue(service.hasEpoch(4));
+
+        service.truncateTopologyUntil(3);
+        Assertions.assertFalse(service.hasEpoch(1));
+        Assertions.assertFalse(service.hasEpoch(2));
+        Assertions.assertTrue(service.hasEpoch(3));
+        Assertions.assertTrue(service.hasEpoch(4));
+
+    }
+
+    @Test
+    void truncateTopologyCantTruncateUnsyncedEpochs()
+    {
+        // TODO(review): body is empty, so this test passes without asserting anything; presumably meant to cover truncateTopologyUntil on epochs that have not completed sync -- confirm intended behaviour and implement
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to