Using MultiStageMRConfigUtil and per-stage configs is not recommended. https://issues.apache.org/jira/browse/TEZ-1271 is meant to remove support for this.
That said, we have had informal discussions about supporting chains of Map-Reduce jobs, and potentially providing a tool to convert such chains into DAGs. Clearly, there are scenarios where people would be interested in doing this. Would be great to get more input on this from members of the tez community. Thanks - Sid On Sun, Jan 25, 2015 at 6:50 PM, Hui Zheng <[email protected]> wrote: > Hi, > > Actually we have already many mapreduce jobs which are running in > production environment. > And we want to find a easy way to change MRR jobs to a tez job to avoid > reading and writing hdfs. > So I find that we may only change the configuration(not use > MultiStageMRConfigUtil class directly ) to implement it.Is it obsolete or > not recommended? > (we use tez-0.5.2) > > The way is use the“mrr.intermediate.num-stages”property for a job and use > the“mrr.intermediate.stage.STAGE_NUM”prefix for each intermediate reduce > of the job such as follows. > > <property> > <name>mrr.intermediate.num-stages</name> > <value>1</value> > </property> > > <property> > <name>mrr.intermediate.stage.1.mapreduce.job.reduce.class</name> > > <value>org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReduc > er</value> > </property> > > <property> > <name>mrr.intermediate.stage.1.mapreduce.map.output.key.class</name> > <value>org.apache.hadoop.io.IntWritable</value> > </property> > > <property> > <name>mrr.intermediate.stage.1.mapreduce.map.output.value.class</name> > <value>org.apache.hadoop.io.Text</value> > </property> > > Thanks > > > -Hui > > > > On 2015/01/24 6:32, "Siddharth Seth" <[email protected]> wrote: > > >Adding to that, MultiStageMRConfigUtil is not meant to be used by external > >projects. Support of this mechanism for multi-stage jobs is supposed to be > >removed - noone's gotten around to doing this yet, but there's a jira open > >to remove it. > > > >Using the DAG API to set this up should be possible. > > > >On Fri, Jan 23, 2015 at 9:20 AM, Hitesh Shah <[email protected]> wrote: > > > >> Hello Hui, > >> > >> Thanks for catching and reporting this issue. Before we go about looking > >> at a fix for this, I will provide some background. > >> > >> In the early days of Tez, we tried to change existing MR jobs to > >>support a > >> chain of MRR and ended up using Config/JobConf as a way to specify > >> intermediate stages and also to configure them correctly. We realized > >>that > >> writing MRR pipelines using the JobClient API was quite unwieldy and > >>hard > >> to understand as it relied on setting a bunch of configs. At some > >>point, we > >> start cleaning up the Tez API to make it more easy to write MRR jobs in > >>a > >> more easy to maintain manner. In that respect, have you had a chance to > >> look at the latest OrderedWordCount code in tez-examples? It shows you > >>how > >> to write an MRR job in Tez by using Tez native APIs ( DAG, Edge, Vertex > >>) > >> instead of messing with config properties in JobConf. This might be an > >> easier approach if you are considering using Tez for MRR+ pipelines. > >> > >> In any case, for the issue that you have seen, would you mind filing a > >> jira for this ( please mention what version of Tez you are using ) and > >> possibly helping us by submitting up with a patch for the fix? There > >>was a > >> function aptly named doJobClientMagic() ( removed in recent times ), > >>that > >> did a second pass over the configs and setup things correctly for the > >>case > >> that you describe. I am not sure if removing that somehow introduced > >>this > >> bug. > >> > >> thanks > >> ― Hitesh > >> > >> > >> On Jan 22, 2015, at 9:13 PM, Hui Zheng <[email protected]> wrote: > >> > >> > Hi, > >> > > >> > We want to use "MultiStageMRConfigUtil" of Tez to convert MRR jobs to > >> one Tez job,but it doesn't work when the intermediate reduce has > >>different > >> input/output types. Please see the details below. > >> > > >> > Suppose that we have two mapreduce jobs to implement the > >> ordered-wordcount job which count the number of occurrences of word and > >> sort them. > >> > > >> > Job1 is a traditional wordcount job except the output is <counts,word> > >> pair. We call the mapper "Mapper1" and call the reducer "Reducer1". > >> > > >> > Job2 sort the word by the number of occurrences.We call the mapper > >> "Mapper2" which has no any logic and call the reducer "Reducer2". > >> > > >> > By MapReduce Jobs we have: Mapper1--(shuffle)-->Reducer1 --(hdfs)--> > >> Mapper2 --(shuffle)--> Reducer2 > >> > > >> > By "MultiStageMRConfigUtil" we want convert it to TEZ job such as: > >> Mapper1--(shuffle)-->Reducer1 --(shuffle)--> Reducer2 > >> > > >> > Here Reducer1 is the intermediate reduce and it's input type is > >> <IntWritable,Text> but the output is <Text,IntWritable>. > >> > > >> > Because the following error happened it didn't work. > >> > > >> > 5/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 > >> failed with state FAILED due to: Vertex failed, vertexName=ivertex1, > >> vertexId=vertex_1416985127132_3432630_1_01, diagnostics=[Task failed, > >> taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt > >>0 > >> failed, info=[Error: Failure while running > >> task:java.lang.ClassCastException: org.apache.hadoop.io.IntWritable > >>cannot > >> be cast to org.apache.hadoop.io.Text > >> > > >> > at > >> > >>org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.re > >>duce(ConfigableWordCount.java:71) > >> > at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171) > >> > at > >> > >>org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(R > >>educeProcessor.java:331) > >> > at > >> > >>org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProce > >>ssor.java:143) > >> > at > >> > >>org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcess > >>orRuntimeTask.java:324) > >> > at > >> > >>org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTas > >>kRunner.java:176) > >> > at > >> > >>org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTas > >>kRunner.java:168) > >> > at java.security.AccessController.doPrivileged(Native Method) > >> > at javax.security.auth.Subject.doAs(Subject.java:415) > >> > > >> > > >> > I found the YARNRunner Class uses stageConfs[i-1] to determine the > >> Reducer1's input when it creates the edge of DAG while the > >>ReduceProcessor > >> Class uses stageConfs[i] to determine his(Reducer1) input. > >> > > >> > But in fact the setting of stageConfs[i] is the Reducer1's output, not > >> its input. ReduceProcessor should have used stageConfs[i-1]'s setting as > >> YARNRunner does. ( In this case 'i' is 1) > >> > > >> > ------------------------------------------------------------------- > >> > > >> > //in createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java > >> > > >> > for (int i = 0; i < stageConfs.length; i++) { > >> > > >> > // use stageConfs[i] to create vertex(in our case it is a > >> ReduceProcessor) > >> > // then the ReduceProcessor is created and input is determined > >>also > >> by stageConfs[i] > >> > // Class keyClass = > >> ConfigUtils.getIntermediateInputKeyClass(jobConf); //it will be > >> TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS in ReduceProcessor.java > >> > // Class valueClass = > >> ConfigUtils.getIntermediateInputValueClass(jobConf); //it will be > >> TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS in ReduceProcessor.java > >> > > >> > vertices[i] = createVertexForStage(stageConfs[i], > >>jobLocalResources, > >> > i == 0 ? mapInputLocations : reduceInputLocations, > >> i,stageConfs.length); > >> > } > >> > > >> > ... > >> > // use stageConfs[i-1] to create edge and its input which should be > >>the > >> same as reduce's input > >> > // but the reduce's input uses stageConfs[i] as above so they are > >>maybe > >> incompatible. > >> > OrderedPartitionedKVEdgeConfig edgeConf = > >> > OrderedPartitionedKVEdgeConfig.newBuilder(stageConfs[i - 1].get( > >> > TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS), > >> > stageConfs[i - > >> 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS), > >> > MRPartitioner.class.getName(), partitionerConf) > >> > .configureInput().useLegacyInput().done() > >> > .setFromConfiguration(stageConfs[i - 1]).build(); > >> > Edge edge = Edge.create(vertices[i - 1], vertices[i], > >> edgeConf.createDefaultEdgeProperty()); > >> > dag.addEdge(edge); > >> > > >> > ------------------------------------------------------------------- > >> > > >> > In ReduceProcessor it can't read stageConfs[i-1], so I simply add two > >> settings to let ReduceProcessor read. Then it does work well(But I think > >> the best way is to let ReduceProcessor read stageConfs[i-1]). > >> > > >> > ------------------------------------------------------------------- > >> > > >> > //"mapreduce.reduce.input.key.class" and > >> "mapreduce.reduce.input.value.class" are the new settings added by us. > >> > diff > >> > >>src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.j > >>ava > >> ReduceProcessor.java.OLD > >> > 112,113c112,113 > >> > < Class keyClass = > >> jobConf.getClass("mapreduce.reduce.input.key.class",null,Object.class); > >> > < Class valueClass = > >> > >>jobConf.getClass("mapreduce.reduce.input.value.class",null,Object.class); > >> > --- > >> >> Class keyClass = > >>ConfigUtils.getIntermediateInputKeyClass(jobConf); > >> >> Class valueClass = > >> ConfigUtils.getIntermediateInputValueClass(jobConf); > >> > > >> > ------------------------------------------------------------------- > >> > > >> > > >> > Thanks > >> > > >> > - Hui > >> > >> > >
