Adding to that, MultiStageMRConfigUtil is not meant to be used by external projects. Support for this mechanism for multi-stage jobs is supposed to be removed; no one has gotten around to doing this yet, but there is a jira open to remove it.

Using the DAG API to set this up should be possible.
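Roughly, the wiring would look something like the sketch below, modeled on OrderedWordCount in tez-examples. TokenProcessor, SumProcessor, and SorterProcessor are placeholders standing in for the Mapper1/Reducer1/Reducer2 logic, not classes shipped with Tez:

-------------------------------------------------------------------
// Rough sketch only, modeled on OrderedWordCount in tez-examples.
// TokenProcessor, SumProcessor, and SorterProcessor are placeholder
// processor implementations.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;

TezConfiguration tezConf = new TezConfiguration();

// One vertex per stage; no JobConf-driven stage plumbing involved.
Vertex tokenizer = Vertex.create("Tokenizer",
    ProcessorDescriptor.create(TokenProcessor.class.getName()));
Vertex summer = Vertex.create("Summation",
    ProcessorDescriptor.create(SumProcessor.class.getName()), 1);
Vertex sorter = Vertex.create("Sorter",
    ProcessorDescriptor.create(SorterProcessor.class.getName()), 1);

// First shuffle carries <Text, IntWritable> pairs...
OrderedPartitionedKVEdgeConfig countEdge = OrderedPartitionedKVEdgeConfig
    .newBuilder(Text.class.getName(), IntWritable.class.getName(),
        HashPartitioner.class.getName())
    .setFromConfiguration(tezConf).build();

// ...and the second carries <IntWritable, Text> pairs. Each edge
// declares its own key/value types, so an intermediate stage whose
// input and output types differ is not a problem here.
OrderedPartitionedKVEdgeConfig sortEdge = OrderedPartitionedKVEdgeConfig
    .newBuilder(IntWritable.class.getName(), Text.class.getName(),
        HashPartitioner.class.getName())
    .setFromConfiguration(tezConf).build();

DAG dag = DAG.create("OrderedWordCount");
dag.addVertex(tokenizer).addVertex(summer).addVertex(sorter)
    .addEdge(Edge.create(tokenizer, summer,
        countEdge.createDefaultEdgeProperty()))
    .addEdge(Edge.create(summer, sorter,
        sortEdge.createDefaultEdgeProperty()));
-------------------------------------------------------------------

Since the types live on the edges rather than in per-stage configs, the Reducer1 case described below (input <IntWritable, Text>, output <Text, IntWritable>) needs no special handling.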
On Fri, Jan 23, 2015 at 9:20 AM, Hitesh Shah <[email protected]> wrote:

> Hello Hui,
>
> Thanks for catching and reporting this issue. Before we go about looking at a fix for this, I will provide some background.
>
> In the early days of Tez, we tried to change existing MR jobs to support a chain of MRR and ended up using Config/JobConf as a way to specify intermediate stages and also to configure them correctly. We realized that writing MRR pipelines using the JobClient API was quite unwieldy and hard to understand, as it relied on setting a bunch of configs. At some point, we started cleaning up the Tez API to make it easier to write MRR jobs in a more maintainable manner. In that respect, have you had a chance to look at the latest OrderedWordCount code in tez-examples? It shows you how to write an MRR job in Tez using the native Tez APIs (DAG, Edge, Vertex) instead of messing with config properties in JobConf. This might be an easier approach if you are considering using Tez for MRR+ pipelines.
>
> In any case, for the issue that you have seen, would you mind filing a jira (please mention what version of Tez you are using) and possibly helping us by submitting a patch for the fix? There was a function aptly named doJobClientMagic() (removed in recent times) that did a second pass over the configs and set things up correctly for the case that you describe. I am not sure if removing that somehow introduced this bug.
>
> thanks
> ― Hitesh
>
>
> On Jan 22, 2015, at 9:13 PM, Hui Zheng <[email protected]> wrote:
>
> > Hi,
> >
> > We want to use "MultiStageMRConfigUtil" of Tez to convert MRR jobs into one Tez job, but it doesn't work when the intermediate reduce has different input/output types. Please see the details below.
> >
> > Suppose that we have two MapReduce jobs that implement the ordered-wordcount job, which counts the number of occurrences of each word and sorts the words by count.
> >
> > Job1 is a traditional wordcount job, except that the output is a <count, word> pair. We call the mapper "Mapper1" and the reducer "Reducer1".
> >
> > Job2 sorts the words by the number of occurrences. We call the mapper "Mapper2", which contains no logic of its own, and the reducer "Reducer2".
> >
> > With MapReduce jobs we have: Mapper1 --(shuffle)--> Reducer1 --(hdfs)--> Mapper2 --(shuffle)--> Reducer2
> >
> > With "MultiStageMRConfigUtil" we want to convert this to a Tez job such as: Mapper1 --(shuffle)--> Reducer1 --(shuffle)--> Reducer2
> >
> > Here Reducer1 is the intermediate reduce; its input type is <IntWritable, Text> but its output is <Text, IntWritable>.
> >
> > It didn't work because the following error occurred:
> >
> > 5/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 failed with state FAILED due to: Vertex failed, vertexName=ivertex1, vertexId=vertex_1416985127132_3432630_1_01, diagnostics=[Task failed, taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text
> >   at org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.reduce(ConfigableWordCount.java:71)
> >   at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
> >   at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(ReduceProcessor.java:331)
> >   at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:143)
> >   at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
> >   at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
> >   at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
> >   at java.security.AccessController.doPrivileged(Native Method)
> >   at javax.security.auth.Subject.doAs(Subject.java:415)
> >
> > I found that the YARNRunner class uses stageConfs[i-1] to determine Reducer1's input when it creates the edge of the DAG, while the ReduceProcessor class uses stageConfs[i] to determine its (Reducer1's) input.
> >
> > But in fact the setting in stageConfs[i] is Reducer1's output, not its input. ReduceProcessor should have used stageConfs[i-1]'s settings, as YARNRunner does. (In this case 'i' is 1.)
> >
> > -------------------------------------------------------------------
> >
> > // in createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java
> >
> > for (int i = 0; i < stageConfs.length; i++) {
> >   // stageConfs[i] is used to create the vertex (in our case a ReduceProcessor),
> >   // so the ReduceProcessor's input is also determined by stageConfs[i]:
> >   // Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
> >   //   // resolves TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS in ReduceProcessor.java
> >   // Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
> >   //   // resolves TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS in ReduceProcessor.java
> >   vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
> >       i == 0 ? mapInputLocations : reduceInputLocations, i, stageConfs.length);
> > }
> >
> > ...
> > // stageConfs[i-1] is used to create the edge and its input, which should be the
> > // same as the reduce's input; but the reduce's input uses stageConfs[i] as above,
> > // so the two may be incompatible.
> > OrderedPartitionedKVEdgeConfig edgeConf =
> >     OrderedPartitionedKVEdgeConfig.newBuilder(
> >         stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS),
> >         stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS),
> >         MRPartitioner.class.getName(), partitionerConf)
> >     .configureInput().useLegacyInput().done()
> >     .setFromConfiguration(stageConfs[i - 1]).build();
> > Edge edge = Edge.create(vertices[i - 1], vertices[i], edgeConf.createDefaultEdgeProperty());
> > dag.addEdge(edge);
> >
> > -------------------------------------------------------------------
> >
> > ReduceProcessor cannot read stageConfs[i-1] at runtime, so I simply added two settings for it to read. It then works well (but I think the best way is to let ReduceProcessor read stageConfs[i-1]).
> >
> > -------------------------------------------------------------------
> >
> > // "mapreduce.reduce.input.key.class" and "mapreduce.reduce.input.value.class" are the new settings added by us.
> > diff src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ReduceProcessor.java.OLD
> > 112,113c112,113
> > < Class keyClass = jobConf.getClass("mapreduce.reduce.input.key.class", null, Object.class);
> > < Class valueClass = jobConf.getClass("mapreduce.reduce.input.value.class", null, Object.class);
> > ---
> > > Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
> > > Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
> >
> > -------------------------------------------------------------------
> >
> >
> > Thanks
> >
> > - Hui
> >
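For completeness: the diff above only covers the read side inside ReduceProcessor; the two new keys also have to be written into each stage's conf on the client. A minimal sketch of that write side follows, assuming it sits in the stage loop of YARNRunner's createDAG(). The loop placement and surrounding code are assumptions; only the key names come from the diff above:

-------------------------------------------------------------------
// Hypothetical client-side half of the workaround: copy the previous
// stage's intermediate key/value class names into stageConfs[i] under
// the two new keys so that ReduceProcessor can pick them up at runtime.
for (int i = 1; i < stageConfs.length; i++) {
  stageConfs[i].set("mapreduce.reduce.input.key.class",
      stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS));
  stageConfs[i].set("mapreduce.reduce.input.value.class",
      stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS));
}
-------------------------------------------------------------------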
