This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e4ffc0c6abb8aafa81f11b0446897e27474ef1d5
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Oct 2 10:19:48 2024 +0200

    Fix CASTest
    
    Make it possible for AtomicLongBackedProcessor to respond to log fetch requests (FetchCMSLog, FetchPeerLog).
    
    Patch by Alex Petrov; reviewed by David Capwell for CASSANDRA-20018.
---
 .../schema/DistributedMetadataLogKeyspace.java     |  4 +--
 .../cassandra/service/accord/AccordService.java    |  9 +++++-
 .../cassandra/tcm/AbstractLocalProcessor.java      |  6 +---
 .../cassandra/tcm/AtomicLongBackedProcessor.java   | 14 +++++----
 .../cassandra/tcm/ClusterMetadataService.java      | 16 +++++++---
 src/java/org/apache/cassandra/tcm/Epoch.java       |  1 +
 src/java/org/apache/cassandra/tcm/FetchCMSLog.java | 22 +++++++++-----
 .../org/apache/cassandra/tcm/FetchPeerLog.java     |  9 ++++--
 .../apache/cassandra/tcm/PaxosBackedProcessor.java | 11 +++++--
 .../org/apache/cassandra/tcm/PeerLogFetcher.java   |  2 +-
 src/java/org/apache/cassandra/tcm/Processor.java   | 34 +++++++++++++++-------
 .../apache/cassandra/tcm/ReconstructLogState.java  | 30 +++++++++++++++----
 .../org/apache/cassandra/tcm/RemoteProcessor.java  | 15 ++++++----
 .../cassandra/tcm/StubClusterMetadataService.java  |  9 +++++-
 .../org/apache/cassandra/tcm/log/LocalLog.java     |  4 +--
 .../org/apache/cassandra/tcm/log/LogReader.java    |  4 +--
 .../org/apache/cassandra/tcm/log/LogStorage.java   |  2 +-
 .../apache/cassandra/tcm/migration/Election.java   |  3 +-
 .../cassandra/tcm/migration/GossipProcessor.java   |  9 +++++-
 .../cassandra/distributed/test/CASTestBase.java    |  3 ++
 .../distributed/test/PaxosRepair2Test.java         |  1 -
 .../test/log/CoordinatorPathTestBase.java          | 13 +++++++--
 .../distributed/test/log/ReconstructEpochTest.java | 10 +++----
 .../distributed/test/log/TestProcessor.java        | 11 +++++--
 .../fuzz/topology/TopologyMixupTestBase.java       |  7 +++--
 .../tcm/ValidatingClusterMetadataService.java      |  8 ++++-
 26 files changed, 185 insertions(+), 72 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java 
b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java
index 9a3eaaf49d..5fecc5756f 100644
--- a/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/DistributedMetadataLogKeyspace.java
@@ -171,9 +171,9 @@ public final class DistributedMetadataLogKeyspace
      *  here. One more alternative is to keep a lazily-initialized 
AccordTopology table on CMS nodes for a
      *  number of recent epochs, and keep a node-local cache of this table on 
other nodes.
      */
-    public static LogState getLogState(Epoch start, Epoch end)
+    public static LogState getLogState(Epoch start, Epoch end, boolean 
includeSnapshot)
     {
-        return serialLogReader.getLogState(start, end);
+        return serialLogReader.getLogState(start, end, includeSnapshot);
     }
 
     public static class DistributedTableLogReader implements LogReader
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index b0b7edf71a..e73209da02 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -127,6 +127,7 @@ import 
org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.journal.Params;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.metrics.AccordClientRequestMetrics;
+import org.apache.cassandra.metrics.TCMMetrics;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageDelivery;
@@ -157,6 +158,7 @@ import 
org.apache.cassandra.service.consensus.migration.TableMigrationState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.Retry;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.ownership.DataPlacement;
 import org.apache.cassandra.tracing.Tracing;
@@ -551,7 +553,12 @@ public class AccordService implements IAccordService, 
Shutdownable
 
     public static List<ClusterMetadata> tcmLoadRange(long min, long max)
     {
-        List<ClusterMetadata> afterLoad = 
ClusterMetadataService.instance().processor().reconstructFull(Epoch.create(min),
 Epoch.create(max));
+        List<ClusterMetadata> afterLoad = ClusterMetadataService.instance()
+                                                                .processor()
+                                                                
.reconstruct(Epoch.create(min), Epoch.create(max),
+                                                                             
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+                                                                               
                               TCMMetrics.instance.fetchLogRetries));
+
         if (Invariants.isParanoid())
             Invariants.checkState(afterLoad.get(0).epoch.getEpoch() == min, 
"Unexpected epoch: expected %d but given %d", min, 
afterLoad.get(0).epoch.getEpoch());
         while (!afterLoad.isEmpty() && afterLoad.get(0).epoch.getEpoch() < min)
diff --git a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java 
b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
index 6d126becc8..e5c58ef3ad 100644
--- a/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AbstractLocalProcessor.java
@@ -172,7 +172,6 @@ public abstract class AbstractLocalProcessor implements 
Processor
         }
     }
 
-
     private LogState toLogState(Transformation.Success success, Entry.Id 
entryId, Epoch lastKnown, Transformation transform)
     {
         if (lastKnown == null || 
lastKnown.isDirectlyBefore(success.metadata.epoch))
@@ -197,9 +196,6 @@ public abstract class AbstractLocalProcessor implements 
Processor
         return logState;
     }
 
-
-    @Override
     public abstract ClusterMetadata fetchLogAndWait(Epoch waitFor, 
Retry.Deadline retryPolicy);
     protected abstract boolean tryCommitOne(Entry.Id entryId, Transformation 
transform, Epoch previousEpoch, Epoch nextEpoch);
-
-}
\ No newline at end of file
+}
diff --git a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java 
b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
index 9a43c3eee9..1bf81b6048 100644
--- a/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/AtomicLongBackedProcessor.java
@@ -81,7 +81,13 @@ public class AtomicLongBackedProcessor extends 
AbstractLocalProcessor
     }
 
     @Override
-    public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+    public LogState getLocalState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
+    {
+        return getLogState(start, end, includeSnapshot, retryPolicy);
+    }
+
+    @Override
+    public LogState getLogState(Epoch lowEpoch, Epoch highEpoch, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
     {
         try
         {
@@ -130,11 +136,7 @@ public class AtomicLongBackedProcessor extends 
AbstractLocalProcessor
         @Override
         public synchronized LogState getLogState(Epoch startEpoch)
         {
-            ImmutableList.Builder<Entry> builder = ImmutableList.builder();
-            ClusterMetadata latest = metadataSnapshots.getLatestSnapshot();
-            Epoch actualSince = latest != null && 
latest.epoch.isAfter(startEpoch) ? latest.epoch : startEpoch;
-            entries.stream().filter(e -> 
e.epoch.isAfter(actualSince)).forEach(builder::add);
-            return new LogState(latest, builder.build());
+            return getLogState(startEpoch, Epoch.MAX);
         }
 
         @Override
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 40012459e7..a10c696fef 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -160,15 +160,15 @@ public class ClusterMetadataService
         {
             log = logSpec.sync().withStorage(new 
AtomicLongBackedProcessor.InMemoryStorage()).createLog();
             localProcessor = wrapProcessor.apply(new 
AtomicLongBackedProcessor(log, logSpec.isReset()));
-            fetchLogHandler = new FetchCMSLog.Handler((e, ignored) -> 
logSpec.storage().getLogState(e));
         }
         else
         {
             log = logSpec.async().createLog();
             localProcessor = wrapProcessor.apply(new 
PaxosBackedProcessor(log));
-            fetchLogHandler = new FetchCMSLog.Handler();
         }
 
+        fetchLogHandler = new FetchCMSLog.Handler();
+
         Commit.Replicator replicator = 
CassandraRelevantProperties.TCM_USE_NO_OP_REPLICATOR.getBoolean()
                                        ? Commit.Replicator.NO_OP
                                        : new Commit.DefaultReplicator(() -> 
log.metadata().directory);
@@ -792,6 +792,7 @@ public class ClusterMetadataService
     {
         return commitsPaused.get();
     }
+
     /**
      * Switchable implementation that allow us to go between local and remote 
implementation whenever we need it.
      * When the node becomes a member of CMS, it switches back to being a 
regular member of a cluster, and all
@@ -869,9 +870,16 @@ public class ClusterMetadataService
             return delegate().fetchLogAndWait(waitFor, retryPolicy);
         }
 
-        public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+        @Override
+        public LogState getLocalState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
+        {
+            return delegate().getLocalState(start, end, includeSnapshot, 
retryPolicy);
+        }
+
+        @Override
+        public LogState getLogState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
         {
-            return delegate().reconstruct(lowEpoch, highEpoch, retryPolicy);
+            return delegate().getLogState(start, end, includeSnapshot, 
retryPolicy);
         }
 
         public String toString()
diff --git a/src/java/org/apache/cassandra/tcm/Epoch.java 
b/src/java/org/apache/cassandra/tcm/Epoch.java
index d2e451d068..79dc94ba34 100644
--- a/src/java/org/apache/cassandra/tcm/Epoch.java
+++ b/src/java/org/apache/cassandra/tcm/Epoch.java
@@ -57,6 +57,7 @@ public class Epoch implements Comparable<Epoch>, Serializable
     };
 
     public static final Epoch FIRST = new Epoch(1);
+    public static final Epoch MAX = new Epoch(Long.MAX_VALUE);
     public static final Epoch EMPTY = new Epoch(0);
     public static final Epoch UPGRADE_STARTUP = new Epoch(Long.MIN_VALUE);
     public static final Epoch UPGRADE_GOSSIP = new Epoch(Long.MIN_VALUE + 1);
diff --git a/src/java/org/apache/cassandra/tcm/FetchCMSLog.java 
b/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
index 38ef550ba5..3878a9c4cb 100644
--- a/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
+++ b/src/java/org/apache/cassandra/tcm/FetchCMSLog.java
@@ -19,11 +19,13 @@
 package org.apache.cassandra.tcm;
 
 import java.io.IOException;
-import java.util.function.BiFunction;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -32,7 +34,6 @@ import org.apache.cassandra.metrics.TCMMetrics;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
 import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -89,16 +90,16 @@ public class FetchCMSLog
          * to node-local (which only relevant in cases of CMS 
expansions/shrinks, and can only be requested by the
          * CMS node that collects the highest epoch from the quorum of peers).
          */
-        private final BiFunction<Epoch, Boolean, LogState> logStateSupplier;
+        private final Supplier<Processor> processor;
 
         public Handler()
         {
-            this(DistributedMetadataLogKeyspace::getLogState);
+            this(() -> ClusterMetadataService.instance().processor());
         }
 
-        public Handler(BiFunction<Epoch, Boolean, LogState> logStateSupplier)
+        public Handler(Supplier<Processor> processor)
         {
-            this.logStateSupplier = logStateSupplier;
+            this.processor = processor;
         }
 
         public void doVerb(Message<FetchCMSLog> message) throws IOException
@@ -114,7 +115,14 @@ public class FetchCMSLog
             // If both we and the other node believe it should be caught up 
with a linearizable read
             boolean consistentFetch = request.consistentFetch && 
!ClusterMetadataService.instance().isCurrentMember(message.from());
 
-            LogState delta = 
logStateSupplier.apply(message.payload.lowerBound, consistentFetch);
+            Retry.Deadline retry = 
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+                                                                    
TCMMetrics.instance.fetchLogRetries);
+            LogState delta;
+            if (consistentFetch)
+                delta = 
processor.get().getLogState(message.payload.lowerBound, Epoch.MAX, false, 
retry);
+            else
+                delta = 
processor.get().getLocalState(message.payload.lowerBound, Epoch.MAX, false, 
retry);
+
             
TCMMetrics.instance.cmsLogEntriesServed(message.payload.lowerBound, 
delta.latestEpoch());
             logger.info("Responding to {}({}) with log delta: {}", 
message.from(), request, delta);
             MessagingService.instance().send(message.responseWith(delta), 
message.from());
diff --git a/src/java/org/apache/cassandra/tcm/FetchPeerLog.java 
b/src/java/org/apache/cassandra/tcm/FetchPeerLog.java
index 1347dcf049..1e79d6cb7c 100644
--- a/src/java/org/apache/cassandra/tcm/FetchPeerLog.java
+++ b/src/java/org/apache/cassandra/tcm/FetchPeerLog.java
@@ -19,10 +19,12 @@
 package org.apache.cassandra.tcm;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -31,7 +33,6 @@ import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.tcm.log.LogState;
-import org.apache.cassandra.tcm.log.LogStorage;
 
 public class FetchPeerLog
 {
@@ -82,7 +83,11 @@ public class FetchPeerLog
 
             ClusterMetadata metadata = ClusterMetadata.current();
             logger.info("Received peer log fetch request {} from {}: start = 
{}, current = {}", request, message.from(), message.payload.start, 
metadata.epoch);
-            LogState delta = 
LogStorage.SystemKeyspace.getLogState(message.payload.start);
+            LogState delta = ClusterMetadataService.instance()
+                                                   .processor()
+                                                   
.getLocalState(message.payload.start, Epoch.MAX, false,
+                                                                  
Retry.Deadline.after(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+                                                                               
        new Retry.Jitter(TCMMetrics.instance.fetchLogRetries)));
             TCMMetrics.instance.peerLogEntriesServed(message.payload.start, 
delta.latestEpoch());
             logger.info("Responding with log delta: {}", delta);
             MessagingService.instance().send(message.responseWith(delta), 
message.from());
diff --git a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java 
b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
index dbaac24041..dcdca627db 100644
--- a/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/PaxosBackedProcessor.java
@@ -167,9 +167,16 @@ public class PaxosBackedProcessor extends 
AbstractLocalProcessor
         throw new ReadTimeoutException(ConsistencyLevel.QUORUM, blockFor - 
collected.size(), blockFor, false);
     }
 
-    public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+    @Override
+    public LogState getLocalState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
+    {
+        return log.storage().getLogState(start, end, includeSnapshot);
+    }
+
+    @Override
+    public LogState getLogState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
     {
-        return DistributedMetadataLogKeyspace.getLogState(lowEpoch, highEpoch);
+        return DistributedMetadataLogKeyspace.getLogState(start, end, 
includeSnapshot);
     }
 
     private static <T> T unwrap(Promise<T> promise)
diff --git a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java 
b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
index 3564ab93f7..7192551c68 100644
--- a/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
+++ b/src/java/org/apache/cassandra/tcm/PeerLogFetcher.java
@@ -108,7 +108,7 @@ public class PeerLogFetcher
                 }
                 else
                 {
-                    throw new IllegalStateException(String.format("Queried for 
epoch %s, but could not catch up", awaitAtleast));
+                    throw new IllegalStateException(String.format("Queried for 
epoch %s, but could not catch up. Current epoch: %s", awaitAtleast, 
fetched.epoch));
                 }
             });
 
diff --git a/src/java/org/apache/cassandra/tcm/Processor.java 
b/src/java/org/apache/cassandra/tcm/Processor.java
index 5558b253d6..98ab0232ea 100644
--- a/src/java/org/apache/cassandra/tcm/Processor.java
+++ b/src/java/org/apache/cassandra/tcm/Processor.java
@@ -23,13 +23,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import accord.utils.Invariants;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.metrics.TCMMetrics;
 import org.apache.cassandra.tcm.log.Entry;
 import org.apache.cassandra.tcm.log.LogState;
 
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-
 public interface Processor
 {
     /**
@@ -78,23 +77,36 @@ public interface Processor
 
     ClusterMetadata fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy);
 
-    LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy);
+    /**
+     * Queries this node's local log state only. The result is not guaranteed to be contiguous, but can be used for restoring CMS state.
+     */
+    LogState getLocalState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy);
+
+    /**
+     * Queries the global log state (e.g. the distributed log or the CMS members), as opposed to {@link #getLocalState}.
+     */
+    LogState getLogState(Epoch start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy);
 
-    default List<ClusterMetadata> reconstructFull(Epoch lowEpoch, Epoch highEpoch)
+    /**
+     * Reconstructs one ClusterMetadata per epoch between lowEpoch and highEpoch by fetching the global log state (including its base snapshot) and replaying its entries in order; returns an empty list if the log state is empty.
+     */
+    default List<ClusterMetadata> reconstruct(Epoch lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy)
     {
-        LogState logState = reconstruct(lowEpoch, highEpoch, 
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),
-                                                                               
               TCMMetrics.instance.commitRetries));
+        LogState logState = getLogState(lowEpoch, highEpoch, true, 
retryPolicy);
         if (logState.isEmpty()) return Collections.emptyList();
         List<ClusterMetadata> cms = new ArrayList<>(logState.entries.size());
-        ClusterMetadata accum = logState.baseState;
-        cms.add(accum);
+
+        ClusterMetadata acc = logState.baseState;
+        cms.add(acc);
         for (Entry entry : logState.entries)
         {
-            Transformation.Result res = entry.transform.execute(accum);
+            Invariants.checkState(entry.epoch.isDirectlyAfter(acc.epoch), "%s 
should have been directly after %s", entry.epoch, acc.epoch);
+            Transformation.Result res = entry.transform.execute(acc);
             assert res.isSuccess() : res.toString();
-            accum = res.success().metadata;
-            cms.add(accum);
+            acc = res.success().metadata;
+            cms.add(acc);
         }
         return cms;
     }
+
 }
diff --git a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java 
b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
index f6a60f070a..c8930853ad 100644
--- a/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
+++ b/src/java/org/apache/cassandra/tcm/ReconstructLogState.java
@@ -19,7 +19,11 @@
 package org.apache.cassandra.tcm;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -27,7 +31,6 @@ import org.apache.cassandra.metrics.TCMMetrics;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
 import org.apache.cassandra.tcm.log.LogState;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -37,11 +40,13 @@ public class ReconstructLogState
 
     public final Epoch lowerBound;
     public final Epoch higherBound;
+    public final boolean includeSnapshot;
 
-    public ReconstructLogState(Epoch lowerBound, Epoch higherBound)
+    public ReconstructLogState(Epoch lowerBound, Epoch higherBound, boolean 
includeSnapshot)
     {
         this.lowerBound = lowerBound;
         this.higherBound = higherBound;
+        this.includeSnapshot = includeSnapshot;
     }
 
     static class Serializer implements 
IVersionedSerializer<ReconstructLogState>
@@ -51,19 +56,21 @@ public class ReconstructLogState
         {
             Epoch.serializer.serialize(t.lowerBound, out);
             Epoch.serializer.serialize(t.higherBound, out);
+            out.writeBoolean(t.includeSnapshot);
         }
 
         public ReconstructLogState deserialize(DataInputPlus in, int version) 
throws IOException
         {
             Epoch lowerBound = Epoch.serializer.deserialize(in);
             Epoch higherBound = Epoch.serializer.deserialize(in);
-            return new ReconstructLogState(lowerBound, higherBound);
+            return new ReconstructLogState(lowerBound, higherBound, 
in.readBoolean());
         }
 
         public long serializedSize(ReconstructLogState t, int version)
         {
             return Epoch.serializer.serializedSize(t.lowerBound) +
-                   Epoch.serializer.serializedSize(t.higherBound);
+                   Epoch.serializer.serializedSize(t.higherBound) +
+                   TypeSizes.BOOL_SIZE;
         }
     }
 
@@ -71,6 +78,16 @@ public class ReconstructLogState
     {
         public static final Handler instance = new Handler();
 
+        private final Supplier<Processor> processor;
+
+        public Handler()
+        {
+            this(() -> ClusterMetadataService.instance().processor());
+        }
+        public Handler(Supplier<Processor> processor)
+        {
+            this.processor = processor;
+        }
         public void doVerb(Message<ReconstructLogState> message) throws 
IOException
         {
             TCMMetrics.instance.reconstructLogStateCall.mark();
@@ -79,7 +96,10 @@ public class ReconstructLogState
             if 
(!ClusterMetadataService.instance().isCurrentMember(FBUtilities.getBroadcastAddressAndPort()))
                 throw new NotCMSException("This node is not in the CMS, can't 
generate a consistent log fetch response to " + message.from());
 
-            LogState result = 
DistributedMetadataLogKeyspace.getLogState(request.lowerBound, 
request.higherBound);
+            LogState result = processor.get().getLogState(request.lowerBound, 
request.higherBound, request.includeSnapshot,
+                                                          
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
+                                                                               
            TCMMetrics.instance.fetchLogRetries));
+
             MessagingService.instance().send(message.responseWith(result), 
message.from());
         }
     }
diff --git a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java 
b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
index 635be54cf9..e9417adfec 100644
--- a/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/RemoteProcessor.java
@@ -152,7 +152,13 @@ public final class RemoteProcessor implements Processor
     }
 
     @Override
-    public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+    public LogState getLocalState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
+    {
+        return log.getLocalEntries(start, end, includeSnapshot);
+    }
+
+    @Override
+    public LogState getLogState(Epoch lowEpoch, Epoch highEpoch, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
     {
         try
         {
@@ -160,9 +166,9 @@ public final class RemoteProcessor implements Processor
             List<InetAddressAndPort> candidates = new 
ArrayList<>(log.metadata().fullCMSMembers());
             sendWithCallbackAsync(request,
                                   Verb.TCM_RECONSTRUCT_EPOCH_REQ,
-                                  new ReconstructLogState(lowEpoch, highEpoch),
+                                  new ReconstructLogState(lowEpoch, highEpoch, 
includeSnapshot),
                                   new CandidateIterator(candidates),
-                                  new 
Retry.Backoff(TCMMetrics.instance.fetchLogRetries));
+                                  retryPolicy);
             return request.get(retryPolicy.remainingNanos(), 
TimeUnit.NANOSECONDS);
         }
         catch (InterruptedException e)
@@ -187,8 +193,7 @@ public final class RemoteProcessor implements Processor
         }
     }
 
-    private static Future<ClusterMetadata> 
fetchLogAndWaitInternal(CandidateIterator candidates,
-                                                                   LocalLog 
log)
+    private static Future<ClusterMetadata> 
fetchLogAndWaitInternal(CandidateIterator candidates, LocalLog log)
     {
         try (Timer.Context ctx = TCMMetrics.instance.fetchCMSLogLatency.time())
         {
diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
index 47a2ba26f1..72cf2e7aa9 100644
--- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
@@ -155,7 +155,14 @@ public class StubClusterMetadataService extends 
ClusterMetadataService
             throw new UnsupportedOperationException();
         }
 
-        public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+        @Override
+        public LogState getLocalState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public LogState getLogState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
         {
             throw new UnsupportedOperationException();
         }
diff --git a/src/java/org/apache/cassandra/tcm/log/LocalLog.java 
b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
index 0bcb982f1a..c737814c94 100644
--- a/src/java/org/apache/cassandra/tcm/log/LocalLog.java
+++ b/src/java/org/apache/cassandra/tcm/log/LocalLog.java
@@ -351,9 +351,9 @@ public abstract class LocalLog implements Closeable
         return storage.getLogState(since, false);
     }
 
-    public LogState getLocalEntries(Epoch since, Epoch until)
+    public LogState getLocalEntries(Epoch since, Epoch until, boolean 
includeSnapshot)
     {
-        return storage.getLogState(since, until);
+        return storage.getLogState(since, until, includeSnapshot);
     }
 
     public ClusterMetadata waitForHighestConsecutive()
diff --git a/src/java/org/apache/cassandra/tcm/log/LogReader.java 
b/src/java/org/apache/cassandra/tcm/log/LogReader.java
index b1e7ab3264..effc4d7561 100644
--- a/src/java/org/apache/cassandra/tcm/log/LogReader.java
+++ b/src/java/org/apache/cassandra/tcm/log/LogReader.java
@@ -120,7 +120,7 @@ public interface LogReader
         }
     }
 
-    default LogState getLogState(Epoch start, Epoch end)
+    default LogState getLogState(Epoch start, Epoch end, boolean 
includeSnapshot)
     {
         try
         {
@@ -136,7 +136,7 @@ public interface LogReader
                 {
                     if (entry.epoch.isAfter(start))
                         entries.add(entry);
-                    else
+                    else if (includeSnapshot)
                         closestSnapshot = 
entry.transform.execute(closestSnapshot).success().metadata;
                 }
                 return new LogState(closestSnapshot, entries.build());
diff --git a/src/java/org/apache/cassandra/tcm/log/LogStorage.java 
b/src/java/org/apache/cassandra/tcm/log/LogStorage.java
index 7772d7d07e..e739a8aae7 100644
--- a/src/java/org/apache/cassandra/tcm/log/LogStorage.java
+++ b/src/java/org/apache/cassandra/tcm/log/LogStorage.java
@@ -57,7 +57,7 @@ public interface LogStorage extends LogReader
         }
 
         @Override
-        public LogState getLogState(Epoch start, Epoch end)
+        public LogState getLogState(Epoch start, Epoch end, boolean 
includeSnapshot)
         {
             return LogState.EMPTY;
         }
diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java 
b/src/java/org/apache/cassandra/tcm/migration/Election.java
index c2fe67439a..f80170b4e2 100644
--- a/src/java/org/apache/cassandra/tcm/migration/Election.java
+++ b/src/java/org/apache/cassandra/tcm/migration/Election.java
@@ -141,7 +141,8 @@ public class Election
         Register.maybeRegister();
 
         updateInitiator(currentCoordinator, MIGRATED);
-        MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, 
DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false));
+        MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ,
+                                      
DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false));
     }
 
     private void abort(Set<InetAddressAndPort> sendTo)
diff --git a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java 
b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
index 6c02318f48..36baa59eb3 100644
--- a/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
+++ b/src/java/org/apache/cassandra/tcm/migration/GossipProcessor.java
@@ -41,7 +41,14 @@ public class GossipProcessor implements Processor
         return ClusterMetadata.current();
     }
 
-    public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+    @Override
+    public LogState getLocalState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
+    {
+        throw new IllegalStateException("Can't reconstruct log state when 
running in gossip mode. Enable the ClusterMetadataService with `nodetool 
addtocms`.");
+    }
+
+    @Override
+    public LogState getLogState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
     {
         throw new IllegalStateException("Can't reconstruct log state when 
running in gossip mode. Enable the ClusterMetadataService with `nodetool 
addtocms`.");
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java 
b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
index 58b47cc9b7..1c441ded3e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTestBase.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.test;
 
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.Assert;
@@ -185,6 +186,8 @@ public abstract class CASTestBase extends TestBaseImpl
     public static void assertVisibleInRing(IInstance peer)
     {
         InetAddressAndPort endpoint = 
InetAddressAndPort.getByAddress(peer.broadcastAddress());
+        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(30);
+        while (System.nanoTime() < deadline && 
!Gossiper.instance.isAlive(endpoint));
         Assert.assertTrue(Gossiper.instance.isAlive(endpoint));
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
index 88ef1af750..ed066b2c3b 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
@@ -189,7 +189,6 @@ public class PaxosRepair2Test extends TestBaseImpl
         Ballot staleBallot = Paxos.newBallot(Ballot.none(), 
org.apache.cassandra.db.ConsistencyLevel.SERIAL);
         try (Cluster cluster = init(Cluster.create(3, cfg -> cfg
                                                              
.set("paxos_variant", "v2")
-                                                             
.set("accord.enabled", false) // this test monkeys with TCM which can cause 
confussion for Accord while it fetches epochs...
                                                              
.set("paxos_purge_grace_period", "0s")
                                                              
.set("truncate_request_timeout_in_ms", 1000L)))
         )
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
index 59fdc0e6e4..4e8d4bd905 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
@@ -734,6 +734,7 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
                                        log,
                                        new Processor()
                                        {
+                                           @Override
                                            public Commit.Result 
commit(Entry.Id entryId, Transformation event, Epoch lastKnown, Retry.Deadline 
retryPolicy)
                                            {
                                                if (lastKnown == null)
@@ -747,6 +748,7 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
                                                return result;
                                            }
 
+                                           @Override
                                            public ClusterMetadata 
fetchLogAndWait(Epoch waitFor, Retry.Deadline retryPolicy)
                                            {
                                                Epoch since = 
log.waitForHighestConsecutive().epoch;
@@ -755,9 +757,16 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
                                                return 
log.waitForHighestConsecutive();
                                            }
 
-                                           public LogState reconstruct(Epoch 
lowEpoch, Epoch highEpoch, Retry.Deadline retryPolicy)
+                                           @Override
+                                           public LogState getLocalState(Epoch 
start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy)
                                            {
-                                               return 
log.getLocalEntries(lowEpoch, highEpoch);
+                                               return getLogState(start, end, 
includeSnapshot, retryPolicy);
+                                           }
+
+                                           @Override
+                                           public LogState getLogState(Epoch 
start, Epoch end, boolean includeSnapshot, Retry.Deadline retryPolicy)
+                                           {
+                                               return 
log.getLocalEntries(start, end, includeSnapshot);
                                            }
                                        },
                                        (a,b) -> {},
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
index e89502e90c..cf795048c3 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconstructEpochTest.java
@@ -54,11 +54,11 @@ public class ReconstructEpochTest extends TestBaseImpl
                 for (int[] cfg : new int[][]{ new int[]{ 6, 9 },
                                               new int[]{ 2, 20 },
                                               new int[]{ 5, 5 },
-                                              new int[]{ 15, 20 }})
+                                              new int[]{ 15, 20 } })
                 {
                     int start = cfg[0];
                     int end = cfg[1];
-                    LogState logState = 
DistributedMetadataLogKeyspace.getLogState(Epoch.create(start), 
Epoch.create(end));
+                    LogState logState = 
DistributedMetadataLogKeyspace.getLogState(Epoch.create(start), 
Epoch.create(end), true);
                     Assert.assertEquals(start, 
logState.baseState.epoch.getEpoch());
                     Iterator<Entry> iter = logState.entries.iterator();
                     for (int i = start + 1; i <= end; i++)
@@ -71,14 +71,15 @@ public class ReconstructEpochTest extends TestBaseImpl
                 for (int[] cfg : new int[][]{ new int[]{ 6, 9 },
                                               new int[]{ 2, 20 },
                                               new int[]{ 5, 5 },
-                                              new int[]{ 15, 20 }})
+                                              new int[]{ 15, 20 } })
                 {
                     int start = cfg[0];
                     int end = cfg[1];
                     LogState logState = ClusterMetadataService.instance()
                                                               .processor()
-                                                              
.reconstruct(Epoch.create(start),
+                                                              
.getLogState(Epoch.create(start),
                                                                            
Epoch.create(end),
+                                                                           
true,
                                                                            
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(TimeUnit.NANOSECONDS),
                                                                                
                             TCMMetrics.instance.commitRetries));
 
@@ -90,5 +91,4 @@ public class ReconstructEpochTest extends TestBaseImpl
             });
         }
     }
-
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java
index 6f359af057..6ee5e975ea 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/TestProcessor.java
@@ -70,9 +70,16 @@ public class TestProcessor implements Processor
         return delegate.fetchLogAndWait(waitFor, retryPolicy);
     }
 
-    public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+    @Override
+    public LogState getLocalState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
+    {
+        return delegate.getLocalState(start, end, includeSnapshot, 
retryPolicy);
+    }
+
+    @Override
+    public LogState getLogState(Epoch start, Epoch end, boolean 
includeSnapshot, Retry.Deadline retryPolicy)
     {
-        return delegate.reconstruct(lowEpoch, highEpoch, retryPolicy);
+        return delegate.getLogState(start, end, includeSnapshot, retryPolicy);
     }
 
     protected void waitIfPaused()
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
 
b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
index 5d974a4f17..834d2a9b81 100644
--- 
a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
@@ -530,8 +530,11 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
         public void close() throws Exception
         {
             epochHistory = cluster.get(cmsGroup[0]).callOnInstance(() -> {
-                LogState all = 
ClusterMetadataService.instance().processor().reconstruct(Epoch.EMPTY, 
Epoch.create(Long.MAX_VALUE), 
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),
-                                                                               
                                                                                
      TCMMetrics.instance.commitRetries));
+                LogState all = ClusterMetadataService.instance()
+                                                     .processor()
+                                                     .getLogState(Epoch.EMPTY, 
Epoch.create(Long.MAX_VALUE), false,
+                                                                  
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),
+                                                                               
                    TCMMetrics.instance.commitRetries));
                 StringBuilder sb = new StringBuilder("Epochs:");
                 for (Entry e : all.entries)
                     sb.append("\n").append(e.epoch.getEpoch()).append(": 
").append(e.transform);
diff --git 
a/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java 
b/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java
index 128fdeca7b..0d7bbf7f8e 100644
--- a/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java
+++ b/test/unit/org/apache/cassandra/tcm/ValidatingClusterMetadataService.java
@@ -132,7 +132,13 @@ public class ValidatingClusterMetadataService extends 
StubClusterMetadataService
             }
 
             @Override
-            public LogState reconstruct(Epoch lowEpoch, Epoch highEpoch, 
Retry.Deadline retryPolicy)
+            public LogState getLocalState(Epoch lowEpoch, Epoch highEpoch, 
boolean includeSnapshot, Retry.Deadline retryPolicy)
+            {
+                return getLogState(lowEpoch, highEpoch, includeSnapshot, 
retryPolicy);
+            }
+
+            @Override
+            public LogState getLogState(Epoch lowEpoch, Epoch highEpoch, 
boolean includeSnapshot, Retry.Deadline retryPolicy)
             {
                 if (!epochs.containsKey(lowEpoch))
                     throw new AssertionError("Unknown epoch: " + lowEpoch);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to