[ 
https://issues.apache.org/jira/browse/HADOOP-1230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12619439#action_12619439
 ] 

Alejandro Abdelnur commented on HADOOP-1230:
--------------------------------------------

I've played a little bit with the proposed API to see how {{MultipleOutputs}} 
could be integrated in a more natural way.

I've come up with 2 possible alternatives (the following code samples are for 
the Mapper; for the Reducer it would be similar).

*Option 1:*

Defined a {{MapContext}} subclass, {{MOMapContext}}, that wraps a 
{{MapContext}} instance delegating all methods to it and adding its own methods 
for multiple output support.

Defined a {{Mapper}} subclass, {{MOMapper}}, that has an abstract 
{{moMap(MOMapContext)}} method and whose {{map(MapContext)}} creates a 
{{MOMapContext}} instance and invokes {{moMap()}}.

Whoever wants to use multiple outputs should extend the {{MOMapper}} class 
instead of {{Mapper}}.

The code would look like:

{code}
public abstract class MOMapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends Mapper<MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> {
  private MultipleOutputs multipleOutputs;

  public void configure(JobConf jobConf) {
    multipleOutputs = new MultipleOutputs(jobConf);
  }

  public final void map(MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> context)
      throws IOException {
    MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> moc =
      new MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>(context, multipleOutputs);
    moMap(moc);
  }

  public abstract void moMap(MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> context)
      throws IOException;

  public void close() throws IOException {
    multipleOutputs.close();
  }

}

public class MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  private MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapContext;
  private MultipleOutputs multipleOutputs;

  public MOMapContext(MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapContext,
                      MultipleOutputs multipleOutputs) {
    this.mapContext = mapContext;
    this.multipleOutputs = multipleOutputs;
  }

  //... delegates all MapContext methods to mapContext instance.

  // MO methods

  public void collect(String namedOutput, Object key, Object value)
      throws IOException {
    Reporter reporter = null; //TODO, how do I get a reporter ????
    multipleOutputs.getCollector(namedOutput, reporter).collect(key, value);
  }

  public void collect(String namedOutput, String multiName, Object key, Object value)
      throws IOException {
    Reporter reporter = null; //TODO, how do I get a reporter ????
    multipleOutputs.getCollector(namedOutput, multiName, reporter).collect(key, value);
  }

}
{code}
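
To illustrate what a developer would write under Option 1, here is a minimal, self-contained sketch. It does not use the Hadoop classes: {{SketchMOContext}}, {{SketchMOMapper}} and {{ErrorSplittingMapper}} are hypothetical stand-ins, keys and values are plain strings, and named outputs are collected in an in-memory map instead of going through {{MultipleOutputs}}. The {{"default"}} and {{"errors"}} output names are also just assumptions for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed MOMapContext: it records output
// in memory instead of writing through MultipleOutputs.
class SketchMOContext {
  private final String key;
  private final String value;
  private final Map<String, List<String>> outputs;

  SketchMOContext(String key, String value, Map<String, List<String>> outputs) {
    this.key = key;
    this.value = value;
    this.outputs = outputs;
  }

  String getKey() { return key; }

  String getValue() { return value; }

  // Regular output; stored under the assumed name "default".
  void collect(String k, String v) {
    collect("default", k, v);
  }

  // Named output, mirroring collect(namedOutput, key, value) in the proposal.
  void collect(String namedOutput, String k, String v) {
    outputs.computeIfAbsent(namedOutput, n -> new ArrayList<>()).add(k + "=" + v);
  }
}

// Stand-in for MOMapper: map() is final and hands a wrapping context to moMap().
abstract class SketchMOMapper {
  private final Map<String, List<String>> outputs = new HashMap<>();

  final void map(String key, String value) {
    // A new wrapper is created on every map() invocation, as in Option 1.
    moMap(new SketchMOContext(key, value, outputs));
  }

  abstract void moMap(SketchMOContext context);

  Map<String, List<String>> outputs() { return outputs; }
}

// What a user would write: route records with an empty value to a named output.
class ErrorSplittingMapper extends SketchMOMapper {
  @Override
  void moMap(SketchMOContext context) {
    if (context.getValue().isEmpty()) {
      context.collect("errors", context.getKey(), "empty value");
    } else {
      context.collect(context.getKey(), context.getValue());
    }
  }
}

class Option1Demo {
  public static void main(String[] args) {
    ErrorSplittingMapper mapper = new ErrorSplittingMapper();
    mapper.map("k1", "v1");
    mapper.map("k2", "");
    System.out.println(mapper.outputs());
  }
}
```

Note that the user code overrides {{moMap()}} rather than {{map()}}, which is the main ergonomic cost of this option.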

*Option 2:*

Defined a {{MapContext}} subclass, {{MOMapContext}}, that extends the concrete 
{{MapContext}} IMPL adding methods for multiple output support.

The {{MapContext}} IMPL class should be both {{Configurable}} and {{Closeable}} 
(in the same lifecycle as the Mapper).

The TaskRunner should look up in the {{JobConf}} which {{MapContext}} 
implementation to use.

Whoever wants to use multiple outputs just defines his/her Mapper as {{extends 
Mapper<MOMapContext<KIN, VIN, KOUT, VOUT>>}} and defines the multiple outputs 
in the {{JobConf}} as usual (this would set the right {{MapContext}} 
implementation).

{code}
public class MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends MapContextIMPL<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  private MultipleOutputs multipleOutputs;

  public void configure(JobConf jobConf) {
    super.configure(jobConf);
    multipleOutputs = new MultipleOutputs(jobConf);
  }

  // MO methods

  public void collect(String namedOutput, Object key, Object value)
      throws IOException {
    Reporter reporter = null; //TODO, how do I get a reporter ????
    multipleOutputs.getCollector(namedOutput, reporter).collect(key, value);
  }

  public void collect(String namedOutput, String multiName, Object key, Object value)
      throws IOException {
    Reporter reporter = null; //TODO, how do I get a reporter ????
    multipleOutputs.getCollector(namedOutput, multiName, reporter).collect(key, value);
  }

  public void close() throws IOException {
    multipleOutputs.close();
    super.close();
  }

}
{code}
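
The configuration-driven lookup that Option 2 relies on could look roughly like the following self-contained sketch. Everything here is an assumption made for illustration: {{java.util.Properties}} stands in for {{JobConf}}, {{SketchContext}} and {{SketchMOContextImpl}} stand in for the concrete {{MapContext}} implementation and {{MOMapContext}}, {{ContextFactory}} stands in for the relevant part of the TaskRunner, and the property name {{sketch.map.context.class}} is made up, not an existing Hadoop key.

```java
import java.util.Properties;

// Hypothetical stand-in for the concrete MapContext implementation,
// which Option 2 requires to be configurable and closeable.
class SketchContext {
  void configure(Properties conf) { }

  void close() { }
}

// Hypothetical stand-in for MOMapContext: extends the concrete context
// and hooks into the same configure()/close() lifecycle.
class SketchMOContextImpl extends SketchContext {
  boolean configured;
  boolean closed;

  @Override
  void configure(Properties conf) {
    super.configure(conf);
    configured = true; // the real class would create MultipleOutputs here
  }

  @Override
  void close() {
    closed = true;     // the real class would close MultipleOutputs here
    super.close();
  }
}

// Stand-in for the framework side: picks the context class named in the
// configuration, falling back to the plain context when none is set.
class ContextFactory {
  // Made-up property name, used only in this sketch.
  static final String CONTEXT_CLASS_KEY = "sketch.map.context.class";

  static SketchContext newContext(Properties conf) {
    String className =
        conf.getProperty(CONTEXT_CLASS_KEY, SketchContext.class.getName());
    try {
      SketchContext context = Class.forName(className)
          .asSubclass(SketchContext.class)
          .getDeclaredConstructor()
          .newInstance();
      context.configure(conf);
      return context;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot create context " + className, e);
    }
  }
}

class Option2Demo {
  public static void main(String[] args) {
    Properties conf = new Properties();
    // Declaring multiple outputs in the job would set this as a side effect.
    conf.setProperty(ContextFactory.CONTEXT_CLASS_KEY,
        SketchMOContextImpl.class.getName());
    SketchContext context = ContextFactory.newContext(conf);
    System.out.println(context.getClass().getSimpleName());
    context.close();
  }
}
```

With this arrangement the context is created once per task and reused across {{map()}} calls, and the user's Mapper only changes its declared context type.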

IMO *Option 2* would be more natural to the Map/Reduce developer, as it does 
not introduce a separate Map/Reduce class with a different method, {{moMap()}}, 
to do the actual map logic, and it does not need to create a lightweight 
{{MOMapContext}} on every {{map()}} invocation.

In both cases I need to figure out how to get a {{Reporter}} to pass to 
{{MultipleOutputs}} when getting the {{OutputCollector}}; this is required 
because {{MultipleOutputs}} uses counters.

Thoughts?


> Replace parameters with context objects in Mapper, Reducer, Partitioner, 
> InputFormat, and OutputFormat classes
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-1230
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1230
>             Project: Hadoop Core
>          Issue Type: Improvement
>          Components: mapred
>            Reporter: Owen O'Malley
>            Assignee: Owen O'Malley
>         Attachments: context-objs-2.patch, context-objs-3.patch, 
> context-objs.patch
>
>
> This is a big change, but it will future-proof our APIs. To maintain 
> backwards compatibility, I'd suggest that we move over to a new package name 
> (org.apache.hadoop.mapreduce) and deprecate the old interfaces and package. 
> Basically, it will replace:
> package org.apache.hadoop.mapred;
> public interface Mapper extends JobConfigurable, Closeable {
>   void map(WritableComparable key, Writable value, OutputCollector output, 
> Reporter reporter) throws IOException;
> }
> with:
> package org.apache.hadoop.mapreduce;
> public interface Mapper extends Closeable {
>   void map(MapContext context) throws IOException;
> }
> where MapContext has the methods like getKey(), getValue(), collect(Key, 
> Value), progress(), etc.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.