This is an automated email from the ASF dual-hosted git repository.
frankgh pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new c6814f3c69 CEP-45: Background reconciliation - replica
c6814f3c69 is described below
commit c6814f3c69278ca7e2a7f5f44056f9759bce6d63
Author: Francisco Guerrero <[email protected]>
AuthorDate: Sun Apr 19 13:06:45 2026 -0700
CEP-45: Background reconciliation - replica
patch by Francisco Guerrero; reviewed by Blake Eggleston for CASSANDRA-20378
---
.../cassandra/config/DatabaseDescriptor.java | 5 +
.../cassandra/config/MutationTrackingSpec.java | 8 +
.../cassandra/replication/ActiveLogReconciler.java | 1 +
.../cassandra/replication/CoordinatorLog.java | 33 ++++
.../cassandra/replication/MutationSummary.java | 4 +-
.../replication/MutationTrackingService.java | 176 ++++++++++++++++++++-
.../replication/MutationTrackingServiceMBean.java | 49 ++++++
.../org/apache/cassandra/replication/Shard.java | 16 ++
src/java/org/apache/cassandra/tools/NodeProbe.java | 32 ++++
.../apache/cassandra/tools/nodetool/MTAdmin.java | 112 +++++++++++++
.../cassandra/tools/nodetool/NodetoolCommand.java | 1 +
.../distributed/test/ReadRepairQueryTester.java | 6 +-
.../distributed/test/ReadRepairTestBase.java | 26 +--
.../cassandra/distributed/test/TestBaseImpl.java | 31 ++++
.../test/cql3/MultiNodeTableWalkBase.java | 3 +-
.../distributed/test/cql3/StatefulASTBase.java | 3 +-
.../test/sai/PartialUpdateHandlingTest.java | 2 +-
.../test/tracking/MutationTrackingTest.java | 165 +++++++++++++++++++
.../test/tracking/OffsetBroadcastTest.java | 6 +-
.../test/tracking/TrackedImportFailureTest.java | 2 +-
.../TrackedNonZeroCopyRepairTransferTest.java | 2 +-
.../test/tracking/TrackedRepairFailureTest.java | 6 +-
.../test/tracking/TrackedTransferTestBase.java | 6 +-
.../TrackedZeroCopyRepairTransferTest.java | 2 +-
.../cassandra/tools/nodetool/MTAdminTest.java | 117 ++++++++++++++
25 files changed, 782 insertions(+), 32 deletions(-)
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 344c2b509e..86ca89b02c 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -6059,6 +6059,11 @@ public class DatabaseDescriptor
}
}
+ public static MutationTrackingSpec getMutationTrackingConfig()
+ {
+ return conf.mutation_tracking;
+ }
+
public static boolean getMutationTrackingEnabled()
{
return conf.mutation_tracking.enabled;
diff --git a/src/java/org/apache/cassandra/config/MutationTrackingSpec.java
b/src/java/org/apache/cassandra/config/MutationTrackingSpec.java
index d9b04a0f40..30f8acaef5 100644
--- a/src/java/org/apache/cassandra/config/MutationTrackingSpec.java
+++ b/src/java/org/apache/cassandra/config/MutationTrackingSpec.java
@@ -22,4 +22,12 @@ public class MutationTrackingSpec
{
public boolean enabled = false;
public String journal_directory;
+ /**
+ * Whether the background reconciliation process is enabled
+ */
+ public volatile boolean background_reconciliation_enabled = true;
+ /**
+ * The interval at which the background reconciliation process runs
+ */
+ public volatile DurationSpec.LongMillisecondsBound
background_reconciliation_interval = new
DurationSpec.LongMillisecondsBound("1s");
}
diff --git a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java
b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java
index 38fd276c00..37c5963138 100644
--- a/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java
+++ b/src/java/org/apache/cassandra/replication/ActiveLogReconciler.java
@@ -173,6 +173,7 @@ public final class ActiveLogReconciler implements
Shutdownable
MutationTrackingService.instance().retryFailedWrite(mutationId,
toHost, failureReason);
}
+ @Override
void send()
{
RecordPointer pointer =
MutationJournal.instance().lookUp(mutationId);
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index a94dbd4af6..317a3e6eea 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -271,6 +271,39 @@ public abstract class CoordinatorLog
}
}
+ /**
+ * @return the computed union of remote-witnessed offsets minus local-witnessed offsets, or {@code null} if nothing is missing locally
+ */
+ @Nullable
+ Offsets.Immutable collectLocallyMissingOffsets()
+ {
+ lock.readLock().lock();
+ try
+ {
+ Offsets.Mutable local = witnessedOffsets.get(localNodeId);
+ Offsets.Immutable.Builder missing = null;
+ for (int i = 0; i < participants.size(); i++)
+ {
+ int nodeId = participants.get(i);
+ if (nodeId == localNodeId) continue;
+ Offsets.Immutable diff =
Offsets.Immutable.difference(witnessedOffsets.get(nodeId), local);
+ if (!diff.isEmpty())
+ {
+ if (missing == null)
+ {
+ missing = new Offsets.Immutable.Builder(logId);
+ }
+ missing.addAll(diff);
+ }
+ }
+ return missing != null ? missing.build() : null;
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
Offsets.Immutable collectReconciledOffsets()
{
lock.readLock().lock();
diff --git a/src/java/org/apache/cassandra/replication/MutationSummary.java
b/src/java/org/apache/cassandra/replication/MutationSummary.java
index c48f533313..f1e0cc7c54 100644
--- a/src/java/org/apache/cassandra/replication/MutationSummary.java
+++ b/src/java/org/apache/cassandra/replication/MutationSummary.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.agrona.collections.Long2ObjectHashMap;
@@ -274,7 +275,8 @@ public class MutationSummary
return summaries.size();
}
- boolean isEmpty()
+ @VisibleForTesting
+ public boolean isEmpty()
{
return size() == 0;
}
diff --git
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 9854c31a9f..bee2dd2229 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -49,6 +49,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DurationSpec;
+import org.apache.cassandra.config.MutationTrackingSpec;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
@@ -87,6 +89,7 @@ import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.ownership.ReplicaGroups;
import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MBeanWrapper;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
@@ -96,19 +99,25 @@ import static
org.apache.cassandra.cql3.QueryProcessor.executeInternal;
// TODO (expected): persistence (handle restarts)
// TODO (expected): handle topology changes
-public class MutationTrackingService
+public class MutationTrackingService implements MutationTrackingServiceMBean
{
+ public static final String MBEAN_NAME =
"org.apache.cassandra.db:type=MutationTrackingService";
public static final String DISABLED_MESSAGE = "Mutation tracking is not
enabled. (See mutation_tracking.enabled in cassandra.yaml)";
private static final MutationTrackingService instance;
private static final ScheduledExecutorPlus executor;
+ private static final MutationTrackingSpec config;
+
static
{
- if (DatabaseDescriptor.getMutationTrackingEnabled())
+ config = DatabaseDescriptor.getMutationTrackingConfig();
+
+ if (config.enabled)
{
instance = new MutationTrackingService();
executor =
executorFactory().scheduled("Mutation-Tracking-Service", NORMAL);
+ MBeanWrapper.instance.registerMBean(instance, MBEAN_NAME);
}
else
{
@@ -129,12 +138,12 @@ public class MutationTrackingService
public static boolean isEnabled()
{
- return DatabaseDescriptor.getMutationTrackingEnabled();
+ return config.enabled;
}
public static void ensureEnabled()
{
- if (!DatabaseDescriptor.getMutationTrackingEnabled())
+ if (!config.enabled)
throw new IllegalStateException(DISABLED_MESSAGE);
}
@@ -188,6 +197,7 @@ public class MutationTrackingService
private final ReplicatedOffsetsBroadcaster offsetsBroadcaster = new
ReplicatedOffsetsBroadcaster();
private final LogStatePersister offsetsPersister = new LogStatePersister();
private final ActiveLogReconciler activeReconciler = new
ActiveLogReconciler();
+ private final BackgroundReconciler backgroundReconciler = new
BackgroundReconciler();
private final IncomingMutations incomingMutations = new
IncomingMutations();
private final OutgoingMutations outgoingMutations = new
OutgoingMutations();
@@ -223,14 +233,53 @@ public class MutationTrackingService
onNewClusterMetadata(null, metadata);
+ if (!keyspaceShards.isEmpty() &&
!config.background_reconciliation_enabled)
+
logBackgroundReconciliationDisabledWarning(keyspaceShards.keySet());
+
offsetsBroadcaster.start();
offsetsPersister.start();
+ backgroundReconciler.start();
ExpiredStatePurger.instance.register(incomingMutations);
started = true;
}
+ @Override
+ public void setMutationTrackingBackgroundReconciliationEnabled(boolean
enabled)
+ {
+ if (enabled != config.background_reconciliation_enabled)
+ {
+ logger.info("{} mutation tracking background reconciliation",
enabled ? "Enabling" : "Disabling");
+ config.background_reconciliation_enabled = enabled;
+ }
+ }
+
+ @Override
+ public boolean getMutationTrackingBackgroundReconciliationEnabled()
+ {
+ return config.background_reconciliation_enabled;
+ }
+
+ @Override
+ public void
setMutationTrackingBackgroundReconciliationIntervalMilliseconds(long
intervalMilliseconds)
+ {
+ if (intervalMilliseconds !=
config.background_reconciliation_interval.toMilliseconds())
+ {
+ DurationSpec.LongMillisecondsBound
backgroundReconciliationInterval =
+ new DurationSpec.LongMillisecondsBound(intervalMilliseconds,
TimeUnit.MILLISECONDS);
+ logger.info("Setting mutation tracking background reconciliation
interval from {} to {}",
+ config.background_reconciliation_interval,
backgroundReconciliationInterval);
+ config.background_reconciliation_interval =
backgroundReconciliationInterval;
+ }
+ }
+
+ @Override
+ public long
getMutationTrackingBackgroundReconciliationIntervalMilliseconds()
+ {
+ return config.background_reconciliation_interval.toMilliseconds();
+ }
+
public void pauseOffsetBroadcast(boolean pause)
{
offsetsBroadcaster.pauseOffsetBroadcast(pause);
@@ -862,6 +911,14 @@ public class MutationTrackingService
// recalculating the shards will repopulate this via the existing
callbacks
log2ShardMap = new ConcurrentHashMap<>();
keyspaceShards = applyUpdatedMetadata(keyspaceShards, prev, next,
this::nextLogId, this::onNewLog);
+
+ if (!config.background_reconciliation_enabled)
+ {
+ Set<String> newKeyspaces = new
HashSet<>(keyspaceShards.keySet());
+ newKeyspaces.removeAll(originalKeyspaceShards.keySet());
+ if (!newKeyspaces.isEmpty())
+ logBackgroundReconciliationDisabledWarning(newKeyspaces);
+ }
}
catch (Throwable t)
{
@@ -1047,6 +1104,12 @@ public class MutationTrackingService
return unwrapped;
}
+ private void logBackgroundReconciliationDisabledWarning(Set<String>
keyspaces)
+ {
+ logger.warn("Background reconciliation is disabled but mutation
tracking keyspaces exist: {}. " +
+ "Unreconciled mutations will not be automatically repaired
in the background.", keyspaces);
+ }
+
public static class KeyspaceShards
{
private enum UpdateDecision
@@ -1374,6 +1437,93 @@ public class MutationTrackingService
return rows.one().getInt("host_log_id");
}
+ private static class BackgroundReconciler
+ {
+ void start()
+ {
+ scheduleNext();
+ }
+
+ private void scheduleNext()
+ {
+ long intervalMillis =
config.background_reconciliation_interval.toMilliseconds();
+ executor.schedule(this::runAndReschedule, intervalMillis,
TimeUnit.MILLISECONDS);
+ }
+
+ private void runAndReschedule()
+ {
+ try
+ {
+ run();
+ }
+ finally
+ {
+ scheduleNext();
+ }
+ }
+
+ void run()
+ {
+ MutationTrackingService.instance().forEachKeyspace(this::run);
+ }
+
+ private void run(KeyspaceShards shards)
+ {
+ if (config.background_reconciliation_enabled)
+ shards.forEachShard(this::run);
+ }
+
+ private void run(Shard shard)
+ {
+ try
+ {
+ List<Offsets.Immutable> missing =
shard.collectLocallyMissingOffsets();
+ if (missing.isEmpty()) return;
+
+ for (Offsets.Immutable offsets : missing)
+ {
+ // Prefer pulling from the coordinator
+ int coordinatorHostId = offsets.logId().hostId();
+ InetAddressAndPort coordinator =
ClusterMetadata.current().directory.endpoint(new NodeId(coordinatorHostId));
+ InetAddressAndPort pullFrom =
FailureDetector.instance.isAlive(coordinator)
+ ? coordinator
+ : findAliveReplica(shard,
coordinatorHostId);
+ if (pullFrom == null)
+ {
+ logger.debug("No coordinator or replica is available
to process the pull mutation request for missing offset {}",
+ offsets);
+ continue; // No reachable source
+ }
+
+ // TODO (expected): backoff, rate limits, per host and
total
+ PullMutationsRequest request = new
PullMutationsRequest(offsets);
+ logger.trace("Requesting pull mutation request from
replica {} for missing offset {}", pullFrom, offsets);
+
MessagingService.instance().send(Message.out(Verb.PULL_MUTATIONS_REQ, request),
pullFrom);
+ }
+ }
+ catch (Throwable throwable)
+ {
+ // Log and swallow failures so that an error on one shard is reported without propagating out of
+ // this pass; runAndReschedule already schedules the next pass in its finally block
+ logger.error("Exception encountered during background
reconciliation of shard={}", shard, throwable);
+ }
+ }
+
+ private InetAddressAndPort findAliveReplica(Shard shard, int
excludeHostId)
+ {
+ for (InetAddressAndPort replica : shard.remoteReplicas())
+ {
+ int replicaId =
ClusterMetadata.current().directory.peerId(replica).id();
+ if (replicaId != excludeHostId &&
FailureDetector.instance.isAlive(replica))
+ {
+ logger.trace("Found alive replica {} with replica id {}",
replica, replicaId);
+ return replica;
+ }
+ }
+ return null;
+ }
+ }
+
// TODO (later): a more intelligent heuristic for offsets included in
broadcasts
private static class ReplicatedOffsetsBroadcaster
{
@@ -1509,6 +1659,24 @@ public class MutationTrackingService
activeReconciler.resumeRegularPriorityForTesting();
}
+ @VisibleForTesting
+ public void reconcileForTesting()
+ {
+ backgroundReconciler.run();
+ }
+
+ @VisibleForTesting
+ public void pauseBackgroundReconciler()
+ {
+ config.background_reconciliation_enabled = false;
+ }
+
+ @VisibleForTesting
+ public void resumeBackgroundReconciler()
+ {
+ config.background_reconciliation_enabled = true;
+ }
+
@VisibleForTesting
public static class TestAccess
{
diff --git
a/src/java/org/apache/cassandra/replication/MutationTrackingServiceMBean.java
b/src/java/org/apache/cassandra/replication/MutationTrackingServiceMBean.java
new file mode 100644
index 0000000000..a64f09da2b
--- /dev/null
+++
b/src/java/org/apache/cassandra/replication/MutationTrackingServiceMBean.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+/**
+ * MBean exposing functionality for the Mutation Tracking service
+ */
+public interface MutationTrackingServiceMBean
+{
+ /**
+ * Sets the background reconciliation state to enabled/disabled based on the {@code enabled} parameter
+ *
+ * @param enabled whether the background reconciliation is enabled or
disabled for the mutation tracking service
+ */
+ void setMutationTrackingBackgroundReconciliationEnabled(boolean enabled);
+
+ /**
+ * @return the state of the background reconciliation for the mutation
tracking service
+ */
+ boolean getMutationTrackingBackgroundReconciliationEnabled();
+
+ /**
+ * Sets the background reconciliation interval to the provided {@code intervalMilliseconds} value
+ *
+ * @param intervalMilliseconds the interval value in milliseconds
+ */
+ void setMutationTrackingBackgroundReconciliationIntervalMilliseconds(long
intervalMilliseconds);
+
+ /**
+ * @return the interval, in milliseconds, at which the background reconciliation runs when enabled
+ */
+ long getMutationTrackingBackgroundReconciliationIntervalMilliseconds();
+}
diff --git a/src/java/org/apache/cassandra/replication/Shard.java
b/src/java/org/apache/cassandra/replication/Shard.java
index 265cc8b8f3..dd4e349023 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -299,6 +299,22 @@ public class Shard
return new BroadcastLogOffsets(keyspace, range, offsets, durable);
}
+ /**
+ * @return the locally missing offsets of every coordinator log on this shard, one entry per log
+ * that has missing offsets; the list is empty when nothing is missing locally
+ */
+ List<Offsets.Immutable> collectLocallyMissingOffsets()
+ {
+ List<Offsets.Immutable> result = new ArrayList<>(logs.size());
+ for (CoordinatorLog log : logs.values())
+ {
+ Offsets.Immutable missing = log.collectLocallyMissingOffsets();
+ if (missing != null)
+ result.add(missing);
+ }
+ return result;
+ }
+
void collectDurablyReconciledOffsets(Log2OffsetsMap.Mutable into)
{
logs.values().forEach(log ->
log.collectDurablyReconciledOffsets(into));
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 567df09283..c4d0d17400 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -119,6 +119,8 @@ import org.apache.cassandra.metrics.ThreadPoolMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.MessagingServiceMBean;
import org.apache.cassandra.profiler.AsyncProfilerMBean;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.replication.MutationTrackingServiceMBean;
import org.apache.cassandra.service.ActiveRepairServiceMBean;
import org.apache.cassandra.service.AsyncProfilerService;
import org.apache.cassandra.service.AutoRepairService;
@@ -190,6 +192,7 @@ public class NodeProbe implements AutoCloseable
protected PermissionsCacheMBean pcProxy;
protected RolesCacheMBean rcProxy;
protected AutoRepairServiceMBean autoRepairProxy;
+ protected MutationTrackingServiceMBean mutationTrackingProxy;
protected AsyncProfilerMBean asyncProfilerProxy;
protected GuardrailsMBean grProxy;
protected volatile Output output;
@@ -339,6 +342,10 @@ public class NodeProbe implements AutoCloseable
name = new ObjectName(AutoRepairService.MBEAN_NAME);
autoRepairProxy = JMX.newMBeanProxy(mbeanServerConn, name,
AutoRepairServiceMBean.class);
+ name = new ObjectName(MutationTrackingService.MBEAN_NAME);
+ if (mbeanServerConn.isRegistered(name))
+ mutationTrackingProxy = JMX.newMBeanProxy(mbeanServerConn,
name, MutationTrackingServiceMBean.class);
+
name = new ObjectName(AsyncProfilerService.MBEAN_NAME);
asyncProfilerProxy = JMX.newMBeanProxy(mbeanServerConn, name,
AsyncProfilerMBean.class);
@@ -2847,6 +2854,31 @@ public class NodeProbe implements AutoCloseable
throw new IOException("Invalid keyspace or table name", e);
}
}
+
+ public boolean isMutationTrackingDisabled()
+ {
+ return mutationTrackingProxy == null;
+ }
+
+ public boolean getMutationTrackingBackgroundReconciliationEnabled()
+ {
+ return
mutationTrackingProxy.getMutationTrackingBackgroundReconciliationEnabled();
+ }
+
+ public void setMutationTrackingBackgroundReconciliationEnabled(boolean
enabled)
+ {
+
mutationTrackingProxy.setMutationTrackingBackgroundReconciliationEnabled(enabled);
+ }
+
+ public long
getMutationTrackingBackgroundReconciliationIntervalMilliseconds()
+ {
+ return
mutationTrackingProxy.getMutationTrackingBackgroundReconciliationIntervalMilliseconds();
+ }
+
+ public void
setMutationTrackingBackgroundReconciliationIntervalMilliseconds(long
intervalMilliseconds)
+ {
+
mutationTrackingProxy.setMutationTrackingBackgroundReconciliationIntervalMilliseconds(intervalMilliseconds);
+ }
}
class ColumnFamilyStoreMBeanIterator implements Iterator<Map.Entry<String,
ColumnFamilyStoreMBean>>
diff --git a/src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java
b/src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java
new file mode 100644
index 0000000000..5e8e3ea50a
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/MTAdmin.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.cassandra.tools.NodeProbe;
+
+import picocli.CommandLine.Command;
+import picocli.CommandLine.Parameters;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Command(name = "mtadmin",
+ description = "Manage mutation tracking",
+ subcommands = { MTAdmin.GetConfig.class,
+ MTAdmin.SetConfig.class })
+public class MTAdmin extends AbstractCommand
+{
+ @Override
+ protected void execute(NodeProbe probe)
+ {
+ AbstractCommand cmd = new GetConfig();
+ cmd.probe(probe);
+ cmd.logger(output);
+ cmd.run();
+ }
+
+ @Command(name = "getconfig", description = "Print mutation tracking
configurations")
+ public static class GetConfig extends AbstractCommand
+ {
+ @VisibleForTesting
+ protected PrintStream out = System.out;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ if (probe.isMutationTrackingDisabled())
+ {
+ out.println("Mutation tracking is not enabled");
+ return;
+ }
+
+ out.println("background_reconciliation_enabled: " +
probe.getMutationTrackingBackgroundReconciliationEnabled());
+ out.println("background_reconciliation_interval_ms: " +
probe.getMutationTrackingBackgroundReconciliationIntervalMilliseconds());
+ }
+ }
+
+ @Command(name = "setconfig", description = "Sets the mutation tracking
configuration")
+ public static class SetConfig extends AbstractCommand
+ {
+ @VisibleForTesting
+ protected List<String> args = new ArrayList<>();
+
+ @Parameters(index = "0", arity = "0..1", description = { "Mutation
tracking param type.",
+ "Possible
parameters: " +
+
"[background_reconciliation_enabled|background_reconciliation_interval_ms]" })
+ public String paramType;
+
+ @Parameters(index = "1", description = "Mutation tracking param
value", arity = "0..1")
+ public String paramValue;
+
+ @VisibleForTesting
+ protected PrintStream out = System.out;
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ args = args.isEmpty() ? CommandUtils.concatArgs(paramType,
paramValue) : args;
+ checkArgument(args.size() == 2, "mtadmin setconfig requires
param-type and value args.");
+ String type = args.get(0);
+ String value = args.get(1);
+
+ if (probe.isMutationTrackingDisabled())
+ {
+ out.println("Mutation tracking is not enabled");
+ return;
+ }
+
+ switch (type)
+ {
+ case "background_reconciliation_enabled":
+
probe.setMutationTrackingBackgroundReconciliationEnabled(Boolean.parseBoolean(value));
+ break;
+ case "background_reconciliation_interval_ms":
+
probe.setMutationTrackingBackgroundReconciliationIntervalMilliseconds(Long.parseLong(value));
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown parameter: " +
type);
+ }
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
b/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
index af1e835d91..2915e50fbc 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/NodetoolCommand.java
@@ -147,6 +147,7 @@ import static
org.apache.cassandra.tools.nodetool.Help.printTopCommandUsage;
ListSnapshots.class,
Move.Abort.class,
Move.class,
+ MTAdmin.class,
NetStats.class,
PauseHandoff.class,
ProfileLoad.class,
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
index cbf711f203..d4351fecf8 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairQueryTester.java
@@ -120,9 +120,11 @@ public abstract class ReadRepairQueryTester extends
TestBaseImpl
@BeforeClass
public static void setupCluster() throws IOException
{
- cluster = init(Cluster.build(NUM_NODES)
+ // Disable background reconciler for read repair tests to avoid having
the
+ // background reconciler repair before the read repair takes effect
+ cluster = init(disableBackgroundReconciler(Cluster.build(NUM_NODES)
.withConfig(config ->
config.set("read_request_timeout", "1m")
-
.set("write_request_timeout", "1m"))
+
.set("write_request_timeout", "1m")))
.start());
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTestBase.java
index a72971941e..044eef39c9 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTestBase.java
@@ -139,7 +139,8 @@ public abstract class ReadRepairTestBase extends
TestBaseImpl
private void testReadRepair(ReadRepairStrategy strategy, boolean
brrThroughAccord) throws Throwable
{
- try (Cluster cluster = init(Cluster.create(3, c ->
c.with(Feature.GOSSIP, Feature.NETWORK))))
+ try (Cluster cluster = init(Cluster.create(3, c ->
c.with(Feature.GOSSIP, Feature.NETWORK)
+
.set("mutation_tracking.background_reconciliation_enabled", "false"))))
{
createKeyspace(cluster);
TransactionalMode transactionalMode = brrThroughAccord ?
TransactionalMode.test_unsafe_writes : TransactionalMode.off;
@@ -175,7 +176,9 @@ public abstract class ReadRepairTestBase extends
TestBaseImpl
@Test
public void readRepairTimeoutTest() throws Throwable
{
- try (Cluster cluster = init(Cluster.create(3, c ->
c.with(Feature.GOSSIP, Feature.NETWORK)))) {
+ try (Cluster cluster = init(Cluster.create(3, c ->
c.with(Feature.GOSSIP, Feature.NETWORK)
+
.set("mutation_tracking.background_reconciliation_enabled", "false"))))
+ {
final long reducedReadTimeout = 3000L;
createKeyspace(cluster);
cluster.forEach(i -> i.runOnInstance(() ->
DatabaseDescriptor.setReadRpcTimeout(reducedReadTimeout)));
@@ -209,7 +212,7 @@ public abstract class ReadRepairTestBase extends
TestBaseImpl
@Test
public void failingReadRepairTest() throws Throwable
{
- try (Cluster cluster = init(builder().withNodes(3).start()))
+ try (Cluster cluster =
init(disableBackgroundReconciler(builder().withNodes(3)).start()))
{
createKeyspace(cluster);
cluster.schemaChange(withTable("CREATE TABLE %s (pk int, ck int, v
int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'"));
@@ -237,7 +240,7 @@ public abstract class ReadRepairTestBase extends
TestBaseImpl
MutationTrackingUtils.fixmeSkipIfTracked(replicationType(), "Token
moves not supported");
// TODO: rewrite using FuzzTestBase to control progress through
decommission
// TODO: fails with vnode enabled
- try (Cluster cluster = init(Cluster.build(4).withoutVNodes().start(),
3))
+ try (Cluster cluster =
init(disableBackgroundReconciler(Cluster.build(4).withoutVNodes()).start(), 3))
{
List<Token> tokens = cluster.tokens();
@@ -296,7 +299,7 @@ public abstract class ReadRepairTestBase extends
TestBaseImpl
public void alterRFAndRunReadRepair() throws Throwable
{
MutationTrackingUtils.fixmeSkipIfTracked(replicationType(), "RF
changes not supported");
- try (Cluster cluster = builder().withNodes(2).start())
+ try (Cluster cluster =
disableBackgroundReconciler(builder().withNodes(2)).start())
{
cluster.schemaChange(format("CREATE KEYSPACE %s WITH replication =
" +
"{'class': 'SimpleStrategy',
'replication_factor': 1} " +
@@ -358,7 +361,7 @@ public abstract class ReadRepairTestBase extends
TestBaseImpl
*/
private void testRangeSliceQueryWithTombstones(boolean flush) throws
Throwable
{
- try (Cluster cluster = init(Cluster.create(2)))
+ try (Cluster cluster = init(Cluster.create(2, c ->
c.set("mutation_tracking.background_reconciliation_enabled", "false"))))
{
createKeyspace(cluster);
cluster.schemaChange(withTable("CREATE TABLE %s (k int, c int, v
int, PRIMARY KEY(k, c))"));
@@ -425,14 +428,15 @@ public abstract class ReadRepairTestBase extends
TestBaseImpl
MutationTrackingUtils.fixmeSkipIfTracked(replicationType(), "Token
moves not supported");
ExecutorPlus es =
ExecutorFactory.Global.executorFactory().sequential("query-executor");
String key = "test1";
- try (Cluster cluster = init(Cluster.build()
+ try (Cluster cluster = init(disableBackgroundReconciler(Cluster.build()
.withConfig(config ->
config.with(Feature.GOSSIP, Feature.NETWORK)
.set("read_request_timeout", format("%dms", Integer.MAX_VALUE))
.set("native_transport_timeout", format("%dms", Integer.MAX_VALUE))
+
.set("mutation_tracking.background_reconciliation_enabled", "false")
)
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
- .withNodes(3)
+ .withNodes(3))
.start()))
{
createKeyspace(cluster);
@@ -525,7 +529,7 @@ public abstract class ReadRepairTestBase extends
TestBaseImpl
@Test
public void testGCableTombstoneResurrectionOnRangeSliceQuery() throws
Throwable
{
- try (Cluster cluster = init(Cluster.create(2)))
+ try (Cluster cluster = init(Cluster.create(2, c ->
c.set("mutation_tracking.background_reconciliation_enabled", "false"))))
{
createKeyspace(cluster);
cluster.schemaChange(withTable("CREATE TABLE %s (k int, c int,
PRIMARY KEY(k, c)) " +
@@ -563,9 +567,9 @@ public abstract class ReadRepairTestBase extends
TestBaseImpl
@Test
public void partitionDeletionRTTimestampTieTest() throws Throwable
{
- try (Cluster cluster = init(builder()
+ try (Cluster cluster = init(disableBackgroundReconciler(builder()
.withNodes(3)
- .withInstanceInitializer(RRHelper::install)
+
.withInstanceInitializer(RRHelper::install))
.start()))
{
createKeyspace(cluster);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
index 7e91f5a7fb..466e639745 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/TestBaseImpl.java
@@ -83,6 +83,7 @@ import
org.apache.cassandra.distributed.shared.DistributedTestBase;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.replication.MutationTrackingService;
import org.apache.cassandra.service.accord.AccordCache;
import static java.lang.System.currentTimeMillis;
@@ -452,4 +453,34 @@ public class TestBaseImpl extends DistributedTestBase
}
}
}
+
+ /**
+ * Pauses the background reconciliation process on every instance of the
+ * given cluster. Useful for tests that rely on a different repair mechanism:
+ * for example, read repair tests rely on the read repair machinery, so the
+ * background reconciliation process must be explicitly paused.
+ *
+ * @param cluster the cluster for the test
+ * @return the same cluster, with the background reconciler paused on all instances
+ */
+ public static Cluster disableBackgroundReconciler(Cluster cluster)
+ {
+ for (IInvokableInstance instance : cluster)
+ {
+ instance.runOnInstance(() ->
MutationTrackingService.instance().pauseBackgroundReconciler());
+ }
+ return cluster;
+ }
+
+ /**
+ * Configures the builder so that instances start with the background
+ * reconciliation process disabled
+ * ({@code mutation_tracking.background_reconciliation_enabled = false}).
+ * Useful for tests that rely on a different repair mechanism: for example,
+ * read repair tests rely on the read repair machinery and should not have
+ * background reconciliation running.
+ *
+ * @param builder the cluster builder
+ * @return the cluster builder, with background reconciliation disabled in the instance configuration
+ */
+ public static Cluster.Builder disableBackgroundReconciler(Cluster.Builder
builder)
+ {
+ return builder.appendConfig(c ->
c.set("mutation_tracking.background_reconciliation_enabled", "false"));
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
index 9fec6335f0..54ec2916d0 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/MultiNodeTableWalkBase.java
@@ -81,7 +81,8 @@ public abstract class MultiNodeTableWalkBase extends
SingleNodeTableWalkTest
.set("read_request_timeout", "180s")
.set("write_request_timeout", "180s")
.set("native_transport_timeout", "180s")
- .set("slow_query_log_timeout", "180s");
+ .set("slow_query_log_timeout", "180s")
+ .set("mutation_tracking.background_reconciliation_enabled", "false");
}
@Override
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
index 599c278003..d474779093 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/cql3/StatefulASTBase.java
@@ -158,7 +158,8 @@ public class StatefulASTBase extends TestBaseImpl
protected void clusterConfig(IInstanceConfig config)
{
- config.set("repair.retries.max_attempts", Integer.MAX_VALUE);
+ config.set("repair.retries.max_attempts", Integer.MAX_VALUE)
+ .set("mutation_tracking.background_reconciliation_enabled",
"false");
}
protected void clusterInitializer(ClassLoader cl, int node)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java
index 925a42de86..493d47119e 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/sai/PartialUpdateHandlingTest.java
@@ -89,7 +89,7 @@ public class PartialUpdateHandlingTest extends TestBaseImpl
@BeforeClass
public static void setUpCluster() throws IOException
{
- CLUSTER = Cluster.build(NODES).withConfig(config ->
config.set("hinted_handoff_enabled", false).with(GOSSIP).with(NETWORK)).start();
+ CLUSTER =
disableBackgroundReconciler(Cluster.build(NODES).withConfig(config ->
config.set("hinted_handoff_enabled",
false).with(GOSSIP).with(NETWORK))).start();
for (ReplicationType replicationType : ReplicationType.values())
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java
index c2b99267bd..85532e744a 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.distributed.test.tracking;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Ignore;
@@ -49,9 +50,15 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import static
org.apache.cassandra.distributed.test.tracking.MutationTrackingReadReconciliationTest.awaitNodeAlive;
+import static
org.apache.cassandra.distributed.test.tracking.MutationTrackingReadReconciliationTest.awaitNodeDead;
+import static
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.assertMatchingSummaryIdSpaceForKey;
import static
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.getOnlyLogId;
+import static
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.numLogReconciliations;
+import static
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.summaryForKey;
import static
org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils.summaryIdSpace;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
// TODO This test would be a lot faster if it had a shared cluster
@@ -436,4 +443,162 @@ public class MutationTrackingTest extends TestBaseImpl
}), 10);
}
}
+
+ @Test
+ public void testBackgroundPullReconciliation() throws Throwable
+ {
+ try (Cluster cluster = Cluster.build(3)
+ .withConfig(cfg ->
cfg.with(Feature.NETWORK)
+
.with(Feature.GOSSIP)
+
.set("write_request_timeout", "1000ms"))
+ .start())
+ {
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH
replication = " +
+ "{'class': 'SimpleStrategy',
'replication_factor': 3} " +
+ "AND
replication_type='tracked'"));
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int
PRIMARY KEY, v int)"));
+
+ // 1. Partition node 3, then pause push-side retries and the background reconciler on all nodes
+ cluster.filters().allVerbs().to(3).drop();
+ cluster.filters().allVerbs().from(3).drop();
+ for (int i = 1; i <= 2; i++)
+ cluster.get(i).runOnInstance(() ->
Gossiper.instance.convict(InetAddressAndPort.getByNameUnchecked("127.0.0.3"),
Double.MAX_VALUE));
+
+ for (int i = 1; i <= 3; i++)
+ cluster.get(i).runOnInstance(() -> {
+ MutationTrackingService.instance().pauseActiveReconciler();
+
MutationTrackingService.instance().pauseBackgroundReconciler();
+ });
+
+ // wait until node 1 marks node 3 as dead
+ awaitNodeDead(cluster.get(1), cluster.get(3));
+
+ // 2. Write at QUORUM: succeeds on nodes 1 and 2, but node 3 won't get the write
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl
(k, v) VALUES (1, 1)"),
+ ConsistencyLevel.QUORUM);
+
+ // Sleep one second so the 1000ms write timeout elapses
+ TimeUnit.SECONDS.sleep(1);
+
+ // 3. Capture expected state from node 1
+ MutationSummary expected = summaryForKey(cluster.get(1), KEYSPACE,
"tbl", /* key */1);
+
+ // 4. Ensure node 3 does NOT have the mutation yet
+ cluster.get(3).runOnInstance(() -> {
+ TableMetadata table =
Schema.instance.getTableMetadata(KEYSPACE, "tbl");
+ assertNotNull(table);
+ DecoratedKey dk =
Murmur3Partitioner.instance.decorateKey(ByteBufferUtil.bytes(1));
+ MutationSummary summary = MutationTrackingService.instance()
+
.createSummaryForKey(dk, table.id, false);
+ assertTrue("Node 3 should have no mutations yet",
summary.isEmpty());
+ });
+
+ // 5. Heal the partition of node 3 and wait until nodes 1 and 3 see each other as alive
+ cluster.filters().reset();
+ awaitNodeAlive(cluster.get(1), cluster.get(3));
+ awaitNodeAlive(cluster.get(3), cluster.get(1));
+
+ // Now broadcast offsets so node 3 learns what nodes 1 and 2 have
+ for (int i = 1; i <= 3; i++)
+ cluster.get(i).runOnInstance(() ->
MutationTrackingService.instance().broadcastOffsetsForTesting());
+
+ cluster.get(1).runOnInstance(() ->
MutationTrackingService.instance().resumeActiveReconciler());
+
+ // 6. Trigger the background reconciler on node 3 ONLY (no reads,
no push retries)
+ cluster.get(3).runOnInstance(() -> {
+
MutationTrackingService.instance().resumeBackgroundReconciler();
+ MutationTrackingService.instance().reconcileForTesting();
+ });
+
+ // 7. Wait for the pull request to be processed and mutation to
arrive
+ TimeUnit.SECONDS.sleep(2);
+
+ // Broadcast again so reconciliation state converges
+ for (int i = 1; i <= 3; i++)
+ cluster.get(i).runOnInstance(() ->
MutationTrackingService.instance().broadcastOffsetsForTesting());
+
+ // 8. Verify node 3 now has the mutation (pulled via background
reconciler)
+ assertMatchingSummaryIdSpaceForKey(cluster.get(3), KEYSPACE,
"tbl", /* key */1, expected);
+
+ // 9. Verify no read reconciliation was triggered
+ assertEquals(0, numLogReconciliations(cluster.get(3)));
+ }
+ }
+
+ @Test
+ public void testBackgroundPullReconciliationWhenCoordinatorDown() throws
Throwable
+ {
+ try (Cluster cluster = Cluster.build(3).withConfig(cfg ->
cfg.with(Feature.NETWORK)
+
.with(Feature.GOSSIP)
+
.set("write_request_timeout", "1000ms"))
+ .start())
+ {
+ cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH
replication = " +
+ "{'class': 'SimpleStrategy',
'replication_factor': 3} " +
+ "AND
replication_type='tracked'"));
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (k int
PRIMARY KEY, v int)"));
+
+ // 1. Pause push-side retries and the background reconciler on all nodes
+ for (int i = 1; i <= 3; i++)
+ cluster.get(i).runOnInstance(() -> {
+ MutationTrackingService.instance().pauseActiveReconciler();
+
MutationTrackingService.instance().pauseBackgroundReconciler();
+ });
+
+ // 2. Partition node 3, then write at QUORUM from coordinator
(node 1)
+ cluster.filters().allVerbs().to(3).drop();
+ cluster.filters().allVerbs().from(3).drop();
+ awaitNodeDead(cluster.get(1), cluster.get(3));
+
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl
(k, v) VALUES (1, 1)"),
+ ConsistencyLevel.QUORUM);
+
+ // Sleep one second so the 1000ms write timeout elapses
+ TimeUnit.SECONDS.sleep(1);
+
+ // 3. Capture expected state from node 1 (before we partition it)
+ MutationSummary expected = summaryForKey(cluster.get(1), KEYSPACE,
"tbl", 1);
+
+ // 4. Heal node 3, then partition node 1 (the coordinator)
+ cluster.filters().reset();
+ awaitNodeAlive(cluster.get(2), cluster.get(3));
+ awaitNodeAlive(cluster.get(3), cluster.get(2));
+
+ cluster.filters().allVerbs().to(1).drop();
+ cluster.filters().allVerbs().from(1).drop();
+ for (int i = 2; i <= 3; i++)
+ cluster.get(i).runOnInstance(() ->
Gossiper.instance.convict(InetAddressAndPort.getByNameUnchecked("127.0.0.1"),
Double.MAX_VALUE));
+ awaitNodeDead(cluster.get(3), cluster.get(1));
+ awaitNodeDead(cluster.get(2), cluster.get(1));
+
+ // 5. Broadcast offsets between nodes 2 and 3 only (node 3 learns what
+ // node 2 has witnessed and should discover the gap)
+ for (int i = 2; i <= 3; i++)
+ cluster.get(i).runOnInstance(() ->
MutationTrackingService.instance().broadcastOffsetsForTesting());
+
+ // 6. Trigger the background reconciler on node 3. The coordinator is down,
+ // so it is expected to fall back to node 2 (the remaining alive replica)
+ cluster.get(3).runOnInstance(() -> {
+
MutationTrackingService.instance().resumeBackgroundReconciler();
+ MutationTrackingService.instance().reconcileForTesting();
+ });
+
+ // 7. Resume the active reconciler on instance 2, so whenever
instance 2 receives the request
+ // it will process the PULL_MUTATIONS_REQ verb
+ cluster.get(2).runOnInstance(() ->
MutationTrackingService.instance().resumeActiveReconciler());
+
+ // Wait for the pull request to be processed and mutation to arrive
+ TimeUnit.SECONDS.sleep(2);
+
+ // Broadcast again so reconciliation state converges
+ for (int i = 2; i <= 3; i++)
+ cluster.get(i).runOnInstance(() ->
MutationTrackingService.instance().broadcastOffsetsForTesting());
+
+ // 8. Verify node 3 pulled the mutation from node 2 (the fallback
replica)
+ assertMatchingSummaryIdSpaceForKey(cluster.get(3), KEYSPACE,
"tbl", 1, expected);
+
+ // 9. Verify no read reconciliation was involved
+ assertEquals(0, numLogReconciliations(cluster.get(3)));
+ }
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
index b14ec37285..f43009d363 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/OffsetBroadcastTest.java
@@ -43,9 +43,9 @@ public class OffsetBroadcastTest extends TestBaseImpl
@Test
public void testBroadcastOffsets() throws Throwable
{
- try (Cluster cluster = Cluster.build(3)
- .withConfig(cfg ->
cfg.with(Feature.NETWORK).with(Feature.GOSSIP))
- .start())
+ try (Cluster cluster = disableBackgroundReconciler(Cluster.build(3)
+
.withConfig(cfg -> cfg.with(Feature.NETWORK).with(Feature.GOSSIP)))
+ .start())
{
cluster.schemaChange(withKeyspace("CREATE KEYSPACE %s WITH
replication = " +
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java
index f98b602943..72b8eaabf9 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedImportFailureTest.java
@@ -163,7 +163,7 @@ public class TrackedImportFailureTest extends
TrackedTransferTestBase
{
ActivationRequest.Phase phase = COMMIT;
int MISSED_ACTIVATION = 2;
- try (Cluster cluster =
cluster(TrackedTransferTestBase.ByteBuddyInjections.SkipActivation.install(MISSED_ACTIVATION)))
+ try (Cluster cluster =
disableBackgroundReconciler(cluster(TrackedTransferTestBase.ByteBuddyInjections.SkipActivation.install(MISSED_ACTIVATION))))
{
TrackedTransferTestBase.ByteBuddyInjections.SkipActivation.setup(cluster,
phase);
createSchema(cluster);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedNonZeroCopyRepairTransferTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedNonZeroCopyRepairTransferTest.java
index 710e4e01f2..e22c57e5d7 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedNonZeroCopyRepairTransferTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedNonZeroCopyRepairTransferTest.java
@@ -27,6 +27,6 @@ public class TrackedNonZeroCopyRepairTransferTest extends
TrackedRepairTransferS
@BeforeClass
public static void setup() throws IOException
{
- cluster = cluster(NON_ZCS_CONFIG);
+ cluster = disableBackgroundReconciler(cluster(NON_ZCS_CONFIG));
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedRepairFailureTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedRepairFailureTest.java
index eb36f1fc00..1a4cd5c374 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedRepairFailureTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedRepairFailureTest.java
@@ -65,7 +65,7 @@ public class TrackedRepairFailureTest extends
TrackedRepairTransferTestBase
@Test
public void testFullRepairPartiallyCompleteAnomaly() throws IOException,
ExecutionException, InterruptedException, TimeoutException
{
- try (Cluster cluster = cluster(StreamReceiverFailureHelper::install))
+ try (Cluster cluster =
disableBackgroundReconciler(cluster(StreamReceiverFailureHelper::install)))
{
cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 3} AND
replication_type='tracked';");
String TABLE_SCHEMA_CQL = "CREATE TABLE " +
tableWithKeyspace(KEYSPACE) + " (k INT PRIMARY KEY, v INT)";
@@ -192,7 +192,7 @@ public class TrackedRepairFailureTest extends
TrackedRepairTransferTestBase
@Test
public void testFullRepairCleanupOnFailure() throws IOException,
ExecutionException, InterruptedException, TimeoutException
{
- try (Cluster cluster = cluster(StreamReceiverFailureHelper::install))
+ try (Cluster cluster =
disableBackgroundReconciler(cluster(StreamReceiverFailureHelper::install)))
{
cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 3} AND
replication_type='tracked';");
String TABLE_SCHEMA_CQL = "CREATE TABLE " + KEYSPACE + '.' + TABLE
+ " (k INT PRIMARY KEY, v INT)";
@@ -259,7 +259,7 @@ public class TrackedRepairFailureTest extends
TrackedRepairTransferTestBase
@Test
public void testRepairFailsOnMissedActivation() throws IOException
{
- try (Cluster cluster =
cluster(ByteBuddyInjections.SkipActivation.install(2, 3)))
+ try (Cluster cluster =
disableBackgroundReconciler(cluster(ByteBuddyInjections.SkipActivation.install(2,
3))))
{
cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 3} AND
replication_type='tracked';");
cluster.schemaChange("CREATE TABLE " + tableWithKeyspace(KEYSPACE)
+ " (k INT PRIMARY KEY, v INT)");
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java
index 002dcef5f9..513dcd35eb 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedTransferTestBase.java
@@ -88,8 +88,10 @@ public abstract class TrackedTransferTestBase extends
TestBaseImpl
.set("repair_request_timeout", "2s")
.set("stream_transfer_task_timeout", "10s");
- protected static final Consumer<IInstanceConfig> ZCS_CONFIG =
CONFIG.andThen(cfg -> cfg.set("stream_entire_sstables", true));
- protected static final Consumer<IInstanceConfig> NON_ZCS_CONFIG =
CONFIG.andThen(cfg -> cfg.set("stream_entire_sstables", false));
+ protected static final Consumer<IInstanceConfig> ZCS_CONFIG =
CONFIG.andThen(cfg -> cfg.set("stream_entire_sstables", true)
+
.set("mutation_tracking.background_reconciliation_enabled", false));
+ protected static final Consumer<IInstanceConfig> NON_ZCS_CONFIG =
CONFIG.andThen(cfg -> cfg.set("stream_entire_sstables", false)
+
.set("mutation_tracking.background_reconciliation_enabled",
false));
protected static final
IIsolatedExecutor.SerializableConsumer<SSTableReader> TRANSFERS_EXIST = sstable
-> {
Assertions.assertThat(sstable.getCoordinatorLogOffsets().transfers()).isNotEmpty();
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedZeroCopyRepairTransferTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedZeroCopyRepairTransferTest.java
index 75c95b4e3f..a3642729a2 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedZeroCopyRepairTransferTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/TrackedZeroCopyRepairTransferTest.java
@@ -27,6 +27,6 @@ public class TrackedZeroCopyRepairTransferTest extends
TrackedRepairTransferSucc
@BeforeClass
public static void setup() throws IOException
{
- cluster = cluster(ZCS_CONFIG);
+ cluster = disableBackgroundReconciler(cluster(ZCS_CONFIG));
}
}
diff --git a/test/unit/org/apache/cassandra/tools/nodetool/MTAdminTest.java
b/test/unit/org/apache/cassandra/tools/nodetool/MTAdminTest.java
new file mode 100644
index 0000000000..1e9c44485f
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tools/nodetool/MTAdminTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.io.PrintStream;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import org.apache.cassandra.tools.NodeProbe;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class MTAdminTest
+{
+ @Mock
+ private NodeProbe probe;
+
+ @Mock
+ private PrintStream out;
+
+ private MTAdmin.SetConfig cmd;
+
+ @Before
+ public void setUp()
+ {
+ MockitoAnnotations.initMocks(this);
+ when(probe.isMutationTrackingDisabled()).thenReturn(false);
+ cmd = new MTAdmin.SetConfig();
+ cmd.out = out;
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNoArgs()
+ {
+ cmd.execute(probe);
+ }
+
+ @Test
+ public void testMutationTrackingDisabled()
+ {
+ when(probe.isMutationTrackingDisabled()).thenReturn(true);
+ cmd.args = List.of("background_reconciliation_enabled", "true");
+
+ cmd.execute(probe);
+
+ verify(out, times(1)).println("Mutation tracking is not enabled");
+ verify(probe,
times(0)).setMutationTrackingBackgroundReconciliationEnabled(true);
+ }
+
+ @Test
+ public void testSetBackgroundReconciliationEnabled()
+ {
+ cmd.args = List.of("background_reconciliation_enabled", "true");
+
+ cmd.execute(probe);
+
+ verify(probe,
times(1)).setMutationTrackingBackgroundReconciliationEnabled(true);
+ }
+
+ @Test
+ public void testSetBackgroundReconciliationDisabled()
+ {
+ cmd.args = List.of("background_reconciliation_enabled", "false");
+
+ cmd.execute(probe);
+
+ verify(probe,
times(1)).setMutationTrackingBackgroundReconciliationEnabled(false);
+ }
+
+ @Test
+ public void testSetBackgroundReconciliationIntervalMs()
+ {
+ cmd.args = List.of("background_reconciliation_interval_ms", "5000");
+
+ cmd.execute(probe);
+
+ verify(probe,
times(1)).setMutationTrackingBackgroundReconciliationIntervalMilliseconds(5000L);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidParamType()
+ {
+ cmd.args = List.of("unknown_param", "value");
+
+ cmd.execute(probe);
+ }
+
+ @Test(expected = NumberFormatException.class)
+ public void testInvalidIntervalValue()
+ {
+ cmd.args = List.of("background_reconciliation_interval_ms",
"not_a_number");
+
+ cmd.execute(probe);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]