This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 80b97e27e715c1c7255f7a0e5c262d24f47a27f6
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Wed Mar 1 12:53:19 2023 +0000

    [CEP-21] Produce placements equivalent to current replication strategies

    Minimal modifications to AbstractReplicationStrategy implementations to
    support the production of DataPlacements using ClusterMetadata while
    retaining calculateNaturalReplicas. Also adds tests to compare the output
    of both methods and assert their equivalence. Eventually, the original
    TokenMetadata-based implementations will be retired from the main source
    but retained in the test sources to guard against regressions.

    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../locator/AbstractReplicaCollection.java | 40 ++-
 .../locator/AbstractReplicationStrategy.java | 10 +-
 .../apache/cassandra/locator/LocalStrategy.java | 50 +--
 .../org/apache/cassandra/locator/MetaStrategy.java | 23 +-
 .../cassandra/locator/NetworkTopologyStrategy.java | 70 ++--
 .../apache/cassandra/locator/SimpleStrategy.java | 41 ++-
 .../apache/cassandra/locator/TokenMetadata.java | 159 +++++----
 .../apache/cassandra/service/StorageService.java | 11 +-
 .../service/reads/range/ReplicaPlanIterator.java | 3 +-
 .../org/apache/cassandra/tcm/ClusterMetadata.java | 54 ++-
 .../org/apache/cassandra/tcm/MetadataKeys.java | 5 +-
 .../AsEndpoints.java} | 21 +-
 .../AsLocations.java} | 27 +-
 .../AsTokenMap.java} | 24 +-
 .../cassandra/tcm/compatibility/GossipHelper.java | 2 +
 .../tcm/compatibility/TokenRingUtils.java | 150 ++++++++
 .../apache/cassandra/tcm/membership/Directory.java | 4 +-
 .../tcm/ownership/PrimaryRangeComparator.java | 51 +++
 .../apache/cassandra/tcm/ownership/TokenMap.java | 302 ++++++++++++++++
 .../tcm/ownership/UniformRangePlacement.java | 324 +++++++++++++++++
 .../tcm/transformations/cms/EntireRange.java | 8 +-
 .../org/apache/cassandra/utils/BiMultiValMap.java | 16 +
 .../locator/NetworkTopologyStrategyTest.java | 3 +-
 .../cassandra/locator/TokenMetadataTest.java | 3 +-
 .../cassandra/service/LeaveAndBootstrapTest.java | 3 +-
 .../org/apache/cassandra/service/MoveTest.java | 3 +-
 .../cassandra/tcm/membership/MembershipUtils.java | 10 +
 .../cassandra/tcm/ownership/OwnershipUtils.java | 139 ++++++++
 .../tcm/ownership/PlacementDeltasTest.java | 201 +++++++++++
 .../UniformRangePlacementEquivalenceTest.java | 388 +++++++++++++++++++++
 .../tcm/ownership/UniformRangePlacementTest.java | 290 +++++++++++++++
 31 files changed, 2233 insertions(+), 202 deletions(-)

diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java index cc210622d1..fa8d17facd 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java @@ -22,6 +22,7 @@ import com.carrotsearch.hppc.ObjectIntHashMap; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import java.util.AbstractList; import java.util.AbstractMap; @@ -100,6 +101,18 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect return contents[begin + index]; } + public ReplicaList map(Function<Replica, Replica> map) + { + Replica[] contents = new Replica[size]; + for (int i = 0; i < contents.length; i++) + { + if (this.contents[i] != null) + contents[i] =
map.apply(this.contents[i]); + } + + return new ReplicaList(contents, begin, contents.length); + } + public void add(Replica replica) { // can only add to full array - if we have sliced it, we must be a snapshot @@ -591,33 +604,30 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect /** * <p> - * It's not clear whether {@link AbstractReplicaCollection} should implement the order sensitive {@link Object#equals(Object) equals} - * of {@link java.util.List} or the order oblivious {@link Object#equals(Object) equals} of {@link java.util.Set}. We never rely on equality - * in the database so rather then leave in a potentially surprising implementation we have it throw {@link UnsupportedOperationException}. - * </p> - * <p> - * Don't implement this and pick one behavior over the other. If you want equality you can static import {@link com.google.common.collect.Iterables#elementsEqual(Iterable, Iterable)} - * and use that to get order sensitive equals. + * Implements order sensitive {@link Object#equals(Object) equals} of {@link java.util.List}. * </p> */ public final boolean equals(Object o) { - throw new UnsupportedOperationException("AbstractReplicaCollection equals unsupported"); + if (!(o instanceof AbstractReplicaCollection)) + return false; + + return Iterators.elementsEqual(iterator(), ((AbstractReplicaCollection) o).iterator()); } /** * <p> - * It's not clear whether {@link AbstractReplicaCollection} should implement the order sensitive {@link Object#hashCode() hashCode} - * of {@link java.util.List} or the order oblivious {@link Object#hashCode() equals} of {@link java.util.Set}. We never rely on hashCode - * in the database so rather then leave in a potentially surprising implementation we have it throw {@link UnsupportedOperationException}. - * </p> - * <p> - * Don't implement this and pick one behavior over the other. + * Implements order sensitive {@link Object#hashCode() hashCode} of {@link java.util.List}. * </p> */ public final int hashCode() { - throw new UnsupportedOperationException("AbstractReplicaCollection hashCode unsupported"); + int result = 1; + + for (Replica e : this) + result = 31 * result + (e == null ?
0 : e.hashCode()); + + return result; } @Override diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index b3b4ae6b67..d5a54971ee 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -22,6 +22,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -47,6 +48,9 @@ import org.apache.cassandra.service.DatacenterSyncWriteResponseHandler; import org.apache.cassandra.service.DatacenterWriteResponseHandler; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.WriteResponseHandler; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; +import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.utils.FBUtilities; import org.cliffc.high_scale_lib.NonBlockingHashMap; @@ -94,13 +98,13 @@ public abstract class AbstractReplicationStrategy { Token searchToken = searchPosition.getToken(); long currentRingVersion = tokenMetadata.getRingVersion(); - Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken); + Token keyToken = TokenRingUtils.firstToken(tokenMetadata.sortedTokens(), searchToken); EndpointsForRange endpoints = getCachedReplicas(currentRingVersion, keyToken); if (endpoints == null) { TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap(); // if our cache got invalidated, it's possible there is a new token to account for too - keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken); + keyToken = TokenRingUtils.firstToken(tm.sortedTokens(), searchToken); endpoints = calculateNaturalReplicas(searchToken, tm); replicas.put(tm.getRingVersion(), keyToken, endpoints); } @@ -133,6 +137,8 @@ public abstract class AbstractReplicationStrategy */ public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata); + public abstract DataPlacement calculateDataPlacement(List<Range<Token>> ranges, ClusterMetadata metadata); + public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForWrite replicaPlan, Runnable callback, WriteType writeType, diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java index 0e3a9185fe..3f41ef806f 100644 --- a/src/java/org/apache/cassandra/locator/LocalStrategy.java +++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java @@ -17,31 +17,46 @@ */ package org.apache.cassandra.locator; -import java.util.Collections; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.RingPosition; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.transformations.cms.EntireRange; -public class 
LocalStrategy extends AbstractReplicationStrategy +public class LocalStrategy extends SystemStrategy { private static final ReplicationFactor RF = ReplicationFactor.fullOnly(1); - private final EndpointsForRange replicas; +// private final EndpointsForRange replicas; public LocalStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) { super(keyspaceName, tokenMetadata, snitch, configOptions); - replicas = EndpointsForRange.of( - new Replica(FBUtilities.getBroadcastAddressAndPort(), - DatabaseDescriptor.getPartitioner().getMinimumToken(), - DatabaseDescriptor.getPartitioner().getMinimumToken(), - true - ) - ); +// replicas = EndpointsForRange.of( +// new Replica(FBUtilities.getBroadcastAddressAndPort(), +// DatabaseDescriptor.getPartitioner().getMinimumToken(), +// DatabaseDescriptor.getPartitioner().getMinimumToken(), +// true +// ) +// ); + } + + @Override + public DataPlacement calculateDataPlacement(List<Range<Token>> ranges, ClusterMetadata metadata) + { + return EntireRange.placement; + } + + @Override + public ReplicationFactor getReplicationFactor() + { + return RF; } /** @@ -52,17 +67,12 @@ public class LocalStrategy extends AbstractReplicationStrategy @Override public EndpointsForRange getNaturalReplicas(RingPosition<?> searchPosition) { - return replicas; + return EntireRange.localReplicas; } public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata) { - return replicas; - } - - public ReplicationFactor getReplicationFactor() - { - return RF; + return EntireRange.localReplicas; } public void validateOptions() throws ConfigurationException diff --git a/src/java/org/apache/cassandra/locator/MetaStrategy.java b/src/java/org/apache/cassandra/locator/MetaStrategy.java index a62b0ea2c7..295e06f1a9 100644 --- a/src/java/org/apache/cassandra/locator/MetaStrategy.java +++ b/src/java/org/apache/cassandra/locator/MetaStrategy.java @@ -17,10 +17,17 @@ */ package org.apache.cassandra.locator; +import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.tcm.transformations.cms.EntireRange; public class MetaStrategy extends SystemStrategy { @@ -32,9 +39,23 @@ public class MetaStrategy extends SystemStrategy @Override public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata tokenMetadata) { - return ClusterMetadata.current().cmsReplicas; + return replicas(); } + @Override + public DataPlacement calculateDataPlacement(List<Range<Token>> ranges, ClusterMetadata metadata) + { + PlacementForRange placement = PlacementForRange.builder(1).withReplicaGroup(replicas()).build(); + return new DataPlacement(placement, placement); + } + + private static EndpointsForRange replicas() + { + Set<InetAddressAndPort> members = ClusterMetadata.current().cmsMembers(); + return EndpointsForRange.builder(EntireRange.entireRange, members.size()) + .addAll(members.stream().map(EntireRange::replica).collect(Collectors.toList())) + .build(); + } @Override public ReplicationFactor getReplicationFactor() { diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java index 5e6a2f8ec9..49be984c44 100644 --- 
a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java +++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java @@ -30,13 +30,20 @@ import org.apache.cassandra.dht.Datacenters; import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.TokenMetadata.Topology; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.compatibility.AsEndpoints; +import org.apache.cassandra.tcm.compatibility.AsLocations; +import org.apache.cassandra.tcm.compatibility.AsTokenMap; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.PlacementForRange; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.Pair; import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Multimap; @@ -106,7 +113,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy * For efficiency the set is shared between the instances, using the location pair (dc, rack) to make sure * clashing names aren't a problem. */ - Set<Pair<String, String>> racks; + Set<Location> racks; /** Number of replicas left to fill from this DC. */ int rfLeft; @@ -117,7 +124,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy int rackCount, int nodeCount, EndpointsForRange.Builder replicas, - Set<Pair<String, String>> racks) + Set<Location> racks) { this.replicas = replicas; this.racks = racks; @@ -137,7 +144,7 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy * Attempts to add an endpoint to the replicas for this datacenter, adding to the replicas set if successful. * Returns true if the endpoint was added, and this datacenter does not require further replicas. 
*/ - boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair<String,String> location, Range<Token> replicatedRange) + boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Location location, Range<Token> replicatedRange) { if (done()) return false; @@ -181,19 +188,23 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy { // we want to preserve insertion order so that the first added endpoint becomes primary ArrayList<Token> sortedTokens = tokenMetadata.sortedTokens(); - Token replicaEnd = TokenMetadata.firstToken(sortedTokens, searchToken); - Token replicaStart = tokenMetadata.getPredecessor(replicaEnd); + Token replicaEnd = TokenRingUtils.firstToken(sortedTokens, searchToken); + Token replicaStart = TokenRingUtils.getPredecessor(sortedTokens, replicaEnd); Range<Token> replicatedRange = new Range<>(replicaStart, replicaEnd); + return calculateNaturalReplicas(searchToken, replicatedRange, tokenMetadata, tokenMetadata, tokenMetadata, datacenters); + } + private EndpointsForRange calculateNaturalReplicas(Token searchToken, + Range<Token> replicatedRange, + AsEndpoints endpoints, + AsLocations locations, + AsTokenMap tokens, + Map<String, ReplicationFactor> datacenters) + { EndpointsForRange.Builder builder = new EndpointsForRange.Builder(replicatedRange); - Set<Pair<String, String>> seenRacks = new HashSet<>(); + Set<Location> seenRacks = new HashSet<>(); - Topology topology = tokenMetadata.getTopology(); - // all endpoints in each DC, so we can check when we have exhausted all the members of a DC - Multimap<String, InetAddressAndPort> allEndpoints = topology.getDatacenterEndpoints(); - // all racks in a DC so we can check when we have exhausted all racks in a DC - Map<String, ImmutableMultimap<String, InetAddressAndPort>> racks = topology.getDatacenterRacks(); - assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members"; + assert !locations.allDatacenterEndpoints().isEmpty() && !locations.allDatacenterRacks().isEmpty() : "not aware of any cluster members"; int dcsToFill = 0; Map<String, DatacenterEndpoints> dcs = new HashMap<>(datacenters.size() * 2); @@ -203,29 +214,48 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy { String dc = en.getKey(); ReplicationFactor rf = en.getValue(); - int nodeCount = sizeOrZero(allEndpoints.get(dc)); + int nodeCount = sizeOrZero(locations.datacenterEndpoints(dc)); if (rf.allReplicas <= 0 || nodeCount <= 0) continue; - DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, builder, seenRacks); + DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(locations.datacenterRacks(dc)), nodeCount, builder, seenRacks); dcs.put(dc, dcEndpoints); ++dcsToFill; } - Iterator<Token> tokenIter = TokenMetadata.ringIterator(sortedTokens, searchToken, false); + Iterator<Token> tokenIter = TokenRingUtils.ringIterator(tokens.tokens(), searchToken, false); while (dcsToFill > 0 && tokenIter.hasNext()) { Token next = tokenIter.next(); - InetAddressAndPort ep = tokenMetadata.getEndpoint(next); - Pair<String, String> location = topology.getLocation(ep); - DatacenterEndpoints dcEndpoints = dcs.get(location.left); + NodeId owner = tokens.owner(next); + InetAddressAndPort ep = endpoints.endpoint(owner); + Location location = locations.location(owner); + DatacenterEndpoints dcEndpoints = dcs.get(location.datacenter); if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location, replicatedRange)) --dcsToFill; } return 
builder.build(); } + @Override + public DataPlacement calculateDataPlacement(List<Range<Token>> ranges, + ClusterMetadata metadata) + { + PlacementForRange.Builder builder = PlacementForRange.builder(); + for (Range<Token> range : ranges) + { + builder.withReplicaGroup(calculateNaturalReplicas(range.right, + range, + metadata.directory, // AsEndpoints + metadata.directory, // AsLocations + metadata.tokenMap, + datacenters)); + } + PlacementForRange built = builder.build(); + return new DataPlacement(built, built); + } + private int sizeOrZero(Multimap<?, ?> collection) { return collection != null ? collection.asMap().size() : 0; diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java index 488b601ce7..1384fa8f9c 100644 --- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java +++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; import org.slf4j.Logger; @@ -35,6 +36,13 @@ import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.compatibility.AsEndpoints; +import org.apache.cassandra.tcm.compatibility.AsTokenMap; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.PlacementForRange; /** @@ -56,17 +64,37 @@ public class SimpleStrategy extends AbstractReplicationStrategy this.rf = ReplicationFactor.fromString(this.configOptions.get(REPLICATION_FACTOR)); } + @Override + public DataPlacement calculateDataPlacement(List<Range<Token>> ranges, ClusterMetadata metadata) + { + PlacementForRange.Builder builder = PlacementForRange.builder(); + for (Range<Token> range : ranges) + builder.withReplicaGroup(calculateNaturalReplicas(range.right, metadata.tokenMap.tokens(), range, metadata.directory, metadata.tokenMap)); + + PlacementForRange built = builder.build(); + return new DataPlacement(built, built); + } + @Override public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata) { ArrayList<Token> ring = metadata.sortedTokens(); + Token replicaEnd = TokenRingUtils.firstToken(ring, token); + Token replicaStart = TokenRingUtils.getPredecessor(ring, replicaEnd); + Range<Token> replicaRange = new Range<>(replicaStart, replicaEnd); + return calculateNaturalReplicas(token, ring, replicaRange, metadata, metadata); + } + + private EndpointsForRange calculateNaturalReplicas(Token token, + List<Token> ring, + Range<Token> replicaRange, + AsEndpoints endpoints, + AsTokenMap tokens) + { if (ring.isEmpty()) - return EndpointsForRange.empty(new Range<>(metadata.partitioner.getMinimumToken(), metadata.partitioner.getMinimumToken())); + return EndpointsForRange.empty(new Range<>(tokens.partitioner().getMinimumToken(), token.getPartitioner().getMinimumToken())); - Token replicaEnd = TokenMetadata.firstToken(ring, token); - Token replicaStart = metadata.getPredecessor(replicaEnd); - Range<Token> replicaRange = new Range<>(replicaStart, replicaEnd); - Iterator<Token> iter = TokenMetadata.ringIterator(ring, token, false); + Iterator<Token> iter = 
TokenRingUtils.ringIterator(ring, token, false); EndpointsForRange.Builder replicas = new EndpointsForRange.Builder(replicaRange, rf.allReplicas); @@ -74,7 +102,8 @@ public class SimpleStrategy extends AbstractReplicationStrategy while (replicas.size() < rf.allReplicas && iter.hasNext()) { Token tk = iter.next(); - InetAddressAndPort ep = metadata.getEndpoint(tk); + NodeId owner = tokens.owner(tk); + InetAddressAndPort ep = endpoints.endpoint(owner); if (!replicas.endpoints().contains(ep)) replicas.add(new Replica(ep, replicaRange, replicas.size() < rf.fullReplicas)); } diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java index 6b0ea7d810..abbc76d482 100644 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@ -45,6 +45,12 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.ReplicaCollection.Builder.Conflict; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.compatibility.AsEndpoints; +import org.apache.cassandra.tcm.compatibility.AsLocations; +import org.apache.cassandra.tcm.compatibility.AsTokenMap; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.utils.BiMultiValMap; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.SortedBiMultiValMap; @@ -52,7 +58,7 @@ import org.apache.cassandra.utils.SortedBiMultiValMap; import static org.apache.cassandra.config.CassandraRelevantProperties.LINE_SEPARATOR; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; -public class TokenMetadata +public class TokenMetadata implements AsEndpoints, AsLocations, AsTokenMap { private static final Logger logger = LoggerFactory.getLogger(TokenMetadata.class); @@ -786,10 +792,7 @@ public class TokenMetadata public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> tokens) { - Collection<Range<Token>> ranges = new ArrayList<>(tokens.size()); - for (Token right : tokens) - ranges.add(new Range<>(getPredecessor(right), right)); - return ranges; + return TokenRingUtils.getPrimaryRangesFor(sortedTokens(), tokens); } @Deprecated @@ -1041,22 +1044,6 @@ public class TokenMetadata return newPendingRanges; } - public Token getPredecessor(Token token) - { - List<Token> tokens = sortedTokens(); - int index = Collections.binarySearch(tokens, token); - assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings(); - return index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1); - } - - public Token getSuccessor(Token token) - { - List<Token> tokens = sortedTokens(); - int index = Collections.binarySearch(tokens, token); - assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings(); - return (index == (tokens.size() - 1)) ? 
tokens.get(0) : tokens.get(index + 1); - } - private String tokenToEndpointMapKeysAsStrings() { lock.readLock().lock(); @@ -1174,66 +1161,6 @@ public class TokenMetadata } } - public static int firstTokenIndex(final ArrayList<Token> ring, Token start, boolean insertMin) - { - assert ring.size() > 0; - // insert the minimum token (at index == -1) if we were asked to include it and it isn't a member of the ring - int i = Collections.binarySearch(ring, start); - if (i < 0) - { - i = (i + 1) * (-1); - if (i >= ring.size()) - i = insertMin ? -1 : 0; - } - return i; - } - - public static Token firstToken(final ArrayList<Token> ring, Token start) - { - return ring.get(firstTokenIndex(ring, start, false)); - } - - /** - * iterator over the Tokens in the given ring, starting with the token for the node owning start - * (which does not have to be a Token in the ring) - * @param includeMin True if the minimum token should be returned in the ring even if it has no owner. - */ - public static Iterator<Token> ringIterator(final ArrayList<Token> ring, Token start, boolean includeMin) - { - if (ring.isEmpty()) - return includeMin ? Iterators.singletonIterator(start.getPartitioner().getMinimumToken()) - : Collections.emptyIterator(); - - final boolean insertMin = includeMin && !ring.get(0).isMinimum(); - final int startIndex = firstTokenIndex(ring, start, insertMin); - return new AbstractIterator<Token>() - { - int j = startIndex; - protected Token computeNext() - { - if (j < -1) - return endOfData(); - try - { - // return minimum for index == -1 - if (j == -1) - return start.getPartitioner().getMinimumToken(); - // return ring token for other indexes - return ring.get(j); - } - finally - { - j++; - if (j == ring.size()) - j = insertMin ? -1 : 0; - if (j == startIndex) - // end iteration - j = -2; - } - } - }; - } - /** used by tests */ public void clearUnsafe() { @@ -1626,4 +1553,74 @@ public class TokenMetadata } } } + + // Methods of org.apache.cassandra.tcm.compatibility.AsEndpoints + @Override + public NodeId peerId(InetAddressAndPort endpoint) + { + return new NodeId(getHostId(endpoint)); + } + + @Override + public InetAddressAndPort endpoint(NodeId id) + { + return getEndpointForHostId(id.uuid); + } + + // Methods of org.apache.cassandra.tcm.compatibility.AsLocations + @Override + public Location location(NodeId peer) + { + Pair<String, String> dcRack = topology.getLocation(endpoint(peer)); + return new Location(dcRack.left, dcRack.right); + } + + @Override + public Set<InetAddressAndPort> datacenterEndpoints(String datacenter) + { + return new HashSet<>(topology.dcEndpoints.get(datacenter)); + } + + @Override + public Multimap<String, InetAddressAndPort> allDatacenterEndpoints() + { + return topology.getDatacenterEndpoints(); + } + + @Override + public Multimap<String, InetAddressAndPort> datacenterRacks(String datacenter) + { + return topology.dcRacks.get(datacenter); + } + + @Override + public Map<String, Multimap<String, InetAddressAndPort>> allDatacenterRacks() + { + // terrible, but temporary + Map<String, Multimap<String, InetAddressAndPort>> dcRacks = new HashMap<>(); + topology.getDatacenterRacks().forEach((dc, racks) -> { + dcRacks.put(dc, HashMultimap.create(racks)); + }); + return dcRacks; + } + + // Methods of org.apache.cassandra.tcm.compatibility.AsTokenMap + @Override + public IPartitioner partitioner() + { + return partitioner; + } + + @Override + public ImmutableList<Token> tokens() + { + return ImmutableList.copyOf(sortedTokens); + } + + @Override + public NodeId owner(Token 
token) + { + return new NodeId(getHostId(getEndpoint(token))); + } + } diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 75557a4489..b4fdcd1d99 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -208,6 +208,7 @@ import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Startup; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.transformations.Register; import org.apache.cassandra.transport.ClientResourceLimits; @@ -4830,13 +4831,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy(); Collection<Range<Token>> primaryRanges = new HashSet<>(); TokenMetadata metadata = tokenMetadata.cloneOnlyTokenMap(); - for (Token token : metadata.sortedTokens()) + List<Token> sortedTokens = metadata.sortedTokens(); + for (Token token : sortedTokens) { EndpointsForRange replicas = strategy.calculateNaturalReplicas(token, metadata); if (replicas.size() > 0 && replicas.get(0).endpoint().equals(ep)) { Preconditions.checkState(replicas.get(0).isFull()); - primaryRanges.add(new Range<>(metadata.getPredecessor(token), token)); + primaryRanges.add(new Range<>(TokenRingUtils.getPredecessor(sortedTokens, token), token)); } } return primaryRanges; @@ -4858,7 +4860,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy(); Collection<Range<Token>> localDCPrimaryRanges = new HashSet<>(); - for (Token token : metadata.sortedTokens()) + List<Token> sortedTokens = metadata.sortedTokens(); + for (Token token : sortedTokens) { EndpointsForRange replicas = strategy.calculateNaturalReplicas(token, metadata); for (Replica replica : replicas) @@ -4867,7 +4870,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { if (replica.endpoint().equals(referenceEndpoint)) { - localDCPrimaryRanges.add(new Range<>(metadata.getPredecessor(token), token)); + localDCPrimaryRanges.add(new Range<>(TokenRingUtils.getPredecessor(sortedTokens, token), token)); } break; } diff --git a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java index ef88c9dffd..605c4561b9 100644 --- a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java +++ b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java @@ -36,6 +36,7 @@ import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.locator.ReplicaPlans; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.Pair; @@ -92,7 +93,7 @@ class ReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead> List<AbstractBounds<PartitionPosition>> ranges = new ArrayList<>(); // divide the queryRange into pieces delimited by the ring and minimum tokens - Iterator<Token> ringIter = 
TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true); + Iterator<Token> ringIter = TokenRingUtils.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true); AbstractBounds<PartitionPosition> remainder = queryRange; while (ringIter.hasNext()) { diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 90d5343d2c..bbccd3af2b 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -19,6 +19,7 @@ package org.apache.cassandra.tcm; import java.io.IOException; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -26,11 +27,13 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.EndpointsForRange; @@ -44,6 +47,7 @@ import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.TokenMap; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.tcm.transformations.cms.EntireRange; @@ -65,6 +69,7 @@ public class ClusterMetadata public final DistributedSchema schema; public final Directory directory; + public final TokenMap tokenMap; public final EndpointsForRange cmsReplicas; public final ImmutableSet<InetAddressAndPort> cmsMembers; @@ -73,7 +78,8 @@ public class ClusterMetadata this(partitioner, Directory.EMPTY); } - private ClusterMetadata(IPartitioner partitioner, Directory directory) + @VisibleForTesting + public ClusterMetadata(IPartitioner partitioner, Directory directory) { this(partitioner, directory, DistributedSchema.first()); } @@ -86,6 +92,7 @@ public class ClusterMetadata partitioner, schema, directory, + new TokenMap(partitioner), ImmutableSet.of(), ImmutableMap.of()); } @@ -96,15 +103,21 @@ public class ClusterMetadata IPartitioner partitioner, DistributedSchema schema, Directory directory, + TokenMap tokenMap, Set<InetAddressAndPort> cmsMembers, Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions) { + // TODO: token map is a feature of the specific placement strategy, and so may not be a relevant component of + // ClusterMetadata in the long term. We need to consider how the actual components of metadata can be evolved + // over time. 
+ assert tokenMap == null || tokenMap.partitioner().getClass().equals(partitioner.getClass()) : "Partitioner for TokenMap doesn't match base partitioner"; this.epoch = epoch; this.period = period; this.lastInPeriod = lastInPeriod; this.partitioner = partitioner; this.schema = schema; this.directory = directory; + this.tokenMap = tokenMap; this.cmsMembers = ImmutableSet.copyOf(cmsMembers); this.extensions = ImmutableMap.copyOf(extensions); @@ -143,6 +156,7 @@ public class ClusterMetadata partitioner, schema, directory, + tokenMap, cmsMembers, extensions); } @@ -166,6 +180,7 @@ public class ClusterMetadata private final IPartitioner partitioner; private DistributedSchema schema; private Directory directory; + private TokenMap tokenMap; private final Set<InetAddressAndPort> cmsMembers; private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions; private final Set<MetadataKey> modifiedKeys; @@ -179,6 +194,7 @@ public class ClusterMetadata this.partitioner = metadata.partitioner; this.schema = metadata.schema; this.directory = metadata.directory; + this.tokenMap = metadata.tokenMap; this.cmsMembers = new HashSet<>(metadata.cmsMembers); extensions = new HashMap<>(metadata.extensions); modifiedKeys = new HashSet<>(); @@ -202,6 +218,25 @@ public class ClusterMetadata return this; } + public Transformer proposeToken(NodeId nodeId, Collection<Token> tokens) + { + tokenMap = tokenMap.assignTokens(nodeId, tokens); + directory = directory.withRackAndDC(nodeId); + return this; + } + + public Transformer unproposeTokens(NodeId nodeId) + { + tokenMap = tokenMap.unassignTokens(nodeId); + return this; + } + + public Transformer unproposeTokens(NodeId nodeId, Collection<Token> tokens) + { + tokenMap = tokenMap.unassignTokens(nodeId, tokens); + return this; + } + public Transformer withCMSMember(InetAddressAndPort member) { cmsMembers.add(member); @@ -270,12 +305,19 @@ public class ClusterMetadata directory = directory.withLastModified(epoch); } + if (tokenMap != base.tokenMap) + { + modifiedKeys.add(MetadataKeys.TOKEN_MAP); + tokenMap = tokenMap.withLastModified(epoch); + } + return new Transformed(new ClusterMetadata(epoch, period, lastInPeriod, partitioner, schema, directory, + tokenMap, cmsMembers, extensions), ImmutableSet.copyOf(modifiedKeys)); @@ -291,6 +333,7 @@ public class ClusterMetadata ", partitioner=" + partitioner + ", schema=" + schema + ", directory=" + schema + + ", tokenMap=" + tokenMap + ", extensions=" + extensions + ", cmsMembers=" + cmsMembers + ", modifiedKeys=" + modifiedKeys + @@ -328,13 +371,14 @@ public class ClusterMetadata lastInPeriod == that.lastInPeriod && schema.equals(that.schema) && directory.equals(that.directory) && + tokenMap.equals(that.tokenMap) && extensions.equals(that.extensions); } @Override public int hashCode() { - return Objects.hash(epoch, lastInPeriod, schema, directory, extensions); + return Objects.hash(epoch, lastInPeriod, schema, directory, tokenMap, extensions); } public static ClusterMetadata current() @@ -378,6 +422,7 @@ public class ClusterMetadata out.writeUTF(metadata.partitioner.getClass().getCanonicalName()); DistributedSchema.serializer.serialize(metadata.schema, out, version); Directory.serializer.serialize(metadata.directory, out, version); + TokenMap.serializer.serialize(metadata.tokenMap, out, version); out.writeInt(metadata.extensions.size()); for (Map.Entry<ExtensionKey<?, ?>, ExtensionValue<?>> entry : metadata.extensions.entrySet()) { @@ -401,6 +446,7 @@ public class ClusterMetadata IPartitioner partitioner = 
FBUtilities.newPartitioner(in.readUTF()); DistributedSchema schema = DistributedSchema.serializer.deserialize(in, version); Directory dir = Directory.serializer.deserialize(in, version); + TokenMap tokenMap = TokenMap.serializer.deserialize(in, version); int items = in.readInt(); Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions = new HashMap<>(items); for (int i = 0; i < items; i++) @@ -420,6 +466,7 @@ public class ClusterMetadata partitioner, schema, dir, + tokenMap, members, extensions); } @@ -437,7 +484,8 @@ public class ClusterMetadata TypeSizes.BOOL_SIZE + sizeof(metadata.partitioner.getClass().getCanonicalName()) + DistributedSchema.serializer.serializedSize(metadata.schema, version) + - Directory.serializer.serializedSize(metadata.directory, version); + Directory.serializer.serializedSize(metadata.directory, version) + + TokenMap.serializer.serializedSize(metadata.tokenMap, version); size += TypeSizes.INT_SIZE; for (InetAddressAndPort member : metadata.cmsMembers) diff --git a/src/java/org/apache/cassandra/tcm/MetadataKeys.java b/src/java/org/apache/cassandra/tcm/MetadataKeys.java index b545e03d4c..d1bde221cc 100644 --- a/src/java/org/apache/cassandra/tcm/MetadataKeys.java +++ b/src/java/org/apache/cassandra/tcm/MetadataKeys.java @@ -28,8 +28,11 @@ public class MetadataKeys public static final MetadataKey SCHEMA = make(CORE_NS, "schema", "dist_schema"); public static final MetadataKey NODE_DIRECTORY = make(CORE_NS, "membership", "node_directory"); + public static final MetadataKey TOKEN_MAP = make(CORE_NS, "ownership", "token_map"); - public static final ImmutableSet<MetadataKey> CORE_METADATA = ImmutableSet.of(SCHEMA); + public static final ImmutableSet<MetadataKey> CORE_METADATA = ImmutableSet.of(SCHEMA, + NODE_DIRECTORY, + TOKEN_MAP); public static MetadataKey make(String...parts) { diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java b/src/java/org/apache/cassandra/tcm/compatibility/AsEndpoints.java similarity index 58% copy from src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java copy to src/java/org/apache/cassandra/tcm/compatibility/AsEndpoints.java index 8ae2302527..fd8a4d39f6 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/AsEndpoints.java @@ -16,22 +16,13 @@ * limitations under the License. 
*/ -package org.apache.cassandra.tcm.transformations.cms; +package org.apache.cassandra.tcm.compatibility; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.membership.NodeId; -public class EntireRange +public interface AsEndpoints { - public static final Range<Token> entireRange = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), - DatabaseDescriptor.getPartitioner().getMinimumToken()); - public static Replica replica(InetAddressAndPort addr) - { - return new Replica(addr, entireRange, true); - } - -} \ No newline at end of file + public NodeId peerId(InetAddressAndPort endpoint); + public InetAddressAndPort endpoint(NodeId id); +} diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java b/src/java/org/apache/cassandra/tcm/compatibility/AsLocations.java similarity index 58% copy from src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java copy to src/java/org/apache/cassandra/tcm/compatibility/AsLocations.java index 8ae2302527..3286f1f1d9 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/AsLocations.java @@ -16,22 +16,23 @@ * limitations under the License. */ -package org.apache.cassandra.tcm.transformations.cms; +package org.apache.cassandra.tcm.compatibility; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Multimap; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeId; -public class EntireRange +public interface AsLocations { - public static final Range<Token> entireRange = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), - DatabaseDescriptor.getPartitioner().getMinimumToken()); - public static Replica replica(InetAddressAndPort addr) - { - return new Replica(addr, entireRange, true); - } + Location location(NodeId peer); + Set<InetAddressAndPort> datacenterEndpoints(String datacenter); + Multimap<String, InetAddressAndPort> allDatacenterEndpoints(); -} \ No newline at end of file + Multimap<String, InetAddressAndPort> datacenterRacks(String datacenter); + Map<String, Multimap<String, InetAddressAndPort>> allDatacenterRacks(); +} diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java b/src/java/org/apache/cassandra/tcm/compatibility/AsTokenMap.java similarity index 57% copy from src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java copy to src/java/org/apache/cassandra/tcm/compatibility/AsTokenMap.java index 8ae2302527..9960dc10b8 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/AsTokenMap.java @@ -16,22 +16,18 @@ * limitations under the License. 
*/ -package org.apache.cassandra.tcm.transformations.cms; +package org.apache.cassandra.tcm.compatibility; +import java.util.List; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.membership.NodeId; -public class EntireRange +public interface AsTokenMap { - public static final Range<Token> entireRange = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(), - DatabaseDescriptor.getPartitioner().getMinimumToken()); - public static Replica replica(InetAddressAndPort addr) - { - return new Replica(addr, entireRange, true); - } - -} \ No newline at end of file + public IPartitioner partitioner(); + // todo: this could return NavigableSet + public List<Token> tokens(); + public NodeId owner(Token token); +} diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index ad83e404ae..477a1ea2d8 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -31,6 +31,7 @@ import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Period; import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.ownership.TokenMap; public class GossipHelper { @@ -48,6 +49,7 @@ public class GossipHelper DatabaseDescriptor.getPartitioner(), DistributedSchema.fromSystemTables(SchemaKeyspace.fetchNonSystemKeyspaces()), Directory.EMPTY, + new TokenMap(DatabaseDescriptor.getPartitioner()), Collections.emptySet(), Collections.emptyMap()); } diff --git a/src/java/org/apache/cassandra/tcm/compatibility/TokenRingUtils.java b/src/java/org/apache/cassandra/tcm/compatibility/TokenRingUtils.java new file mode 100644 index 0000000000..1698ba399e --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/compatibility/TokenRingUtils.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.compatibility; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +/** + * Functions for interacting with a sorted list of tokens as modelled as a ring + */ +public class TokenRingUtils +{ + private static final Logger logger = LoggerFactory.getLogger(TokenRingUtils.class); + + public static int firstTokenIndex(final List<Token> ring, Token start, boolean insertMin) + { + assert ring.size() > 0 : ring.toString(); + // insert the minimum token (at index == -1) if we were asked to include it and it isn't a member of the ring + int i = Collections.binarySearch(ring, start); + if (i < 0) + { + i = (i + 1) * (-1); + if (i >= ring.size()) + i = insertMin ? -1 : 0; + } + return i; + } + + public static Token firstToken(List<Token> ring, Token start) + { + return ring.get(firstTokenIndex(ring, start, false)); + } + + public static Token getPredecessor(List<Token> ring, Token start) + { + int idx = firstTokenIndex(ring, start, false); + return ring.get(idx == 0 ? ring.size() - 1 : idx - 1); + } + + public static Collection<Range<Token>> getPrimaryRangesFor(List<Token> ring, Collection<Token> tokens) + { + Collection<Range<Token>> ranges = new ArrayList<>(tokens.size()); + for (Token right : tokens) + ranges.add(new Range<>(getPredecessor(ring, right), right)); + return ranges; + } + + /** + * iterator over the Tokens in the given ring, starting with the token for the node owning start + * (which does not have to be a Token in the ring) + * @param includeMin True if the minimum token should be returned in the ring even if it has no owner. + */ + public static Iterator<Token> ringIterator(final List<Token> ring, Token start, boolean includeMin) + { + if (ring.isEmpty()) + return includeMin ? Iterators.singletonIterator(start.getPartitioner().getMinimumToken()) + : Collections.emptyIterator(); + + final boolean insertMin = includeMin && !ring.get(0).isMinimum(); + final int startIndex = firstTokenIndex(ring, start, insertMin); + return new AbstractIterator<Token>() + { + int j = startIndex; + protected Token computeNext() + { + if (j < -1) + return endOfData(); + try + { + // return minimum for index == -1 + if (j == -1) + return start.getPartitioner().getMinimumToken(); + // return ring token for other indexes + return ring.get(j); + } + finally + { + j++; + if (j == ring.size()) + j = insertMin ? -1 : 0; + if (j == startIndex) + // end iteration + j = -2; + } + } + }; + } + + /** + * Get all ranges that span the ring given a set + * of tokens. All ranges are in sorted order of + * ranges. 
+ * @return ranges in sorted order + */ + public static List<Range<Token>> getAllRanges(List<Token> sortedTokens) + { + if (logger.isTraceEnabled()) + logger.trace("computing ranges for {}", StringUtils.join(sortedTokens, ", ")); + + if (sortedTokens.isEmpty()) + return Collections.emptyList(); + int size = sortedTokens.size(); + List<Range<Token>> ranges = new ArrayList<>(size + 1); + for (int i = 1; i < size; ++i) + { + Range<Token> range = new Range<>(sortedTokens.get(i - 1), sortedTokens.get(i)); + ranges.add(range); + } + Range<Token> range = new Range<>(sortedTokens.get(size - 1), sortedTokens.get(0)); + ranges.add(range); + + return ranges; + } + + public static Range<Token> getRange(List<Token> ring, Token token) + { + int idx = firstTokenIndex(ring, token, false); + Token replicaEnd = ring.get(idx); + Token replicaStart = ring.get(idx == 0 ? ring.size() - 1 : idx - 1); + return new Range<>(replicaStart, replicaEnd); + } +} diff --git a/src/java/org/apache/cassandra/tcm/membership/Directory.java b/src/java/org/apache/cassandra/tcm/membership/Directory.java index d04c52dfc1..842c78f95c 100644 --- a/src/java/org/apache/cassandra/tcm/membership/Directory.java +++ b/src/java/org/apache/cassandra/tcm/membership/Directory.java @@ -34,6 +34,8 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.MetadataValue; +import org.apache.cassandra.tcm.compatibility.AsEndpoints; +import org.apache.cassandra.tcm.compatibility.AsLocations; import org.apache.cassandra.tcm.serialization.MetadataSerializer; import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.net.MessagingService; @@ -44,7 +46,7 @@ import org.apache.cassandra.utils.btree.BTreeMultimap; import static org.apache.cassandra.db.TypeSizes.sizeof; -public class Directory implements MetadataValue<Directory> +public class Directory implements MetadataValue<Directory>, AsEndpoints, AsLocations { public static final Serializer serializer = new Serializer(); diff --git a/src/java/org/apache/cassandra/tcm/ownership/PrimaryRangeComparator.java b/src/java/org/apache/cassandra/tcm/ownership/PrimaryRangeComparator.java new file mode 100644 index 0000000000..412c09bb59 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/PrimaryRangeComparator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.ownership; + +import java.util.Comparator; + +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.NodeId; + +public class PrimaryRangeComparator implements Comparator<Replica> +{ + private final TokenMap tokens; + private final Directory directory; + + public PrimaryRangeComparator(TokenMap tokens, Directory directory) + { + this.tokens = tokens; + this.directory = directory; + } + + @Override + public int compare(Replica o1, Replica o2) + { + assert o1.range().equals(o2.range()); + Token target = o1.range().right.equals(tokens.partitioner().getMinimumToken()) + ? tokens.tokens().get(0) + : o1.range().right; + NodeId owner = tokens.owner(target); + return directory.peerId(o1.endpoint()).equals(owner) + ? -1 + : directory.peerId(o2.endpoint()).equals(owner) ? 1 : 0; + } +} diff --git a/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java b/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java new file mode 100644 index 0000000000..8dde234c5f --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/TokenMap.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.ownership; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import com.google.common.collect.ImmutableList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.MetadataValue; +import org.apache.cassandra.tcm.compatibility.AsTokenMap; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.BiMultiValMap; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.SortedBiMultiValMap; + +import static org.apache.cassandra.db.TypeSizes.sizeof; + +public class TokenMap implements MetadataValue<TokenMap>, AsTokenMap +{ + public static final Serializer serializer = new Serializer(); + + private static final Logger logger = LoggerFactory.getLogger(TokenMap.class); + + private final SortedBiMultiValMap<Token, NodeId> map; + private final List<Token> tokens; + private final List<Range<Token>> ranges; + // TODO: move partitioner to the users (SimpleStrategy and Uniform Range Placement?) + private final IPartitioner partitioner; + private final Epoch lastModified; + + public TokenMap(IPartitioner partitioner) + { + this(Epoch.EMPTY, partitioner, SortedBiMultiValMap.create()); + } + + private TokenMap(Epoch lastModified, IPartitioner partitioner, SortedBiMultiValMap<Token, NodeId> map) + { + this.lastModified = lastModified; + this.partitioner = partitioner; + this.map = map; + this.tokens = tokens(); + this.ranges = toRanges(tokens, partitioner); + } + + @Override + public TokenMap withLastModified(Epoch epoch) + { + return new TokenMap(epoch, partitioner, map); + } + + @Override + public Epoch lastModified() + { + return lastModified; + } + + public TokenMap assignTokens(NodeId id, Collection<Token> tokens) + { + SortedBiMultiValMap<Token, NodeId> finalisedCopy = SortedBiMultiValMap.create(map); + tokens.forEach(t -> finalisedCopy.putIfAbsent(t, id)); + return new TokenMap(lastModified, partitioner, finalisedCopy); + } + + /** + * Find a token that is immediately after this one in the ring. 
+ */
+    public Token nextInRing(Token token, boolean exactMatchOnly)
+    {
+        int idx = Collections.binarySearch(tokens, token);
+        if (idx < 0)
+        {
+            if (exactMatchOnly)
+                throw new IllegalArgumentException(String.format("Cannot find token %s in the ring %s", token, tokens));
+            int realIdx = -1 - idx;
+            if (realIdx == tokens.size())
+                realIdx = 0;
+
+            return tokens.get(realIdx);
+        }
+
+        if (idx == tokens.size() - 1)
+            return tokens.get(0);
+        else
+            return tokens.get(idx + 1);
+    }
+
+    public TokenMap unassignTokens(NodeId id)
+    {
+        SortedBiMultiValMap<Token, NodeId> finalisedCopy = SortedBiMultiValMap.create(map);
+        finalisedCopy.removeValue(id);
+        return new TokenMap(lastModified, partitioner, finalisedCopy);
+    }
+
+    public TokenMap unassignTokens(NodeId id, Collection<Token> tokens)
+    {
+        SortedBiMultiValMap<Token, NodeId> finalisedCopy = SortedBiMultiValMap.create(map);
+        for (Token token : tokens)
+        {
+            NodeId nodeId = finalisedCopy.remove(token);
+            assert nodeId.equals(id);
+        }
+
+        return new TokenMap(lastModified, partitioner, finalisedCopy);
+    }
+
+    public BiMultiValMap<Token, NodeId> asMap()
+    {
+        return SortedBiMultiValMap.create(map);
+    }
+
+    public boolean isEmpty()
+    {
+        return map.isEmpty();
+    }
+
+    public IPartitioner partitioner()
+    {
+        return partitioner;
+    }
+
+    public ImmutableList<Token> tokens()
+    {
+        return ImmutableList.copyOf(map.keySet());
+    }
+
+    public ImmutableList<Token> tokens(NodeId nodeId)
+    {
+        Collection<Token> tokens = map.inverse().get(nodeId);
+        if (tokens == null)
+            return null;
+        return ImmutableList.copyOf(tokens);
+    }
+
+    public List<Range<Token>> toRanges()
+    {
+        return ranges;
+    }
+
+    public static List<Range<Token>> toRanges(List<Token> tokens, IPartitioner partitioner)
+    {
+        if (tokens.isEmpty())
+            return Collections.emptyList();
+
+        List<Range<Token>> ranges = new ArrayList<>(tokens.size() + 1);
+        maybeAdd(ranges, new Range<>(partitioner.getMinimumToken(), tokens.get(0)));
+        for (int i = 1; i < tokens.size(); i++)
+            maybeAdd(ranges, new Range<>(tokens.get(i - 1), tokens.get(i)));
+        maybeAdd(ranges, new Range<>(tokens.get(tokens.size() - 1), partitioner.getMinimumToken()));
+        if (ranges.isEmpty())
+            ranges.add(new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken()));
+        return ranges;
+    }
+
+    private static void maybeAdd(List<Range<Token>> ranges, Range<Token> r)
+    {
+        if (r.left.compareTo(r.right) != 0)
+            ranges.add(r);
+    }
+
+    public Token nextToken(List<Token> tokens, Token token)
+    {
+        return tokens.get(nextTokenIndex(tokens, token));
+    }
+
+    // Duplicated from TokenMetadata::firstTokenIndex
+    public static int nextTokenIndex(final List<Token> ring, Token start)
+    {
+        assert ring.size() > 0;
+        int i = Collections.binarySearch(ring, start);
+        if (i < 0)
+        {
+            i = (i + 1) * (-1);
+            if (i >= ring.size())
+                i = 0;
+        }
+        return i;
+    }
+
+    public NodeId owner(Token token)
+    {
+        return map.get(token);
+    }
+
+    public String toString()
+    {
+        return "TokenMap{" +
+               toDebugString() +
+               '}';
+    }
+
+    public void logDebugString()
+    {
+        logger.info(toDebugString());
+    }
+
+    public String toDebugString()
+    {
+        StringBuilder b = new StringBuilder();
+        for (Map.Entry<Token, NodeId> entry : map.entrySet())
+            b.append('[').append(entry.getKey()).append("] => ").append(entry.getValue().uuid).append(";\n");
+        return b.toString();
+    }
+
+    public static class Serializer implements MetadataSerializer<TokenMap>
+    {
+        public void serialize(TokenMap t, DataOutputPlus out, Version version) throws IOException
+        {
+            Epoch.serializer.serialize(t.lastModified,
out, version); + out.writeUTF(t.partitioner.getClass().getCanonicalName()); + out.writeInt(t.map.size()); + for (Map.Entry<Token, NodeId> entry : t.map.entrySet()) + { + Token.metadataSerializer.serialize(entry.getKey(), out, version); + NodeId.serializer.serialize(entry.getValue(), out, version); + } + } + + public TokenMap deserialize(DataInputPlus in, Version version) throws IOException + { + Epoch lastModified = Epoch.serializer.deserialize(in, version); + IPartitioner partitioner = FBUtilities.newPartitioner(in.readUTF()); + int size = in.readInt(); + SortedBiMultiValMap<Token, NodeId> tokens = SortedBiMultiValMap.create(); + for (int i = 0; i < size; i++) + tokens.put(Token.metadataSerializer.deserialize(in, version), + NodeId.serializer.deserialize(in, version)); + return new TokenMap(lastModified, partitioner, tokens); + } + + public long serializedSize(TokenMap t, Version version) + { + long size = Epoch.serializer.serializedSize(t.lastModified, version); + size += sizeof(t.partitioner.getClass().getCanonicalName()); + size += sizeof(t.map.size()); + for (Map.Entry<Token, NodeId> entry : t.map.entrySet()) + { + size += Token.metadataSerializer.serializedSize(entry.getKey(), version); + size += NodeId.serializer.serializedSize(entry.getValue(), version); + } + return size; + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof TokenMap)) return false; + TokenMap tokenMap = (TokenMap) o; + return Objects.equals(lastModified, tokenMap.lastModified) && + isEquivalent(tokenMap); + } + + @Override + public int hashCode() + { + return Objects.hash(lastModified, map, partitioner); + } + + /** + * returns true if this token map is functionally equivalent to the given one + * + * does not check equality of lastModified + */ + public boolean isEquivalent(TokenMap tokenMap) + { + return Objects.equals(map, tokenMap.map) && + Objects.equals(partitioner, tokenMap.partitioner); + } +} diff --git a/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java b/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java new file mode 100644 index 0000000000..c0e7be3e4f --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/UniformRangePlacement.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
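[Editorial aside, illustration only and not part of the commit: TokenMap.toRanges in the file above derives the ring's contiguous (left, right] ranges from the sorted token list, closing with a wraparound range back to the partitioner's minimum token. A standalone sketch of that derivation, with longs standing in for Token and two-element arrays for Range<Token>; all names are hypothetical.]

// Illustration only: longs stand in for Token, {left, right} arrays for Range<Token>.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RingRangesSketch
{
    static final long MIN = Long.MIN_VALUE; // stand-in for partitioner.getMinimumToken()

    static List<long[]> toRanges(List<Long> sortedTokens)
    {
        List<long[]> ranges = new ArrayList<>();
        if (sortedTokens.isEmpty())
            return ranges;

        long prev = MIN;
        for (long t : sortedTokens)
        {
            if (prev != t)                      // skip empty (x, x] ranges
                ranges.add(new long[]{ prev, t });
            prev = t;
        }
        if (prev != MIN)                        // trailing wraparound range (lastToken, MIN]
            ranges.add(new long[]{ prev, MIN });
        if (ranges.isEmpty())                   // a single token equal to MIN covers the whole ring
            ranges.add(new long[]{ MIN, MIN });
        return ranges;
    }

    public static void main(String[] args)
    {
        for (long[] r : toRanges(Arrays.asList(100L, 200L, 300L)))
            System.out.println("(" + r[0] + ", " + r[1] + "]");
        // prints (MIN, 100], (100, 200], (200, 300], (300, MIN] with MIN = Long.MIN_VALUE
    }
}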
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.locator.AbstractReplicationStrategy;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Keyspaces;
+import org.apache.cassandra.schema.ReplicationParams;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+
+/**
+ * The defining feature of this placement strategy is that all layouts (i.e. replication params) use the same
+ * set of ranges. So when splitting the current ranges, we only need to calculate the splits once and apply them to
+ * all existing placements.
+ *
+ * Also, when using this strategy, the read and write placements should (eventually) be identical. While range
+ * movements/bootstraps/decommissions are in-flight, this will not be the case as the read and write replica
+ * sets will diverge while nodes are acquiring/relinquishing ranges. Although there may always be such operations
+ * ongoing, this is technically a temporary state.
+ *
+ * Because of this, when calculating the steps to transition between the current state and a proposed new state,
+ * we work from the associated TokenMaps, the assumption being that eventually both the read and write placements
+ * will converge and will, at that point, reflect those TokenMaps.
+ * This means that the starting point of each transition is the intended end state of the preceding transitions.
+ *
+ * To do this calculation, we create a canonical DataPlacement from the current TokenMap and split it according
+ * to the proposed tokens. As we iterate through and split the existing ranges, we construct a new DataPlacement for
+ * each one currently defined. There is no movement of data between the initial and new placements, only splitting of
+ * existing ranges, so we can simply copy the replica groups from the old ranges to the new.
+ * e.g.:
+ * We start with a portion of the ring containing tokens 100, 200 assigned to nodes A & B respectively. The node
+ * with the next highest token after B is C. RF is 2. We want to add a new token, 150, and assign it to a new node, D.
+ * Starting placement, with tokens 100, 200
+ *   (0, 100]   : {A,B}
+ *   (100, 200] : {B,C}
+ * Next placement, with tokens 100, 150, 200 but no ownership changes
+ *   (0, 100]   : {A,B}
+ *   (100, 150] : {B,C}
+ *   (150, 200] : {B,C}
+ * No actual movement has occurred, only the ranges have been adjusted.
+ * We then calculate the desired end state, with new nodes taking ownership of their assigned tokens.
+ * If Node D takes ownership of Token 150, the desired end state would be:
+ *   (0, 100]   : {A,D}
+ *   (100, 150] : {D,B}
+ *   (150, 200] : {B,C}
+ * As mentioned, distinct placements are maintained for reads and writes, enabling ranges to be temporarily
+ * overreplicated during a movement operation (analogous to the previous concept of pending ranges).
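[Editorial aside, illustration only and not part of the commit: to make the worked example above concrete, splitting ranges at proposed tokens copies each range's replica group onto the resulting sub-ranges, so no data moves until ownership changes in a later step. This is a simplified sketch that assumes sorted tokens and no wraparound range, with plain maps standing in for PlacementForRange; it is not the Cassandra implementation.]

// Illustration only: a range is a {left, right} array, replica sets are sets of node names.
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SplitRangesSketch
{
    // Split each (left, right] range at the proposed tokens and copy its replica
    // set onto the sub-ranges: only the boundaries change, no data moves.
    // Assumes sorted tokens and no wraparound range, unlike the real implementation.
    static Map<long[], Set<String>> split(Map<long[], Set<String>> placement, List<Long> proposedTokens)
    {
        Map<long[], Set<String>> result = new LinkedHashMap<>();
        for (Map.Entry<long[], Set<String>> e : placement.entrySet())
        {
            long left = e.getKey()[0];
            long right = e.getKey()[1];
            long prev = left;
            for (long t : proposedTokens)
            {
                if (t > left && t < right)
                {
                    result.put(new long[]{ prev, t }, e.getValue());
                    prev = t;
                }
            }
            result.put(new long[]{ prev, right }, e.getValue());
        }
        return result;
    }

    public static void main(String[] args)
    {
        Map<long[], Set<String>> placement = new LinkedHashMap<>();
        placement.put(new long[]{ 0, 100 }, new TreeSet<>(Arrays.asList("A", "B")));
        placement.put(new long[]{ 100, 200 }, new TreeSet<>(Arrays.asList("B", "C")));

        // Introduce token 150: (100, 200] becomes (100, 150] and (150, 200],
        // both keeping {B, C} until ownership changes in a later step.
        split(placement, Arrays.asList(100L, 150L, 200L)).forEach((r, replicas) ->
            System.out.println("(" + r[0] + ", " + r[1] + "] : " + replicas));
    }
}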
+ */ +public class UniformRangePlacement implements PlacementProvider +{ + private static final Logger logger = LoggerFactory.getLogger(UniformRangePlacement.class); + + @Override + public PlacementTransitionPlan planForJoin(ClusterMetadata metadata, + NodeId joining, + Set<Token> tokens, + Keyspaces keyspaces) + { + // There are no other nodes in the cluster, so the joining node will be taking ownership of the entire range. + if (metadata.tokenMap.isEmpty()) + { + DataPlacements placements = calculatePlacements(metadata.transformer() + .proposeToken(joining, tokens) + .build() + .metadata, + keyspaces); + PlacementDeltas.Builder toStart = PlacementDeltas.builder(placements.size()); + placements.withDistributed((params, placement) -> { + toStart.put(params, DataPlacement.empty().difference(placements.get(params))); + }); + return new PlacementTransitionPlan(toStart.build(), + PlacementDeltas.empty(), + PlacementDeltas.empty(), + PlacementDeltas.empty()); + } + + DataPlacements base = calculatePlacements(metadata, keyspaces); + DataPlacements start = splitRanges(metadata.tokenMap, metadata.tokenMap.assignTokens(joining, tokens), base); + + DataPlacements finalPlacements = calculatePlacements(metadata.transformer() + .proposeToken(joining, tokens) + .build() + .metadata, + keyspaces); + + DataPlacements maximalPlacements = start.combineReplicaGroups(finalPlacements); + + PlacementDeltas.Builder toStart = PlacementDeltas.builder(base.size()); + PlacementDeltas.Builder toMaximal = PlacementDeltas.builder(base.size()); + PlacementDeltas.Builder toFinal = PlacementDeltas.builder(base.size()); + + start.withDistributed((params, startPlacement) -> { + toStart.put(params, base.get(params).difference(startPlacement)); + toMaximal.put(params, startPlacement.difference(maximalPlacements.get(params))); + toFinal.put(params, maximalPlacements.get(params).difference(finalPlacements.get(params))); + }); + + return new PlacementTransitionPlan(toStart.build(), toMaximal.build(), toFinal.build(), PlacementDeltas.empty()); + } + + @Override + public PlacementTransitionPlan planForMove(ClusterMetadata metadata, NodeId nodeId, Set<Token> tokens, Keyspaces keyspaces) + { + DataPlacements base = calculatePlacements(metadata, keyspaces); + + TokenMap withMoved = metadata.transformer() + .proposeToken(nodeId, tokens) + .build().metadata.tokenMap; + // Introduce new tokens, but do not change the ownership just yet + DataPlacements start = splitRanges(metadata.tokenMap, withMoved, base); + + DataPlacements withoutLeaving = calculatePlacements(metadata.transformer() + .unproposeTokens(nodeId) + .proposeToken(nodeId, tokens) + .build().metadata, + keyspaces); + + // Old tokens owned by the move target are now owned by the next-in-ring; + // Move target now owns its newly assigned tokens + DataPlacements end = splitRanges(metadata.tokenMap, withMoved, withoutLeaving); + + DataPlacements maximal = start.combineReplicaGroups(end); + + + PlacementDeltas.Builder split = PlacementDeltas.builder(); + PlacementDeltas.Builder toMaximal = PlacementDeltas.builder(); + PlacementDeltas.Builder toFinal = PlacementDeltas.builder(); + PlacementDeltas.Builder merge = PlacementDeltas.builder(); + + base.withDistributed((params, placement) -> { + split.put(params, placement.difference(start.get(params))); + }); + + maximal.withDistributed((params, placement) -> { + toMaximal.put(params, start.get(params).difference(placement)); + toFinal.put(params, placement.difference(end.get(params))); + }); + + end.withDistributed((params, placement) 
-> { + merge.put(params, placement.difference(withoutLeaving.get(params))); + }); + + return new PlacementTransitionPlan(split.build(), toMaximal.build(), toFinal.build(), merge.build()); + } + + @Override + public PlacementTransitionPlan planForDecommission(ClusterMetadata metadata, + NodeId nodeId, + Keyspaces keyspaces) + { + // Determine the set of placements to start from. This is the canonical set of placements based on the current + // TokenMap and collection of Keyspaces/ReplicationParams. + List<Range<Token>> currentRanges = calculateRanges(metadata.tokenMap); + DataPlacements start = calculatePlacements(currentRanges, metadata, keyspaces); + + ClusterMetadata proposed = metadata.transformer() + .unproposeTokens(nodeId) + .build() + .metadata; + DataPlacements withoutLeaving = calculatePlacements(proposed, keyspaces); + + DataPlacements finalPlacement = splitRanges(proposed.tokenMap, metadata.tokenMap, + withoutLeaving); + + DataPlacements maximal = start.combineReplicaGroups(finalPlacement); + + PlacementDeltas.Builder toMaximal = PlacementDeltas.builder(start.size()); + PlacementDeltas.Builder toFinal = PlacementDeltas.builder(start.size()); + PlacementDeltas.Builder merge = PlacementDeltas.builder(start.size()); + + maximal.withDistributed((params, maxPlacement) -> { + // Add new nodes as replicas + PlacementDeltas.PlacementDelta maxDelta = start.get(params).difference(maxPlacement); + toMaximal.put(params, maxDelta); + + PlacementDeltas.PlacementDelta leftDelta = maxPlacement.difference(finalPlacement.get(params)); + toFinal.put(params, leftDelta); + + PlacementDeltas.PlacementDelta finalDelta = finalPlacement.get(params).difference(withoutLeaving.get(params)); + merge.put(params, finalDelta); + }); + + return new PlacementTransitionPlan(PlacementDeltas.empty(), toMaximal.build(), toFinal.build(), merge.build()); + } + + @Override + public PlacementTransitionPlan planForReplacement(ClusterMetadata metadata, + NodeId replaced, + NodeId replacement, + Keyspaces keyspaces) + { + DataPlacements startPlacements = calculatePlacements(metadata, keyspaces); + DataPlacements finalPlacements = calculatePlacements(metadata.transformer() + .unproposeTokens(replaced) + .proposeToken(replacement, metadata.tokenMap.tokens(replaced)) + .build() + .metadata, + keyspaces); + DataPlacements maximalPlacements = startPlacements.combineReplicaGroups(finalPlacements); + + PlacementDeltas.Builder toMaximal = PlacementDeltas.builder(startPlacements.size()); + PlacementDeltas.Builder toFinal = PlacementDeltas.builder(startPlacements.size()); + + maximalPlacements.withDistributed((params, max) -> { + toMaximal.put(params, startPlacements.get(params).difference(max)); + toFinal.put(params, max.difference(finalPlacements.get(params))); + }); + + return new PlacementTransitionPlan(PlacementDeltas.empty(), toMaximal.build(), toFinal.build(), PlacementDeltas.empty()); + } + + public DataPlacements splitRanges(TokenMap current, + TokenMap proposed, + DataPlacements currentPlacements) + { + ImmutableList<Token> currentTokens = current.tokens(); + ImmutableList<Token> proposedTokens = proposed.tokens(); + if (currentTokens.isEmpty() || currentTokens.equals(proposedTokens)) + { + return currentPlacements; + } + else + { + if (!proposedTokens.containsAll(currentTokens)) + throw new IllegalArgumentException("Proposed tokens must be superset of existing tokens"); + // we need to split some existing ranges, so apply the new set of tokens to the current canonical + // placements to get a set of placements with the 
proposed ranges but the current replicas + return splitRangesForAllPlacements(proposedTokens, currentPlacements); + } + } + + @VisibleForTesting + DataPlacements splitRangesForAllPlacements(List<Token> proposedTokens, DataPlacements current) + { + DataPlacements.Builder builder = DataPlacements.builder(current.size()); + current.asMap().forEach((params, placement) -> { + // Don't split ranges for local-only placements + if (params.isLocal() || params.isMeta()) + builder.with(params, placement); + else + builder.with(params, placement.splitRangesForPlacement(proposedTokens)); + }); + return builder.build(); + } + + public DataPlacements calculatePlacements(ClusterMetadata metadata, Keyspaces keyspaces) + { + if (metadata.tokenMap.tokens().isEmpty()) + return DataPlacements.empty(); + + return calculatePlacements(keyspaces, metadata); + } + + private DataPlacements calculatePlacements(Keyspaces keyspaces, ClusterMetadata metadata) + { + List<Range<Token>> ranges = calculateRanges(metadata.tokenMap); + return calculatePlacements(ranges, metadata, keyspaces); + } + + public List<Range<Token>> calculateRanges(TokenMap tokenMap) + { + List<Token> tokens = tokenMap.tokens(); + List<Range<Token>> ranges = new ArrayList<>(tokens.size() + 1); + maybeAdd(ranges, new Range<>(tokenMap.partitioner().getMinimumToken(), tokens.get(0))); + for (int i = 1; i < tokens.size(); i++) + maybeAdd(ranges, new Range<>(tokens.get(i - 1), tokens.get(i))); + maybeAdd(ranges, new Range<>(tokens.get(tokens.size() - 1), tokenMap.partitioner().getMinimumToken())); + if (ranges.isEmpty()) + ranges.add(new Range<>(tokenMap.partitioner().getMinimumToken(), tokenMap.partitioner().getMinimumToken())); + return ranges; + } + + public DataPlacements calculatePlacements(List<Range<Token>> ranges, + ClusterMetadata metadata, + Keyspaces keyspaces) + { + Map<ReplicationParams, DataPlacement> placements = new HashMap<>(); + for (KeyspaceMetadata ksMetadata : keyspaces) + { + logger.trace("Calculating data placements for {}", ksMetadata.name); + AbstractReplicationStrategy replication = ksMetadata.replicationStrategy; + ReplicationParams params = ksMetadata.params.replication; + placements.computeIfAbsent(params, p -> replication.calculateDataPlacement(ranges, metadata)); + } + + return DataPlacements.builder(placements).build(); + } + + private void maybeAdd(List<Range<Token>> ranges, Range<Token> r) + { + if (r.left.compareTo(r.right) != 0) + ranges.add(r); + } +} \ No newline at end of file diff --git a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java b/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java index 8ae2302527..82ba7117ab 100644 --- a/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java +++ b/src/java/org/apache/cassandra/tcm/transformations/cms/EntireRange.java @@ -18,12 +18,15 @@ package org.apache.cassandra.tcm.transformations.cms; - import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.PlacementForRange; +import org.apache.cassandra.utils.FBUtilities; public class EntireRange { @@ -34,4 +37,7 @@ public class EntireRange return new Replica(addr, entireRange, true); } + public static final EndpointsForRange localReplicas = 
EndpointsForRange.of(replica(FBUtilities.getBroadcastAddressAndPort())); + public static final DataPlacement placement = new DataPlacement(PlacementForRange.builder().withReplicaGroup(localReplicas).build(), + PlacementForRange.builder().withReplicaGroup(localReplicas).build()); } \ No newline at end of file diff --git a/src/java/org/apache/cassandra/utils/BiMultiValMap.java b/src/java/org/apache/cassandra/utils/BiMultiValMap.java index 7a87217967..f439c5c496 100644 --- a/src/java/org/apache/cassandra/utils/BiMultiValMap.java +++ b/src/java/org/apache/cassandra/utils/BiMultiValMap.java @@ -20,6 +20,7 @@ package org.apache.cassandra.utils; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; import com.google.common.collect.HashMultimap; @@ -143,4 +144,19 @@ public class BiMultiValMap<K, V> implements Map<K, V> { return reverseMap.keySet(); } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof BiMultiValMap)) return false; + BiMultiValMap<?, ?> that = (BiMultiValMap<?, ?>) o; + return forwardMap.equals(that.forwardMap) && reverseMap.equals(that.reverseMap); + } + + @Override + public int hashCode() + { + return Objects.hash(forwardMap, reverseMap); + } } diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java index 81d6694c72..7693387110 100644 --- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java @@ -49,6 +49,7 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.TokenMetadata.Topology; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.locator.NetworkTopologyStrategy.REPLICATION_FACTOR; @@ -327,7 +328,7 @@ public class NetworkTopologyStrategyTest for (Map.Entry<String, Integer> dc : datacenters.entrySet()) skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddressAndPort>()); - Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false); + Iterator<Token> tokenIter = TokenRingUtils.ringIterator(tokenMetadata.sortedTokens(), searchToken, false); while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints, datacenters)) { Token next = tokenIter.next(); diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java index c1b76b3bfc..44aa63d678 100644 --- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java +++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java @@ -39,6 +39,7 @@ import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Token; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -72,7 +73,7 @@ public class TokenMetadataTest private static void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... 
expected) { ArrayList<Token> actual = new ArrayList<>(); - Iterators.addAll(actual, TokenMetadata.ringIterator(ring, token(start), includeMin)); + Iterators.addAll(actual, TokenRingUtils.ringIterator(ring, token(start), includeMin)); assertEquals(actual.toString(), expected.length, actual.size()); for (int i = 0; i < expected.length; i++) assertEquals("Mismatch at index " + i + ": " + actual, token(expected[i]), actual.get(i)); diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java index b6d039b439..84aed2296e 100644 --- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java +++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java @@ -47,6 +47,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.SimpleSnitch; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import static org.junit.Assert.*; @@ -102,7 +103,7 @@ public class LeaveAndBootstrapTest for (Token token : keyTokens) { List<InetAddressAndPort> endpoints = new ArrayList<>(); - Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), token, false); + Iterator<Token> tokenIter = TokenRingUtils.ringIterator(tmd.sortedTokens(), token, false); while (tokenIter.hasNext()) { endpoints.add(tmd.getEndpoint(tokenIter.next())); diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java index 6dce8f3398..5260e75ce4 100644 --- a/test/unit/org/apache/cassandra/service/MoveTest.java +++ b/test/unit/org/apache/cassandra/service/MoveTest.java @@ -68,6 +68,7 @@ import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Tables; +import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -608,7 +609,7 @@ public class MoveTest for (Token token : keyTokens) { List<InetAddressAndPort> endpoints = new ArrayList<>(); - Iterator<Token> tokenIter = TokenMetadata.ringIterator(tmd.sortedTokens(), token, false); + Iterator<Token> tokenIter = TokenRingUtils.ringIterator(tmd.sortedTokens(), token, false); while (tokenIter.hasNext()) { endpoints.add(tmd.getEndpoint(tokenIter.next())); diff --git a/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java b/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java index e015f67283..c858c2ab06 100644 --- a/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java +++ b/test/unit/org/apache/cassandra/tcm/membership/MembershipUtils.java @@ -25,6 +25,16 @@ import org.apache.cassandra.locator.InetAddressAndPort; public class MembershipUtils { + public static NodeAddresses nodeAddresses(Random random) + { + return nodeAddresses(randomEndpoint(random)); + } + + public static NodeAddresses nodeAddresses(InetAddressAndPort endpoint) + { + return new NodeAddresses(endpoint, endpoint, endpoint); + } + public static InetAddressAndPort randomEndpoint(Random random) { return endpoint(random.nextInt(254) + 1); diff --git a/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java b/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java new file mode 100644 index 0000000000..71e4361475 --- /dev/null +++ 
b/test/unit/org/apache/cassandra/tcm/ownership/OwnershipUtils.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.ownership; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesByEndpoint; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.ReplicationParams; + +import static org.apache.cassandra.tcm.membership.MembershipUtils.endpoint; +import static org.apache.cassandra.tcm.membership.MembershipUtils.randomEndpoint; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; + +public class OwnershipUtils +{ + public static Token token(long t) + { + return new Murmur3Partitioner.LongToken(t); + } + + public static RangesByEndpoint emptyReplicas() + { + return new RangesByEndpoint.Builder().build(); + } + + public static RangesByEndpoint trivialReplicas(InetAddressAndPort endpoint, Range<Token> range) + { + RangesByEndpoint.Builder b = new RangesByEndpoint.Builder(); + b.put(endpoint, Replica.fullReplica(endpoint, range)); + return b.build(); + } + + public static PlacementDeltas deltas(DataPlacements first, DataPlacements second) + { + assert first.asMap().keySet().equals(second.asMap().keySet()); + + PlacementDeltas.Builder deltas = PlacementDeltas.builder(first.size()); + first.asMap().forEach((params, placement) -> { + deltas.put(params, placement.difference(second.get(params))); + }); + return deltas.build(); + } + + public static DataPlacements placements(List<Range<Token>> ranges, + Set<ReplicationParams> replication, + Random random) + { + DataPlacements.Builder allPlacements = DataPlacements.builder(replication.size()); + replication.forEach((params) -> { + assertSame("Only simple replication params are necessary/permitted here", SimpleStrategy.class, params.klass); + String s = params.options.get("replication_factor"); + assertNotNull(s); + int rf = Integer.parseInt(s); + DataPlacement.Builder placement = DataPlacement.builder(); + for (Range<Token> range : ranges) + { + // pick rf random nodes to be replicas, no duplicates + Set<InetAddressAndPort> replicas = new HashSet<>(rf); + while (replicas.size() < rf) + replicas.add(randomEndpoint(random)); + + 
replicas.forEach(e -> { + Replica replica = Replica.fullReplica(e, range); + placement.withReadReplica(replica).withWriteReplica(replica); + }); + } + allPlacements.with(params, placement.build()); + }); + return allPlacements.build(); + } + + public static List<Range<Token>> ranges(Random random) + { + // min 10, max 40 ranges + int count = random.nextInt(30) + 10; + Set<Long> tokens = new HashSet<>(count); + while(tokens.size() < count-1) + tokens.add(random.nextLong()); + List<Long> sorted = new ArrayList<>(tokens); + Collections.sort(sorted); + + List<Range<Token>> ranges = new ArrayList<>(count); + ranges.add(new Range<>(Murmur3Partitioner.MINIMUM, token(sorted.get(0)))); + for (int i = 1; i < sorted.size(); i++) + ranges.add(new Range<>(token(sorted.get(i-1)), token(sorted.get(i)))); + ranges.add(new Range<>(token(sorted.get(sorted.size() - 1)), Murmur3Partitioner.MINIMUM)); + return ranges; + } + + public static DataPlacements randomPlacements(Random random) + { + Set<ReplicationParams> replication = ImmutableSet.of(KeyspaceParams.simple(1).replication, + KeyspaceParams.simple(2).replication, + KeyspaceParams.simple(3).replication); + return placements(ranges(random), replication, random); + } + + public static EndpointsForRange rg(long t0, long t1, int...replicas) + { + Range<Token> range = new Range<>(token(t0), token(t1)); + EndpointsForRange.Builder builder = EndpointsForRange.builder(range); + for (int i : replicas) + builder.add(Replica.fullReplica(endpoint((byte)i), range)); + return builder.build(); + } + +} diff --git a/test/unit/org/apache/cassandra/tcm/ownership/PlacementDeltasTest.java b/test/unit/org/apache/cassandra/tcm/ownership/PlacementDeltasTest.java new file mode 100644 index 0000000000..6245c792f0 --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/ownership/PlacementDeltasTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
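[Editorial aside, illustration only and not part of the commit: the PlacementDeltasTest cases below assert that merging two deltas unions their per-endpoint removals and additions. A minimal sketch of that merge rule, with plain maps of sets standing in for RangesByEndpoint; all names are hypothetical.]

// Illustration only: Map<String, Set<String>> stands in for RangesByEndpoint.
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeltaMergeSketch
{
    // Merge endpoint -> ranges maps by unioning the range sets per endpoint;
    // this is the behaviour the disjoint/identical merge tests below exercise.
    static Map<String, Set<String>> merge(Map<String, Set<String>> a, Map<String, Set<String>> b)
    {
        Map<String, Set<String>> merged = new HashMap<>();
        a.forEach((ep, ranges) -> merged.computeIfAbsent(ep, k -> new HashSet<>()).addAll(ranges));
        b.forEach((ep, ranges) -> merged.computeIfAbsent(ep, k -> new HashSet<>()).addAll(ranges));
        return merged;
    }

    public static void main(String[] args)
    {
        Map<String, Set<String>> d1 = new HashMap<>();
        d1.put("P1", new HashSet<>(Arrays.asList("(0,100]")));
        Map<String, Set<String>> d2 = new HashMap<>();
        d2.put("P1", new HashSet<>(Arrays.asList("(100,200]")));
        // Same endpoint, disjoint ranges: P1 ends up with both ranges (set order unspecified).
        System.out.println(merge(d1, d2));
    }
}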
+ */ + +package org.apache.cassandra.tcm.ownership; + +import org.junit.Test; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.RangesByEndpoint; +import org.apache.cassandra.schema.ReplicationParams; + +import static org.apache.cassandra.tcm.membership.MembershipUtils.endpoint; +import static org.apache.cassandra.tcm.ownership.OwnershipUtils.emptyReplicas; +import static org.apache.cassandra.tcm.ownership.OwnershipUtils.token; +import static org.apache.cassandra.tcm.ownership.OwnershipUtils.trivialReplicas; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class PlacementDeltasTest +{ + private static final ReplicationParams key = ReplicationParams.simple(1); + private static final InetAddressAndPort P1 = endpoint(1); + private static final InetAddressAndPort P2 = endpoint(2); + private static final InetAddressAndPort P3 = endpoint(3); + private static final InetAddressAndPort P4 = endpoint(4); + private static final Range<Token> R1 = new Range<>(token(0), token(100)); + private static final Range<Token> R2 = new Range<>(token(100), token(200)); + private static final Range<Token> R_INT = new Range<>(token(50), token(150)); + + + @Test + public void mergeDisjointDeltas() + { + // Combine 2 Deltas with disjoint removals (and no additions), for the same ReplicationParams. + // Verify that the resulting merged Delta contains the removals/additions from both. + RangesByEndpoint group1 = trivialReplicas(P1, R1); + RangesByEndpoint group2 = trivialReplicas(P2, R2); + + Delta d1 = new Delta(group1, emptyReplicas()); + Delta d2 = new Delta(group2, emptyReplicas()); + PlacementDeltas.PlacementDelta merged = PlacementDeltas.builder(1) + .put(key, new PlacementDeltas.PlacementDelta(d1, d1)) + .put(key, new PlacementDeltas.PlacementDelta(d2, d2)) + .build() + .get(key); + + for (Delta delta : new Delta[]{ merged.reads, merged.writes }) + { + assertTrue(delta.additions.isEmpty()); + assertEquals(group1.get(P1), delta.removals.get(P1)); + assertEquals(group2.get(P2), delta.removals.get(P2)); + } + } + + @Test + public void mergeDisjointReplicasForSameEndpoint() + { + // Combine 2 Deltas which both contain removals for the same endpoint, but for disjoint ranges. + RangesByEndpoint group1 = trivialReplicas(P1, R1); + RangesByEndpoint group2 = trivialReplicas(P1, R2); + + Delta d1 = new Delta(group1, emptyReplicas()); + Delta d2 = new Delta(group2, emptyReplicas()); + PlacementDeltas.PlacementDelta merged = PlacementDeltas.builder(1) + .put(key, new PlacementDeltas.PlacementDelta(d1, d1)) + .put(key, new PlacementDeltas.PlacementDelta(d2, d2)) + .build() + .get(key); + + for (Delta delta : new Delta[]{ merged.reads, merged.writes }) + { + assertEquals(1, delta.removals.keySet().size()); + RangesAtEndpoint mergedGroup = delta.removals.get(P1); + + assertEquals(2, mergedGroup.size()); + group1.flattenValues().forEach(r -> assertTrue(mergedGroup.contains(r))); + group2.flattenValues().forEach(r -> assertTrue(mergedGroup.contains(r))); + } + } + + @Test + public void mergeIdenticalReplicasForSameEndpoint() + { + // Combine 2 Deltas which both contain identical removals for the same endpoint. + // Effectively a noop. 
+ RangesByEndpoint group1 = trivialReplicas(P1, R1); + + Delta d1 = new Delta(group1, emptyReplicas()); + Delta d2 = new Delta(group1, emptyReplicas()); + PlacementDeltas.PlacementDelta merged = PlacementDeltas.builder(1) + .put(key, new PlacementDeltas.PlacementDelta(d1, d1)) + .put(key, new PlacementDeltas.PlacementDelta(d2, d2)) + .build() + .get(key); + + for (Delta delta : new Delta[]{ merged.reads, merged.writes }) + { + assertEquals(1, delta.removals.keySet().size()); + RangesAtEndpoint mergedGroup = delta.removals.get(P1); + assertEquals(1, mergedGroup.size()); + group1.flattenValues().forEach(r -> assertTrue(mergedGroup.contains(r))); + } + } + + @Test + public void mergeIntersectingReplicasForSameEndpoint() + { + // Combine 2 Deltas which both contain replicas for a common endpoint, but with intersecting ranges. + // TODO there isn't an obvious reason to support this, so perhaps we should be conservative and + // explicitly reject it + RangesByEndpoint group1 = trivialReplicas(P1, R1); + RangesByEndpoint group2 = trivialReplicas(P1, R_INT); + + Delta d1 = new Delta(group1, emptyReplicas()); + Delta d2 = new Delta(group2, emptyReplicas()); + PlacementDeltas.PlacementDelta merged = PlacementDeltas.builder(1) + .put(key, new PlacementDeltas.PlacementDelta(d1, d1)) + .put(key, new PlacementDeltas.PlacementDelta(d2, d2)) + .build().get(key); + + for (Delta delta : new Delta[]{ merged.reads, merged.writes }) + { + assertEquals(1, delta.removals.keySet().size()); + RangesAtEndpoint mergedGroup = delta.removals.get(P1); + assertEquals(2, mergedGroup.size()); + group1.flattenValues().forEach(r -> assertTrue(mergedGroup.contains(r))); + group2.flattenValues().forEach(r -> assertTrue(mergedGroup.contains(r))); + } + } + + @Test + public void invertSingleDelta() + { + RangesByEndpoint group1 = trivialReplicas(P1, R1); + RangesByEndpoint group2 = trivialReplicas(P2, R2); + + Delta d1 = new Delta(group1, group2); + Delta d2 = new Delta(group2, group1); + + assertEquals(d1, d2.invert()); + assertEquals(d2, d2.invert().invert()); + } + + @Test + public void invertEmptyDelta() + { + Delta d = Delta.empty(); + assertEquals(d, d.invert()); + } + + @Test + public void invertPartiallyEmptyDelta() + { + RangesByEndpoint group1 = trivialReplicas(P1, R1); + RangesByEndpoint group2 = trivialReplicas(P2, R1); + + Delta additions = new Delta(emptyReplicas(), group1); + Delta inverted = additions.invert(); + assertEquals(RangesByEndpoint.EMPTY, inverted.additions); + assertEquals(additions.additions, inverted.removals); + + Delta removals = new Delta(group2, emptyReplicas()); + inverted = removals.invert(); + assertEquals(RangesByEndpoint.EMPTY, inverted.removals); + assertEquals(removals.removals, inverted.additions); + } + + @Test + public void invertPlacementDelta() + { + RangesByEndpoint group1 = trivialReplicas(P1, R1); + RangesByEndpoint group2 = trivialReplicas(P2, R1); + Delta d1 = new Delta(group1, group2); + + RangesByEndpoint group3 = trivialReplicas(P3, R1); + RangesByEndpoint group4 = trivialReplicas(P4, R2); + Delta d2 = new Delta(group3, group4); + + PlacementDeltas.PlacementDelta pd1 = new PlacementDeltas.PlacementDelta(d1,d2); + PlacementDeltas.PlacementDelta pd2 = new PlacementDeltas.PlacementDelta(d1.invert(), d2.invert()); + assertEquals(pd2, pd1.invert()); + } +} diff --git a/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementEquivalenceTest.java b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementEquivalenceTest.java new file mode 100644 index 
0000000000..6f31d8539e --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementEquivalenceTest.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.ownership; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; + +import com.google.common.collect.ImmutableMap; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.IEndpointSnitch; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.NetworkTopologyStrategy; +import org.apache.cassandra.locator.RangesByEndpoint; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.MembershipUtils; +import org.apache.cassandra.tcm.membership.NodeId; + +import static org.apache.cassandra.tcm.membership.MembershipUtils.nodeAddresses; +import static org.junit.Assert.assertEquals; + +public class UniformRangePlacementEquivalenceTest +{ + private static Map<String, Integer> DATACENTERS = ImmutableMap.of("rf1", 1, "rf3", 3, "rf5_1", 5, "rf5_2", 5, "rf5_3", 5); + private static final String KEYSPACE = "ks"; + + @BeforeClass + public static void setupClass() throws Exception + { + SchemaLoader.prepareServer(); + } + + @Test + public void testSSPlacement() + { + int[] rfValues = new int[] {1, 2, 3, 5, 12}; + for (int rf : rfValues) + { + KeyspaceParams params = KeyspaceParams.simple(rf); + KeyspaceMetadata ksm = KeyspaceMetadata.create(KEYSPACE, params); + BiFunction<TokenMetadata, IEndpointSnitch, AbstractReplicationStrategy> strategy = ssProvider(params.replication); + testCalculateEndpoints(1, ksm, strategy); + testCalculateEndpoints(16, ksm, strategy); + 
testCalculateEndpoints(64, ksm, strategy); + } + } + + @Test + public void testNTSPlacement() + { + KeyspaceParams params = toNTSParams(DATACENTERS); + KeyspaceMetadata ksm = KeyspaceMetadata.create(KEYSPACE, params); + BiFunction<TokenMetadata, IEndpointSnitch, AbstractReplicationStrategy> strategy = ntsProvider(params.replication); + testCalculateEndpoints(1, ksm, strategy); + testCalculateEndpoints(16, ksm, strategy); + testCalculateEndpoints(64, ksm, strategy); + } + + private void testCalculateEndpoints(int tokensPerNode, + KeyspaceMetadata keyspace, + BiFunction<TokenMetadata, IEndpointSnitch, AbstractReplicationStrategy> strategy) + { + final int NODES = 100; + final int VNODES = tokensPerNode; + final int RUNS = 10; + + List<InetAddressAndPort> endpoints = nodes(NODES); + + for (int run = 0; run < RUNS; ++run) + { + Random rand = new Random(); + IEndpointSnitch snitch = generateSnitch(DATACENTERS, endpoints, rand); + DatabaseDescriptor.setEndpointSnitch(snitch); + TokenMetadata tokenMetadata = new TokenMetadata(snitch); + Directory directory = directory(endpoints, snitch); + ClusterMetadata metadata = new ClusterMetadata(Murmur3Partitioner.instance, directory); + + + for (int i = 0; i < NODES; ++i) + metadata = joinNode(metadata, tokenMetadata, endpoints, i, VNODES, rand); + + AbstractReplicationStrategy strat = strategy.apply(tokenMetadata, snitch); + Function<Token, List<InetAddressAndPort>> oldLocatorFn = token -> new ArrayList<>(strat.calculateNaturalReplicas(token, tokenMetadata).endpoints()); + + UniformRangePlacement layout = new UniformRangePlacement(); + DataPlacement placement = layout.calculatePlacements(metadata, Keyspaces.of(keyspace)).get(keyspace.params.replication); + Function<Token, List<InetAddressAndPort>> newLocatorFn = t -> placement.reads.forToken(t).endpointList(); + + testEquivalence(oldLocatorFn, newLocatorFn, rand); + } + } + + @Test + public void testSSPlacementTransformation() + { + int[] rfValues = new int[] {1, 2, 3, 5, 12}; + for (int rf : rfValues) + { + KeyspaceParams params = KeyspaceParams.simple(rf); + KeyspaceMetadata ksm = KeyspaceMetadata.create(KEYSPACE, params); + BiFunction<TokenMetadata, IEndpointSnitch, AbstractReplicationStrategy> strategy = ssProvider(params.replication); + doTransformationTest(1, ksm, strategy); + doTransformationTest(16, ksm, strategy); + doTransformationTest(64, ksm, strategy); + } + } + + @Test + public void testNTSPlacementTransformation() + { + KeyspaceParams params = toNTSParams(DATACENTERS); + KeyspaceMetadata ksm = KeyspaceMetadata.create(KEYSPACE, params); + BiFunction<TokenMetadata, IEndpointSnitch, AbstractReplicationStrategy> strategy = ntsProvider(params.replication); + doTransformationTest(1, ksm, strategy); + doTransformationTest(16, ksm, strategy); + doTransformationTest(64, ksm, strategy); + } + + private void doTransformationTest(int tokensPerNode, + KeyspaceMetadata ksm, + BiFunction<TokenMetadata, IEndpointSnitch, AbstractReplicationStrategy> strategy) + { + final int NODES = 100; + final int VNODES = tokensPerNode; + + UniformRangePlacement layout = new UniformRangePlacement(); + Keyspaces keyspaces = Keyspaces.of(ksm); + long seed = System.nanoTime(); + String log = String.format("Running transformation test with params; " + + "Seed: %d, Nodes: %d, VNodes: %d, Replication: %s", + seed, NODES, VNODES, ksm.params.replication); + System.out.println(log); + Random rand = new Random(seed); + + // initialise nodes for the test + List<InetAddressAndPort> endpoints = nodes(NODES); + IEndpointSnitch snitch 
= generateSnitch(DATACENTERS, endpoints, rand); + DatabaseDescriptor.setEndpointSnitch(snitch); + TokenMetadata tokenMetadata = new TokenMetadata(snitch); + Directory directory = directory(endpoints, snitch); + ClusterMetadata metadata = new ClusterMetadata(Murmur3Partitioner.instance, directory); + + // join all but one of the nodes + for (int i = 0; i < NODES - 1; ++i) + metadata = joinNode(metadata, tokenMetadata, endpoints, i, VNODES, rand); + + + // verify that the old and new placement/location methods agree + PlacementForRange p1 = layout.calculatePlacements(metadata, keyspaces).get(ksm.params.replication).reads; + AbstractReplicationStrategy strat = strategy.apply(tokenMetadata, snitch); + Function<Token, List<InetAddressAndPort>> oldLocatorFn = oldLocatorFn(strat, tokenMetadata); + Function<Token, List<InetAddressAndPort>> newLocatorFn = t -> p1.forToken(t).endpointList(); + testEquivalence(oldLocatorFn, newLocatorFn, rand); + + // now add the remaining node + metadata = joinNode(metadata, tokenMetadata, endpoints, NODES - 1, VNODES, rand); + + // re-check the placements + PlacementForRange p2 = layout.calculatePlacements(metadata, keyspaces).get(ksm.params.replication).reads; + strat = strategy.apply(tokenMetadata, snitch); + oldLocatorFn = oldLocatorFn(strat, tokenMetadata); + newLocatorFn = t -> p2.forToken(t).endpointList(); + testEquivalence(oldLocatorFn, newLocatorFn, rand); + + // get the specific operations needed to transform the first placement with + // the initial nodes to the new one with all nodes. Then apply those operations + // to the first placement and assert the result matches the second placement + // which was directly calculated with all nodes having joined. + Delta delta = p1.difference(p2); + PlacementForRange p3 = p1.without(delta.removals).with(delta.additions); + newLocatorFn = t -> p3.forToken(t).endpointList(); + testEquivalence(oldLocatorFn, newLocatorFn, rand); + testRangeEquivalence(p2, p3); + } + + private Directory directory(List<InetAddressAndPort> endpoints, IEndpointSnitch snitch) + { + Directory directory = new Directory(); + for (InetAddressAndPort endpoint : endpoints) + directory = directory.with(nodeAddresses(endpoint), new Location(snitch.getDatacenter(endpoint), snitch.getRack(endpoint))); + return directory; + } + + private ClusterMetadata joinNode(ClusterMetadata metadata, + TokenMetadata tokenMetadata, + List<InetAddressAndPort> endpoints, + int peerIndex, + int tokensPerNode, + Random rand) + { + return joinNode(metadata, + tokenMetadata, + endpoints, + peerIndex, + tokensPerNode, + () -> Murmur3Partitioner.instance.getRandomToken(rand)); + } + + private ClusterMetadata joinNode(ClusterMetadata metadata, + TokenMetadata tokenMetadata, + List<InetAddressAndPort> endpoints, + int peerIndex, + int tokensPerNode, + Supplier<Token> token) + { + InetAddressAndPort endpoint = endpoints.get(peerIndex); + NodeId id = metadata.directory.peerId(endpoint); + Set<Token> tokens = new HashSet<>(); + for (int j = 0; j < tokensPerNode; ++j) + tokens.add(token.get()); + tokenMetadata.updateNormalTokens(tokens, endpoint); + tokenMetadata.updateHostId(id.uuid, endpoint); + return metadata.transformer().proposeToken(id, tokens).build().metadata; + } + + private void testEquivalence(Function<Token, List<InetAddressAndPort>> oldLocatorFn, + Function<Token, List<InetAddressAndPort>> newLocatorFn, + Random rand) + { + for (int i=0; i<1000; ++i) + { + Token token = Murmur3Partitioner.instance.getRandomToken(rand); + List<InetAddressAndPort> expected = 
oldLocatorFn.apply(token); + List<InetAddressAndPort> actual = newLocatorFn.apply(token); + if (endpointsDiffer(expected, actual)) + { + System.err.println("Endpoints mismatch for token " + token); + System.err.println(" expected: " + expected); + System.err.println(" actual : " + actual); + assertEquals("Endpoints for token " + token + " mismatch.", expected, actual); + } + } + } + + private boolean endpointsDiffer(List<InetAddressAndPort> ep1, List<InetAddressAndPort> ep2) + { + if (ep1.equals(ep2)) + return false; + // Because the old algorithm does not put the nodes in the correct order in the case where more replicas + // are required than there are racks in a dc, we accept different order as long as the primary + // replica is the same. + if (!ep1.get(0).equals(ep2.get(0))) + return true; + Set<InetAddressAndPort> s1 = new HashSet<>(ep1); + Set<InetAddressAndPort> s2 = new HashSet<>(ep2); + return !s1.equals(s2); + } + + private void testRangeEquivalence(PlacementForRange p1, PlacementForRange p2) + { + RangesByEndpoint byEndpoint1 = p1.byEndpoint(); + RangesByEndpoint byEndpoint2 = p2.byEndpoint(); + assertEquals(byEndpoint1.keySet(), byEndpoint2.keySet()); + + for (InetAddressAndPort endpoint : byEndpoint1.keySet()) + { + assertEquals(Range.normalize(byEndpoint1.get(endpoint).ranges()), + Range.normalize(byEndpoint2.get(endpoint).ranges())); + } + } + + private Function<Token, List<InetAddressAndPort>> oldLocatorFn(AbstractReplicationStrategy strategy, + TokenMetadata tokenMetadata) + { + return token -> new ArrayList<>(strategy.calculateNaturalReplicas(token, tokenMetadata).endpoints()); + } + + private BiFunction<TokenMetadata, IEndpointSnitch, AbstractReplicationStrategy> ssProvider(ReplicationParams params) + { + return (tokenMetadata, snitch) -> { + Map<String, String> p = new HashMap<>(params.options); + p.remove(ReplicationParams.CLASS); + return new SimpleStrategy(KEYSPACE, tokenMetadata, snitch, p); + }; + } + + private BiFunction<TokenMetadata, IEndpointSnitch, AbstractReplicationStrategy> ntsProvider(ReplicationParams params) + { + return (tokenMetadata, snitch) -> { + Map<String, String> p = new HashMap<>(params.options); + p.remove(ReplicationParams.CLASS); + return new NetworkTopologyStrategy(KEYSPACE, tokenMetadata, snitch, p); + }; + } + + private List<InetAddressAndPort> nodes(int count) + { + List<InetAddressAndPort> nodes = new ArrayList<>(count); + for (byte i=1; i<=count; ++i) + nodes.add(MembershipUtils.endpoint(i)); + return nodes; + } + + private KeyspaceParams toNTSParams(Map<String, Integer> datacenters) + { + List<String> args = new ArrayList<>(datacenters.size() * 2); + datacenters.forEach((key, val) -> { args.add(key); args.add(Integer.toString(val));}); + return KeyspaceParams.nts(args.toArray()); + } + + IEndpointSnitch generateSnitch(Map<String, Integer> datacenters, Collection<InetAddressAndPort> nodes, Random rand) + { + final Map<InetAddressAndPort, String> nodeToRack = new HashMap<>(); + final Map<InetAddressAndPort, String> nodeToDC = new HashMap<>(); + Map<String, List<String>> racksPerDC = new HashMap<>(); + datacenters.forEach((dc, rf) -> racksPerDC.put(dc, randomRacks(rf, rand))); + int rf = datacenters.values().stream().mapToInt(x -> x).sum(); + String[] dcs = new String[rf]; + int pos = 0; + for (Map.Entry<String, Integer> dce : datacenters.entrySet()) + { + for (int i = 0; i < dce.getValue(); ++i) + dcs[pos++] = dce.getKey(); + } + + for (InetAddressAndPort node : nodes) + { + String dc = dcs[rand.nextInt(rf)]; + List<String> racks = 
racksPerDC.get(dc); + String rack = racks.get(rand.nextInt(racks.size())); + nodeToRack.put(node, rack); + nodeToDC.put(node, dc); + } + + return new AbstractNetworkTopologySnitch() + { + public String getRack(InetAddressAndPort endpoint) + { + return nodeToRack.get(endpoint); + } + + public String getDatacenter(InetAddressAndPort endpoint) + { + return nodeToDC.get(endpoint); + } + }; + } + + private List<String> randomRacks(int rf, Random rand) + { + int rc = rand.nextInt(rf * 3 - 1) + 1; + List<String> racks = new ArrayList<>(rc); + for (int i=0; i<rc; ++i) + racks.add(Integer.toString(i)); + return racks; + } +} diff --git a/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementTest.java b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementTest.java new file mode 100644 index 0000000000..c18f74da75 --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/ownership/UniformRangePlacementTest.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.ownership; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.junit.Test; + +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.schema.ReplicationParams; + +import static org.apache.cassandra.tcm.ownership.OwnershipUtils.rg; +import static org.apache.cassandra.tcm.ownership.OwnershipUtils.token; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class UniformRangePlacementTest +{ + + @Test + public void testSplittingPlacementWithSingleRange() + { + // Special case of splitting a range that was created because the initial token list contained a single token, + // which is either the min or max value in the token space. Any other single token would produce two ranges - + // (MIN, t] & (t, MAX] - but because (x, x] denotes a wraparound, this case produces (MIN, MAX] and we need to + // verify that we can safely split that when more tokens are introduced. This test supposes MIN = 0, MAX = 100 + PlacementForRange before = PlacementForRange.builder() + .withReplicaGroup(rg(0, 100, 1, 2, 3)) + .build(); + // existing token is MIN (i.e. 0 for the purposes of this test) + List<Token> tokens = ImmutableList.of(token(0), token(30), token(60), token(90)); + PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before); + assertPlacement(after, + rg(0, 30, 1, 2, 3), + rg(30, 60, 1, 2, 3), + rg(60, 90, 1, 2, 3), + rg(90, 100, 1, 2, 3)); + + + // existing token is MAX (i.e. 100 for the purposes of this test). 
+        tokens = ImmutableList.of(token(30), token(60), token(90), token(100));
+        after = PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 30, 1, 2, 3),
+                        rg(30, 60, 1, 2, 3),
+                        rg(60, 90, 1, 2, 3),
+                        rg(90, 100, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitSingleRange()
+    {
+        // Start with: (0,100]   : (n1,n2,n3);
+        //             (100,200] : (n1,n2,n3);
+        //             (200,300] : (n1,n2,n3);
+        //             (300,400] : (n1,n2,n3);
+        PlacementForRange before = initialPlacement();
+        // split (100,200] to (100,150], (150,200]
+        List<Token> tokens = ImmutableList.of(token(100), token(150), token(200), token(300));
+        PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 100, 1, 2, 3),
+                        rg(100, 150, 1, 2, 3),
+                        rg(150, 200, 1, 2, 3),
+                        rg(200, 300, 1, 2, 3),
+                        rg(300, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitMultipleDisjointRanges()
+    {
+        // Start with: (0,100]   : (n1,n2,n3);
+        //             (100,200] : (n1,n2,n3);
+        //             (200,300] : (n1,n2,n3);
+        //             (300,400] : (n1,n2,n3);
+        PlacementForRange before = initialPlacement();
+        // split (100,200] to (100,150],(150,200]
+        // and (200,300] to (200,250],(250,300]
+        List<Token> tokens = ImmutableList.of(token(100), token(150), token(200), token(250), token(300));
+        PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 100, 1, 2, 3),
+                        rg(100, 150, 1, 2, 3),
+                        rg(150, 200, 1, 2, 3),
+                        rg(200, 250, 1, 2, 3),
+                        rg(250, 300, 1, 2, 3),
+                        rg(300, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitSingleRangeMultipleTimes()
+    {
+        // Start with: (0,100]   : (n1,n2,n3);
+        //             (100,200] : (n1,n2,n3);
+        //             (200,300] : (n1,n2,n3);
+        //             (300,400] : (n1,n2,n3);
+        PlacementForRange before = initialPlacement();
+        // split (100,200] to (100,125],(125,150],(150,200]
+        List<Token> tokens = ImmutableList.of(token(100), token(125), token(150), token(200), token(300));
+        PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 100, 1, 2, 3),
+                        rg(100, 125, 1, 2, 3),
+                        rg(125, 150, 1, 2, 3),
+                        rg(150, 200, 1, 2, 3),
+                        rg(200, 300, 1, 2, 3),
+                        rg(300, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitMultipleRangesMultipleTimes()
+    {
+        PlacementForRange before = initialPlacement();
+        // split (100,200] to (100,125],(125,150],(150,200]
+        // and (200,300] to (200,225],(225,250],(250,300]
+        List<Token> tokens = ImmutableList.of(token(100), token(125), token(150), token(200), token(225), token(250), token(300));
+        PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 100, 1, 2, 3),
+                        rg(100, 125, 1, 2, 3),
+                        rg(125, 150, 1, 2, 3),
+                        rg(150, 200, 1, 2, 3),
+                        rg(200, 225, 1, 2, 3),
+                        rg(225, 250, 1, 2, 3),
+                        rg(250, 300, 1, 2, 3),
+                        rg(300, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitLastRangeMultipleTimes()
+    {
+        // Start with: (0,100]   : (n1,n2,n3);
+        //             (100,200] : (n1,n2,n3);
+        //             (200,300] : (n1,n2,n3);
+        //             (300,400] : (n1,n2,n3);
+        PlacementForRange before = initialPlacement();
+        // split (300,400] to (300,325],(325,350],(350,400]
+        List<Token> tokens = ImmutableList.of(token(100), token(200), token(300), token(325), token(350));
+        PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 100, 1, 2, 3),
+                        rg(100, 200, 1, 2, 3),
+                        rg(200, 300, 1, 2, 3),
+                        rg(300, 325, 1, 2, 3),
+                        rg(325, 350, 1, 2, 3),
+                        rg(350, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testSplitFirstRangeMultipleTimes()
+    {
+        // Start with: (0,100]   : (n1,n2,n3);
+        //             (100,200] : (n1,n2,n3);
+        //             (200,300] : (n1,n2,n3);
+        //             (300,400] : (n1,n2,n3);
+        PlacementForRange before = initialPlacement();
+        // split (0,100] to (0,25],(25,50],(50,100]
+        List<Token> tokens = ImmutableList.of(token(25), token(50), token(100), token(200), token(300));
+        PlacementForRange after = PlacementForRange.splitRangesForPlacement(tokens, before);
+        assertPlacement(after,
+                        rg(0, 25, 1, 2, 3),
+                        rg(25, 50, 1, 2, 3),
+                        rg(50, 100, 1, 2, 3),
+                        rg(100, 200, 1, 2, 3),
+                        rg(200, 300, 1, 2, 3),
+                        rg(300, 400, 1, 2, 3));
+    }
+
+    @Test
+    public void testCombiningPlacements()
+    {
+        EndpointsForRange[] firstGroups = { rg(0, 100, 1, 2, 3),
+                                            rg(100, 200, 1, 2, 3),
+                                            rg(200, 300, 1, 2, 3),
+                                            rg(300, 400, 1, 2, 3) };
+        PlacementForRange p1 = PlacementForRange.builder().withReplicaGroups(Arrays.asList(firstGroups)).build();
+        EndpointsForRange[] secondGroups = { rg(0, 100, 2, 3, 4),
+                                             rg(100, 200, 2, 3, 5),
+                                             rg(200, 300, 2, 3, 6),
+                                             rg(300, 400, 2, 3, 7) };
+        PlacementForRange p2 = PlacementForRange.builder().withReplicaGroups(Arrays.asList(secondGroups)).build();
+
+        ReplicationParams params = ReplicationParams.simple(1);
+        DataPlacements map1 = DataPlacements.builder(1).with(params, new DataPlacement(p1, p1)).build();
+        DataPlacements map2 = DataPlacements.builder(1).with(params, new DataPlacement(p2, p2)).build();
+        DataPlacement p3 = map1.combineReplicaGroups(map2).get(params);
+        for (PlacementForRange placement : new PlacementForRange[]{ p3.reads, p3.writes })
+        {
+            assertPlacement(placement,
+                            rg(0, 100, 1, 2, 3, 4),
+                            rg(100, 200, 1, 2, 3, 5),
+                            rg(200, 300, 1, 2, 3, 6),
+                            rg(300, 400, 1, 2, 3, 7));
+        }
+    }
+
+    @Test
+    public void testSplittingNeedsSorting()
+    {
+        List<Token> tokens = ImmutableList.of(token(-4611686018427387905L), token(-3L));
+        EndpointsForRange[] initial = { rg(-9223372036854775808L, -4611686018427387905L, 1),
+                                        rg(-4611686018427387905L, -9223372036854775808L, 1)};
+        DataPlacement.Builder builder = DataPlacement.builder();
+        builder.writes.withReplicaGroups(Arrays.asList(initial));
+
+        DataPlacement initialPlacement = builder.build();
+        DataPlacement split = initialPlacement.splitRangesForPlacement(tokens);
+        assertPlacement(split.writes, rg(-3, -9223372036854775808L, 1),
+                        rg(-9223372036854775808L,-4611686018427387905L, 1),
+                        rg(-4611686018427387905L, -3, 1));
+    }
+
+    @Test
+    public void testInitialFullWrappingRange()
+    {
+        /*
+        TOKENS=[-9223372036854775808, 3074457345618258602] PLACEMENT=[[Full(/127.0.0.1:7000,(-9223372036854775808,-9223372036854775808])]]
+        */
+        List<Token> tokens = ImmutableList.of(token(-9223372036854775808L), token(3074457345618258602L));
+        EndpointsForRange[] initial = { rg(-9223372036854775808L, -9223372036854775808L, 1)};
+
+        DataPlacement.Builder builder = DataPlacement.builder();
+        builder.writes.withReplicaGroups(Arrays.asList(initial));
+
+        DataPlacement initialPlacement = builder.build();
+        DataPlacement split = initialPlacement.splitRangesForPlacement(tokens);
+        assertPlacement(split.writes, rg(3074457345618258602L,-9223372036854775808L, 1), rg(-9223372036854775808L, 3074457345618258602L, 1));
+    }
+
+    @Test
+    public void testWithMinToken()
+    {
+        EndpointsForRange initialRG = rg(Long.MIN_VALUE, Long.MIN_VALUE, 1, 2, 3);
+
+        DataPlacement.Builder builder = DataPlacement.builder();
+        builder.writes.withReplicaGroup(initialRG);
+
+        DataPlacement initialPlacement = builder.build();
+        List<Token> tokens = ImmutableList.of(token(Long.MIN_VALUE), token(0));
+        DataPlacement newPlacement = initialPlacement.splitRangesForPlacement(tokens);
+        assertEquals(2, newPlacement.writes.replicaGroups.values().size());
+    }
+
+    private PlacementForRange initialPlacement()
+    {
+        EndpointsForRange[] initialGroups = { rg(0, 100, 1, 2, 3),
+                                              rg(100, 200, 1, 2, 3),
+                                              rg(200, 300, 1, 2, 3),
+                                              rg(300, 400, 1, 2, 3) };
+        PlacementForRange placement = PlacementForRange.builder().withReplicaGroups(Arrays.asList(initialGroups)).build();
+        assertPlacement(placement, initialGroups);
+        return placement;
+    }
+
+    private void assertPlacement(PlacementForRange placement, EndpointsForRange...expected)
+    {
+        Collection<EndpointsForRange> replicaGroups = placement.replicaGroups.values();
+        assertEquals(replicaGroups.size(), expected.length);
+        int i = 0;
+        boolean allMatch = true;
+        for(EndpointsForRange group : replicaGroups)
+            if (!Iterables.elementsEqual(group, expected[i++]))
+                allMatch = false;
+
+        assertTrue(String.format("Placement didn't match expected replica groups. " +
+                                 "%nExpected: %s%nActual: %s", Arrays.asList(expected), replicaGroups),
+                   allMatch);
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
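For readers unfamiliar with the (t, t] wraparound convention that testSplittingPlacementWithSingleRange, testInitialFullWrappingRange and testWithMinToken rely on, the following is a minimal, standalone sketch of the idea. All types and names here are hypothetical illustrations, not the Cassandra classes exercised by the diff above: it only shows how splitting the full ring at a sorted token set yields one (prev, next] range per adjacent pair plus a final wrapping range back to the first token.

// Hypothetical sketch (not Cassandra code); requires Java 16+ for records.
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class RingSplitSketch
{
    // A half-open range (left, right] of long tokens; left == right stands for the whole ring.
    record Range(long left, long right) {}

    // Split the full ring at the given tokens: one (prev, next] range per adjacent token pair,
    // plus the wrapping range from the last token back to the first.
    static List<Range> splitFullRing(List<Long> tokens)
    {
        List<Long> sorted = new ArrayList<>(tokens);
        Collections.sort(sorted); // the tests above show the input token list may be unsorted
        List<Range> ranges = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++)
        {
            long left = sorted.get(i);
            long right = sorted.get((i + 1) % sorted.size()); // wraps around to the first token
            ranges.add(new Range(left, right));
        }
        return ranges;
    }

    public static void main(String[] args)
    {
        // Mirrors the shape of testWithMinToken: splitting the full ring at {MIN, 0}
        // yields two ranges, (MIN, 0] and the wrapping (0, MIN].
        System.out.println(splitFullRing(List.of(Long.MIN_VALUE, 0L)));
    }
}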
