Hi,

We want to use "MultiStageMRConfigUtil" of Tez to convert MRR jobs to one Tez 
job,but it doesn't work when the intermediate reduce has different input/output 
types. Please see the details below.

Suppose we have two MapReduce jobs implementing an ordered-wordcount job, 
which counts the number of occurrences of each word and then sorts the words 
by count.

Job1 is a traditional wordcount job, except that its output is <count, word> 
pairs. We call its mapper "Mapper1" and its reducer "Reducer1".

Job2 sorts the words by their number of occurrences. We call its mapper 
"Mapper2", which is an identity mapper with no logic of its own, and its 
reducer "Reducer2".

As MapReduce jobs the pipeline is: Mapper1 --(shuffle)--> Reducer1 --(hdfs)--> 
Mapper2 --(shuffle)--> Reducer2

By "MultiStageMRConfigUtil" we want convert it to TEZ job such as: 
Mapper1--(shuffle)-->Reducer1 --(shuffle)--> Reducer2

Here Reducer1 is the intermediate reduce: its input type is 
<Text,IntWritable> but its output type is <IntWritable,Text>.
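
To make the type asymmetry concrete, here is a minimal sketch of what 
Reducer1 looks like (hypothetical code, not the actual 
ConfigableWordCount$ConfigableIntSumReducer from our job):

-------------------------------------------------------------------

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sketch only: sums the occurrences of each word and emits <count, word>,
// so the input pair type <Text, IntWritable> differs from the output
// pair type <IntWritable, Text>.
public class Reducer1 extends Reducer<Text, IntWritable, IntWritable, Text> {

  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable count : counts) {
      sum += count.get();
    }
    result.set(sum);
    context.write(result, word); // <count, word>, not <word, count>
  }
}

-------------------------------------------------------------------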

It didn't work because of the following error:

15/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 failed with state FAILED due to: Vertex failed, vertexName=ivertex1, vertexId=vertex_1416985127132_3432630_1_01, diagnostics=[Task failed, taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text
        at org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.reduce(ConfigableWordCount.java:71)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(ReduceProcessor.java:331)
        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:143)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)


I found that the YARNRunner class uses stageConfs[i-1] to determine Reducer1's 
input when it creates the DAG edge, while the ReduceProcessor class uses 
stageConfs[i] to determine its (Reducer1's) input.

But in fact stageConfs[i] describes Reducer1's output, not its input. 
ReduceProcessor should have used the settings from stageConfs[i-1], as 
YARNRunner does. (In this case 'i' is 1.)
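
Concretely, the stage layout in our case is (stage 0 = map stage, stage 1 = 
intermediate reduce stage):

-------------------------------------------------------------------

stageConfs[0]: TEZ_RUNTIME_KEY_CLASS / TEZ_RUNTIME_VALUE_CLASS
               = Mapper1's output = Reducer1's input  = <Text, IntWritable>
stageConfs[1]: TEZ_RUNTIME_KEY_CLASS / TEZ_RUNTIME_VALUE_CLASS
               = Reducer1's output = Reducer2's input = <IntWritable, Text>

-------------------------------------------------------------------

So the edge into the Reducer1 vertex is built from stageConfs[0] 
(<Text, IntWritable>), but ReduceProcessor resolves its input types from 
stageConfs[1] (<IntWritable, Text>), which is exactly the ClassCastException 
above.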

-------------------------------------------------------------------

// In createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java

for (int i = 0; i < stageConfs.length; i++) {

    // stageConfs[i] is used to create the vertex (in our case a
    // ReduceProcessor), so the processor's input types are also
    // determined by stageConfs[i]:
    //   Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
    //     // resolves to TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS in ReduceProcessor.java
    //   Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
    //     // resolves to TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS in ReduceProcessor.java

    vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
        i == 0 ? mapInputLocations : reduceInputLocations, i, stageConfs.length);
}

...
// stageConfs[i-1] is used to create the edge, whose types should match the
// reducer's input -- but the reducer's input uses stageConfs[i] as above,
// so the two may be incompatible.
OrderedPartitionedKVEdgeConfig edgeConf =
    OrderedPartitionedKVEdgeConfig.newBuilder(
        stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS),
        stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS),
        MRPartitioner.class.getName(), partitionerConf)
    .configureInput().useLegacyInput().done()
    .setFromConfiguration(stageConfs[i - 1]).build();
Edge edge = Edge.create(vertices[i - 1], vertices[i],
    edgeConf.createDefaultEdgeProperty());
dag.addEdge(edge);

-------------------------------------------------------------------

ReduceProcessor can't read stageConfs[i-1], so as a workaround I simply added 
two new settings for ReduceProcessor to read. With that change it works well 
(but I think the best fix would be to let ReduceProcessor read stageConfs[i-1] 
directly).

-------------------------------------------------------------------

//"mapreduce.reduce.input.key.class" and "mapreduce.reduce.input.value.class" 
are the new settings added by us.
diff 
src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java 
ReduceProcessor.java.OLD
112,113c112,113
<     Class keyClass = 
jobConf.getClass("mapreduce.reduce.input.key.class",null,Object.class);
<     Class valueClass = 
jobConf.getClass("mapreduce.reduce.input.value.class",null,Object.class);
---
>     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
>     Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);

-------------------------------------------------------------------
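
Note that the diff above only covers the read side. For the two new settings 
to exist at runtime, the client must also populate them when the stage confs 
are built; a minimal sketch of that write side (the exact place in 
YARNRunner/MultiStageMRConfigUtil may differ) could be:

-------------------------------------------------------------------

// Sketch: on the client side, where stageConfs is already available
// (e.g. in YARNRunner), copy the previous stage's intermediate output
// types into the current stage conf under the new keys, so ReduceProcessor
// can resolve its real input types from its own conf.
for (int i = 1; i < stageConfs.length; i++) {
  stageConfs[i].set("mapreduce.reduce.input.key.class",
      stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS));
  stageConfs[i].set("mapreduce.reduce.input.value.class",
      stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS));
}

-------------------------------------------------------------------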


Thanks

- Hui
