This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 2936dd7c00 Improve topology fetching
2936dd7c00 is described below

commit 2936dd7c004368faf598d8fd10e9ef5b078dc123
Author: Alex Petrov <[email protected]>
AuthorDate: Mon Jan 27 16:25:09 2025 +0100

    Improve topology fetching
    
      - prevent TCM from being behind Accord if a newer epoch is reported via 
withEpoch/fetchTopologyInternal
      - improve topology discovery during first boot and replay
      - fix races between config service TCM listener reporting topologies, and 
fetched topologies during
    
    Patch by Alex Petrov, reviewed by Ariel Weisberg and David Capwell for 
CASSANDRA-20245
---
 modules/accord                                     |   2 +-
 .../org/apache/cassandra/config/AccordSpec.java    |  11 +-
 .../cassandra/exceptions/RequestFailure.java       |   2 +
 .../cassandra/exceptions/RequestFailureReason.java |   1 +
 .../org/apache/cassandra/net/MessageDelivery.java  |  11 +-
 .../org/apache/cassandra/net/MessagingUtils.java   |  16 ++-
 .../service/accord/AccordConfigurationService.java |  54 +++++++--
 .../cassandra/service/accord/AccordService.java    | 131 +++++++++++++--------
 .../cassandra/service/accord/FetchMinEpoch.java    |  78 ++++--------
 .../cassandra/service/accord/FetchTopology.java    |  58 ++++-----
 .../cassandra/service/accord/IAccordService.java   |   9 +-
 .../config/DatabaseDescriptorRefTest.java          |   1 +
 .../service/accord/FetchMinEpochTest.java          |  97 ++-------------
 13 files changed, 226 insertions(+), 245 deletions(-)

diff --git a/modules/accord b/modules/accord
index e5e108adfe..78ab7eef90 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit e5e108adfeb817f899ca89507e8fa6bc0b191759
+Subproject commit 78ab7eef904ef549d0d7a34332b83d6110e0762d
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java 
b/src/java/org/apache/cassandra/config/AccordSpec.java
index 720a2db772..566a0d16ef 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -194,7 +194,8 @@ public class AccordSpec
     public boolean ephemeralReadEnabled = true;
     public boolean state_cache_listener_jfr_enabled = true;
     public final JournalSpec journal = new JournalSpec();
-    public final MinEpochRetrySpec minEpochSyncRetry = new MinEpochRetrySpec();
+    public final RetrySpec minEpochSyncRetry = new MinEpochRetrySpec();
+    public final RetrySpec fetchRetry = new FetchRetrySpec();
 
     public static class MinEpochRetrySpec extends RetrySpec
     {
@@ -204,6 +205,14 @@ public class AccordSpec
         }
     }
 
+    public static class FetchRetrySpec extends RetrySpec
+    {
+        public FetchRetrySpec()
+        {
+            maxAttempts = new MaxAttempt(100);
+        }
+    }
+
     public static class JournalSpec implements Params
     {
         public int segmentSize = 32 << 20;
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailure.java 
b/src/java/org/apache/cassandra/exceptions/RequestFailure.java
index 312102c012..6946b812aa 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailure.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailure.java
@@ -43,6 +43,7 @@ import static 
org.apache.cassandra.exceptions.ExceptionSerializer.nullableRemote
 public class RequestFailure
 {
     public static final RequestFailure UNKNOWN = new 
RequestFailure(RequestFailureReason.UNKNOWN);
+    public static final RequestFailure UNKNOWN_TOPOLOGY = new 
RequestFailure(RequestFailureReason.UNKNOWN_TOPOLOGY);
     public static final RequestFailure READ_TOO_MANY_TOMBSTONES = new 
RequestFailure(RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
     public static final RequestFailure TIMEOUT = new 
RequestFailure(RequestFailureReason.TIMEOUT);
     public static final RequestFailure INCOMPATIBLE_SCHEMA = new 
RequestFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA);
@@ -134,6 +135,7 @@ public class RequestFailure
         {
             default: throw new IllegalStateException("Unhandled request 
failure reason " + reason);
             case UNKNOWN: return UNKNOWN;
+            case UNKNOWN_TOPOLOGY: return UNKNOWN_TOPOLOGY;
             case READ_TOO_MANY_TOMBSTONES: return READ_TOO_MANY_TOMBSTONES;
             case TIMEOUT: return TIMEOUT;
             case INCOMPATIBLE_SCHEMA: return INCOMPATIBLE_SCHEMA;
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java 
b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
index b3b9bddfc4..917c6c753d 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
@@ -42,6 +42,7 @@ public enum RequestFailureReason
     READ_TOO_MANY_INDEXES                 (10),
     RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM (11),
     BOOTING                               (12),
+    UNKNOWN_TOPOLOGY                      (13)
     ;
 
     static
diff --git a/src/java/org/apache/cassandra/net/MessageDelivery.java 
b/src/java/org/apache/cassandra/net/MessageDelivery.java
index 9dc374750f..c57366d450 100644
--- a/src/java/org/apache/cassandra/net/MessageDelivery.java
+++ b/src/java/org/apache/cassandra/net/MessageDelivery.java
@@ -99,14 +99,6 @@ public interface MessageDelivery
         return promise;
     }
 
-    public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Verb verb, 
REQ request,
-                                                                   
Iterator<InetAddressAndPort> candidates,
-                                                                   
RetryPredicate shouldRetry,
-                                                                   
RetryErrorMessage errorMessage)
-    {
-        return sendWithRetries(Backoff.NO_OP.INSTANCE, 
ImmediateRetryScheduler.instance, verb, request, candidates, shouldRetry, 
errorMessage);
-    }
-
     public default <REQ, RSP> void sendWithRetries(Backoff backoff, 
RetryScheduler retryThreads,
                                                    Verb verb, REQ request,
                                                    
Iterator<InetAddressAndPort> candidates,
@@ -147,7 +139,8 @@ public interface MessageDelivery
     }
 
     private static <REQ, RSP> void sendWithRetries(MessageDelivery messaging,
-                                                   Backoff backoff, 
RetryScheduler retryThreads,
+                                                   Backoff backoff,
+                                                   RetryScheduler retryThreads,
                                                    Verb verb, REQ request,
                                                    
Iterator<InetAddressAndPort> candidates,
                                                    OnResult<RSP> onResult,
diff --git a/src/java/org/apache/cassandra/net/MessagingUtils.java 
b/src/java/org/apache/cassandra/net/MessagingUtils.java
index 2190eaf3a6..11735f8d7f 100644
--- a/src/java/org/apache/cassandra/net/MessagingUtils.java
+++ b/src/java/org/apache/cassandra/net/MessagingUtils.java
@@ -18,22 +18,31 @@
 
 package org.apache.cassandra.net;
 
+import java.util.Collection;
 import java.util.Iterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.SharedContext;
 
 public class MessagingUtils
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(MessagingUtils.class);
+
     /**
      * Candidate iterator that would try all endpoints known to be alive 
first, and then try all endpoints
      * in a round-robin manner.
+     * <p>
+     * Logs a warning every time the peers are exhausted and iteration restarts.
      */
-    public static Iterator<InetAddressAndPort> tryAliveFirst(SharedContext 
context, Iterable<InetAddressAndPort> peers)
+    public static Iterator<InetAddressAndPort> tryAliveFirst(SharedContext 
context, Collection<InetAddressAndPort> peers, String verb)
     {
         return new Iterator<>()
         {
             boolean firstRun = true;
+            int attempt = 0;
             Iterator<InetAddressAndPort> iter = peers.iterator();
             boolean isEmpty = !iter.hasNext();
 
@@ -58,10 +67,13 @@ public class MessagingUtils
 
                 // After that, cycle through all nodes
                 if (!iter.hasNext())
+                {
+                    logger.warn("Exhausted iterator on {} cycling through the 
set of peers: {} attempt #{}", verb, peers, attempt++);
                     iter = peers.iterator();
+                }
 
                 return iter.next();
             }
         };
     }
-}
+}
\ No newline at end of file
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 0dbf99635a..9d8b41bcb1 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -213,6 +213,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
         }
     }
 
+    //TODO (required): should not be public
     public final ChangeListener listener = new MetadataChangeListener();
     private class MetadataChangeListener implements ChangeListener
     {
@@ -267,8 +268,6 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
         Map<Node.Id, Long> removedNodes = mapping.removedNodes();
         for (Map.Entry<Node.Id, Long> e : removedNodes.entrySet())
             onNodeRemoved(e.getValue(), currentTopology(), e.getKey());
-
-        ClusterMetadataService.instance().log().addListener(listener);
     }
 
     @Override
@@ -416,13 +415,36 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
         long epoch = metadata.epoch.getEpoch();
         synchronized (epochs)
         {
-            if (epochs.maxEpoch() == 0)
+            // On first boot, we have 2 options:
+            //
+            //  - we can start listening to TCM _before_ we replay topologies
+            //  - we can start listening to TCM _after_ we replay topologies
+            //
+            // If we start listening to TCM _before_ we replay topologies from 
other nodes,
+            // we may end up in a situation where TCM reports metadata that 
would create an
+            // `epoch - 1` epoch state that is not associated with any 
topologies, and
+            // therefore should not be listened upon.
+            //
+            // If we start listening to TCM _after_ we replay topologies, we 
may end up in a
+            // situation where TCM reports metadata that is 1 (or more) epochs 
_ahead_ of the
+            // last known epoch. Previous implementations were using TCM peer 
catch up, which
+            // could have resulted in gaps.
+            //
+            // Current protocol solves both problems by _first_ replaying 
topologies from peers,
+            // then subscribing to TCM _and_, if there are still any gaps, 
filling them again.
+            // However, it still has a slight chance of creating an `epoch - 
1` epoch state
+            // not associated with any topologies, which under "right" 
circumstances could
+            // have been waited upon with `epochReady`. This check precludes 
creation of this
+            // epoch: by the time this code can be called, remote topology 
replay is already
+            // done, so TCM listener will only report epochs that are _at 
least_ min epoch.
+            if (epochs.maxEpoch() == 0 || epochs.minEpoch() == 
metadata.epoch.getEpoch())
             {
                 getOrCreateEpochState(epoch);  // touch epoch state so 
subsequent calls see it
                 reportMetadata(metadata);
                 return;
             }
         }
+
         getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() -> 
reportMetadata(metadata));
     }
 
@@ -433,16 +455,25 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
         Stage.ACCORD_MIGRATION.execute(() -> {
             if (ClusterMetadata.current().epoch.getEpoch() < epoch)
                 
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch));
+
+            // In most cases, after fetching log from CMS, we will be caught 
up to the required epoch.
+            // This TCM will also notify Accord via reportMetadata, so we do 
not need to fetch topologies.
+            // If the reported metadata has skipped one or more epochs, and is 
_ahead_ of the requested epoch,
+            // we need to fetch topologies from peers to fill in the gap.
+            ClusterMetadata metadata = ClusterMetadata.current();
+            if (metadata.epoch.getEpoch() == epoch)
+                return;
+
             try
             {
-                Set<InetAddressAndPort> peers = new 
HashSet<>(ClusterMetadata.current().directory.allJoinedEndpoints());
+                Set<InetAddressAndPort> peers = new 
HashSet<>(metadata.directory.allJoinedEndpoints());
                 peers.remove(FBUtilities.getBroadcastAddressAndPort());
                 if (peers.isEmpty())
                     return;
-                Topology topology;
-                while ((topology = 
FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == null)
-                {
-                }
+
+                // TODO (required): fetch only _missing_ topologies.
+                Topology topology = 
FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get();
+                Invariants.require(topology.epoch() == epoch);
                 reportTopology(topology);
             }
             catch (InterruptedException e)
@@ -461,6 +492,13 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
         });
     }
 
+    @Override
+    public void reportTopology(Topology topology, boolean isLoad, boolean 
startSync)
+    {
+        Invariants.require(topology.epoch() <= 
ClusterMetadata.current().epoch.getEpoch());
+        super.reportTopology(topology, isLoad, startSync);
+    }
+
     @Override
     protected void localSyncComplete(Topology topology, boolean startSync)
     {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index a84cfa8f07..b4c3e636bf 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -22,13 +22,10 @@ import java.math.BigInteger;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -44,7 +41,6 @@ import javax.annotation.concurrent.GuardedBy;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,7 +134,6 @@ import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.SharedContext;
-import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -165,7 +160,6 @@ import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.membership.NodeId;
-import org.apache.cassandra.tcm.ownership.DataPlacement;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.Blocking;
@@ -379,25 +373,48 @@ public class AccordService implements IAccordService, 
Shutdownable
         node.commandStores().restoreShardStateUnsafe(topology -> 
configService.reportTopology(topology, true, true));
         configService.start();
 
-        long minEpoch = fetchMinEpoch();
-        if (minEpoch >= 0)
+        try
         {
-            for (long epoch = minEpoch; epoch <= metadata.epoch.getEpoch(); 
epoch++)
-                node.configService().fetchTopologyForEpoch(epoch);
+            // Fetch topologies up to current
+            List<Topology> topologies = fetchTopologies(null, metadata);
+            for (Topology topology : topologies)
+                configService.reportTopology(topology);
 
-            try
-            {
-                
epochReady(metadata.epoch).get(DatabaseDescriptor.getTransactionTimeout(MILLISECONDS),
 MILLISECONDS);
-            }
-            catch (InterruptedException e)
+            
ClusterMetadataService.instance().log().addListener(configService.listener);
+            ClusterMetadata next = ClusterMetadata.current();
+
+            // if metadata was updated before we were able to add a listener, 
fetch remaining topologies
+            if (next.epoch.isAfter(metadata.epoch))
             {
-                throw new UncheckedInterruptedException(e);
+                topologies = fetchTopologies(metadata.epoch.getEpoch() + 1, 
next);
+                for (Topology topology : topologies)
+                    configService.reportTopology(topology);
             }
-            catch (ExecutionException | TimeoutException e)
+
+            int attempt = 0;
+            int waitSeconds = 5;
+            while (true)
             {
-                throw new RuntimeException(e);
+                try
+                {
+                    epochReady(metadata.epoch).get(waitSeconds, SECONDS);
+                    break;
+                }
+                catch (TimeoutException e)
+                {
+                    logger.warn("Epoch {} is not ready after waiting for {} 
seconds", metadata.epoch, (++attempt) * waitSeconds);
+                }
             }
         }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new UncheckedInterruptedException(e);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
 
         fastPathCoordinator.start();
         
ClusterMetadataService.instance().log().addListener(fastPathCoordinator);
@@ -412,44 +429,60 @@ public class AccordService implements IAccordService, 
Shutdownable
     }
 
     /**
-     * Queries peers to discover min epoch
+     * Queries peers to discover min epoch, and then fetches all topologies 
between min and current epochs
      */
-    private long fetchMinEpoch()
+    private List<Topology> fetchTopologies(Long minEpoch, ClusterMetadata 
metadata) throws ExecutionException, InterruptedException
     {
-        ClusterMetadata metadata = ClusterMetadata.current();
-        Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
-        for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
-        {
-            List<TableMetadata> tables = 
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
-            if (tables.isEmpty())
-                continue;
-            DataPlacement current = 
metadata.placements.get(keyspace.params.replication);
-            DataPlacement settled = 
metadata.writePlacementAllSettled(keyspace);
-            Sets.SetView<InetAddressAndPort> alive = 
Sets.intersection(settled.writes.byEndpoint().keySet(), 
current.writes.byEndpoint().keySet());
-            InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
-            settled.writes.forEach((range, group) -> {
-                if (group.endpoints().contains(self))
-                {
-                    for (InetAddressAndPort peer : group.endpoints())
-                    {
-                        if (peer.equals(self) || !alive.contains(peer)) 
continue;
-                        for (TableMetadata table : tables)
-                            peers.computeIfAbsent(peer, i -> new 
HashSet<>()).add(AccordTopology.fullRange(table.id));
-                    }
-                }
-            });
-        }
+        if (minEpoch != null && minEpoch == metadata.epoch.getEpoch())
+            return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+        Set<InetAddressAndPort> peers = new HashSet<>();
+        peers.addAll(metadata.directory.allAddresses());
+        peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
+        // No peers: single node cluster or first node to boot
         if (peers.isEmpty())
-            return -1;
+            return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
 
-        Long minEpoch = findMinEpoch(SharedContext.Global.instance, peers);
+        // Bootstrap, fetch min epoch
         if (minEpoch == null)
-            return -1;
-        return minEpoch;
+        {
+            Long fetched = findMinEpoch(SharedContext.Global.instance, peers);
+            if (fetched != null)
+                logger.info("Discovered min epoch of {} by querying {}", 
fetched, peers);
+
+            // No other node has advanced epoch just yet
+            if (fetched == null || fetched == metadata.epoch.getEpoch())
+                return 
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+            minEpoch = fetched;
+        }
+
+        long maxEpoch = metadata.epoch.getEpoch();
+
+        // If we are behind minEpoch, catch up to at least minEpoch
+        if (metadata.epoch.getEpoch() < minEpoch)
+        {
+            minEpoch = metadata.epoch.getEpoch();
+            maxEpoch = minEpoch;
+        }
+
+        List<Future<Topology>> futures = new ArrayList<>();
+        logger.info("Fetching topologies for epochs [{}, {}].", minEpoch, 
maxEpoch);
+
+        for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
+            futures.add(FetchTopology.fetch(SharedContext.Global.instance, 
peers, epoch));
+
+        FBUtilities.waitOnFutures(futures);
+        List<Topology> topologies = new ArrayList<>(futures.size());
+        for (Future<Topology> future : futures)
+            topologies.add(future.get());
+
+        return topologies;
     }
 
     @VisibleForTesting
-    static Long findMinEpoch(SharedContext context, Map<InetAddressAndPort, 
Set<TokenRange>> peers)
+    static Long findMinEpoch(SharedContext context, Set<InetAddressAndPort> 
peers)
     {
         try
         {
@@ -1152,7 +1185,7 @@ public class AccordService implements IAccordService, 
Shutdownable
 
     @Nullable
     @Override
-    public Long minEpoch(Collection<TokenRange> ranges)
+    public Long minEpoch()
     {
         return node.topology().minEpoch();
     }
diff --git a/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java 
b/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java
index ef670c572e..3c6e18c3d6 100644
--- a/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java
+++ b/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java
@@ -20,9 +20,7 @@ package org.apache.cassandra.service.accord;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -38,7 +36,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.SharedContext;
@@ -53,40 +50,32 @@ import static 
org.apache.cassandra.net.MessageDelivery.logger;
 // TODO (required, efficiency): this can be simplified: we seem to always use 
"entire range"
 public class FetchMinEpoch
 {
+    private static final FetchMinEpoch instance = new FetchMinEpoch();
+
     public static final IVersionedSerializer<FetchMinEpoch> serializer = new 
IVersionedSerializer<>()
     {
         @Override
-        public void serialize(FetchMinEpoch t, DataOutputPlus out, int 
version) throws IOException
+        public void serialize(FetchMinEpoch t, DataOutputPlus out, int version)
         {
-            out.writeUnsignedVInt32(t.ranges.size());
-            for (TokenRange range : t.ranges)
-                TokenRange.serializer.serialize(range, out, version);
         }
 
         @Override
-        public FetchMinEpoch deserialize(DataInputPlus in, int version) throws 
IOException
+        public FetchMinEpoch deserialize(DataInputPlus in, int version)
         {
-            int size = in.readUnsignedVInt32();
-            List<TokenRange> ranges = new ArrayList<>(size);
-            for (int i = 0; i < size; i++)
-                ranges.add(TokenRange.serializer.deserialize(in, version));
-            return new FetchMinEpoch(ranges);
+            return FetchMinEpoch.instance;
         }
 
         @Override
         public long serializedSize(FetchMinEpoch t, int version)
         {
-            long size = TypeSizes.sizeofUnsignedVInt(t.ranges.size());
-            for (TokenRange range : t.ranges)
-                size += TokenRange.serializer.serializedSize(range, version);
-            return size;
+            return 0;
         }
     };
 
     public static final IVerbHandler<FetchMinEpoch> handler = message -> {
         if (AccordService.started())
         {
-            Long epoch = 
AccordService.instance().minEpoch(message.payload.ranges);
+            Long epoch = AccordService.instance().minEpoch();
             MessagingService.instance().respond(new Response(epoch), message);
         }
         else
@@ -96,41 +85,15 @@ public class FetchMinEpoch
         }
     };
 
-    public final Collection<TokenRange> ranges;
-
-    public FetchMinEpoch(Collection<TokenRange> ranges)
-    {
-        this.ranges = ranges;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        FetchMinEpoch that = (FetchMinEpoch) o;
-        return Objects.equals(ranges, that.ranges);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return Objects.hash(ranges);
-    }
-
-    @Override
-    public String toString()
+    private FetchMinEpoch()
     {
-        return "FetchMinEpoch{" +
-               "ranges=" + ranges +
-               '}';
     }
 
-    public static Future<Long> fetch(SharedContext context, 
Map<InetAddressAndPort, Set<TokenRange>> peers)
+    public static Future<Long> fetch(SharedContext context, 
Set<InetAddressAndPort> peers)
     {
         List<Future<Long>> accum = new ArrayList<>(peers.size());
-        for (Map.Entry<InetAddressAndPort, Set<TokenRange>> e : 
peers.entrySet())
-            accum.add(fetch(context, e.getKey(), e.getValue()));
+        for (InetAddressAndPort peer : peers)
+            accum.add(fetch(context, peer));
         // TODO (required): we are collecting only successes, but we need some 
threshold
         return FutureCombiner.successfulOf(accum).map(epochs -> {
             Long min = null;
@@ -145,21 +108,22 @@ public class FetchMinEpoch
     }
 
     @VisibleForTesting
-    static Future<Long> fetch(SharedContext context, InetAddressAndPort to, 
Set<TokenRange> value)
+    static Future<Long> fetch(SharedContext context, InetAddressAndPort to)
     {
-        FetchMinEpoch req = new FetchMinEpoch(value);
-        return context.messaging().<FetchMinEpoch, 
FetchMinEpoch.Response>sendWithRetries(Backoff.NO_OP.INSTANCE,
-                                                                               
           MessageDelivery.ImmediateRetryScheduler.instance,
-                                                                               
           Verb.ACCORD_FETCH_MIN_EPOCH_REQ, req,
-                                                                               
           Iterators.cycle(to),
-                                                                               
           
RetryPredicate.times(DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts.value),
-                                                                               
           RetryErrorMessage.EMPTY)
+        Backoff backoff = Backoff.fromConfig(context, 
DatabaseDescriptor.getAccord().minEpochSyncRetry);
+        return context.messaging().<FetchMinEpoch, 
Response>sendWithRetries(backoff,
+                                                                            
context.optionalTasks()::schedule,
+                                                                            
Verb.ACCORD_FETCH_MIN_EPOCH_REQ,
+                                                                            
FetchMinEpoch.instance,
+                                                                            
Iterators.cycle(to),
+                                                                            
RetryPredicate.ALWAYS_RETRY,
+                                                                            
RetryErrorMessage.EMPTY)
                       .map(m -> m.payload.minEpoch);
     }
 
     public static class Response
     {
-        public static final IVersionedSerializer<Response> serializer = new 
IVersionedSerializer<Response>()
+        public static final IVersionedSerializer<Response> serializer = new 
IVersionedSerializer<>()
         {
             @Override
             public void serialize(Response t, DataOutputPlus out, int version) 
throws IOException
diff --git a/src/java/org/apache/cassandra/service/accord/FetchTopology.java 
b/src/java/org/apache/cassandra/service/accord/FetchTopology.java
index d40d2654ea..dc8df4f454 100644
--- a/src/java/org/apache/cassandra/service/accord/FetchTopology.java
+++ b/src/java/org/apache/cassandra/service/accord/FetchTopology.java
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.util.Collection;
 
 import accord.topology.Topology;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.RequestFailure;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -33,10 +36,18 @@ import org.apache.cassandra.net.MessagingUtils;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.repair.SharedContext;
 import org.apache.cassandra.service.accord.serializers.TopologySerializers;
+import org.apache.cassandra.utils.Backoff;
 import org.apache.cassandra.utils.concurrent.Future;
 
 public class FetchTopology
 {
+    public String toString()
+    {
+        return "FetchTopology{" +
+               "epoch=" + epoch +
+               '}';
+    }
+
     private final long epoch;
 
     public static final IVersionedSerializer<FetchTopology> serializer = new 
IVersionedSerializer<>()
@@ -67,34 +78,20 @@ public class FetchTopology
 
     public static class Response
     {
-        private static Response UNKNOWN = new Response(-1, null) {
-            public String toString()
-            {
-                return "UNKNOWN_TOPOLOGY{}";
-            }
-        };
-
         // TODO (required): messaging version after version patch
         public static final IVersionedSerializer<Response> serializer = new IVersionedSerializer<>()
         {
             @Override
             public void serialize(Response t, DataOutputPlus out, int version) throws IOException
             {
-                if (t == UNKNOWN)
-                {
-                    out.writeLong(-1);
-                    return;
-                }
-                out.writeLong(t.epoch);
+                out.writeUnsignedVInt(t.epoch);
                 TopologySerializers.topology.serialize(t.topology, out, version);
             }
 
             @Override
             public Response deserialize(DataInputPlus in, int version) throws IOException
             {
-                long epoch = in.readLong();
-                if (epoch == -1)
-                    return UNKNOWN;
+                long epoch = in.readUnsignedVInt();
                 Topology topology = TopologySerializers.topology.deserialize(in, version);
                 return new Response(epoch, topology);
             }
@@ -102,10 +99,8 @@ public class FetchTopology
             @Override
             public long serializedSize(Response t, int version)
             {
-                if (t == UNKNOWN)
-                    return Long.BYTES;
-
-                return Long.BYTES + TopologySerializers.topology.serializedSize(t.topology, version);
+                return TypeSizes.sizeofUnsignedVInt(t.epoch)
+                       + TopologySerializers.topology.serializedSize(t.topology, version);
             }
         };
 
@@ -121,20 +116,25 @@ public class FetchTopology
 
     public static final IVerbHandler<FetchTopology> handler = message -> {
         long epoch = message.payload.epoch;
-        Topology topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch);
-        if (topology == null)
-            MessagingService.instance().respond(Response.UNKNOWN, message);
-        else
+
+        Topology topology;
+        if (AccordService.isSetup() && (topology = AccordService.instance().topology().maybeGlobalForEpoch(epoch)) != null)
             MessagingService.instance().respond(new Response(epoch, topology), message);
+        else
+            MessagingService.instance().respondWithFailure(RequestFailure.UNKNOWN_TOPOLOGY, message);
     };
 
     public static Future<Topology> fetch(SharedContext context, Collection<InetAddressAndPort> peers, long epoch)
     {
-        FetchTopology req = new FetchTopology(epoch);
-        return context.messaging().<FetchTopology, Response>sendWithRetries(Verb.ACCORD_FETCH_TOPOLOGY_REQ, req, MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers),
-                                                                                          // If the epoch is already discovered, no need to retry
-                                                                                          (attempt, from, failure) -> AccordService.instance().currentEpoch() < epoch,
-                                                                                          MessageDelivery.RetryErrorMessage.EMPTY)
+        FetchTopology request = new FetchTopology(epoch);
+        Backoff backoff = Backoff.fromConfig(context, DatabaseDescriptor.getAccord().fetchRetry);
+        return context.messaging().<FetchTopology, Response>sendWithRetries(backoff,
+                                                                            context.optionalTasks()::schedule,
+                                                                            Verb.ACCORD_FETCH_TOPOLOGY_REQ,
+                                                                            request,
+                                                                            MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers, Verb.ACCORD_FETCH_TOPOLOGY_REQ.name()),
+                                                                            (attempt, from, failure) -> true,
+                                                                            MessageDelivery.RetryErrorMessage.EMPTY)
                       .map(m -> m.payload.topology);
     }
 }
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index 14fb94b6b4..865473d76c 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
@@ -180,7 +179,7 @@ public interface IAccordService
 
     List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId);
     @Nullable
-    Long minEpoch(Collection<TokenRange> ranges);
+    Long minEpoch();
 
     void tryMarkRemoved(Topology topology, Node.Id node);
     void awaitTableDrop(TableId id);
@@ -326,7 +325,7 @@ public interface IAccordService
 
         @Nullable
         @Override
-        public Long minEpoch(Collection<TokenRange> ranges)
+        public Long minEpoch()
         {
             return null;
         }
@@ -513,9 +512,9 @@ public interface IAccordService
 
         @Nullable
         @Override
-        public Long minEpoch(Collection<TokenRange> ranges)
+        public Long minEpoch()
         {
-            return delegate.minEpoch(ranges);
+            return delegate.minEpoch();
         }
 
         @Override
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index ceb8d911ec..1d1d1742be 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -80,6 +80,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.AccordSpec",
     "org.apache.cassandra.config.AccordSpec$JournalSpec",
     "org.apache.cassandra.config.AccordSpec$MinEpochRetrySpec",
+    "org.apache.cassandra.config.AccordSpec$FetchRetrySpec",
     "org.apache.cassandra.config.AccordSpec$TransactionalRangeMigration",
     "org.apache.cassandra.config.AccordSpec$QueueShardModel",
     "org.apache.cassandra.config.AccordSpec$QueueSubmissionModel",
diff --git a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java
index 7bfe09b15f..f06df7e5e3 100644
--- a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.service.accord;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -28,6 +27,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -36,18 +36,13 @@ import accord.utils.Gens;
 import accord.utils.RandomSource;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.RetrySpec;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.exceptions.RequestFailure;
 import org.apache.cassandra.io.IVersionedSerializers;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.MessageDelivery;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.SimulatedMessageDelivery.Action;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey.RoutingKeyKind;
-import org.apache.cassandra.utils.AccordGenerators;
-import org.apache.cassandra.utils.CassandraGenerators;
 import org.apache.cassandra.utils.SimulatedMiniCluster;
 import org.apache.cassandra.utils.SimulatedMiniCluster.Node;
 import org.apache.cassandra.utils.concurrent.Future;
@@ -55,7 +50,6 @@ import org.assertj.core.api.Assertions;
 
 import static accord.utils.Property.qt;
 import static org.apache.cassandra.net.MessagingService.Version.VERSION_51;
-import static org.apache.cassandra.utils.AccordGenerators.fromQT;
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class FetchMinEpochTest
@@ -74,24 +68,6 @@ public class FetchMinEpochTest
         DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts = new RetrySpec.MaxAttempt(retries);
     }
 
-    @Test
-    public void requestSerde()
-    {
-        DataOutputBuffer output = new DataOutputBuffer();
-        Gen<FetchMinEpoch> gen = fromQT(CassandraGenerators.partitioners())
-                                 .map(CassandraGenerators::simplify)
-                                 .flatMap(partitioner ->
-                                          Gens.lists(AccordGenerators.range(partitioner)
-                                                                     .map(r -> (TokenRange) r))
-                                              .ofSizeBetween(0, 10)
-                                              .map(FetchMinEpoch::new));
-        qt().forAll(gen).check(req -> {
-            maybeSetPartitioner(req);
-            for (MessagingService.Version version : SUPPORTED)
-                IVersionedSerializers.testSerde(output, FetchMinEpoch.serializer, req, version.value);
-        });
-    }
-
     @Test
     public void responseSerde()
     {
@@ -115,12 +91,12 @@ public class FetchMinEpochTest
             Node from = cluster.createNodeAndJoin();
             Node to = cluster.createNodeAndJoin();
 
-            Future<Long> f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort(), Collections.emptySet());
+            Future<Long> f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort());
             assertThat(f).isNotDone();
             cluster.processAll();
             assertThat(f).isDone();
-            MessageDelivery.FailedResponseException maxRetries = getFailedResponseException(f);
-            Assertions.assertThat(maxRetries.failure).isEqualTo(RequestFailure.TIMEOUT);
+            MessageDelivery.MaxRetriesException maxRetries = getMaxRetriesException(f);
+            Assertions.assertThat(maxRetries.attempts).isEqualTo(expectedMaxAttempts);
         });
     }
 
@@ -139,7 +115,7 @@ public class FetchMinEpochTest
             }
             Node to = cluster.createNodeAndJoin();
 
-            Future<Long> f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort(), Collections.emptySet());
+            Future<Long> f = FetchMinEpoch.fetch(from, to.broadcastAddressAndPort());
             assertThat(f).isNotDone();
             cluster.processAll();
             assertThat(f).isDone();
@@ -161,10 +137,10 @@ public class FetchMinEpochTest
             Node to3 = cluster.createNodeAndJoin();
             Node to4 = cluster.createNodeAndJoin();
 
-            Future<Long> f = FetchMinEpoch.fetch(from, ImmutableMap.of(to1.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                       to2.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                       to3.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                       to4.broadcastAddressAndPort(), Collections.emptySet()));
+            Future<Long> f = FetchMinEpoch.fetch(from, ImmutableSet.of(to1.broadcastAddressAndPort(),
+                                                                       to2.broadcastAddressAndPort(),
+                                                                       to3.broadcastAddressAndPort(),
+                                                                       to4.broadcastAddressAndPort()));
             assertThat(f).isNotDone();
             cluster.processAll();
             assertThat(f).isDone();
@@ -201,10 +177,10 @@ public class FetchMinEpochTest
                                                                                
       to4.broadcastAddressAndPort(), actionGen(rs, maxRetries));
             from.messagingActions((self, msg, to) -> nodeToActions.get(to).get());
 
-            Future<Long> f = FetchMinEpoch.fetch(from, ImmutableMap.of(to1.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                       to2.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                       to3.broadcastAddressAndPort(), Collections.emptySet(),
-                                                                       to4.broadcastAddressAndPort(), Collections.emptySet()));
+            Future<Long> f = FetchMinEpoch.fetch(from, ImmutableSet.of(to1.broadcastAddressAndPort(),
+                                                                       to2.broadcastAddressAndPort(),
+                                                                       to3.broadcastAddressAndPort(),
+                                                                       to4.broadcastAddressAndPort()));
             assertThat(f).isNotDone();
             cluster.processAll();
             assertThat(f).isDone();
@@ -235,53 +211,6 @@ public class FetchMinEpochTest
         return safeActionGen.asSupplier(actionSource);
     }
 
-    private static void maybeSetPartitioner(FetchMinEpoch req)
-    {
-        IPartitioner partitioner = null;
-        for (TokenRange r : req.ranges)
-        {
-            IPartitioner rangePartitioner = null;
-            if (r.start().kindOfRoutingKey() != RoutingKeyKind.SENTINEL)
-                rangePartitioner = r.start().token().getPartitioner();
-            if (rangePartitioner == null && r.end().kindOfRoutingKey() != RoutingKeyKind.SENTINEL)
-                rangePartitioner = r.end().token().getPartitioner();
-            if (rangePartitioner == null)
-                continue;
-            if (partitioner == null)
-            {
-                partitioner = rangePartitioner;
-            }
-            else
-            {
-                Assertions.assertThat(rangePartitioner).isEqualTo(partitioner);
-            }
-        }
-        if (partitioner != null)
-            DatabaseDescriptor.setPartitionerUnsafe(partitioner);
-    }
-
-    private static MessageDelivery.FailedResponseException getFailedResponseException(Future<Long> f) throws InterruptedException, ExecutionException
-    {
-        MessageDelivery.FailedResponseException exception;
-        try
-        {
-            f.get();
-            Assert.fail("Future should have failed");
-            throw new AssertionError("Unreachable");
-        }
-        catch (ExecutionException e)
-        {
-            if (e.getCause() instanceof MessageDelivery.FailedResponseException)
-            {
-                exception = (MessageDelivery.FailedResponseException) e.getCause();
-            }
-            else
-            {
-                throw e;
-            }
-        }
-        return exception;
-    }
 
     private static MessageDelivery.MaxRetriesException getMaxRetriesException(Future<Long> f) throws InterruptedException, ExecutionException
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to