This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch retry-new-system
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 892f19d7714bc7128d84b7e635acc6621ce476c4
Author: Ariel Weisberg <aweisb...@apple.com>
AuthorDate: Mon May 6 18:12:30 2024 -0400

    retry new system checkpoint
---
 accord-core/src/main/java/accord/local/Node.java   |  18 ++--
 .../main/java/accord/topology/TopologyManager.java | 120 +++++++++++++++++----
 accord-core/src/test/java/accord/Utils.java        |   8 ++
 .../java/accord/topology/TopologyManagerTest.java  |  61 +++++------
 4 files changed, 149 insertions(+), 58 deletions(-)

diff --git a/accord-core/src/main/java/accord/local/Node.java 
b/accord-core/src/main/java/accord/local/Node.java
index 1605f43..62331d0 100644
--- a/accord-core/src/main/java/accord/local/Node.java
+++ b/accord-core/src/main/java/accord/local/Node.java
@@ -34,14 +34,7 @@ import java.util.function.Function;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.function.ToLongFunction;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
-import accord.coordinate.CoordinateEphemeralRead;
-import accord.coordinate.CoordinationAdapter;
-import accord.coordinate.CoordinationAdapter.Factory.Step;
-import accord.utils.DeterministicSet;
-import accord.utils.Invariants;
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +51,10 @@ import accord.api.RoutingKey;
 import accord.api.Scheduler;
 import accord.api.TopologySorter;
 import accord.config.LocalConfig;
+import accord.coordinate.CoordinateEphemeralRead;
 import accord.coordinate.CoordinateTransaction;
+import accord.coordinate.CoordinationAdapter;
+import accord.coordinate.CoordinationAdapter.Factory.Step;
 import accord.coordinate.MaybeRecover;
 import accord.coordinate.Outcome;
 import accord.coordinate.RecoverWithRoute;
@@ -85,12 +81,16 @@ import accord.primitives.Unseekables;
 import accord.topology.Shard;
 import accord.topology.Topology;
 import accord.topology.TopologyManager;
+import accord.utils.DeterministicSet;
+import accord.utils.Invariants;
 import accord.utils.MapReduceConsume;
 import accord.utils.RandomSource;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncExecutor;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import net.nicoulaj.compilecommand.annotations.Inline;
 
 import static accord.utils.Invariants.illegalState;
@@ -179,7 +179,8 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
         this.localRequestHandler = localRequestHandler;
         this.configService = configService;
         this.coordinationAdapters = coordinationAdapters;
-        this.topology = new TopologyManager(topologySorter, id);
+        this.topology = new TopologyManager(topologySorter, id, scheduler, 
nowTimeUnit);
+        topology.scheduleTopologyUpdateWatchdog();
         this.nowSupplier = nowSupplier;
         this.nowTimeUnit = nowTimeUnit;
         this.now = new 
AtomicReference<>(Timestamp.fromValues(topology.epoch(), 
nowSupplier.getAsLong(), id));
@@ -330,6 +331,7 @@ public class Node implements ConfigurationService.Listener, 
NodeTimeService
     public void shutdown()
     {
         commandStores.shutdown();
+        topology.shutdown();
     }
 
     public Timestamp uniqueNow()
diff --git a/accord-core/src/main/java/accord/topology/TopologyManager.java 
b/accord-core/src/main/java/accord/topology/TopologyManager.java
index bd4b25a..d2f609e 100644
--- a/accord-core/src/main/java/accord/topology/TopologyManager.java
+++ b/accord-core/src/main/java/accord/topology/TopologyManager.java
@@ -23,14 +23,18 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.function.Supplier;
+import java.util.function.ToLongFunction;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import accord.api.ConfigurationService.EpochReady;
 import accord.api.RoutingKey;
+import accord.api.Scheduler;
 import accord.api.TopologySorter;
+import accord.coordinate.Timeout;
 import accord.coordinate.TopologyMismatch;
 import accord.coordinate.tracking.QuorumTracker;
 import accord.local.CommandStore;
@@ -44,13 +48,14 @@ import accord.utils.Invariants;
 import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncResult;
 import accord.utils.async.AsyncResults;
-
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import static accord.coordinate.tracking.RequestStatus.Success;
 import static accord.primitives.AbstractRanges.UnionMode.MERGE_ADJACENT;
 import static accord.primitives.Routables.Slice.Minimal;
 import static accord.utils.Invariants.checkArgument;
+import static accord.utils.Invariants.checkState;
 import static accord.utils.Invariants.illegalState;
 import static accord.utils.Invariants.nonNull;
 
@@ -71,6 +76,11 @@ public class TopologyManager
 {
     private static final AsyncResult<Void> SUCCESS = 
AsyncResults.success(null);
 
+    // How long before we start notifying waiters on an epoch of timeout,
+    private static final long EPOCH_INITIAL_TIMEOUT_MILLIS = 10_000;
+    // How often we check for timeout, and once an epoch has timed out, how 
often we timeout new waiters
+    private static final long WATCHDOG_INTERVAL_MILLIS = 2_000;
+
     static class EpochState
     {
         final Id self;
@@ -106,7 +116,7 @@ public class TopologyManager
             newPrevSyncComplete = newPrevSyncComplete.union(MERGE_ADJACENT, 
addedRanges).subtract(removedRanges);
             if (prevSyncComplete.containsAll(newPrevSyncComplete))
                 return false;
-            
Invariants.checkState(newPrevSyncComplete.containsAll(prevSyncComplete), 
"Expected %s to contain all ranges in %s; but did not", newPrevSyncComplete, 
prevSyncComplete);
+            checkState(newPrevSyncComplete.containsAll(prevSyncComplete), 
"Expected %s to contain all ranges in %s; but did not", newPrevSyncComplete, 
prevSyncComplete);
             prevSyncComplete = newPrevSyncComplete;
             syncComplete = curSyncComplete.slice(newPrevSyncComplete, 
Minimal).union(MERGE_ADJACENT, addedRanges);
             return true;
@@ -215,13 +225,13 @@ public class TopologyManager
         // list of promises to be completed as newer epochs become active. 
This is to support processes that
         // are waiting on future epochs to begin (ie: txn requests from 
futures epochs). Index 0 is for
         // currentEpoch + 1
-        private final List<AsyncResult.Settable<Void>> futureEpochFutures;
+        private final List<FutureEpoch> futureEpochs;
 
-        private Epochs(EpochState[] epochs, List<Notifications> pending, 
List<AsyncResult.Settable<Void>> futureEpochFutures)
+        private Epochs(EpochState[] epochs, List<Notifications> pending, 
List<FutureEpoch> futureEpochs)
         {
             this.currentEpoch = epochs.length > 0 ? epochs[0].epoch() : 0;
             this.pending = pending;
-            this.futureEpochFutures = futureEpochFutures;
+            this.futureEpochs = futureEpochs;
             for (int i=1; i<epochs.length; i++)
                 checkArgument(epochs[i].epoch() == epochs[i-1].epoch() - 1);
             this.epochs = epochs;
@@ -232,16 +242,18 @@ public class TopologyManager
             this(epochs, new ArrayList<>(), new ArrayList<>());
         }
 
-        public AsyncResult<Void> awaitEpoch(long epoch)
+        public AsyncResult<Void> awaitEpoch(long epoch, 
ToLongFunction<TimeUnit> nowTimeUnit)
         {
             if (epoch <= currentEpoch)
                 return SUCCESS;
 
+            long now = nowTimeUnit.applyAsLong(TimeUnit.MILLISECONDS);
+            long deadline = now + EPOCH_INITIAL_TIMEOUT_MILLIS;
             int diff = (int) (epoch - currentEpoch);
-            while (futureEpochFutures.size() < diff)
-                futureEpochFutures.add(AsyncResults.settable());
+            while (futureEpochs.size() < diff)
+                futureEpochs.add(new FutureEpoch(AsyncResults.settable(), 
deadline));
 
-            return futureEpochFutures.get(diff - 1);
+            return futureEpochs.get(diff - 1).future;
         }
 
         public long nextEpoch()
@@ -365,17 +377,84 @@ public class TopologyManager
         }
     }
 
+    private static class FutureEpoch
+    {
+        private final long deadlineMillis;
+        private volatile AsyncResult.Settable<Void> future;
+
+        public FutureEpoch(AsyncResult.Settable<Void> future, long 
deadlineMillis)
+        {
+            nonNull(future);
+            this.future = future;
+            this.deadlineMillis = deadlineMillis;
+        }
+
+        /*
+         * Notify any listeners that are waiting for the epoch that is has 
been a long time since
+         * we started waiting for the epoch. We may still eventually get the 
epoch so also create
+         * a new future so subsequent operations may have a chance at seeing 
the epoch if ever appears.
+         *
+         * Subsequent waiters may get a timeout notification far sooner 
(WATCHDOG_INTERVAL_MILLISS)
+         * instead of EPOCH_INITIAL_TIMEOUT_MILLIS
+         */
+        @GuardedBy("TopologyManager.this")
+        public void timeOutCurrentListeners()
+        {
+            checkState(future != null);
+            AsyncResult.Settable<Void> oldFuture = future;
+            checkState(oldFuture != null);
+            future = AsyncResults.settable();
+            oldFuture.tryFailure(new Timeout(null, null));
+        }
+    }
+
     private final TopologySorter.Supplier sorter;
     private final Id node;
+    private final Scheduler scheduler;
+    private final ToLongFunction<TimeUnit> nowTimeUnit;
     private volatile Epochs epochs;
+    private Scheduler.Scheduled topologyUpdateWatchdog;
 
-    public TopologyManager(TopologySorter.Supplier sorter, Id node)
+    public TopologyManager(TopologySorter.Supplier sorter, Id node, Scheduler 
scheduler, ToLongFunction<TimeUnit> nowTimeUnit)
     {
         this.sorter = sorter;
         this.node = node;
+        this.scheduler = scheduler;
+        this.nowTimeUnit = nowTimeUnit;
         this.epochs = Epochs.EMPTY;
     }
 
+    public void shutdown()
+    {
+        topologyUpdateWatchdog.cancel();
+    }
+
+    public void scheduleTopologyUpdateWatchdog()
+    {
+        topologyUpdateWatchdog = scheduler.recurring(() -> {
+            synchronized (TopologyManager.this)
+            {
+                Epochs current = epochs;
+                if (current.futureEpochs.isEmpty())
+                    return;
+
+                long now = nowTimeUnit.applyAsLong(TimeUnit.MILLISECONDS);
+                if (now > current.futureEpochs.get(0).deadlineMillis)
+                {
+                    List<FutureEpoch> futureEpochs = null;
+                    for (int i = 0; i < current.futureEpochs.size(); i++)
+                    {
+                        FutureEpoch futureEpoch = current.futureEpochs.get(i);
+                        if (now <= futureEpoch.deadlineMillis)
+                            break;
+                        else
+                            futureEpoch.timeOutCurrentListeners();
+                    }
+                }
+            }
+        }, WATCHDOG_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
+    }
+
     public synchronized EpochReady onTopologyUpdate(Topology topology, 
Supplier<EpochReady> bootstrap)
     {
         Epochs current = epochs;
@@ -400,11 +479,12 @@ public class TopologyManager
         nextEpochs[0].recordClosed(notifications.closed);
         nextEpochs[0].recordComplete(notifications.complete);
 
-        List<AsyncResult.Settable<Void>> futureEpochFutures = new 
ArrayList<>(current.futureEpochFutures);
-        AsyncResult.Settable<Void> toComplete = !futureEpochFutures.isEmpty() 
? futureEpochFutures.remove(0) : null;
-        epochs = new Epochs(nextEpochs, pending, futureEpochFutures);
+        List<FutureEpoch> futureEpochs = new ArrayList<>(current.futureEpochs);
+        FutureEpoch toComplete = !futureEpochs.isEmpty() ? 
futureEpochs.remove(0) : null;
+        epochs = new Epochs(nextEpochs, pending, futureEpochs);
+        epochs = new Epochs(nextEpochs, pending, futureEpochs);
         if (toComplete != null)
-            toComplete.trySuccess(null);
+            toComplete.future.trySuccess(null);
 
         return nextEpochs[0].ready = bootstrap.get();
     }
@@ -414,7 +494,7 @@ public class TopologyManager
         AsyncResult<Void> result;
         synchronized (this)
         {
-            result = epochs.awaitEpoch(epoch);
+            result = epochs.awaitEpoch(epoch, nowTimeUnit);
         }
         CommandStore current = CommandStore.maybeCurrent();
         return current == null || result.isDone() ? result : 
result.withExecutor(current);
@@ -448,11 +528,11 @@ public class TopologyManager
             return;
 
         int newLen = current.epochs.length - (int) (epoch - 
current.minEpoch());
-        Invariants.checkState(current.epochs[newLen - 1].syncComplete(), 
"Epoch %d's sync is not complete", current.epochs[newLen - 1].epoch());
+        checkState(current.epochs[newLen - 1].syncComplete(), "Epoch %d's sync 
is not complete", current.epochs[newLen - 1].epoch());
 
         EpochState[] nextEpochs = new EpochState[newLen];
         System.arraycopy(current.epochs, 0, nextEpochs, 0, newLen);
-        epochs = new Epochs(nextEpochs, current.pending, 
current.futureEpochFutures);
+        epochs = new Epochs(nextEpochs, current.pending, current.futureEpochs);
     }
 
     public synchronized void onEpochClosed(Ranges ranges, long epoch)
@@ -527,7 +607,7 @@ public class TopologyManager
             throw tm;
 
         if (maxEpoch == Long.MAX_VALUE) maxEpoch = snapshot.currentEpoch;
-        else Invariants.checkState(snapshot.currentEpoch >= maxEpoch, "current 
epoch %d < max %d", snapshot.currentEpoch, maxEpoch);
+        else checkState(snapshot.currentEpoch >= maxEpoch, "current epoch %d < 
max %d", snapshot.currentEpoch, maxEpoch);
 
         EpochState maxEpochState = nonNull(snapshot.get(maxEpoch));
         if (minEpoch == maxEpoch && 
isSufficientFor.apply(maxEpochState).containsAll(select))
@@ -591,7 +671,7 @@ public class TopologyManager
         Epochs snapshot = epochs;
 
         EpochState maxState = snapshot.get(maxEpoch);
-        Invariants.checkState(maxState != null, "Unable to find epoch %d; 
known epochs are %d -> %d", maxEpoch, snapshot.minEpoch(), 
snapshot.currentEpoch);
+        checkState(maxState != null, "Unable to find epoch %d; known epochs 
are %d -> %d", maxEpoch, snapshot.minEpoch(), snapshot.currentEpoch);
         TopologyMismatch tm = 
TopologyMismatch.checkForMismatch(maxState.global(), select);
         if (tm != null)
             throw tm;
@@ -607,7 +687,7 @@ public class TopologyManager
             topologies.add(epochState.global.forSelection(select));
             select = select.subtract(epochState.addedRanges);
         }
-        Invariants.checkState(!topologies.isEmpty(), "Unable to find an epoch 
that contained %s", select);
+        checkState(!topologies.isEmpty(), "Unable to find an epoch that 
contained %s", select);
 
         return topologies.build(sorter);
     }
diff --git a/accord-core/src/test/java/accord/Utils.java 
b/accord-core/src/test/java/accord/Utils.java
index 78b1298..437a31e 100644
--- a/accord-core/src/test/java/accord/Utils.java
+++ b/accord-core/src/test/java/accord/Utils.java
@@ -30,6 +30,7 @@ import com.google.common.collect.Sets;
 import accord.api.Key;
 import accord.api.MessageSink;
 import accord.api.Scheduler;
+import accord.api.TopologySorter;
 import accord.config.LocalConfig;
 import accord.config.MutableLocalConfig;
 import accord.coordinate.CoordinationAdapter;
@@ -45,6 +46,7 @@ import accord.impl.mock.MockCluster;
 import accord.impl.mock.MockConfigurationService;
 import accord.impl.mock.MockStore;
 import accord.local.Node;
+import accord.local.Node.Id;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.messages.LocalRequest;
@@ -55,6 +57,7 @@ import accord.primitives.Txn;
 import accord.topology.Shard;
 import accord.topology.Topologies;
 import accord.topology.Topology;
+import accord.topology.TopologyManager;
 import accord.utils.DefaultRandom;
 import accord.utils.EpochFunction;
 import accord.utils.Invariants;
@@ -194,4 +197,9 @@ public class Utils
                   .ignoreExceptions()
                   .untilAsserted(runnable);
     }
+
+    public static TopologyManager testTopologyManager(TopologySorter.Supplier 
sorter, Id node)
+    {
+        return new TopologyManager(sorter, node, 
Scheduler.NEVER_RUN_SCHEDULED, 
NodeTimeService.unixWrapper(TimeUnit.MILLISECONDS, () -> 0));
+    }
 }
diff --git a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java 
b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
index d090772..031e6a4 100644
--- a/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
+++ b/accord-core/src/test/java/accord/topology/TopologyManagerTest.java
@@ -18,42 +18,43 @@
 
 package accord.topology;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
 import accord.burn.TopologyUpdates;
 import accord.impl.PrefixedIntHashKey;
 import accord.impl.TestAgent;
 import accord.local.AgentExecutor;
+import accord.local.Node;
+import accord.primitives.Range;
 import accord.primitives.Ranges;
+import accord.primitives.RoutingKeys;
 import accord.primitives.Unseekables;
 import accord.utils.AccordGens;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
 import org.agrona.collections.Long2ObjectHashMap;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-import accord.local.Node;
-import accord.primitives.Range;
-import accord.primitives.RoutingKeys;
 import org.mockito.Mockito;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Supplier;
-
 import static accord.Utils.id;
 import static accord.Utils.idList;
 import static accord.Utils.idSet;
 import static accord.Utils.shard;
+import static accord.Utils.testTopologyManager;
 import static accord.Utils.topologies;
 import static accord.Utils.topology;
 import static accord.impl.IntKey.keys;
@@ -79,7 +80,7 @@ public class TopologyManagerTest
                                shard(range(100, 200), idList(2, 3, 5), 
idSet(2, 3, 5)));
         int[] unmoved = { 1, 3, 5 };
         int[] moved = { 2, 4 };
-        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        TopologyManager service = testTopologyManager(SUPPLIER, ID);
         service.onTopologyUpdate(t1, () -> null);
         service.onTopologyUpdate(t2, () -> null);
 
@@ -105,7 +106,7 @@ public class TopologyManagerTest
         Topology topology1 = topology(1, shard(range, idList(1, 2, 3), 
idSet(1, 2)));
         Topology topology2 = topology(2, shard(range, idList(1, 2, 3), 
idSet(2, 3)));
 
-        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        TopologyManager service = testTopologyManager(SUPPLIER, ID);
 
         Assertions.assertSame(Topology.EMPTY, service.current());
         service.onTopologyUpdate(topology1, () -> null);
@@ -130,7 +131,7 @@ public class TopologyManagerTest
                                       shard(range(100, 200), idList(1, 2, 3), 
idSet(3, 4)),
                                       shard(range(200, 300), idList(4, 5, 6), 
idSet(4, 5)));
 
-        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        TopologyManager service = testTopologyManager(SUPPLIER, ID);
         service.onTopologyUpdate(topology1, () -> null);
         service.onTopologyUpdate(topology2, () -> null);
 
@@ -161,7 +162,7 @@ public class TopologyManagerTest
         Topology topology2 = topology(2, shard(range, idList(1, 2, 3), 
idSet(2, 3)));
         Topology topology3 = topology(3, shard(range, idList(1, 2, 3), 
idSet(1, 2)));
 
-        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        TopologyManager service = testTopologyManager(SUPPLIER, ID);
         service.onTopologyUpdate(topology1, () -> null);
         service.onTopologyUpdate(topology2, () -> null);
         service.onTopologyUpdate(topology3, () -> null);
@@ -198,7 +199,7 @@ public class TopologyManagerTest
         Topology topology2 = topology(2, shard(range, idList(1, 2, 3), 
idSet(2, 3)));
 //        Topology topology3 = topology(3, shard(range, idList(1, 2, 3), 
idSet(3, 4)));
 
-        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        TopologyManager service = testTopologyManager(SUPPLIER, ID);
         service.onTopologyUpdate(topology1, () -> null);
 
         // sync epoch 2
@@ -220,7 +221,7 @@ public class TopologyManagerTest
         Topology topology2 = topology(2, shard(range, idList(1, 2, 3), 
idSet(1, 2)));
         Topology topology3 = topology(3, shard(range, idList(1, 2, 3), 
idSet(2, 3)));
 
-        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        TopologyManager service = testTopologyManager(SUPPLIER, ID);
 
         Assertions.assertSame(Topology.EMPTY, service.current());
         service.onTopologyUpdate(topology1, () -> null);
@@ -250,7 +251,7 @@ public class TopologyManagerTest
                                       shard(range(100, 200), idList(1, 2, 3), 
idSet(1, 2)),
                                       shard(range(200, 300), idList(4, 5, 6), 
idSet(5, 6)));
 
-        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        TopologyManager service = testTopologyManager(SUPPLIER, ID);
         service.onTopologyUpdate(topology5, () -> null);
         service.onTopologyUpdate(topology6, () -> null);
 
@@ -278,7 +279,7 @@ public class TopologyManagerTest
     void truncateTopologyHistory()
     {
         Range range = range(100, 200);
-        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        TopologyManager service = testTopologyManager(SUPPLIER, ID);
         addAndMarkSynced(service, topology(1, shard(range, idList(1, 2, 3), 
idSet(1, 2))));
         addAndMarkSynced(service, topology(2, shard(range, idList(1, 2, 3), 
idSet(2, 3))));
         addAndMarkSynced(service, topology(3, shard(range, idList(1, 2, 3), 
idSet(1, 2))));
@@ -319,7 +320,7 @@ public class TopologyManagerTest
                                         shard(PrefixedIntHashKey.range(1, 0, 
100), idList(1, 2, 3), idSet(1, 2))));
             topologies.add(topology(epochCounter++,
                                     shard(PrefixedIntHashKey.range(1, 0, 100), 
idList(1, 2, 3), idSet(1, 2))));;
-            History history = new History(new TopologyManager(SUPPLIER, ID), 
topologies.iterator()) {
+            History history = new History(testTopologyManager(SUPPLIER, ID), 
topologies.iterator()) {
 
                 @Override
                 protected void postTopologyUpdate(int id, Topology t)
@@ -365,7 +366,7 @@ public class TopologyManagerTest
     @Test
     void aba()
     {
-        TopologyManager service = new TopologyManager(SUPPLIER, ID);
+        TopologyManager service = testTopologyManager(SUPPLIER, ID);
         List<Node.Id> dc1Nodes = idList(1, 2, 3);
         Set<Node.Id> dc1Fp = idSet(1, 2);
         List<Node.Id> dc2Nodes = idList(4, 5, 6);
@@ -416,7 +417,7 @@ public class TopologyManagerTest
                     return t == null ? endOfData() : t;
                 }
             }, 42);
-            History history = new History(new TopologyManager(SUPPLIER, ID), 
next) {
+            History history = new History(testTopologyManager(SUPPLIER, ID), 
next) {
 
                 @Override
                 protected void postTopologyUpdate(int id, Topology t)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to