Hi,
We want to use Tez's "MultiStageMRConfigUtil" to convert MRR jobs into a single
Tez job, but it does not work when the intermediate reduce has different
input/output types. Please see the details below.
Suppose we have two MapReduce jobs that implement an ordered-wordcount job,
which counts the number of occurrences of each word and then sorts the words by
count.
Job1 is a traditional wordcount job except that its output is a <count,word>
pair. We call its mapper "Mapper1" and its reducer "Reducer1".
Job2 sorts the words by number of occurrences. We call its mapper "Mapper2",
which contains no logic of its own, and its reducer "Reducer2".
As MapReduce jobs the pipeline is: Mapper1 --(shuffle)--> Reducer1 --(hdfs)-->
Mapper2 --(shuffle)--> Reducer2
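For concreteness, the stage signatures can be sketched as follows (class names
are hypothetical; the standard Hadoop wordcount types are assumed):

```java
// Sketch only -- hypothetical class names, standard Hadoop wordcount types assumed.
// Job1: classic wordcount, except Reducer1 swaps key and value on output.
class Mapper1  extends Mapper<LongWritable, Text, Text, IntWritable> { /* ... */ }
class Reducer1 extends Reducer<Text, IntWritable, IntWritable, Text> { /* ... */ }
// Reducer1 input: <word, count>   output: <count, word>

// Job2: identity mapper; the shuffle's key ordering does the sort by count.
class Mapper2  extends Mapper<IntWritable, Text, IntWritable, Text> { /* ... */ }
class Reducer2 extends Reducer<IntWritable, Text, IntWritable, Text> { /* ... */ }
```

The important point is that Reducer1's input key/value types differ from its
output types, which is exactly the case that trips up the conversion.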
With "MultiStageMRConfigUtil" we want to convert it into a single Tez job:
Mapper1 --(shuffle)--> Reducer1 --(shuffle)--> Reducer2
Here Reducer1 is the intermediate reduce; its input type is <Text,IntWritable>
but its output type is <IntWritable,Text>.
This does not work because of the following error:
5/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 failed with
state FAILED due to: Vertex failed, vertexName=ivertex1,
vertexId=vertex_1416985127132_3432630_1_01, diagnostics=[Task failed,
taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt 0
failed, info=[Error: Failure while running task:java.lang.ClassCastException:
org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text
    at org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.reduce(ConfigableWordCount.java:71)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(ReduceProcessor.java:331)
    at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:143)
    at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
    at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
I found that the YARNRunner class uses stageConfs[i-1] to determine Reducer1's
input when it creates the DAG edge, while the ReduceProcessor class uses
stageConfs[i] to determine the same input.
But in fact the settings in stageConfs[i] describe Reducer1's output, not its
input. ReduceProcessor should use the settings from stageConfs[i-1], as
YARNRunner does. (In this case 'i' is 1.)
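The mismatch can be shown with a toy model (plain Java, not actual Tez code):
represent each stage conf as a map and compare the key class the edge is built
with against the key class the processor would read for the same vertex.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the stageConfs indexing mismatch (not real Tez code).
public class StageConfMismatch {

    // Stands in for TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS.
    static final String KEY_CLASS = "tez.runtime.key.class";

    // stageConfs[0]: Mapper1 -> Reducer1 shuffle (Text keys, i.e. words).
    // stageConfs[1]: Reducer1 -> Reducer2 shuffle (IntWritable keys, i.e. counts).
    static List<Map<String, String>> buildStageConfs() {
        Map<String, String> stage0 = new HashMap<>();
        stage0.put(KEY_CLASS, "org.apache.hadoop.io.Text");
        Map<String, String> stage1 = new HashMap<>();
        stage1.put(KEY_CLASS, "org.apache.hadoop.io.IntWritable");
        List<Map<String, String>> confs = new ArrayList<>();
        confs.add(stage0);
        confs.add(stage1);
        return confs;
    }

    // What YARNRunner uses when building the edge into vertex i.
    static String edgeKeyClass(List<Map<String, String>> stageConfs, int i) {
        return stageConfs.get(i - 1).get(KEY_CLASS);
    }

    // What ReduceProcessor reads for vertex i's input.
    static String processorKeyClass(List<Map<String, String>> stageConfs, int i) {
        return stageConfs.get(i).get(KEY_CLASS);
    }

    public static void main(String[] args) {
        List<Map<String, String>> stageConfs = buildStageConfs();
        // For the intermediate reduce (i == 1) the two disagree: the edge
        // carries Text keys while the processor expects IntWritable -- the
        // same Text vs IntWritable conflict as the ClassCastException above.
        System.out.println("edge writes:     " + edgeKeyClass(stageConfs, 1));
        System.out.println("processor reads: " + processorKeyClass(stageConfs, 1));
    }
}
```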
-------------------------------------------------------------------
// in createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java
for (int i = 0; i < stageConfs.length; i++) {
  // stageConfs[i] is used to create the vertex (in our case a ReduceProcessor),
  // so the processor's input is also determined by stageConfs[i]:
  //   Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
  //     -> reads TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS in ReduceProcessor.java
  //   Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
  //     -> reads TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS in ReduceProcessor.java
  vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
      i == 0 ? mapInputLocations : reduceInputLocations, i, stageConfs.length);
}
...
// stageConfs[i-1] is used to create the edge, whose input should match the
// reduce's input -- but the reduce's input uses stageConfs[i] as above, so
// the two may be incompatible.
OrderedPartitionedKVEdgeConfig edgeConf =
    OrderedPartitionedKVEdgeConfig.newBuilder(
        stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS),
        stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS),
        MRPartitioner.class.getName(), partitionerConf)
    .configureInput().useLegacyInput().done()
    .setFromConfiguration(stageConfs[i - 1]).build();
Edge edge = Edge.create(vertices[i - 1], vertices[i],
    edgeConf.createDefaultEdgeProperty());
dag.addEdge(edge);
-------------------------------------------------------------------
ReduceProcessor cannot read stageConfs[i-1], so I simply added two settings for
ReduceProcessor to read instead. With them it works well (though I think the
best fix would be to let ReduceProcessor read stageConfs[i-1] directly).
-------------------------------------------------------------------
// "mapreduce.reduce.input.key.class" and "mapreduce.reduce.input.value.class"
// are the new settings added by us.
diff src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ReduceProcessor.java.OLD
112,113c112,113
< Class keyClass = jobConf.getClass("mapreduce.reduce.input.key.class", null, Object.class);
< Class valueClass = jobConf.getClass("mapreduce.reduce.input.value.class", null, Object.class);
---
> Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
> Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
-------------------------------------------------------------------
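With the patch above, the driver side then has to set the two new keys on the
intermediate stage's conf to Reducer1's real input types. Roughly (a sketch
only, assuming the patched ReduceProcessor and the types from this example):

```java
// Sketch: wire up the new settings introduced by the patch above.
// The values are Reducer1's real INPUT types from this example.
Configuration conf = stageConfs[1]; // conf read by the intermediate ReduceProcessor
conf.setClass("mapreduce.reduce.input.key.class", Text.class, Object.class);
conf.setClass("mapreduce.reduce.input.value.class", IntWritable.class, Object.class);
```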
Thanks
- Hui