[ 
https://issues.apache.org/jira/browse/HADOOP-3149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12594853#action_12594853
 ] 

tucu00 edited comment on HADOOP-3149 at 5/7/08 4:01 AM:
--------------------------------------------------------------------

Regarding Owen's _You should also remove the references to Writable and 
WritableComparable and make them Object instead._ comment.

Assuming that it refers to generified Objects, I still think it is not 
applicable to remove the use of WritableComparable and Writable from the 
{{MultipleOutputs.collect()}} .

The K and V classes uses in the different outputs of a MultipleOutputs may 
differ among the outputs and they may differ from the K and V classes used by 
the M/R. 

Because of this they can not be bound to the K and V generics of the Mapper and 
Reducer classes, thus they have to accept the base class.

For example:

{noformat}
  JobConf conf = new JobConf();

  conf.setMapperClass(MOMap.class);
  conf.setReducerClass(MOReduce.class);

  conf.setInputPath(inDir);
  conf.setInputFormat(TextInputFormat.class);

  conf.setMapOutputKeyClass(LongWritable.class);
  conf.setMapOutputValueClass(Text.class);

  FileOutputFormat.setOutputPath(conf, outDir);

  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);

  MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, 
LongWritable.class, Text.class);
  MultipleOutputs.addNamedOutput(conf, "sequence", Text.class, 
MapWritable.class, Text.class);
  ...

public class MOReduce implements Reducer<LongWritable, Text, Text, Text> {
  private MultipleOutputs mos;

  public void configure(JobConf conf) {
    mos = new MultipleOutputs(conf);
  }

  public void reduce(LongWritable key, Iterator&lt;Text&gt; values, 
OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    output.collect(new Text("a"), value);
    mos.collect("text", key, new Text("Hello"), reporter);
    mos.collect("sequence", value, new MapWritable(), reporter);
  }

  public void close() throws IOException {
    mos.close();
  }

}
{noformat}



      was (Author: tucu00):
    Regarding Owen's _You should also remove the references to Writable and 
WritableComparable and make them Object instead._ comment.

Assuming that it refers to generified Objects, I still think it is not 
applicable to remove the use of WritableComparable and Writable from the 
{{MultipleOutputs.collect()}} .

The K and V classes uses in the different outputs of a MultipleOutputs may 
differ among the outputs and they may differ from the K and V classes used by 
the M/R. 

Because of this they can not be bound to the K and V generics of the Mapper and 
Reducer classes, thus they have to accept the base class.

For example:

{{
  JobConf conf = new JobConf();

  conf.setMapperClass(MOMap.class);
  conf.setReducerClass(MOReduce.class);

  conf.setInputPath(inDir);
  conf.setInputFormat(TextInputFormat.class);

  conf.setMapOutputKeyClass(LongWritable.class);
  conf.setMapOutputValueClass(Text.class);

  FileOutputFormat.setOutputPath(conf, outDir);

  conf.setOutputFormat(TextOutputFormat.class);
  conf.setOutputKeyClass(Text.class);
  conf.setOutputValueClass(Text.class);

  MultipleOutputs.addNamedOutput(conf, "text", TextOutputFormat.class, 
LongWritable.class, Text.class);
  MultipleOutputs.addNamedOutput(conf, "sequence", Text.class, 
MapWritable.class, Text.class);
  ...

public class MOReduce implements Reducer<LongWritable, Text, Text, Text> {
  private MultipleOutputs mos;

  public void configure(JobConf conf) {
    mos = new MultipleOutputs(conf);
  }

  public void reduce(LongWritable key, Iterator&lt;Text&gt; values, 
OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    output.collect(new Text("a"), value);
    mos.collect("text", key, new Text("Hello"), reporter);
    mos.collect("sequence", value, new MapWritable(), reporter);
  }

  public void close() throws IOException {
    mos.close();
  }

}
}}


  
> supporting multiple outputs for M/R jobs
> ----------------------------------------
>
>                 Key: HADOOP-3149
>                 URL: https://issues.apache.org/jira/browse/HADOOP-3149
>             Project: Hadoop Core
>          Issue Type: New Feature
>          Components: mapred
>         Environment: all
>            Reporter: Alejandro Abdelnur
>            Assignee: Alejandro Abdelnur
>             Fix For: 0.18.0
>
>         Attachments: patch3149.txt, patch3149.txt, patch3149.txt, 
> patch3149.txt, patch3149.txt, patch3149.txt, patch3149.txt, patch3149.txt, 
> patch3149.txt
>
>
> The outputcollector supports writing data to a single output, the 'part' 
> files in the output path.
> We found quite common that our M/R jobs have to write data to different 
> output. For example when classifying data as NEW, UPDATE, DELETE, NO-CHANGE 
> to later do different processing on it.
> Handling the initialization of additional outputs from within the M/R code 
> complicates the code and is counter intuitive with the notion of job 
> configuration.
> It would be desirable to:
> # Configure the additional outputs in the jobconf, potentially specifying 
> different outputformats, key and value classes for each one.
> # Write to the additional outputs in a similar way as data is written to the 
> outputcollector.
> # Support the speculative execution semantics for the output files, only 
> visible in the final output for promoted tasks.
> To support multiple outputs the following classes would be added to 
> mapred/lib:
> * {{MOJobConf}} : extends {{JobConf}} adding methods to define named outputs 
> (name, outputformat, key class, value class)
> * {{MOOutputCollector}} : extends {{OutputCollector}} adding a 
> {{collect(String outputName, WritableComparable key, Writable value)}} method.
> * {{MOMapper}} and {{MOReducer}}: implement {{Mapper}} and {{Reducer}} adding 
> a new {{configure}}, {{map}} and {{reduce}} signature that take the 
> corresponding {{MO}} classes and performs the proper initialization.
> The data flow behavior would be: key/values written to the default (unnamed) 
> output (using the original OutputCollector {{collect}} signature) take part 
> of the shuffle/sort/reduce processing phases. key/values written to a named 
> output from within a map don't.
> The named output files would be named using the task type and task ID to 
> avoid collision among tasks (i.e. 'new-m-00002' and 'new-r-00001').
> Together with the setInputPathFilter feature introduced by HADOOP-2055 it 
> would become very easy to chain jobs working on particular named outputs 
> within a single directory.
> We are using heavily this pattern and it greatly simplified our M/R code as 
> well as chaining different M/R. 
> We wanted to contribute this back to Hadoop as we think is a generic feature 
> many could benefit from.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to