Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9751eb5c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9751eb5c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9751eb5c Branch: refs/heads/cassandra-3.0 Commit: 9751eb5c320c879b33d7d90e91591a0da23cce9c Parents: 882df8a 1538c09 Author: Yuki Morishita <yu...@apache.org> Authored: Wed Dec 2 08:49:32 2015 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Dec 2 08:49:32 2015 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/repair/AnticompactionTask.java | 23 ++++++++++++++------ .../cassandra/service/ActiveRepairService.java | 21 +++++++++++++----- 3 files changed, 32 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9751eb5c/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index bca5fb0,9c5e2d5..787d145 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,23 -1,5 +1,24 @@@ -2.1.12 +2.2.4 + * Show CQL help in cqlsh in web browser (CASSANDRA-7225) + * Serialize on disk the proper SSTable compression ratio (CASSANDRA-10775) + * Reject index queries while the index is building (CASSANDRA-8505) + * CQL.textile syntax incorrectly includes optional keyspace for aggregate SFUNC and FINALFUNC (CASSANDRA-10747) + * Fix JSON update with prepared statements (CASSANDRA-10631) + * Don't do anticompaction after subrange repair (CASSANDRA-10422) + * Fix SimpleDateType type compatibility (CASSANDRA-10027) + * (Hadoop) fix splits calculation (CASSANDRA-10640) + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058) + * (cqlsh) show partial trace if incomplete after max_trace_wait (CASSANDRA-7645) + * Use most up-to-date version of schema for system tables (CASSANDRA-10652) + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628) + * Expose phi values from failure detector via JMX and tweak debug + and trace logging (CASSANDRA-9526) + * Fix RangeNamesQueryPager (CASSANDRA-10509) + * Deprecate Pig support (CASSANDRA-10542) + * Reduce contention getting instances of CompositeType (CASSANDRA-10433) + * Fix IllegalArgumentException in DataOutputBuffer.reallocate for large buffers (CASSANDRA-10592) +Merged from 2.1: + * Fix incremental repair hang when replica is down (CASSANDRA-10288) * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791) * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768) * Add proper error handling to stream receiver (CASSANDRA-10774) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9751eb5c/src/java/org/apache/cassandra/repair/AnticompactionTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/AnticompactionTask.java index 16de071,8b68fd3..8ecae23 --- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java +++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java @@@ -17,16 -17,15 +17,18 @@@ */ package org.apache.cassandra.repair; + import java.io.IOException; import java.net.InetAddress; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.AbstractFuture; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@@ -54,11 -54,28 +56,20 @@@ public class AnticompactionTask extend public void run() { - AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges); - CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor); - if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0) + if (FailureDetector.instance.isAlive(neighbor)) { - MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); - AnticompactionRequest acr = new AnticompactionRequest(parentSession); - SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor); ++ AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges); ++ CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor); + if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0) + { - if (doAnticompaction) - { - MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); - } - else - { - // we need to clean up parent session - MessagingService.instance().sendRR(new CleanupMessage(parentSession).createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); - } ++ MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); + } + else + { + MessagingService.instance().sendOneWay(acr.createMessage(), neighbor); + // immediately return after sending request + set(neighbor); + } } else { http://git-wip-us.apache.org/repos/asf/cassandra/blob/9751eb5c/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index 0cb4252,dd80d4c..61f4196 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -267,12 -279,18 +267,21 @@@ public class ActiveRepairServic for (InetAddress neighbour : endpoints) { - CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour); - boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0; - logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion); - PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal); - MessageOut<RepairMessage> msg = message.createMessage(); - MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true); + if (FailureDetector.instance.isAlive(neighbour)) + { - PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges); ++ CassandraVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbour); ++ boolean isGlobal = options.isGlobal() && peerVersion != null && peerVersion.compareTo(SUPPORTS_GLOBAL_PREPARE_FLAG_VERSION) >= 0; ++ logger.debug("Sending prepare message: options.isGlobal = {}, peerVersion = {}", options.isGlobal(), peerVersion); ++ PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, options.getRanges(), options.isIncremental(), isGlobal); + MessageOut<RepairMessage> msg = message.createMessage(); + MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true); + } + else + { + status.set(false); + failedNodes.add(neighbour.getHostAddress()); + prepareLatch.countDown(); + } } try {