This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7a888149df Support topology-safe changes to Datacenter & Rack for live nodes
7a888149df is described below
commit 7a888149dff4afaea8753571097bd8bca6a4fbfd
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Fri Jan 12 13:00:00 2024 +0000
Support topology-safe changes to Datacenter & Rack for live nodes
Patch by Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-20528
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/SystemKeyspace.java | 7 +
.../apache/cassandra/service/StorageService.java | 18 ++
.../cassandra/service/StorageServiceMBean.java | 2 +
.../org/apache/cassandra/tcm/Transformation.java | 3 +-
.../tcm/listeners/LegacyStateListener.java | 7 +-
.../tcm/listeners/PlacementsChangeListener.java | 2 +-
.../apache/cassandra/tcm/membership/Directory.java | 25 +-
.../apache/cassandra/tcm/membership/Location.java | 12 +
.../cassandra/tcm/ownership/DataPlacement.java | 5 +
.../cassandra/tcm/ownership/DataPlacements.java | 18 ++
.../cassandra/tcm/ownership/ReplicaGroups.java | 15 ++
.../apache/cassandra/tcm/ownership/TokenMap.java | 4 +-
.../tcm/ownership/VersionedEndpoints.java | 7 +-
.../tcm/sequences/CancelCMSReconfiguration.java | 2 +-
.../cassandra/tcm/transformations/AlterSchema.java | 2 +-
.../tcm/transformations/AlterTopology.java | 193 +++++++++++++++
src/java/org/apache/cassandra/tools/NodeTool.java | 1 +
.../cassandra/tools/nodetool/AlterTopology.java | 49 ++++
.../distributed/test/log/AlterTopologyTest.java | 267 +++++++++++++++++++++
.../test/log/MetadataChangeSimulationTest.java | 4 +-
.../cassandra/distributed/test/log/ModelState.java | 14 ++
.../distributed/test/log/ReconfigureCMSTest.java | 4 +-
.../distributed/test/log/SimulatedOperation.java | 21 ++
.../cassandra/harry/model/TokenPlacementModel.java | 17 ++
.../SchemaChangeDuringRangeMovementTest.java | 48 +---
.../service/AlterTopologyArgParsingTest.java | 149 ++++++++++++
.../cassandra/tcm/membership/DirectoryTest.java | 126 ++++++++++
.../InProgressSequenceCancellationTest.java | 36 +--
.../cassandra/tcm/sequences/SequencesUtils.java | 72 ++++++
30 files changed, 1036 insertions(+), 95 deletions(-)
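For reviewers, a minimal sketch (not part of the patch) of the argument format accepted by the new StorageServiceMBean operation and nodetool command. It uses AlterTopology.parseArgs and mirrors AlterTopologyArgParsingTest below; the wrapper class name is illustrative only:

    import java.util.Map;

    import org.apache.cassandra.tcm.membership.Directory;
    import org.apache.cassandra.tcm.membership.Location;
    import org.apache.cassandra.tcm.membership.NodeId;
    import org.apache.cassandra.tcm.transformations.AlterTopology;

    // Illustrative only: each comma-delimited element is "nodeId=dc:rack" or "endpoint=dc:rack".
    public class AlterTopologyArgsSketch
    {
        public static void main(String[] unused)
        {
            // Reassign NodeId 1 and NodeId 2 to new racks within dc1; endpoints registered
            // in the Directory may be used in place of node ids.
            Map<NodeId, Location> updates =
                AlterTopology.parseArgs("1=dc1:rack2,2=dc1:rack3", new Directory());
            assert new Location("dc1", "rack2").equals(updates.get(new NodeId(1)));
            assert new Location("dc1", "rack3").equals(updates.get(new NodeId(2)));
        }
    }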
diff --git a/CHANGES.txt b/CHANGES.txt
index 2fb55b032e..26eb4a062c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Support topology-safe changes to Datacenter & Rack for live nodes (CASSANDRA-20528)
* Add SSTableIntervalTree latency metric (CASSANDRA-20502)
* Ignore repetitions of semicolon in CQLSH (CASSANDRA-19956)
* Avoid NPE during cms initialization abort (CASSANDRA-20527)
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 81186c512a..36fc953360 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -975,6 +975,13 @@ public final class SystemKeyspace
executeInternal(format(req, LOCAL, LOCAL), rack);
}
+ public static synchronized void updateLocation(Location location)
+ {
+ String req = "INSERT INTO system.%s (key, data_center, rack) VALUES ('%s', ?, ?)";
+ executeInternal(format(req, LOCAL, LOCAL), location.datacenter, location.rack);
+ forceBlockingFlush(LOCAL);
+ }
+
public static Set<String> tokensAsSet(Collection<Token> tokens)
{
if (tokens.isEmpty())
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e8a3b40a86..bc914a9e55 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -186,6 +186,7 @@ import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.compatibility.GossipHelper;
import org.apache.cassandra.tcm.compatibility.TokenRingUtils;
import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
@@ -199,6 +200,7 @@ import org.apache.cassandra.tcm.sequences.InProgressSequences;
import org.apache.cassandra.tcm.sequences.SingleNodeSequences;
import org.apache.cassandra.tcm.transformations.Assassinate;
import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
+import org.apache.cassandra.tcm.transformations.AlterTopology;
import org.apache.cassandra.tcm.transformations.Register;
import org.apache.cassandra.tcm.transformations.Startup;
import org.apache.cassandra.tcm.transformations.Unregister;
@@ -5553,4 +5555,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
return DatabaseDescriptor.getPaxosRepairRaceWait();
}
+
+ public void alterTopology(String changes)
+ {
+ Map<NodeId, Location> updates = AlterTopology.parseArgs(changes, ClusterMetadata.current().directory);
+ logger.info("Received request to modify rack assignments. Proposed changes: {}", updates);
+ if (updates.isEmpty())
+ return;
+
+ AlterTopology transform = new AlterTopology(updates, ClusterMetadataService.instance().placementProvider());
+ ClusterMetadataService.instance()
+ .commit(transform,
+ m -> { logger.info("Rack changes committed successfully"); return m; },
+ (c, r) -> {
+ throw new IllegalArgumentException("Unable to commit rack changes: " + r);
+ });
+ }
}
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index b738ecd486..74017b2579 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -1360,4 +1360,6 @@ public interface StorageServiceMBean extends NotificationEmitter
void setPaxosRepairRaceWait(boolean paxosRepairCoordinatorWait);
boolean getPaxosRepairRaceWait();
+ // Comma delimited list of "nodeId=dc:rack" or "endpoint=dc:rack"
+ void alterTopology(String updates);
}
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java b/src/java/org/apache/cassandra/tcm/Transformation.java
index 8cfda01e26..864d9a5d94 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -219,7 +219,8 @@ public interface Transformation
PREPARE_SIMPLE_CMS_RECONFIGURATION(31, () -> PrepareCMSReconfiguration.Simple.serializer),
PREPARE_COMPLEX_CMS_RECONFIGURATION(32, () -> PrepareCMSReconfiguration.Complex.serializer),
ADVANCE_CMS_RECONFIGURATION(33, () -> AdvanceCMSReconfiguration.serializer),
- CANCEL_CMS_RECONFIGURATION(34, () -> CancelCMSReconfiguration.serializer)
+ CANCEL_CMS_RECONFIGURATION(34, () -> CancelCMSReconfiguration.serializer),
+ ALTER_TOPOLOGY(35, () -> AlterTopology.serializer),
;
private final Supplier<AsymmetricMetadataSerializer<Transformation, ? extends Transformation>> serializer;
diff --git a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
index f0fced7ae6..798583a533 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java
@@ -115,6 +115,9 @@ public class LegacyStateListener implements ChangeListener.Async
// state for the local node.
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
Gossiper.instance.addLocalApplicationState(SCHEMA, StorageService.instance.valueFactory.schema(next.schema.getVersion()));
+ // if the local node's location has changed, update system.local.
+ if (!next.directory.location(change).equals(prev.directory.location(change)))
+ SystemKeyspace.updateLocation(next.directory.location(change));
}
if (next.directory.peerState(change) == REGISTERED)
@@ -181,6 +184,8 @@ public class LegacyStateListener implements ChangeListener.Async
{
return prev.peerState(nodeId) != next.peerState(nodeId) ||
!Objects.equals(prev.getNodeAddresses(nodeId), next.getNodeAddresses(nodeId)) ||
- !Objects.equals(prev.version(nodeId), next.version(nodeId));
+ !Objects.equals(prev.version(nodeId), next.version(nodeId)) ||
+ !Objects.equals(prev.location(nodeId), next.location(nodeId));
+
}
}
diff --git a/src/java/org/apache/cassandra/tcm/listeners/PlacementsChangeListener.java b/src/java/org/apache/cassandra/tcm/listeners/PlacementsChangeListener.java
index 80da4ec844..605b526378 100644
--- a/src/java/org/apache/cassandra/tcm/listeners/PlacementsChangeListener.java
+++ b/src/java/org/apache/cassandra/tcm/listeners/PlacementsChangeListener.java
@@ -34,7 +34,7 @@ public class PlacementsChangeListener implements ChangeListener
private boolean shouldInvalidate(ClusterMetadata prev, ClusterMetadata next)
{
if (!prev.placements.lastModified().equals(next.placements.lastModified()) &&
- !prev.placements.equals(next.placements)) // <- todo should we update lastModified if the result is the same?
+ !prev.placements.equivalentTo(next.placements)) // <- todo should we update lastModified if the result is the same?
return true;
if (prev.schema.getKeyspaces().size() != next.schema.getKeyspaces().size())
diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java
index 51ab84c520..436ded0ab3 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Directory.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java
@@ -237,7 +237,6 @@ public class Directory implements MetadataValue<Directory>
{
InetAddressAndPort endpoint = peers.get(id);
Location location = locations.get(id);
-
BTreeMultimap<String, InetAddressAndPort> rackEP = (BTreeMultimap<String, InetAddressAndPort>) racksByDC.get(location.datacenter);
if (rackEP == null)
rackEP = BTreeMultimap.empty();
@@ -268,6 +267,26 @@ public class Directory implements MetadataValue<Directory>
newRacksByDC);
}
+ public Directory withUpdatedRackAndDc(NodeId id, Location location)
+ {
+ if (!peers.containsKey(id))
+ throw new IllegalArgumentException(String.format("Node %s has no registered location to update", id));
+
+ return withoutRackAndDC(id).withLocation(id, location).withRackAndDC(id);
+ }
+
+ private Directory withLocation(NodeId id, Location location)
+ {
+ if (!locations.containsKey(id))
+ throw new IllegalArgumentException(String.format("Node %s has no registered location to update", id));
+
+ if (locations.get(id).equals(location))
+ return this;
+
+ return new Directory(nextId, lastModified, peers, locations.withForce(id, location), states, versions, hostIds,
+ addresses, endpointsByDC, racksByDC);
+ }
+
public Directory without(NodeId id)
{
InetAddressAndPort endpoint = peers.get(id);
@@ -665,7 +684,7 @@ public class Directory implements MetadataValue<Directory>
Directory directory = (Directory) o;
return Objects.equals(lastModified, directory.lastModified) &&
- isEquivalent(directory);
+ equivalentTo(directory);
}
private static Pair<NodeVersion, NodeVersion> minMaxVersions(BTreeMap<NodeId, NodeState> states, BTreeMap<NodeId, NodeVersion> versions)
@@ -700,7 +719,7 @@ public class Directory implements MetadataValue<Directory>
* does not check equality of lastModified
*/
@VisibleForTesting
- public boolean isEquivalent(Directory directory)
+ public boolean equivalentTo(Directory directory)
{
return nextId == directory.nextId &&
Objects.equals(peers, directory.peers) &&
diff --git a/src/java/org/apache/cassandra/tcm/membership/Location.java b/src/java/org/apache/cassandra/tcm/membership/Location.java
index faf8230d94..08ad29dde9 100644
--- a/src/java/org/apache/cassandra/tcm/membership/Location.java
+++ b/src/java/org/apache/cassandra/tcm/membership/Location.java
@@ -67,6 +67,18 @@ public class Location
return datacenter + '/' + rack;
}
+ public static Location fromString(String value)
+ {
+ if (value == null || value.isEmpty())
+ return null;
+
+ String[] parts = value.split(":");
+ if (parts.length < 2)
+ throw new IllegalArgumentException("Invalid datacenter:rack - " + value);
+ else
+ return new Location(parts[0].trim(), parts[1].trim());
+ }
+
public static class Serializer implements MetadataSerializer<Location>
{
public void serialize(Location t, DataOutputPlus out, Version version) throws IOException
diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java b/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java
index f42d8b5479..12920d6862 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java
@@ -184,6 +184,11 @@ public class DataPlacement
return Objects.hash(reads, writes);
}
+ public boolean equivalentTo(DataPlacement other)
+ {
+ return reads.equivalentTo(other.reads) && writes.equivalentTo(other.writes);
+ }
+
public static class Serializer implements MetadataSerializer<DataPlacement>
{
private final IPartitioner partitioner;
diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java
index 988d2b1bcb..b89ecde7d3 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java
@@ -141,6 +141,24 @@ public class DataPlacements extends ReplicationMap<DataPlacement> implements Met
'}';
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (!(o instanceof DataPlacements)) return false;
+ DataPlacements that = (DataPlacements) o;
+ return this.map.equals(that.map);
+ }
+
+ public boolean equivalentTo(DataPlacements other)
+ {
+ if (!map.keySet().equals(other.map.keySet()))
+ return false;
+ return map.entrySet()
+ .stream()
+ .allMatch(e -> e.getValue().equivalentTo(other.get(e.getKey())));
+ }
+
public static DataPlacements sortReplicaGroups(DataPlacements placements, Comparator<Replica> comparator)
{
Builder builder = DataPlacements.builder(placements.size());
diff --git a/src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java b/src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java
index adc26ff820..33d160fa10 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/ReplicaGroups.java
@@ -534,4 +534,19 @@ public class ReplicaGroups
{
return Objects.hash(ranges, endpoints);
}
+
+ public boolean equivalentTo(ReplicaGroups other)
+ {
+ if (!ranges.equals(other.ranges))
+ return false;
+
+ for (int i = 0; i < ranges.size(); i++)
+ {
+ EndpointsForRange e1 = endpoints.get(i).get();
+ EndpointsForRange e2 = other.forRange(ranges.get(i)).get();
+ if (e1.size() != e2.size() || !e1.stream().allMatch(e2::contains))
+ return false;
+ }
+ return true;
+ }
}
diff --git a/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java b/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java
index c32f6c351c..c17d91ae48 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java
@@ -255,7 +255,7 @@ public class TokenMap implements MetadataValue<TokenMap>
if (!(o instanceof TokenMap)) return false;
TokenMap tokenMap = (TokenMap) o;
return Objects.equals(lastModified, tokenMap.lastModified) &&
- isEquivalent(tokenMap);
+ equivalentTo(tokenMap);
}
@Override
@@ -269,7 +269,7 @@ public class TokenMap implements MetadataValue<TokenMap>
*
* does not check equality of lastModified
*/
- public boolean isEquivalent(TokenMap tokenMap)
+ public boolean equivalentTo(TokenMap tokenMap)
{
return Objects.equals(map, tokenMap.map) &&
Objects.equals(partitioner, tokenMap.partitioner);
diff --git a/src/java/org/apache/cassandra/tcm/ownership/VersionedEndpoints.java b/src/java/org/apache/cassandra/tcm/ownership/VersionedEndpoints.java
index 90148f2c83..2f429138d2 100644
--- a/src/java/org/apache/cassandra/tcm/ownership/VersionedEndpoints.java
+++ b/src/java/org/apache/cassandra/tcm/ownership/VersionedEndpoints.java
@@ -116,7 +116,9 @@ public interface VersionedEndpoints<E extends Endpoints<E>> extends MetadataValu
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ForRange forRange = (ForRange) o;
- return Objects.equals(endpointsForRange.sorted(Replica::compareTo), forRange.endpointsForRange.sorted(Replica::compareTo));
+ return lastModified.equals(forRange.lastModified) &&
+ Objects.equals(endpointsForRange.sorted(Replica::compareTo),
+ forRange.endpointsForRange.sorted(Replica::compareTo));
}
public boolean isEmpty()
@@ -184,7 +186,8 @@ public interface VersionedEndpoints<E extends Endpoints<E>> extends MetadataValu
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ForToken forToken = (ForToken) o;
- return Objects.equals(endpointsForToken, forToken.endpointsForToken);
+ return lastModified.equals(forToken.lastModified) &&
+ Objects.equals(endpointsForToken, forToken.endpointsForToken);
}
public boolean isEmpty()
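Design note (reviewer summary, not part of the patch): equals() on VersionedEndpoints now also compares lastModified, while the equivalentTo() methods introduced or renamed above (Directory, DataPlacement, DataPlacements, ReplicaGroups, TokenMap) compare content while ignoring version metadata and replica ordering; AlterTopology uses DataPlacements.equivalentTo() to confirm that a proposed dc/rack change leaves every replica group's membership unchanged before it is accepted.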
diff --git a/src/java/org/apache/cassandra/tcm/sequences/CancelCMSReconfiguration.java b/src/java/org/apache/cassandra/tcm/sequences/CancelCMSReconfiguration.java
index 3d6499f61b..665c6ec797 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/CancelCMSReconfiguration.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/CancelCMSReconfiguration.java
@@ -77,7 +77,7 @@ public class CancelCMSReconfiguration implements Transformation
.withoutWriteReplica(prev.nextEpoch(), pendingReplica)
.build();
}
- if (!placement.reads.equals(placement.writes))
+ if (!placement.reads.equivalentTo(placement.writes))
return new Rejected(ExceptionCode.INVALID, String.format("Placements will be inconsistent if this transformation is applied:\nReads %s\nWrites: %s",
placement.reads, placement.writes));
diff --git a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
index cec1d42ca8..faa86e357b 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/AlterSchema.java
@@ -224,7 +224,7 @@ public class AlterSchema implements Transformation
calculatedPlacements.forEach((params, newPlacement) -> {
DataPlacement previousPlacement = prev.placements.get(params);
// Preserve placement versioning that has resulted from natural application where possible
- if (previousPlacement.equals(newPlacement))
+ if (previousPlacement.equivalentTo(newPlacement))
newPlacementsBuilder.with(params, previousPlacement);
else
newPlacementsBuilder.with(params, newPlacement);
diff --git a/src/java/org/apache/cassandra/tcm/transformations/AlterTopology.java b/src/java/org/apache/cassandra/tcm/transformations/AlterTopology.java
new file mode 100644
index 0000000000..e247f66a88
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/transformations/AlterTopology.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.DataPlacements;
+import org.apache.cassandra.tcm.ownership.PlacementProvider;
+import org.apache.cassandra.tcm.sequences.LockedRanges;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.apache.cassandra.exceptions.ExceptionCode.INVALID;
+
+public class AlterTopology implements Transformation
+{
+ private static final Logger logger = LoggerFactory.getLogger(AlterTopology.class);
+ public static final Serializer serializer = new Serializer();
+
+ private final Map<NodeId, Location> updates;
+ private final PlacementProvider placementProvider;
+
+ public AlterTopology(Map<NodeId, Location> updates, PlacementProvider placementProvider)
+ {
+ this.updates = updates;
+ this.placementProvider = placementProvider;
+ }
+
+ public static Map<NodeId, Location> parseArgs(String args, Directory directory)
+ {
+ Map<NodeId, Location> asMap = new HashMap<>();
+ for (String change : args.split(","))
+ {
+ String[] parts = change.trim().split("=");
+ if (parts.length != 2)
+ throw new IllegalArgumentException("Invalid specification: " + change);
+
+ if (parts[0].isEmpty() || parts[1].isEmpty())
+ throw new IllegalArgumentException("Invalid specification: " + change);
+
+ NodeId id = getNodeIdFromString(parts[0].trim(), directory);
+ if (asMap.containsKey(id))
+ throw new IllegalArgumentException("Multiple updates for node " + id + " (" + parts[0].trim() + " )");
+ asMap.put(getNodeIdFromString(parts[0].trim(), directory), Location.fromString(parts[1].trim()));
+ }
+ return asMap;
+ }
+
+ private static NodeId getNodeIdFromString(String s, Directory directory)
+ {
+ // first try to parse the id as a node id, either in UUID or int form
+ try
+ {
+ return NodeId.fromString(s);
+ }
+ catch (Exception e)
+ {
+ // fall back to trying the supplied id as an endpoint
+ try
+ {
+ InetAddressAndPort endpoint = InetAddressAndPort.getByName(s);
+ return directory.peerId(endpoint);
+ }
+ catch (UnknownHostException u)
+ {
+ throw new IllegalArgumentException("Invalid node identifier supplied: " + s);
+ }
+
+ }
+ }
+
+ @Override
+ public Kind kind()
+ {
+ return Kind.ALTER_TOPOLOGY;
+ }
+
+ @Override
+ public Result execute(ClusterMetadata prev)
+ {
+ // Check no inflight range movements
+ if (!prev.lockedRanges.locked.isEmpty())
+ return new Rejected(INVALID, "The requested topology changes cannot be executed while there are ongoing range movements.");
+
+ Directory dir = prev.directory;
+ // Check all node ids are present
+ Set<NodeId> missing = updates.keySet()
+ .stream()
+ .filter(location -> (null == dir.location(location)))
+ .collect(Collectors.toSet());
+ if (!missing.isEmpty())
+ return new Rejected(INVALID, String.format("Some updates specify an unregistered node: %s", missing));
+
+ // Validate there will be no change to placements
+ Directory updated = prev.directory;
+ for (Map.Entry<NodeId, Location> update : updates.entrySet())
+ updated = updated.withUpdatedRackAndDc(update.getKey(), update.getValue());
+ ClusterMetadata proposed = prev.transformer().with(updated).build().metadata;
+ DataPlacements proposedPlacements = placementProvider.calculatePlacements(prev.placements.lastModified(),
+ proposed.tokenMap.toRanges(),
+ proposed,
+ proposed.schema.getKeyspaces());
+ if (!proposedPlacements.equivalentTo(prev.placements))
+ {
+ logger.info("Rejecting topology modifications which would materially change data placements: {}", updates);
+ return new Rejected(INVALID, "Proposed updates modify data placements, violating consistency guarantees");
+ }
+
+ ClusterMetadata.Transformer next = prev.transformer().with(updated);
+ return Transformation.success(next, LockedRanges.AffectedRanges.EMPTY);
+ }
+
+
+ @Override
+ public String toString()
+ {
+ return "AlterTopology{" +
+ "updates=" + updates +
+ '}';
+ }
+
+ static class Serializer implements AsymmetricMetadataSerializer<Transformation, AlterTopology>
+ {
+ public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException
+ {
+ assert t instanceof AlterTopology;
+ AlterTopology alterTopology = (AlterTopology)t;
+ int size = alterTopology.updates.size();
+ out.writeInt(size);
+ for (Map.Entry<NodeId, Location> entry : alterTopology.updates.entrySet())
+ {
+ NodeId.serializer.serialize(entry.getKey(), out, version);
+ Location.serializer.serialize(entry.getValue(), out, version);
+ }
+ }
+
+ public AlterTopology deserialize(DataInputPlus in, Version version) throws IOException
+ {
+ int size = in.readInt();
+ Map<NodeId, Location> updates = new HashMap<>(size);
+ for (int i = 0; i < size; i++)
+ updates.put(NodeId.serializer.deserialize(in, version), Location.serializer.deserialize(in, version));
+ return new AlterTopology(updates, ClusterMetadataService.instance().placementProvider());
+ }
+
+ public long serializedSize(Transformation t, Version version)
+ {
+ assert t instanceof AlterTopology;
+ AlterTopology alterTopology = (AlterTopology) t;
+ long size = TypeSizes.sizeof(alterTopology.updates.size());
+ for (Map.Entry<NodeId, Location> entry : alterTopology.updates.entrySet())
+ {
+ size += NodeId.serializer.serializedSize(entry.getKey(), version);
+ size += Location.serializer.serializedSize(entry.getValue(), version);
+ }
+ return size;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index d7bcc25b71..1cc12f3882 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -95,6 +95,7 @@ public class NodeTool
{
List<Class<? extends NodeToolCmdRunnable>> commands = newArrayList(
AbortBootstrap.class,
+ AlterTopology.class,
Assassinate.class,
CassHelp.class,
CIDRFilteringStats.class,
diff --git a/src/java/org/apache/cassandra/tools/nodetool/AlterTopology.java b/src/java/org/apache/cassandra/tools/nodetool/AlterTopology.java
new file mode 100644
index 0000000000..e8078d3d57
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/AlterTopology.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Command(name = "altertopology", description = "Modify the datacenter and/or rack of one or more nodes")
+public class AlterTopology extends NodeToolCmd
+{
+ @Arguments(usage = "<node=dc:rack> [<node=dc:rack>...]", description = "One or more node identifiers, which may be either a node id, host id or broadcast address, each with a target dc:rack")
+ private List<String> args = new ArrayList<>();
+
+ @Override
+ public void execute(NodeProbe probe)
+ {
+ checkArgument(!args.isEmpty(), "Invalid arguments; no changes specified");
+ try
+ {
+ probe.getStorageService().alterTopology(String.join(",", args));
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+}
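Usage note (illustrative, not from the patch): the command joins its arguments with commas before calling StorageServiceMBean.alterTopology, so "nodetool altertopology 1=dc1:rack2 2=dc1:rack3" is equivalent to passing the string "1=dc1:rack2,2=dc1:rack3" to the MBean operation.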
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/AlterTopologyTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/AlterTopologyTest.java
new file mode 100644
index 0000000000..85c7040a58
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/AlterTopologyTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.harry.SchemaSpec;
+import org.apache.cassandra.harry.dsl.HistoryBuilder;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor;
+import org.apache.cassandra.harry.gen.Generator;
+import org.apache.cassandra.harry.gen.Generators;
+import org.apache.cassandra.harry.gen.SchemaGenerators;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.ownership.PlacementProvider;
+import org.apache.cassandra.tcm.sequences.SequencesUtils.ClearLockedRanges;
+import org.apache.cassandra.tcm.sequences.SequencesUtils.LockRanges;
+import org.apache.cassandra.tcm.transformations.AlterTopology;
+import org.apache.cassandra.tcm.transformations.CustomTransformation;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+
+import static java.time.Duration.ofSeconds;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce;
+import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
+import static org.junit.Assert.assertEquals;
+
+public class AlterTopologyTest extends FuzzTestBase
+{
+ @Test
+ public void testTopologyChanges() throws Exception
+ {
+ Generator<SchemaSpec> schemaGen = SchemaGenerators.schemaSpecGen(KEYSPACE, "change_topology_test", 1000);
+ try (Cluster cluster = builder().withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
+ .withRack("dc1", "rack1", 1)
+ .withRack("dc1", "rack2", 1)
+ .withRack("dc1", "rack3", 1)
+ .withRack("dc1", "rack4", 1)
+ .withConfig(config -> config.with(GOSSIP))
+ .withNodes(4)
+ .start())
+ {
+ IInvokableInstance cmsInstance = cluster.get(1);
+
+ withRandom(rng -> {
+ SchemaSpec schema = schemaGen.generate(rng);
+ Generators.TrackingGenerator<Integer> pkGen = Generators.tracking(Generators.int32(0, Math.min(schema.valueGenerators.pkPopulation(), 1000)));
+ Generator<Integer> ckGen = Generators.int32(0, Math.min(schema.valueGenerators.ckPopulation(), 1000));
+
+ HistoryBuilder history = new ReplayingHistoryBuilder(schema.valueGenerators,
+ (hb) -> InJvmDTestVisitExecutor.builder()
+ .nodeSelector(i -> 1)
+ .build(schema, hb, cluster));
+ history.custom(() -> {
+ cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE +
+ " WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1' : 3 };");
+ cluster.schemaChange(schema.compile());
+ waitForCMSToQuiesce(cluster, cmsInstance);
+ }, "Setup");
+
+
+ Runnable writeAndValidate = () -> {
+ for (int i = 0; i < 2000; i++)
+ history.insert(pkGen.generate(rng), ckGen.generate(rng));
+
+ for (int pk : pkGen.generated())
+ history.selectPartition(pk);
+ };
+ writeAndValidate.run();
+
+ cluster.forEach(i -> i.runOnInstance(() -> {
+ CustomTransformation.registerExtension(LockRanges.NAME, LockRanges.serializer);
+ CustomTransformation.registerExtension(ClearLockedRanges.NAME, ClearLockedRanges.serializer);
+ }));
+
+ // a dc change which affects placements is not allowed, so expect a rejection
+ history.custom(() -> {
+ cmsInstance.runOnInstance(() -> {
+ PlacementProvider pp = ClusterMetadataService.instance().placementProvider();
+ NodeId id = ClusterMetadata.current().myNodeId();
+ Map<NodeId, Location> updates = new HashMap<>();
+ updates.put(id, new Location("dcX", "rack1"));
+ assertAlterTopologyRejection(pp, updates, "Proposed updates modify data placements");
+ });
+ }, "DC change affecting placements");
+
+ // a rack change which affects placements is also not allowed
+ history.custom(() -> {
+ cmsInstance.runOnInstance(() -> {
+ PlacementProvider pp = ClusterMetadataService.instance().placementProvider();
+ NodeId id = ClusterMetadata.current().myNodeId();
+ Map<NodeId, Location> updates = new HashMap<>();
+ updates.put(id, new Location("dc1", "rack2"));
+ assertAlterTopologyRejection(pp, updates, "Proposed updates modify data placements");
+ });
+ },"Rack change affecting placements ");
+
+ // submit an update which would not modify placements so would normally be accepted
+ history.custom(() -> {
+ cmsInstance.runOnInstance(() -> {
+ PlacementProvider pp = ClusterMetadataService.instance().placementProvider();
+ NodeId id = ClusterMetadata.current().myNodeId();
+ Map<NodeId, Location> updates = new HashMap<>();
+ updates.put(id, new Location("dc1", "rack99"));
+ // if there are locked ranges, implying in-progress range movements, any update is rejected
+ ClusterMetadataService.instance().commit(new CustomTransformation(LockRanges.NAME, new LockRanges()));
+ assertAlterTopologyRejection(pp, updates, "The requested topology changes cannot be executed while there are ongoing range movements");
+
+ // but if no movements are in flight, the update is allowed
+ ClusterMetadataService.instance().commit(new CustomTransformation(ClearLockedRanges.NAME, new ClearLockedRanges()));
+ ClusterMetadataService.instance().commit(new AlterTopology(updates, pp));
+ if (!ClusterMetadata.current().directory.location(id).rack.equals("rack99"))
+ throw new AssertionError("Expected rack to have changed");
+ });
+ }, "Rack change not affecting placements");
+
+ // changing multiple/all racks atomically
+ history.custom(() -> {
+ cmsInstance.runOnInstance(() -> {
+ PlacementProvider pp = ClusterMetadataService.instance().placementProvider();
+ Map<NodeId, Location> updates = new HashMap<>();
+ Directory dir = ClusterMetadata.current().directory;
+ for (NodeId nodeId : dir.peerIds())
+ updates.put(nodeId, new Location("dc1", "rack" + (nodeId.id() + 100)));
+
+ ClusterMetadataService.instance().commit(new AlterTopology(updates, pp));
+ dir = ClusterMetadata.current().directory;
+ for (NodeId nodeId : dir.peerIds())
+ if (!ClusterMetadata.current().directory.location(nodeId).rack.equals("rack" + (nodeId.id() + 100)))
+ throw new AssertionError("Expected rack to have changed");
+ });
+ }, "Modify all racks not affecting placements");
+
+ // renaming a datacenter is supported, as long as it is not referenced in any replication params as that
+ // would impact placements
+ history.custom(() -> {
+ cmsInstance.runOnInstance(() -> {
+ PlacementProvider pp = ClusterMetadataService.instance().placementProvider();
+ Map<NodeId, Location> updates = new HashMap<>();
+ Directory dir = ClusterMetadata.current().directory;
+ for (NodeId nodeId : dir.peerIds())
+ updates.put(nodeId, new Location("renamed_dc", dir.location(nodeId).rack));
+ assertAlterTopologyRejection(pp, updates, "Proposed updates modify data placements");
+ });
+ }, "Renaming DC referenced in replication params");
+
+ // after modifying replication for the test keyspace, this should be allowed
+ history.custom(() -> {
+ cmsInstance.runOnInstance(() -> {
+ PlacementProvider pp = ClusterMetadataService.instance().placementProvider();
+ Map<NodeId, Location> updates = new HashMap<>();
+ QueryProcessor.executeInternal("ALTER KEYSPACE " + KEYSPACE +
+ " WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3 };");
+ Directory dir = ClusterMetadata.current().directory;
+ for (NodeId nodeId : dir.peerIds())
+ updates.put(nodeId, new Location("renamed_dc", dir.location(nodeId).rack));
+
+ ClusterMetadataService.instance().commit(new AlterTopology(updates, pp));
+
+ for (NodeId nodeId : dir.peerIds())
+ if (!ClusterMetadata.current().directory.location(nodeId).datacenter.equals("renamed_dc"))
+ throw new AssertionError("Expected dc to have changed");
+
+ // modify both datacenter and racks
+ dir = ClusterMetadata.current().directory;
+ for (NodeId nodeId : dir.peerIds())
+ updates.put(nodeId, new Location("renamed_dc_again", "rack" + (nodeId.id() + 200)));
+
+ ClusterMetadataService.instance().commit(new AlterTopology(updates, pp));
+ dir = ClusterMetadata.current().directory;
+ for (NodeId nodeId : dir.peerIds())
+ if (!ClusterMetadata.current().directory.location(nodeId).equals(new Location("renamed_dc_again", "rack" + (nodeId.id() + 200))))
+ throw new AssertionError("Expected dc to have changed");
+ });
+ waitForCMSToQuiesce(cluster, cmsInstance);
+ },"Renaming DC not referenced in replication params");
+
+ // updates to system tables run asynchronously so spin until they're done
+ history.custom(() -> {
+ cluster.forEach(i -> await(60).until(() -> i.callOnInstance(() -> {
+ ClusterMetadata metadata = ClusterMetadata.current();
+ NodeId myId = metadata.myNodeId();
+ Directory dir = metadata.directory;
+ for (NodeId nodeId : dir.peerIds())
+ {
+ String query = nodeId.equals(myId)
+ ? "select data_center, rack from system.local"
+ : String.format("select data_center, rack from system.peers_v2 where peer = '%s'",
+ dir.endpoint(nodeId).getHostAddress(false));
+ UntypedResultSet res = QueryProcessor.executeInternal(query);
+ if (!res.one().getString("data_center").equals("renamed_dc_again"))
+ return false;
+ if (!res.one().getString("rack").equals("rack" + (nodeId.id() + 200)))
+ return false;
+ }
+ return true;
+ })));
+ }, "Verify local system table updates");
+
+ // check gossip is also updated
+ history.custom(() -> {
+ Map<String, Map<String, String>> gossipInfo = ClusterUtils.gossipInfo(cmsInstance);
+ gossipInfo.forEach((ep, states) -> {
+ String nodeId = states.get("HOST_ID").split(":")[1];
+ String dc = states.get("DC").split(":")[1];
+ assertEquals("renamed_dc_again", dc);
+ String rack = states.get("RACK").split(":")[1];
+ String expected = "rack" + (NodeId.fromString(nodeId).id() + 200);
+ assertEquals(expected, rack);
+ });
+ }, "Verify gossip state");
+
+ writeAndValidate.run();
+ });
+ }
+ }
+
+ private static void assertAlterTopologyRejection(PlacementProvider pp, Map<NodeId, Location> updates, String error)
+ {
+ ClusterMetadataService.instance()
+ .commit(new AlterTopology(updates, pp),
+ m -> { throw new AssertionError("Expected rejection");},
+ (c, r) -> {
+ if (!(c == ExceptionCode.INVALID && r.startsWith(error)))
+ throw new AssertionError("Unexpected failure response: " + r);
+ return ClusterMetadata.current();
+ });
+
+ }
+
+ private static ConditionFactory await(int seconds)
+ {
+ return Awaitility.await().atMost(ofSeconds(seconds)).pollDelay(ofSeconds(1));
+ }
+
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
index 414f74bbef..44fdc7d701 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/MetadataChangeSimulationTest.java
@@ -935,10 +935,10 @@ public class MetadataChangeSimulationTest extends CMSTestBase
while (!state.inFlightOperations.isEmpty())
{
state = state.inFlightOperations.get(random.nextInt(state.inFlightOperations.size())).advance(state);
- Assert.assertEquals(allSettled, sut.service.metadata().writePlacementAllSettled(ksm));
+ Assert.assertTrue(allSettled.equivalentTo(sut.service.metadata().writePlacementAllSettled(ksm)));
validatePlacements(sut, state);
}
- Assert.assertEquals(allSettled, sut.service.metadata().placements.get(ksm.params.replication));
+ Assert.assertTrue(allSettled.equivalentTo(sut.service.metadata().placements.get(ksm.params.replication)));
}
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ModelState.java b/test/distributed/org/apache/cassandra/distributed/test/log/ModelState.java
index 0eb73b8694..9dea76040d 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/ModelState.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/ModelState.java
@@ -343,6 +343,20 @@ public class ModelState
return this;
}
+ public Transformer withUpdatedRacks(Map<Node, String> updates)
+ {
+ assert currentNodes.containsAll(updates.keySet());
+ List<Node> newNodes = new ArrayList<>();
+ currentNodes.forEach(node -> {
+ if (updates.containsKey(node))
+ newNodes.add(node.withNewRack(updates.get(node)));
+ else
+ newNodes.add(node);
+ });
+ currentNodes = newNodes;
+ return this;
+ }
+
public Transformer updateSimulation(PlacementSimulator.SimulatedPlacements simulatedPlacements)
{
this.simulatedPlacements = simulatedPlacements;
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
index 2869fe913a..0f95735e5a 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
@@ -134,7 +134,7 @@ public class ReconfigureCMSTest extends FuzzTestBase
assertEquals(2, metadata.fullCMSMembers().size());
ReplicationParams params = ReplicationParams.meta(metadata);
DataPlacement placements = metadata.placements.get(params);
- assertEquals(placements.reads, placements.writes);
+ assertTrue(placements.reads.equivalentTo(placements.writes));
assertEquals(metadata.fullCMSMembers().size(), Integer.parseInt(params.asMap().get("dc0")));
});
@@ -159,7 +159,7 @@ public class ReconfigureCMSTest extends FuzzTestBase
Assert.assertTrue(metadata.fullCMSMembers().contains(FBUtilities.getBroadcastAddressAndPort()));
assertEquals(3, metadata.fullCMSMembers().size());
DataPlacement placements = metadata.placements.get(ReplicationParams.meta(metadata));
- assertEquals(placements.reads, placements.writes);
+ Assert.assertTrue(placements.reads.equivalentTo(placements.writes));
});
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java b/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java
index 7763831933..c6fa2da54b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/SimulatedOperation.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed.test.log;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -40,10 +41,13 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.ownership.VersionedEndpoints;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
import org.apache.cassandra.tcm.sequences.BootstrapAndReplace;
import org.apache.cassandra.tcm.sequences.LeaveStreams;
import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
+import org.apache.cassandra.tcm.transformations.AlterTopology;
import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
import org.apache.cassandra.tcm.transformations.PrepareJoin;
import org.apache.cassandra.tcm.transformations.PrepareLeave;
@@ -101,6 +105,23 @@ public abstract class SimulatedOperation
}
+ public static ModelState changeRacks(CMSSut sut, ModelState state, Map<Node, String> updates)
+ {
+ ModelState.Transformer transformer = state.transformer()
+ .withUpdatedRacks(updates)
+ .updateSimulation(state.simulatedPlacements);
+
+ Map<NodeId, Location> serviceUpdates = new HashMap<>();
+ for (Map.Entry<Node, String> entry : updates.entrySet())
+ {
+ Node n = entry.getKey();
+ String rack = entry.getValue();
+ serviceUpdates.put(n.nodeId(), new Location(n.dc(), rack));
+ }
+ sut.service.commit(new AlterTopology(serviceUpdates, sut.service.placementProvider()));
+ return transformer.transform();
+ }
+
public static ModelState leave(CMSSut sut, ModelState state, Node node)
{
ModelState.Transformer transformer = state.transformer();
diff --git a/test/harry/main/org/apache/cassandra/harry/model/TokenPlacementModel.java b/test/harry/main/org/apache/cassandra/harry/model/TokenPlacementModel.java
index 2c24ec257c..c9dcbcb137 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/TokenPlacementModel.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/TokenPlacementModel.java
@@ -725,6 +725,7 @@ public class TokenPlacementModel
long token(int tokenIdx);
Lookup forceToken(int tokenIdx, long token);
void reset();
+ int rackIdx(String rack);
default NodeId nodeId(int nodeIdx)
{
@@ -785,6 +786,11 @@ public class TokenPlacementModel
return null;
}
+ public int rackIdx(String rack)
+ {
+ throw new UnsupportedOperationException();
+ }
+
public void reset()
{
throw new UnsupportedOperationException();
@@ -843,6 +849,11 @@ public class TokenPlacementModel
{
return String.format("rack%d", rackIdx);
}
+
+ public int rackIdx(String rack)
+ {
+ throw new UnsupportedOperationException();
+ }
}
public static class HumanReadableTokensLookup extends DefaultLookup {
@@ -1022,6 +1033,12 @@ public class TokenPlacementModel
{
return new Node(tokenIdx, nodeIdx, dcIdx, rackIdx, lookup.forceToken(tokenIdx, override));
}
+
+ public Node withNewRack(String newRack)
+ {
+ return new Node(tokenIdx, nodeIdx, dcIdx, lookup.rackIdx(newRack), lookup);
+ }
+
public Murmur3Partitioner.LongToken longToken()
{
return new Murmur3Partitioner.LongToken(token());
diff --git a/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java b/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
index e6cdb1be82..d54b2e5e31 100644
--- a/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
+++ b/test/unit/org/apache/cassandra/schema/SchemaChangeDuringRangeMovementTest.java
@@ -20,30 +20,21 @@ package org.apache.cassandra.schema;
import org.junit.Test;
-import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
-import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ClusterMetadataService;
-import org.apache.cassandra.tcm.Transformation;
-import org.apache.cassandra.tcm.sequences.LockedRanges;
import org.apache.cassandra.tcm.transformations.AlterSchema;
import org.apache.cassandra.triggers.TriggersTest;
+import static org.apache.cassandra.tcm.sequences.SequencesUtils.ClearLockedRanges;
+import static org.apache.cassandra.tcm.sequences.SequencesUtils.LockRanges;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class SchemaChangeDuringRangeMovementTest extends CQLTester
{
- // at the moment, the detail of the specific LockedRanges doesn't matter, transformations
- // which are rejected in the presence of locking are rejected whatever is actually locked
- private static final LockedRanges.AffectedRanges toLock =
- LockedRanges.AffectedRanges.singleton(ReplicationParams.simple(3),
- new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
- DatabaseDescriptor.getPartitioner().getRandomToken()));
-
@Test
public void testAlwaysPermittedChanges() throws Throwable
{
@@ -216,39 +207,4 @@ public class SchemaChangeDuringRangeMovementTest extends CQLTester
metadata = ClusterMetadataService.instance().commit(new ClearLockedRanges());
assertTrue(metadata.lockedRanges.locked.isEmpty());
}
-
-
- // Custom transforms to lock/unlock an arbitrary set of ranges to
- // avoid having to actually initiate some range movement
- private static class LockRanges implements Transformation
- {
- @Override
- public Kind kind()
- {
- return Kind.CUSTOM;
- }
-
- @Override
- public Result execute(ClusterMetadata metadata)
- {
- LockedRanges newLocked = metadata.lockedRanges.lock(LockedRanges.keyFor(metadata.epoch), toLock);
- return Transformation.success(metadata.transformer().with(newLocked), toLock);
- }
- }
-
- private static class ClearLockedRanges implements Transformation
- {
- @Override
- public Kind kind()
- {
- return Kind.CUSTOM;
- }
-
- @Override
- public Result execute(ClusterMetadata metadata)
- {
- LockedRanges newLocked = LockedRanges.EMPTY;
- return Transformation.success(metadata.transformer().with(newLocked), LockedRanges.AffectedRanges.EMPTY);
- }
- }
}
diff --git a/test/unit/org/apache/cassandra/service/AlterTopologyArgParsingTest.java b/test/unit/org/apache/cassandra/service/AlterTopologyArgParsingTest.java
new file mode 100644
index 0000000000..59a7bb58d9
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/AlterTopologyArgParsingTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.util.Map;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.MembershipUtils;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.transformations.AlterTopology;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class AlterTopologyArgParsingTest
+{
+ Location loc = new Location("test_dc", "test_rack");
+ NodeId id = new NodeId(1);
+ Directory dir;
+
+ @Before
+ public void setup()
+ {
+ dir = new Directory();
+ }
+
+ @Test
+ public void testSingleChangeByInt()
+ {
+ String arg = "1=test_dc:test_rack";
+ Map<NodeId, Location> parsed = AlterTopology.parseArgs(arg, dir);
+ assertEquals(1, parsed.size());
+ assertEquals(parsed.get(id), loc);
+ }
+
+ @Test
+ public void testSingleChangeByUUID()
+ {
+ String arg = String.format("%s=test_dc:test_rack", id.toUUID().toString());
+ Map<NodeId, Location> parsed = AlterTopology.parseArgs(arg, dir);
+ assertEquals(1, parsed.size());
+ assertEquals(parsed.get(id), loc);
+ }
+
+ @Test
+ public void testSingleChangeByEndpoint()
+ {
+ InetAddressAndPort ep = MembershipUtils.endpoint(1);
+ dir = dir.with(new NodeAddresses(ep), loc); // this will associate NodeId(1) with ep
+ String arg = String.format("%s=test_dc:test_rack", ep.getHostAddressAndPort());
+ Map<NodeId, Location> parsed = AlterTopology.parseArgs(arg, dir);
+ assertEquals(1, parsed.size());
+ assertEquals(parsed.get(id), loc);
+ }
+
+ @Test
+ public void testSingleChangeByEndpointAddress()
+ {
+ InetAddressAndPort ep = MembershipUtils.endpoint(1);
+ dir = dir.with(new NodeAddresses(ep), loc); // this will associate NodeId(1) with ep
+ String arg = String.format("%s=test_dc:test_rack", ep.getHostAddress(false));
+ Map<NodeId, Location> parsed = AlterTopology.parseArgs(arg, dir);
+ assertEquals(1, parsed.size());
+ assertEquals(parsed.get(id), loc);
+ }
+
+ @Test
+ public void testInvalidArg()
+ {
+ String[] args = new String[]{ "invalid", "1=", "=dc:rack", "1=dc", "1=dc:" };
+ for (String invalid : args)
+ {
+ try
+ {
+ AlterTopology.parseArgs(invalid, dir);
+ fail("Expected exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ }
+ }
+ }
+
+ @Test
+ public void testMultipleChanges()
+ {
+ NodeId otherId = new NodeId(2);
+ InetAddressAndPort ep = MembershipUtils.endpoint(1);
+ dir = dir.with(new NodeAddresses(ep), loc); // this will associate NodeId(1) with ep
+ String arg = String.format("%s=dc1:rack1,%s=dc2:rack2,3=dc3:rack3,", ep.getHostAddress(true), otherId.toUUID().toString());
+ Map<NodeId, Location> parsed = AlterTopology.parseArgs(arg, dir);
+ assertEquals(3, parsed.size());
+ assertEquals(parsed.get(id).datacenter, "dc1");
+ assertEquals(parsed.get(id).rack, "rack1");
+ assertEquals(parsed.get(otherId).datacenter, "dc2");
+ assertEquals(parsed.get(otherId).rack, "rack2");
+ assertEquals(parsed.get(new NodeId(3)).datacenter, "dc3");
+ assertEquals(parsed.get(new NodeId(3)).rack, "rack3");
+ }
+
+ @Test
+ public void testMultipleChangesForSameNode()
+ {
+ InetAddressAndPort ep = MembershipUtils.endpoint(1);
+ dir = dir.with(new NodeAddresses(ep), loc); // this will associate NodeId(1) with ep
+ String epString = ep.getHostAddress(true);
+ String idString = id.toUUID().toString();
+ assertIllegalArgument(String.format("%1$s=dc1:rack1,%1$s=dc2:rack2", id.id()));
+ assertIllegalArgument(String.format("%s=dc1:rack1,%s=dc2:rack2", id.id(), idString));
+ assertIllegalArgument(String.format("%s=dc1:rack1,%s=dc2:rack2", id.id(), epString));
+ assertIllegalArgument(String.format("%1$s=dc1:rack1,%1$s=dc2:rack2", epString));
+ assertIllegalArgument(String.format("%1$s=dc1:rack1,%1$s=dc2:rack2", idString));
+ assertIllegalArgument(String.format("%s=dc1:rack1,%s=dc2:rack2,%s=dc3:rack3", id.id(), idString, epString));
+ }
+
+ private void assertIllegalArgument(String arg)
+ {
+ try
+ {
+ AlterTopology.parseArgs(arg, dir);
+ fail("Expected exception");
+ }
+ catch (IllegalArgumentException e) {}
+ }
+}
diff --git a/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java b/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
new file mode 100644
index 0000000000..ea66961ee5
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/membership/DirectoryTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.membership;
+
+import org.junit.Test;
+
+import static org.apache.cassandra.tcm.membership.MembershipUtils.endpoint;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DirectoryTest
+{
+
+ @Test
+ public void updateLocationTest()
+ {
+ Location DC1_R1 = new Location("datacenter1", "rack1");
+ Directory dir = new Directory();
+ assertTrue(dir.isEmpty());
+ assertTrue(dir.knownDatacenters().isEmpty());
+
+ NodeId missing = new NodeId(1000);
+ assertInvalidLocationUpdate(dir, missing, DC1_R1, "Node " + missing + " has no registered location to update");
+
+ // add a new node and retrieve its Location
+ NodeAddresses addresses = new NodeAddresses(endpoint(1));
+ dir = dir.with(addresses, DC1_R1);
+ NodeId node = dir.peerId(addresses.broadcastAddress);
+ assertEquals(DC1_R1, dir.location(node));
+ assertTrue(dir.knownDatacenters().contains("datacenter1"));
+
+ // endpoints by DC & rack are not updated immediately, this is an
explicit step when a node joins
+ assertTrue(dir.allDatacenterEndpoints().isEmpty());
+ assertTrue(dir.allDatacenterRacks().isEmpty());
+
+ // when a node joins, its DC and rack become active
+ dir = dir.withRackAndDC(node);
+ assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter1").contains(addresses.broadcastAddress));
+ assertTrue(dir.allDatacenterRacks().get("datacenter1").get("rack1").contains(addresses.broadcastAddress));
+
+ // update rack
+ Location DC1_R2 = new Location("datacenter1", "rack2");
+ dir = dir.withUpdatedRackAndDc(node, DC1_R2);
+ assertEquals(DC1_R2, dir.location(node));
+ assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter1").contains(addresses.broadcastAddress));
+ assertTrue(dir.allDatacenterRacks().get("datacenter1").get("rack2").contains(addresses.broadcastAddress));
+ // previous rack is no longer present as it was made empty
+ assertFalse(dir.allDatacenterRacks().get("datacenter1").containsKey("rack1"));
+
+ // update DC
+ Location DC2_R2 = new Location("datacenter2", "rack2");
+ dir = dir.withUpdatedRackAndDc(node, DC2_R2);
+ assertEquals(DC2_R2, dir.location(node));
+ assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(addresses.broadcastAddress));
+ assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack2").contains(addresses.broadcastAddress));
+ // datacenter1 is no longer present as it was made empty
+ assertFalse(dir.allDatacenterRacks().containsKey("datacenter1"));
+ assertFalse(dir.knownDatacenters().contains("datacenter1"));
+ assertTrue(dir.knownDatacenters().contains("datacenter2"));
+
+ // Add a second node in the same dc & rack
+ NodeAddresses otherAddresses = new NodeAddresses(endpoint(2));
+ dir = dir.with(otherAddresses, DC2_R2);
+ NodeId otherNode = dir.peerId(otherAddresses.broadcastAddress);
+ dir = dir.withRackAndDC(otherNode);
+ assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(addresses.broadcastAddress));
+ assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(otherAddresses.broadcastAddress));
+ assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack2").contains(addresses.broadcastAddress));
+ assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack2").contains(otherAddresses.broadcastAddress));
+
+ // now updating the rack of the first node should not remove rack2 altogether as it is not empty
+ Location DC2_R3 = new Location("datacenter2", "rack3");
+ dir = dir.withUpdatedRackAndDc(node, DC2_R3);
+ assertEquals(DC2_R3, dir.location(node));
+ // updated node is removed from rack2 and added to rack3
+ assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(addresses.broadcastAddress));
+ assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack3").contains(addresses.broadcastAddress));
+ assertFalse(dir.allDatacenterRacks().get("datacenter2").get("rack2").contains(addresses.broadcastAddress));
+ // other node is still present in rack2
+ assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(otherAddresses.broadcastAddress));
+ assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack2").contains(otherAddresses.broadcastAddress));
+ assertFalse(dir.allDatacenterRacks().get("datacenter2").get("rack3").contains(otherAddresses.broadcastAddress));
+
+ // simulate what happens when the nodes leave the cluster
+ dir = dir.withoutRackAndDC(otherNode);
+ assertFalse(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(otherAddresses.broadcastAddress));
+ assertFalse(dir.allDatacenterRacks().get("datacenter2").containsKey("rack2"));
+ assertTrue(dir.allDatacenterEndpoints().asMap().get("datacenter2").contains(addresses.broadcastAddress));
+ assertTrue(dir.allDatacenterRacks().get("datacenter2").get("rack3").contains(addresses.broadcastAddress));
+
+ dir = dir.withoutRackAndDC(node);
+ assertTrue(dir.allDatacenterEndpoints().isEmpty());
+ assertTrue(dir.allDatacenterRacks().isEmpty());
+ }
+
+ private void assertInvalidLocationUpdate(Directory dir, NodeId nodeId, Location loc, String message)
+ {
+ try
+ {
+ dir.withUpdatedRackAndDc(nodeId, loc);
+ fail("Expected an exception");
+ }
+ catch (IllegalArgumentException e)
+ {
+ assertTrue(e.getMessage().equals(message));
+ }
+ }
+}
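
The test above exercises the Directory flow that the topology change relies on: with() registers a node's addresses and Location, withRackAndDC() makes the node's DC/rack membership visible when it joins, withUpdatedRackAndDc() moves the endpoint between racks/DCs and prunes any rack or datacenter left empty, and withoutRackAndDC() removes the membership again. A condensed sketch of that flow, using the same in-tree helpers as the test (the class name is illustrative):

    import org.apache.cassandra.tcm.membership.Directory;
    import org.apache.cassandra.tcm.membership.Location;
    import org.apache.cassandra.tcm.membership.MembershipUtils;
    import org.apache.cassandra.tcm.membership.NodeAddresses;
    import org.apache.cassandra.tcm.membership.NodeId;

    public class DirectoryLocationSketch
    {
        public static void main(String[] args)
        {
            NodeAddresses addresses = new NodeAddresses(MembershipUtils.endpoint(1));
            Directory dir = new Directory().with(addresses, new Location("dc1", "rack1")); // register
            NodeId node = dir.peerId(addresses.broadcastAddress);

            dir = dir.withRackAndDC(node);                                       // join: DC/rack become active
            dir = dir.withUpdatedRackAndDc(node, new Location("dc1", "rack2"));  // move rack; empty rack1 is pruned
            System.out.println(dir.location(node).rack);                         // rack2

            dir = dir.withoutRackAndDC(node);                                    // leave: DC/rack indexes are emptied
            System.out.println(dir.allDatacenterRacks().isEmpty());              // true
        }
    }
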
diff --git a/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java b/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java
index 1c1a44296b..100979104f 100644
--- a/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java
+++ b/test/unit/org/apache/cassandra/tcm/sequences/InProgressSequenceCancellationTest.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.EndpointsForRange;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.ClusterMetadata;
@@ -44,10 +43,8 @@ import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
import org.apache.cassandra.tcm.membership.NodeVersion;
-import org.apache.cassandra.tcm.ownership.DataPlacement;
import org.apache.cassandra.tcm.ownership.DataPlacements;
import org.apache.cassandra.tcm.ownership.PlacementDeltas;
-import org.apache.cassandra.tcm.ownership.ReplicaGroups;
import org.apache.cassandra.tcm.transformations.PrepareJoin;
import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.PrepareReplace;
@@ -304,9 +301,9 @@ public class InProgressSequenceCancellationTest
private void assertRelevantMetadata(ClusterMetadata first, ClusterMetadata second)
{
- assertPlacementsEquivalent(first.placements, second.placements);
- assertTrue(first.directory.isEquivalent(second.directory));
- assertTrue(first.tokenMap.isEquivalent(second.tokenMap));
+ assertTrue(first.placements.equivalentTo(second.placements));
+ assertTrue(first.directory.equivalentTo(second.directory));
+ assertTrue(first.tokenMap.equivalentTo(second.tokenMap));
assertEquals(first.lockedRanges.locked.keySet(), second.lockedRanges.locked.keySet());
}
@@ -314,31 +311,4 @@ public class InProgressSequenceCancellationTest
{
return new ClusterMetadata(Murmur3Partitioner.instance, directory);
}
-
- private void assertPlacementsEquivalent(DataPlacements first, DataPlacements second)
- {
- assertEquals(first.keys(), second.keys());
-
- first.asMap().forEach((params, placement) -> {
- DataPlacement otherPlacement = second.get(params);
- ReplicaGroups r1 = placement.reads;
- ReplicaGroups r2 = otherPlacement.reads;
- assertEquals(r1.ranges, r2.ranges);
- r1.forEach((range, e1) -> {
- EndpointsForRange e2 = r2.forRange(range).get();
- assertEquals(e1.size(),e2.size());
- assertTrue(e1.get().stream().allMatch(e2::contains));
- });
-
- ReplicaGroups w1 = placement.reads;
- ReplicaGroups w2 = otherPlacement.reads;
- assertEquals(w1.ranges, w2.ranges);
- w1.forEach((range, e1) -> {
- EndpointsForRange e2 = w2.forRange(range).get();
- assertEquals(e1.size(),e2.size());
- assertTrue(e1.get().stream().allMatch(e2::contains));
- });
-
- });
- }
}
diff --git a/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java b/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java
index eb310cc78b..44b3b59ce3 100644
--- a/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java
+++ b/test/unit/org/apache/cassandra/tcm/sequences/SequencesUtils.java
@@ -18,19 +18,27 @@
package org.apache.cassandra.tcm.sequences;
+import java.io.Serializable;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.function.Predicate;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.Transformation;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.ownership.DataPlacements;
import org.apache.cassandra.tcm.ownership.PlacementDeltas;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.tcm.transformations.PrepareJoin;
import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.PrepareMove;
@@ -174,4 +182,68 @@ public class SequencesUtils
{
return Epoch.create(epoch);
}
+
+ // Custom transforms to lock/unlock an arbitrary set of ranges to
+ // avoid having to actually initiate some range movement
+ public static class LockRanges implements Transformation, Serializable
+ {
+ public static final AsymmetricMetadataSerializer<Transformation, LockRanges> serializer = new AsymmetricMetadataSerializer<Transformation, LockRanges>()
+ {
+ @Override
+ public void serialize(Transformation t, DataOutputPlus out, Version version){}
+ @Override
+ public LockRanges deserialize(DataInputPlus in, Version version) {return new LockRanges();}
+ @Override
+ public long serializedSize(Transformation t, Version version) {return 0;}
+ };
+
+ public static final String NAME = "TestLockRanges";
+
+ // at the moment, the detail of the specific LockedRanges doesn't matter; transformations
+ // which are rejected in the presence of locking are rejected whatever is actually locked
+ private static final LockedRanges.AffectedRanges toLock =
+ LockedRanges.AffectedRanges.singleton(ReplicationParams.simple(3),
+ new Range<>(Murmur3Partitioner.instance.getMinimumToken(),
+ Murmur3Partitioner.instance.getRandomToken()));
+
+ @Override
+ public Kind kind()
+ {
+ return Kind.CUSTOM;
+ }
+
+ @Override
+ public Result execute(ClusterMetadata metadata)
+ {
+ LockedRanges newLocked = metadata.lockedRanges.lock(LockedRanges.keyFor(metadata.epoch), toLock);
+ return Transformation.success(metadata.transformer().with(newLocked), toLock);
+ }
+ }
+
+ public static class ClearLockedRanges implements Transformation, Serializable
+ {
+ public static final AsymmetricMetadataSerializer<Transformation, ClearLockedRanges> serializer = new AsymmetricMetadataSerializer<Transformation, ClearLockedRanges>()
+ {
+ @Override
+ public void serialize(Transformation t, DataOutputPlus out, Version version) {}
+ @Override
+ public ClearLockedRanges deserialize(DataInputPlus in, Version version) {return new ClearLockedRanges();}
+ @Override
+ public long serializedSize(Transformation t, Version version) {return 0;}
+ };
+ public static final String NAME = "TestClearLockedRanges";
+
+ @Override
+ public Kind kind()
+ {
+ return Kind.CUSTOM;
+ }
+
+ @Override
+ public Result execute(ClusterMetadata metadata)
+ {
+ LockedRanges newLocked = LockedRanges.EMPTY;
+ return Transformation.success(metadata.transformer().with(newLocked), LockedRanges.AffectedRanges.EMPTY);
+ }
+ }
}
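
As the comments above note, LockRanges and ClearLockedRanges exist so a test can put arbitrary ranges into the locked state without staging a real range movement, and then release them again. A sketch of how a test might apply them directly is below; it is illustrative only: it calls execute() on a hand-built ClusterMetadata instead of committing through the CMS, and it assumes Transformation.Result exposes isSuccess().

    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.tcm.ClusterMetadata;
    import org.apache.cassandra.tcm.Transformation;
    import org.apache.cassandra.tcm.membership.Directory;
    import org.apache.cassandra.tcm.sequences.SequencesUtils;

    public class LockRangesSketch
    {
        public static void main(String[] args)
        {
            ClusterMetadata metadata = new ClusterMetadata(Murmur3Partitioner.instance, new Directory());

            // Lock an arbitrary range; transformations that refuse to run while ranges are
            // locked should now be rejected against the resulting metadata, whatever is locked.
            Transformation.Result locked = new SequencesUtils.LockRanges().execute(metadata);
            System.out.println(locked.isSuccess());

            // Clearing the locks afterwards restores an unlocked metadata state.
            Transformation.Result cleared = new SequencesUtils.ClearLockedRanges().execute(metadata);
            System.out.println(cleared.isSuccess());
        }
    }
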
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]