Adding to that, MultiStageMRConfigUtil is not meant to be used by external
projects. Support for this mechanism for multi-stage jobs is supposed to be
removed; no one has gotten around to doing this yet, but there's a jira open
to remove it.

Using the DAG API to set this up should be possible.
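
As a rough sketch ( assuming current Tez 0.5-style APIs; the example.*
processor class names and numPartitions are placeholders for the actual
mapper/reducer logic, not real classes ), the Mapper1 -> Reducer1 -> Reducer2
pipeline discussed below could be wired up like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;

public class MrrDagSketch {
  static DAG build(int numPartitions) {
    // Tokenizer plays the role of Mapper1, Summation of Reducer1,
    // Sorter of Reducer2.
    Vertex tokenizer = Vertex.create("Tokenizer",
        ProcessorDescriptor.create("example.TokenProcessor"));
    Vertex summation = Vertex.create("Summation",
        ProcessorDescriptor.create("example.SumProcessor"), numPartitions);
    Vertex sorter = Vertex.create("Sorter",
        ProcessorDescriptor.create("example.SortProcessor"), 1);

    // Each edge declares its own key/value types, so an intermediate vertex
    // whose input and output types differ ( <Text,IntWritable> in,
    // <IntWritable,Text> out ) is expressed directly.
    OrderedPartitionedKVEdgeConfig wordCountEdge = OrderedPartitionedKVEdgeConfig
        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
            HashPartitioner.class.getName()).build();
    OrderedPartitionedKVEdgeConfig countWordEdge = OrderedPartitionedKVEdgeConfig
        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
            HashPartitioner.class.getName()).build();

    // Data sources/sinks ( MRInput/MROutput ) and TezClient submission omitted.
    return DAG.create("OrderedWordCount")
        .addVertex(tokenizer).addVertex(summation).addVertex(sorter)
        .addEdge(Edge.create(tokenizer, summation,
            wordCountEdge.createDefaultEdgeProperty()))
        .addEdge(Edge.create(summation, sorter,
            countWordEdge.createDefaultEdgeProperty()));
  }
}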

On Fri, Jan 23, 2015 at 9:20 AM, Hitesh Shah <[email protected]> wrote:

> Hello Hui,
>
> Thanks for catching and reporting this issue. Before we go about looking
> at a fix for this, I will provide some background.
>
> In the early days of Tez, we tried to change existing MR jobs to support a
> chain of MRR and ended up using Config/JobConf as a way to specify
> intermediate stages and also to configure them correctly. We realized that
> writing MRR pipelines using the JobClient API was quite unwieldy and hard
> to understand, as it relied on setting a bunch of configs. At some point we
> started cleaning up the Tez API to make it easier to write MRR jobs in a
> more maintainable manner. In that respect, have you had a chance to look at
> the latest OrderedWordCount code in tez-examples? It shows you how to
> write an MRR job in Tez using the Tez native APIs ( DAG, Edge, Vertex )
> instead of messing with config properties in JobConf. This might be an
> easier approach if you are considering using Tez for MRR+ pipelines.
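>
> For the specific problem you describe below ( an intermediate reduce whose
> input and output types differ ), the relevant point is that each edge
> declares its own key/value types explicitly. As a rough sketch ( not the
> exact example code; reducer1Vertex and reducer2Vertex are placeholders ):
>
> OrderedPartitionedKVEdgeConfig countWordEdge = OrderedPartitionedKVEdgeConfig
>     .newBuilder(IntWritable.class.getName(), Text.class.getName(),
>         HashPartitioner.class.getName()).build();
> Edge r1ToR2 = Edge.create(reducer1Vertex, reducer2Vertex,
>     countWordEdge.createDefaultEdgeProperty());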
>
> In any case, for the issue that you have seen, would you mind filing a
> jira for this ( please mention what version of Tez you are using ) and
> possibly helping us by submitting a patch for the fix? There was a
> function aptly named doJobClientMagic() ( removed in recent times ) that
> did a second pass over the configs and set things up correctly for the
> case that you describe. I am not sure if removing that somehow introduced
> this bug.
>
> thanks
> - Hitesh
>
>
> On Jan 22, 2015, at 9:13 PM, Hui Zheng <[email protected]> wrote:
>
> > Hi,
> >
> > We want to use "MultiStageMRConfigUtil" of Tez to convert MRR jobs into
> > one Tez job, but it doesn't work when the intermediate reduce has
> > different input/output types. Please see the details below.
> >
> > Suppose that we have two MapReduce jobs that implement an
> > ordered-wordcount job, which counts the number of occurrences of each
> > word and sorts the words by count.
> >
> > Job1 is a traditional wordcount job, except that the output is a
> > <count,word> pair. We call the mapper "Mapper1" and the reducer "Reducer1".
> >
> > Job2 sorts the words by the number of occurrences. We call the mapper
> > "Mapper2", which has no logic of its own ( an identity mapper ), and the
> > reducer "Reducer2".
> >
> > As MapReduce jobs we have: Mapper1 --(shuffle)--> Reducer1 --(hdfs)--> Mapper2 --(shuffle)--> Reducer2
> >
> > By "MultiStageMRConfigUtil" we want convert it to TEZ job such as:
> Mapper1--(shuffle)-->Reducer1 --(shuffle)--> Reducer2
> >
> > Here Reducer1 is the intermediate reduce: its input type is
> > <Text,IntWritable> but its output is <IntWritable,Text>.
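> >
> > For illustration, here is a simplified sketch of what our Reducer1 looks
> > like ( the real class is ConfigableWordCount.ConfigableIntSumReducer; the
> > name and body are trimmed for this mail ):
> >
> > public static class Reducer1 extends Reducer<Text, IntWritable, IntWritable, Text> {
> >   @Override
> >   protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
> >       throws IOException, InterruptedException {
> >     int sum = 0;
> >     for (IntWritable count : counts) {
> >       sum += count.get();
> >     }
> >     // Note the swap: the output key is the count and the value is the word.
> >     context.write(new IntWritable(sum), word);
> >   }
> > }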
> >
> > It didn't work because the following error happened:
> >
> > 5/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 failed with state FAILED due to: Vertex failed, vertexName=ivertex1, vertexId=vertex_1416985127132_3432630_1_01, diagnostics=[Task failed, taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text
> >
> >        at org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.reduce(ConfigableWordCount.java:71)
> >        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
> >        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(ReduceProcessor.java:331)
> >        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:143)
> >        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
> >        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
> >        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
> >        at java.security.AccessController.doPrivileged(Native Method)
> >        at javax.security.auth.Subject.doAs(Subject.java:415)
> >
> >
> > I found that the YARNRunner class uses stageConfs[i-1] to determine
> > Reducer1's input when it creates the edge of the DAG, while the
> > ReduceProcessor class uses stageConfs[i] to determine its (Reducer1's)
> > input.
> >
> > But in fact the setting in stageConfs[i] describes Reducer1's output, not
> > its input. ReduceProcessor should have used stageConfs[i-1]'s setting, as
> > YARNRunner does. ( In this case 'i' is 1. )
> >
> > -------------------------------------------------------------------
> >
> > // in createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java
> >
> > for (int i = 0; i < stageConfs.length; i++) {
> >   // stageConfs[i] is used to create the vertex (in our case a ReduceProcessor),
> >   // so the ReduceProcessor's input is also determined by stageConfs[i]:
> >   //   Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
> >   //     // resolves to TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS in ReduceProcessor.java
> >   //   Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
> >   //     // resolves to TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS in ReduceProcessor.java
> >   vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
> >       i == 0 ? mapInputLocations : reduceInputLocations, i, stageConfs.length);
> > }
> >
> > ...
> >
> > // stageConfs[i-1] is used to create the edge, whose types should be the
> > // same as the reduce's input; but the reduce's input uses stageConfs[i]
> > // as above, so the two may be incompatible.
> > OrderedPartitionedKVEdgeConfig edgeConf =
> >     OrderedPartitionedKVEdgeConfig.newBuilder(
> >         stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS),
> >         stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS),
> >         MRPartitioner.class.getName(), partitionerConf)
> >     .configureInput().useLegacyInput().done()
> >     .setFromConfiguration(stageConfs[i - 1]).build();
> > Edge edge = Edge.create(vertices[i - 1], vertices[i], edgeConf.createDefaultEdgeProperty());
> > dag.addEdge(edge);
> >
> > -------------------------------------------------------------------
> >
> > ReduceProcessor can't read stageConfs[i-1], so I simply added two settings
> > for ReduceProcessor to read. With that change it works well ( but I think
> > the best fix would be to let ReduceProcessor read stageConfs[i-1] ).
> >
> > -------------------------------------------------------------------
> >
> > //"mapreduce.reduce.input.key.class" and
> "mapreduce.reduce.input.value.class" are the new settings added by us.
> > diff
> src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
> ReduceProcessor.java.OLD
> > 112,113c112,113
> > <     Class keyClass =
> jobConf.getClass("mapreduce.reduce.input.key.class",null,Object.class);
> > <     Class valueClass =
> jobConf.getClass("mapreduce.reduce.input.value.class",null,Object.class);
> > ---
> >>    Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
> >>    Class valueClass =
> ConfigUtils.getIntermediateInputValueClass(jobConf);
> >
> > -------------------------------------------------------------------
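> >
> > For completeness, the write side of our workaround ( not shown in the
> > diff above ) is roughly the following; this is a sketch of our local
> > change, not upstream code:
> >
> > // Copy each intermediate stage's input types from the previous stage's
> > // output types ( would go in YARNRunner's createDAG() ).
> > for (int i = 1; i < stageConfs.length; i++) {
> >   stageConfs[i].set("mapreduce.reduce.input.key.class",
> >       stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS));
> >   stageConfs[i].set("mapreduce.reduce.input.value.class",
> >       stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS));
> > }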
> >
> >
> > Thanks
> >
> > - Hui
>
>
