This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 51e01a3862 Repair Paxos for the distributed metadata log when CMS
membership changes
51e01a3862 is described below
commit 51e01a3862afc1ebaffd765b2490a581461c3f14
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Mar 20 17:55:44 2025 +0000
Repair Paxos for the distributed metadata log when CMS membership changes
Patch by Josh McKenzie and Sam Tunnicliffe; reviewed by Marcus Ericksson for
CASSANDRA-20467
Co-authored-by: Josh McKenzie <[email protected]>
Co-authored-by: Sam Tunnicliffe <[email protected]>
---
CHANGES.txt | 1 +
.../apache/cassandra/db/DiskBoundaryManager.java | 2 +-
.../apache/cassandra/tcm/MultiStepOperation.java | 4 +-
.../org/apache/cassandra/tcm/Transformation.java | 6 ++
.../apache/cassandra/tcm/sequences/AddToCMS.java | 2 +-
.../cassandra/tcm/sequences/ReconfigureCMS.java | 67 +++++++++++++++++-----
.../distributed/test/PaxosRepairTest.java | 21 ++++---
.../distributed/test/log/ReconfigureCMSTest.java | 64 +++++++++++++++++++++
8 files changed, 142 insertions(+), 25 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 98389b77cc..1360ab01bc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Repair Paxos for the distributed metadata log when CMS membership changes
(CASSANDRA-20467)
* Reintroduce CASSANDRA-17411 in trunk (CASSANDRA-19346)
* Add min/max/mean/percentiles to timer metrics vtable (CASSANDRA-20466)
* Add support for time, date, timestamp types in scalar constraint
(CASSANDRA-20274)
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 2a25dc4efc..5c6b59a0b1 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -152,7 +152,7 @@ public class DiskBoundaryManager
}
else
{
- // Reason we use use the future settled metadata is that if we
decommission a node, we want to stream
+ // Reason we use the future settled metadata is that if we
decommission a node, we want to stream
// from that node to the correct location on disk, if we didn't,
we would put new files in the wrong places.
// We do this to minimize the amount of data we need to move in
rebalancedisks once everything settled
placement =
metadata.writePlacementAllSettled(cfs.keyspace.getMetadata());
diff --git a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
index 019086dccd..d447974f85 100644
--- a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
+++ b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
@@ -40,7 +40,7 @@ import
org.apache.cassandra.tcm.serialization.MetadataSerializer;
* For example, in order to join, the joining node has to execute the
following steps:
* * PrepareJoin, which introduces node's tokens, but makes no changes to
range ownership, and creates BootstrapAndJoin
* in-progress sequence
- * * StartJoin, which adds the bootstrapping node to the write placements
for the ranges it gains
+ * * StartJoin, which adds the bootstrapping node to the write placements
for the ranges it gains
* * MidJoin, which adds the bootstrapping node to the read placements for
the ranges it has gained, and removes
* owners of these ranges from the read placements
* * FinishJoin, which removes owners of the gained ranges from the write
placements.
@@ -126,7 +126,7 @@ public abstract class MultiStepOperation<CONTEXT>
/**
* Returns the {@link Transformation.Kind} of the next step due to be
executed in the sequence. Used when executing
- * a {@link Transformation} which is part of a sequence (specifically,
subclasses of
+ * a {@link Transformation} which is part of a sequence (often, this is an
implementation of
* {@link org.apache.cassandra.tcm.transformations.ApplyPlacementDeltas})
to validate that it is being applied at
* the correct point (i.e. that the type of the transform matches the
expected next)
* matches the If all steps
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java
b/src/java/org/apache/cassandra/tcm/Transformation.java
index e4b81bc765..8cfda01e26 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -48,6 +48,12 @@ import
org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS;
import org.apache.cassandra.tcm.transformations.cms.StartAddToCMS;
import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
+/**
+ * Implementations should be pure transformations from one ClusterMetadata
state to another. They are likely to be
+ * replayed during startup to rebuild the node's current state and so should
be free of side effects and should not
+ * depend on external state, configuration or resources. They must produce
consistent outputs when run on every instance
+ * in a cluster, regardless of any specific characteristics of the instance.
+ */
public interface Transformation
{
Serializer transformationSerializer = new Serializer();
diff --git a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
index 149c99b8b2..0d5ee2f067 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
@@ -82,7 +82,7 @@ public class AddToCMS extends MultiStepOperation<Epoch>
.commit(new
StartAddToCMS(addr))
.inProgressSequences.get(nodeId);
InProgressSequences.resume(sequence);
- ReconfigureCMS.repairPaxosTopology();
+ ReconfigureCMS.repairPaxosForCMSTopologyChange();
}
public AddToCMS(Epoch latestModification,
diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
index 57b5e68e80..38566812bf 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
@@ -33,9 +33,6 @@ import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -74,9 +71,18 @@ import
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Future;
-import static
org.apache.cassandra.streaming.StreamOperation.RESTORE_REPLICA_COUNT;
import static org.apache.cassandra.locator.MetaStrategy.entireRange;
+import static
org.apache.cassandra.streaming.StreamOperation.RESTORE_REPLICA_COUNT;
+/**
+ * This class is slightly different from most other MultiStepOperations in
that it doesn't reify every component
+ * transformation when it is constructed (see how {@link BootstrapAndJoin}
encloses its StartJoin/MidJoin/FinishJoin
transforms for a counterexample). Instead, each instance includes a single
transformation with kind {@link
+ * Transformation.Kind#ADVANCE_CMS_RECONFIGURATION}, representing the _next_
step to be executed. That transformation
+ * instance holds all the state necessary to generate the subsequent
ADVANCE_CMS_RECONFIGURATION step. As each of these
transformations is applied, it logically progresses the multi-step
operation by installing a new ReconfigureCMS
+ * instance with the idx pointer bumped and the next step encoded.
+ */
public class ReconfigureCMS extends
MultiStepOperation<AdvanceCMSReconfiguration>
{
public static final Serializer serializer = new Serializer();
@@ -119,6 +125,7 @@ public class ReconfigureCMS extends
MultiStepOperation<AdvanceCMSReconfiguration
{
return MultiStepOperation.Kind.RECONFIGURE_CMS;
}
+
@Override
protected SequenceKey sequenceKey()
{
@@ -176,6 +183,13 @@ public class ReconfigureCMS extends
MultiStepOperation<AdvanceCMSReconfiguration
Replica replica = new Replica(endpoint, entireRange, true);
streamRanges(replica, activeTransition.streamCandidates);
}
+ else
+ {
+ // Run a paxos repair before starting either the addition or
removal of a CMS member, where in both
+ // cases there is no active transition.
+ repairPaxosForCMSTopologyChange();
+ }
+
// Commit the next step in the sequence
ClusterMetadataService.instance().commit(transitionCMS.next);
return SequenceState.continuable();
@@ -298,17 +312,42 @@ public class ReconfigureCMS extends
MultiStepOperation<AdvanceCMSReconfiguration
'}';
}
- static void repairPaxosTopology()
+ static void repairPaxosForCMSTopologyChange()
{
- Retry.Backoff retry = new
Retry.Backoff(TCMMetrics.instance.repairPaxosTopologyRetries);
+ // This *should* be redundant, primarily because the state machine
which manages a node's cluster metadata
+ // doesn't rely on the distributed metadata log table directly but is
driven by metadata replication messages
+ // which distribute log entries around the cluster.
+ //
+ // However, it is still worthwhile to guard against a failure to sync
paxos accept/commit operations by a subset
+ // of replicas, in this case the CMS members. Paxos repair is designed
to ensure that any operation witnessed by
+ // at least one replica prior to a topology change is witnessed by a
majority of the replica set after the
+ // topology change, essentially ensuring that the pre- and post-
change quorums overlap.
+ //
+ // The way that CMS reconfiguration proceeds is to first expand the
membership to the maximal state by adding all
+ // new members and then to shrink back down to the desired size by
pruning out the leaving members. Each step
+ // only modifies the membership group by adding or removing a single
member and unlike operations on other
+ // keyspaces, there are no changes to the ranges involved. As all CMS
members replicate the entire token
+ // range for that keyspace, these ownership changes are relatively
simple.
+ //
+ // For example, if the CMS membership is currently {1, 2, 3} and we
want to transition it to {4, 5, 6} the
+ // reconfiguration goes through these steps:
+ // * {1, 2, 3}
+ // * {1, 2, 3, 4}
+ // * {1, 2, 3, 4, 5}
+ // * {1, 2, 3, 4, 5, 6}
+ // * {2, 3, 4, 5, 6}
+ // * {3, 4, 5, 6}
+ // * {4, 5, 6}
+ // When adding a member, the new member streams data in from an
existing member, analogous to bootstrapping.
+ // When removing, there is no need for streaming as the existing
members' ownership is not changing. Running a
+ // paxos repair at the beginning of each step, before streaming where
applicable, will ensure that the
+ // overlapping quorums invariant holds.
- // The system.paxos table is what we're actually repairing and that
uses the system configured partitioner
- // so although we use MetaStrategy.entireRange for streaming between
CMS members, we don't use it here
- Range<Token> entirePaxosRange = new
Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
-
DatabaseDescriptor.getPartitioner().getMinimumToken());
- List<Supplier<Future<?>>> remaining =
ActiveRepairService.instance().repairPaxosForTopologyChangeAsync(SchemaConstants.METADATA_KEYSPACE_NAME,
-
Collections.singletonList(entirePaxosRange),
-
"bootstrap");
+ Retry.Backoff retry = new
Retry.Backoff(TCMMetrics.instance.repairPaxosTopologyRetries);
+ List<Supplier<Future<?>>> remaining = ActiveRepairService.instance()
+
.repairPaxosForTopologyChangeAsync(SchemaConstants.METADATA_KEYSPACE_NAME,
+
Collections.singletonList(entireRange),
+
"CMS reconfiguration");
while (!retry.reachedMax())
{
@@ -316,7 +355,6 @@ public class ReconfigureCMS extends
MultiStepOperation<AdvanceCMSReconfiguration
for (Supplier<Future<?>> supplier : remaining)
tasks.put(supplier, supplier.get());
remaining.clear();
- logger.info("Performing paxos topology repair on: {}", remaining);
for (Map.Entry<Supplier<Future<?>>, Future<?>> e :
tasks.entrySet())
{
@@ -331,6 +369,7 @@ public class ReconfigureCMS extends
MultiStepOperation<AdvanceCMSReconfiguration
}
catch (InterruptedException t)
{
+ logger.info("Interrupted while repairing paxos topology,
aborting.", t);
return;
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
index 0197106bc6..a2af4a2137 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
@@ -34,7 +34,6 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +48,7 @@ import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
@@ -57,6 +57,8 @@ import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.SharedContext;
@@ -304,16 +306,17 @@ public class PaxosRepairTest extends TestBaseImpl
}
- @Ignore
@Test
public void topologyChangePaxosTest() throws Throwable
{
// TODO: fails with vnode enabled
- try (Cluster cluster =
Cluster.build(4).withConfig(WITH_NETWORK).withoutVNodes().createWithoutStarting())
+ try (Cluster cluster = builder().withNodes(3)
+
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
+
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
+ .withConfig(WITH_NETWORK)
+ .withoutVNodes()
+ .start())
{
- for (int i=1; i<=3; i++)
- cluster.get(i).startup();
-
init(cluster);
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + "
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + '.' +
TABLE + " (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
@@ -333,7 +336,11 @@ public class PaxosRepairTest extends TestBaseImpl
cluster.filters().reset();
// node 4 starting should repair paxos and inform the other nodes
of its gossip state
- cluster.get(4).startup();
+ IInstanceConfig config = cluster.newInstanceConfig()
+ .set("auto_bootstrap", true)
+
.set(Constants.KEY_DTEST_FULL_STARTUP, true);
+ IInvokableInstance node4 = cluster.bootstrap(config);
+ node4.startup();
Assert.assertFalse(hasUncommittedQuorum(cluster, KEYSPACE, TABLE));
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
index d1f983f19a..2869fe913a 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
@@ -19,8 +19,10 @@
package org.apache.cassandra.distributed.test.log;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -40,6 +42,8 @@ import org.apache.cassandra.locator.MetaStrategy;
import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.service.paxos.PaxosRepairHistory;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.ownership.DataPlacement;
@@ -257,6 +261,66 @@ public class ReconfigureCMSTest extends FuzzTestBase
}
}
+ @Test
+ public void cmsTopologyChangePaxosTest() throws Throwable
+ {
+ // Use a 4 node cluster so we have room to decommission one node while
still maintaining RF
+ try (Cluster cluster = builder().withNodes(4)
+ .withConfig(c ->
c.with(Feature.NETWORK))
+ .withoutVNodes()
+ .start())
+ {
+ IInvokableInstance node1 = cluster.get(1);
+ IInvokableInstance node2 = cluster.get(2);
+ IInvokableInstance node3 = cluster.get(3);
+ IInvokableInstance node4 = cluster.get(4);
+
+ // no paxos repair history initially
+ PaxosRepairHistory empty =
PaxosRepairHistory.empty(MetaStrategy.partitioner);
+ cluster.forEach(i -> assertEquals(empty, paxosRepairHistory(i)));
+
+ node1.nodetoolResult("cms", "reconfigure",
"2").asserts().success();
+ // Nodes 3 & 4 are not involved in the first cms reconfiguration,
so should still have no paxos repair
+ // history for the metadata log table
+ assertEquals(empty, paxosRepairHistory(node3));
+ assertEquals(empty, paxosRepairHistory(node4));
+ // Nodes 1 & 2 should have completed a paxos repair. For this
keyspace, that is always over the entire
+ // range, so there is only ever a single entry in the repair
history which equates to prh.size() == 0
+ PaxosRepairHistory node1History = paxosRepairHistory(node1);
+ assertEquals(0, node1History.size());
+ assertEquals(node1History, paxosRepairHistory(node2));
+
+ // node 1 leaving should cause a cms reconfiguration which runs a
paxos repair that nodes 2 & 3
+ // participate in, while node 4 remains uninvolved.
+ node1.nodetoolResult("decommission").asserts().success();
+ assertEquals(empty, paxosRepairHistory(node4));
+
+ PaxosRepairHistory node3History = paxosRepairHistory(node3);
+ assertEquals(0, node3History.size());
+ assertEquals(node3History, paxosRepairHistory(node2));
+ // verify that the ballot for this second repair is > the one for
the first
+ Ballot node3Ballot =
node3History.ballotForToken(MetaStrategy.partitioner.getMinimumToken());
+ Ballot node1Ballot =
node1History.ballotForToken(MetaStrategy.partitioner.getMinimumToken());
+ assertTrue(node3Ballot.unixMicros() > node1Ballot.unixMicros());
+ }
+ }
+
+ private PaxosRepairHistory paxosRepairHistory(IInvokableInstance instance)
+ {
+ Object[][] rows = instance.executeInternal("select points from
system.paxos_repair_history " +
+ "where keyspace_name = ? " +
+ "and table_name = ?",
+
SchemaConstants.METADATA_KEYSPACE_NAME,
+
DistributedMetadataLogKeyspace.TABLE_NAME);
+
+ if (rows.length == 0)
+ return
PaxosRepairHistory.empty(SchemaConstants.METADATA_KEYSPACE_NAME,
DistributedMetadataLogKeyspace.TABLE_NAME);
+ assertEquals(1, rows.length);
+ //noinspection unchecked
+ List<ByteBuffer> points = (List<ByteBuffer>)rows[0][0];
+ return
PaxosRepairHistory.fromTupleBufferList(MetaStrategy.partitioner, points);
+ }
+
// We can't assume that nodeId matches endpoint (ie node3 = 127.0.0.3 etc)
private Set<String> expectedCMS(Cluster cluster, int... instanceIds)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]