[
https://issues.apache.org/jira/browse/CASSANDRA-4912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13493577#comment-13493577
]
Michael Kjellman edited comment on CASSANDRA-4912 at 11/8/12 10:54 PM:
-----------------------------------------------------------------------
So when checkOutputSpecs() is called during job setup in local mode, no exceptions are thrown. When a reducer is created, however, org.apache.cassandra.hadoop.ConfigHelper.getOutputColumnFamily throws an UnsupportedOperationException because the output column family isn't set up. It looks like mapreduce.output.basename is null.
See Example.java, attached as a stripped-down example MR job.
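For context, the failure looks like it comes from a guard along these lines (a hypothetical reconstruction, not the actual ConfigHelper source; the config key name is an assumption):

    // Hypothetical reconstruction of the failing lookup (not the actual
    // Cassandra source; the "cassandra.output.columnfamily" key is an
    // assumption). Nothing in the MultipleOutputs path sets this key for
    // the named outputs, so the lookup fails once a reducer starts writing,
    // even though checkOutputSpecs() passed at job setup.
    public static String getOutputColumnFamily(Configuration conf)
    {
        String cf = conf.get("cassandra.output.columnfamily"); // assumed key
        if (cf == null)
            throw new UnsupportedOperationException("Output column family not set");
        return cf;
    }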
The job config is something along the lines of:

    public int run(String[] args) throws Exception
    {
        Job job = new Job(getConf(), "MRJobName");
        job.setJarByClass(Nashoba.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(ReducerToCassandra.class);
        job.setInputFormatClass(ColumnFamilyInputFormat.class);

        // set up 3 reducers
        job.setNumReduceTasks(3);

        // thrift input job settings
        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "127.0.0.1");
        ConfigHelper.setInputPartitioner(job.getConfiguration(), "RandomPartitioner");

        // thrift output job settings
        ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
        ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "127.0.0.1");
        ConfigHelper.setOutputPartitioner(job.getConfiguration(), "RandomPartitioner");

        // set timeout to 1 hour for testing
        job.getConfiguration().set("mapreduce.task.timeout", "3600000");
        job.getConfiguration().set("mapred.task.timeout", "3600000");

        job.getConfiguration().set("mapreduce.output.bulkoutputformat.buffersize", "64");
        job.setOutputFormatClass(BulkOutputFormat.class);
        ConfigHelper.setRangeBatchSize(getConf(), 99);

        // let ConfigHelper know what column family to get data from
        // and where to output it
        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, INPUT_COLUMN_FAMILY);
        ConfigHelper.setOutputKeyspace(job.getConfiguration(), KEYSPACE);
        MultipleOutputs.addNamedOutput(job, OUTPUT_COLUMN_FAMILY1,
                BulkOutputFormat.class, ByteBuffer.class, List.class);
        MultipleOutputs.addNamedOutput(job, OUTPUT_COLUMN_FAMILY2,
                BulkOutputFormat.class, ByteBuffer.class, List.class);

        // what classes the mapper will write and what the reducer should expect to receive
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MapWritable.class);
        job.setOutputKeyClass(ByteBuffer.class);
        job.setOutputValueClass(List.class);

        // read every column of every row
        SliceRange sliceRange = new SliceRange();
        sliceRange.setStart(new byte[0]);
        sliceRange.setFinish(new byte[0]);
        SlicePredicate predicate = new SlicePredicate();
        predicate.setSlice_range(sliceRange);
        ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

        job.waitForCompletion(true);
        return 0;
    }
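For what it's worth, a possible workaround (a sketch, not something from the attached Example.java) is to also set a single output column family explicitly so the getOutputColumnFamily() lookup succeeds:

    // Possible workaround sketch (an assumption, not from Example.java):
    // explicitly setting one output column family satisfies the
    // getOutputColumnFamily() lookup, but both named outputs then get
    // funneled into that single column family, which is exactly the
    // one-stream limitation this ticket is about.
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY1);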
    public static class ReducerToCassandra extends Reducer<Text, MapWritable, ByteBuffer, List<Mutation>>
    {
        private MultipleOutputs<ByteBuffer, List<Mutation>> output;

        @Override
        public void setup(Context context)
        {
            output = new MultipleOutputs<ByteBuffer, List<Mutation>>(context);
        }

        public void reduce(Text word, Iterable<MapWritable> values, Context context)
                throws IOException, InterruptedException
        {
            // do stuff in reducer... (key and val come out of the elided logic)

            // write out our result to Hadoop
            context.progress();
            // for writing to 2 column families
            output.write(OUTPUT_COLUMN_FAMILY1, key, Collections.singletonList(getMutation1(word, val)));
            output.write(OUTPUT_COLUMN_FAMILY2, key, Collections.singletonList(getMutation2(word, val)));
        }

        public void cleanup(Context context) throws IOException, InterruptedException
        {
            output.close(); // closes all of the opened outputs
        }
    }
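getMutation1()/getMutation2() are elided above; for reference, a Thrift mutation builder typically looks something like the following (a sketch modeled on the word_count example that ships with Cassandra; the "count" column name and the int value are assumptions, not from the attached Example.java):

    // Sketch of a mutation builder (modeled on Cassandra's word_count
    // example; the "count" column name and the int value are assumptions).
    private static Mutation getMutation1(Text word, int sum)
    {
        Column c = new Column();
        c.setName(ByteBufferUtil.bytes("count"));   // column name
        c.setValue(ByteBufferUtil.bytes(sum));      // column value
        c.setTimestamp(System.currentTimeMillis());

        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
        cosc.setColumn(c);

        Mutation m = new Mutation();
        m.setColumn_or_supercolumn(cosc);
        return m;
    }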
> BulkOutputFormat should support Hadoop MultipleOutput
> -----------------------------------------------------
>
> Key: CASSANDRA-4912
> URL: https://issues.apache.org/jira/browse/CASSANDRA-4912
> Project: Cassandra
> Issue Type: New Feature
> Components: Hadoop
> Affects Versions: 1.2.0 beta 1
> Reporter: Michael Kjellman
>
> Much like CASSANDRA-4208, BOF should support outputting to multiple column
> families. The current approach taken in the patch for COF results in only
> one stream being sent.