Transient replication: range movement improvements

Patch by Alex Petrov; reviewed by Ariel Weisberg and Benedict Elliott Smith for CASSANDRA-14756
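The central API change in this patch replaces the bare Predicate<Replica> source filters with a dedicated RangeStreamer.SourceFilter interface (see the RangeStreamer diff below), whose message(Replica) method lets buildErrorMessage explain why each candidate source was rejected. As a rough sketch of the new contract (this rack-excluding filter is illustrative only, not part of the patch):

import org.apache.cassandra.dht.RangeStreamer;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.Replica;

// Illustrative only: a filter that refuses sources in one rack and can say why.
public class ExcludeRackFilter implements RangeStreamer.SourceFilter
{
    private final String rack;
    private final IEndpointSnitch snitch;

    public ExcludeRackFilter(String rack, IEndpointSnitch snitch)
    {
        this.rack = rack;
        this.snitch = snitch;
    }

    @Override
    public boolean apply(Replica replica)
    {
        // Accept only sources outside the excluded rack
        return !snitch.getRack(replica.endpoint()).equals(rack);
    }

    @Override
    public String message(Replica replica)
    {
        return "Filtered " + replica + " out because it is in rack " + rack;
    }
}

Note also that the RangeStreamer constructor now installs FailureDetectorSourceFilter and ExcludeLocalNodeFilter unconditionally, which is why BootStrapper no longer adds them by hand and the rebuild path drops its explicit FailureDetectorSourceFilter.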
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0379201c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0379201c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0379201c
Branch: refs/heads/trunk
Commit: 0379201c7057f6bac4abf1e0f3d81a12d90abd08
Parents: 210da3d
Author: Alex Petrov <[email protected]>
Authored: Mon Sep 17 11:51:56 2018 +0200
Committer: Alex Petrov <[email protected]>
Committed: Wed Sep 26 11:42:46 2018 +0200

----------------------------------------------------------------------
 .../org/apache/cassandra/db/SystemKeyspace.java |  31 +-
 .../org/apache/cassandra/dht/BootStrapper.java  |   3 -
 .../cassandra/dht/RangeFetchMapCalculator.java  |   2 +-
 .../org/apache/cassandra/dht/RangeStreamer.java | 448 ++++++++++---------
 .../apache/cassandra/dht/StreamStateStore.java  |  12 +-
 .../cassandra/locator/RangesAtEndpoint.java     |   6 +
 .../cassandra/service/RangeRelocator.java       | 324 ++++++++++++++
 .../cassandra/service/StorageService.java       | 314 +------------
 .../apache/cassandra/streaming/StreamPlan.java  |  17 +-
 .../cassandra/streaming/StreamSession.java      |   8 +-
 .../apache/cassandra/dht/BootStrapperTest.java  |  17 +-
 .../dht/RangeFetchMapCalculatorTest.java        |  79 +++-
 .../locator/OldNetworkTopologyStrategyTest.java |   3 +-
 .../service/BootstrapTransientTest.java         | 113 +++--
 .../cassandra/service/MoveTransientTest.java    | 321 +++++++------
 .../cassandra/service/StorageServiceTest.java   |  18 +-
 16 files changed, 981 insertions(+), 735 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index ff070a3..0f904ce 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -32,12 +32,11 @@ import javax.management.openmbean.TabularData;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.Replica;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -1285,24 +1284,40 @@ public final class SystemKeyspace
                         keyspace);
     }

-    public static synchronized RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner)
+    /**
+     * List of the streamed ranges, where transientness is encoded based on the source from which each range was streamed.
+ */ + public static synchronized AvailableRanges getAvailableRanges(String keyspace, IPartitioner partitioner) { String query = "SELECT * FROM system.%s WHERE keyspace_name=?"; UntypedResultSet rs = executeInternal(format(query, AVAILABLE_RANGES_V2), keyspace); - InetAddressAndPort endpoint = InetAddressAndPort.getLocalHost(); - RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint); + + ImmutableSet.Builder<Range<Token>> full = new ImmutableSet.Builder<>(); + ImmutableSet.Builder<Range<Token>> trans = new ImmutableSet.Builder<>(); for (UntypedResultSet.Row row : rs) { Optional.ofNullable(row.getSet("full_ranges", BytesType.instance)) .ifPresent(full_ranges -> full_ranges.stream() .map(buf -> byteBufferToRange(buf, partitioner)) - .forEach(range -> builder.add(fullReplica(endpoint, range)))); + .forEach(full::add)); Optional.ofNullable(row.getSet("transient_ranges", BytesType.instance)) .ifPresent(transient_ranges -> transient_ranges.stream() .map(buf -> byteBufferToRange(buf, partitioner)) - .forEach(range -> builder.add(transientReplica(endpoint, range)))); + .forEach(trans::add)); + } + return new AvailableRanges(full.build(), trans.build()); + } + + public static class AvailableRanges + { + public Set<Range<Token>> full; + public Set<Range<Token>> trans; + + private AvailableRanges(Set<Range<Token>> full, Set<Range<Token>> trans) + { + this.full = full; + this.trans = trans; } - return builder.build(); } public static void resetAvailableRanges() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index 92bf8c8..cef605e 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -79,9 +79,6 @@ public class BootStrapper extends ProgressEventNotifierSupport stateStore, true, DatabaseDescriptor.getStreamingConnectionsPerHost()); - streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); - streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter()); - for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) { AbstractReplicationStrategy strategy = Keyspace.open(keyspaceName).getReplicationStrategy(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java index 4b98b97..63265b7 100644 --- a/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java +++ b/src/java/org/apache/cassandra/dht/RangeFetchMapCalculator.java @@ -84,7 +84,7 @@ public class RangeFetchMapCalculator private final Set<Range<Token>> trivialRanges; public RangeFetchMapCalculator(EndpointsByRange rangesWithSources, - Collection<Predicate<Replica>> sourceFilters, + Collection<RangeStreamer.SourceFilter> sourceFilters, String keyspace) { this.rangesWithSources = rangesWithSources; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java 
b/src/java/org/apache/cassandra/dht/RangeStreamer.java index e8aa5d3..f46d665 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -27,9 +27,9 @@ import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.HashMultimap; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Multimap; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.EndpointsByReplica; @@ -53,12 +53,12 @@ import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaCollection; import org.apache.cassandra.locator.Replicas; import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.PreviewKind; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; import static com.google.common.base.Predicates.and; import static com.google.common.base.Predicates.not; @@ -87,8 +87,8 @@ public class RangeStreamer private final InetAddressAndPort address; /* streaming description */ private final String description; - private final Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = HashMultimap.create(); - private final Set<Predicate<Replica>> sourceFilters = new HashSet<>(); + private final Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch = new HashMap<>(); + private final List<SourceFilter> sourceFilters = new ArrayList<>(); private final StreamPlan streamPlan; private final boolean useStrictConsistency; private final IEndpointSnitch snitch; @@ -97,6 +97,7 @@ public class RangeStreamer public static class FetchReplica { public final Replica local; + // Source replica public final Replica remote; public FetchReplica(Replica local, Replica remote) @@ -135,11 +136,17 @@ public class RangeStreamer } } + public interface SourceFilter extends Predicate<Replica> + { + public boolean apply(Replica replica); + public String message(Replica replica); + } + /** * Source filter which excludes any endpoints that are not alive according to a * failure detector. */ - public static class FailureDetectorSourceFilter implements Predicate<Replica> + public static class FailureDetectorSourceFilter implements SourceFilter { private final IFailureDetector fd; @@ -148,16 +155,23 @@ public class RangeStreamer this.fd = fd; } + @Override public boolean apply(Replica replica) { return fd.isAlive(replica.endpoint()); } + + @Override + public String message(Replica replica) + { + return "Filtered " + replica + " out because it was down"; + } } /** * Source filter which excludes any endpoints that are not in a specific data center. 
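 * Installed by StorageService when a rebuild specifies an explicit source datacenter (see the sourceDc check in the StorageService hunk below).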
*/ - public static class SingleDatacenterFilter implements Predicate<Replica> + public static class SingleDatacenterFilter implements SourceFilter { private final String sourceDc; private final IEndpointSnitch snitch; @@ -168,27 +182,41 @@ public class RangeStreamer this.snitch = snitch; } + @Override public boolean apply(Replica replica) { return snitch.getDatacenter(replica).equals(sourceDc); } + + @Override + public String message(Replica replica) + { + return "Filtered " + replica + " out because it does not belong to " + sourceDc + " datacenter"; + } } /** * Source filter which excludes the current node from source calculations */ - public static class ExcludeLocalNodeFilter implements Predicate<Replica> + public static class ExcludeLocalNodeFilter implements SourceFilter { + @Override public boolean apply(Replica replica) { return !replica.isLocal(); } + + @Override + public String message(Replica replica) + { + return "Filtered " + replica + " out because it is local"; + } } /** * Source filter which only includes endpoints contained within a provided set. */ - public static class WhitelistedSourcesFilter implements Predicate<Replica> + public static class WhitelistedSourcesFilter implements SourceFilter { private final Set<InetAddressAndPort> whitelistedSources; @@ -201,6 +229,12 @@ public class RangeStreamer { return whitelistedSources.contains(replica.endpoint()); } + + @Override + public String message(Replica replica) + { + return "Filtered " + replica + " out because it was not whitelisted, whitelisted sources: " + whitelistedSources; + } } public RangeStreamer(TokenMetadata metadata, @@ -213,6 +247,21 @@ public class RangeStreamer boolean connectSequentially, int connectionsPerHost) { + this(metadata, tokens, address, streamOperation, useStrictConsistency, snitch, stateStore, + FailureDetector.instance, connectSequentially, connectionsPerHost); + } + + RangeStreamer(TokenMetadata metadata, + Collection<Token> tokens, + InetAddressAndPort address, + StreamOperation streamOperation, + boolean useStrictConsistency, + IEndpointSnitch snitch, + StreamStateStore stateStore, + IFailureDetector failureDetector, + boolean connectSequentially, + int connectionsPerHost) + { Preconditions.checkArgument(streamOperation == StreamOperation.BOOTSTRAP || streamOperation == StreamOperation.REBUILD, streamOperation); this.metadata = metadata; this.tokens = tokens; @@ -223,13 +272,34 @@ public class RangeStreamer this.snitch = snitch; this.stateStore = stateStore; streamPlan.listeners(this.stateStore); + + // We're _always_ filtering out a local node and down sources + addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(failureDetector)); + addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter()); } - public void addSourceFilter(Predicate<Replica> filter) + public void addSourceFilter(SourceFilter filter) { sourceFilters.add(filter); } + // Creates error message from source filters + private static String buildErrorMessage(Collection<SourceFilter> sourceFilters, ReplicaCollection<?> replicas) + { + StringBuilder failureMessage = new StringBuilder(); + for (Replica r : replicas) + { + for (SourceFilter filter : sourceFilters) + { + if (!filter.apply(r)) + { + failureMessage.append(filter.message(r)); + break; + } + } + } + return failureMessage.toString(); + } /** * Add ranges to be streamed for given keyspace. 
* @@ -252,7 +322,6 @@ public class RangeStreamer for (Map.Entry<Replica, Replica> entry : fetchMap.flattenEntries()) logger.info("{}: range {} exists on {} for keyspace {}", description, entry.getKey(), entry.getValue(), keyspaceName); - Multimap<InetAddressAndPort, FetchReplica> workMap; //Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no //transient replicas. @@ -265,10 +334,12 @@ public class RangeStreamer workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName); } - toFetch.put(keyspaceName, workMap); - for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet()) + if (toFetch.put(keyspaceName, workMap) != null) + throw new IllegalArgumentException("Keyspace is already added to fetch map"); + + if (logger.isTraceEnabled()) { - if (logger.isTraceEnabled()) + for (Map.Entry<InetAddressAndPort, Collection<FetchReplica>> entry : workMap.asMap().entrySet()) { for (FetchReplica r : entry.getValue()) logger.trace("{}: range source {} local range {} for keyspace {}", description, r.remote, r.local, keyspaceName); @@ -289,10 +360,6 @@ public class RangeStreamer /** * Wrapper method to assemble the arguments for invoking the implementation with RangeStreamer's parameters - * @param fetchRanges - * @param keyspace - * @param useStrictConsistency - * @return */ private EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(ReplicaCollection<?> fetchRanges, Keyspace keyspace, boolean useStrictConsistency) { @@ -305,7 +372,7 @@ public class RangeStreamer if (tokens != null) { // Pending ranges - tmdAfter = tmd.cloneOnlyTokenMap(); + tmdAfter = tmd.cloneOnlyTokenMap(); tmdAfter.updateNormalTokens(tokens, address); } else if (useStrictConsistency) @@ -313,15 +380,14 @@ public class RangeStreamer throw new IllegalArgumentException("Can't ask for strict consistency and not supply tokens"); } - return RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity, - strat, - fetchRanges, - useStrictConsistency, - tmd, - tmdAfter, - ALIVE_PREDICATE, - keyspace.getName(), - sourceFilters); + return calculateRangesToFetchWithPreferredEndpoints(snitch::sortedByProximity, + strat, + fetchRanges, + useStrictConsistency, + tmd, + tmdAfter, + keyspace.getName(), + sourceFilters); } @@ -329,7 +395,6 @@ public class RangeStreamer * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges. * For each range, the list should only contain a single source. This allows us to consistently migrate data without violating * consistency. 
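 * With transient replication, a range we will own as a full replica must be fetched from a full source, while a range
 * we will own transiently can be fetched from either (this is the isSufficient predicate below).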
- * **/ public static EndpointsByReplica calculateRangesToFetchWithPreferredEndpoints(BiFunction<InetAddressAndPort, EndpointsForRange, EndpointsForRange> snitchGetSortedListByProximity, @@ -338,165 +403,148 @@ public class RangeStreamer boolean useStrictConsistency, TokenMetadata tmdBefore, TokenMetadata tmdAfter, - Predicate<Replica> isAlive, String keyspace, - Collection<Predicate<Replica>> sourceFilters) - { - EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore); - - InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); - logger.debug ("Keyspace: {}", keyspace); - logger.debug("To fetch RN: {}", fetchRanges); - logger.debug("Fetch ranges: {}", rangeAddresses); - - Predicate<Replica> testSourceFilters = and(sourceFilters); - Function<EndpointsForRange, EndpointsForRange> sorted = - endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints); - - //This list of replicas is just candidates. With strict consistency it's going to be a narrow list. - EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable(); - for (Replica toFetch : fetchRanges) - { - //Replica that is sufficient to provide the data we need - //With strict consistency and transient replication we may end up with multiple types - //so this isn't used with strict consistency - Predicate<Replica> isSufficient = r -> (toFetch.isTransient() || r.isFull()); - Predicate<Replica> accept = r -> - isSufficient.test(r) // is sufficient - && !r.endpoint().equals(localAddress) // is not self - && isAlive.test(r); // is alive - - logger.debug("To fetch {}", toFetch); - for (Range<Token> range : rangeAddresses.keySet()) - { - if (range.contains(toFetch.range())) - { - EndpointsForRange oldEndpoints = rangeAddresses.get(range); - - //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch - //It could be multiple endpoints and we must fetch from all of them if they are there - //With transient replication and strict consistency this is to get the full data from a full replica and - //transient data from the transient replica losing data - EndpointsForRange sources; - if (useStrictConsistency) - { - //Start with two sets of who replicates the range before and who replicates it after - EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter); - logger.debug("Old endpoints {}", oldEndpoints); - logger.debug("New endpoints {}", newEndpoints); - - //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. - //So we need to be careful to only be strict when endpoints == RF - if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas) - { - Set<InetAddressAndPort> endpointsStillReplicated = newEndpoints.endpoints(); - // Remove new endpoints from old endpoints based on address - oldEndpoints = oldEndpoints.filter(r -> !endpointsStillReplicated.contains(r.endpoint())); - - if (!all(oldEndpoints, isAlive)) - throw new IllegalStateException("A node required to move the data consistently is down: " - + oldEndpoints.filter(not(isAlive))); - - if (oldEndpoints.size() > 1) - throw new AssertionError("Expected <= 1 endpoint but found " + oldEndpoints); - - //If we are transitioning from transient to full and and the set of replicas for the range is not changing - //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely - //since we are already a transient replica and the existing replica remains. 
- //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore. - //So it's an error if we don't find what we need. - if (oldEndpoints.isEmpty() && toFetch.isTransient()) - { - throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch); - } - - if (!any(oldEndpoints, isSufficient)) - { - // need an additional replica - EndpointsForRange endpointsForRange = sorted.apply(rangeAddresses.get(range)); - // include all our filters, to ensure we include a matching node - Optional<Replica> fullReplica = Iterables.<Replica>tryFind(endpointsForRange, and(accept, testSourceFilters)).toJavaUtil(); - if (fullReplica.isPresent()) - oldEndpoints = Endpoints.concat(oldEndpoints, EndpointsForRange.of(fullReplica.get())); - else - throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + endpointsForRange); - } - - //We have to check the source filters here to see if they will remove any replicas - //required for strict consistency - if (!all(oldEndpoints, testSourceFilters)) - throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + oldEndpoints.filter(not(testSourceFilters))); - } - else - { - oldEndpoints = sorted.apply(oldEndpoints.filter(accept)); - } - - //Apply testSourceFilters that were given to us, and establish everything remaining is alive for the strict case - sources = oldEndpoints.filter(testSourceFilters); - } - else - { - //Without strict consistency we have given up on correctness so no point in fetching from - //a random full + transient replica since it's also likely to lose data - //Also apply testSourceFilters that were given to us so we can safely select a single source - sources = sorted.apply(rangeAddresses.get(range).filter(and(accept, testSourceFilters))); - //Limit it to just the first possible source, we don't need more than one and downstream - //will fetch from every source we supply - sources = sources.size() > 0 ? sources.subList(0, 1) : sources; - } - - // storing range and preferred endpoint set - rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE); - logger.debug("Endpoints to fetch for {} are {}", toFetch, sources); - } - } - - EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch); - if (addressList == null) - throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch); - - /* - * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses - * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica - * and the other is a transient replica. So we must need fetch from two places in that case for the full range we gain. - * For a transient range we only need to fetch from one. 
- */ - if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1)) - throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList)); - - //We must have enough stuff to fetch from - if ((toFetch.isFull() && !any(addressList, Replica::isFull)) || addressList.isEmpty()) - { - if (strat.getReplicationFactor().allReplicas == 1) - { - if (useStrictConsistency) - { - logger.warn("A node required to move the data consistently is down"); - throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " + - "Ensure this keyspace contains replicas in the source datacenter."); - } - else - logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " + - "Keyspace might be missing data.", toFetch, keyspace); - - } - else - { - if (useStrictConsistency) - logger.warn("A node required to move the data consistently is down"); - throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace); - } - } - } - return rangesToFetchWithPreferredEndpoints.asImmutableView(); - } + Collection<SourceFilter> sourceFilters) + { + EndpointsByRange rangeAddresses = strat.getRangeAddresses(tmdBefore); + + InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); + logger.debug ("Keyspace: {}", keyspace); + logger.debug("To fetch RN: {}", fetchRanges); + logger.debug("Fetch ranges: {}", rangeAddresses); + + Predicate<Replica> testSourceFilters = and(sourceFilters); + Function<EndpointsForRange, EndpointsForRange> sorted = + endpoints -> snitchGetSortedListByProximity.apply(localAddress, endpoints); + + //This list of replicas is just candidates. With strict consistency it's going to be a narrow list. + EndpointsByReplica.Mutable rangesToFetchWithPreferredEndpoints = new EndpointsByReplica.Mutable(); + for (Replica toFetch : fetchRanges) + { + //Replica that is sufficient to provide the data we need + //With strict consistency and transient replication we may end up with multiple types + //so this isn't used with strict consistency + Predicate<Replica> isSufficient = r -> toFetch.isTransient() || r.isFull(); + + logger.debug("To fetch {}", toFetch); + for (Range<Token> range : rangeAddresses.keySet()) + { + if (!range.contains(toFetch.range())) + continue; + + final EndpointsForRange oldEndpoints = sorted.apply(rangeAddresses.get(range)); + + //Ultimately we populate this with whatever is going to be fetched from to satisfy toFetch + //It could be multiple endpoints and we must fetch from all of them if they are there + //With transient replication and strict consistency this is to get the full data from a full replica and + //transient data from the transient replica losing data + EndpointsForRange sources; + if (useStrictConsistency) + { + EndpointsForRange strictEndpoints; + //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. 
+                    //So we need to be careful to only be strict when endpoints == RF
+                    if (oldEndpoints.size() == strat.getReplicationFactor().allReplicas)
+                    {
+                        //Start with two sets of who replicates the range before and who replicates it after
+                        EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toFetch.range().right, tmdAfter);
+                        logger.debug("Old endpoints {}", oldEndpoints);
+                        logger.debug("New endpoints {}", newEndpoints);
+
+                        // Remove new endpoints from old endpoints based on address
+                        strictEndpoints = oldEndpoints.without(newEndpoints.endpoints());
+
+                        if (strictEndpoints.size() > 1)
+                            throw new AssertionError("Expected <= 1 endpoint but found " + strictEndpoints);
+
+                        //We have to check the source filters here to see if they will remove any replicas
+                        //required for strict consistency
+                        if (!all(strictEndpoints, testSourceFilters))
+                            throw new IllegalStateException("Necessary replicas for strict consistency were removed by source filters: " + buildErrorMessage(sourceFilters, strictEndpoints));
+
+                        //If we are transitioning from transient to full and the set of replicas for the range is not changing
+                        //we might end up with no endpoints to fetch from by address. In that case we can pick any full replica safely
+                        //since we are already a transient replica and the existing replica remains.
+                        //The old behavior where we might be asked to fetch ranges we don't need shouldn't occur anymore.
+                        //So it's an error if we don't find what we need.
+                        if (strictEndpoints.isEmpty() && toFetch.isTransient())
+                            throw new AssertionError("If there are no endpoints to fetch from then we must be transitioning from transient to full for range " + toFetch);
+
+                        if (!any(strictEndpoints, isSufficient))
+                        {
+                            // need an additional replica; include all our filters, to ensure we include a matching node
+                            Optional<Replica> fullReplica = Iterables.<Replica>tryFind(oldEndpoints, and(isSufficient, testSourceFilters)).toJavaUtil();
+                            if (fullReplica.isPresent())
+                                strictEndpoints = Endpoints.concat(strictEndpoints, EndpointsForRange.of(fullReplica.get()));
+                            else
+                                throw new IllegalStateException("Couldn't find any matching sufficient replica out of " + buildErrorMessage(sourceFilters, oldEndpoints));
+                        }
+                    }
+                    else
+                    {
+                        strictEndpoints = sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
+                    }
+
+                    sources = strictEndpoints;
+                }
+                else
+                {
+                    //Without strict consistency we have given up on correctness so no point in fetching from
+                    //a random full + transient replica since it's also likely to lose data
+                    //Also apply testSourceFilters that were given to us so we can safely select a single source
+                    sources = sorted.apply(oldEndpoints.filter(and(isSufficient, testSourceFilters)));
+                    //Limit it to just the first possible source, we don't need more than one and downstream
+                    //will fetch from every source we supply
+                    sources = sources.size() > 0 ? sources.subList(0, 1) : sources;
+                }
+
+                // storing range and preferred endpoint set
+                rangesToFetchWithPreferredEndpoints.putAll(toFetch, sources, Conflict.NONE);
+                logger.debug("Endpoints to fetch for {} are {}", toFetch, sources);
+            }
+
+            EndpointsForRange addressList = rangesToFetchWithPreferredEndpoints.getIfPresent(toFetch);
+            if (addressList == null)
+                throw new IllegalStateException("Failed to find endpoints to fetch " + toFetch);
+
+            /*
+             * When we move forwards (shrink our bucket) we are the one losing a range and no one else loses
+             * from that action (we also don't gain). When we move backwards there are two people losing a range. One is a full replica
+             * and the other is a transient replica. So we must fetch from two places in that case for the full range we gain.
+             * For a transient range we only need to fetch from one.
+             */
+            if (useStrictConsistency && addressList.size() > 1 && (addressList.filter(Replica::isFull).size() > 1 || addressList.filter(Replica::isTransient).size() > 1))
+                throw new IllegalStateException(String.format("Multiple strict sources found for %s, sources: %s", toFetch, addressList));
+
+            //We must have enough stuff to fetch from
+            if (!any(addressList, isSufficient))
+            {
+                if (strat.getReplicationFactor().allReplicas == 1)
+                {
+                    if (useStrictConsistency)
+                    {
+                        logger.warn("A node required to move the data consistently is down");
+                        throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace + " with RF=1. " +
+                                                        "Ensure this keyspace contains replicas in the source datacenter.");
+                    }
+                    else
+                        logger.warn("Unable to find sufficient sources for streaming range {} in keyspace {} with RF=1. " +
+                                    "Keyspace might be missing data.", toFetch, keyspace);
+                }
+                else
+                {
+                    if (useStrictConsistency)
+                        logger.warn("A node required to move the data consistently is down");
+                    throw new IllegalStateException("Unable to find sufficient sources for streaming range " + toFetch + " in keyspace " + keyspace);
+                }
+            }
+        }
+        return rangesToFetchWithPreferredEndpoints.asImmutableView();
+    }

     /**
      * The preferred endpoint list is the wrong format because it is keyed by Replica (this node) rather than the source
      * endpoint we will fetch from, which is what streaming wants.
-     * @param preferredEndpoints
-     * @return
      */
     public static Multimap<InetAddressAndPort, FetchReplica> convertPreferredEndpointsToWorkMap(EndpointsByReplica preferredEndpoints)
     {
@@ -505,7 +553,7 @@
         {
             for (Replica source : e.getValue())
             {
-                assert (e.getKey()).isLocal();
+                assert e.getKey().isLocal();
                 assert !source.isLocal();
                 workMap.put(source.endpoint(), new FetchReplica(e.getKey(), source));
             }
@@ -518,7 +566,8 @@
      * Optimized version that also outputs the final work map
      */
     private static Multimap<InetAddressAndPort, FetchReplica> getOptimizedWorkMap(EndpointsByReplica rangesWithSources,
-                                                                                  Collection<Predicate<Replica>> sourceFilters, String keyspace)
+                                                                                  Collection<SourceFilter> sourceFilters,
+                                                                                  String keyspace)
     {
         //For now we just aren't going to use the optimized range fetch map with transient replication to shrink
         //the surface area to test and introduce bugs.
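For reference, the choice between the two work-map strategies is made earlier in addRanges (first RangeStreamer hunk above). The guard sketched below paraphrases the condition the comments there describe ("only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no transient replicas"); the exact expression in the patch may differ:

// Paraphrased guard (not verbatim from the patch): fall back to the
// preferred-endpoints work map whenever strict consistency is requested,
// RF is 1, or the keyspace has transient replicas.
if (useStrictConsistency || strat.getReplicationFactor().allReplicas == 1 || strat.getReplicationFactor().hasTransientReplicas())
    workMap = convertPreferredEndpointsToWorkMap(fetchMap);
else
    workMap = getOptimizedWorkMap(fetchMap, sourceFilters, keyspaceName);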
@@ -531,10 +580,11 @@ public class RangeStreamer unwrapped.put(entry.getKey().range(), entry.getValue()); } - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrapped.asImmutableView(), sourceFilters, keyspace); + EndpointsByRange unwrappedView = unwrapped.asImmutableView(); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(unwrappedView, sourceFilters, keyspace); Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap = calculator.getRangeFetchMap(); logger.info("Output from RangeFetchMapCalculator for keyspace {}", keyspace); - validateRangeFetchMap(unwrapped.asImmutableView(), rangeFetchMapMap, keyspace); + validateRangeFetchMap(unwrappedView, rangeFetchMapMap, keyspace); //Need to rewrap as Replicas Multimap<InetAddressAndPort, FetchReplica> wrapped = HashMultimap.create(); @@ -562,9 +612,6 @@ public class RangeStreamer /** * Verify that source returned for each range is correct - * @param rangesWithSources - * @param rangeFetchMapMap - * @param keyspace */ private static void validateRangeFetchMap(EndpointsByRange rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> rangeFetchMapMap, String keyspace) { @@ -588,7 +635,7 @@ public class RangeStreamer // For testing purposes @VisibleForTesting - Multimap<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch() + Map<String, Multimap<InetAddressAndPort, FetchReplica>> toFetch() { return toFetch; } @@ -600,16 +647,19 @@ public class RangeStreamer sources.asMap().forEach((source, fetchReplicas) -> { // filter out already streamed ranges - RangesAtEndpoint available = stateStore.getAvailableRanges(keyspace, StorageService.instance.getTokenMetadata().partitioner); + SystemKeyspace.AvailableRanges available = stateStore.getAvailableRanges(keyspace, metadata.partitioner); Predicate<FetchReplica> isAvailable = fetch -> { - Replica availableRange = available.byRange().get(fetch.local.range()); - if (availableRange == null) + boolean isInFull = available.full.contains(fetch.local.range()); + boolean isInTrans = available.trans.contains(fetch.local.range()); + + if (!isInFull && !isInTrans) //Range is unavailable return false; + if (fetch.local.isFull()) //For full, pick only replicas with matching transientness - return availableRange.isFull() == fetch.remote.isFull(); + return isInFull == fetch.remote.isFull(); // Any transient or full will do return true; @@ -617,22 +667,16 @@ public class RangeStreamer List<FetchReplica> remaining = fetchReplicas.stream().filter(not(isAvailable)).collect(Collectors.toList()); - if (remaining.size() < available.size()) + if (remaining.size() < available.full.size() + available.trans.size()) { List<FetchReplica> skipped = fetchReplicas.stream().filter(isAvailable).collect(Collectors.toList()); logger.info("Some ranges of {} are already available. Skipping streaming those ranges. Skipping {}. Fully available {} Transiently available {}", - fetchReplicas, skipped, available.filter(Replica::isFull).ranges(), available.filter(Replica::isTransient).ranges()); + fetchReplicas, skipped, available.full, available.trans); } if (logger.isTraceEnabled()) logger.trace("{}ing from {} ranges {}", description, source, StringUtils.join(remaining, ", ")); - //At the other end the distinction between full and transient is ignored it just used the transient status - //of the Replica objects we send to determine what to send. 
The real reason we have this split down to - //StreamRequest is that on completion StreamRequest is used to write to the system table tracking - //what has already been streamed. At that point since we only have the local Replica instances so we don't - //know what we got from the remote. We preserve that here by splitting based on the remotes transient - //status. InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); RangesAtEndpoint full = remaining.stream() .filter(pair -> pair.remote.isFull()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/dht/StreamStateStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java index 3144e81..e62bc04 100644 --- a/src/java/org/apache/cassandra/dht/StreamStateStore.java +++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java @@ -20,7 +20,8 @@ package org.apache.cassandra.dht; import java.util.Set; import com.google.common.annotations.VisibleForTesting; -import org.apache.cassandra.locator.RangesAtEndpoint; +import com.google.common.collect.Streams; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ public class StreamStateStore implements StreamEventHandler { private static final Logger logger = LoggerFactory.getLogger(StreamStateStore.class); - public RangesAtEndpoint getAvailableRanges(String keyspace, IPartitioner partitioner) + public SystemKeyspace.AvailableRanges getAvailableRanges(String keyspace, IPartitioner partitioner) { return SystemKeyspace.getAvailableRanges(keyspace, partitioner); } @@ -54,8 +55,11 @@ public class StreamStateStore implements StreamEventHandler @VisibleForTesting public boolean isDataAvailable(String keyspace, Token token) { - RangesAtEndpoint availableRanges = getAvailableRanges(keyspace, token.getPartitioner()); - return availableRanges.ranges().stream().anyMatch(range -> range.contains(token)); + SystemKeyspace.AvailableRanges availableRanges = getAvailableRanges(keyspace, token.getPartitioner()); + + return Streams.concat(availableRanges.full.stream(), + availableRanges.trans.stream()) + .anyMatch(range -> range.contains(token)); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java index 1773173..f57c28e 100644 --- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java +++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java @@ -37,6 +37,7 @@ import java.util.Set; import java.util.stream.Collector; import java.util.stream.Collectors; +import static com.google.common.collect.Iterables.all; import static org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict.*; /** @@ -302,6 +303,11 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint .collect(collector(dummy)); } + public static boolean isDummyList(RangesAtEndpoint ranges) + { + return all(ranges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")); + } + /** * @return concatenate two DISJOINT collections together */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/service/RangeRelocator.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/RangeRelocator.java b/src/java/org/apache/cassandra/service/RangeRelocator.java new file mode 100644 index 0000000..f2af3db --- /dev/null +++ b/src/java/org/apache/cassandra/service/RangeRelocator.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Future; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Multimap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.RangeStreamer; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.EndpointsByReplica; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.RangesByEndpoint; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +@VisibleForTesting +public class RangeRelocator +{ + private static final Logger logger = LoggerFactory.getLogger(StorageService.class); + + private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION); + private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); + private final TokenMetadata tokenMetaCloneAllSettled; + // clone to avoid concurrent modification in calculateNaturalReplicas + private final TokenMetadata tokenMetaClone; + private final Collection<Token> tokens; + private final List<String> keyspaceNames; + + + RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd) + { + this.tokens = tokens; + this.keyspaceNames = keyspaceNames; + this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled(); + // clone to avoid concurrent modification in calculateNaturalReplicas + this.tokenMetaClone = tmd.cloneOnlyTokenMap(); + } + + @VisibleForTesting + public RangeRelocator() + { + this.tokens = null; + this.keyspaceNames = null; + 
this.tokenMetaCloneAllSettled = null; + this.tokenMetaClone = null; + } + + /** + * Wrapper that supplies accessors to the real implementations of the various dependencies for this method + */ + private static Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> calculateRangesToFetchWithPreferredEndpoints(RangesAtEndpoint fetchRanges, + AbstractReplicationStrategy strategy, + String keyspace, + TokenMetadata tmdBefore, + TokenMetadata tmdAfter) + { + EndpointsByReplica preferredEndpoints = + RangeStreamer.calculateRangesToFetchWithPreferredEndpoints(DatabaseDescriptor.getEndpointSnitch()::sortedByProximity, + strategy, + fetchRanges, + StorageService.useStrictConsistency, + tmdBefore, + tmdAfter, + keyspace, + Arrays.asList(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance), + new RangeStreamer.ExcludeLocalNodeFilter())); + return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints); + } + + /** + * calculating endpoints to stream current ranges to if needed + * in some situations node will handle current ranges as part of the new ranges + **/ + public static RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges, + AbstractReplicationStrategy strat, + TokenMetadata tmdBefore, + TokenMetadata tmdAfter) + { + RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable(); + for (Replica toStream : streamRanges) + { + //If the range we are sending is full only send it to the new full replica + //There will also be a new transient replica we need to send the data to, but not + //the repaired data + EndpointsForRange oldEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdBefore); + EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter); + logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, oldEndpoints, newEndpoints); + + for (Replica newEndpoint : newEndpoints) + { + Replica oldEndpoint = oldEndpoints.byEndpoint().get(newEndpoint.endpoint()); + + // Nothing to do + if (newEndpoint.equals(oldEndpoint)) + continue; + + // Completely new range for this endpoint + if (oldEndpoint == null) + { + if (toStream.isTransient() && newEndpoint.isFull()) + throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", newEndpoint, toStream)); + + for (Range<Token> intersection : newEndpoint.range().intersectionWith(toStream.range())) + { + endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(intersection)); + } + } + else + { + Set<Range<Token>> subsToStream = Collections.singleton(toStream.range()); + + //First subtract what we already have + if (oldEndpoint.isFull() == newEndpoint.isFull() || oldEndpoint.isFull()) + subsToStream = toStream.range().subtract(oldEndpoint.range()); + + //Now we only stream what is still replicated + subsToStream.stream() + .flatMap(range -> range.intersectionWith(newEndpoint.range()).stream()) + .forEach(tokenRange -> endpointRanges.put(newEndpoint.endpoint(), newEndpoint.decorateSubrange(tokenRange))); + } + } + } + return endpointRanges.asImmutableView(); + } + + public void calculateToFromStreams() + { + logger.debug("Current tmd: {}, Updated tmd: {}", tokenMetaClone, tokenMetaCloneAllSettled); + + for (String keyspace : keyspaceNames) + { + // replication strategy of the current keyspace + AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy(); + + logger.info("Calculating ranges to stream and request 
for keyspace {}", keyspace); + //From what I have seen we only ever call this with a single token from StorageService.move(Token) + for (Token newToken : tokens) + { + Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress); + if (currentTokens.size() > 1 || currentTokens.isEmpty()) + { + throw new AssertionError("Unexpected current tokens: " + currentTokens); + } + + // calculated parts of the ranges to request/stream from/to nodes in the ring + Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges; + + //In the single node token move there is nothing to do and Range subtraction is broken + //so it's easier to just identify this case up front. + if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort() +)).size() > 1) + { + // getting collection of the currently used ranges by this keyspace + RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress); + + // collection of ranges which this node will serve after move to the new token + RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress); + + streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas); + } + else + { + streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress)); + } + + RangesByEndpoint rangesToStream = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled); + logger.info("Endpoint ranges to stream to " + rangesToStream); + + // stream ranges + for (InetAddressAndPort address : rangesToStream.keySet()) + { + logger.debug("Will stream range {} of keyspace {} to endpoint {}", rangesToStream.get(address), keyspace, address); + RangesAtEndpoint ranges = rangesToStream.get(address); + streamPlan.transferRanges(address, keyspace, ranges); + } + + Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> rangesToFetch = calculateRangesToFetchWithPreferredEndpoints(streamAndFetchOwnRanges.right, strategy, keyspace, tokenMetaClone, tokenMetaCloneAllSettled); + + // stream requests + rangesToFetch.asMap().forEach((address, sourceAndOurReplicas) -> { + RangesAtEndpoint full = sourceAndOurReplicas.stream() + .filter(pair -> pair.remote.isFull()) + .map(pair -> pair.local) + .collect(RangesAtEndpoint.collector(localAddress)); + RangesAtEndpoint trans = sourceAndOurReplicas.stream() + .filter(pair -> pair.remote.isTransient()) + .map(pair -> pair.local) + .collect(RangesAtEndpoint.collector(localAddress)); + logger.debug("Will request range {} of keyspace {} from endpoint {}", rangesToFetch.get(address), keyspace, address); + streamPlan.requestRanges(address, keyspace, full, trans); + }); + + logger.debug("Keyspace {}: work map {}.", keyspace, rangesToFetch); + } + } + } + + /** + * Calculate pair of ranges to stream/fetch for given two range collections + * (current ranges for keyspace and ranges after move to new token) + * + * With transient replication the added wrinkle is that if a range transitions from full to transient then + * we need to stream the range despite the fact that we are retaining it as transient. Some replica + * somewhere needs to transition from transient to full and we will be the source. 
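+     * For example: a node that currently holds a range as a full replica and will keep the same range only
+     * transiently after the move still puts it in the stream set, since the replica becoming full needs the data.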
+     *
+     * If the range is transient and is transitioning to full then always fetch even if the range was already transient,
+     * since a transiently replicated range obviously needs to fetch data to become full.
+     *
+     * This is why there is a continue after checking for intersection, because intersection is not sufficient reason
+     * to do the subtraction since we might need to stream/fetch data anyways.
+     *
+     * @param currentRanges collection of the ranges by current token
+     * @param updatedRanges collection of the ranges after token is changed
+     * @return pair of ranges to stream/fetch for given current and updated range collections
+     */
+    public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint currentRanges, RangesAtEndpoint updatedRanges)
+    {
+        RangesAtEndpoint.Builder toStream = RangesAtEndpoint.builder(currentRanges.endpoint());
+        RangesAtEndpoint.Builder toFetch = RangesAtEndpoint.builder(currentRanges.endpoint());
+        logger.debug("Calculating toStream");
+        computeRanges(currentRanges, updatedRanges, toStream);
+
+        logger.debug("Calculating toFetch");
+        computeRanges(updatedRanges, currentRanges, toFetch);
+
+        logger.debug("To stream {}", toStream);
+        logger.debug("To fetch {}", toFetch);
+        return Pair.create(toStream.build(), toFetch.build());
+    }
+
+    private static void computeRanges(RangesAtEndpoint srcRanges, RangesAtEndpoint dstRanges, RangesAtEndpoint.Builder ranges)
+    {
+        for (Replica src : srcRanges)
+        {
+            boolean intersect = false;
+            RangesAtEndpoint remainder = null;
+            for (Replica dst : dstRanges)
+            {
+                logger.debug("Comparing {} and {}", src, dst);
+                // Stream the full range if there's no intersection
+                if (!src.intersectsOnRange(dst))
+                    continue;
+
+                // If we're transitioning from full to transient
+                if (src.isFull() && dst.isTransient())
+                    continue;
+
+                if (remainder == null)
+                {
+                    remainder = src.subtractIgnoreTransientStatus(dst.range());
+                }
+                else
+                {
+                    // Re-subtract ranges to avoid overstreaming in cases when the single range is split or merged
+                    RangesAtEndpoint.Builder newRemainder = new RangesAtEndpoint.Builder(remainder.endpoint());
+                    for (Replica replica : remainder)
+                        newRemainder.addAll(replica.subtractIgnoreTransientStatus(dst.range()));
+                    remainder = newRemainder.build();
+                }
+                intersect = true;
+            }
+
+            if (!intersect)
+            {
+                assert remainder == null;
+                logger.debug(" Doesn't intersect adding {}", src);
+                ranges.add(src); // should stream whole old range
+            }
+            else
+            {
+                ranges.addAll(remainder);
+                logger.debug(" Intersects adding {}", remainder);
+            }
+        }
+    }
+
+    public Future<StreamState> stream()
+    {
+        return streamPlan.execute();
+    }
+
+    public boolean streamsNeeded()
+    {
+        return !streamPlan.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a979f1c..391598c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -236,7 +236,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     private Collection<Token> bootstrapTokens = null;

     // true when keeping strict consistency while bootstrapping
-    private static final boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement",
"true")); + public static final boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true")); private static final boolean allowSimultaneousMoves = Boolean.parseBoolean(System.getProperty("cassandra.consistent.simultaneousmoves.allow","false")); private static final boolean joinRing = Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")); private boolean replacing; @@ -1227,7 +1227,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE streamStateStore, false, DatabaseDescriptor.getStreamingConnectionsPerHost()); - streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); if (sourceDc != null) streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc)); @@ -4316,208 +4315,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next()); } - @VisibleForTesting - public static class RangeRelocator - { - private final StreamPlan streamPlan = new StreamPlan(StreamOperation.RELOCATION); - private final InetAddressAndPort localAddress = FBUtilities.getBroadcastAddressAndPort(); - private final TokenMetadata tokenMetaCloneAllSettled; - // clone to avoid concurrent modification in calculateNaturalReplicas - private final TokenMetadata tokenMetaClone; - private final Collection<Token> tokens; - private final List<String> keyspaceNames; - - - private RangeRelocator(Collection<Token> tokens, List<String> keyspaceNames, TokenMetadata tmd) - { - this.tokens = tokens; - this.keyspaceNames = keyspaceNames; - this.tokenMetaCloneAllSettled = tmd.cloneAfterAllSettled(); - // clone to avoid concurrent modification in calculateNaturalReplicas - this.tokenMetaClone = tmd.cloneOnlyTokenMap(); - } - - @VisibleForTesting - public RangeRelocator() - { - this.tokens = null; - this.keyspaceNames = null; - this.tokenMetaCloneAllSettled = null; - this.tokenMetaClone = null; - } - - /** - * Wrapper that supplies accessors to the real implementations of the various dependencies for this method - */ - private Multimap<InetAddressAndPort, FetchReplica> calculateRangesToFetchWithPreferredEndpoints(AbstractReplicationStrategy strategy, RangesAtEndpoint fetchRanges, String keyspace) - { - EndpointsByReplica preferredEndpoints = - RangeStreamer.calculateRangesToFetchWithPreferredEndpoints((address, replicas) -> DatabaseDescriptor.getEndpointSnitch().sortedByProximity(address, replicas), - strategy, - fetchRanges, - useStrictConsistency, - tokenMetaClone, - tokenMetaCloneAllSettled, - RangeStreamer.ALIVE_PREDICATE, - keyspace, - Collections.emptyList()); - return RangeStreamer.convertPreferredEndpointsToWorkMap(preferredEndpoints); - } - - /** - * calculating endpoints to stream current ranges to if needed - * in some situations node will handle current ranges as part of the new ranges - **/ - public RangesByEndpoint calculateRangesToStreamWithEndpoints(RangesAtEndpoint streamRanges, - AbstractReplicationStrategy strat, - TokenMetadata tmdBefore, - TokenMetadata tmdAfter) - { - RangesByEndpoint.Mutable endpointRanges = new RangesByEndpoint.Mutable(); - for (Replica toStream : streamRanges) - { - //If the range we are sending is full only send it to the new full replica - //There will also be a new transient replica we need to send the data to, but not - //the repaired data - EndpointsForRange currentEndpoints = 
strat.calculateNaturalReplicas(toStream.range().right, tmdBefore);
-            EndpointsForRange newEndpoints = strat.calculateNaturalReplicas(toStream.range().right, tmdAfter);
-            logger.debug("Need to stream {}, current endpoints {}, new endpoints {}", toStream, currentEndpoints, newEndpoints);
-
-            for (Replica current : currentEndpoints)
-            {
-                for (Replica updated : newEndpoints)
-                {
-                    if (current.endpoint().equals(updated.endpoint()))
-                    {
-                        //Nothing to do
-                        if (current.equals(updated))
-                            break;
-
-                        //In these two (really three) cases the existing data is sufficient and we should subtract whatever is already replicated
-                        if (current.isFull() == updated.isFull() || current.isFull())
-                        {
-                            //First subtract what we already have
-                            Set<Range<Token>> subsToStream = toStream.range().subtract(current.range());
-                            //Now we only stream what is still replicated
-                            subsToStream = subsToStream.stream().flatMap(range -> range.intersectionWith(updated.range()).stream()).collect(Collectors.toSet());
-                            for (Range<Token> subrange : subsToStream)
-                            {
-                                //Only stream what intersects with what is in the new world
-                                Set<Range<Token>> intersections = subrange.intersectionWith(updated.range());
-                                for (Range<Token> intersection : intersections)
-                                {
-                                    endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
-                                }
-                            }
-                        }
-                        else
-                        {
-                            for (Range<Token> intersection : toStream.range().intersectionWith(updated.range()))
-                            {
-                                endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
-                            }
-                        }
-                    }
-                }
-            }
-
-            for (Replica updated : newEndpoints)
-            {
-                if (!currentEndpoints.byEndpoint().containsKey(updated.endpoint()))
-                {
-                    // Completely new range for this endpoint
-                    if (toStream.isTransient() && updated.isFull())
-                    {
-                        throw new AssertionError(String.format("Need to stream %s, but only have %s which is transient and not full", updated, toStream));
-                    }
-                    for (Range<Token> intersection : updated.range().intersectionWith(toStream.range()))
-                    {
-                        endpointRanges.put(updated.endpoint(), updated.decorateSubrange(intersection));
-                    }
-                }
-            }
-        }
-        return endpointRanges.asImmutableView();
-    }
-
-    private void calculateToFromStreams()
-    {
-        logger.debug("Current tmd " + tokenMetaClone);
-        logger.debug("Updated tmd " + tokenMetaCloneAllSettled);
-        for (String keyspace : keyspaceNames)
-        {
-            // replication strategy of the current keyspace
-            AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
-            // getting collection of the currently used ranges by this keyspace
-            RangesAtEndpoint currentReplicas = strategy.getAddressReplicas(localAddress);
-
-            logger.info("Calculating ranges to stream and request for keyspace {}", keyspace);
-            //From what I have seen we only ever call this with a single token from StorageService.move(Token)
-            for (Token newToken : tokens)
-            {
-                Collection<Token> currentTokens = tokenMetaClone.getTokens(localAddress);
-                if (currentTokens.size() > 1 || currentTokens.isEmpty())
-                {
-                    throw new AssertionError("Unexpected current tokens: " + currentTokens);
-                }
-
-                // collection of ranges which this node will serve after move to the new token
-                RangesAtEndpoint updatedReplicas = strategy.getPendingAddressRanges(tokenMetaClone, newToken, localAddress);
-
-                // calculated parts of the ranges to request/stream from/to nodes in the ring
-                Pair<RangesAtEndpoint, RangesAtEndpoint> streamAndFetchOwnRanges = Pair.create(RangesAtEndpoint.empty(localAddress), RangesAtEndpoint.empty(localAddress));
-                //In the single node token move there is nothing to do and Range subtraction is broken
-                //so it's easier to just identify this case up front.
-                if (tokenMetaClone.getTopology().getDatacenterEndpoints().get(DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort()
-                    )).size() > 1)
-                {
-                    streamAndFetchOwnRanges = calculateStreamAndFetchRanges(currentReplicas, updatedReplicas);
-                }
-
-                Multimap<InetAddressAndPort, FetchReplica> workMap = calculateRangesToFetchWithPreferredEndpoints(strategy, streamAndFetchOwnRanges.right, keyspace);
-
-                RangesByEndpoint endpointRanges = calculateRangesToStreamWithEndpoints(streamAndFetchOwnRanges.left, strategy, tokenMetaClone, tokenMetaCloneAllSettled);
-
-                logger.info("Endpoint ranges to stream to " + endpointRanges);
-
-                // stream ranges
-                for (InetAddressAndPort address : endpointRanges.keySet())
-                {
-                    logger.debug("Will stream range {} of keyspace {} to endpoint {}", endpointRanges.get(address), keyspace, address);
-                    RangesAtEndpoint ranges = endpointRanges.get(address);
-                    streamPlan.transferRanges(address, keyspace, ranges);
-                }
-
-                // stream requests
-                workMap.asMap().forEach((address, sourceAndOurReplicas) -> {
-                    RangesAtEndpoint full = sourceAndOurReplicas.stream()
-                                                                .filter(pair -> pair.remote.isFull())
-                                                                .map(pair -> pair.local)
-                                                                .collect(RangesAtEndpoint.collector(localAddress));
-                    RangesAtEndpoint transientReplicas = sourceAndOurReplicas.stream()
-                                                                             .filter(pair -> pair.remote.isTransient())
-                                                                             .map(pair -> pair.local)
-                                                                             .collect(RangesAtEndpoint.collector(localAddress));
-                    logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address);
-                    streamPlan.requestRanges(address, keyspace, full, transientReplicas);
-                });
-
-                logger.debug("Keyspace {}: work map {}.", keyspace, workMap);
-            }
-        }
-    }
-
-    public Future<StreamState> stream()
-    {
-        return streamPlan.execute();
-    }
-
-    public boolean streamsNeeded()
-    {
-        return !streamPlan.isEmpty();
-    }
-}
 
     public String getRemovalStatus()
     {
         return getRemovalStatus(false);
@@ -5271,115 +5068,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return streamPlan.execute();
     }
 
-    /**
-     * Calculate pair of ranges to stream/fetch for given two range collections
-     * (current ranges for keyspace and ranges after move to new token)
-     *
-     * With transient replication the added wrinkle is that if a range transitions from full to transient then
-     * we need to stream the range despite the fact that we are retaining it as transient. Some replica
-     * somewhere needs to transition from transient to full, and we will be the source.
-     *
-     * If the range is transient and is transitioning to full then always fetch, even if the range was already transient,
-     * since a transient replica obviously needs to fetch data to become full.
-     *
-     * This is why there is a continue after checking for intersection: intersection alone is not sufficient reason
-     * to do the subtraction, since we might need to stream/fetch data anyways.
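
For illustration only (this sketch is not part of the patch, and the class and method names are invented for the example): the subtraction rules described above, and applied by the removed implementation below, reduce to two predicates over the old and new replication status of a range. The same full/transient asymmetry shows up in calculateRangesToStreamWithEndpoints earlier in this diff, where a destination going transient -> full must receive the whole intersection with no subtraction.

    // Sketch of the stream/fetch subtraction rules (true = full, false = transient).
    final class TransitionRulesSketch
    {
        // Stream side: we may subtract what the destination already holds,
        // unless the range goes full -> transient on this node, in which case
        // some other replica must become full and we stream the range whole.
        static boolean maySubtractBeforeStreaming(boolean oldIsFull, boolean newIsFull)
        {
            return !(oldIsFull && !newIsFull);
        }

        // Fetch side: we may subtract what we already hold, unless the range
        // goes transient -> full on this node, since a transient replica has
        // to fetch everything to become full.
        static boolean maySubtractBeforeFetching(boolean oldIsFull, boolean newIsFull)
        {
            return !(!oldIsFull && newIsFull);
        }
    }

When subtraction is allowed, the removed loops below accumulate a shrinking "remainder" by subtracting each intersecting updated range in turn (via subtractIgnoreTransientStatus); when it is not, the whole replica is added to toStream/toFetch unmodified.
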
-     *
-     * @param current collection of the ranges by current token
-     * @param updated collection of the ranges after token is changed
-     * @return pair of ranges to stream/fetch for given current and updated range collections
-     */
-    public static Pair<RangesAtEndpoint, RangesAtEndpoint> calculateStreamAndFetchRanges(RangesAtEndpoint current, RangesAtEndpoint updated)
-    {
-        // FIXME: transient replication
-        // this should always be the local node, except for tests TODO: assert this
-        RangesAtEndpoint.Builder toStream = RangesAtEndpoint.builder(current.endpoint());
-        RangesAtEndpoint.Builder toFetch = RangesAtEndpoint.builder(current.endpoint());
-
-        logger.debug("Calculating toStream");
-        for (Replica r1 : current)
-        {
-            boolean intersect = false;
-            RangesAtEndpoint.Mutable remainder = null;
-            for (Replica r2 : updated)
-            {
-                logger.debug("Comparing {} and {}", r1, r2);
-                //If we will end up transiently replicating send the entire thing and don't subtract
-                if (r1.intersectsOnRange(r2) && !(r1.isFull() && r2.isTransient()))
-                {
-                    RangesAtEndpoint.Mutable oldRemainder = remainder;
-                    remainder = new RangesAtEndpoint.Mutable(current.endpoint());
-                    if (oldRemainder != null)
-                    {
-                        for (Replica replica : oldRemainder)
-                        {
-                            remainder.addAll(replica.subtractIgnoreTransientStatus(r2.range()));
-                        }
-                    }
-                    else
-                    {
-                        remainder.addAll(r1.subtractIgnoreTransientStatus(r2.range()));
-                    }
-                    logger.debug(" Intersects adding {}", remainder);
-                    intersect = true;
-                }
-            }
-            if (!intersect)
-            {
-                logger.debug(" Doesn't intersect adding {}", r1);
-                toStream.add(r1); // should stream whole old range
-            }
-            else
-            {
-                toStream.addAll(remainder);
-            }
-        }
-
-        logger.debug("Calculating toFetch");
-        for (Replica r2 : updated)
-        {
-            boolean intersect = false;
-            RangesAtEndpoint.Mutable remainder = null;
-            for (Replica r1 : current)
-            {
-                logger.info("Comparing {} and {}", r2, r1);
-                //Transitioning from transient to full means fetch everything so intersection doesn't matter.
-                if (r2.intersectsOnRange(r1) && !(r1.isTransient() && r2.isFull()))
-                {
-                    RangesAtEndpoint.Mutable oldRemainder = remainder;
-                    remainder = new RangesAtEndpoint.Mutable(current.endpoint());
-                    if (oldRemainder != null)
-                    {
-                        for (Replica replica : oldRemainder)
-                        {
-                            remainder.addAll(replica.subtractIgnoreTransientStatus(r1.range()));
-                        }
-                    }
-                    else
-                    {
-                        remainder.addAll(r2.subtractIgnoreTransientStatus(r1.range()));
-                    }
-                    logger.debug(" Intersects adding {}", remainder);
-                    intersect = true;
-                }
-            }
-            if (!intersect)
-            {
-                logger.debug(" Doesn't intersect adding {}", r2);
-                toFetch.add(r2); // should fetch whole old range
-            }
-            else
-            {
-                toFetch.addAll(remainder);
-            }
-        }
-
-        logger.debug("To stream {}", toStream);
-        logger.debug("To fetch {}", toFetch);
-
-        return Pair.create(toStream.build(), toFetch.build());
-    }
-
     public void bulkLoad(String directory)
     {
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/streaming/StreamPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java
index 2f6deb5..ea54f9d 100644
--- a/src/java/org/apache/cassandra/streaming/StreamPlan.java
+++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java
@@ -70,6 +70,16 @@ public class StreamPlan
     /**
      * Request data in {@code keyspace} and {@code ranges} from specific node.
     *
+     * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
+     * and _remote_ (source) range transientness, which is encoded by splitting ranges into full and transient.
+     *
+     * At the other end the distinction between full and transient is ignored; it just uses the transient status
+     * of the Replica objects we send to determine what to send. The real reason we have this split down to
+     * StreamRequest is that on completion StreamRequest is used to write to the system table tracking
+     * what has already been streamed. At that point we only have the local Replica instances, so we don't
+     * know what we got from the remote. We preserve that here by splitting based on the remote's transient
+     * status.
+     *
      * @param from endpoint address to fetch data from.
      * @param keyspace name of keyspace
      * @param fullRanges ranges to fetch that from provides the full version of
@@ -94,10 +104,9 @@ public StreamPlan requestRanges(InetAddressAndPort from, String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, String... columnFamilies)
     {
         //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
-        assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
-            fullRanges.toString();
-        assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) :
-            transientRanges.toString();
+        assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+        assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+
         StreamSession session = coordinator.getOrCreateNextSession(from);
         session.addStreamRequest(keyspace, fullRanges, transientRanges, Arrays.asList(columnFamilies));
         return this;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0379201c/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index ec80772..d7d0836 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -300,6 +300,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     /**
      * Request data fetch task to this session.
      *
+     * Here, we have to encode both _local_ range transientness (encoded in Replica itself, in RangesAtEndpoint)
+     * and _remote_ (source) range transientness, which is encoded by splitting ranges into full and transient.
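
To make the local/remote split concrete, here is a minimal sketch of the calling pattern, mirroring what the removed StorageService code earlier in this diff does. It is illustrative only: it assumes RangeStreamer.FetchReplica's public local/remote fields and RangesAtEndpoint.collector as used elsewhere in this patch, and the class and method names are invented.

    import java.util.Collection;
    import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
    import org.apache.cassandra.locator.InetAddressAndPort;
    import org.apache.cassandra.locator.RangesAtEndpoint;
    import org.apache.cassandra.streaming.StreamPlan;

    final class RequestSplitSketch
    {
        // Group one source's fetch work by the *remote* replica's transient
        // status; the Replica instances we keep are the *local* ones, so the
        // streamed-ranges system table can be updated correctly on completion.
        static void requestFrom(StreamPlan plan, InetAddressAndPort source, String keyspace,
                                InetAddressAndPort local, Collection<FetchReplica> work)
        {
            RangesAtEndpoint full = work.stream()
                                        .filter(p -> p.remote.isFull())
                                        .map(p -> p.local)
                                        .collect(RangesAtEndpoint.collector(local));
            RangesAtEndpoint trans = work.stream()
                                         .filter(p -> p.remote.isTransient())
                                         .map(p -> p.local)
                                         .collect(RangesAtEndpoint.collector(local));
            plan.requestRanges(source, keyspace, full, trans);
        }
    }
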
+     *
      * @param keyspace Requesting keyspace
      * @param fullRanges Ranges to retrieve data that will return full data from the source
      * @param transientRanges Ranges to retrieve data that will return transient data from the source
@@ -308,8 +311,9 @@ public class StreamSession implements IEndpointStateChangeSubscriber
     public void addStreamRequest(String keyspace, RangesAtEndpoint fullRanges, RangesAtEndpoint transientRanges, Collection<String> columnFamilies)
     {
         //It should either be a dummy address for repair or if it's a bootstrap/move/rebuild it should be this node
-        assert all(fullRanges, Replica::isLocal) || all(fullRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : fullRanges.toString();
-        assert all(transientRanges, Replica::isLocal) || all(transientRanges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0")) : transientRanges.toString();
+        assert all(fullRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(fullRanges) : fullRanges.toString();
+        assert all(transientRanges, Replica::isLocal) || RangesAtEndpoint.isDummyList(transientRanges) : transientRanges.toString();
+
         requests.add(new StreamRequest(keyspace, fullRanges, transientRanges, columnFamilies));
     }
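
Both hunks above call the new RangesAtEndpoint.isDummyList helper, whose implementation is not shown in this excerpt. Judging from the repeated string comparison it replaces, a plausible shape is the following; this is an assumption for illustration, not the committed code:

    // Hypothetical reconstruction of the helper the asserts now call:
    // true when every replica carries the 0.0.0.0:0 placeholder endpoint
    // that repair uses in place of a real node address.
    public static boolean isDummyList(RangesAtEndpoint ranges)
    {
        return Iterables.all(ranges, range -> range.endpoint().getHostAddress(true).equals("0.0.0.0:0"));
    }

Factoring the check into one helper keeps the two assertion sites identical, which is the point of the change: the dummy-endpoint convention is now defined in a single place instead of being restated inline at every caller.
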
