[
https://issues.apache.org/jira/browse/HADOOP-1230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12619439#action_12619439
]
tucu00 edited comment on HADOOP-1230 at 8/4/08 3:49 AM:
--------------------------------------------------------------------
I've played a little bit with the proposed API to see how {{MultipleOutputs}}
could be integrated in a more natural way.
I came up with 2 possible alternatives (the code samples below are for the
Mapper; for the Reducer it would be similar).
*Option1:*
Define a {{MapContext}} subclass, {{MOMapContext}}, that wraps a
{{MapContext}} instance, delegating all methods to it and adding its own methods
for multiple output support.
Define a {{Mapper}} subclass, {{MOMapper}}, that has an abstract
{{moMap(MOMapContext)}} method and whose {{map(MapContext)}} creates a
{{MOMapContext}} instance and invokes {{moMap()}}.
Whoever wants to use multiple outputs should extend the {{MOMapper}} class
instead of {{Mapper}}.
The code would look like:
{code}
public abstract class MOMapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends Mapper<MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>> {

  private MultipleOutputs multipleOutputs;

  public void configure(JobConf jobConf) {
    multipleOutputs = new MultipleOutputs(jobConf);
  }

  public final void map(MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> context)
      throws IOException {
    MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> moc =
        new MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>(context, multipleOutputs);
    moMap(moc);
  }

  public abstract void moMap(MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> context)
      throws IOException;

  public void close() throws IOException {
    multipleOutputs.close();
  }
}

public class MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  private MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapContext;
  private MultipleOutputs multipleOutputs;

  public MOMapContext(MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> mapContext,
                      MultipleOutputs multipleOutputs) {
    this.mapContext = mapContext;
    this.multipleOutputs = multipleOutputs;
  }

  //... delegates all MapContext methods to mapContext instance.

  // MO methods
  public void collect(String namedOutput, Object key, Object value)
      throws IOException {
    Reporter reporter = null; //TODO, how do I get a reporter ????
    multipleOutputs.getCollector(namedOutput, reporter).collect(key, value);
  }

  public void collect(String namedOutput, String multiName, Object key, Object value)
      throws IOException {
    Reporter reporter = null; //TODO, how do I get a reporter ????
    multipleOutputs.getCollector(namedOutput, multiName, reporter).collect(key, value);
  }
}
{code}
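To make the wrapper idea of Option 1 easier to try in isolation, here is a minimal, self-contained sketch. It does not use the real Hadoop classes: {{SketchContext}}, {{RecordingContext}}, {{SketchOutputs}}, {{WrappingContext}} and {{WrappingMapper}} are hypothetical stand-ins invented for this illustration, and the {{Reporter}} question is left out entirely.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed MapContext (not the real Hadoop type).
interface SketchContext<K, V> {
    K getKey();
    V getValue();
    void collect(K key, V value);
}

// Plain context that records what is sent to the main output.
class RecordingContext<K, V> implements SketchContext<K, V> {
    private final K key;
    private final V value;
    final List<String> collected = new ArrayList<>();

    RecordingContext(K key, V value) { this.key = key; this.value = value; }

    public K getKey() { return key; }
    public V getValue() { return value; }
    public void collect(K k, V v) { collected.add(k + "=" + v); }
}

// Hypothetical stand-in for MultipleOutputs: records pairs per named output.
class SketchOutputs {
    final Map<String, List<String>> written = new HashMap<>();

    void write(String namedOutput, Object key, Object value) {
        written.computeIfAbsent(namedOutput, n -> new ArrayList<>()).add(key + "=" + value);
    }
}

// Wrapper in the spirit of Option 1: delegates the base methods, adds one more.
class WrappingContext<K, V> implements SketchContext<K, V> {
    private final SketchContext<K, V> delegate;
    private final SketchOutputs outputs;

    WrappingContext(SketchContext<K, V> delegate, SketchOutputs outputs) {
        this.delegate = delegate;
        this.outputs = outputs;
    }

    public K getKey() { return delegate.getKey(); }
    public V getValue() { return delegate.getValue(); }
    public void collect(K key, V value) { delegate.collect(key, value); }

    // Extra method for named outputs, not part of the base context.
    void collect(String namedOutput, Object key, Object value) {
        outputs.write(namedOutput, key, value);
    }
}

// Base class users would extend; map() is final and forwards to moMap().
abstract class WrappingMapper<K, V> {
    private final SketchOutputs outputs;

    WrappingMapper(SketchOutputs outputs) { this.outputs = outputs; }

    final void map(SketchContext<K, V> context) {
        // A new lightweight wrapper is created on every invocation.
        moMap(new WrappingContext<>(context, outputs));
    }

    abstract void moMap(WrappingContext<K, V> context);
}

class Option1Sketch {
    public static void main(String[] args) {
        SketchOutputs outputs = new SketchOutputs();
        RecordingContext<String, Integer> base = new RecordingContext<>("k", 1);
        WrappingMapper<String, Integer> mapper = new WrappingMapper<String, Integer>(outputs) {
            @Override
            void moMap(WrappingContext<String, Integer> ctx) {
                ctx.collect(ctx.getKey(), ctx.getValue());   // main output
                ctx.collect("errors", ctx.getKey(), "bad");  // named output
            }
        };
        mapper.map(base);
        System.out.println(base.collected + " " + outputs.written);
    }
}
```

The sketch shows the two costs of this option: every base method needs a delegating one-liner, and the user's logic moves from {{map()}} to a differently named method.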
*Option2:*
Define a {{MapContext}} subclass, {{MOMapContext}}, that extends the concrete
{{MapContext}} IMPL, adding methods for multiple output support.
The {{MapContext}} IMPL class should be both {{Configurable}} and {{Closeable}}
(with the same lifecycle as the Mapper).
The TaskRunner should look up in the {{JobConf}} which implementation of
{{MapContext}} to use.
Whoever wants to use multiple outputs just defines his/her Mapper as {{extends
Mapper<MOMapContext<KIN, VIN, KOUT, VOUT>>}} and defines the multiple outputs in
the {{JobConf}} as usual (this would set the right {{MapContext}}
implementation).
{code}
public class MOMapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
    extends MapContextIMPL<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  private MultipleOutputs multipleOutputs;

  public void configure(JobConf jobConf) {
    super.configure(jobConf);
    multipleOutputs = new MultipleOutputs(jobConf);
  }

  // MO methods
  public void collect(String namedOutput, Object key, Object value)
      throws IOException {
    Reporter reporter = null; //TODO, how do I get a reporter ????
    multipleOutputs.getCollector(namedOutput, reporter).collect(key, value);
  }

  public void collect(String namedOutput, String multiName, Object key, Object value)
      throws IOException {
    Reporter reporter = null; //TODO, how do I get a reporter ????
    multipleOutputs.getCollector(namedOutput, multiName, reporter).collect(key, value);
  }

  public void close() throws IOException {
    multipleOutputs.close();
    super.close();
  }
}
{code}
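The part of Option 2 that the code above does not show is how the framework would pick the context implementation from the job configuration. The following is only a rough sketch of that lookup using plain Java reflection. A {{Map<String,String>}} stands in for {{JobConf}}, and {{BaseContextImpl}}, {{NamedOutputContext}}, {{ContextFactory}} and the keys {{sketch.context.class}} and {{sketch.named.outputs}} are all hypothetical names made up for this illustration, not anything in Hadoop.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the concrete "MapContext IMPL": configurable and closeable.
class BaseContextImpl {
    final List<String> mainOutput = new ArrayList<>();

    void configure(Map<String, String> conf) { }

    void collect(Object key, Object value) { mainOutput.add(key + "=" + value); }

    void close() { }
}

// Subclass in the spirit of Option 2: inherits everything, adds named outputs.
class NamedOutputContext extends BaseContextImpl {
    final Map<String, List<String>> named = new HashMap<>();
    boolean closed = false;

    @Override
    void configure(Map<String, String> conf) {
        super.configure(conf);
        // Assumed key holding a comma-separated list of named outputs.
        for (String name : conf.getOrDefault("sketch.named.outputs", "").split(",")) {
            if (!name.isEmpty()) {
                named.put(name, new ArrayList<>());
            }
        }
    }

    void collect(String namedOutput, Object key, Object value) {
        List<String> out = named.get(namedOutput);
        if (out == null) {
            throw new IllegalArgumentException("Undefined named output: " + namedOutput);
        }
        out.add(key + "=" + value);
    }

    @Override
    void close() {
        closed = true;   // release named-output resources first
        super.close();
    }
}

// What the task-side code might do: read the class name, instantiate, configure.
class ContextFactory {
    static BaseContextImpl create(Map<String, String> conf) {
        String className =
            conf.getOrDefault("sketch.context.class", BaseContextImpl.class.getName());
        try {
            BaseContextImpl ctx = (BaseContextImpl)
                Class.forName(className).getDeclaredConstructor().newInstance();
            ctx.configure(conf);
            return ctx;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create context " + className, e);
        }
    }
}

class Option2Sketch {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("sketch.context.class", NamedOutputContext.class.getName());
        conf.put("sketch.named.outputs", "errors");

        NamedOutputContext ctx = (NamedOutputContext) ContextFactory.create(conf);
        ctx.collect("k", 1);               // inherited main-output method
        ctx.collect("errors", "k", "bad"); // added named-output method
        ctx.close();
        System.out.println(ctx.mainOutput + " " + ctx.named);
    }
}
```

In this arrangement the user's mapper keeps a single {{map()}} method and one context object lives for the whole task, at the price of the framework having to know how to instantiate and close the configured context class.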
IMO *Option 2* would be more natural to the Map/Reduce developer, as it does
not introduce a separate Map/Reduce class with a different method, {{moMap()}},
to do the actual map logic, and it does not need to create a lightweight
{{MOMapContext}} on every {{map()}} invocation. Also, as {{MOMapContext}}
would extend the {{MapContext}} IMPL class, there is no need to delegate
every method as in the Option 1 wrapper.
In both cases I need to figure out how to get a {{Reporter}} to pass to
{{MultipleOutputs}} when getting the {{OutputCollector}}; this is required
because {{MultipleOutputs}} uses counters.
Thoughts?
> Replace parameters with context objects in Mapper, Reducer, Partitioner,
> InputFormat, and OutputFormat classes
> --------------------------------------------------------------------------------------------------------------
>
> Key: HADOOP-1230
> URL: https://issues.apache.org/jira/browse/HADOOP-1230
> Project: Hadoop Core
> Issue Type: Improvement
> Components: mapred
> Reporter: Owen O'Malley
> Assignee: Owen O'Malley
> Attachments: context-objs-2.patch, context-objs-3.patch,
> context-objs.patch
>
>
> This is a big change, but it will future-proof our APIs. To maintain
> backwards compatibility, I'd suggest that we move over to a new package name
> (org.apache.hadoop.mapreduce) and deprecate the old interfaces and package.
> Basically, it will replace:
> package org.apache.hadoop.mapred;
> public interface Mapper extends JobConfigurable, Closeable {
> void map(WritableComparable key, Writable value, OutputCollector output,
> Reporter reporter) throws IOException;
> }
> with:
> package org.apache.hadoop.mapreduce;
> public interface Mapper extends Closeable {
> void map(MapContext context) throws IOException;
> }
> where MapContext has the methods like getKey(), getValue(), collect(Key,
> Value), progress(), etc.