Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/907c8263
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/907c8263
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/907c8263

Branch: refs/heads/trunk
Commit: 907c8263b7c814de31f4b4e17610e56e68b3a4aa
Parents: 148f369 6100eb2
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed May 25 18:49:20 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed May 25 18:49:20 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/dht/BootStrapper.java  |  1 +
 .../org/apache/cassandra/dht/RangeStreamer.java | 48 +++++++++++++++-----
 .../cassandra/service/StorageService.java       |  2 +-
 4 files changed, 40 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index acdf2e9,d914420..d6750ab
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,24 -1,5 +1,25 @@@
 -2.1.15
 +2.2.7
 + * Enable client encryption in sstableloader with cli options 
(CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i 
(CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnescessary file existence check during anticompaction 
(CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during 
ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches 
and
 +   report errors correctly if workers processes crash on initialization 
(CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
+  * Do not consider local node a valid source during replace (CASSANDRA-11848)
   * Avoid holding SSTableReaders for duration of incremental repair 
(CASSANDRA-11739)
   * Add message dropped tasks to nodetool netstats (CASSANDRA-11855)
   * Don't compute expensive MaxPurgeableTimestamp until we've verified there's 
an 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/BootStrapper.java
index a6b1ad7,dfefbe9..26fa6b3
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@@ -60,21 -55,17 +60,22 @@@ public class BootStrapper extends Progr
  
          this.address = address;
          this.tokens = tokens;
 -        tokenMetadata = tmd;
 +        this.tokenMetadata = tmd;
      }
  
 -    public void bootstrap()
 +    public ListenableFuture<StreamState> bootstrap(StreamStateStore 
stateStore, boolean useStrictConsistency)
      {
 -        if (logger.isDebugEnabled())
 -            logger.debug("Beginning bootstrap process");
 -
 -        RangeStreamer streamer = new RangeStreamer(tokenMetadata, tokens, 
address, "Bootstrap");
 +        logger.trace("Beginning bootstrap process");
 +
 +        RangeStreamer streamer = new RangeStreamer(tokenMetadata,
 +                                                   tokens,
 +                                                   address,
 +                                                   "Bootstrap",
 +                                                   useStrictConsistency,
 +                                                   
DatabaseDescriptor.getEndpointSnitch(),
 +                                                   stateStore);
          streamer.addSourceFilter(new 
RangeStreamer.FailureDetectorSourceFilter(FailureDetector.instance));
+         streamer.addSourceFilter(new RangeStreamer.ExcludeLocalNodeFilter());
  
          for (String keyspaceName : Schema.instance.getNonSystemKeyspaces())
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/src/java/org/apache/cassandra/dht/RangeStreamer.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/dht/RangeStreamer.java
index 8f2dc12,121a351..aef588e
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@@ -111,13 -105,18 +111,24 @@@ public class RangeStreame
          }
      }
  
+     /**
+      * Source filter which excludes the current node from source calculations
+      */
+     public static class ExcludeLocalNodeFilter implements ISourceFilter
+     {
+         public boolean shouldInclude(InetAddress endpoint)
+         {
+             return !FBUtilities.getBroadcastAddress().equals(endpoint);
+         }
+     }
+ 
 -    public RangeStreamer(TokenMetadata metadata, Collection<Token> tokens, 
InetAddress address, String description)
 +    public RangeStreamer(TokenMetadata metadata,
 +                         Collection<Token> tokens,
 +                         InetAddress address,
 +                         String description,
 +                         boolean useStrictConsistency,
 +                         IEndpointSnitch snitch,
 +                         StreamStateStore stateStore)
      {
          this.metadata = metadata;
          this.tokens = tokens;
@@@ -146,18 -144,18 +157,18 @@@
          Multimap<Range<Token>, InetAddress> rangesForKeyspace = 
useStrictSourcesForRanges(keyspaceName)
                  ? getAllRangesWithStrictSourcesFor(keyspaceName, ranges) : 
getAllRangesWithSourcesFor(keyspaceName, ranges);
  
 -        if (logger.isDebugEnabled())
 +        if (logger.isTraceEnabled())
          {
              for (Map.Entry<Range<Token>, InetAddress> entry : 
rangesForKeyspace.entries())
 -                logger.debug(String.format("%s: range %s exists on %s", 
description, entry.getKey(), entry.getValue()));
 +                logger.trace(String.format("%s: range %s exists on %s", 
description, entry.getKey(), entry.getValue()));
          }
  
--        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : 
getRangeFetchMap(rangesForKeyspace, sourceFilters, 
keyspaceName).asMap().entrySet())
++        for (Map.Entry<InetAddress, Collection<Range<Token>>> entry : 
getRangeFetchMap(rangesForKeyspace, sourceFilters, keyspaceName, 
useStrictConsistency).asMap().entrySet())
          {
 -            if (logger.isDebugEnabled())
 +            if (logger.isTraceEnabled())
              {
                  for (Range<Token> r : entry.getValue())
 -                    logger.debug(String.format("%s: range %s from source %s 
for keyspace %s", description, r, entry.getKey(), keyspaceName));
 +                    logger.trace(String.format("%s: range %s from source %s 
for keyspace %s", description, r, entry.getKey(), keyspaceName));
              }
              toFetch.put(keyspaceName, entry);
          }
@@@ -272,11 -265,10 +283,12 @@@
       * @param rangesWithSources The ranges we want to fetch (key) and their 
potential sources (value)
       * @param sourceFilters A (possibly empty) collection of source filters 
to apply. In addition to any filters given
       *                      here, we always exclude ourselves.
 -     * @return
 +     * @param keyspace keyspace name
 +     * @return Map of source endpoint to collection of ranges
       */
      private static Multimap<InetAddress, Range<Token>> 
getRangeFetchMap(Multimap<Range<Token>, InetAddress> rangesWithSources,
--                                                                        
Collection<ISourceFilter> sourceFilters, String keyspace)
++                                                                        
Collection<ISourceFilter> sourceFilters, String keyspace,
++                                                                        
boolean useStrictConsistency)
      {
          Multimap<InetAddress, Range<Token>> rangeFetchMapMap = 
HashMultimap.create();
          for (Range<Token> range : rangesWithSources.keySet())
@@@ -305,15 -297,28 +317,29 @@@
              }
  
              if (!foundSource)
-                 throw new IllegalStateException("unable to find sufficient 
sources for streaming range " + range + " in keyspace " + keyspace);
+             {
+                 AbstractReplicationStrategy strat = 
Keyspace.open(keyspace).getReplicationStrategy();
+                 if (strat != null && strat.getReplicationFactor() == 1)
+                 {
 -                    if (isNotReplacingAndUsesStrictConsistency())
++                    if (useStrictConsistency)
+                         throw new IllegalStateException("Unable to find 
sufficient sources for streaming range " + range + " in keyspace " + keyspace + 
" with RF=1." +
+                                                         "If you want to 
ignore this, consider using system property 
-Dcassandra.consistent.rangemovement=false.");
+                     else
+                         logger.warn("Unable to find sufficient sources for 
streaming range " + range + " in keyspace " + keyspace + " with RF=1. " +
+                                     "Keyspace might be missing data.");
+                 }
+                 else
+                     throw new IllegalStateException("Unable to find 
sufficient sources for streaming range " + range + " in keyspace " + keyspace);
+             }
          }
  
          return rangeFetchMapMap;
      }
  
-     public static Multimap<InetAddress, Range<Token>> 
getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String 
keyspace, IFailureDetector fd)
 -    public static Multimap<InetAddress, Range<Token>> 
getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String 
keyspace)
++    public static Multimap<InetAddress, Range<Token>> 
getWorkMap(Multimap<Range<Token>, InetAddress> rangesWithSourceTarget, String 
keyspace,
++                                                                 
IFailureDetector fd, boolean useStrictConsistency)
      {
-         return getRangeFetchMap(rangesWithSourceTarget, 
Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), 
keyspace);
 -        return getRangeFetchMap(rangesWithSourceTarget, 
Collections.<ISourceFilter>singleton(new 
FailureDetectorSourceFilter(FailureDetector.instance)), keyspace);
++        return getRangeFetchMap(rangesWithSourceTarget, 
Collections.<ISourceFilter>singleton(new FailureDetectorSourceFilter(fd)), 
keyspace, useStrictConsistency);
      }
  
      // For testing purposes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/907c8263/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 82d7c8f,507aedb..83639e0
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -3753,7 -3714,7 +3753,7 @@@ public class StorageService extends Not
                      }
  
                      // stream requests
-                     Multimap<InetAddress, Range<Token>> workMap = 
RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace, 
FailureDetector.instance);
 -                    Multimap<InetAddress, Range<Token>> workMap = 
RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace);
++                    Multimap<InetAddress, Range<Token>> workMap = 
RangeStreamer.getWorkMap(rangesToFetchWithPreferredEndpoints, keyspace, 
FailureDetector.instance, useStrictConsistency);
                      for (InetAddress address : workMap.keySet())
                      {
                          logger.debug("Will request range {} of keyspace {} 
from endpoint {}", workMap.get(address), keyspace, address);

Reply via email to