[ 
https://issues.apache.org/jira/browse/CASSANDRA-4912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13493577#comment-13493577
 ] 

Michael Kjellman edited comment on CASSANDRA-4912 at 11/8/12 10:54 PM:
-----------------------------------------------------------------------

When ConfigHelper calls checkOutputSpecs() in local mode while the job is 
being set up, no exception is thrown. When a reducer is created, however, 
org.apache.cassandra.hadoop.ConfigHelper.getOutputColumnFamily throws an 
UnsupportedOperationException saying that the output column family isn't 
set up. It looks like mapreduce.output.basename is null.

See the attached Example.java for a stripped-down example MR job.
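
The failure described above can be illustrated with a small standalone 
sketch. It is not Cassandra's code: the class OutputConfigSketch, its 
method, and the use of a plain Map in place of a Hadoop Configuration are 
all hypothetical stand-ins. It assumes, as the comment suggests, that the 
output column family is read from mapreduce.output.basename and that a 
missing value surfaces as an UnsupportedOperationException.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the lookup described in the comment; not the real ConfigHelper.
class OutputConfigSketch
{
    // Assumption taken from the comment: the column family lives under this key.
    static final String OUTPUT_CF_KEY = "mapreduce.output.basename";

    // Returns the configured output column family, or throws if the key was never set.
    static String getOutputColumnFamily(Map<String, String> conf)
    {
        String cf = conf.get(OUTPUT_CF_KEY);
        if (cf == null)
            throw new UnsupportedOperationException("output column family is not set");
        return cf;
    }

    public static void main(String[] args)
    {
        Map<String, String> conf = new HashMap<String, String>();

        // Nothing has set the key yet, which mirrors the state seen when the reducer starts.
        try
        {
            getOutputColumnFamily(conf);
        }
        catch (UnsupportedOperationException e)
        {
            System.out.println("lookup failed: " + e.getMessage());
        }

        // Once the key is present, the lookup succeeds.
        conf.put(OUTPUT_CF_KEY, "cf1");
        System.out.println("lookup succeeded: " + getOutputColumnFamily(conf));
    }
}
```

In this sketch the lookup fails only because the key is absent at the time 
of the call, which is why a check at job setup and a check at reducer 
creation can disagree if the key is populated between the two.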
                
      was (Author: mkjellman):
    When ConfigHelper calls checkOutputSpecs() in local mode while the job is 
being set up, no exception is thrown. When a reducer is created, however, 
org.apache.cassandra.hadoop.ConfigHelper.getOutputColumnFamily throws an 
UnsupportedOperationException saying that the output column family isn't 
set up. It looks like mapreduce.output.basename is null.

Job Config is something along the lines of

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;

import org.apache.cassandra.hadoop.BulkOutputFormat;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public int run(String[] args) throws Exception
{
    Job job = new Job(getConf(), "MRJobName");

    job.setJarByClass(Nashoba.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(ReducerToCassandra.class);
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    // set up 3 reducers
    job.setNumReduceTasks(3);

    // thrift input job settings
    ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), "127.0.0.1");
    ConfigHelper.setInputPartitioner(job.getConfiguration(), "RandomPartitioner");

    // thrift output job settings
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), "9160");
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "127.0.0.1");
    ConfigHelper.setOutputPartitioner(job.getConfiguration(), "RandomPartitioner");

    // set timeout to 1 hour for testing
    job.getConfiguration().set("mapreduce.task.timeout", "3600000");
    job.getConfiguration().set("mapred.task.timeout", "3600000");

    job.getConfiguration().set("mapreduce.output.bulkoutputformat.buffersize", "64");
    job.setOutputFormatClass(BulkOutputFormat.class);
    // Job copies the Configuration it is given, so set this on the job's
    // copy rather than on getConf()
    ConfigHelper.setRangeBatchSize(job.getConfiguration(), 99);

    // let ConfigHelper know what column family to get data from and where to output it
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, INPUT_COLUMN_FAMILY);

    ConfigHelper.setOutputKeyspace(job.getConfiguration(), KEYSPACE);
    MultipleOutputs.addNamedOutput(job, OUTPUT_COLUMN_FAMILY1, BulkOutputFormat.class, ByteBuffer.class, List.class);
    MultipleOutputs.addNamedOutput(job, OUTPUT_COLUMN_FAMILY2, BulkOutputFormat.class, ByteBuffer.class, List.class);

    // what classes the mapper will write and what the consumer should expect to receive
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(MapWritable.class);
    job.setOutputKeyClass(ByteBuffer.class);
    job.setOutputValueClass(List.class);

    // empty start and finish select every column in the row
    SliceRange sliceRange = new SliceRange();
    sliceRange.setStart(new byte[0]);
    sliceRange.setFinish(new byte[0]);
    SlicePredicate predicate = new SlicePredicate();
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

    // report job failure through the exit code
    return job.waitForCompletion(true) ? 0 : 1;
}

public static class ReducerToCassandra extends Reducer<Text, MapWritable, ByteBuffer, List<Mutation>>
{
    private MultipleOutputs<ByteBuffer, List<Mutation>> output;

    @Override
    public void setup(Context context)
    {
        output = new MultipleOutputs<ByteBuffer, List<Mutation>>(context);
    }

    @Override
    public void reduce(Text word, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException
    {
        // do stuff in reducer...
        // (key and val used below come from that elided logic)

        // write out our result to Hadoop
        context.progress();
        // for writing to 2 column families
        output.write(OUTPUT_COLUMN_FAMILY1, key, Collections.singletonList(getMutation1(word, val)));
        output.write(OUTPUT_COLUMN_FAMILY2, key, Collections.singletonList(getMutation2(word, val)));
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException
    {
        output.close(); // closes all of the opened outputs
    }
}
                  
> BulkOutputFormat should support Hadoop MultipleOutput
> -----------------------------------------------------
>
>                 Key: CASSANDRA-4912
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-4912
>             Project: Cassandra
>          Issue Type: New Feature
>          Components: Hadoop
>    Affects Versions: 1.2.0 beta 1
>            Reporter: Michael Kjellman
>
> Much like CASSANDRA-4208, BOF should support outputting to multiple column 
> families. The current approach taken in the patch for COF results in only 
> one stream being sent.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
