This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by 
this push:
     new c6814f3c69 CEP-45: Background reconciliation - replica
c6814f3c69 is described below

commit c6814f3c69278ca7e2a7f5f44056f9759bce6d63
Author: Francisco Guerrero <[email protected]>
AuthorDate: Sun Apr 19 13:06:45 2026 -0700

    CEP-45: Background reconciliation - replica
    
    patch by Francisco Guerrero; reviewed by Blake Eggleston for CASSANDRA-20378
---
 .../cassandra/config/DatabaseDescriptor.java       |   5 +
 .../cassandra/config/MutationTrackingSpec.java     |   8 +
 .../cassandra/replication/ActiveLogReconciler.java |   1 +
 .../cassandra/replication/CoordinatorLog.java      |  33 ++++
 .../cassandra/replication/MutationSummary.java     |   4 +-
 .../replication/MutationTrackingService.java       | 176 ++++++++++++++++++++-
 .../replication/MutationTrackingServiceMBean.java  |  49 ++++++
 .../org/apache/cassandra/replication/Shard.java    |  16 ++
 src/java/org/apache/cassandra/tools/NodeProbe.java |  32 ++++
 .../apache/cassandra/tools/nodetool/MTAdmin.java   | 112 +++++++++++++
 .../cassandra/tools/nodetool/NodetoolCommand.java  |   1 +
 .../distributed/test/ReadRepairQueryTester.java    |   6 +-
 .../distributed/test/ReadRepairTestBase.java       |  26 +--
 .../cassandra/distributed/test/TestBaseImpl.java   |  31 ++++
 .../test/cql3/MultiNodeTableWalkBase.java          |   3 +-
 .../distributed/test/cql3/StatefulASTBase.java     |   3 +-
 .../test/sai/PartialUpdateHandlingTest.java        |   2 +-
 .../test/tracking/MutationTrackingTest.java        | 165 +++++++++++++++++++
 .../test/tracking/OffsetBroadcastTest.java         |   6 +-
 .../test/tracking/TrackedImportFailureTest.java    |   2 +-
 .../TrackedNonZeroCopyRepairTransferTest.java      |   2 +-
 .../test/tracking/TrackedRepairFailureTest.java    |   6 +-
 .../test/tracking/TrackedTransferTestBase.java     |   6 +-
 .../TrackedZeroCopyRepairTransferTest.java         |   2 +-
 .../cassandra/tools/nodetool/MTAdminTest.java      | 117 ++++++++++++++
 25 files changed, 782 insertions(+), 32 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 344c2b509e..86ca89b02c 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -6059,6 +6059,11 @@ public class DatabaseDescriptor
         }
     }
 
+    public static MutationTrackingSpec getMutationTrackingConfig()
+    {
+        return conf.mutation_tracking;
+    }
+
     public static boolean getMutationTrackingEnabled()
     {
         return conf.mutation_tracking.enabled;
diff --git a/src/java/org/apache/cassandra/config/MutationTrackingSpec.java 
b/src/java/org/apache/cassandra/config/MutationTrackingSpec.java
index d9b04a0f40..30f8acaef5 100644
--- a/src/java/org/apache/cassandra/config/MutationTrackingSpec.java
+++ b/src/java/org/apache/cassandra/config/MutationTrackingSpec.java
@@ -22,4 +22,12 @@ public class MutationTrackingSpec
 {
     public boolean enabled = false;
     public String journal_directory;
+    /**
+     * Whether the background reconciliation process is enabled
+     */
+    public volatile boolean background_reconciliation_enabled = true;
+    /**
+     * The interval at which the background reconciliation process runs
+     */
+    public volatile DurationSpec.LongMillisecondsBound 
background_reconciliation_interval = new 
DurationSpec.LongMillisecondsBound("1s");
 }
diff --git a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java 
b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java
index 38fd276c00..37c5963138 100644
--- a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java
+++ b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java
@@ -173,6 +173,7 @@ public final class ActiveLogReconciler implements 
Shutdownable
             MutationTrackingService.instance().retryFailedWrite(mutationId, 
toHost, failureReason);
         }
 
+        @Override
         void send()
         {
             RecordPointer pointer = 
MutationJournal.instance().lookUp(mutationId);
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java 
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index a94dbd4af6..317a3e6eea 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -271,6 +271,39 @@ public abstract class CoordinatorLog
         }
     }
 
+    /**
+     * @return the computed union of remote-witnessed offsets minus 
local-witnessed offsets, or {@code null} when no offsets are missing locally
+     */
+    @Nullable
+    Offsets.Immutable collectLocallyMissingOffsets()
+    {
+        lock.readLock().lock();
+        try
+        {
+            Offsets.Mutable local = witnessedOffsets.get(localNodeId);
+            Offsets.Immutable.Builder missing = null;
+            for (int i = 0; i < participants.size(); i++)
+            {
+                int nodeId = participants.get(i);
+                if (nodeId == localNodeId) continue;
+                Offsets.Immutable diff = 
Offsets.Immutable.difference(witnessedOffsets.get(nodeId), local);
+                if (!diff.isEmpty())
+                {
+                    if (missing == null)
+                    {
+                        missing = new Offsets.Immutable.Builder(logId);
+                    }
+                    missing.addAll(diff);
+                }
+            }
+            return missing != null ? missing.build() : null;
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     Offsets.Immutable collectReconciledOffsets()
     {
         lock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/MutationSummary.java 
b/src/java/org/apache/cassandra/replication/MutationSummary.java
index c48f533313..f1e0cc7c54 100644
--- a/src/java/org/apache/cassandra/replication/MutationSummary.java
+++ b/src/java/org/apache/cassandra/replication/MutationSummary.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.agrona.collections.Long2ObjectHashMap;
@@ -274,7 +275,8 @@ public class MutationSummary
         return summaries.size();
     }
 
-    boolean isEmpty()
+    @VisibleForTesting
+    public boolean isEmpty()
     {
         return size() == 0;
     }
diff --git 
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java 
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 9854c31a9f..bee2dd2229 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -49,6 +49,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.config.MutationTrackingSpec;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
@@ -87,6 +89,7 @@ import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.ownership.ReplicaGroups;
 import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.lang.String.format;
@@ -96,19 +99,25 @@ import static 
org.apache.cassandra.cql3.QueryProcessor.executeInternal;
 
 // TODO (expected): persistence (handle restarts)
 // TODO (expected): handle topology changes
-public class MutationTrackingService
+public class MutationTrackingService implements MutationTrackingServiceMBean
 {
+    public static final String MBEAN_NAME = 
"org.apache.cassandra.db:type=MutationTrackingService";
     public static final String DISABLED_MESSAGE = "Mutation tracking is not 
enabled. (See mutation_tracking.enabled in cassandra.yaml)";
 
     private static final MutationTrackingService instance;
     private static final ScheduledExecutorPlus executor;
 
+    private static final MutationTrackingSpec config;
+
     static
     {
-        if (DatabaseDescriptor.getMutationTrackingEnabled())
+        config = DatabaseDescriptor.getMutationTrackingConfig();
+
+        if (config.enabled)
         {
             instance = new MutationTrackingService();
             executor = 
executorFactory().scheduled("Mutation-Tracking-Service", NORMAL);
+            MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME);
         }
         else
         {
@@ -129,12 +138,12 @@ public class MutationTrackingService
 
     public static boolean isEnabled()
     {
-        return DatabaseDescriptor.getMutationTrackingEnabled();
+        return config.enabled;
     }
 
     public static void ensureEnabled()
     {
-        if (!DatabaseDescriptor.getMutationTrackingEnabled())
+        if (!config.enabled)
             throw new IllegalStateException(DISABLED_MESSAGE);
     }
 
@@ -188,6 +197,7 @@ public class MutationTrackingService
     private final ReplicatedOffsetsBroadcaster offsetsBroadcaster = new 
ReplicatedOffsetsBroadcaster();
     private final LogStatePersister offsetsPersister = new LogStatePersister();
     private final ActiveLogReconciler activeReconciler = new 
ActiveLogReconciler();
+    private final BackgroundReconciler backgroundReconciler = new 
BackgroundReconciler();
 
     private final IncomingMutations incomingMutations = new 
IncomingMutations();
     private final OutgoingMutations outgoingMutations = new 
OutgoingMutations();
@@ -223,14 +233,53 @@ public class MutationTrackingService
 
         onNewClusterMetadata(null, metadata);
 
+        if (!keyspaceShards.isEmpty() && 
!config.background_reconciliation_enabled)
+            
logBackgroundReconciliationDisabledWarning(keyspaceShards.keySet());
+
         offsetsBroadcaster.start();
         offsetsPersister.start();
+        backgroundReconciler.start();
 
         ExpiredStatePurger.instance.register(incomingMutations);
 
         started = true;
     }
 
+    @Override
+    public void setMutationTrackingBackgroundReconciliationEnabled(boolean 
enabled)
+    {
+        if (enabled != config.background_reconciliation_enabled)
+        {
+            logger.info("{} mutation tracking background reconciliation", 
enabled ? "Enabling" : "Disabling");
+            config.background_reconciliation_enabled = enabled;
+        }
+    }
+
+    @Override
+    public boolean getMutationTrackingBackgroundReconciliationEnabled()
+    {
+        return config.background_reconciliation_enabled;
+    }
+
+    @Override
+    public void 
setMutationTrackingBackgroundReconciliationIntervalMilliseconds(long 
intervalMilliseconds)
+    {
+        if (intervalMilliseconds  != 
config.background_reconciliation_interval.toMilliseconds())
+        {
+            DurationSpec.LongMillisecondsBound 
backgroundReconciliationInterval =
+            new DurationSpec.LongMillisecondsBound(intervalMilliseconds, 
TimeUnit.MILLISECONDS);
+            logger.info("Setting mutation tracking background reconciliation 
interval from {} to {}",
+                        config.background_reconciliation_interval, 
backgroundReconciliationInterval);
+            config.background_reconciliation_interval = 
backgroundReconciliationInterval;
+        }
+    }
+
+    @Override
+    public long 
getMutationTrackingBackgroundReconciliationIntervalMilliseconds()
+    {
+        return config.background_reconciliation_interval.toMilliseconds();
+    }
+
     public void pauseOffsetBroadcast(boolean pause)
     {
         offsetsBroadcaster.pauseOffsetBroadcast(pause);
@@ -862,6 +911,14 @@ public class MutationTrackingService
             // recalculating the shards will repopulate this via the existing 
callbacks
             log2ShardMap = new ConcurrentHashMap<>();
             keyspaceShards = applyUpdatedMetadata(keyspaceShards, prev, next, 
this::nextLogId, this::onNewLog);
+
+            if (!config.background_reconciliation_enabled)
+            {
+                Set<String> newKeyspaces = new 
HashSet<>(keyspaceShards.keySet());
+                newKeyspaces.removeAll(originalKeyspaceShards.keySet());
+                if (!newKeyspaces.isEmpty())
+                    logBackgroundReconciliationDisabledWarning(newKeyspaces);
+            }
         }
         catch (Throwable t)
         {
@@ -1047,6 +1104,12 @@ public class MutationTrackingService
         return unwrapped;
     }
 
+    private void logBackgroundReconciliationDisabledWarning(Set<String> 
keyspaces)
+    {
+        logger.warn("Background reconciliation is disabled but mutation 
tracking keyspaces exist: {}. " +
+                    "Unreconciled mutations will not be automatically repaired 
in the background.", keyspaces);
+    }
+
     public static class KeyspaceShards
     {
         private enum UpdateDecision
@@ -1374,6 +1437,93 @@ public class MutationTrackingService
         return rows.one().getInt("host_log_id");
     }
 
+    private static class BackgroundReconciler
+    {
+        void start()
+        {
+            scheduleNext();
+        }
+
+        private void scheduleNext()
+        {
+            long intervalMillis = 
config.background_reconciliation_interval.toMilliseconds();
+            executor.schedule(this::runAndReschedule, intervalMillis, 
TimeUnit.MILLISECONDS);
+        }
+
+        private void runAndReschedule()
+        {
+            try
+            {
+                run();
+            }
+            finally
+            {
+                scheduleNext();
+            }
+        }
+
+        void run()
+        {
+            MutationTrackingService.instance().forEachKeyspace(this::run);
+        }
+
+        private void run(KeyspaceShards shards)
+        {
+            if (config.background_reconciliation_enabled)
+                shards.forEachShard(this::run);
+        }
+
+        private void run(Shard shard)
+        {
+            try
+            {
+                List<Offsets.Immutable> missing = 
shard.collectLocallyMissingOffsets();
+                if (missing.isEmpty()) return;
+
+                for (Offsets.Immutable offsets : missing)
+                {
+                    // Prefer pulling from the coordinator
+                    int coordinatorHostId = offsets.logId().hostId();
+                    InetAddressAndPort coordinator = 
ClusterMetadata.current().directory.endpoint(new NodeId(coordinatorHostId));
+                    InetAddressAndPort pullFrom = 
FailureDetector.instance.isAlive(coordinator)
+                                                  ? coordinator
+                                                  : findAliveReplica(shard, 
coordinatorHostId);
+                    if (pullFrom == null)
+                    {
+                        logger.debug("No coordinator or replica is available 
to process the pull mutation request for missing offset {}",
+                                     offsets);
+                        continue; // No reachable source
+                    }
+
+                    // TODO (expected): backoff, rate limits, per host and 
total
+                    PullMutationsRequest request = new 
PullMutationsRequest(offsets);
+                    logger.trace("Requesting pull mutation request from 
replica {} for missing offset {}", pullFrom, offsets);
+                    
MessagingService.instance().send(Message.out(Verb.PULL_MUTATIONS_REQ, request), 
pullFrom);
+                }
+            }
+            catch (Throwable throwable)
+            {
+                // Avoid throwing an exception in the reconciliation step to 
prevent the scheduled task from
+                // being killed
+                logger.error("Exception encountered during background 
reconciliation of shard={}", shard, throwable);
+            }
+        }
+
+        private InetAddressAndPort findAliveReplica(Shard shard, int 
excludeHostId)
+        {
+            for (InetAddressAndPort replica : shard.remoteReplicas())
+            {
+                int replicaId = 
ClusterMetadata.current().directory.peerId(replica).id();
+                if (replicaId != excludeHostId && 
FailureDetector.instance.isAlive(replica))
+                {
+                    logger.trace("Found alive replica {} with replica id {}", 
replica, replicaId);
+                    return replica;
+                }
+            }
+            return null;
+        }
+    }
+
     // TODO (later): a more intelligent heuristic for offsets included in 
broadcasts
     private static class ReplicatedOffsetsBroadcaster
     {
@@ -1509,6 +1659,24 @@ public class MutationTrackingService
         activeReconciler.resumeRegularPriorityForTesting();
     }
 
+    @VisibleForTesting
+    public void reconcileForTesting()
+    {
+        backgroundReconciler.run();
+    }
+
+    @VisibleForTesting
+    public void pauseBackgroundReconciler()
+    {
+        config.background_reconciliation_enabled = false;
+    }
+
+    @VisibleForTesting
+    public void resumeBackgroundReconciler()
+    {
+        config.background_reconciliation_enabled = true;
+    }
+
     @VisibleForTesting
     public static class TestAccess
     {
diff --git 
a/src/java/org/apache/cassandra/replication/MutationTrackingServiceMBean.java 
b/src/java/org/apache/cassandra/replication/MutationTrackingServiceMBean.java
new file mode 100644
index 0000000000..a64f09da2b
--- /dev/null
+++ 
b/src/java/org/apache/cassandra/replication/MutationTrackingServiceMBean.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+/**
+ * MBean exposing functionality for the Mutation Tracking service
+ */
+public interface MutationTrackingServiceMBean
+{
+    /**
+     * Sets the background reconciliation state to enabled/disabled based on 
the {@code enabled} parameter
+     *
+     * @param enabled whether the background reconciliation is enabled or 
disabled for the mutation tracking service
+     */
+    void setMutationTrackingBackgroundReconciliationEnabled(boolean enabled);
+
+    /**
+     * @return the state of the background reconciliation for the mutation 
tracking service
+     */
+    boolean getMutationTrackingBackgroundReconciliationEnabled();
+
+    /**
+     * Sets the background reconciliation interval to the provided {@code 
intervalMilliseconds} value
+     *
+     * @param intervalMilliseconds the interval value in milliseconds
+     */
+    void setMutationTrackingBackgroundReconciliationIntervalMilliseconds(long 
intervalMilliseconds);
+
+    /**
+     * @return the interval, in milliseconds, in which the background 
reconciliation runs when enabled
+     */
+    long getMutationTrackingBackgroundReconciliationIntervalMilliseconds();
+}
diff --git a/src/java/org/apache/cassandra/replication/Shard.java 
b/src/java/org/apache/cassandra/replication/Shard.java
index 265cc8b8f3..dd4e349023 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -299,6 +299,22 @@ public class Shard
         return new BroadcastLogOffsets(keyspace, range, offsets, durable);
     }
 
+    /**
+     * @return the list of the collected locally missing offsets for the logs 
owned by this coordinator on
+     * this shard
+     */
+    List<Offsets.Immutable> collectLocallyMissingOffsets()
+    {
+        List<Offsets.Immutable> result = new ArrayList<>(logs.size());
+        for (CoordinatorLog log : logs.values())
+        {
+            Offsets.Immutable missing = log.collectLocallyMissingOffsets();
+            if (missing != null)
+                result.add(missing);
+        }
+        return result;
+    }
+
     void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
     {
         logs.values().forEach(log -> 
log.collectDurablyReconciledOffsets(into));
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 567df09283..c4d0d17400 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -119,6 +119,8 @@ import org.apache.cassandra.metrics.ThreadPoolMetrics;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.net.MessagingServiceMBean;
 import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.MutationTrackingServiceMBean;
 import org.apache.cassandra.service.ActiveRepairServiceMBean;
 import org.apache.cassandra.service.AsyncProfilerService;
 import org.apache.cassandra.service.AutoRepairService;
@@ -190,6 +192,7 @@ public class NodeProbe implements AutoCloseable
     protected PermissionsCacheMBean pcProxy;
     protected RolesCacheMBean rcProxy;
     protected AutoRepairServiceMBean autoRepairProxy;
+    protected MutationTrackingServiceMBean mutationTrackingProxy;
     protected AsyncProfilerMBean asyncProfilerProxy;
     protected GuardrailsMBean grProxy;
     protected volatile Output output;
@@ -339,6 +342,10 @@ public class NodeProbe implements AutoCloseable
             name = new ObjectName(AutoRepairService.MBEAN_NAME);
             autoRepairProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
AutoRepairServiceMBean.class);
 
+            name = new ObjectName(MutationTrackingService.MBEAN_NAME);
+            if (mbeanServerConn.isRegistered(name))
+                mutationTrackingProxy = JMX.newMBeanProxy(mbeanServerConn, 
name, MutationTrackingServiceMBean.class);
+
             name = new ObjectName(AsyncProfilerService.MBEAN_NAME);
             asyncProfilerProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
AsyncProfilerMBean.class);
 
@@ -2847,6 +2854,31 @@ public class NodeProbe implements AutoCloseable
             throw new IOException("Invalid keyspace or table name", e);
         }
     }
+
+    public boolean isMutationTrackingDisabled()
+    {
+        return mutationTrackingProxy == null;
+    }
+
+    public boolean getMutationTrackingBackgroundReconciliationEnabled()
+    {
+        return 
mutationTrackingProxy.getMutationTrackingBackgroundReconciliationEnabled();
+    }
+
+    public void setMutationTrackingBackgroundReconciliationEnabled(boolean 
enabled)
+    {
+        
mutationTrackingProxy.setMutationTrackingBackgroundReconciliationEnabled(enabled);
+    }
+
+    public long 
getMutationTrackingBackgroundReconciliationIntervalMilliseconds()
+    {
+        return 
mutationTrackingProxy.getMutationTrackingBackgroundReconciliationIntervalMilliseconds();
+    }
+
+    public void 
setMutationTrackingBackgroundReconciliationIntervalMilliseconds(long 
intervalMilliseconds)
+    {
+        
mutationTrackingProxy.setMutationTrackingBackgroundReconciliationIntervalMilliseconds(intervalMilliseconds);
+    }
 }
 
 class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String, 
ColumnFamilyStoreMBean>>
diff --git a/src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java 
b/src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java
new file mode 100644
index 0000000000..5e8e3ea50a
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.tools.NodeProbe;
+
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Parameters;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Command(name = "mtadmin",
+         description = "Manage mutation tracking",
+         subcommands = { MTAdmin.GetConfig.class,
+                         MTAdmin.SetConfig.class })
+public class MTAdmin extends AbstractCommand
+{
+    @Override
+    protected void execute(NodeProbe probe)
+    {
+        AbstractCommand cmd = new GetConfig();
+        cmd.probe(probe);
+        cmd.logger(output);
+        cmd.run();
+    }
+
+    @Command(name = "getconfig", description = "Print mutation tracking 
configurations")
+    public static class GetConfig extends AbstractCommand
+    {
+        @VisibleForTesting
+        protected PrintStream out = System.out;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            if (probe.isMutationTrackingDisabled())
+            {
+                out.println("Mutation tracking is not enabled");
+                return;
+            }
+
+            out.println("background_reconciliation_enabled: " + 
probe.getMutationTrackingBackgroundReconciliationEnabled());
+            out.println("background_reconciliation_interval_ms: " + 
probe.getMutationTrackingBackgroundReconciliationIntervalMilliseconds());
+        }
+    }
+
+    @Command(name = "setconfig", description = "Sets the mutation tracking 
configuration")
+    public static class SetConfig extends AbstractCommand
+    {
+        @VisibleForTesting
+        protected List<String> args = new ArrayList<>();
+
+        @Parameters(index = "0", arity = "0..1", description = { "Mutation 
tracking param type.",
+                                                              "Possible 
parameters: " +
+                                                              
"[background_reconciliation_enabled|background_reconciliation_interval_ms]" })
+        public String paramType;
+
+        @Parameters(index = "1", description = "Mutation tracking param 
value", arity = "0..1")
+        public String paramValue;
+
+        @VisibleForTesting
+        protected PrintStream out = System.out;
+
+        @Override
+        public void execute(NodeProbe probe)
+        {
+            args = args.isEmpty() ? CommandUtils.concatArgs(paramType, 
paramValue) : args;
+            checkArgument(args.size() == 2, "mtadmin setconfig requires 
param-type and value args.");
+            String type = args.get(0);
+            String value = args.get(1);
+
+            if (probe.isMutationTrackingDisabled())
+            {
+                out.println("Mutation tracking is not enabled");
+                return;
+            }
+
+            switch (type)
+            {
+                case "background_reconciliation_enabled":
+                    
probe.setMutationTrackingBackgroundReconciliationEnabled(Boolean.parseBoolean(value));
+                    break;
+                case "background_reconciliation_interval_ms":
+                    
probe.setMutationTrackingBackgroundReconciliationIntervalMilliseconds(Long.parseLong(value));
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown parameter: " + 
type);
+            }
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java 
b/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
index af1e835d91..2915e50fbc 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
@@ -147,6 +147,7 @@ import static 
org.apache.cassandra.tools.nodetool.Help.printTopCommandUsage;
                          ListSnapshots.class,
                          Move.Abort.class,
                          Move.class,
+                         MTAdmin.class,
                          NetStats.class,
                          PauseHandoff.class,
                          ProfileLoad.class,
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
index cbf711f203..d4351fecf8 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
@@ -120,9 +120,11 @@ public abstract class ReadRepairQueryTester extends 
TestBaseImpl
     @BeforeClass
     public static void setupCluster() throws IOException
     {
-        cluster = init(Cluster.build(NUM_NODES)
+        // Disable background reconciler for read repair tests to avoid having 
the
+        // background reconciler repair before the read repair takes effect
+        cluster = init(disableBackgroundReconciler(Cluster.build(NUM_NODES)
                               .withConfig(config -> 
config.set("read_request_timeout", "1m")
-                                                          
.set("write_request_timeout", "1m"))
+                                                          
.set("write_request_timeout", "1m")))
                               .start());
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTestBase.java
index a72971941e..044eef39c9 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTestBase.java
@@ -139,7 +139,8 @@ public abstract class ReadRepairTestBase extends 
TestBaseImpl
 
     private void testReadRepair(ReadRepairStrategy strategy, boolean 
brrThroughAccord) throws Throwable
     {
-        try (Cluster cluster = init(Cluster.create(3, c -> 
c.with(Feature.GOSSIP, Feature.NETWORK))))
+        try (Cluster cluster = init(Cluster.create(3, c -> 
c.with(Feature.GOSSIP, Feature.NETWORK)
+                                                            
.set("mutation_tracking.background_reconciliation_enabled", "false"))))
         {
             createKeyspace(cluster);
             TransactionalMode transactionalMode = brrThroughAccord ? 
TransactionalMode.test_unsafe_writes : TransactionalMode.off;
@@ -175,7 +176,9 @@ public abstract class ReadRepairTestBase extends 
TestBaseImpl
     @Test
     public void readRepairTimeoutTest() throws Throwable
     {
-        try (Cluster cluster = init(Cluster.create(3, c -> 
c.with(Feature.GOSSIP, Feature.NETWORK)))) {
+        try (Cluster cluster = init(Cluster.create(3, c -> 
c.with(Feature.GOSSIP, Feature.NETWORK)
+                                                            
.set("mutation_tracking.background_reconciliation_enabled", "false"))))
+        {
             final long reducedReadTimeout = 3000L;
             createKeyspace(cluster);
             cluster.forEach(i -> i.runOnInstance(() -> 
DatabaseDescriptor.setReadRpcTimeout(reducedReadTimeout)));
@@ -209,7 +212,7 @@ public abstract class ReadRepairTestBase extends 
TestBaseImpl
     @Test
     public void failingReadRepairTest() throws Throwable
     {
-        try (Cluster cluster = init(builder().withNodes(3).start()))
+        try (Cluster cluster = 
init(disableBackgroundReconciler(builder().withNodes(3)).start()))
         {
             createKeyspace(cluster);
             cluster.schemaChange(withTable("CREATE TABLE %s (pk int, ck int, v 
int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"));
@@ -237,7 +240,7 @@ public abstract class ReadRepairTestBase extends 
TestBaseImpl
         MutationTrackingUtils.fixmeSkipIfTracked(replicationType(), "Token 
moves not supported");
         // TODO: rewrite using FuzzTestBase to control progress through 
decommission
         // TODO: fails with vnode enabled
-        try (Cluster cluster = init(Cluster.build(4).withoutVNodes().start(), 
3))
+        try (Cluster cluster = 
init(disableBackgroundReconciler(Cluster.build(4).withoutVNodes()).start(), 3))
         {
             List<Token> tokens = cluster.tokens();
 
@@ -296,7 +299,7 @@ public abstract class ReadRepairTestBase extends 
TestBaseImpl
     public void alterRFAndRunReadRepair() throws Throwable
     {
         MutationTrackingUtils.fixmeSkipIfTracked(replicationType(), "RF 
changes not supported");
-        try (Cluster cluster = builder().withNodes(2).start())
+        try (Cluster cluster = 
disableBackgroundReconciler(builder().withNodes(2)).start())
         {
             cluster.schemaChange(format("CREATE KEYSPACE %s WITH replication = 
" +
                                         "{'class': 'SimpleStrategy', 
'replication_factor': 1} " +
@@ -358,7 +361,7 @@ public abstract class ReadRepairTestBase extends 
TestBaseImpl
      */
     private void testRangeSliceQueryWithTombstones(boolean flush) throws 
Throwable
     {
-        try (Cluster cluster = init(Cluster.create(2)))
+        try (Cluster cluster = init(Cluster.create(2, c -> 
c.set("mutation_tracking.background_reconciliation_enabled", "false"))))
         {
             createKeyspace(cluster);
             cluster.schemaChange(withTable("CREATE TABLE %s (k int, c int, v 
int, PRIMARY KEY(k, c))"));
@@ -425,14 +428,15 @@ public abstract class ReadRepairTestBase extends 
TestBaseImpl
         MutationTrackingUtils.fixmeSkipIfTracked(replicationType(), "Token 
moves not supported");
         ExecutorPlus es = 
ExecutorFactory.Global.executorFactory().sequential("query-executor");
         String key = "test1";
-        try (Cluster cluster = init(Cluster.build()
+        try (Cluster cluster = init(disableBackgroundReconciler(Cluster.build()
                                            .withConfig(config -> 
config.with(Feature.GOSSIP, Feature.NETWORK)
                                                                        
.set("read_request_timeout", format("%dms", Integer.MAX_VALUE))
                                                                        
.set("native_transport_timeout", format("%dms", Integer.MAX_VALUE))
+                                                                       
.set("mutation_tracking.background_reconciliation_enabled", "false")
                                            )
                                            
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
                                            
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
-                                           .withNodes(3)
+                                           .withNodes(3))
                                            .start()))
         {
             createKeyspace(cluster);
@@ -525,7 +529,7 @@ public abstract class ReadRepairTestBase extends 
TestBaseImpl
     @Test
     public void testGCableTombstoneResurrectionOnRangeSliceQuery() throws 
Throwable
     {
-        try (Cluster cluster = init(Cluster.create(2)))
+        try (Cluster cluster = init(Cluster.create(2, c -> 
c.set("mutation_tracking.background_reconciliation_enabled", "false"))))
         {
             createKeyspace(cluster);
             cluster.schemaChange(withTable("CREATE TABLE %s (k int, c int, 
PRIMARY KEY(k, c)) " +
@@ -563,9 +567,9 @@ public abstract class ReadRepairTestBase extends 
TestBaseImpl
     @Test
     public void partitionDeletionRTTimestampTieTest() throws Throwable
     {
-        try (Cluster cluster = init(builder()
+        try (Cluster cluster = init(disableBackgroundReconciler(builder()
                                     .withNodes(3)
-                                    .withInstanceInitializer(RRHelper::install)
+                                    
.withInstanceInitializer(RRHelper::install))
                                     .start()))
         {
             createKeyspace(cluster);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java 
b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 7e91f5a7fb..466e639745 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -83,6 +83,7 @@ import 
org.apache.cassandra.distributed.shared.DistributedTestBase;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.replication.MutationTrackingService;
 import org.apache.cassandra.service.accord.AccordCache;
 
 import static java.lang.System.currentTimeMillis;
@@ -452,4 +453,34 @@ public class TestBaseImpl extends DistributedTestBase
             }
         }
     }
+
+    /**
+     * Useful for tests that rely on different repair mechanisms, for example 
for read repair tests
+     * where the test relies on the read repair machinery, we want to 
explicitly disable the
+     * background reconciliation process.
+     *
+     * @param cluster the cluster for the test
+     * @return the cluster with the background reconciliation process disabled 
on all instances
+     */
+    public static Cluster disableBackgroundReconciler(Cluster cluster)
+    {
+        for (IInvokableInstance instance : cluster)
+        {
+            instance.runOnInstance(() -> 
MutationTrackingService.instance().pauseBackgroundReconciler());
+        }
+        return cluster;
+    }
+
+    /**
+     * Useful for tests that rely on different repair mechanisms, for example 
for read repair tests
+     * where the test relies on the read repair machinery, we want to start 
the cluster with the
+     * background reconciliation process disabled.
+     *
+     * @param builder the cluster builder
+     * @return the cluster builder with the background reconciliation process 
configuration disabled
+     */
+    public static Cluster.Builder disableBackgroundReconciler(Cluster.Builder 
builder)
+    {
+        return builder.appendConfig(c -> 
c.set("mutation_tracking.background_reconciliation_enabled", "false"));
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
index 9fec6335f0..54ec2916d0 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
@@ -81,7 +81,8 @@ public abstract class MultiNodeTableWalkBase extends 
SingleNodeTableWalkTest
          .set("read_request_timeout", "180s")
          .set("write_request_timeout", "180s")
          .set("native_transport_timeout", "180s")
-         .set("slow_query_log_timeout", "180s");
+         .set("slow_query_log_timeout", "180s")
+         .set("mutation_tracking.background_reconciliation_enabled", "false");
     }
 
     @Override
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
index 599c278003..d474779093 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
@@ -158,7 +158,8 @@ public class StatefulASTBase extends TestBaseImpl
 
     protected void clusterConfig(IInstanceConfig config)
     {
-        config.set("repair.retries.max_attempts", Integer.MAX_VALUE);
+        config.set("repair.retries.max_attempts", Integer.MAX_VALUE)
+              .set("mutation_tracking.background_reconciliation_enabled", 
"false");
     }
 
     protected void clusterInitializer(ClassLoader cl, int node)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java
index 925a42de86..493d47119e 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java
@@ -89,7 +89,7 @@ public class PartialUpdateHandlingTest extends TestBaseImpl
     @BeforeClass
     public static void setUpCluster() throws IOException
     {
-        CLUSTER = Cluster.build(NODES).withConfig(config -> 
config.set("hinted_handoff_enabled", false).with(GOSSIP).with(NETWORK)).start();
+        CLUSTER = 
disableBackgroundReconciler(Cluster.build(NODES).withConfig(config -> 
config.set("hinted_handoff_enabled", 
false).with(GOSSIP).with(NETWORK))).start();
 
         for (ReplicationType replicationType : ReplicationType.values())
         {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java
index c2b99267bd..85532e744a 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.distributed.test.tracking;
 
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -49,9 +50,15 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
+import static 
org.apache.cassandra.distributed.test.tracking.MutationTrackingReadReconciliationTest.awaitNodeAlive;
+import static 
org.apache.cassandra.distributed.test.tracking.MutationTrackingReadReconciliationTest.awaitNodeDead;
+import static 
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.assertMatchingSummaryIdSpaceForKey;
 import static 
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.getOnlyLogId;
+import static 
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.numLogReconciliations;
+import static 
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.summaryForKey;
 import static 
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.summaryIdSpace;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 // TODO This test would be a lot faster if it had a shared cluster
@@ -436,4 +443,162 @@ public class MutationTrackingTest extends TestBaseImpl
                 }), 10);
         }
     }
+
+    @Test
+    public void testBackgroundPullReconciliation() throws Throwable
+    {
+        try (Cluster cluster = Cluster.build(3)
+                                      .withConfig(cfg -> 
cfg.with(Feature.NETWORK)
+                                                            
.with(Feature.GOSSIP)
+                                                            
.set("write_request_timeout", "1000ms"))
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH 
replication = " +
+                                              "{'class': 'SimpleStrategy', 
'replication_factor': 3} " +
+                                              "AND 
replication_type='tracked'"));
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int 
PRIMARY KEY, v int)"));
+
+            // 1. Partition node 3 and pause push-side retries on all nodes
+            cluster.filters().allVerbs().to(3).drop();
+            cluster.filters().allVerbs().from(3).drop();
+            for (int i = 1; i <= 2; i++)
+                cluster.get(i).runOnInstance(() -> 
Gossiper.instance.convict(InetAddressAndPort.getByNameUnchecked("127.0.0.3"), 
Double.MAX_VALUE));
+
+            for (int i = 1; i <= 3; i++)
+                cluster.get(i).runOnInstance(() -> {
+                    MutationTrackingService.instance().pauseActiveReconciler();
+                    
MutationTrackingService.instance().pauseBackgroundReconciler();
+                });
+
+            // wait until node 1 marks node 3 as dead
+            awaitNodeDead(cluster.get(1), cluster.get(3));
+
+            // 2. Write at QUORUM - succeeds on nodes 1, 2 but node 3 won't 
get the write
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl 
(k, v) VALUES (1, 1)"),
+                                           ConsistencyLevel.QUORUM);
+
+            // Sleep one second until write timeout elapses
+            TimeUnit.SECONDS.sleep(1);
+
+            // 3. Capture expected state from node 1
+            MutationSummary expected = summaryForKey(cluster.get(1), KEYSPACE, 
"tbl", /* key */1);
+
+            // 4. Ensure node 3 does NOT have the mutation yet
+            cluster.get(3).runOnInstance(() -> {
+                TableMetadata table = 
Schema.instance.getTableMetadata(KEYSPACE, "tbl");
+                assertNotNull(table);
+                DecoratedKey dk = 
Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+                MutationSummary summary = MutationTrackingService.instance()
+                                                                 
.createSummaryForKey(dk, table.id, false);
+                assertTrue("Node 3 should have no mutations yet", 
summary.isEmpty());
+            });
+
+            // 5. Heal the partition around node 3, let broadcasts propagate
+            cluster.filters().reset();
+            awaitNodeAlive(cluster.get(1), cluster.get(3));
+            awaitNodeAlive(cluster.get(3), cluster.get(1));
+
+            // Now broadcast offsets so node 3 learns what nodes 1 and 2 have
+            for (int i = 1; i <= 3; i++)
+                cluster.get(i).runOnInstance(() -> 
MutationTrackingService.instance().broadcastOffsetsForTesting());
+
+            cluster.get(1).runOnInstance(() -> 
MutationTrackingService.instance().resumeActiveReconciler());
+
+            // 6. Trigger the background reconciler on node 3 ONLY (no reads, 
no push retries)
+            cluster.get(3).runOnInstance(() -> {
+                
MutationTrackingService.instance().resumeBackgroundReconciler();
+                MutationTrackingService.instance().reconcileForTesting();
+            });
+
+            // 7. Wait for the pull request to be processed and mutation to 
arrive
+            TimeUnit.SECONDS.sleep(2);
+
+            // Broadcast again so reconciliation state converges
+            for (int i = 1; i <= 3; i++)
+                cluster.get(i).runOnInstance(() -> 
MutationTrackingService.instance().broadcastOffsetsForTesting());
+
+            // 8. Verify node 3 now has the mutation (pulled via background 
reconciler)
+            assertMatchingSummaryIdSpaceForKey(cluster.get(3), KEYSPACE, 
"tbl", /* key */1, expected);
+
+            // 9. Verify no read reconciliation was triggered
+            assertEquals(0, numLogReconciliations(cluster.get(3)));
+        }
+    }
+
+    @Test
+    public void testBackgroundPullReconciliationWhenCoordinatorDown() throws 
Throwable
+    {
+        try (Cluster cluster = Cluster.build(3).withConfig(cfg -> 
cfg.with(Feature.NETWORK)
+                                                                     
.with(Feature.GOSSIP)
+                                                                     
.set("write_request_timeout", "1000ms"))
+                                      .start())
+        {
+            cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH 
replication = " +
+                                              "{'class': 'SimpleStrategy', 
'replication_factor': 3} " +
+                                              "AND 
replication_type='tracked'"));
+            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int 
PRIMARY KEY, v int)"));
+
+            // 1. Pause push-side retries and background reconciler on all 
nodes
+            for (int i = 1; i <= 3; i++)
+                cluster.get(i).runOnInstance(() -> {
+                    MutationTrackingService.instance().pauseActiveReconciler();
+                    
MutationTrackingService.instance().pauseBackgroundReconciler();
+                });
+
+            // 2. Partition node 3, then write at QUORUM from coordinator 
(node 1)
+            cluster.filters().allVerbs().to(3).drop();
+            cluster.filters().allVerbs().from(3).drop();
+            awaitNodeDead(cluster.get(1), cluster.get(3));
+
+            cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl 
(k, v) VALUES (1, 1)"),
+                                           ConsistencyLevel.QUORUM);
+
+            // Sleep one second until write timeout elapses
+            TimeUnit.SECONDS.sleep(1);
+
+            // 3. Capture expected state from node 1 (before we partition it)
+            MutationSummary expected = summaryForKey(cluster.get(1), KEYSPACE, 
"tbl", 1);
+
+            // 4. Heal node 3, then partition node 1 (the coordinator)
+            cluster.filters().reset();
+            awaitNodeAlive(cluster.get(2), cluster.get(3));
+            awaitNodeAlive(cluster.get(3), cluster.get(2));
+
+            cluster.filters().allVerbs().to(1).drop();
+            cluster.filters().allVerbs().from(1).drop();
+            for (int i = 2; i <= 3; i++)
+                cluster.get(i).runOnInstance(() -> 
Gossiper.instance.convict(InetAddressAndPort.getByNameUnchecked("127.0.0.1"), 
Double.MAX_VALUE));
+            awaitNodeDead(cluster.get(3), cluster.get(1));
+            awaitNodeDead(cluster.get(2), cluster.get(1));
+
+            // 5. Broadcast offsets between nodes 2 and 3 only (node 3 learns 
what node 2 has witnessed
+            // and should discover the gap)
+            for (int i = 2; i <= 3; i++)
+                cluster.get(i).runOnInstance(() -> 
MutationTrackingService.instance().broadcastOffsetsForTesting());
+
+            // 6. Trigger background reconciler on node 3. Coordinator is 
down, so it is expected to fall back to node 2
+            //    (the remaining alive replica)
+            cluster.get(3).runOnInstance(() -> {
+                
MutationTrackingService.instance().resumeBackgroundReconciler();
+                MutationTrackingService.instance().reconcileForTesting();
+            });
+
+            // 7. Resume the active reconciler on instance 2, so whenever 
instance 2 receives the request
+            //    it will process the PULL_MUTATIONS_REQ verb
+            cluster.get(2).runOnInstance(() -> 
MutationTrackingService.instance().resumeActiveReconciler());
+
+            // Wait for the pull request to be processed and mutation to arrive
+            TimeUnit.SECONDS.sleep(2);
+
+            // Broadcast again so reconciliation state converges
+            for (int i = 2; i <= 3; i++)
+                cluster.get(i).runOnInstance(() -> 
MutationTrackingService.instance().broadcastOffsetsForTesting());
+
+            // 8. Verify node 3 pulled the mutation from node 2 (the fallback 
replica)
+            assertMatchingSummaryIdSpaceForKey(cluster.get(3), KEYSPACE, 
"tbl", 1, expected);
+
+            // 9. Verify no read reconciliation was involved
+            assertEquals(0, numLogReconciliations(cluster.get(3)));
+        }
+    }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
index b14ec37285..f43009d363 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
@@ -43,9 +43,9 @@ public class OffsetBroadcastTest extends TestBaseImpl
     @Test
     public void testBroadcastOffsets() throws Throwable
     {
-        try (Cluster cluster = Cluster.build(3)
-                                      .withConfig(cfg -> 
cfg.with(Feature.NETWORK).with(Feature.GOSSIP))
-                                      .start())
+        try (Cluster cluster = disableBackgroundReconciler(Cluster.build(3)
+                                                                  
.withConfig(cfg -> cfg.with(Feature.NETWORK).with(Feature.GOSSIP)))
+                                                                  .start())
         {
 
             cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH 
replication = " +
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java
index f98b602943..72b8eaabf9 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java
@@ -163,7 +163,7 @@ public class TrackedImportFailureTest extends 
TrackedTransferTestBase
     {
         ActivationRequest.Phase phase = COMMIT;
         int MISSED_ACTIVATION = 2;
-        try (Cluster cluster = 
cluster(TrackedTransferTestBase.ByteBuddyInjections.SkipActivation.install(MISSED_ACTIVATION)))
+        try (Cluster cluster = 
disableBackgroundReconciler(cluster(TrackedTransferTestBase.ByteBuddyInjections.SkipActivation.install(MISSED_ACTIVATION))))
         {
             
TrackedTransferTestBase.ByteBuddyInjections.SkipActivation.setup(cluster, 
phase);
             createSchema(cluster);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedNonZeroCopyRepairTransferTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedNonZeroCopyRepairTransferTest.java
index 710e4e01f2..e22c57e5d7 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedNonZeroCopyRepairTransferTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedNonZeroCopyRepairTransferTest.java
@@ -27,6 +27,6 @@ public class TrackedNonZeroCopyRepairTransferTest extends 
TrackedRepairTransferS
     @BeforeClass
     public static void setup() throws IOException
     {
-        cluster = cluster(NON_ZCS_CONFIG);
+        cluster = disableBackgroundReconciler(cluster(NON_ZCS_CONFIG));
     }
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedRepairFailureTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedRepairFailureTest.java
index eb36f1fc00..1a4cd5c374 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedRepairFailureTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedRepairFailureTest.java
@@ -65,7 +65,7 @@ public class TrackedRepairFailureTest extends 
TrackedRepairTransferTestBase
     @Test
     public void testFullRepairPartiallyCompleteAnomaly() throws IOException, 
ExecutionException, InterruptedException, TimeoutException
     {
-        try (Cluster cluster = cluster(StreamReceiverFailureHelper::install))
+        try (Cluster cluster = 
disableBackgroundReconciler(cluster(StreamReceiverFailureHelper::install)))
         {
             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3} AND 
replication_type='tracked';");
             String TABLE_SCHEMA_CQL = "CREATE TABLE " + 
tableWithKeyspace(KEYSPACE) + " (k INT PRIMARY KEY, v INT)";
@@ -192,7 +192,7 @@ public class TrackedRepairFailureTest extends 
TrackedRepairTransferTestBase
     @Test
     public void testFullRepairCleanupOnFailure() throws IOException, 
ExecutionException, InterruptedException, TimeoutException
     {
-        try (Cluster cluster = cluster(StreamReceiverFailureHelper::install))
+        try (Cluster cluster = 
disableBackgroundReconciler(cluster(StreamReceiverFailureHelper::install)))
         {
             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3} AND 
replication_type='tracked';");
             String TABLE_SCHEMA_CQL = "CREATE TABLE " + KEYSPACE + '.' + TABLE 
+ " (k INT PRIMARY KEY, v INT)";
@@ -259,7 +259,7 @@ public class TrackedRepairFailureTest extends 
TrackedRepairTransferTestBase
     @Test
     public void testRepairFailsOnMissedActivation() throws IOException
     {
-        try (Cluster cluster = 
cluster(ByteBuddyInjections.SkipActivation.install(2, 3)))
+        try (Cluster cluster = 
disableBackgroundReconciler(cluster(ByteBuddyInjections.SkipActivation.install(2,
 3))))
         {
             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3} AND 
replication_type='tracked';");
             cluster.schemaChange("CREATE TABLE " + tableWithKeyspace(KEYSPACE) 
+ " (k INT PRIMARY KEY, v INT)");
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java
index 002dcef5f9..513dcd35eb 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java
@@ -88,8 +88,10 @@ public abstract class TrackedTransferTestBase extends 
TestBaseImpl
                                                                      
.set("repair_request_timeout", "2s")
                                                                      
.set("stream_transfer_task_timeout", "10s");
 
-    protected static final Consumer<IInstanceConfig> ZCS_CONFIG = 
CONFIG.andThen(cfg -> cfg.set("stream_entire_sstables", true));
-    protected static final Consumer<IInstanceConfig> NON_ZCS_CONFIG = 
CONFIG.andThen(cfg -> cfg.set("stream_entire_sstables", false));
+    protected static final Consumer<IInstanceConfig> ZCS_CONFIG = 
CONFIG.andThen(cfg -> cfg.set("stream_entire_sstables", true)
+                                                                               
            .set("mutation_tracking.background_reconciliation_enabled", false));
+    protected static final Consumer<IInstanceConfig> NON_ZCS_CONFIG = 
CONFIG.andThen(cfg -> cfg.set("stream_entire_sstables", false)
+                                                                               
                .set("mutation_tracking.background_reconciliation_enabled", 
false));
 
     protected static final 
IIsolatedExecutor.SerializableConsumer<SSTableReader> TRANSFERS_EXIST = sstable 
-> {
         
Assertions.assertThat(sstable.getCoordinatorLogOffsets().transfers()).isNotEmpty();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedZeroCopyRepairTransferTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedZeroCopyRepairTransferTest.java
index 75c95b4e3f..a3642729a2 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedZeroCopyRepairTransferTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedZeroCopyRepairTransferTest.java
@@ -27,6 +27,6 @@ public class TrackedZeroCopyRepairTransferTest extends 
TrackedRepairTransferSucc
     @BeforeClass
     public static void setup() throws IOException
     {
-        cluster = cluster(ZCS_CONFIG);
+        cluster = disableBackgroundReconciler(cluster(ZCS_CONFIG));
     }
 }
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/MTAdminTest.java 
b/test/unit/org/apache/cassandra/tools/nodetool/MTAdminTest.java
new file mode 100644
index 0000000000..1e9c44485f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/nodetool/MTAdminTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.io.PrintStream;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.cassandra.tools.NodeProbe;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class MTAdminTest  // Exercises MTAdmin.SetConfig against a Mockito-mocked NodeProbe and PrintStream.
+{
+    @Mock
+    private NodeProbe probe;  // mocked probe; tests verify which setter calls it receives
+
+    @Mock
+    private PrintStream out;  // mocked stream, assigned to cmd.out so printed output can be verified
+
+    private MTAdmin.SetConfig cmd;  // command under test, re-created in setUp()
+
+    @Before
+    public void setUp()  // fresh mocks and a fresh command before each test
+    {
+        MockitoAnnotations.initMocks(this);  // populates the @Mock fields above
+        when(probe.isMutationTrackingDisabled()).thenReturn(false);  // default stub: tracking reported as not disabled
+        cmd = new MTAdmin.SetConfig();
+        cmd.out = out;
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNoArgs()  // cmd.args is never assigned here; execute must throw
+    {
+        cmd.execute(probe);
+    }
+
+    @Test
+    public void testMutationTrackingDisabled()  // disabled tracking: message printed, setter not invoked
+    {
+        when(probe.isMutationTrackingDisabled()).thenReturn(true);  // overrides the setUp() stub for this test only
+        cmd.args = List.of("background_reconciliation_enabled", "true");
+
+        cmd.execute(probe);
+
+        verify(out, times(1)).println("Mutation tracking is not enabled");  // exact message, exactly once
+        verify(probe, 
times(0)).setMutationTrackingBackgroundReconciliationEnabled(true);  // no call with argument true
+    }
+
+    @Test
+    public void testSetBackgroundReconciliationEnabled()  // string "true" must reach the probe as boolean true
+    {
+        cmd.args = List.of("background_reconciliation_enabled", "true");
+
+        cmd.execute(probe);
+
+        verify(probe, 
times(1)).setMutationTrackingBackgroundReconciliationEnabled(true);
+    }
+
+    @Test
+    public void testSetBackgroundReconciliationDisabled()  // string "false" must reach the probe as boolean false
+    {
+        cmd.args = List.of("background_reconciliation_enabled", "false");
+
+        cmd.execute(probe);
+
+        verify(probe, 
times(1)).setMutationTrackingBackgroundReconciliationEnabled(false);
+    }
+
+    @Test
+    public void testSetBackgroundReconciliationIntervalMs()  // string "5000" must reach the probe as long 5000
+    {
+        cmd.args = List.of("background_reconciliation_interval_ms", "5000");
+
+        cmd.execute(probe);
+
+        verify(probe, 
times(1)).setMutationTrackingBackgroundReconciliationIntervalMilliseconds(5000L);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidParamType()  // parameter name other than the two used above; execute must throw
+    {
+        cmd.args = List.of("unknown_param", "value");
+
+        cmd.execute(probe);
+    }
+
+    @Test(expected = NumberFormatException.class)
+    public void testInvalidIntervalValue()  // non-numeric interval value; execute must throw
+    {
+        cmd.args = List.of("background_reconciliation_interval_ms", 
"not_a_number");
+
+        cmd.execute(probe);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to