This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 279c0527aa Allow CMS reconfiguration to work around DOWN nodes
279c0527aa is described below
commit 279c0527aa3d52e1474fee5f37c0227ed6f9da5f
Author: Marcus Eriksson <[email protected]>
AuthorDate: Thu Sep 26 15:56:03 2024 +0200
Allow CMS reconfiguration to work around DOWN nodes
Patch by Sam Tunnicliffe and marcuse; reviewed by Sam Tunnicliffe for
CASSANDRA-19943
Co-authored-by: Sam Tunnicliffe <[email protected]>
---
CHANGES.txt | 1 +
.../config/CassandraRelevantProperties.java | 4 +
.../cassandra/locator/CMSPlacementStrategy.java | 138 ++++--------
.../org/apache/cassandra/locator/MetaStrategy.java | 25 ++-
.../org/apache/cassandra/metrics/TCMMetrics.java | 8 +-
.../streaming/DataMovementVerbHandler.java | 4 +-
.../org/apache/cassandra/tcm/CMSOperations.java | 6 +-
.../cassandra/tcm/ClusterMetadataService.java | 18 +-
.../cassandra/tcm/sequences/ReconfigureCMS.java | 9 +-
.../cassandra/tcm/serialization/Version.java | 1 +
.../cms/PrepareCMSReconfiguration.java | 243 ++++++++++++++++-----
.../apache/cassandra/distributed/test/CASTest.java | 13 +-
.../test/log/ClusterMetadataTestHelper.java | 2 +-
.../test/log/MetadataChangeSimulationTest.java | 2 +-
.../test/log/ReconfigureCMSStreamingTest.java | 78 +++++++
.../distributed/test/log/ReconfigureCMSTest.java | 145 +++++++++++-
.../apache/cassandra/locator/MetaStrategyTest.java | 6 +-
17 files changed, 517 insertions(+), 186 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 252b9c919c..a69a7222ff 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Allow CMS reconfiguration to work around DOWN nodes (CASSANDRA-19943)
* Make TableParams.Serializer set allowAutoSnapshots and incrementalBackups
(CASSANDRA-19954)
* Make sstabledump possible to show tombstones only (CASSANDRA-19939)
* Ensure that RFP queries potentially stale replicas even with only key
columns in the row filter (CASSANDRA-19938)
diff --git
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index a0baf8de90..56a8aad315 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -533,6 +533,10 @@ public enum CassandraRelevantProperties
*/
TCM_RECENTLY_SEALED_PERIOD_INDEX_SIZE("cassandra.recently_sealed_period_index_size",
"10"),
+ /**
+ * for testing purposes disable the automatic CMS reconfiguration after a
bootstrap/replace/move operation
+ */
+
TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE("cassandra.test.skip_cms_reconfig_after_topology_change",
"false"),
/**
* should replica groups in data placements be sorted to ensure the
primary replica is first in the list
*/
diff --git a/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
b/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
index 6ba47ae04d..716b40a34c 100644
--- a/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
+++ b/src/java/org/apache/cassandra/locator/CMSPlacementStrategy.java
@@ -31,7 +31,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.membership.Directory;
@@ -39,123 +38,70 @@ import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.ownership.TokenMap;
-import static org.apache.cassandra.locator.SimpleStrategy.REPLICATION_FACTOR;
-
/**
* CMS Placement Strategy is how CMS keeps the number of its members at a
configured level, given current
* cluster topology. It allows to add and remove CMS members when cluster
topology changes. For example, during
- * node replacement or decommission.
+ * node replacement or decommission. This attempts to achieve rack diversity,
while keeping CMS placements
+ * close to how "regular" data would get replicated to keep the bounces safe,
as long as the user
+ * bounces at most `f` members of the replica group, where `f = (RF - 1)/2`.
*/
-public interface CMSPlacementStrategy
+public class CMSPlacementStrategy
{
- Set<NodeId> reconfigure(ClusterMetadata metadata);
- boolean needsReconfiguration(ClusterMetadata metadata);
+ public final Map<String, Integer> rf;
+ public final BiFunction<ClusterMetadata, NodeId, Boolean> filter;
- static CMSPlacementStrategy fromReplicationParams(ReplicationParams
params, Predicate<NodeId> filter)
+ public CMSPlacementStrategy(Map<String, Integer> rf, Predicate<NodeId>
filter)
{
- if (params.isMeta())
- {
- assert !params.options.containsKey(REPLICATION_FACTOR);
- Map<String, Integer> dcRf = new HashMap<>();
- for (Map.Entry<String, String> entry : params.options.entrySet())
- {
- String dc = entry.getKey();
- ReplicationFactor rf =
ReplicationFactor.fromString(entry.getValue());
- dcRf.put(dc, rf.fullReplicas);
- }
- return new DatacenterAware(dcRf, filter);
- }
- else
- {
- throw new IllegalStateException("Can't parse the params: " +
params);
- }
+ this(rf, new DefaultNodeFilter(filter));
}
- /**
- * Default reconfiguration strategy: attempts to achieve rack diversity,
while keeping CMS placements
- * close to how "regular" data would get replicated to keep the bounces
safe, as long as the user
- * bounces at most `f` members of the replica group, where `f = RF/2`.
- */
- class DatacenterAware implements CMSPlacementStrategy
+ @VisibleForTesting
+ public CMSPlacementStrategy(Map<String, Integer> rf,
BiFunction<ClusterMetadata, NodeId, Boolean> filter)
{
- public final Map<String, Integer> rf;
- public final BiFunction<ClusterMetadata, NodeId, Boolean> filter;
-
- public DatacenterAware(Map<String, Integer> rf, Predicate<NodeId>
filter)
- {
- this(rf, new DefaultNodeFilter(filter));
- }
-
- @VisibleForTesting
- public DatacenterAware(Map<String, Integer> rf,
BiFunction<ClusterMetadata, NodeId, Boolean> filter)
- {
- this.rf = rf;
- this.filter = filter;
- }
+ // todo: verify only test uses with other filter
+ this.rf = rf;
+ this.filter = filter;
+ }
- public Set<NodeId> reconfigure(ClusterMetadata metadata)
+ public Set<NodeId> reconfigure(ClusterMetadata metadata)
+ {
+ Map<String, ReplicationFactor> rf = new HashMap<>(this.rf.size());
+ for (Map.Entry<String, Integer> e : this.rf.entrySet())
{
- Map<String, ReplicationFactor> rf = new HashMap<>(this.rf.size());
- for (Map.Entry<String, Integer> e : this.rf.entrySet())
- {
- Collection<InetAddressAndPort> nodesInDc =
metadata.directory.allDatacenterEndpoints().get(e.getKey());
- if (nodesInDc.isEmpty())
- throw new IllegalStateException(String.format("There are
no nodes in %s datacenter", e.getKey()));
- if (nodesInDc.size() < e.getValue())
- throw new
Transformation.RejectedTransformationException(String.format("There are not
enough nodes in %s datacenter to satisfy replication factor", e.getKey()));
+ Collection<InetAddressAndPort> nodesInDc =
metadata.directory.allDatacenterEndpoints().get(e.getKey());
+ if (nodesInDc.isEmpty())
+ throw new IllegalStateException(String.format("There are no
nodes in %s datacenter", e.getKey()));
+ if (nodesInDc.size() < e.getValue())
+ throw new
Transformation.RejectedTransformationException(String.format("There are not
enough nodes in %s datacenter to satisfy replication factor", e.getKey()));
- rf.put(e.getKey(), ReplicationFactor.fullOnly(e.getValue()));
- }
- return reconfigure(metadata, rf);
+ rf.put(e.getKey(), ReplicationFactor.fullOnly(e.getValue()));
}
- public Set<NodeId> reconfigure(ClusterMetadata metadata, Map<String,
ReplicationFactor> rf)
+ Directory tmpDirectory = metadata.directory;
+ TokenMap tmpTokenMap = metadata.tokenMap;
+ for (NodeId peerId : metadata.directory.peerIds())
{
- Directory tmpDirectory = metadata.directory;
- TokenMap tokenMap = metadata.tokenMap;
- for (NodeId peerId : metadata.directory.peerIds())
+ if (!filter.apply(metadata, peerId))
{
- if (!filter.apply(metadata, peerId))
- {
- tmpDirectory = tmpDirectory.without(peerId);
- tokenMap = tokenMap.unassignTokens(peerId);
- }
+ tmpDirectory = tmpDirectory.without(peerId);
+ tmpTokenMap = tmpTokenMap.unassignTokens(peerId);
}
-
- // Although MetaStrategy has its own entireRange, it uses a custom
partitioner which isn't compatible with
- // regular, non-CMS placements. For that reason, we select
replicas here using tokens provided by the
- // globally configured partitioner.
- Token minToken =
DatabaseDescriptor.getPartitioner().getMinimumToken();
- EndpointsForRange endpoints =
NetworkTopologyStrategy.calculateNaturalReplicas(minToken,
-
new Range<>(minToken, minToken),
-
tmpDirectory,
-
tokenMap,
-
rf);
-
- return
endpoints.endpoints().stream().map(metadata.directory::peerId).collect(Collectors.toSet());
}
- public boolean needsReconfiguration(ClusterMetadata metadata)
- {
- Map<String, ReplicationFactor> rf = new HashMap<>(this.rf.size());
- for (Map.Entry<String, Integer> e : this.rf.entrySet())
- {
- Collection<InetAddressAndPort> nodesInDc =
metadata.directory.allDatacenterEndpoints().get(e.getKey());
- if (nodesInDc.size() < e.getValue())
- return true;
- rf.put(e.getKey(), ReplicationFactor.fullOnly(e.getValue()));
- }
-
- Set<NodeId> currentCms = metadata.fullCMSMembers()
- .stream()
- .map(metadata.directory::peerId)
- .collect(Collectors.toSet());
- Set<NodeId> newCms = reconfigure(metadata, rf);
- return !currentCms.equals(newCms);
- }
+ // Although MetaStrategy has its own entireRange, it uses a custom
partitioner which isn't compatible with
+ // regular, non-CMS placements. For that reason, we select replicas
here using tokens provided by the
+ // globally configured partitioner.
+ Token minToken = DatabaseDescriptor.getPartitioner().getMinimumToken();
+ EndpointsForRange endpoints =
NetworkTopologyStrategy.calculateNaturalReplicas(minToken,
+
new Range<>(minToken, minToken),
+
tmpDirectory,
+
tmpTokenMap,
+
rf);
+
+ return
endpoints.endpoints().stream().map(metadata.directory::peerId).collect(Collectors.toSet());
}
- class DefaultNodeFilter implements BiFunction<ClusterMetadata, NodeId,
Boolean>
+ static class DefaultNodeFilter implements BiFunction<ClusterMetadata,
NodeId, Boolean>
{
private final Predicate<NodeId> filter;
diff --git a/src/java/org/apache/cassandra/locator/MetaStrategy.java
b/src/java/org/apache/cassandra/locator/MetaStrategy.java
index 536ac2fa22..a7afa4898c 100644
--- a/src/java/org/apache/cassandra/locator/MetaStrategy.java
+++ b/src/java/org/apache/cassandra/locator/MetaStrategy.java
@@ -30,6 +30,8 @@ import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tcm.sequences.LockedRanges;
+import static
org.apache.cassandra.locator.NetworkTopologyStrategy.REPLICATION_FACTOR;
+
/**
* MetaStrategy is designed for distributed cluster metadata keyspace, and
should not be used by
* the users directly. This strategy allows a configurable number of nodes to
own an entire range and
@@ -55,9 +57,26 @@ public class MetaStrategy extends SystemStrategy
return new Replica(addr, entireRange, true);
}
+ private final ReplicationFactor rf;
+
public MetaStrategy(String keyspaceName, Map<String, String> configOptions)
{
super(keyspaceName, configOptions);
+ int replicas = 0;
+ if (configOptions != null)
+ {
+ for (Map.Entry<String, String> entry : configOptions.entrySet())
+ {
+ String dc = entry.getKey();
+ // prepareOptions should have transformed any
"replication_factor" options by now
+ if (dc.equalsIgnoreCase(REPLICATION_FACTOR))
+ continue;
+ ReplicationFactor rf =
ReplicationFactor.fromString(entry.getValue());
+ replicas += rf.allReplicas;
+ }
+ }
+
+ rf = ReplicationFactor.fullOnly(replicas);
}
@Override
@@ -75,11 +94,7 @@ public class MetaStrategy extends SystemStrategy
@Override
public ReplicationFactor getReplicationFactor()
{
- ClusterMetadata metadata = ClusterMetadata.currentNullable();
- if (metadata == null || metadata.epoch.isEqualOrBefore(Epoch.FIRST))
- return ReplicationFactor.fullOnly(1);
- int rf =
metadata.placements.get(ReplicationParams.meta(metadata)).writes.forRange(entireRange).get().byEndpoint.size();
- return ReplicationFactor.fullOnly(rf);
+ return rf;
}
@Override
diff --git a/src/java/org/apache/cassandra/metrics/TCMMetrics.java
b/src/java/org/apache/cassandra/metrics/TCMMetrics.java
index e58a3b11cd..29858ead80 100644
--- a/src/java/org/apache/cassandra/metrics/TCMMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TCMMetrics.java
@@ -25,13 +25,12 @@ import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.locator.CMSPlacementStrategy;
-import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+import static
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
public class TCMMetrics
{
@@ -96,10 +95,7 @@ public class TCMMetrics
needsCMSReconfiguration =
Metrics.register(factory.createMetricName("NeedsCMSReconfiguration"), () -> {
ClusterMetadata metadata = ClusterMetadata.currentNullable();
- if (metadata == null)
- return 0;
- CMSPlacementStrategy placementStrategy =
CMSPlacementStrategy.fromReplicationParams(ReplicationParams.meta(metadata),
nodeId -> true);
- return placementStrategy.needsReconfiguration(metadata) ? 1 : 0;
+ return metadata != null && needsReconfiguration(metadata) ? 1 : 0;
});
fetchedPeerLogEntries =
Metrics.histogram(factory.createMetricName("FetchedPeerLogEntries"), false);
diff --git
a/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java
b/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java
index 2b1d791ac7..9415f89285 100644
--- a/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java
+++ b/src/java/org/apache/cassandra/streaming/DataMovementVerbHandler.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.NoPayload;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.tcm.ClusterMetadata;
public class DataMovementVerbHandler implements IVerbHandler<DataMovement>
{
@@ -47,8 +48,9 @@ public class DataMovementVerbHandler implements
IVerbHandler<DataMovement>
{
MessagingService.instance().respond(NoPayload.noPayload, message); //
let coordinator know we received the message
StreamPlan streamPlan = new
StreamPlan(StreamOperation.fromString(message.payload.streamOperation));
+ ClusterMetadata metadata = ClusterMetadata.current();
Schema.instance.getNonLocalStrategyKeyspaces().stream().forEach((ksm)
-> {
- if (ksm.replicationStrategy.getReplicationFactor().allReplicas <=
1)
+ if
(metadata.placements.get(ksm.params.replication).writes.byEndpoint().keySet().size()
<= 1)
return;
message.payload.movements.get(ksm.params.replication).asMap().forEach((local,
endpoints) -> {
diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java
b/src/java/org/apache/cassandra/tcm/CMSOperations.java
index 30ee48b20d..87e6d0e372 100644
--- a/src/java/org/apache/cassandra/tcm/CMSOperations.java
+++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.CMSPlacementStrategy;
import org.apache.cassandra.schema.ReplicationParams;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
@@ -45,6 +44,8 @@ import
org.apache.cassandra.tcm.transformations.cms.AdvanceCMSReconfiguration;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;
+import static
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration.needsReconfiguration;
+
public class CMSOperations implements CMSOperationsMBean
{
public static final String MBEAN_OBJECT_NAME =
"org.apache.cassandra.tcm:type=CMSOperations";
@@ -142,8 +143,7 @@ public class CMSOperations implements CMSOperationsMBean
ClusterMetadata metadata = ClusterMetadata.current();
String members =
metadata.fullCMSMembers().stream().sorted().map(Object::toString).collect(Collectors.joining(","));
info.put(MEMBERS, members);
- CMSPlacementStrategy placementStrategy =
CMSPlacementStrategy.fromReplicationParams(ReplicationParams.meta(metadata),
nodeId -> true);
- info.put(NEEDS_RECONFIGURATION,
Boolean.toString(placementStrategy.needsReconfiguration(metadata)));
+ info.put(NEEDS_RECONFIGURATION,
Boolean.toString(needsReconfiguration(metadata)));
info.put(IS_MEMBER,
Boolean.toString(cms.isCurrentMember(FBUtilities.getBroadcastAddressAndPort())));
info.put(SERVICE_STATE,
ClusterMetadataService.state(metadata).toString());
info.put(IS_MIGRATING, Boolean.toString(cms.isMigrating()));
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
index 236e82392a..845b6741f9 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -41,6 +42,7 @@ import
org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ExceptionCode;
import org.apache.cassandra.exceptions.StartupException;
+import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.io.util.FileInputStreamPlus;
import org.apache.cassandra.io.util.FileOutputStreamPlus;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -74,6 +76,7 @@ import org.apache.cassandra.utils.concurrent.ImmediateFuture;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.stream.Collectors.toSet;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE;
import static org.apache.cassandra.tcm.ClusterMetadataService.State.GOSSIP;
import static org.apache.cassandra.tcm.ClusterMetadataService.State.LOCAL;
import static org.apache.cassandra.tcm.ClusterMetadataService.State.REMOTE;
@@ -364,7 +367,13 @@ public class ClusterMetadataService
public void reconfigureCMS(ReplicationParams replicationParams)
{
- Transformation transformation = new
PrepareCMSReconfiguration.Complex(replicationParams);
+ ClusterMetadata metadata = ClusterMetadata.current();
+ Set<NodeId> downNodes = new HashSet<>();
+ for (InetAddressAndPort ep : metadata.directory.allJoinedEndpoints())
+ if (!FailureDetector.instance.isAlive(ep))
+ downNodes.add(metadata.directory.peerId(ep));
+ PrepareCMSReconfiguration.Complex transformation = new
PrepareCMSReconfiguration.Complex(replicationParams, downNodes);
+ transformation.verify(metadata);
ClusterMetadataService.instance()
.commit(transformation);
@@ -374,6 +383,13 @@ public class ClusterMetadataService
public void ensureCMSPlacement(ClusterMetadata metadata)
{
+ if (TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.getBoolean())
+ {
+ logger.info("Not performing CMS reconfiguration as {} property is
set. This should only be used for testing.",
+
TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.getKey());
+ return;
+ }
+
try
{
reconfigureCMS(ReplicationParams.meta(metadata));
diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
index 1a96833ffb..fb6d9998c4 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tcm.sequences;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -205,9 +206,15 @@ public class ReconfigureCMS extends
MultiStepOperation<AdvanceCMSReconfiguration
{
if (!metadata.fullCMSMembers().contains(toRemove))
return;
+ Set<NodeId> downNodes = new HashSet<>();
+ for (InetAddressAndPort ep : metadata.directory.allJoinedEndpoints())
+ if (!FailureDetector.instance.isAlive(ep))
+ downNodes.add(metadata.directory.peerId(ep));
+ PrepareCMSReconfiguration.Simple transformation = new
PrepareCMSReconfiguration.Simple(metadata.directory.peerId(toRemove),
downNodes);
+ transformation.verify(metadata);
// We can force removal from the CMS as it doesn't alter the size of
the service
- ClusterMetadataService.instance().commit(new
PrepareCMSReconfiguration.Simple(metadata.directory.peerId(toRemove)));
+ ClusterMetadataService.instance().commit(transformation);
InProgressSequences.finishInProgressSequences(SequenceKey.instance);
if (ClusterMetadata.current().isCMSMember(toRemove))
diff --git a/src/java/org/apache/cassandra/tcm/serialization/Version.java
b/src/java/org/apache/cassandra/tcm/serialization/Version.java
index 7960c963ed..108348a504 100644
--- a/src/java/org/apache/cassandra/tcm/serialization/Version.java
+++ b/src/java/org/apache/cassandra/tcm/serialization/Version.java
@@ -40,6 +40,7 @@ public enum Version
V2(2),
/**
* - Serialize allowAutoSnapshot and incrementalBackups when serializing
TableParams
+ * - down nodes serialized in PrepareCMSReconfiguration
*/
V3(3),
diff --git
a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
index 2d41fb53a6..c8b15c4fa5 100644
---
a/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
+++
b/src/java/org/apache/cassandra/tcm/transformations/cms/PrepareCMSReconfiguration.java
@@ -20,10 +20,15 @@ package org.apache.cassandra.tcm.transformations.cms;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.function.Function;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -33,6 +38,8 @@ import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.locator.CMSPlacementStrategy;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicationFactor;
import org.apache.cassandra.schema.DistributedSchema;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
@@ -49,13 +56,30 @@ import org.apache.cassandra.tcm.serialization.Version;
import static org.apache.cassandra.exceptions.ExceptionCode.INVALID;
import static org.apache.cassandra.locator.MetaStrategy.entireRange;
+import static org.apache.cassandra.tcm.CMSOperations.REPLICATION_FACTOR;
-public class PrepareCMSReconfiguration
+public abstract class PrepareCMSReconfiguration implements Transformation
{
private static final Logger logger =
LoggerFactory.getLogger(PrepareCMSReconfiguration.class);
+ final Set<NodeId> downNodes;
- private static Transformation.Result executeInternal(ClusterMetadata prev,
Function<ClusterMetadata.Transformer, ClusterMetadata.Transformer> transform,
Diff diff)
+ public PrepareCMSReconfiguration(Set<NodeId> downNodes)
{
+ this.downNodes = downNodes;
+ }
+
+ protected abstract Predicate<NodeId>
additionalFilteringPredicate(Set<NodeId> downNodes);
+ protected abstract ReplicationParams newReplicationParams(ClusterMetadata
prev);
+
+ protected Transformation.Result executeInternal(ClusterMetadata prev,
Function<ClusterMetadata.Transformer, ClusterMetadata.Transformer> transform)
+ {
+ Diff diff = getDiff(prev);
+ if (!diff.hasChanges())
+ {
+ logger.info("Proposed CMS reconfiguration resulted in no required
modifications at epoch {}", prev.epoch.getEpoch());
+ return Transformation.success(prev.transformer(),
LockedRanges.AffectedRanges.EMPTY);
+ }
+
LockedRanges.Key lockKey = LockedRanges.keyFor(prev.nextEpoch());
Set<NodeId> cms =
prev.fullCMSMembers().stream().map(prev.directory::peerId).collect(Collectors.toSet());
Set<NodeId> tmp = new HashSet<>(cms);
@@ -71,17 +95,81 @@ public class PrepareCMSReconfiguration
return Transformation.success(transform.apply(transformer),
LockedRanges.AffectedRanges.EMPTY);
}
- public static class Simple implements Transformation
+ private Diff getDiff(ClusterMetadata prev)
+ {
+ Set<NodeId> currentCms = prev.fullCMSMemberIds();
+ Map<String, Integer> dcRF = extractRf(newReplicationParams(prev));
+ Set<NodeId> newCms = prepareNewCMS(dcRF, prev);
+ if (newCms.equals(currentCms))
+ return Diff.NOCHANGE;
+ return diff(currentCms, newCms);
+ }
+
+ private Set<NodeId> prepareNewCMS(Map<String, Integer> dcRf,
ClusterMetadata prev)
+ {
+ CMSPlacementStrategy placementStrategy = new
CMSPlacementStrategy(dcRf, additionalFilteringPredicate(downNodes));
+ return placementStrategy.reconfigure(prev);
+ }
+
+ public void verify(ClusterMetadata prev)
+ {
+ Map<String, Integer> dcRf = extractRf(newReplicationParams(prev));
+ int expectedSize =
dcRf.values().stream().mapToInt(Integer::intValue).sum();
+ Set<NodeId> newCms = prepareNewCMS(dcRf, prev);
+ if (newCms.size() < (expectedSize / 2) + 1)
+ throw new IllegalStateException("Too many nodes are currently DOWN
to safely perform the reconfiguration");
+ }
+
+ private static void serializeDownNodes(PrepareCMSReconfiguration
transformation, DataOutputPlus out, Version version) throws IOException
+ {
+ out.writeUnsignedVInt32(transformation.downNodes.size());
+ for (NodeId nodeId : transformation.downNodes)
+ NodeId.serializer.serialize(nodeId, out, version);
+ }
+
+ private static Set<NodeId> deserializeDownNodes(DataInputPlus in, Version
version) throws IOException
+ {
+ Set<NodeId> downNodes = new HashSet<>();
+ int count = in.readUnsignedVInt32();
+ for (int i = 0; i < count; i++)
+ downNodes.add(NodeId.serializer.deserialize(in, version));
+ return downNodes;
+ }
+
+ private static long serializedDownNodesSize(PrepareCMSReconfiguration
transformation, Version version)
+ {
+ long size =
TypeSizes.sizeofUnsignedVInt(transformation.downNodes.size());
+ for (NodeId nodeId : transformation.downNodes)
+ size += NodeId.serializer.serializedSize(nodeId, version);
+ return size;
+ }
+
+ public static class Simple extends PrepareCMSReconfiguration
{
public static final Simple.Serializer serializer = new Serializer();
private final NodeId toReplace;
- public Simple(NodeId toReplace)
+ public Simple(NodeId toReplace, Set<NodeId> downNodes)
{
+ super(downNodes);
this.toReplace = toReplace;
}
+ @Override
+ protected Predicate<NodeId> additionalFilteringPredicate(Set<NodeId>
downNodes)
+ {
+ // exclude the node being replaced from the new CMS, and avoid any
down nodes
+ return nodeId -> !nodeId.equals(toReplace) &&
!downNodes.contains(nodeId);
+ }
+
+ @Override
+ protected ReplicationParams newReplicationParams(ClusterMetadata prev)
+ {
+ // a simple reconfiguration retains the existing replication params
+ return ReplicationParams.meta(prev);
+ }
+
@Override
public Kind kind()
{
@@ -94,27 +182,16 @@ public class PrepareCMSReconfiguration
if
(!prev.fullCMSMembers().contains(prev.directory.getNodeAddresses(toReplace).broadcastAddress))
return new Rejected(INVALID, String.format("%s is not a member
of CMS. Members: %s", toReplace, prev.fullCMSMembers()));
- ReplicationParams metaParams = ReplicationParams.meta(prev);
- CMSPlacementStrategy placementStrategy =
CMSPlacementStrategy.fromReplicationParams(metaParams, nodeId ->
!nodeId.equals(toReplace));
- Set<NodeId> currentCms = prev.fullCMSMembers()
- .stream()
- .map(prev.directory::peerId)
- .collect(Collectors.toSet());
-
- Set<NodeId> newCms = placementStrategy.reconfigure(prev);
- if (newCms.equals(currentCms))
- {
- logger.info("Proposed CMS reconfiguration resulted in no
required modifications at epoch {}", prev.epoch.getEpoch());
- return Transformation.success(prev.transformer(),
LockedRanges.AffectedRanges.EMPTY);
- }
- Diff diff = diff(currentCms, newCms);
- return executeInternal(prev, t -> t, diff);
+ // A simple reconfiguration only kicks off the sequence of
membership changes, no additional metadata
+ // transformation is required.
+ return executeInternal(prev, t -> t);
}
public String toString()
{
return "PrepareCMSReconfiguration#Simple{" +
"toReplace=" + toReplace +
+ ", downNodes=" + downNodes +
'}';
}
@@ -122,34 +199,58 @@ public class PrepareCMSReconfiguration
{
public void serialize(Transformation t, DataOutputPlus out,
Version version) throws IOException
{
- Simple tranformation = (Simple) t;
- NodeId.serializer.serialize(tranformation.toReplace, out,
version);
+ Simple transformation = (Simple) t;
+ NodeId.serializer.serialize(transformation.toReplace, out,
version);
+ if (version.isAtLeast(Version.V3))
+
PrepareCMSReconfiguration.serializeDownNodes(transformation, out, version);
}
public Simple deserialize(DataInputPlus in, Version version)
throws IOException
{
- return new Simple(NodeId.serializer.deserialize(in, version));
+ NodeId replaceNode = NodeId.serializer.deserialize(in,
version);
+ Set<NodeId> downNodes = version.isAtLeast(Version.V3)
+ ?
PrepareCMSReconfiguration.deserializeDownNodes(in, version)
+ : Collections.emptySet();
+ return new Simple(replaceNode, downNodes);
}
public long serializedSize(Transformation t, Version version)
{
- Simple tranformation = (Simple) t;
- return
NodeId.serializer.serializedSize(tranformation.toReplace, version);
+ Simple transformation = (Simple) t;
+ long size =
NodeId.serializer.serializedSize(transformation.toReplace, version);
+ if (version.isAtLeast(Version.V3))
+ size +=
PrepareCMSReconfiguration.serializedDownNodesSize(transformation, version);
+ return size;
}
}
}
- public static class Complex implements Transformation
+ public static class Complex extends PrepareCMSReconfiguration
{
public static final Complex.Serializer serializer = new
Complex.Serializer();
private final ReplicationParams replicationParams;
- public Complex(ReplicationParams replicationParams)
+ public Complex(ReplicationParams replicationParams, Set<NodeId>
downNodes)
{
+ super(downNodes);
this.replicationParams = replicationParams;
}
+ @Override
+ protected Predicate<NodeId> additionalFilteringPredicate(Set<NodeId>
downNodes)
+ {
+ // exclude any down nodes
+ return nodeId -> !downNodes.contains(nodeId);
+ }
+
+ @Override
+ protected ReplicationParams newReplicationParams(ClusterMetadata
ignored)
+ {
+ // desired replication params are supplied, so just return them
+ return replicationParams;
+ }
+
@Override
public Kind kind()
{
@@ -159,34 +260,21 @@ public class PrepareCMSReconfiguration
@Override
public Result execute(ClusterMetadata prev)
{
+ // In a complex reconfiguration, in addition to initiating the
sequence of membership changes,
+ // we're modifying the replication params of the metadata keyspace
so we supply a function to do that
KeyspaceMetadata keyspace =
prev.schema.getKeyspaceMetadata(SchemaConstants.METADATA_KEYSPACE_NAME);
KeyspaceMetadata newKeyspace = keyspace.withSwapped(new
KeyspaceParams(keyspace.params.durableWrites, replicationParams));
- CMSPlacementStrategy placementStrategy =
CMSPlacementStrategy.fromReplicationParams(replicationParams, nodeId -> true);
-
- Set<NodeId> currentCms = prev.fullCMSMembers()
- .stream()
- .map(prev.directory::peerId)
- .collect(Collectors.toSet());
-
- Set<NodeId> newCms = placementStrategy.reconfigure(prev);
- if (newCms.equals(currentCms))
- {
- logger.info("Proposed CMS reconfiguration resulted in no
required modifications at epoch {}", prev.epoch.getEpoch());
- return Transformation.success(prev.transformer(),
LockedRanges.AffectedRanges.EMPTY);
- }
- Diff diff = diff(currentCms, newCms);
-
return executeInternal(prev,
- transformer ->
transformer.with(prev.placements.replaceParams(prev.nextEpoch(),ReplicationParams.meta(prev),
replicationParams))
- .with(new
DistributedSchema(prev.schema.getKeyspaces().withAddedOrUpdated(newKeyspace))),
- diff);
+ transformer ->
transformer.with(prev.placements.replaceParams(prev.nextEpoch(),
ReplicationParams.meta(prev), replicationParams))
+ .with(new
DistributedSchema(prev.schema.getKeyspaces().withAddedOrUpdated(newKeyspace))));
}
public String toString()
{
return "PrepareCMSReconfiguration#Complex{" +
"replicationParams=" + replicationParams +
+ ", downNodes=" + downNodes +
'}';
}
@@ -194,19 +282,28 @@ public class PrepareCMSReconfiguration
{
public void serialize(Transformation t, DataOutputPlus out,
Version version) throws IOException
{
- Complex tranformation = (Complex) t;
-
ReplicationParams.serializer.serialize(tranformation.replicationParams, out,
version);
+ Complex transformation = (Complex) t;
+
ReplicationParams.serializer.serialize(transformation.replicationParams, out,
version);
+ if (version.isAtLeast(Version.V3))
+
PrepareCMSReconfiguration.serializeDownNodes(transformation, out, version);
}
public Complex deserialize(DataInputPlus in, Version version)
throws IOException
{
- return new
Complex(ReplicationParams.serializer.deserialize(in, version));
+ ReplicationParams params =
ReplicationParams.serializer.deserialize(in, version);
+ Set<NodeId> downNodes = version.isAtLeast(Version.V3)
+ ?
PrepareCMSReconfiguration.deserializeDownNodes(in, version)
+ : Collections.emptySet();
+ return new Complex(params, downNodes);
}
public long serializedSize(Transformation t, Version version)
{
- Complex tranformation = (Complex) t;
- return
ReplicationParams.serializer.serializedSize(tranformation.replicationParams,
version);
+ Complex transformation = (Complex) t;
+ long size =
ReplicationParams.serializer.serializedSize(transformation.replicationParams,
version);
+ if (version.isAtLeast(Version.V3))
+ size +=
PrepareCMSReconfiguration.serializedDownNodesSize(transformation, version);
+ return size;
}
}
}
@@ -233,8 +330,51 @@ public class PrepareCMSReconfiguration
return new Diff(additions, removals);
}
+    /**
+     * Turns the options of meta replication params into a map from option key (used as the
+     * datacenter name) to the number of full replicas parsed from the option value.
+     *
+     * @param params replication params; must satisfy {@code isMeta()}
+     * @return mutable map of datacenter name to full replica count
+     * @throws IllegalStateException if {@code params} is not meta
+     */
+    private static Map<String, Integer> extractRf(ReplicationParams params)
+    {
+        if (!params.isMeta())
+            throw new IllegalStateException("Can't parse the params: " + params);
+
+        // meta params are expected to carry per-datacenter entries only
+        assert !params.options.containsKey(REPLICATION_FACTOR);
+        Map<String, Integer> rfByDc = new HashMap<>();
+        params.options.forEach((dc, rf) -> rfByDc.put(dc, ReplicationFactor.fromString(rf).fullReplicas));
+        return rfByDc;
+    }
+
+    /**
+     * Decides whether the current CMS membership differs from what the metadata keyspace's
+     * replication params call for.
+     * <p>
+     * Returns true if the number of CMS members is not the sum of the per-datacenter replication
+     * factors, if any datacenter has fewer endpoints in the directory than its replication factor,
+     * or if the placement strategy (run with a filter accepting every node) proposes a different set.
+     *
+     * @param metadata cluster metadata to inspect
+     * @return true when the membership does not match, false otherwise
+     */
+    public static boolean needsReconfiguration(ClusterMetadata metadata)
+    {
+        Map<String, Integer> rfByDc = extractRf(ReplicationParams.meta(metadata));
+        Set<NodeId> members = metadata.fullCMSMembers()
+                                      .stream()
+                                      .map(metadata.directory::peerId)
+                                      .collect(Collectors.toSet());
+
+        // the membership size must equal the total of the per-datacenter replication factors
+        int wanted = 0;
+        for (int rf : rfByDc.values())
+            wanted += rf;
+        if (members.size() != wanted)
+            return true;
+
+        // any datacenter with fewer known endpoints than its replication factor
+        boolean undersizedDc = rfByDc.entrySet()
+                                     .stream()
+                                     .anyMatch(e -> metadata.directory.allDatacenterEndpoints().get(e.getKey()).size() < e.getValue());
+        if (undersizedDc)
+            return true;
+
+        Set<NodeId> proposed = new CMSPlacementStrategy(rfByDc, nodeId -> true).reconfigure(metadata);
+        return !members.equals(proposed);
+    }
+
public static class Diff
{
+ public static final Diff NOCHANGE = new Diff(Collections.emptyList(),
Collections.emptyList());
public static final Serializer serializer = new Serializer();
public final List<NodeId> additions;
@@ -246,6 +386,11 @@ public class PrepareCMSReconfiguration
this.removals = removals;
}
+        /**
+         * Reports whether this diff adds or removes at least one node.
+         * <p>
+         * The answer is derived from the contents rather than from identity with
+         * {@link #NOCHANGE}, so any diff whose additions and removals are both empty is
+         * treated as "no change", not only the shared constant.
+         *
+         * @return true if {@code additions} or {@code removals} is non-empty
+         */
+        public boolean hasChanges()
+        {
+            return !additions.isEmpty() || !removals.isEmpty();
+        }
+
public String toString()
{
return "Diff{" +
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
index 80da569783..a08ba58a06 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/CASTest.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
-
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -31,7 +30,6 @@ import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
-import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
@@ -43,7 +41,6 @@ import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
-
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.SimpleSeedProvider;
import org.apache.cassandra.net.Verb;
@@ -54,9 +51,11 @@ import
org.apache.cassandra.tcm.sequences.InProgressSequences;
import org.apache.cassandra.utils.FBUtilities;
import static
org.apache.cassandra.config.CassandraRelevantProperties.PAXOS_USE_SELF_EXECUTION;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
import static
org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.SERIAL;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
@@ -76,10 +75,6 @@ import static org.junit.Assert.fail;
public class CASTest extends CASCommonTestCases
{
- static
- {
-
CassandraRelevantProperties.TCM_USE_ATOMIC_LONG_PROCESSOR.setBoolean(true);
- }
/**
* The {@code cas_contention_timeout} used during the tests
@@ -98,6 +93,8 @@ public class CASTest extends CASCommonTestCases
public static void beforeClass() throws Throwable
{
PAXOS_USE_SELF_EXECUTION.setBoolean(false);
+ TCM_SKIP_CMS_RECONFIGURATION_AFTER_TOPOLOGY_CHANGE.setBoolean(true);
+ TCM_USE_ATOMIC_LONG_PROCESSOR.setBoolean(true);
TestBaseImpl.beforeClass();
// At times during these tests, node1 is going to be blocked from
appending entries to its local metadata
// log in order to induce divergent views of cluster topology between
instances. This precludes it from
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index b3dbdf796d..d06d79afd1 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -827,7 +827,7 @@ public class ClusterMetadataTestHelper
public static void reconfigureCms(ReplicationParams replication)
{
- ClusterMetadata metadata =
ClusterMetadataService.instance().commit(new
PrepareCMSReconfiguration.Complex(replication));
+ ClusterMetadata metadata =
ClusterMetadataService.instance().commit(new
PrepareCMSReconfiguration.Complex(replication, Collections.emptySet()));
while
(metadata.inProgressSequences.contains(ReconfigureCMS.SequenceKey.instance))
{
AdvanceCMSReconfiguration next = ((ReconfigureCMS)
metadata.inProgressSequences.get(ReconfigureCMS.SequenceKey.instance)).next;
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
index 89253b72b8..931ee320e5 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
@@ -529,7 +529,7 @@ public class MetadataChangeSimulationTest extends
CMSTestBase
for (String s : ntsRf.asMap().keySet())
cmsRf.put(s, 3);
- simulateBounces(ntsRf, new
CMSPlacementStrategy.DatacenterAware(cmsRf, (cm, n) -> random.nextInt(10) > 1),
random);
+ simulateBounces(ntsRf, new CMSPlacementStrategy(cmsRf, (cm, n) ->
random.nextInt(10) > 1), random);
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingTest.java
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingTest.java
new file mode 100644
index 0000000000..909e0a46d3
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSStreamingTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.sequences.AddToCMS;
+import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Verifies that after "cms reconfigure" moves the CMS from one node to another, the new
+ * member's copy of the distributed metadata log contains every epoch the previous member had.
+ */
+public class ReconfigureCMSStreamingTest extends TestBaseImpl
+{
+    /**
+     * Moves a single-member CMS from node2 back to node1 after a series of schema changes and
+     * compares the epochs stored on node2 before the move with those on node1 afterwards.
+     */
+    @Test
+    public void testRF1() throws IOException
+    {
+        /*
+        1. place the CMS on the "wrong" node (2)
+        2. execute a bunch of schema changes
+        3. run "cms reconfigure 1" which moves the cms back to the correct node (1)
+        4. make sure node1 has all the epochs that node2 had before reconfiguration
+         */
+        try (Cluster cluster = init(Cluster.build()
+                                           .withNodes(3)
+                                           .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
+                                           .start()))
+        {
+            cluster.get(2).runOnInstance(() -> AddToCMS.initiate());
+            cluster.get(1).runOnInstance(() -> ClusterMetadataService.instance().commit(new RemoveFromCMS(InetAddressAndPort.getByNameUnchecked("127.0.0.1"), true)));
+
+            for (int i = 0; i < 20; i++)
+                cluster.schemaChange(withKeyspace("create table %s.tbl"+i+" (id int primary key)"));
+
+            // select only the epoch column in both queries: epochs() reads column 0 of every row
+            long[] epochsBefore = epochs(cluster.get(2).executeInternal("select epoch from system_cluster_metadata.distributed_metadata_log"));
+            // fail right here, with nodetool's own output, if the reconfiguration did not succeed
+            cluster.get(3).nodetoolResult("cms", "reconfigure", "1").asserts().success();
+            long[] epochsAfter = epochs(cluster.get(1).executeInternal("select epoch from system_cluster_metadata.distributed_metadata_log"));
+            assertTrue(epochsBefore.length > 20); // at least 20 schema changes above
+            assertTrue(epochsAfter.length > epochsBefore.length); // we get a few more epochs from reconfiguration
+            for (int i = 0; i < epochsBefore.length; i++)
+                assertEquals(epochsBefore[i] + " != " + epochsAfter[i], epochsBefore[i], epochsAfter[i]);
+        }
+    }
+
+    /**
+     * Reads the first column of every row as a long and returns the values in ascending order.
+     *
+     * @param res query result rows; column 0 of each row must hold a {@code Long}
+     * @return sorted array with one entry per row
+     */
+    private long[] epochs(Object[][] res)
+    {
+        long[] epochs = new long[res.length];
+        for (int i = 0; i < res.length; i++)
+            epochs[i] = (long)res[i][0];
+        Arrays.sort(epochs);
+        return epochs;
+    }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
index e8713ae35f..d1f983f19a 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
@@ -18,7 +18,12 @@
package org.apache.cassandra.distributed.test.log;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.junit.Assert;
@@ -27,6 +32,9 @@ import org.junit.Test;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.distributed.shared.NetworkTopology;
import org.apache.cassandra.locator.MetaStrategy;
import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
@@ -40,6 +48,15 @@ import org.apache.cassandra.tcm.sequences.ReconfigureCMS;
import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
import org.apache.cassandra.utils.FBUtilities;
+import static
org.apache.cassandra.config.CassandraRelevantProperties.BOOTSTRAP_SKIP_SCHEMA_CHECK;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.awaitRingJoin;
+import static
org.apache.cassandra.distributed.shared.ClusterUtils.replaceHostAndStart;
+import static
org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static
org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.psjava.util.AssertStatus.assertTrue;
+
public class ReconfigureCMSTest extends FuzzTestBase
{
@Test
@@ -59,8 +76,8 @@ public class ReconfigureCMSTest extends FuzzTestBase
cluster.get(nodeSelector.get()).nodetoolResult("cms",
"reconfigure", "5").asserts().success();
cluster.get(1).runOnInstance(() -> {
ClusterMetadata metadata = ClusterMetadata.current();
- Assert.assertEquals(5, metadata.fullCMSMembers().size());
- Assert.assertEquals(ReplicationParams.simpleMeta(5,
metadata.directory.knownDatacenters()),
+ assertEquals(5, metadata.fullCMSMembers().size());
+ assertEquals(ReplicationParams.simpleMeta(5,
metadata.directory.knownDatacenters()),
metadata.placements.keys().stream().filter(ReplicationParams::isMeta).findFirst().get());
});
cluster.stream().forEach(i -> {
@@ -70,8 +87,8 @@ public class ReconfigureCMSTest extends FuzzTestBase
cluster.get(nodeSelector.get()).nodetoolResult("cms",
"reconfigure", "1").asserts().success();
cluster.get(1).runOnInstance(() -> {
ClusterMetadata metadata = ClusterMetadata.current();
- Assert.assertEquals(1, metadata.fullCMSMembers().size());
- Assert.assertEquals(ReplicationParams.simpleMeta(1,
metadata.directory.knownDatacenters()),
+ assertEquals(1, metadata.fullCMSMembers().size());
+ assertEquals(ReplicationParams.simpleMeta(1,
metadata.directory.knownDatacenters()),
metadata.placements.keys().stream().filter(ReplicationParams::isMeta).findFirst().get());
});
}
@@ -89,7 +106,7 @@ public class ReconfigureCMSTest extends FuzzTestBase
{
cluster.get(1).nodetoolResult("cms", "reconfigure",
"2").asserts().success();
cluster.get(1).runOnInstance(() -> {
- ClusterMetadataService.instance().commit(new
PrepareCMSReconfiguration.Complex(ReplicationParams.simple(3).asMeta()));
+ ClusterMetadataService.instance().commit(new
PrepareCMSReconfiguration.Complex(ReplicationParams.simple(3).asMeta(),
Collections.emptySet()));
ReconfigureCMS reconfigureCMS = (ReconfigureCMS)
ClusterMetadata.current().inProgressSequences.get(ReconfigureCMS.SequenceKey.instance);
ClusterMetadataService.instance().commit(reconfigureCMS.next);
ProgressBarrier.propagateLast(MetaStrategy.affectedRanges(ClusterMetadata.current()));
@@ -110,15 +127,15 @@ public class ReconfigureCMSTest extends FuzzTestBase
ProgressBarrier.propagateLast(MetaStrategy.affectedRanges(ClusterMetadata.current()));
ClusterMetadata metadata = ClusterMetadata.current();
Assert.assertNull(metadata.inProgressSequences.get(ReconfigureCMS.SequenceKey.instance));
- Assert.assertEquals(2, metadata.fullCMSMembers().size());
+ assertEquals(2, metadata.fullCMSMembers().size());
ReplicationParams params = ReplicationParams.meta(metadata);
DataPlacement placements = metadata.placements.get(params);
- Assert.assertEquals(placements.reads, placements.writes);
- Assert.assertEquals(metadata.fullCMSMembers().size(),
Integer.parseInt(params.asMap().get("dc0")));
+ assertEquals(placements.reads, placements.writes);
+ assertEquals(metadata.fullCMSMembers().size(),
Integer.parseInt(params.asMap().get("dc0")));
});
cluster.get(1).runOnInstance(() -> {
- ClusterMetadataService.instance().commit(new
PrepareCMSReconfiguration.Complex(ReplicationParams.simple(4).asMeta()));
+ ClusterMetadataService.instance().commit(new
PrepareCMSReconfiguration.Complex(ReplicationParams.simple(4).asMeta(),
Collections.emptySet()));
ProgressBarrier.propagateLast(MetaStrategy.affectedRanges(ClusterMetadata.current()));
ReconfigureCMS reconfigureCMS = (ReconfigureCMS)
ClusterMetadata.current().inProgressSequences.get(ReconfigureCMS.SequenceKey.instance);
@@ -136,10 +153,116 @@ public class ReconfigureCMSTest extends FuzzTestBase
ClusterMetadata metadata = ClusterMetadata.current();
Assert.assertNull(metadata.inProgressSequences.get(ReconfigureCMS.SequenceKey.instance));
Assert.assertTrue(metadata.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort()));
- Assert.assertEquals(3, metadata.fullCMSMembers().size());
+ assertEquals(3, metadata.fullCMSMembers().size());
DataPlacement placements =
metadata.placements.get(ReplicationParams.meta(metadata));
- Assert.assertEquals(placements.reads, placements.writes);
+ assertEquals(placements.reads, placements.writes);
+ });
+ }
+ }
+
+ /**
+ * Runs "cms reconfigure 3" on a 3 node cluster with a varying number of nodes down:
+ * with nodes 2 and 3 down the command is expected to fail and the CMS keeps 1 member;
+ * with only node 3 down it is expected to succeed with 2 members while
+ * needsReconfiguration still reports true; with all nodes up it is expected to reach
+ * 3 members and needsReconfiguration reports false.
+ */
+ @Test
+ public void testReconfigureTooManyNodesDown() throws IOException,
ExecutionException, InterruptedException
+ {
+ try (Cluster cluster = init(Cluster.build(3)
+ .withConfig(conf ->
conf.with(Feature.NETWORK, Feature.GOSSIP))
+ .start()))
+ {
+ cluster.get(2).shutdown().get();
+ cluster.get(3).shutdown().get();
+ // Fails as the CMS size would be less than a quorum of what was
+ // specified (i.e. 3/2 + 1)
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"3").asserts().failure();
+ cluster.get(2).startup();
+ // the failed attempt must have left the single-member CMS untouched
+ cluster.get(1).runOnInstance(() -> assertEquals(1,
ClusterMetadata.current().fullCMSMembers().size()));
+
+ // Succeeds, but flags that a further reconfiguration is required
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"3").asserts().success();
+ cluster.get(1).runOnInstance(() -> assertEquals(2,
ClusterMetadata.current().fullCMSMembers().size()));
+ cluster.get(1).runOnInstance(() ->
assertTrue(PrepareCMSReconfiguration.needsReconfiguration(ClusterMetadata.current())));
+
+ // All good
+ cluster.get(3).startup();
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"3").asserts().success();
+ cluster.get(1).runOnInstance(() -> assertEquals(3,
ClusterMetadata.current().fullCMSMembers().size()));
+ cluster.get(1).runOnInstance(() ->
assertFalse(PrepareCMSReconfiguration.needsReconfiguration(ClusterMetadata.current())));
+ }
+ }
+
+ @Test
+ public void testReplaceSameSize() throws IOException, ExecutionException,
InterruptedException
+ {
+ TokenSupplier even = TokenSupplier.evenlyDistributedTokens(3);
+ try (Cluster cluster = init(Cluster.build(3)
+ .withConfig(c ->
c.with(Feature.GOSSIP, Feature.NETWORK))
+ .withTokenSupplier(node ->
even.token(node == 4 ? 2 : node))
+ .start()))
+ {
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"3").asserts().success();
+ cluster.get(2).shutdown().get();
+ // now create a new node to replace the other node
+ IInvokableInstance replacingNode = replaceHostAndStart(cluster,
cluster.get(2), props -> {
+ // since we have a downed host, an old schema version may show up which
+ // can't be fetched because the host is down...
+ props.set(BOOTSTRAP_SKIP_SCHEMA_CHECK, true);
+ });
+ // wait till the replacing node is in the ring
+ awaitRingJoin(cluster.get(1), replacingNode);
+ awaitRingJoin(replacingNode, cluster.get(1));
+ replacingNode.runOnInstance(() -> {
+ ClusterMetadata metadata = ClusterMetadata.current();
+
assertTrue(metadata.isCMSMember(FBUtilities.getBroadcastAddressAndPort()));
+ assertEquals(3, metadata.fullCMSMembers().size());
});
}
}
+
+ /**
+ * On a 5 node cluster, shuts down nodes 2 and 3, runs "cms reconfigure 3" from node 1 and
+ * restarts the two nodes. Every instance is then expected to report the addresses of
+ * instances 1, 4 and 5 as the CMS members.
+ */
+ @Test
+ public void testReconfigurePickAliveNodesIfPossible() throws Exception
+ {
+ try (Cluster cluster = init(Cluster.build(5)
+ .withConfig(conf ->
conf.with(Feature.NETWORK, Feature.GOSSIP))
+ .start()))
+ {
+ cluster.get(2).shutdown().get();
+ cluster.get(3).shutdown().get();
+ // reconfigure while nodes 2 and 3 are down
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"3").asserts().success();
+ cluster.get(2).startup();
+ cluster.get(3).startup();
+
+ // membership is compared on every instance, including the restarted ones
+ Set<String> expectedCMSMembers = expectedCMS(cluster, 1, 4, 5);
+ cluster.forEach(inst -> assertEquals(expectedCMSMembers,
ClusterUtils.getCMSMembers(inst)));
+ }
+ }
+
+ /**
+ * On a 4 node, single datacenter cluster where odd-numbered nodes share "rack1", first
+ * expects "cms reconfigure 3" to select instances 1, 2 and 4. After node 2 is shut down
+ * and the command is run again, instances 1, 3 and 4 are expected instead, although
+ * 1 and 3 are in the same rack.
+ */
+ @Test
+ public void testReconfigurationViolatesRackDiversityIfNecessary() throws
Exception
+ {
+ // rack1: node1, node3
+ // rack2: node2
+ // rack4: node4
+ // ideal placement for CMS is 1, 2, 4 but if 2 is down, violate rack
+ // diversity and pick 1, 3, 4
+ try (Cluster cluster = init(Cluster.build(4)
+
.withNodeIdTopology(networkTopology(4, (nodeid) -> nodeid % 2 == 1 ?
dcAndRack("dc1", "rack1")
+
: dcAndRack("dc1", "rack" + nodeid)))
+ .withConfig(conf ->
conf.with(Feature.NETWORK, Feature.GOSSIP))
+ .start()))
+ {
+ // all nodes up: one member per rack
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"3").asserts().success();
+ Set<String> rackDiverse = expectedCMS(cluster, 1, 2, 4);
+ cluster.forEach(inst -> assertEquals(rackDiverse,
ClusterUtils.getCMSMembers(inst)));
+ // reconfigure again while node 2 is down, then bring it back before checking
+ cluster.get(2).shutdown().get();
+ cluster.get(1).nodetoolResult("cms", "reconfigure",
"3").asserts().success();
+ cluster.get(2).startup();
+ Set<String> notRackDiverse = expectedCMS(cluster, 1, 4, 3);
+ cluster.forEach(inst -> assertEquals(notRackDiverse,
ClusterUtils.getCMSMembers(inst)));
+ }
+ }
+
+    /**
+     * Collects the string form of the broadcast addresses of the given cluster instances.
+     * We can't assume that nodeId matches endpoint (ie node3 = 127.0.0.3 etc), so each
+     * address is looked up from the instance's own config.
+     *
+     * @param cluster the cluster to look the instances up in
+     * @param instanceIds instance ids as accepted by {@code cluster.get}
+     * @return set of the address strings of the listed instances
+     */
+    private Set<String> expectedCMS(Cluster cluster, int... instanceIds)
+    {
+        Set<String> addresses = new HashSet<>(instanceIds.length);
+        for (int instanceId : instanceIds)
+        {
+            String address = cluster.get(instanceId).config().broadcastAddress().getAddress().toString();
+            addresses.add(address);
+        }
+        return addresses;
+    }
}
diff --git a/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java
b/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java
index d90cbd2464..c6a96fa71e 100644
--- a/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java
@@ -110,7 +110,7 @@ public class MetaStrategyTest
rf.put("dc2", 2);
rf.put("dc3", 2);
- CMSPlacementStrategy placementStrategy = new
CMSPlacementStrategy.DatacenterAware(rf, (cd, n) -> true);
+ CMSPlacementStrategy placementStrategy = new CMSPlacementStrategy(rf,
(cd, n) -> true);
Assert.assertEquals(nodeIds(metadata.directory,
1, 2, 4, 5, 7, 8),
placementStrategy.reconfigure(metadata));
@@ -119,8 +119,8 @@ public class MetaStrategyTest
1, 2, 4, 5, 7, 8),
placementStrategy.reconfigure(metadata));
- placementStrategy = new CMSPlacementStrategy.DatacenterAware(rf, (cd,
n) -> !n.equals(metadata.directory.peerId(addr(2).broadcastAddress)) &&
-
!n.equals(metadata.directory.peerId(addr(2).broadcastAddress)));
+ placementStrategy = new CMSPlacementStrategy(rf, (cd, n) ->
!n.equals(metadata.directory.peerId(addr(2).broadcastAddress)) &&
+
!n.equals(metadata.directory.peerId(addr(2).broadcastAddress)));
Assert.assertEquals(nodeIds(metadata.directory,
1, 3, 4, 5, 7, 8),
placementStrategy.reconfigure(metadata));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]