Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9751eb5c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9751eb5c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9751eb5c

Branch: refs/heads/cassandra-3.0
Commit: 9751eb5c320c879b33d7d90e91591a0da23cce9c
Parents: 882df8a 1538c09
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Dec 2 08:49:32 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Dec 2 08:49:32 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 23 ++++++++++++++------
 .../cassandra/service/ActiveRepairService.java  | 21 +++++++++++++-----
 3 files changed, 32 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9751eb5c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bca5fb0,9c5e2d5..787d145
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,23 -1,5 +1,24 @@@
 -2.1.12
 +2.2.4
 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225)
 + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775)
 + * Reject index queries while the index is building (CASSANDRA-8505)
 + * CQL.textile syntax incorrectly includes optional keyspace for aggregate 
SFUNC and FINALFUNC (CASSANDRA-10747)
 + * Fix JSON update with prepared statements (CASSANDRA-10631)
 + * Don't do anticompaction after subrange repair (CASSANDRA-10422)
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait 
(CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large 
buffers (CASSANDRA-10592)
 +Merged from 2.1:
+  * Fix incremental repair hang when replica is down (CASSANDRA-10288)
   * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
   * Optimize the way we check if a token is repaired in anticompaction 
(CASSANDRA-10768)
   * Add proper error handling to stream receiver (CASSANDRA-10774)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9751eb5c/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 16de071,8b68fd3..8ecae23
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@@ -17,16 -17,15 +17,18 @@@
   */
  package org.apache.cassandra.repair;
  
+ import java.io.IOException;
  import java.net.InetAddress;
 +import java.util.Collection;
  import java.util.UUID;
  import java.util.concurrent.TimeUnit;
  
  import com.google.common.util.concurrent.AbstractFuture;
  
  import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
+ import org.apache.cassandra.gms.FailureDetector;
  import org.apache.cassandra.net.IAsyncCallbackWithFailure;
  import org.apache.cassandra.net.MessageIn;
  import org.apache.cassandra.net.MessagingService;
@@@ -54,11 -54,28 +56,20 @@@ public class AnticompactionTask extend
  
      public void run()
      {
-         AnticompactionRequest acr = new AnticompactionRequest(parentSession, 
successfulRanges);
-         CassandraVersion peerVersion = 
SystemKeyspace.getReleaseVersion(neighbor);
-         if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
+         if (FailureDetector.instance.isAlive(neighbor))
          {
-             MessagingService.instance().sendRR(acr.createMessage(), neighbor, 
new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
 -            AnticompactionRequest acr = new 
AnticompactionRequest(parentSession);
 -            SemanticVersion peerVersion = 
SystemKeyspace.getReleaseVersion(neighbor);
++            AnticompactionRequest acr = new 
AnticompactionRequest(parentSession, successfulRanges);
++            CassandraVersion peerVersion = 
SystemKeyspace.getReleaseVersion(neighbor);
+             if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) 
> 0)
+             {
 -                if (doAnticompaction)
 -                {
 -                    MessagingService.instance().sendRR(acr.createMessage(), 
neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
 -                }
 -                else
 -                {
 -                    // we need to clean up parent session
 -                    MessagingService.instance().sendRR(new 
CleanupMessage(parentSession).createMessage(), neighbor, new 
AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
 -                }
++                MessagingService.instance().sendRR(acr.createMessage(), 
neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+             }
+             else
+             {
+                 MessagingService.instance().sendOneWay(acr.createMessage(), 
neighbor);
+                 // immediately return after sending request
+                 set(neighbor);
+             }
          }
          else
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9751eb5c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 0cb4252,dd80d4c..61f4196
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -267,12 -279,18 +267,21 @@@ public class ActiveRepairServic
  
          for (InetAddress neighbour : endpoints)
          {
-             CassandraVersion peerVersion = 
SystemKeyspace.getReleaseVersion(neighbour);
-             boolean isGlobal = options.isGlobal() && peerVersion != null && 
peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0;
-             logger.debug("Sending prepare message: options.isGlobal = {}, 
peerVersion = {}", options.isGlobal(), peerVersion);
-             PrepareMessage message = new PrepareMessage(parentRepairSession, 
cfIds, options.getRanges(), options.isIncremental(), isGlobal);
-             MessageOut<RepairMessage> msg = message.createMessage();
-             MessagingService.instance().sendRR(msg, neighbour, callback, 
TimeUnit.HOURS.toMillis(1), true);
+             if (FailureDetector.instance.isAlive(neighbour))
+             {
 -                PrepareMessage message = new 
PrepareMessage(parentRepairSession, cfIds, ranges);
++                CassandraVersion peerVersion = 
SystemKeyspace.getReleaseVersion(neighbour);
++                boolean isGlobal = options.isGlobal() && peerVersion != null 
&& peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0;
++                logger.debug("Sending prepare message: options.isGlobal = {}, 
peerVersion = {}", options.isGlobal(), peerVersion);
++                PrepareMessage message = new 
PrepareMessage(parentRepairSession, cfIds, options.getRanges(), 
options.isIncremental(), isGlobal);
+                 MessageOut<RepairMessage> msg = message.createMessage();
+                 MessagingService.instance().sendRR(msg, neighbour, callback, 
TimeUnit.HOURS.toMillis(1), true);
+             }
+             else
+             {
+                 status.set(false);
+                 failedNodes.add(neighbour.getHostAddress());
+                 prepareLatch.countDown();
+             }
          }
          try
          {

Reply via email to