sstableloader detects and reports failures. Patch by brandonwilliams; reviewed by Yuki Morishita for CASSANDRA-4146
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/67b340be Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/67b340be Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/67b340be Branch: refs/heads/cassandra-1.1 Commit: 67b340becd1aa369c71af10e5b76570e73809e2e Parents: f57f1c0 Author: Brandon Williams <[email protected]> Authored: Mon Apr 16 11:50:33 2012 -0500 Committer: Brandon Williams <[email protected]> Committed: Mon Apr 16 11:50:33 2012 -0500 ---------------------------------------------------------------------- .../apache/cassandra/io/sstable/SSTableLoader.java | 34 +++++++++++--- .../apache/cassandra/streaming/FileStreamTask.java | 6 +- .../org/apache/cassandra/tools/BulkLoader.java | 6 +++ 3 files changed, 35 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/67b340be/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index 79259ec..c76d1e3 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -137,7 +137,7 @@ public class SSTableLoader continue; } Collection<Range<Token>> ranges = entry.getValue(); - StreamOutSession session = StreamOutSession.create(keyspace, remote, new CountDownCallback(future.latch, remote)); + StreamOutSession session = StreamOutSession.create(keyspace, remote, new CountDownCallback(future, remote)); // transferSSTables assumes references have been acquired SSTableReader.acquireReferences(sstables); StreamOut.transferSSTables(session, sstables, ranges, OperationType.BULK_LOAD); @@ -150,6 +150,7 @@ public class SSTableLoader { final 
CountDownLatch latch; final Map<InetAddress, Collection<PendingFile>> pendingFiles; + private List<InetAddress> failedHosts = new ArrayList<InetAddress>(); private LoaderFuture(int request) { @@ -162,6 +163,16 @@ public class SSTableLoader pendingFiles.put(remote, new ArrayList(files)); } + private void setFailed(InetAddress addr) + { + failedHosts.add(addr); + } + + public List<InetAddress> getFailedHosts() + { + return failedHosts; + } + public boolean cancel(boolean mayInterruptIfRunning) { throw new UnsupportedOperationException("Cancellation is not yet supported"); @@ -192,6 +203,11 @@ public class SSTableLoader return latch.getCount() == 0; } + public boolean hadFailures() + { + return failedHosts.size() > 0; + } + public Map<InetAddress, Collection<PendingFile>> getPendingFiles() { return pendingFiles; @@ -209,28 +225,30 @@ public class SSTableLoader private class CountDownCallback implements IStreamCallback { private final InetAddress endpoint; - private final CountDownLatch latch; + private final LoaderFuture future; - CountDownCallback(CountDownLatch latch, InetAddress endpoint) + CountDownCallback(LoaderFuture future, InetAddress endpoint) { - this.latch = latch; + this.future = future; this.endpoint = endpoint; } public void onSuccess() { - latch.countDown(); - outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, latch.getCount())); + future.latch.countDown(); + outputHandler.debug(String.format("Streaming session to %s completed (waiting on %d outstanding sessions)", endpoint, future.latch.getCount())); // There could be race with stop being called twice but it should be ok - if (latch.getCount() == 0) + if (future.latch.getCount() == 0) client.stop(); } public void onFailure() { outputHandler.output(String.format("Streaming session to %s failed", endpoint)); - onSuccess(); // call onSuccess for latch countdown + future.setFailed(endpoint); + future.latch.countDown(); + client.stop(); } } 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/67b340be/src/java/org/apache/cassandra/streaming/FileStreamTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/FileStreamTask.java b/src/java/org/apache/cassandra/streaming/FileStreamTask.java index 8ff2b83..5b62af6 100644 --- a/src/java/org/apache/cassandra/streaming/FileStreamTask.java +++ b/src/java/org/apache/cassandra/streaming/FileStreamTask.java @@ -47,8 +47,7 @@ public class FileStreamTask extends WrappedRunnable private static Logger logger = LoggerFactory.getLogger(FileStreamTask.class); public static final int CHUNK_SIZE = 64 * 1024; - // around 10 minutes at the default rpctimeout - public static final int MAX_CONNECT_ATTEMPTS = 8; + public static final int MAX_CONNECT_ATTEMPTS = 4; protected final StreamHeader header; protected final InetAddress to; @@ -270,7 +269,8 @@ public class FileStreamTask extends WrappedRunnable protected void close() throws IOException { - output.close(); + if (output != null) + output.close(); } public String toString() http://git-wip-us.apache.org/repos/asf/cassandra/blob/67b340be/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java index 47936e9..4520188 100644 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@ -86,6 +86,12 @@ public class BulkLoader } if (!printEnd) indicator.printProgress(); + if (future.hadFailures()) + { + System.err.println("Streaming to the following hosts failed:"); + System.err.println(future.getFailedHosts()); + System.exit(1); + } } System.exit(0); // We need that to stop non daemonized threads
