Repository: cassandra Updated Branches: refs/heads/cassandra-2.0 c44526b29 -> 384d4f0e2 refs/heads/cassandra-2.1 f454fecf6 -> 77f6c25e6 refs/heads/cassandra-2.1.0 56db00180 -> 055b89642 refs/heads/trunk e98a40f9e -> ddeb496ee
partial backport 3569 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/384d4f0e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/384d4f0e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/384d4f0e Branch: refs/heads/cassandra-2.0 Commit: 384d4f0e202d5492aaced9311f5ecb302ac7ff00 Parents: c44526b Author: Yuki Morishita <[email protected]> Authored: Mon Aug 18 12:43:00 2014 -0500 Committer: Yuki Morishita <[email protected]> Committed: Mon Aug 18 13:26:51 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 +- .../repair/IRepairJobEventListener.java | 31 ++++++++++++++++++++ .../org/apache/cassandra/repair/RepairJob.java | 15 ++++++++-- .../apache/cassandra/repair/RepairSession.java | 12 ++++++-- 4 files changed, 54 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6b610d7..4b1becc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -48,7 +48,7 @@ * Add inter_dc_stream_throughput_outbound_megabits_per_sec (CASSANDRA-6596) * Add option to disable STCS in L0 (CASSANDRA-6621) * Fix error when doing reversed queries with static columns (CASSANDRA-7490) - * Backport CASSANDRA-6747 (CASSANDRA-7560) + * Backport CASSNADRA-3569/CASSANDRA-6747 (CASSANDRA-7560) * Track max/min timestamps for range tombstones (CASSANDRA-7647) * Fix NPE when listing saved caches dir (CASSANDRA-7632) * Fix sstableloader unable to connect encrypted node (CASSANDRA-7585) http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java b/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java new file mode 100644 index 0000000..778c09d --- /dev/null +++ b/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +/** + * Implemented by the RepairSession to accept callbacks from sequential snapshot creation failure. + */ + +public interface IRepairJobEventListener +{ + /** + * Signal that there was a failure during the snapshot creation process. + * + */ + public void failedSnapshot(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 13fe511..931f95a 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -58,11 +58,20 @@ public class RepairJob /* Count down as sync completes */ private AtomicInteger waitForSync; + private final IRepairJobEventListener listener; + /** * Create repair job to run on specific columnfamily */ - public RepairJob(UUID sessionId, String keyspace, String columnFamily, Range<Token> range, boolean isSequential, ListeningExecutorService taskExecutor) + public RepairJob(IRepairJobEventListener listener, + UUID sessionId, + String keyspace, + String columnFamily, + Range<Token> range, + boolean isSequential, + ListeningExecutorService taskExecutor) { + this.listener = listener; this.desc = new RepairJobDesc(sessionId, keyspace, columnFamily, range); this.isSequential = isSequential; this.taskExecutor = taskExecutor; @@ -113,8 +122,8 @@ public class RepairJob public void onFailure(Throwable throwable) { - // TODO need to propagate error to RepairSession - logger.error("Error while snapshot", throwable); + logger.error("Error occurred during snapshot phase", throwable); + listener.failedSnapshot(); failed = true; } }, taskExecutor); http://git-wip-us.apache.org/repos/asf/cassandra/blob/384d4f0e/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 942049b..c9a9671 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -73,7 +73,9 @@ import org.apache.cassandra.utils.*; * Similarly, if a job is sequential, it will handle one Differencer at a time, but will handle * all of them in parallel otherwise. */ -public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener +public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, + IFailureDetectionEventListener, + IRepairJobEventListener { private static Logger logger = LoggerFactory.getLogger(RepairSession.class); @@ -268,7 +270,7 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan // Create and queue a RepairJob for each column family for (String cfname : cfnames) { - RepairJob job = new RepairJob(id, keyspace, cfname, range, isSequential, taskExecutor); + RepairJob job = new RepairJob(this, id, keyspace, cfname, range, isSequential, taskExecutor); jobs.offer(job); } @@ -316,6 +318,12 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan completed.signalAll(); } + public void failedSnapshot() + { + exception = new IOException("Failed during snapshot creation."); + forceShutdown(); + } + void failedNode(InetAddress remote) { String errorMsg = String.format("Endpoint %s died", remote);
