Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/907c8263 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/907c8263 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/907c8263 Branch: refs/heads/trunk Commit: 907c8263b7c814de31f4b4e17610e56e68b3a4aa Parents: 148f369 6100eb2 Author: Yuki Morishita <yu...@apache.org> Authored: Wed May 25 18:49:20 2016 -0500 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed May 25 18:49:20 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/dht/BootStrapper.java | 1 + .../org/apache/cassandra/dht/RangeStreamer.java | 48 +++++++++++++++----- .../cassandra/service/StorageService.java | 2 +- 4 files changed, 40 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index acdf2e9,d914420..d6750ab --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,24 -1,5 +1,25 @@@ -2.1.15 +2.2.7 + * Enable client encryption in sstableloader with cli options (CASSANDRA-11708) + * Possible memory leak in NIODataInputStream (CASSANDRA-11867) + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669) + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753) + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395) + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626) + * Exit JVM if JMX server fails to startup (CASSANDRA-11540) + * Produce a heap dump when exiting on OOM (CASSANDRA-9861) + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427) + * Restore ability to filter on clustering columns when using a 2i (CASSANDRA-11510) + * JSON datetime formatting needs timezone (CASSANDRA-11137) + 
* Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502) + * Remove unnecessary file existence check during anticompaction (CASSANDRA-11660) + * Add missing files to debian packages (CASSANDRA-11642) + * Avoid calling Iterables::concat in loops during ModificationStatement::getFunctions (CASSANDRA-11621) + * cqlsh: COPY FROM should use regular inserts for single statement batches and + report errors correctly if worker processes crash on initialization (CASSANDRA-11474) + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553) + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988) +Merged from 2.1: + * Do not consider local node a valid source during replace (CASSANDRA-11848) * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739) * Add message dropped tasks to nodetool netstats (CASSANDRA-11855) * Don't compute expensive MaxPurgeableTimestamp until we've verified there's an http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/src/java/org/apache/cassandra/dht/BootStrapper.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/dht/BootStrapper.java index a6b1ad7,dfefbe9..26fa6b3 --- a/src/java/org/apache/cassandra/dht/BootStrapper.java +++ b/src/java/org/apache/cassandra/dht/BootStrapper.java @@@ -60,21 -55,17 +60,22 @@@ public class BootStrapper extends Progr this.address = address; this.tokens = tokens; - tokenMetadata = tmd; + this.tokenMetadata = tmd; } - public void bootstrap() + public ListenableFuture<StreamState> bootstrap(StreamStateStore stateStore, boolean useStrictConsistency) { - if (logger.isDebugEnabled()) - logger.debug("Beginning bootstrap process"); - - RangeStreamer streamer = new RangeStreamer(tokenMetadata, + logger.trace("Beginning bootstrap process"); + + RangeStreamer streamer = new RangeStreamer(tokenMetadata, + tokens, + address, + "Bootstrap", + 
useStrictConsistency, + DatabaseDescriptor.getEndpointSnitch(), + stateStore); streamer.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance)); + streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter()); for (String keyspaceName : Schema.instance.getNonSystemKeyspaces()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/src/java/org/apache/cassandra/dht/RangeStreamer.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/dht/RangeStreamer.java index 8f2dc12,121a351..aef588e --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@@ -111,13 -105,18 +111,24 @@@ public class RangeStreame } } + /** + * Source filter which excludes the current node from source calculations + */ + public static class ExcludeLocalNodeFilter implements ISourceFilter + { + public boolean shouldInclude(InetAddress endpoint) + { + return !FBUtilities.getBroadcastAddress().equals(endpoint); + } + } + - public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, InetAddress address, String description) + public RangeStreamer(TokenMetadata metadata, + Collection<Token> tokens, + InetAddress address, + String description, + boolean useStrictConsistency, + IEndpointSnitch snitch, + StreamStateStore stateStore) { this.metadata = metadata; this.tokens = tokens; @@@ -146,18 -144,18 +157,18 @@@ Multimap<Range<Token>, InetAddress> rangesForKeyspace = useStrictSourcesForRanges(keyspaceName) ? 
getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : getAllRangesWithSourcesFor(keyspaceName, ranges); - if (logger.isDebugEnabled()) + if (logger.isTraceEnabled()) { for (Map.Entry<Range<Token>, InetAddress> entry : rangesForKeyspace.entries()) - logger.debug(String.format("%s: range %s exists on %s", description, entry.getKey(), entry.getValue())); + logger.trace(String.format("%s: range %s exists on %s", description, entry.getKey(), entry.getValue())); } -- for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName).asMap().entrySet()) ++ for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName, useStrictConsistency).asMap().entrySet()) { - if (logger.isDebugEnabled()) + if (logger.isTraceEnabled()) { for (Range<Token> r : entry.getValue()) - logger.debug(String.format("%s: range %s from source %s for keyspace %s", description, r, entry.getKey(), keyspaceName)); + logger.trace(String.format("%s: range %s from source %s for keyspace %s", description, r, entry.getKey(), keyspaceName)); } toFetch.put(keyspaceName, entry); } @@@ -272,11 -265,10 +283,12 @@@ * @param rangesWithSources The ranges we want to fetch (key) and their potential sources (value) * @param sourceFilters A (possibly empty) collection of source filters to apply. In addition to any filters given * here, we always exclude ourselves. 
- * @return + * @param keyspace keyspace name + * @return Map of source endpoint to collection of ranges */ private static Multimap<InetAddress, Range<Token>> getRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources, -- Collection<ISourceFilter> sourceFilters, String keyspace) ++ Collection<ISourceFilter> sourceFilters, String keyspace, ++ boolean useStrictConsistency) { Multimap<InetAddress, Range<Token>> rangeFetchMapMap = HashMultimap.create(); for (Range<Token> range : rangesWithSources.keySet()) @@@ -305,15 -297,28 +317,29 @@@ } if (!foundSource) - throw new IllegalStateException("unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace); + { + AbstractReplicationStrategy strat = Keyspace.open(keyspace).getReplicationStrategy(); + if (strat != null && strat.getReplicationFactor() == 1) + { - if (isNotReplacingAndUsesStrictConsistency()) ++ if (useStrictConsistency) + throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace + " with RF=1." + + "If you want to ignore this, consider using system property -Dcassandra.consistent.rangemovement=false."); + else + logger.warn("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace + " with RF=1. 
" + + "Keyspace might be missing data."); + } + else + throw new IllegalStateException("Unable to find sufficient sources for streaming range " + range + " in keyspace " + keyspace); + } } return rangeFetchMapMap; } - public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace, IFailureDetector fd) - public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace) ++ public static Multimap<InetAddress, Range<Token>> getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String keyspace, ++ IFailureDetector fd, boolean useStrictConsistency) { - return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace); - return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(FailureDetector.instance)), keyspace); ++ return getRangeFetchMap(rangesWithSourceTarget, Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), keyspace, useStrictConsistency); } // For testing purposes http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/StorageService.java index 82d7c8f,507aedb..83639e0 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -3753,7 -3714,7 +3753,7 @@@ public class StorageService extends Not } // stream requests - Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace, FailureDetector.instance); - Multimap<InetAddress, Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace); ++ Multimap<InetAddress, 
Range<Token>> workMap = RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace, FailureDetector.instance, useStrictConsistency); for (InetAddress address : workMap.keySet()) { logger.debug("Will request range {} of keyspace {} from endpoint {}", workMap.get(address), keyspace, address);