Repository: cassandra Updated Branches: refs/heads/trunk c0be34182 -> d8490ccea
Support consistent range movements. patch by tjake; reviewed by thobbs for CASSANDRA-2434 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9f60c55b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9f60c55b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9f60c55b Branch: refs/heads/trunk Commit: 9f60c55ba42ff56aa58c3790b9c55924c4deedf4 Parents: 233761e Author: T Jake Luciani <j...@apache.org> Authored: Thu May 1 09:47:22 2014 -0400 Committer: T Jake Luciani <j...@apache.org> Committed: Thu May 1 09:47:22 2014 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 5 ++ .../org/apache/cassandra/dht/BootStrapper.java | 2 +- .../org/apache/cassandra/dht/RangeStreamer.java | 81 +++++++++++++++++++- .../cassandra/service/StorageService.java | 44 ++++++++++- 5 files changed, 127 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 34533cc..be72ad1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -56,6 +56,7 @@ * Optimize cellname comparison (CASSANDRA-6934) * Native protocol v3 (CASSANDRA-6855) * Optimize Cell liveness checks and clean up Cell (CASSANDRA-7119) + * Support consistent range movements (CASSANDRA-2434) Merged from 2.0: * Allow overriding cassandra-rackdc.properties file (CASSANDRA-7072) * Set JMX RMI port to 7199 (CASSANDRA-7087) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 86c6f64..5d59460 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -30,6 +30,11 @@ New features repair session. Use nodetool repair -par -inc to use this feature. 
A tool to manually mark/unmark sstables as repaired is available in tools/bin/sstablerepairedset. + - Bootstrapping now ensures that range movements are consistent, + meaning the data for the new node is taken from the node that is no + longer responsible for that range of keys. + If you want the old behavior (due to a lost node perhaps) + you can set the following property (-Dconsistent.rangemovement=false) Upgrading --------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 343748b..cbbd100 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -63,7 +63,7 @@ public class BootStrapper if (logger.isDebugEnabled()) logger.debug("Beginning bootstrap process"); - RangeStreamer streamer = new RangeStreamer(tokenMetadata, address, "Bootstrap"); + RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address, "Bootstrap"); streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 7ab39a4..2308d30 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -23,6 +23,8 @@ import java.util.*; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; +import 
com.google.common.collect.Sets; +import org.apache.cassandra.gms.EndpointState; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; @@ -44,7 +47,8 @@ import org.apache.cassandra.utils.FBUtilities; public class RangeStreamer { private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class); - + public static final boolean useStrictConsistency = Boolean.valueOf(System.getProperty("consistent.rangemovement","true")); + private final Collection<Token> tokens; private final TokenMetadata metadata; private final InetAddress address; private final String description; @@ -99,9 +103,19 @@ public class RangeStreamer } } + public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, String description) + { + this.metadata = metadata; + this.tokens = tokens; + this.address = address; + this.description = description; + this.streamPlan = new StreamPlan(description); + } + public RangeStreamer(TokenMetadata metadata, InetAddress address, String description) { this.metadata = metadata; + this.tokens = null; this.address = address; this.description = description; this.streamPlan = new StreamPlan(description); @@ -114,11 +128,12 @@ public class RangeStreamer public void addRanges(String keyspaceName, Collection<Range<Token>> ranges) { - Multimap<Range<Token>, InetAddress> rangesForKeyspace = getAllRangesWithSourcesFor(keyspaceName, ranges); + Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictConsistency && tokens != null + ? 
getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName, ranges); if (logger.isDebugEnabled()) { - for (Map.Entry<Range<Token>, InetAddress> entry: rangesForKeyspace.entries()) + for (Map.Entry<Range<Token>, InetAddress> entry : rangesForKeyspace.entries()) logger.debug(String.format("%s: range %s exists on %s", description, entry.getKey(), entry.getValue())); } @@ -163,6 +178,66 @@ public class RangeStreamer } /** + * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges. + * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating + * consistency. + */ + private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String table, Collection<Range<Token>> desiredRanges) + { + + assert tokens != null; + AbstractReplicationStrategy strat = Keyspace.open(table).getReplicationStrategy(); + + //Active ranges + TokenMetadata metadataClone = metadata.cloneOnlyTokenMap(); + Multimap<Range<Token>,InetAddress> addressRanges = strat.getRangeAddresses(metadataClone); + + //Pending ranges + metadataClone.updateNormalTokens(tokens, address); + Multimap<Range<Token>,InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone); + + //Collects the source that will have its range moved to the new node + Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create(); + + for (Range<Token> desiredRange : desiredRanges) + { + for (Map.Entry<Range<Token>, Collection<InetAddress>> preEntry : addressRanges.asMap().entrySet()) + { + if (preEntry.getKey().contains(desiredRange)) + { + Set<InetAddress> oldEndpoints = Sets.newHashSet(preEntry.getValue()); + Set<InetAddress> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange)); + + //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
+ //So we need to be careful to only be strict when endpoints == RF + if (oldEndpoints.size() == strat.getReplicationFactor()) + { + oldEndpoints.removeAll(newEndpoints); + assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size(); + } + + rangeSources.put(desiredRange, oldEndpoints.iterator().next()); + } + } + + //Validate + Collection<InetAddress> addressList = rangeSources.get(desiredRange); + if (addressList == null || addressList.isEmpty()) + throw new IllegalStateException("No sources found for " + desiredRange); + + if (addressList.size() > 1) + throw new IllegalStateException("Multiple endpoints found for " + desiredRange); + + InetAddress sourceIp = addressList.iterator().next(); + EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp); + if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive())) + throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+"). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dconsistent.rangemovement=false"); + } + + return rangeSources; + } + + /** * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value) * @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given * here, we always exclude ourselves. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9f60c55b/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 25a3670..85c080e 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3217,7 +3217,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // getting collection of the currently used ranges by this keyspace Collection<Range<Token>> currentRanges = getRangesForEndpoint(keyspace, localAddress); // collection of ranges which this node will serve after move to the new token - Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetadata, newToken, localAddress); + Collection<Range<Token>> updatedRanges = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress); // ring ranges and endpoints associated with them // this used to determine what nodes should we ping about range data @@ -3237,11 +3237,51 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { if (range.contains(toFetch)) { - List<InetAddress> endpoints = snitch.getSortedListByProximity(localAddress, rangeAddresses.get(range)); + List<InetAddress> endpoints = null; + + if (RangeStreamer.useStrictConsistency) + { + Set<InetAddress> oldEndpoints = Sets.newHashSet(rangeAddresses.get(range)); + Set<InetAddress> newEndpoints = Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, tokenMetaCloneAllSettled)); + + //Due to CASSANDRA-5953 we can have a higher RF than we have endpoints.
+ //So we need to be careful to only be strict when endpoints == RF + if (oldEndpoints.size() == strategy.getReplicationFactor()) + { + oldEndpoints.removeAll(newEndpoints); + + //No relocation required + if (oldEndpoints.isEmpty()) + continue; + + assert oldEndpoints.size() == 1 : "Expected 1 endpoint but found " + oldEndpoints.size(); + } + + endpoints = Lists.newArrayList(oldEndpoints.iterator().next()); + } + else + { + endpoints = snitch.getSortedListByProximity(localAddress, rangeAddresses.get(range)); + } + // storing range and preferred endpoint set rangesToFetchWithPreferredEndpoints.putAll(toFetch, endpoints); } } + + Collection<InetAddress> addressList = rangesToFetchWithPreferredEndpoints.get(toFetch); + if (addressList == null || addressList.isEmpty()) + continue; + + if (RangeStreamer.useStrictConsistency) + { + if (addressList.size() > 1) + throw new IllegalStateException("Multiple strict sources found for " + toFetch); + + InetAddress sourceIp = addressList.iterator().next(); + if (Gossiper.instance.isEnabled() && !Gossiper.instance.getEndpointStateForEndpoint(sourceIp).isAlive()) + throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+"). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dconsistent.rangemovement=false"); + } } // calculating endpoints to stream current ranges to if needed