This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e4ffc0c6abb8aafa81f11b0446897e27474ef1d5 Author: Alex Petrov <[email protected]> AuthorDate: Wed Oct 2 10:19:48 2024 +0200 Fix CASTest Make it possible for AtomicProcessor to respond to Fetch requests. Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-20018. --- .../schema/DistributedMetadataLogKeyspace.java | 4 +-- .../cassandra/service/accord/AccordService.java | 9 +++++- .../cassandra/tcm/AbstractLocalProcessor.java | 6 +--- .../cassandra/tcm/AtomicLongBackedProcessor.java | 14 +++++---- .../cassandra/tcm/ClusterMetadataService.java | 16 +++++++--- src/java/org/apache/cassandra/tcm/Epoch.java | 1 + src/java/org/apache/cassandra/tcm/FetchCMSLog.java | 22 +++++++++----- .../org/apache/cassandra/tcm/FetchPeerLog.java | 9 ++++-- .../apache/cassandra/tcm/PaxosBackedProcessor.java | 11 +++++-- .../org/apache/cassandra/tcm/PeerLogFetcher.java | 2 +- src/java/org/apache/cassandra/tcm/Processor.java | 34 +++++++++++++++------- .../apache/cassandra/tcm/ReconstructLogState.java | 30 +++++++++++++++---- .../org/apache/cassandra/tcm/RemoteProcessor.java | 15 ++++++---- .../cassandra/tcm/StubClusterMetadataService.java | 9 +++++- .../org/apache/cassandra/tcm/log/LocalLog.java | 4 +-- .../org/apache/cassandra/tcm/log/LogReader.java | 4 +-- .../org/apache/cassandra/tcm/log/LogStorage.java | 2 +- .../apache/cassandra/tcm/migration/Election.java | 3 +- .../cassandra/tcm/migration/GossipProcessor.java | 9 +++++- .../cassandra/distributed/test/CASTestBase.java | 3 ++ .../distributed/test/PaxosRepair2Test.java | 1 - .../test/log/CoordinatorPathTestBase.java | 13 +++++++-- .../distributed/test/log/ReconstructEpochTest.java | 10 +++---- .../distributed/test/log/TestProcessor.java | 11 +++++-- .../fuzz/topology/TopologyMixupTestBase.java | 7 +++-- .../tcm/ValidatingClusterMetadataService.java | 8 ++++- 26 files changed, 185 insertions(+), 72 deletions(-) diff --git a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java index 9a3eaaf49d..5fecc5756f 100644 --- a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java +++ b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java @@ -171,9 +171,9 @@ public final class DistributedMetadataLogKeyspace * here. One more alternative is to keep a lazily-initialized AccordTopology table on CMS nodes for a * number of recent epochs, and keep a node-local cache of this table on other nodes. */ - public static LogState getLogState(Epoch start, Epoch end) + public static LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot) { - return serialLogReader.getLogState(start, end); + return serialLogReader.getLogState(start, end, includeSnapshot); } public static class DistributedTableLogReader implements LogReader diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index b0b7edf71a..e73209da02 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -127,6 +127,7 @@ import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.journal.Params; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.AccordClientRequestMetrics; +import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessageDelivery; @@ -157,6 +158,7 @@ import org.apache.cassandra.service.consensus.migration.TableMigrationState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.Retry; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tracing.Tracing; @@ -551,7 +553,12 @@ public class AccordService implements IAccordService, Shutdownable public static List<ClusterMetadata> tcmLoadRange(long min, long max) { - List<ClusterMetadata> afterLoad = ClusterMetadataService.instance().processor().reconstructFull(Epoch.create(min), Epoch.create(max)); + List<ClusterMetadata> afterLoad = ClusterMetadataService.instance() + .processor() + .reconstruct(Epoch.create(min), Epoch.create(max), + Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), + TCMMetrics.instance.fetchLogRetries)); + if (Invariants.isParanoid()) Invariants.checkState(afterLoad.get(0).epoch.getEpoch() == min, "Unexpected epoch: expected %d but given %d", min, afterLoad.get(0).epoch.getEpoch()); while (!afterLoad.isEmpty() && afterLoad.get(0).epoch.getEpoch() < min) diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java index 6d126becc8..e5c58ef3ad 100644 --- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java @@ -172,7 +172,6 @@ public abstract class AbstractLocalProcessor implements Processor } } - private LogState toLogState(Transformation.Success success, Entry.Id entryId, Epoch lastKnown, Transformation transform) { if (lastKnown == null || lastKnown.isDirectlyBefore(success.metadata.epoch)) @@ -197,9 +196,6 @@ public abstract class AbstractLocalProcessor implements Processor return logState; } - - @Override public abstract ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy); protected abstract boolean tryCommitOne(Entry.Id entryId, Transformation transform, Epoch previousEpoch, Epoch nextEpoch); - -} \ No newline at end of file +} diff --git a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java index 9a43c3eee9..1bf81b6048 100644 --- a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java @@ -81,7 +81,13 @@ public class AtomicLongBackedProcessor extends AbstractLocalProcessor } @Override - public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) + { + return getLogState(start, end, includeSnapshot, retryPolicy); + } + + @Override + public LogState getLogState(Epoch lowEpoch, Epoch highEpoch, boolean includeSnapshot, Retry.Deadline retryPolicy) { try { @@ -130,11 +136,7 @@ public class AtomicLongBackedProcessor extends AbstractLocalProcessor @Override public synchronized LogState getLogState(Epoch startEpoch) { - ImmutableList.Builder<Entry> builder = ImmutableList.builder(); - ClusterMetadata latest = metadataSnapshots.getLatestSnapshot(); - Epoch actualSince = latest != null && latest.epoch.isAfter(startEpoch) ? latest.epoch : startEpoch; - entries.stream().filter(e -> e.epoch.isAfter(actualSince)).forEach(builder::add); - return new LogState(latest, builder.build()); + return getLogState(startEpoch, Epoch.MAX); } @Override diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 40012459e7..a10c696fef 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -160,15 +160,15 @@ public class ClusterMetadataService { log = logSpec.sync().withStorage(new AtomicLongBackedProcessor.InMemoryStorage()).createLog(); localProcessor = wrapProcessor.apply(new AtomicLongBackedProcessor(log, logSpec.isReset())); - fetchLogHandler = new FetchCMSLog.Handler((e, ignored) -> logSpec.storage().getLogState(e)); } else { log = logSpec.async().createLog(); localProcessor = wrapProcessor.apply(new PaxosBackedProcessor(log)); - fetchLogHandler = new FetchCMSLog.Handler(); } + fetchLogHandler = new FetchCMSLog.Handler(); + Commit.Replicator replicator = CassandraRelevantProperties.TCM_USE_NO_OP_REPLICATOR.getBoolean() ? Commit.Replicator.NO_OP : new Commit.DefaultReplicator(() -> log.metadata().directory); @@ -792,6 +792,7 @@ public class ClusterMetadataService { return commitsPaused.get(); } + /** * Switchable implementation that allow us to go between local and remote implementation whenever we need it. * When the node becomes a member of CMS, it switches back to being a regular member of a cluster, and all @@ -869,9 +870,16 @@ public class ClusterMetadataService return delegate().fetchLogAndWait(waitFor, retryPolicy); } - public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + @Override + public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) + { + return delegate().getLocalState(start, end, includeSnapshot, retryPolicy); + } + + @Override + public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) { - return delegate().reconstruct(lowEpoch, highEpoch, retryPolicy); + return delegate().getLogState(start, end, includeSnapshot, retryPolicy); } public String toString() diff --git a/src/java/org/apache/cassandra/tcm/Epoch.java b/src/java/org/apache/cassandra/tcm/Epoch.java index d2e451d068..79dc94ba34 100644 --- a/src/java/org/apache/cassandra/tcm/Epoch.java +++ b/src/java/org/apache/cassandra/tcm/Epoch.java @@ -57,6 +57,7 @@ public class Epoch implements Comparable<Epoch>, Serializable }; public static final Epoch FIRST = new Epoch(1); + public static final Epoch MAX = new Epoch(Long.MAX_VALUE); public static final Epoch EMPTY = new Epoch(0); public static final Epoch UPGRADE_STARTUP = new Epoch(Long.MIN_VALUE); public static final Epoch UPGRADE_GOSSIP = new Epoch(Long.MIN_VALUE + 1); diff --git a/src/java/org/apache/cassandra/tcm/FetchCMSLog.java b/src/java/org/apache/cassandra/tcm/FetchCMSLog.java index 38ef550ba5..3878a9c4cb 100644 --- a/src/java/org/apache/cassandra/tcm/FetchCMSLog.java +++ b/src/java/org/apache/cassandra/tcm/FetchCMSLog.java @@ -19,11 +19,13 @@ package org.apache.cassandra.tcm; import java.io.IOException; -import java.util.function.BiFunction; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; @@ -32,7 +34,6 @@ import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.utils.FBUtilities; @@ -89,16 +90,16 @@ public class FetchCMSLog * to node-local (which only relevant in cases of CMS expansions/shrinks, and can only be requested by the * CMS node that collects the highest epoch from the quorum of peers). */ - private final BiFunction<Epoch, Boolean, LogState> logStateSupplier; + private final Supplier<Processor> processor; public Handler() { - this(DistributedMetadataLogKeyspace::getLogState); + this(() -> ClusterMetadataService.instance().processor()); } - public Handler(BiFunction<Epoch, Boolean, LogState> logStateSupplier) + public Handler(Supplier<Processor> processor) { - this.logStateSupplier = logStateSupplier; + this.processor = processor; } public void doVerb(Message<FetchCMSLog> message) throws IOException @@ -114,7 +115,14 @@ public class FetchCMSLog // If both we and the other node believe it should be caught up with a linearizable read boolean consistentFetch = request.consistentFetch && !ClusterMetadataService.instance().isCurrentMember(message.from()); - LogState delta = logStateSupplier.apply(message.payload.lowerBound, consistentFetch); + Retry.Deadline retry = Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), + TCMMetrics.instance.fetchLogRetries); + LogState delta; + if (consistentFetch) + delta = processor.get().getLogState(message.payload.lowerBound, Epoch.MAX, false, retry); + else + delta = processor.get().getLocalState(message.payload.lowerBound, Epoch.MAX, false, retry); + TCMMetrics.instance.cmsLogEntriesServed(message.payload.lowerBound, delta.latestEpoch()); logger.info("Responding to {}({}) with log delta: {}", message.from(), request, delta); MessagingService.instance().send(message.responseWith(delta), message.from()); diff --git a/src/java/org/apache/cassandra/tcm/FetchPeerLog.java b/src/java/org/apache/cassandra/tcm/FetchPeerLog.java index 1347dcf049..1e79d6cb7c 100644 --- a/src/java/org/apache/cassandra/tcm/FetchPeerLog.java +++ b/src/java/org/apache/cassandra/tcm/FetchPeerLog.java @@ -19,10 +19,12 @@ package org.apache.cassandra.tcm; import java.io.IOException; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -31,7 +33,6 @@ import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.tcm.log.LogState; -import org.apache.cassandra.tcm.log.LogStorage; public class FetchPeerLog { @@ -82,7 +83,11 @@ public class FetchPeerLog ClusterMetadata metadata = ClusterMetadata.current(); logger.info("Received peer log fetch request {} from {}: start = {}, current = {}", request, message.from(), message.payload.start, metadata.epoch); - LogState delta = LogStorage.SystemKeyspace.getLogState(message.payload.start); + LogState delta = ClusterMetadataService.instance() + .processor() + .getLocalState(message.payload.start, Epoch.MAX, false, + Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), + new Retry.Jitter(TCMMetrics.instance.fetchLogRetries))); TCMMetrics.instance.peerLogEntriesServed(message.payload.start, delta.latestEpoch()); logger.info("Responding with log delta: {}", delta); MessagingService.instance().send(message.responseWith(delta), message.from()); diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java index dbaac24041..dcdca627db 100644 --- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java +++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java @@ -167,9 +167,16 @@ public class PaxosBackedProcessor extends AbstractLocalProcessor throw new ReadTimeoutException(ConsistencyLevel.QUORUM, blockFor - collected.size(), blockFor, false); } - public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + @Override + public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) + { + return log.storage().getLogState(start, end, includeSnapshot); + } + + @Override + public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) { - return DistributedMetadataLogKeyspace.getLogState(lowEpoch, highEpoch); + return DistributedMetadataLogKeyspace.getLogState(start, end, includeSnapshot); } private static <T> T unwrap(Promise<T> promise) diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java index 3564ab93f7..7192551c68 100644 --- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java +++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java @@ -108,7 +108,7 @@ public class PeerLogFetcher } else { - throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up", awaitAtleast)); + throw new IllegalStateException(String.format("Queried for epoch %s, but could not catch up. Current epoch: %s", awaitAtleast, fetched.epoch)); } }); diff --git a/src/java/org/apache/cassandra/tcm/Processor.java b/src/java/org/apache/cassandra/tcm/Processor.java index 5558b253d6..98ab0232ea 100644 --- a/src/java/org/apache/cassandra/tcm/Processor.java +++ b/src/java/org/apache/cassandra/tcm/Processor.java @@ -23,13 +23,12 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import accord.utils.Invariants; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.tcm.log.Entry; import org.apache.cassandra.tcm.log.LogState; -import static java.util.concurrent.TimeUnit.NANOSECONDS; - public interface Processor { /** @@ -78,23 +77,36 @@ public interface Processor ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy); - LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy); + /** + * Queries node's _local_ state. It is not guaranteed to be contiguous, but can be used for restoring CMS state/ + */ + LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy); + + /** + * Queries global log state. + */ + LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy); - default List<ClusterMetadata> reconstructFull(Epoch lowEpoch, Epoch highEpoch) + /** + * Reconstructs + */ + default List<ClusterMetadata> reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) { - LogState logState = reconstruct(lowEpoch, highEpoch, Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS), - TCMMetrics.instance.commitRetries)); + LogState logState = getLogState(lowEpoch, highEpoch, true, retryPolicy); if (logState.isEmpty()) return Collections.emptyList(); List<ClusterMetadata> cms = new ArrayList<>(logState.entries.size()); - ClusterMetadata accum = logState.baseState; - cms.add(accum); + + ClusterMetadata acc = logState.baseState; + cms.add(acc); for (Entry entry : logState.entries) { - Transformation.Result res = entry.transform.execute(accum); + Invariants.checkState(entry.epoch.isDirectlyAfter(acc.epoch), "%s should have been directly after %s", entry.epoch, acc.epoch); + Transformation.Result res = entry.transform.execute(acc); assert res.isSuccess() : res.toString(); - accum = res.success().metadata; - cms.add(accum); + acc = res.success().metadata; + cms.add(acc); } return cms; } + } diff --git a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java index f6a60f070a..c8930853ad 100644 --- a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java +++ b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java @@ -19,7 +19,11 @@ package org.apache.cassandra.tcm; import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -27,7 +31,6 @@ import org.apache.cassandra.metrics.TCMMetrics; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.DistributedMetadataLogKeyspace; import org.apache.cassandra.tcm.log.LogState; import org.apache.cassandra.utils.FBUtilities; @@ -37,11 +40,13 @@ public class ReconstructLogState public final Epoch lowerBound; public final Epoch higherBound; + public final boolean includeSnapshot; - public ReconstructLogState(Epoch lowerBound, Epoch higherBound) + public ReconstructLogState(Epoch lowerBound, Epoch higherBound, boolean includeSnapshot) { this.lowerBound = lowerBound; this.higherBound = higherBound; + this.includeSnapshot = includeSnapshot; } static class Serializer implements IVersionedSerializer<ReconstructLogState> @@ -51,19 +56,21 @@ public class ReconstructLogState { Epoch.serializer.serialize(t.lowerBound, out); Epoch.serializer.serialize(t.higherBound, out); + out.writeBoolean(t.includeSnapshot); } public ReconstructLogState deserialize(DataInputPlus in, int version) throws IOException { Epoch lowerBound = Epoch.serializer.deserialize(in); Epoch higherBound = Epoch.serializer.deserialize(in); - return new ReconstructLogState(lowerBound, higherBound); + return new ReconstructLogState(lowerBound, higherBound, in.readBoolean()); } public long serializedSize(ReconstructLogState t, int version) { return Epoch.serializer.serializedSize(t.lowerBound) + - Epoch.serializer.serializedSize(t.higherBound); + Epoch.serializer.serializedSize(t.higherBound) + + TypeSizes.BOOL_SIZE; } } @@ -71,6 +78,16 @@ public class ReconstructLogState { public static final Handler instance = new Handler(); + private final Supplier<Processor> processor; + + public Handler() + { + this(() -> ClusterMetadataService.instance().processor()); + } + public Handler(Supplier<Processor> processor) + { + this.processor = processor; + } public void doVerb(Message<ReconstructLogState> message) throws IOException { TCMMetrics.instance.reconstructLogStateCall.mark(); @@ -79,7 +96,10 @@ public class ReconstructLogState if (!ClusterMetadataService.instance().isCurrentMember(FBUtilities.getBroadcastAddressAndPort())) throw new NotCMSException("This node is not in the CMS, can't generate a consistent log fetch response to " + message.from()); - LogState result = DistributedMetadataLogKeyspace.getLogState(request.lowerBound, request.higherBound); + LogState result = processor.get().getLogState(request.lowerBound, request.higherBound, request.includeSnapshot, + Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), + TCMMetrics.instance.fetchLogRetries)); + MessagingService.instance().send(message.responseWith(result), message.from()); } } diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java index 635be54cf9..e9417adfec 100644 --- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java +++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java @@ -152,7 +152,13 @@ public final class RemoteProcessor implements Processor } @Override - public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) + { + return log.getLocalEntries(start, end, includeSnapshot); + } + + @Override + public LogState getLogState(Epoch lowEpoch, Epoch highEpoch, boolean includeSnapshot, Retry.Deadline retryPolicy) { try { @@ -160,9 +166,9 @@ public final class RemoteProcessor implements Processor List<InetAddressAndPort> candidates = new ArrayList<>(log.metadata().fullCMSMembers()); sendWithCallbackAsync(request, Verb.TCM_RECONSTRUCT_EPOCH_REQ, - new ReconstructLogState(lowEpoch, highEpoch), + new ReconstructLogState(lowEpoch, highEpoch, includeSnapshot), new CandidateIterator(candidates), - new Retry.Backoff(TCMMetrics.instance.fetchLogRetries)); + retryPolicy); return request.get(retryPolicy.remainingNanos(), TimeUnit.NANOSECONDS); } catch (InterruptedException e) @@ -187,8 +193,7 @@ public final class RemoteProcessor implements Processor } } - private static Future<ClusterMetadata> fetchLogAndWaitInternal(CandidateIterator candidates, - LocalLog log) + private static Future<ClusterMetadata> fetchLogAndWaitInternal(CandidateIterator candidates, LocalLog log) { try (Timer.Context ctx = TCMMetrics.instance.fetchCMSLogLatency.time()) { diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java index 47a2ba26f1..72cf2e7aa9 100644 --- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java @@ -155,7 +155,14 @@ public class StubClusterMetadataService extends ClusterMetadataService throw new UnsupportedOperationException(); } - public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + @Override + public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) + { + throw new UnsupportedOperationException(); + } + + @Override + public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) { throw new UnsupportedOperationException(); } diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java b/src/java/org/apache/cassandra/tcm/log/LocalLog.java index 0bcb982f1a..c737814c94 100644 --- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java +++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java @@ -351,9 +351,9 @@ public abstract class LocalLog implements Closeable return storage.getLogState(since, false); } - public LogState getLocalEntries(Epoch since, Epoch until) + public LogState getLocalEntries(Epoch since, Epoch until, boolean includeSnapshot) { - return storage.getLogState(since, until); + return storage.getLogState(since, until, includeSnapshot); } public ClusterMetadata waitForHighestConsecutive() diff --git a/src/java/org/apache/cassandra/tcm/log/LogReader.java b/src/java/org/apache/cassandra/tcm/log/LogReader.java index b1e7ab3264..effc4d7561 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogReader.java +++ b/src/java/org/apache/cassandra/tcm/log/LogReader.java @@ -120,7 +120,7 @@ public interface LogReader } } - default LogState getLogState(Epoch start, Epoch end) + default LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot) { try { @@ -136,7 +136,7 @@ public interface LogReader { if (entry.epoch.isAfter(start)) entries.add(entry); - else + else if (includeSnapshot) closestSnapshot = entry.transform.execute(closestSnapshot).success().metadata; } return new LogState(closestSnapshot, entries.build()); diff --git a/src/java/org/apache/cassandra/tcm/log/LogStorage.java b/src/java/org/apache/cassandra/tcm/log/LogStorage.java index 7772d7d07e..e739a8aae7 100644 --- a/src/java/org/apache/cassandra/tcm/log/LogStorage.java +++ b/src/java/org/apache/cassandra/tcm/log/LogStorage.java @@ -57,7 +57,7 @@ public interface LogStorage extends LogReader } @Override - public LogState getLogState(Epoch start, Epoch end) + public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot) { return LogState.EMPTY; } diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java index c2fe67439a..f80170b4e2 100644 --- a/src/java/org/apache/cassandra/tcm/migration/Election.java +++ b/src/java/org/apache/cassandra/tcm/migration/Election.java @@ -141,7 +141,8 @@ public class Election Register.maybeRegister(); updateInitiator(currentCoordinator, MIGRATED); - MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false)); + MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, + DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false)); } private void abort(Set<InetAddressAndPort> sendTo) diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java index 6c02318f48..36baa59eb3 100644 --- a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java +++ b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java @@ -41,7 +41,14 @@ public class GossipProcessor implements Processor return ClusterMetadata.current(); } - public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + @Override + public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) + { + throw new IllegalStateException("Can't reconstruct log state when running in gossip mode. Enable the ClusterMetadataService with `nodetool addtocms`."); + } + + @Override + public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) { throw new IllegalStateException("Can't reconstruct log state when running in gossip mode. Enable the ClusterMetadataService with `nodetool addtocms`."); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java index 58b47cc9b7..1c441ded3e 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java @@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.test; import java.util.Collections; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert; @@ -185,6 +186,8 @@ public abstract class CASTestBase extends TestBaseImpl public static void assertVisibleInRing(IInstance peer) { InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(peer.broadcastAddress()); + long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30); + while (System.nanoTime() < deadline && !Gossiper.instance.isAlive(endpoint)); Assert.assertTrue(Gossiper.instance.isAlive(endpoint)); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java index 88ef1af750..ed066b2c3b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java +++ b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java @@ -189,7 +189,6 @@ public class PaxosRepair2Test extends TestBaseImpl Ballot staleBallot = Paxos.newBallot(Ballot.none(), org.apache.cassandra.db.ConsistencyLevel.SERIAL); try (Cluster cluster = init(Cluster.create(3, cfg -> cfg .set("paxos_variant", "v2") - .set("accord.enabled", false) // this test monkeys with TCM which can cause confussion for Accord while it fetches epochs... .set("paxos_purge_grace_period", "0s") .set("truncate_request_timeout_in_ms", 1000L))) ) diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java index 59fdc0e6e4..4e8d4bd905 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java @@ -734,6 +734,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase log, new Processor() { + @Override public Commit.Result commit(Entry.Id entryId, Transformation event, Epoch lastKnown, Retry.Deadline retryPolicy) { if (lastKnown == null) @@ -747,6 +748,7 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase return result; } + @Override public ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy) { Epoch since = log.waitForHighestConsecutive().epoch; @@ -755,9 +757,16 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase return log.waitForHighestConsecutive(); } - public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + @Override + public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) { - return log.getLocalEntries(lowEpoch, highEpoch); + return getLogState(start, end, includeSnapshot, retryPolicy); + } + + @Override + public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) + { + return log.getLocalEntries(start, end, includeSnapshot); } }, (a,b) -> {}, diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java index e89502e90c..cf795048c3 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java @@ -54,11 +54,11 @@ public class ReconstructEpochTest extends TestBaseImpl for (int[] cfg : new int[][]{ new int[]{ 6, 9 }, new int[]{ 2, 20 }, new int[]{ 5, 5 }, - new int[]{ 15, 20 }}) + new int[]{ 15, 20 } }) { int start = cfg[0]; int end = cfg[1]; - LogState logState = DistributedMetadataLogKeyspace.getLogState(Epoch.create(start), Epoch.create(end)); + LogState logState = DistributedMetadataLogKeyspace.getLogState(Epoch.create(start), Epoch.create(end), true); Assert.assertEquals(start, logState.baseState.epoch.getEpoch()); Iterator<Entry> iter = logState.entries.iterator(); for (int i = start + 1; i <= end; i++) @@ -71,14 +71,15 @@ public class ReconstructEpochTest extends TestBaseImpl for (int[] cfg : new int[][]{ new int[]{ 6, 9 }, new int[]{ 2, 20 }, new int[]{ 5, 5 }, - new int[]{ 15, 20 }}) + new int[]{ 15, 20 } }) { int start = cfg[0]; int end = cfg[1]; LogState logState = ClusterMetadataService.instance() .processor() - .reconstruct(Epoch.create(start), + .getLogState(Epoch.create(start), Epoch.create(end), + true, Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS), TCMMetrics.instance.commitRetries)); @@ -90,5 +91,4 @@ public class ReconstructEpochTest extends TestBaseImpl }); } } - } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java index 6f359af057..6ee5e975ea 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java @@ -70,9 +70,16 @@ public class TestProcessor implements Processor return delegate.fetchLogAndWait(waitFor, retryPolicy); } - public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + @Override + public LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) + { + return delegate.getLocalState(start, end, includeSnapshot, retryPolicy); + } + + @Override + public LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy) { - return delegate.reconstruct(lowEpoch, highEpoch, retryPolicy); + return delegate.getLogState(start, end, includeSnapshot, retryPolicy); } protected void waitIfPaused() diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java index 5d974a4f17..834d2a9b81 100644 --- a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java +++ b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java @@ -530,8 +530,11 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche public void close() throws Exception { epochHistory = cluster.get(cmsGroup[0]).callOnInstance(() -> { - LogState all = ClusterMetadataService.instance().processor().reconstruct(Epoch.EMPTY, Epoch.create(Long.MAX_VALUE), Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS), - TCMMetrics.instance.commitRetries)); + LogState all = ClusterMetadataService.instance() + .processor() + .getLogState(Epoch.EMPTY, Epoch.create(Long.MAX_VALUE), false, + Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS), + TCMMetrics.instance.commitRetries)); StringBuilder sb = new StringBuilder("Epochs:"); for (Entry e : all.entries) sb.append("\n").append(e.epoch.getEpoch()).append(": ").append(e.transform); diff --git a/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java b/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java index 128fdeca7b..0d7bbf7f8e 100644 --- a/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java +++ b/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java @@ -132,7 +132,13 @@ public class ValidatingClusterMetadataService extends StubClusterMetadataService } @Override - public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy) + public LogState getLocalState(Epoch lowEpoch, Epoch highEpoch, boolean includeSnapshot, Retry.Deadline retryPolicy) + { + return getLogState(lowEpoch, highEpoch, includeSnapshot, retryPolicy); + } + + @Override + public LogState getLogState(Epoch lowEpoch, Epoch highEpoch, boolean includeSnapshot, Retry.Deadline retryPolicy) { if (!epochs.containsKey(lowEpoch)) throw new AssertionError("Unknown epoch: " + lowEpoch); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
