Merge branch 'cassandra-3.0' into cassandra-3.X
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a1eef56c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a1eef56c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a1eef56c Branch: refs/heads/cassandra-3.X Commit: a1eef56cc021772619eeb4a048cb785078547515 Parents: 0cb3128 14f36fc Author: Yuki Morishita <[email protected]> Authored: Wed Nov 16 17:24:29 2016 -0600 Committer: Yuki Morishita <[email protected]> Committed: Wed Nov 16 17:24:29 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++-- .../apache/cassandra/repair/RepairSession.java | 13 ---- .../cassandra/service/ActiveRepairService.java | 30 ++++++-- 4 files changed, 99 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/repair/AnticompactionTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/AnticompactionTask.java index 16a0a12,c5e066d..bc09b38 --- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java +++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java @@@ -28,8 -33,12 +33,13 @@@ import org.apache.cassandra.config.Data import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RequestFailureReason; + import org.apache.cassandra.gms.ApplicationState; + import org.apache.cassandra.gms.EndpointState; import org.apache.cassandra.gms.FailureDetector; + import 
org.apache.cassandra.gms.IEndpointStateChangeSubscriber; + import org.apache.cassandra.gms.IFailureDetectionEventListener; + import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; @@@ -100,9 -132,42 +133,42 @@@ public class AnticompactionTask extend return false; } - public void onFailure(InetAddress from) + public void onFailure(InetAddress from, RequestFailureReason failureReason) { - task.setException(new RuntimeException("Anticompaction failed or timed out in " + from)); + maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from)); + } + } + + public void onJoin(InetAddress endpoint, EndpointState epState) {} + public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {} + public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {} + public void onAlive(InetAddress endpoint, EndpointState state) {} + public void onDead(InetAddress endpoint, EndpointState state) {} + + public void onRemove(InetAddress endpoint) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void onRestart(InetAddress endpoint, EndpointState epState) + { + convict(endpoint, Double.MAX_VALUE); + } + + public void convict(InetAddress endpoint, double phi) + { + if (!neighbor.equals(endpoint)) + return; + + // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost. 
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold()) + return; + + Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint)); + if (maybeSetException(exception)) + { + // Though unlikely, it is possible to arrive here multiple times and we want to avoid printing an error message twice + logger.error("[repair #{}] Endpoint {} died during anti-compaction", endpoint, parentSession, exception); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairSession.java index 528115a,5fe306d..00340a1 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@@ -141,8 -134,6 +137,7 @@@ public class RepairSession extends Abst this.ranges = ranges; this.endpoints = endpoints; this.repairedAt = repairedAt; - this.validationRemaining = new AtomicInteger(cfnames.length); + this.pullRepair = pullRepair; } public UUID getId() http://git-wip-us.apache.org/repos/asf/cassandra/blob/a1eef56c/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index b69c24a,6f7b1a4..aa8ebc8 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -158,11 -156,9 +158,9 @@@ public class ActiveRepairService implem */ public void run() { - failureDetector.unregisterFailureDetectionEventListener(session); - gossiper.unregister(session); sessions.remove(session.getId()); } - }, MoreExecutors.sameThreadExecutor()); + }, MoreExecutors.directExecutor()); session.start(executor); return session; }
