Hello Hui, 

Thanks for catching and reporting this issue. Before we look at a fix, here is 
some background.

In the early days of Tez, we tried to change existing MR jobs to support a 
chain of MRR stages and ended up using Config/JobConf to specify the 
intermediate stages and to configure them correctly. We realized that writing 
MRR pipelines using the JobClient API was quite unwieldy and hard to understand, 
as it relied on setting a bunch of configs. At some point, we started cleaning 
up the Tez API to make MRR jobs easier to write and maintain. In that respect, 
have you had a chance to look at the latest OrderedWordCount code in 
tez-examples? It shows how to write an MRR job in Tez using the native Tez APIs 
(DAG, Edge, Vertex) instead of messing with config properties in JobConf. This 
might be an easier approach if you are considering using Tez for MRR+ pipelines.

In any case, for the issue that you have seen, would you mind filing a jira for 
this (please mention which version of Tez you are using) and possibly helping 
us by submitting a patch for the fix? There was a function aptly named 
doJobClientMagic() (removed in recent times) that did a second pass over the 
configs and set things up correctly for the case that you describe. I am not 
sure whether removing it somehow introduced this bug.
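
To make the reported mismatch concrete, here is a minimal, self-contained 
sketch. It does not use any Tez classes: plain maps stand in for the per-stage 
configs, and the names StageConfMismatch, KEY_CLASS, stageConf, edgeKeyClass 
and processorKeyClass are hypothetical, made up for illustration only. It 
assumes, as the report below describes, that each stage config records the key 
type that stage emits.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StageConfMismatch {
    // Hypothetical stand-in for the runtime key-class property of a stage config.
    public static final String KEY_CLASS = "key.class";

    // Builds a toy stage config that records the key type the stage emits.
    public static Map<String, String> stageConf(String outputKeyClass) {
        Map<String, String> conf = new HashMap<>();
        conf.put(KEY_CLASS, outputKeyClass);
        return conf;
    }

    // Key type carried by the edge into stage i: read from the upstream stage (i - 1).
    public static String edgeKeyClass(List<Map<String, String>> stageConfs, int i) {
        return stageConfs.get(i - 1).get(KEY_CLASS);
    }

    // Key type stage i assumes for its input in the reported behaviour: read from stage i itself.
    public static String processorKeyClass(List<Map<String, String>> stageConfs, int i) {
        return stageConfs.get(i).get(KEY_CLASS);
    }

    // True when the edge and the processor agree on the input key type of stage i.
    public static boolean consistent(List<Map<String, String>> stageConfs, int i) {
        return edgeKeyClass(stageConfs, i).equals(processorKeyClass(stageConfs, i));
    }

    public static void main(String[] args) {
        List<Map<String, String>> stageConfs = Arrays.asList(
            stageConf("org.apache.hadoop.io.Text"),          // stage 0: Mapper1 emits Text keys
            stageConf("org.apache.hadoop.io.IntWritable"));  // stage 1: Reducer1 emits IntWritable keys

        System.out.println("edge key class:      " + edgeKeyClass(stageConfs, 1));
        System.out.println("processor key class: " + processorKeyClass(stageConfs, 1));
        System.out.println("consistent:          " + consistent(stageConfs, 1));
    }
}
```

In this toy model the two lookups only agree when a stage's input and output 
key types happen to be the same, which would explain why the problem shows up 
only for an intermediate reduce with different input/output types.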

thanks
― Hitesh 


On Jan 22, 2015, at 9:13 PM, Hui Zheng <[email protected]> wrote:

> Hi,
> 
> We want to use "MultiStageMRConfigUtil" in Tez to convert MRR jobs into one Tez 
> job, but it doesn't work when the intermediate reduce has different 
> input/output types. Please see the details below.
> 
> Suppose that we have two MapReduce jobs that implement an ordered-wordcount 
> job, which counts the occurrences of each word and sorts the words by count.
> 
> Job1 is a traditional wordcount job, except that its output is a <count,word> 
> pair. We call its mapper "Mapper1" and its reducer "Reducer1".
> 
> Job2 sorts the words by number of occurrences. We call its mapper "Mapper2", 
> which has no logic of its own, and its reducer "Reducer2".
> 
> With MapReduce jobs we have: Mapper1--(shuffle)-->Reducer1 --(hdfs)--> Mapper2 
> --(shuffle)--> Reducer2
> 
> With "MultiStageMRConfigUtil" we want to convert it into a Tez job such as: 
> Mapper1--(shuffle)-->Reducer1 --(shuffle)--> Reducer2
> 
> Here Reducer1 is the intermediate reduce; its input type is 
> <Text,IntWritable> but its output type is <IntWritable,Text>.
> 
> It did not work; the following error occurred.
> 
> 5/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 failed 
> with state FAILED due to: Vertex failed, vertexName=ivertex1, 
> vertexId=vertex_1416985127132_3432630_1_01, diagnostics=[Task failed, 
> taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt 0 
> failed, info=[Error: Failure while running task:java.lang.ClassCastException: 
> org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text
> 
>        at 
> org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.reduce(ConfigableWordCount.java:71)
>        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
>        at 
> org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(ReduceProcessor.java:331)
>        at 
> org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:143)
>        at 
> org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
>        at 
> org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
>        at 
> org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:415)
> 
> 
> I found that the YARNRunner class uses stageConfs[i-1] to determine Reducer1's 
> input when it creates the DAG edge, while the ReduceProcessor class uses 
> stageConfs[i] to determine its (Reducer1's) input.
> 
> But in fact the setting in stageConfs[i] describes Reducer1's output, not its 
> input. ReduceProcessor should use the stageConfs[i-1] setting, as 
> YARNRunner does. (In this case 'i' is 1.)
> 
> -------------------------------------------------------------------
> 
> //in createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java
> 
> for (int i = 0; i < stageConfs.length; i++) {
> 
>    // Use stageConfs[i] to create the vertex (in our case a ReduceProcessor).
>    // The ReduceProcessor's input is then also determined by stageConfs[i].
>    // In ReduceProcessor.java:
>    //   Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
>    //     (this will be TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS)
>    //   Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
>    //     (this will be TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS)
> 
>    vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
>        i == 0 ? mapInputLocations : reduceInputLocations,
>        i, stageConfs.length);
> }
> 
> ...
> // Use stageConfs[i-1] to create the edge and its input types, which should
> // match the reducer's input. But the reducer's input uses stageConfs[i] as
> // above, so the two may be incompatible.
> OrderedPartitionedKVEdgeConfig edgeConf =
>    OrderedPartitionedKVEdgeConfig.newBuilder(stageConfs[i - 1].get(
>        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS),
>    stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS),
>    MRPartitioner.class.getName(), partitionerConf)
>    .configureInput().useLegacyInput().done()
>    .setFromConfiguration(stageConfs[i - 1]).build();
> Edge edge = Edge.create(vertices[i - 1], vertices[i],
>    edgeConf.createDefaultEdgeProperty());
> dag.addEdge(edge);
> 
> -------------------------------------------------------------------
> 
> ReduceProcessor can't read stageConfs[i-1], so I simply added two settings 
> for ReduceProcessor to read, and then it works. (But I think the best way is 
> to let ReduceProcessor read stageConfs[i-1].)
> 
> -------------------------------------------------------------------
> 
> // "mapreduce.reduce.input.key.class" and "mapreduce.reduce.input.value.class"
> // are the new settings added by us.
> diff src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ReduceProcessor.java.OLD
> 112,113c112,113
> <     Class keyClass = jobConf.getClass("mapreduce.reduce.input.key.class",null,Object.class);
> <     Class valueClass = jobConf.getClass("mapreduce.reduce.input.value.class",null,Object.class);
> ---
> >     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
> >     Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
> 
> -------------------------------------------------------------------
> 
> 
> Thanks
> 
> - Hui
