Repository: cassandra Updated Branches: refs/heads/trunk f36fe9fb1 -> 690fbf3ba
Resumable bootstrap streaming patch by yukim; reviewed by Stefania Alborghetti for CASSANDRA-8838 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/690fbf3b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/690fbf3b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/690fbf3b Branch: refs/heads/trunk Commit: 690fbf3ba90cee726eb58ed1f69700d178993f75 Parents: f36fe9f Author: Yuki Morishita <[email protected]> Authored: Wed Mar 18 12:18:28 2015 -0500 Committer: Yuki Morishita <[email protected]> Committed: Wed Mar 18 12:18:28 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 83 ++++++++++++++++- .../org/apache/cassandra/dht/BootStrapper.java | 23 +++-- .../org/apache/cassandra/dht/RangeStreamer.java | 95 +++++++++++++------- .../apache/cassandra/dht/StreamStateStore.java | 82 +++++++++++++++++ .../cassandra/service/StorageService.java | 53 +++++++---- .../apache/cassandra/streaming/StreamEvent.java | 5 ++ .../cassandra/streaming/StreamSession.java | 4 +- .../apache/cassandra/db/SystemKeyspaceTest.java | 24 ++--- .../apache/cassandra/dht/BootStrapperTest.java | 5 +- .../cassandra/dht/StreamStateStoreTest.java | 76 ++++++++++++++++ 11 files changed, 372 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c07599a..955d8e3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -71,6 +71,7 @@ * Select optimal CRC32 implementation at runtime (CASSANDRA-8614) * Evaluate MurmurHash of Token once per query (CASSANDRA-7096) * Generalize progress reporting (CASSANDRA-8901) + * Resumable bootstrap streaming (CASSANDRA-8838) 2.1.4 * Use correct bounds for page 
cache eviction of compressed files (CASSANDRA-8746) http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index dcd0e55..9fa3c6b 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db; import java.io.DataInputStream; +import java.io.IOError; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; @@ -27,6 +28,7 @@ import javax.management.openmbean.*; import com.google.common.base.Function; import com.google.common.collect.*; +import com.google.common.io.ByteStreams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,12 +37,13 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; -import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; import org.apache.cassandra.db.commitlog.ReplayPosition; +import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.filter.QueryFilter; import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; @@ -49,6 +52,7 @@ import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.metrics.RestorableMeter; +import 
org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.LegacySchemaTables; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.paxos.Commit; @@ -78,6 +82,7 @@ public final class SystemKeyspace public static final String COMPACTION_HISTORY = "compaction_history"; public static final String SSTABLE_ACTIVITY = "sstable_activity"; public static final String SIZE_ESTIMATES = "size_estimates"; + public static final String AVAILABLE_RANGES = "available_ranges"; public static final CFMetaData Hints = compile(HINTS, @@ -218,7 +223,7 @@ public final class SystemKeyspace private static final CFMetaData SizeEstimates = compile(SIZE_ESTIMATES, "per-table primary range size estimates", - "CREATE TABLE %S (" + "CREATE TABLE %s (" + "keyspace_name text," + "table_name text," + "range_start text," @@ -228,6 +233,14 @@ public final class SystemKeyspace + "PRIMARY KEY ((keyspace_name), table_name, range_start, range_end))") .gcGraceSeconds(0); + private static final CFMetaData AvailableRanges = + compile(AVAILABLE_RANGES, + "Available keyspace/ranges during bootstrap/replace that are ready to be served", + "CREATE TABLE %s (" + + "keyspace_name text PRIMARY KEY," + + "ranges set<blob>" + + ")"); + private static CFMetaData compile(String name, String description, String schema) { return CFMetaData.compile(String.format(schema, name), NAME) @@ -249,7 +262,8 @@ public final class SystemKeyspace CompactionsInProgress, CompactionHistory, SSTableActivity, - SizeEstimates)); + SizeEstimates, + AvailableRanges)); return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables); } @@ -954,4 +968,67 @@ public final class SystemKeyspace String cql = String.format("DELETE FROM %s.%s WHERE keyspace_name = ? 
AND table_name = ?", NAME, SIZE_ESTIMATES); executeInternal(cql, keyspace, table); } + + public static synchronized void updateAvailableRanges(String keyspace, Collection<Range<Token>> completedRanges) + { + String cql = "UPDATE system.%s SET ranges = ranges + ? WHERE keyspace_name = ?"; + Set<ByteBuffer> rangesToUpdate = new HashSet<>(completedRanges.size()); + for (Range<Token> range : completedRanges) + { + rangesToUpdate.add(rangeToBytes(range)); + } + executeInternal(String.format(cql, AVAILABLE_RANGES), rangesToUpdate, keyspace); + } + + public static synchronized Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner) + { + Set<Range<Token>> result = new HashSet<>(); + String query = "SELECT * FROM system.%s WHERE keyspace_name=?"; + UntypedResultSet rs = executeInternal(String.format(query, AVAILABLE_RANGES), keyspace); + for (UntypedResultSet.Row row : rs) + { + Set<ByteBuffer> rawRanges = row.getSet("ranges", BytesType.instance); + for (ByteBuffer rawRange : rawRanges) + { + result.add(byteBufferToRange(rawRange, partitioner)); + } + } + return ImmutableSet.copyOf(result); + } + + public static void resetAvailableRanges() + { + ColumnFamilyStore availableRanges = Keyspace.open(NAME).getColumnFamilyStore(AVAILABLE_RANGES); + availableRanges.truncateBlocking(); + } + + private static ByteBuffer rangeToBytes(Range<Token> range) + { + try + { + DataOutputBuffer out = new DataOutputBuffer(); + Range.tokenSerializer.serialize(range, out, MessagingService.VERSION_30); + return out.asByteBuffer(); + } + catch (IOException e) + { + throw new IOError(e); + } + } + + @SuppressWarnings("unchecked") + private static Range<Token> byteBufferToRange(ByteBuffer rawRange, IPartitioner partitioner) + { + try + { + return (Range<Token>) Range.tokenSerializer.deserialize(ByteStreams.newDataInput(ByteBufferUtil.getArray(rawRange)), + partitioner, + MessagingService.VERSION_30); + } + catch (IOException e) + { + throw new IOError(e); + } + } + } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java index cbbd100..8f52f7e 100644 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@ -55,15 +55,20 @@ public class BootStrapper this.address = address; this.tokens = tokens; - tokenMetadata = tmd; + this.tokenMetadata = tmd; } - public void bootstrap() + public void bootstrap(StreamStateStore stateStore, boolean useStrictConsistency) { - if (logger.isDebugEnabled()) - logger.debug("Beginning bootstrap process"); - - RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, address, "Bootstrap"); + logger.debug("Beginning bootstrap process"); + + RangeStreamer streamer = new RangeStreamer(tokenMetadata, + tokens, + address, + "Bootstrap", + useStrictConsistency, + DatabaseDescriptor.getEndpointSnitch(), + stateStore); streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) @@ -83,7 +88,7 @@ public class BootStrapper } catch (ExecutionException e) { - throw new RuntimeException("Error during boostrap: " + e.getCause().getMessage(), e.getCause()); + throw new RuntimeException("Error during bootstrap: " + e.getCause().getMessage(), e.getCause()); } } @@ -99,7 +104,7 @@ public class BootStrapper if (initialTokens.size() > 0) { logger.debug("tokens manually specified as {}", initialTokens); - List<Token> tokens = new ArrayList<Token>(initialTokens.size()); + List<Token> tokens = new ArrayList<>(initialTokens.size()); for (String tokenString : initialTokens) { Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString); @@ -122,7 +127,7 @@ public class BootStrapper 
public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens) { - Set<Token> tokens = new HashSet<Token>(numTokens); + Set<Token> tokens = new HashSet<>(numTokens); while (tokens.size() < numTokens) { Token token = StorageService.getPartitioner().getRandomToken(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index 72679cc..fecb308 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -20,24 +20,24 @@ package org.apache.cassandra.dht; import java.net.InetAddress; import java.util.*; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import org.apache.cassandra.gms.EndpointState; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; -import org.apache.cassandra.gms.FailureDetector; +import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.StreamPlan; import org.apache.cassandra.streaming.StreamResultFuture; import org.apache.cassandra.utils.FBUtilities; @@ -48,14 +48,21 @@ import 
org.apache.cassandra.utils.FBUtilities; public class RangeStreamer { private static final Logger logger = LoggerFactory.getLogger(RangeStreamer.class); - public static final boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement","true")); + + /* bootstrap tokens. can be null if replacing the node. */ private final Collection<Token> tokens; + /* current token ring */ private final TokenMetadata metadata; + /* address of this node */ private final InetAddress address; + /* streaming description */ private final String description; private final Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = HashMultimap.create(); - private final Set<ISourceFilter> sourceFilters = new HashSet<ISourceFilter>(); + private final Set<ISourceFilter> sourceFilters = new HashSet<>(); private final StreamPlan streamPlan; + private final boolean useStrictConsistency; + private final IEndpointSnitch snitch; + private final StreamStateStore stateStore; /** * A filter applied to sources to stream from when constructing a fetch map. 
@@ -104,18 +111,23 @@ public class RangeStreamer } } - public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, String description) + public RangeStreamer(TokenMetadata metadata, + Collection<Token> tokens, + InetAddress address, + String description, + boolean useStrictConsistency, + IEndpointSnitch snitch, + StreamStateStore stateStore) { this.metadata = metadata; this.tokens = tokens; this.address = address; this.description = description; this.streamPlan = new StreamPlan(description, true); - } - - public RangeStreamer(TokenMetadata metadata, InetAddress address, String description) - { - this(metadata, null, address, description); + this.useStrictConsistency = useStrictConsistency; + this.snitch = snitch; + this.stateStore = stateStore; + streamPlan.listeners(this.stateStore); } public void addSourceFilter(ISourceFilter filter) @@ -123,6 +135,12 @@ public class RangeStreamer sourceFilters.add(filter); } + /** + * Add ranges to be streamed for given keyspace. 
+ * + * @param keyspaceName keyspace name + * @param ranges ranges to be streamed + */ public void addRanges(String keyspaceName, Collection<Range<Token>> ranges) { Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictSourcesForRanges(keyspaceName) @@ -145,11 +163,14 @@ public class RangeStreamer } } + /** + * @param keyspaceName keyspace name to check + * @return true when the node is bootstrapping, useStrictConsistency is true and # of nodes in the cluster is more than # of replica + */ private boolean useStrictSourcesForRanges(String keyspaceName) { AbstractReplicationStrategy strat = Keyspace.open(keyspaceName).getReplicationStrategy(); - return !DatabaseDescriptor.isReplacing() - && useStrictConsistency + return useStrictConsistency && tokens != null && metadata.getAllEndpoints().size() != strat.getReplicationFactor(); } @@ -157,6 +178,8 @@ public class RangeStreamer /** * Get a map of all ranges and their respective sources that are candidates for streaming the given ranges * to us. For each range, the list of sources is sorted by proximity relative to the given destAddress. + * + * @throws java.lang.IllegalStateException when there is no source to get data streamed */ private Multimap<Range<Token>, InetAddress> getAllRangesWithSourcesFor(String keyspaceName, Collection<Range<Token>> desiredRanges) { @@ -170,7 +193,7 @@ public class RangeStreamer { if (range.contains(desiredRange)) { - List<InetAddress> preferred = DatabaseDescriptor.getEndpointSnitch().getSortedListByProximity(address, rangeAddresses.get(range)); + List<InetAddress> preferred = snitch.getSortedListByProximity(address, rangeAddresses.get(range)); rangeSources.putAll(desiredRange, preferred); break; } @@ -187,22 +210,23 @@ public class RangeStreamer * Get a map of all ranges and the source that will be cleaned up once this bootstrapped node is added for the given ranges. * For each range, the list should only contain a single source. 
This allows us to consistently migrate data without violating * consistency. + * + * @throws java.lang.IllegalStateException when there is no source to get data streamed, or more than 1 source found. */ - private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String table, Collection<Range<Token>> desiredRanges) + private Multimap<Range<Token>, InetAddress> getAllRangesWithStrictSourcesFor(String keyspace, Collection<Range<Token>> desiredRanges) { - assert tokens != null; - AbstractReplicationStrategy strat = Keyspace.open(table).getReplicationStrategy(); + AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy(); - //Active ranges + // Active ranges TokenMetadata metadataClone = metadata.cloneOnlyTokenMap(); - Multimap<Range<Token>,InetAddress> addressRanges = strat.getRangeAddresses(metadataClone); + Multimap<Range<Token>, InetAddress> addressRanges = strat.getRangeAddresses(metadataClone); - //Pending ranges + // Pending ranges metadataClone.updateNormalTokens(tokens, address); - Multimap<Range<Token>,InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone); + Multimap<Range<Token>, InetAddress> pendingRangeAddresses = strat.getRangeAddresses(metadataClone); - //Collects the source that will have its range moved to the new node + // Collects the source that will have its range moved to the new node Multimap<Range<Token>, InetAddress> rangeSources = ArrayListMultimap.create(); for (Range<Token> desiredRange : desiredRanges) @@ -214,8 +238,8 @@ public class RangeStreamer Set<InetAddress> oldEndpoints = Sets.newHashSet(preEntry.getValue()); Set<InetAddress> newEndpoints = Sets.newHashSet(pendingRangeAddresses.get(desiredRange)); - //Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. - //So we need to be careful to only be strict when endpoints == RF + // Due to CASSANDRA-5953 we can have a higher RF then we have endpoints. 
+ // So we need to be careful to only be strict when endpoints == RF if (oldEndpoints.size() == strat.getReplicationFactor()) { oldEndpoints.removeAll(newEndpoints); @@ -226,7 +250,7 @@ public class RangeStreamer } } - //Validate + // Validate Collection<InetAddress> addressList = rangeSources.get(desiredRange); if (addressList == null || addressList.isEmpty()) throw new IllegalStateException("No sources found for " + desiredRange); @@ -237,7 +261,8 @@ public class RangeStreamer InetAddress sourceIp = addressList.iterator().next(); EndpointState sourceState = Gossiper.instance.getEndpointStateForEndpoint(sourceIp); if (Gossiper.instance.isEnabled() && (sourceState == null || !sourceState.isAlive())) - throw new RuntimeException("A node required to move the data consistently is down ("+sourceIp+"). If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false"); + throw new RuntimeException("A node required to move the data consistently is down (" + sourceIp + "). " + + "If you wish to move the data from a potentially inconsistent replica, restart the node with -Dcassandra.consistent.rangemovement=false"); } return rangeSources; @@ -247,7 +272,8 @@ public class RangeStreamer * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value) * @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given * here, we always exclude ourselves. 
- * @return + * @param keyspace keyspace name + * @return Map of source endpoint to collection of ranges */ private static Multimap<InetAddress, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources, Collection<ISourceFilter> sourceFilters, String keyspace) @@ -285,12 +311,13 @@ public class RangeStreamer return rangeFetchMapMap; } - public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace) + public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace, IFailureDetector fd) { - return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(FailureDetector.instance)), keyspace); + return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace); } // For testing purposes + @VisibleForTesting Multimap<String, Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch() { return toFetch; @@ -304,9 +331,17 @@ public class RangeStreamer InetAddress source = entry.getValue().getKey(); InetAddress preferred = SystemKeyspace.getPreferredIP(source); Collection<Range<Token>> ranges = entry.getValue().getValue(); - /* Send messages to respective folks to stream data over to me */ + + // filter out already streamed ranges + Set<Range<Token>> availableRanges = stateStore.getAvailableRanges(keyspace, StorageService.getPartitioner()); + if (ranges.removeAll(availableRanges)) + { + logger.info(availableRanges + " already available. 
Skipping streaming."); + } + if (logger.isDebugEnabled()) logger.debug("{}ing from {} ranges {}", description, source, StringUtils.join(ranges, ", ")); + /* Send messages to respective folks to stream data over to me */ streamPlan.requestRanges(source, preferred, keyspace, ranges); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/dht/StreamStateStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/StreamStateStore.java b/src/java/org/apache/cassandra/dht/StreamStateStore.java new file mode 100644 index 0000000..f6046aa --- /dev/null +++ b/src/java/org/apache/cassandra/dht/StreamStateStore.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.dht; + +import java.util.Set; + +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamRequest; +import org.apache.cassandra.streaming.StreamState; + +/** + * Store and update available ranges (data already received) to system keyspace. 
+ */ +public class StreamStateStore implements StreamEventHandler +{ + public Set<Range<Token>> getAvailableRanges(String keyspace, IPartitioner partitioner) + { + return SystemKeyspace.getAvailableRanges(keyspace, partitioner); + } + + /** + * Check if given token's data is available in this node. + * + * @param keyspace keyspace name + * @param token token to check + * @return true if given token in the keyspace is already streamed and ready to be served. + */ + public boolean isDataAvailable(String keyspace, Token token) + { + Set<Range<Token>> availableRanges = getAvailableRanges(keyspace, token.getPartitioner()); + for (Range<Token> range : availableRanges) + { + if (range.contains(token)) + return true; + } + return false; + } + + /** + * When StreamSession completes, make all keyspaces/ranges in session available to be served. + * + * @param event Stream event. + */ + @Override + public void handleStreamEvent(StreamEvent event) + { + if (event.eventType == StreamEvent.Type.STREAM_COMPLETE) + { + StreamEvent.SessionCompleteEvent se = (StreamEvent.SessionCompleteEvent) event; + if (se.success) + { + for (StreamRequest request : se.requests) + { + SystemKeyspace.updateAvailableRanges(request.keyspace, request.ranges); + } + } + } + } + + @Override + public void onSuccess(StreamState streamState) {} + + @Override + public void onFailure(Throwable throwable) {} +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 3e8ad9d..6a39945 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -162,6 +162,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE private Collection<Token> 
bootstrapTokens = null; + // true when keeping strict consistency while bootstrapping + private boolean useStrictConsistency = Boolean.parseBoolean(System.getProperty("cassandra.consistent.rangemovement", "true")); + private boolean replacing; + + private final StreamStateStore streamStateStore = new StreamStateStore(); + public void finishBootstrapping() { isBootstrapMode = false; @@ -425,13 +431,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE "Use cassandra.replace_address if you want to replace this node.", FBUtilities.getBroadcastAddress())); } - if (RangeStreamer.useStrictConsistency) + if (useStrictConsistency) { for (Map.Entry<InetAddress, EndpointState> entry : Gossiper.instance.getEndpointStates()) { - - if (entry.getValue().getApplicationState(ApplicationState.STATUS) == null) - continue; + // ignore local node or empty status + if (entry.getKey().equals(FBUtilities.getBroadcastAddress()) || entry.getValue().getApplicationState(ApplicationState.STATUS) == null) + continue; String[] pieces = entry.getValue().getApplicationState(ApplicationState.STATUS).value.split(VersionedValue.DELIMITER_STR, -1); assert (pieces.length > 0); String state = pieces[0]; @@ -556,6 +562,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE }, "StorageServiceShutdownHook"); Runtime.getRuntime().addShutdownHook(drainOnShutdown); + replacing = DatabaseDescriptor.isReplacing(); + prepareToJoin(); // Has to be called after the host id has potentially changed in prepareToJoin(). 
@@ -603,11 +611,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { Map<ApplicationState, VersionedValue> appStates = new HashMap<>(); - if (DatabaseDescriptor.isReplacing() && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))) + if (replacing && !(Boolean.parseBoolean(System.getProperty("cassandra.join_ring", "true")))) throw new ConfigurationException("Cannot set both join_ring=false and attempt to replace a node"); if (DatabaseDescriptor.getReplaceTokens().size() > 0 || DatabaseDescriptor.getReplaceNode() != null) throw new RuntimeException("Replace method removed; use cassandra.replace_address instead"); - if (DatabaseDescriptor.isReplacing()) + if (replacing) { if (SystemKeyspace.bootstrapComplete()) throw new RuntimeException("Cannot replace address with a node that is already bootstrapped"); @@ -712,7 +720,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE )) throw new UnsupportedOperationException("Other bootstrapping/leaving/moving nodes detected, cannot bootstrap while cassandra.consistent.rangemovement is true"); - if (!DatabaseDescriptor.isReplacing()) + if (!replacing) { if (tokenMetadata.isMember(FBUtilities.getBroadcastAddress())) { @@ -925,7 +933,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { logger.info("rebuild from dc: {}", sourceDc == null ? 
"(any dc)" : sourceDc); - RangeStreamer streamer = new RangeStreamer(tokenMetadata, FBUtilities.getBroadcastAddress(), "Rebuild"); + RangeStreamer streamer = new RangeStreamer(tokenMetadata, + null, + FBUtilities.getBroadcastAddress(), + "Rebuild", + !replacing && useStrictConsistency, + DatabaseDescriptor.getEndpointSnitch(), + streamStateStore); streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); if (sourceDc != null) streamer.addSourceFilter(new RangeStreamer.SingleDatacenterFilter(DatabaseDescriptor.getEndpointSnitch(), sourceDc)); @@ -999,10 +1013,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { isBootstrapMode = true; SystemKeyspace.updateTokens(tokens); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping - if (!DatabaseDescriptor.isReplacing()) + if (!replacing) { // if not an existing token then bootstrap - List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<Pair<ApplicationState, VersionedValue>>(); + List<Pair<ApplicationState, VersionedValue>> states = new ArrayList<>(); states.add(Pair.create(ApplicationState.TOKENS, valueFactory.tokens(tokens))); states.add(Pair.create(ApplicationState.STATUS, valueFactory.bootstrapping(tokens))); Gossiper.instance.addLocalApplicationStates(states); @@ -1017,8 +1031,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } if (!Gossiper.instance.seenAnySeed()) throw new IllegalStateException("Unable to contact any seeds!"); + + if (Boolean.getBoolean("cassandra.reset_bootstrap_progress")) + { + logger.info("Resetting bootstrap progress to start fresh"); + SystemKeyspace.resetAvailableRanges(); + } + setMode(Mode.JOINING, "Starting to bootstrap...", true); - new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata).bootstrap(); // handles token update + new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, 
tokenMetadata).bootstrap(streamStateStore, !replacing && useStrictConsistency); // handles token update logger.info("Bootstrap completed! for the tokens {}", tokens); } @@ -1544,7 +1565,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { UUID hostId = Gossiper.instance.getHostId(endpoint); InetAddress existing = tokenMetadata.getEndpointForHostId(hostId); - if (DatabaseDescriptor.isReplacing() && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) + if (replacing && Gossiper.instance.getEndpointStateForEndpoint(DatabaseDescriptor.getReplaceAddress()) != null && (hostId.equals(Gossiper.instance.getHostId(DatabaseDescriptor.getReplaceAddress())))) logger.warn("Not updating token metadata for {} because I am replacing it", endpoint); else { @@ -1624,7 +1645,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE for (InetAddress ep : endpointsToRemove) { removeEndpoint(ep); - if (DatabaseDescriptor.isReplacing() && DatabaseDescriptor.getReplaceAddress().equals(ep)) + if (replacing && DatabaseDescriptor.getReplaceAddress().equals(ep)) Gossiper.instance.replacementQuarantine(ep); // quarantine locally longer than normally; see CASSANDRA-8260 } if (!tokensToUpdateInSystemKeyspace.isEmpty()) @@ -3208,7 +3229,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { List<InetAddress> endpoints = null; - if (RangeStreamer.useStrictConsistency) + if (useStrictConsistency) { Set<InetAddress> oldEndpoints = Sets.newHashSet(rangeAddresses.get(range)); Set<InetAddress> newEndpoints = Sets.newHashSet(strategy.calculateNaturalEndpoints(toFetch.right, tokenMetaCloneAllSettled)); @@ -3242,7 +3263,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE if (addressList == null || addressList.isEmpty()) continue; - if 
(RangeStreamer.useStrictConsistency) + if (useStrictConsistency) { if (addressList.size() > 1) throw new IllegalStateException("Multiple strict sources found for " + toFetch); @@ -3277,7 +3298,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } // stream requests - Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace); + Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace, FailureDetector.instance); for (InetAddress address : workMap.keySet()) { logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address); http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/streaming/StreamEvent.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamEvent.java b/src/java/org/apache/cassandra/streaming/StreamEvent.java index e3cdce5..de3db9c 100644 --- a/src/java/org/apache/cassandra/streaming/StreamEvent.java +++ b/src/java/org/apache/cassandra/streaming/StreamEvent.java @@ -18,8 +18,11 @@ package org.apache.cassandra.streaming; import java.net.InetAddress; +import java.util.Set; import java.util.UUID; +import com.google.common.collect.ImmutableSet; + public abstract class StreamEvent { public static enum Type @@ -43,6 +46,7 @@ public abstract class StreamEvent public final InetAddress peer; public final boolean success; public final int sessionIndex; + public final Set<StreamRequest> requests; public SessionCompleteEvent(StreamSession session) { @@ -50,6 +54,7 @@ public abstract class StreamEvent this.peer = session.peer; this.success = session.isSuccess(); this.sessionIndex = session.sessionIndex(); + this.requests = ImmutableSet.copyOf(session.requests); } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/src/java/org/apache/cassandra/streaming/StreamSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index 1b529ed..5a056c4 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -45,8 +45,6 @@ import org.apache.cassandra.streaming.messages.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.JVMStabilityInspector; import org.apache.cassandra.utils.Pair; -import org.apache.cassandra.utils.concurrent.RefCounted; - import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.concurrent.Refs; @@ -132,7 +130,7 @@ public class StreamSession implements IEndpointStateChangeSubscriber private StreamResultFuture streamResult; // stream requests to send to the peer - private final Set<StreamRequest> requests = Sets.newConcurrentHashSet(); + protected final Set<StreamRequest> requests = Sets.newConcurrentHashSet(); // streaming tasks are created and managed per ColumnFamily ID private final Map<UUID, StreamTransferTask> transfers = new ConcurrentHashMap<>(); // data receivers, filled after receiving prepare message http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java index b66a0bd..25bb584 100644 --- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java @@ -1,6 +1,4 @@ -package org.apache.cassandra.db; /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more 
contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -9,25 +7,19 @@ package org.apache.cassandra.db; * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ - +package org.apache.cassandra.db; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Collection; -import java.util.List; -import java.util.UUID; +import java.util.*; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 5d1d8c6..ababd99 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -31,6 +31,7 @@ import org.junit.runner.RunWith; import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.exceptions.ConfigurationException; @@ -75,7 +76,7 @@ public class BootStrapperTest TokenMetadata tmd = ss.getTokenMetadata(); assertEquals(numOldNodes, tmd.sortedTokens().size()); - RangeStreamer s = new RangeStreamer(tmd, myEndpoint, "Bootstrap"); + RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, "Bootstrap", true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore()); IFailureDetector mockFailureDetector = new IFailureDetector() { public boolean isAlive(InetAddress ep) @@ -96,7 +97,7 @@ public class BootStrapperTest Collection<Map.Entry<InetAddress, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName); // Check we get get RF new ranges in total - Set<Range<Token>> ranges = new HashSet<Range<Token>>(); + Set<Range<Token>> ranges = new HashSet<>(); for (Map.Entry<InetAddress, Collection<Range<Token>>> e : toFetch) ranges.addAll(e.getValue()); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/690fbf3b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java new file mode 100644 index 0000000..c8b9f05 --- /dev/null +++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.dht; + +import java.net.InetAddress; +import java.util.Collections; + +import org.junit.Test; + +import org.apache.cassandra.streaming.DefaultConnectionFactory; +import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class StreamStateStoreTest +{ + + @Test + public void testUpdateAndQueryAvailableRanges() + { + // let range (0, 100] of keyspace1 be bootstrapped. 
+ IPartitioner p = new Murmur3Partitioner(); + Token.TokenFactory factory = p.getTokenFactory(); + Range<Token> range = new Range<>(factory.fromString("0"), factory.fromString("100")); + + InetAddress local = FBUtilities.getBroadcastAddress(); + StreamSession session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true); + session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf"), 0); + + StreamStateStore store = new StreamStateStore(); + // a complete event from a session that has not finished successfully must not make keyspace/ranges available + store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); + assertFalse(store.isDataAvailable("keyspace1", factory.fromString("50"))); + + // successfully completed session adds available keyspace/ranges + session.state(StreamSession.State.COMPLETE); + store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); + // check if token in range (0, 100] appears available. + assertTrue(store.isDataAvailable("keyspace1", factory.fromString("50"))); + // tokens outside (0, 100] are not available; note the range's left bound 0 is exclusive + assertFalse(store.isDataAvailable("keyspace1", factory.fromString("0"))); + assertFalse(store.isDataAvailable("keyspace1", factory.fromString("101"))); + // check if different keyspace returns false + assertFalse(store.isDataAvailable("keyspace2", factory.fromString("50"))); + + // add different range within the same keyspace + Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200")); + session = new StreamSession(local, local, new DefaultConnectionFactory(), 0, true); + session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf"), 0); + session.state(StreamSession.State.COMPLETE); + store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); + + // newly added range should be available + assertTrue(store.isDataAvailable("keyspace1", factory.fromString("101"))); + // as well as the old one + 
+ assertTrue(store.isDataAvailable("keyspace1", factory.fromString("50"))); + } +}
