This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 2936dd7c00 Improve topology fetching
2936dd7c00 is described below
commit 2936dd7c004368faf598d8fd10e9ef5b078dc123
Author: Alex Petrov <[email protected]>
AuthorDate: Mon Jan 27 16:25:09 2025 +0100
Improve topology fetching
- preclude TCM from being behind Accord if newer epoch is reported via
withEpoch/fetchTopologyInternal
- improve topology discovery during first boot and replay
- fix races between config service TCM listener reporting topologies, and
topologies fetched during replay
Patch by Alex Petrov, reviewed by Ariel Weisberg and David Capwell for
CASSANDRA-20245
---
modules/accord | 2 +-
.../org/apache/cassandra/config/AccordSpec.java | 11 +-
.../cassandra/exceptions/RequestFailure.java | 2 +
.../cassandra/exceptions/RequestFailureReason.java | 1 +
.../org/apache/cassandra/net/MessageDelivery.java | 11 +-
.../org/apache/cassandra/net/MessagingUtils.java | 16 ++-
.../service/accord/AccordConfigurationService.java | 54 +++++++--
.../cassandra/service/accord/AccordService.java | 131 +++++++++++++--------
.../cassandra/service/accord/FetchMinEpoch.java | 78 ++++--------
.../cassandra/service/accord/FetchTopology.java | 58 ++++-----
.../cassandra/service/accord/IAccordService.java | 9 +-
.../config/DatabaseDescriptorRefTest.java | 1 +
.../service/accord/FetchMinEpochTest.java | 97 ++-------------
13 files changed, 226 insertions(+), 245 deletions(-)
diff --git a/modules/accord b/modules/accord
index e5e108adfe..78ab7eef90 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit e5e108adfeb817f899ca89507e8fa6bc0b191759
+Subproject commit 78ab7eef904ef549d0d7a34332b83d6110e0762d
diff --git a/src/java/org/apache/cassandra/config/AccordSpec.java
b/src/java/org/apache/cassandra/config/AccordSpec.java
index 720a2db772..566a0d16ef 100644
--- a/src/java/org/apache/cassandra/config/AccordSpec.java
+++ b/src/java/org/apache/cassandra/config/AccordSpec.java
@@ -194,7 +194,8 @@ public class AccordSpec
public boolean ephemeralReadEnabled = true;
public boolean state_cache_listener_jfr_enabled = true;
public final JournalSpec journal = new JournalSpec();
- public final MinEpochRetrySpec minEpochSyncRetry = new MinEpochRetrySpec();
+ public final RetrySpec minEpochSyncRetry = new MinEpochRetrySpec();
+ public final RetrySpec fetchRetry = new FetchRetrySpec();
public static class MinEpochRetrySpec extends RetrySpec
{
@@ -204,6 +205,14 @@ public class AccordSpec
}
}
+ public static class FetchRetrySpec extends RetrySpec
+ {
+ public FetchRetrySpec()
+ {
+ maxAttempts = new MaxAttempt(100);
+ }
+ }
+
public static class JournalSpec implements Params
{
public int segmentSize = 32 << 20;
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailure.java
b/src/java/org/apache/cassandra/exceptions/RequestFailure.java
index 312102c012..6946b812aa 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailure.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailure.java
@@ -43,6 +43,7 @@ import static
org.apache.cassandra.exceptions.ExceptionSerializer.nullableRemote
public class RequestFailure
{
public static final RequestFailure UNKNOWN = new
RequestFailure(RequestFailureReason.UNKNOWN);
+ public static final RequestFailure UNKNOWN_TOPOLOGY = new
RequestFailure(RequestFailureReason.UNKNOWN_TOPOLOGY);
public static final RequestFailure READ_TOO_MANY_TOMBSTONES = new
RequestFailure(RequestFailureReason.READ_TOO_MANY_TOMBSTONES);
public static final RequestFailure TIMEOUT = new
RequestFailure(RequestFailureReason.TIMEOUT);
public static final RequestFailure INCOMPATIBLE_SCHEMA = new
RequestFailure(RequestFailureReason.INCOMPATIBLE_SCHEMA);
@@ -134,6 +135,7 @@ public class RequestFailure
{
default: throw new IllegalStateException("Unhandled request
failure reason " + reason);
case UNKNOWN: return UNKNOWN;
+ case UNKNOWN_TOPOLOGY: return UNKNOWN_TOPOLOGY;
case READ_TOO_MANY_TOMBSTONES: return READ_TOO_MANY_TOMBSTONES;
case TIMEOUT: return TIMEOUT;
case INCOMPATIBLE_SCHEMA: return INCOMPATIBLE_SCHEMA;
diff --git a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
index b3b9bddfc4..917c6c753d 100644
--- a/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
+++ b/src/java/org/apache/cassandra/exceptions/RequestFailureReason.java
@@ -42,6 +42,7 @@ public enum RequestFailureReason
READ_TOO_MANY_INDEXES (10),
RETRY_ON_DIFFERENT_TRANSACTION_SYSTEM (11),
BOOTING (12),
+ UNKNOWN_TOPOLOGY (13)
;
static
diff --git a/src/java/org/apache/cassandra/net/MessageDelivery.java
b/src/java/org/apache/cassandra/net/MessageDelivery.java
index 9dc374750f..c57366d450 100644
--- a/src/java/org/apache/cassandra/net/MessageDelivery.java
+++ b/src/java/org/apache/cassandra/net/MessageDelivery.java
@@ -99,14 +99,6 @@ public interface MessageDelivery
return promise;
}
- public default <REQ, RSP> Future<Message<RSP>> sendWithRetries(Verb verb,
REQ request,
-
Iterator<InetAddressAndPort> candidates,
-
RetryPredicate shouldRetry,
-
RetryErrorMessage errorMessage)
- {
- return sendWithRetries(Backoff.NO_OP.INSTANCE,
ImmediateRetryScheduler.instance, verb, request, candidates, shouldRetry,
errorMessage);
- }
-
public default <REQ, RSP> void sendWithRetries(Backoff backoff,
RetryScheduler retryThreads,
Verb verb, REQ request,
Iterator<InetAddressAndPort> candidates,
@@ -147,7 +139,8 @@ public interface MessageDelivery
}
private static <REQ, RSP> void sendWithRetries(MessageDelivery messaging,
- Backoff backoff,
RetryScheduler retryThreads,
+ Backoff backoff,
+ RetryScheduler retryThreads,
Verb verb, REQ request,
Iterator<InetAddressAndPort> candidates,
OnResult<RSP> onResult,
diff --git a/src/java/org/apache/cassandra/net/MessagingUtils.java
b/src/java/org/apache/cassandra/net/MessagingUtils.java
index 2190eaf3a6..11735f8d7f 100644
--- a/src/java/org/apache/cassandra/net/MessagingUtils.java
+++ b/src/java/org/apache/cassandra/net/MessagingUtils.java
@@ -18,22 +18,31 @@
package org.apache.cassandra.net;
+import java.util.Collection;
import java.util.Iterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.SharedContext;
public class MessagingUtils
{
+ private static final Logger logger =
LoggerFactory.getLogger(MessagingUtils.class);
+
/**
* Candidate iterator that would try all endpoints known to be alive
first, and then try all endpoints
* in a round-robin manner.
+     * <p>
+     * Logs a warning each time the set of peers is exhausted and cycling restarts.
*/
- public static Iterator<InetAddressAndPort> tryAliveFirst(SharedContext
context, Iterable<InetAddressAndPort> peers)
+ public static Iterator<InetAddressAndPort> tryAliveFirst(SharedContext
context, Collection<InetAddressAndPort> peers, String verb)
{
return new Iterator<>()
{
boolean firstRun = true;
+ int attempt = 0;
Iterator<InetAddressAndPort> iter = peers.iterator();
boolean isEmpty = !iter.hasNext();
@@ -58,10 +67,13 @@ public class MessagingUtils
// After that, cycle through all nodes
if (!iter.hasNext())
+ {
+ logger.warn("Exhausted iterator on {} cycling through the
set of peers: {} attempt #{}", verb, peers, attempt++);
iter = peers.iterator();
+ }
return iter.next();
}
};
}
-}
+}
\ No newline at end of file
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 0dbf99635a..9d8b41bcb1 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -213,6 +213,7 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
}
}
+ //TODO (required): should not be public
public final ChangeListener listener = new MetadataChangeListener();
private class MetadataChangeListener implements ChangeListener
{
@@ -267,8 +268,6 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
Map<Node.Id, Long> removedNodes = mapping.removedNodes();
for (Map.Entry<Node.Id, Long> e : removedNodes.entrySet())
onNodeRemoved(e.getValue(), currentTopology(), e.getKey());
-
- ClusterMetadataService.instance().log().addListener(listener);
}
@Override
@@ -416,13 +415,36 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
long epoch = metadata.epoch.getEpoch();
synchronized (epochs)
{
- if (epochs.maxEpoch() == 0)
+ // On first boot, we have 2 options:
+ //
+ // - we can start listening to TCM _before_ we replay topologies
+ // - we can start listening to TCM _after_ we replay topologies
+ //
+ // If we start listening to TCM _before_ we replay topologies from
other nodes,
+ // we may end up in a situation where TCM reports metadata that
would create an
+ // `epoch - 1` epoch state that is not associated with any
topologies, and
+ // therefore should not be listened upon.
+ //
+ // If we start listening to TCM _after_ we replay topologies, we
may end up in a
+ // situation where TCM reports metadata that is 1 (or more) epochs
_ahead_ of the
+ // last known epoch. Previous implementations were using TCM peer
catch up, which
+ // could have resulted in gaps.
+ //
+        // Current protocol solves both problems by _first_ replaying
topologies from peers,
+ // then subscribing to TCM _and_, if there are still any gaps,
filling them again.
+ // However, it still has a slight chance of creating an `epoch -
1` epoch state
+ // not associated with any topologies, which under "right"
circumstances could
+ // have been waited upon with `epochReady`. This check precludes
creation of this
+ // epoch: by the time this code can be called, remote topology
replay is already
+ // done, so TCM listener will only report epochs that are _at
least_ min epoch.
+ if (epochs.maxEpoch() == 0 || epochs.minEpoch() ==
metadata.epoch.getEpoch())
{
getOrCreateEpochState(epoch); // touch epoch state so
subsequent calls see it
reportMetadata(metadata);
return;
}
}
+
getOrCreateEpochState(epoch - 1).acknowledged().addCallback(() ->
reportMetadata(metadata));
}
@@ -433,16 +455,25 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
Stage.ACCORD_MIGRATION.execute(() -> {
if (ClusterMetadata.current().epoch.getEpoch() < epoch)
ClusterMetadataService.instance().fetchLogFromCMS(Epoch.create(epoch));
+
+ // In most cases, after fetching log from CMS, we will be caught
up to the required epoch.
+            // TCM will also notify Accord via reportMetadata, so we do
not need to fetch topologies.
+            // If the reported metadata has skipped one or more epochs, and is
_ahead_ of the requested epoch,
+            // we need to fetch topologies from peers to fill in the gap.
+ ClusterMetadata metadata = ClusterMetadata.current();
+ if (metadata.epoch.getEpoch() == epoch)
+ return;
+
try
{
- Set<InetAddressAndPort> peers = new
HashSet<>(ClusterMetadata.current().directory.allJoinedEndpoints());
+ Set<InetAddressAndPort> peers = new
HashSet<>(metadata.directory.allJoinedEndpoints());
peers.remove(FBUtilities.getBroadcastAddressAndPort());
if (peers.isEmpty())
return;
- Topology topology;
- while ((topology =
FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get()) == null)
- {
- }
+
+ // TODO (required): fetch only _missing_ topologies.
+ Topology topology =
FetchTopology.fetch(SharedContext.Global.instance, peers, epoch).get();
+ Invariants.require(topology.epoch() == epoch);
reportTopology(topology);
}
catch (InterruptedException e)
@@ -461,6 +492,13 @@ public class AccordConfigurationService extends
AbstractConfigurationService<Acc
});
}
+ @Override
+ public void reportTopology(Topology topology, boolean isLoad, boolean
startSync)
+ {
+ Invariants.require(topology.epoch() <=
ClusterMetadata.current().epoch.getEpoch());
+ super.reportTopology(topology, isLoad, startSync);
+ }
+
@Override
protected void localSyncComplete(Topology topology, boolean startSync)
{
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index a84cfa8f07..b4c3e636bf 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -22,13 +22,10 @@ import java.math.BigInteger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -44,7 +41,6 @@ import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.base.Throwables;
-import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,7 +134,6 @@ import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.SharedContext;
-import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
@@ -165,7 +160,6 @@ import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.membership.NodeId;
-import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.Dispatcher;
import org.apache.cassandra.utils.Blocking;
@@ -379,25 +373,48 @@ public class AccordService implements IAccordService,
Shutdownable
node.commandStores().restoreShardStateUnsafe(topology ->
configService.reportTopology(topology, true, true));
configService.start();
- long minEpoch = fetchMinEpoch();
- if (minEpoch >= 0)
+ try
{
- for (long epoch = minEpoch; epoch <= metadata.epoch.getEpoch();
epoch++)
- node.configService().fetchTopologyForEpoch(epoch);
+ // Fetch topologies up to current
+ List<Topology> topologies = fetchTopologies(null, metadata);
+ for (Topology topology : topologies)
+ configService.reportTopology(topology);
- try
- {
-
epochReady(metadata.epoch).get(DatabaseDescriptor.getTransactionTimeout(MILLISECONDS),
MILLISECONDS);
- }
- catch (InterruptedException e)
+
ClusterMetadataService.instance().log().addListener(configService.listener);
+ ClusterMetadata next = ClusterMetadata.current();
+
+ // if metadata was updated before we were able to add a listener,
fetch remaining topologies
+ if (next.epoch.isAfter(metadata.epoch))
{
- throw new UncheckedInterruptedException(e);
+ topologies = fetchTopologies(metadata.epoch.getEpoch() + 1,
next);
+ for (Topology topology : topologies)
+ configService.reportTopology(topology);
}
- catch (ExecutionException | TimeoutException e)
+
+ int attempt = 0;
+ int waitSeconds = 5;
+ while (true)
{
- throw new RuntimeException(e);
+ try
+ {
+ epochReady(metadata.epoch).get(waitSeconds, SECONDS);
+ break;
+ }
+ catch (TimeoutException e)
+ {
+ logger.warn("Epoch {} is not ready after waiting for {}
seconds", metadata.epoch, (++attempt) * waitSeconds);
+ }
}
}
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new UncheckedInterruptedException(e);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
fastPathCoordinator.start();
ClusterMetadataService.instance().log().addListener(fastPathCoordinator);
@@ -412,44 +429,60 @@ public class AccordService implements IAccordService,
Shutdownable
}
/**
- * Queries peers to discover min epoch
+ * Queries peers to discover min epoch, and then fetches all topologies
between min and current epochs
*/
- private long fetchMinEpoch()
+ private List<Topology> fetchTopologies(Long minEpoch, ClusterMetadata
metadata) throws ExecutionException, InterruptedException
{
- ClusterMetadata metadata = ClusterMetadata.current();
- Map<InetAddressAndPort, Set<TokenRange>> peers = new HashMap<>();
- for (KeyspaceMetadata keyspace : metadata.schema.getKeyspaces())
- {
- List<TableMetadata> tables =
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
- if (tables.isEmpty())
- continue;
- DataPlacement current =
metadata.placements.get(keyspace.params.replication);
- DataPlacement settled =
metadata.writePlacementAllSettled(keyspace);
- Sets.SetView<InetAddressAndPort> alive =
Sets.intersection(settled.writes.byEndpoint().keySet(),
current.writes.byEndpoint().keySet());
- InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
- settled.writes.forEach((range, group) -> {
- if (group.endpoints().contains(self))
- {
- for (InetAddressAndPort peer : group.endpoints())
- {
- if (peer.equals(self) || !alive.contains(peer))
continue;
- for (TableMetadata table : tables)
- peers.computeIfAbsent(peer, i -> new
HashSet<>()).add(AccordTopology.fullRange(table.id));
- }
- }
- });
- }
+ if (minEpoch != null && minEpoch == metadata.epoch.getEpoch())
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+ Set<InetAddressAndPort> peers = new HashSet<>();
+ peers.addAll(metadata.directory.allAddresses());
+ peers.remove(FBUtilities.getBroadcastAddressAndPort());
+
+ // No peers: single node cluster or first node to boot
if (peers.isEmpty())
- return -1;
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
- Long minEpoch = findMinEpoch(SharedContext.Global.instance, peers);
+ // Bootstrap, fetch min epoch
if (minEpoch == null)
- return -1;
- return minEpoch;
+ {
+ Long fetched = findMinEpoch(SharedContext.Global.instance, peers);
+ if (fetched != null)
+ logger.info("Discovered min epoch of {} by querying {}",
fetched, peers);
+
+ // No other node has advanced epoch just yet
+ if (fetched == null || fetched == metadata.epoch.getEpoch())
+ return
Collections.singletonList(AccordTopology.createAccordTopology(metadata));
+
+ minEpoch = fetched;
+ }
+
+ long maxEpoch = metadata.epoch.getEpoch();
+
+ // If we are behind minEpoch, catch up to at least minEpoch
+ if (metadata.epoch.getEpoch() < minEpoch)
+ {
+ minEpoch = metadata.epoch.getEpoch();
+ maxEpoch = minEpoch;
+ }
+
+ List<Future<Topology>> futures = new ArrayList<>();
+ logger.info("Fetching topologies for epochs [{}, {}].", minEpoch,
maxEpoch);
+
+ for (long epoch = minEpoch; epoch <= maxEpoch; epoch++)
+ futures.add(FetchTopology.fetch(SharedContext.Global.instance,
peers, epoch));
+
+ FBUtilities.waitOnFutures(futures);
+ List<Topology> topologies = new ArrayList<>(futures.size());
+ for (Future<Topology> future : futures)
+ topologies.add(future.get());
+
+ return topologies;
}
@VisibleForTesting
- static Long findMinEpoch(SharedContext context, Map<InetAddressAndPort,
Set<TokenRange>> peers)
+ static Long findMinEpoch(SharedContext context, Set<InetAddressAndPort>
peers)
{
try
{
@@ -1152,7 +1185,7 @@ public class AccordService implements IAccordService,
Shutdownable
@Nullable
@Override
- public Long minEpoch(Collection<TokenRange> ranges)
+ public Long minEpoch()
{
return node.topology().minEpoch();
}
diff --git a/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java
b/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java
index ef670c572e..3c6e18c3d6 100644
--- a/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java
+++ b/src/java/org/apache/cassandra/service/accord/FetchMinEpoch.java
@@ -20,9 +20,7 @@ package org.apache.cassandra.service.accord;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
@@ -38,7 +36,6 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.SharedContext;
@@ -53,40 +50,32 @@ import static
org.apache.cassandra.net.MessageDelivery.logger;
// TODO (required, efficiency): this can be simplified: we seem to always use
"entire range"
public class FetchMinEpoch
{
+ private static final FetchMinEpoch instance = new FetchMinEpoch();
+
public static final IVersionedSerializer<FetchMinEpoch> serializer = new
IVersionedSerializer<>()
{
@Override
- public void serialize(FetchMinEpoch t, DataOutputPlus out, int
version) throws IOException
+ public void serialize(FetchMinEpoch t, DataOutputPlus out, int version)
{
- out.writeUnsignedVInt32(t.ranges.size());
- for (TokenRange range : t.ranges)
- TokenRange.serializer.serialize(range, out, version);
}
@Override
- public FetchMinEpoch deserialize(DataInputPlus in, int version) throws
IOException
+ public FetchMinEpoch deserialize(DataInputPlus in, int version)
{
- int size = in.readUnsignedVInt32();
- List<TokenRange> ranges = new ArrayList<>(size);
- for (int i = 0; i < size; i++)
- ranges.add(TokenRange.serializer.deserialize(in, version));
- return new FetchMinEpoch(ranges);
+ return FetchMinEpoch.instance;
}
@Override
public long serializedSize(FetchMinEpoch t, int version)
{
- long size = TypeSizes.sizeofUnsignedVInt(t.ranges.size());
- for (TokenRange range : t.ranges)
- size += TokenRange.serializer.serializedSize(range, version);
- return size;
+ return 0;
}
};
public static final IVerbHandler<FetchMinEpoch> handler = message -> {
if (AccordService.started())
{
- Long epoch =
AccordService.instance().minEpoch(message.payload.ranges);
+ Long epoch = AccordService.instance().minEpoch();
MessagingService.instance().respond(new Response(epoch), message);
}
else
@@ -96,41 +85,15 @@ public class FetchMinEpoch
}
};
- public final Collection<TokenRange> ranges;
-
- public FetchMinEpoch(Collection<TokenRange> ranges)
- {
- this.ranges = ranges;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- FetchMinEpoch that = (FetchMinEpoch) o;
- return Objects.equals(ranges, that.ranges);
- }
-
- @Override
- public int hashCode()
- {
- return Objects.hash(ranges);
- }
-
- @Override
- public String toString()
+ private FetchMinEpoch()
{
- return "FetchMinEpoch{" +
- "ranges=" + ranges +
- '}';
}
- public static Future<Long> fetch(SharedContext context,
Map<InetAddressAndPort, Set<TokenRange>> peers)
+ public static Future<Long> fetch(SharedContext context,
Set<InetAddressAndPort> peers)
{
List<Future<Long>> accum = new ArrayList<>(peers.size());
- for (Map.Entry<InetAddressAndPort, Set<TokenRange>> e :
peers.entrySet())
- accum.add(fetch(context, e.getKey(), e.getValue()));
+ for (InetAddressAndPort peer : peers)
+ accum.add(fetch(context, peer));
// TODO (required): we are collecting only successes, but we need some
threshold
return FutureCombiner.successfulOf(accum).map(epochs -> {
Long min = null;
@@ -145,21 +108,22 @@ public class FetchMinEpoch
}
@VisibleForTesting
- static Future<Long> fetch(SharedContext context, InetAddressAndPort to,
Set<TokenRange> value)
+ static Future<Long> fetch(SharedContext context, InetAddressAndPort to)
{
- FetchMinEpoch req = new FetchMinEpoch(value);
- return context.messaging().<FetchMinEpoch,
FetchMinEpoch.Response>sendWithRetries(Backoff.NO_OP.INSTANCE,
-
MessageDelivery.ImmediateRetryScheduler.instance,
-
Verb.ACCORD_FETCH_MIN_EPOCH_REQ, req,
-
Iterators.cycle(to),
-
RetryPredicate.times(DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts.value),
-
RetryErrorMessage.EMPTY)
+ Backoff backoff = Backoff.fromConfig(context,
DatabaseDescriptor.getAccord().minEpochSyncRetry);
+ return context.messaging().<FetchMinEpoch,
Response>sendWithRetries(backoff,
+
context.optionalTasks()::schedule,
+
Verb.ACCORD_FETCH_MIN_EPOCH_REQ,
+
FetchMinEpoch.instance,
+
Iterators.cycle(to),
+
RetryPredicate.ALWAYS_RETRY,
+
RetryErrorMessage.EMPTY)
.map(m -> m.payload.minEpoch);
}
public static class Response
{
- public static final IVersionedSerializer<Response> serializer = new
IVersionedSerializer<Response>()
+ public static final IVersionedSerializer<Response> serializer = new
IVersionedSerializer<>()
{
@Override
public void serialize(Response t, DataOutputPlus out, int version)
throws IOException
diff --git a/src/java/org/apache/cassandra/service/accord/FetchTopology.java
b/src/java/org/apache/cassandra/service/accord/FetchTopology.java
index d40d2654ea..dc8df4f454 100644
--- a/src/java/org/apache/cassandra/service/accord/FetchTopology.java
+++ b/src/java/org/apache/cassandra/service/accord/FetchTopology.java
@@ -22,6 +22,9 @@ import java.io.IOException;
import java.util.Collection;
import accord.topology.Topology;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.io.IVersionedSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -33,10 +36,18 @@ import org.apache.cassandra.net.MessagingUtils;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.SharedContext;
import org.apache.cassandra.service.accord.serializers.TopologySerializers;
+import org.apache.cassandra.utils.Backoff;
import org.apache.cassandra.utils.concurrent.Future;
public class FetchTopology
{
+ public String toString()
+ {
+ return "FetchTopology{" +
+ "epoch=" + epoch +
+ '}';
+ }
+
private final long epoch;
public static final IVersionedSerializer<FetchTopology> serializer = new
IVersionedSerializer<>()
@@ -67,34 +78,20 @@ public class FetchTopology
public static class Response
{
- private static Response UNKNOWN = new Response(-1, null) {
- public String toString()
- {
- return "UNKNOWN_TOPOLOGY{}";
- }
- };
-
// TODO (required): messaging version after version patch
public static final IVersionedSerializer<Response> serializer = new
IVersionedSerializer<>()
{
@Override
public void serialize(Response t, DataOutputPlus out, int version)
throws IOException
{
- if (t == UNKNOWN)
- {
- out.writeLong(-1);
- return;
- }
- out.writeLong(t.epoch);
+ out.writeUnsignedVInt(t.epoch);
TopologySerializers.topology.serialize(t.topology, out,
version);
}
@Override
public Response deserialize(DataInputPlus in, int version) throws
IOException
{
- long epoch = in.readLong();
- if (epoch == -1)
- return UNKNOWN;
+ long epoch = in.readUnsignedVInt();
Topology topology =
TopologySerializers.topology.deserialize(in, version);
return new Response(epoch, topology);
}
@@ -102,10 +99,8 @@ public class FetchTopology
@Override
public long serializedSize(Response t, int version)
{
- if (t == UNKNOWN)
- return Long.BYTES;
-
- return Long.BYTES +
TopologySerializers.topology.serializedSize(t.topology, version);
+ return TypeSizes.sizeofUnsignedVInt(t.epoch)
+ +
TopologySerializers.topology.serializedSize(t.topology, version);
}
};
@@ -121,20 +116,25 @@ public class FetchTopology
public static final IVerbHandler<FetchTopology> handler = message -> {
long epoch = message.payload.epoch;
- Topology topology =
AccordService.instance().topology().maybeGlobalForEpoch(epoch);
- if (topology == null)
- MessagingService.instance().respond(Response.UNKNOWN, message);
- else
+
+ Topology topology;
+ if (AccordService.isSetup() && (topology =
AccordService.instance().topology().maybeGlobalForEpoch(epoch)) != null)
MessagingService.instance().respond(new Response(epoch, topology),
message);
+ else
+
MessagingService.instance().respondWithFailure(RequestFailure.UNKNOWN_TOPOLOGY,
message);
};
public static Future<Topology> fetch(SharedContext context,
Collection<InetAddressAndPort> peers, long epoch)
{
- FetchTopology req = new FetchTopology(epoch);
- return context.messaging().<FetchTopology,
Response>sendWithRetries(Verb.ACCORD_FETCH_TOPOLOGY_REQ, req,
MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers),
-
// If the epoch is already discovered, no need to retry
-
(attempt, from, failure) -> AccordService.instance().currentEpoch()
< epoch,
-
MessageDelivery.RetryErrorMessage.EMPTY)
+ FetchTopology request = new FetchTopology(epoch);
+ Backoff backoff = Backoff.fromConfig(context,
DatabaseDescriptor.getAccord().fetchRetry);
+ return context.messaging().<FetchTopology,
Response>sendWithRetries(backoff,
+
context.optionalTasks()::schedule,
+
Verb.ACCORD_FETCH_TOPOLOGY_REQ,
+
request,
+
MessagingUtils.tryAliveFirst(SharedContext.Global.instance, peers,
Verb.ACCORD_FETCH_TOPOLOGY_REQ.name()),
+
(attempt, from, failure) -> true,
+
MessageDelivery.RetryErrorMessage.EMPTY)
.map(m -> m.payload.topology);
}
}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java
b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index 14fb94b6b4..865473d76c 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service.accord;
-import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
@@ -180,7 +179,7 @@ public interface IAccordService
List<CommandStoreTxnBlockedGraph> debugTxnBlockedGraph(TxnId txnId);
@Nullable
- Long minEpoch(Collection<TokenRange> ranges);
+ Long minEpoch();
void tryMarkRemoved(Topology topology, Node.Id node);
void awaitTableDrop(TableId id);
@@ -326,7 +325,7 @@ public interface IAccordService
@Nullable
@Override
- public Long minEpoch(Collection<TokenRange> ranges)
+ public Long minEpoch()
{
return null;
}
@@ -513,9 +512,9 @@ public interface IAccordService
@Nullable
@Override
- public Long minEpoch(Collection<TokenRange> ranges)
+ public Long minEpoch()
{
- return delegate.minEpoch(ranges);
+ return delegate.minEpoch();
}
@Override
diff --git
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index ceb8d911ec..1d1d1742be 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -80,6 +80,7 @@ public class DatabaseDescriptorRefTest
"org.apache.cassandra.config.AccordSpec",
"org.apache.cassandra.config.AccordSpec$JournalSpec",
"org.apache.cassandra.config.AccordSpec$MinEpochRetrySpec",
+ "org.apache.cassandra.config.AccordSpec$FetchRetrySpec",
"org.apache.cassandra.config.AccordSpec$TransactionalRangeMigration",
"org.apache.cassandra.config.AccordSpec$QueueShardModel",
"org.apache.cassandra.config.AccordSpec$QueueSubmissionModel",
diff --git
a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java
b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java
index 7bfe09b15f..f06df7e5e3 100644
--- a/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/FetchMinEpochTest.java
@@ -18,7 +18,6 @@
package org.apache.cassandra.service.accord;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -28,6 +27,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.junit.Assert;
import org.junit.Test;
@@ -36,18 +36,13 @@ import accord.utils.Gens;
import accord.utils.RandomSource;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.RetrySpec;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Murmur3Partitioner;
-import org.apache.cassandra.exceptions.RequestFailure;
import org.apache.cassandra.io.IVersionedSerializers;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.MessageDelivery;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.SimulatedMessageDelivery.Action;
-import org.apache.cassandra.service.accord.api.AccordRoutingKey.RoutingKeyKind;
-import org.apache.cassandra.utils.AccordGenerators;
-import org.apache.cassandra.utils.CassandraGenerators;
import org.apache.cassandra.utils.SimulatedMiniCluster;
import org.apache.cassandra.utils.SimulatedMiniCluster.Node;
import org.apache.cassandra.utils.concurrent.Future;
@@ -55,7 +50,6 @@ import org.assertj.core.api.Assertions;
import static accord.utils.Property.qt;
import static org.apache.cassandra.net.MessagingService.Version.VERSION_51;
-import static org.apache.cassandra.utils.AccordGenerators.fromQT;
import static org.assertj.core.api.Assertions.assertThat;
public class FetchMinEpochTest
@@ -74,24 +68,6 @@ public class FetchMinEpochTest
DatabaseDescriptor.getAccord().minEpochSyncRetry.maxAttempts = new
RetrySpec.MaxAttempt(retries);
}
- @Test
- public void requestSerde()
- {
- DataOutputBuffer output = new DataOutputBuffer();
- Gen<FetchMinEpoch> gen = fromQT(CassandraGenerators.partitioners())
- .map(CassandraGenerators::simplify)
- .flatMap(partitioner ->
-
Gens.lists(AccordGenerators.range(partitioner)
- .map(r ->
(TokenRange) r))
- .ofSizeBetween(0, 10)
- .map(FetchMinEpoch::new));
- qt().forAll(gen).check(req -> {
- maybeSetPartitioner(req);
- for (MessagingService.Version version : SUPPORTED)
- IVersionedSerializers.testSerde(output,
FetchMinEpoch.serializer, req, version.value);
- });
- }
-
@Test
public void responseSerde()
{
@@ -115,12 +91,12 @@ public class FetchMinEpochTest
Node from = cluster.createNodeAndJoin();
Node to = cluster.createNodeAndJoin();
- Future<Long> f = FetchMinEpoch.fetch(from,
to.broadcastAddressAndPort(), Collections.emptySet());
+ Future<Long> f = FetchMinEpoch.fetch(from,
to.broadcastAddressAndPort());
assertThat(f).isNotDone();
cluster.processAll();
assertThat(f).isDone();
- MessageDelivery.FailedResponseException maxRetries =
getFailedResponseException(f);
-
Assertions.assertThat(maxRetries.failure).isEqualTo(RequestFailure.TIMEOUT);
+ MessageDelivery.MaxRetriesException maxRetries =
getMaxRetriesException(f);
+
Assertions.assertThat(maxRetries.attempts).isEqualTo(expectedMaxAttempts);
});
}
@@ -139,7 +115,7 @@ public class FetchMinEpochTest
}
Node to = cluster.createNodeAndJoin();
- Future<Long> f = FetchMinEpoch.fetch(from,
to.broadcastAddressAndPort(), Collections.emptySet());
+ Future<Long> f = FetchMinEpoch.fetch(from,
to.broadcastAddressAndPort());
assertThat(f).isNotDone();
cluster.processAll();
assertThat(f).isDone();
@@ -161,10 +137,10 @@ public class FetchMinEpochTest
Node to3 = cluster.createNodeAndJoin();
Node to4 = cluster.createNodeAndJoin();
- Future<Long> f = FetchMinEpoch.fetch(from,
ImmutableMap.of(to1.broadcastAddressAndPort(), Collections.emptySet(),
-
to2.broadcastAddressAndPort(), Collections.emptySet(),
-
to3.broadcastAddressAndPort(), Collections.emptySet(),
-
to4.broadcastAddressAndPort(), Collections.emptySet()));
+ Future<Long> f = FetchMinEpoch.fetch(from,
ImmutableSet.of(to1.broadcastAddressAndPort(),
+
to2.broadcastAddressAndPort(),
+
to3.broadcastAddressAndPort(),
+
to4.broadcastAddressAndPort()));
assertThat(f).isNotDone();
cluster.processAll();
assertThat(f).isDone();
@@ -201,10 +177,10 @@ public class FetchMinEpochTest
to4.broadcastAddressAndPort(), actionGen(rs, maxRetries));
from.messagingActions((self, msg, to) ->
nodeToActions.get(to).get());
- Future<Long> f = FetchMinEpoch.fetch(from,
ImmutableMap.of(to1.broadcastAddressAndPort(), Collections.emptySet(),
-
to2.broadcastAddressAndPort(), Collections.emptySet(),
-
to3.broadcastAddressAndPort(), Collections.emptySet(),
-
to4.broadcastAddressAndPort(), Collections.emptySet()));
+ Future<Long> f = FetchMinEpoch.fetch(from,
ImmutableSet.of(to1.broadcastAddressAndPort(),
+
to2.broadcastAddressAndPort(),
+
to3.broadcastAddressAndPort(),
+
to4.broadcastAddressAndPort()));
assertThat(f).isNotDone();
cluster.processAll();
assertThat(f).isDone();
@@ -235,53 +211,6 @@ public class FetchMinEpochTest
return safeActionGen.asSupplier(actionSource);
}
- private static void maybeSetPartitioner(FetchMinEpoch req)
- {
- IPartitioner partitioner = null;
- for (TokenRange r : req.ranges)
- {
- IPartitioner rangePartitioner = null;
- if (r.start().kindOfRoutingKey() != RoutingKeyKind.SENTINEL)
- rangePartitioner = r.start().token().getPartitioner();
- if (rangePartitioner == null && r.end().kindOfRoutingKey() !=
RoutingKeyKind.SENTINEL)
- rangePartitioner = r.end().token().getPartitioner();
- if (rangePartitioner == null)
- continue;
- if (partitioner == null)
- {
- partitioner = rangePartitioner;
- }
- else
- {
- Assertions.assertThat(rangePartitioner).isEqualTo(partitioner);
- }
- }
- if (partitioner != null)
- DatabaseDescriptor.setPartitionerUnsafe(partitioner);
- }
-
- private static MessageDelivery.FailedResponseException
getFailedResponseException(Future<Long> f) throws InterruptedException,
ExecutionException
- {
- MessageDelivery.FailedResponseException exception;
- try
- {
- f.get();
- Assert.fail("Future should have failed");
- throw new AssertionError("Unreachable");
- }
- catch (ExecutionException e)
- {
- if (e.getCause() instanceof
MessageDelivery.FailedResponseException)
- {
- exception = (MessageDelivery.FailedResponseException)
e.getCause();
- }
- else
- {
- throw e;
- }
- }
- return exception;
- }
private static MessageDelivery.MaxRetriesException
getMaxRetriesException(Future<Long> f) throws InterruptedException,
ExecutionException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]