BOF can possibly continue with failures. Patch by brandonwilliams, reviewed by Yuki Morishita for CASSANDRA-4045
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3f8372c1
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3f8372c1
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3f8372c1

Branch: refs/heads/cassandra-1.1
Commit: 3f8372c1f5225afe83dced250660c4314e8d86b0
Parents: 67b340b
Author: Brandon Williams <[email protected]>
Authored: Mon Apr 16 13:15:03 2012 -0500
Committer: Brandon Williams <[email protected]>
Committed: Mon Apr 16 13:15:03 2012 -0500

----------------------------------------------------------------------
 .../apache/cassandra/hadoop/BulkRecordWriter.java |   13 +++++++++++++
 1 files changed, 13 insertions(+), 0 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3f8372c1/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index 69be255..e430e9b 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -47,6 +47,8 @@ import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TSocket;
 import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
@@ -55,11 +57,14 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
     private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
     private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
     private final Configuration conf;
+    private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
     private SSTableSimpleUnsortedWriter writer;
     private SSTableLoader loader;
     private File outputdir;
     private Progressable progress;
+    private int maxFailures;
 
     private enum CFType
     {
@@ -95,6 +100,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
         Config.setOutboundBindAny(true);
         this.conf = conf;
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.valueOf(conf.get(STREAM_THROTTLE_MBITS, "0")));
+        maxFailures = Integer.valueOf(conf.get(MAX_FAILED_HOSTS, "0"));
         String keyspace = ConfigHelper.getOutputKeyspace(conf);
         outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf)); //dir must be named by ks/cf for the loader
         outputdir.mkdirs();
@@ -218,6 +224,13 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                 throw new IOException(e);
             }
         }
+        if (future.hadFailures())
+        {
+            if (future.getFailedHosts().size() > maxFailures)
+                throw new IOException("Too many hosts failed: " + future.getFailedHosts());
+            else
+                logger.warn("Some hosts failed: " + future.getFailedHosts());
+        }
     }
 }
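For readers wiring this up from the Hadoop side: the patch reads mapreduce.output.bulkoutputformat.maxfailedhosts from the job Configuration and defaults it to 0, so any failed streaming target still aborts the job unless the setting is raised; tolerated failures only surface as a warning in the task log after streaming completes. Below is a minimal driver sketch along those lines, following the usual BulkOutputFormat/ConfigHelper wiring; the class name, keyspace, column family, addresses, and ports are placeholders and not part of this commit.

import org.apache.cassandra.hadoop.BulkOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical example driver; not part of this commit.
public class BulkLoadDriver
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();

        // New knob from this patch: tolerate up to two failed streaming targets.
        // With the default of "0", a single failed host still throws an IOException.
        conf.set("mapreduce.output.bulkoutputformat.maxfailedhosts", "2");

        // Usual BulkOutputFormat wiring (all values are placeholders).
        ConfigHelper.setOutputKeyspace(conf, "Keyspace1");
        ConfigHelper.setOutputColumnFamily(conf, "Keyspace1", "Standard1");
        ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
        ConfigHelper.setOutputRpcPort(conf, "9160");
        ConfigHelper.setOutputPartitioner(conf, "org.apache.cassandra.dht.RandomPartitioner");

        Job job = new Job(conf, "bulk-load-example");
        job.setOutputFormatClass(BulkOutputFormat.class);
        // Mapper setup and job.waitForCompletion(true) elided.
    }
}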
