Hi Jeff

A few questions to clarify your use-case:
   - You seem to have indicated that each stage needs to write to HDFS, but I 
wanted to confirm: is this a strict requirement, or something done only for 
faster recovery in case of failures? 
   - Would I be correct in saying that you want to send the data generated by 
R1 to both M2 and M3 and also write a replica of that data set to HDFS? 
   - Does the data sent from R1 also need to be sorted and partitioned before 
being sent to the downstream vertices?
   - How much of your logic inside the Mappers and Reducers is tied to 
MapReduce? If you are willing to write your own processor instead of using a 
Mapper/Reducer, you will probably be able to leverage more performance 
benefits. For example, the logic in M2 and M3 could possibly be combined into a 
single vertex/processor. The single processor could write the required output 
to HDFS that M3 would have generated and likewise generate the required 
intermediate data needed by R2.
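
To make that last point a little more concrete, here is a rough plain-Java 
sketch of the idea. It does not use any Tez classes; CombinedM2M3Sketch, 
Outputs, m2Logic and m3Logic are hypothetical names standing in for your own 
code. The only point is that a single pass over R1's records can feed both the 
HDFS output that M3 would have produced and the intermediate data that R2 needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Plain-Java model of one vertex doing the work of both M2 and M3 (no Tez classes). */
final class CombinedM2M3Sketch {

    /** Holds the two record streams that the combined processor emits. */
    static final class Outputs {
        // Stand-in for the records M3 would have written to HDFS.
        final List<String> hdfsRecords = new ArrayList<>();
        // Stand-in for the intermediate records M2 would have sent on to R2.
        final List<String> intermediateRecords = new ArrayList<>();
    }

    /**
     * Reads each R1 record once and applies both pieces of map logic to it.
     * m2Logic and m3Logic are placeholders for the existing map functions.
     */
    static Outputs process(Iterable<String> r1Records,
                           Function<String, String> m2Logic,
                           Function<String, String> m3Logic) {
        Outputs out = new Outputs();
        for (String record : r1Records) {
            out.hdfsRecords.add(m3Logic.apply(record));
            out.intermediateRecords.add(m2Logic.apply(record));
        }
        return out;
    }
}
```

In a real custom processor the two lists would correspond to two separate 
outputs of the vertex, which is the part the stock MapProcessor does not 
handle.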

From a Tez point of view, the MapProcessor and ReduceProcessor were written 
pretty much to provide MR compatibility when strictly used in an M->R DAG or 
M->R->R…->R ( straight-line DAGs ). They handle neither multiple outputs nor 
multiple inputs.

That said, I believe your use-case should be something that can be addressed in 
Tez. However, there are a couple of things lacking:
   - Support for something called a “shared edge”. This effectively means a 
vertex generates data on a given Output and that same data is sent to several 
different downstream Inputs ( edges ). Today, it is a strict 1:1 relationship 
between an Output and an edge.
   - An edge that uses HDFS to transfer data has not been built yet. This would 
allow R1 to write data to HDFS and have M2 and M3 read from HDFS. In your 
use-case, today, one would need to generate the data twice, once for the shuffle 
edge and once for HDFS, and have the shuffle edge data sent downstream. But that 
would not be supported by the Map/Reduce Processors.
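
As a rough illustration of what "generating the data twice" amounts to, here is 
a small plain-Java sketch. It is not Tez API; DualWriteSketch, shuffleEdgeSink 
and hdfsSink are hypothetical names, and the two Consumers simply stand in for 
a shuffle-edge output and an HDFS output of the same task.

```java
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a task that emits every record to two separate outputs. */
final class DualWriteSketch {

    /**
     * Writes each record to both sinks and returns the number of records
     * written to each sink. Without a shared edge or an HDFS-backed edge,
     * the producing task has to do this duplication itself.
     */
    static int writeTwice(List<String> records,
                          Consumer<String> shuffleEdgeSink,
                          Consumer<String> hdfsSink) {
        int written = 0;
        for (String record : records) {
            shuffleEdgeSink.accept(record); // copy that travels downstream
            hdfsSink.accept(record);        // copy that is persisted
            written++;
        }
        return written;
    }
}
```

The cost is that every record is serialized and written twice, which is what a 
shared edge or an HDFS-backed edge would avoid.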

thanks
— Hitesh


On Jun 16, 2014, at 10:15 AM, Jeff Hurt <jeff.h...@clickfox.com> wrote:

> All,
> 
> I have a scenario where I have three Map-Reduce jobs, and I would like to 
> build this as a Tez DAG.  The basic design is that the first Map-Reduce job 
> (M1,R1) should invoke both the second Map-Reduce job (M2,R2) and the third 
> Map (M3) job.  In addition, each job should write out its results to a file 
> in HDFS.
> 
> Graphically, a layout would look like this:
> 
>          M1
>          |
>          R1    (R1 writes output to HDFS)
>          |
>      M2 --- M3  (M3 has no reducer, writes output to HDFS)
>      |
>      R2  (R2 writes output to HDFS)
> 
> The results of R1 would be written out to HDFS, and would also be used as the 
> inputs to both M2 and M3.
> 
> But, we have not been able to get this functionality to work.  Errors show up 
> whenever our DAG contains more than just the first Map-Reduce job.
> 
> Here is the pseudocode:
> 
>        final byte[] map1Payload = MRHelpers.createUserPayloadFromConf(map1Conf);
>        final byte[] map1InputPayload =
>            MRHelpers.createMRInputPayloadWithGrouping(map1Payload, Text.class.getName());
> 
>        final Vertex map1Vertex = new Vertex("M1",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload),
>            -1,
>            MRHelpers.getMapResource(map1Conf));
>        map1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));
> 
>        final Map<String, String> map1Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
>        map1Vertex.setTaskEnvironment(map1Env);
> 
>        final Class<? extends TezRootInputInitializer> initializerClazz =
>            MRInputAMSplitGenerator.class;
>        MRHelpers.addMRInput(map1Vertex, map1InputPayload, initializerClazz);
> 
>        final byte[] reduce1Payload = MRHelpers.createUserPayloadFromConf(reduce1Conf);
>        final Vertex reduce1Vertex = new Vertex("R1",
>            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload),
>            1,
>            MRHelpers.getReduceResource(reduce1Conf));
>        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));
> 
>        final Map<String, String> reduce1Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env, false);
>        reduce1Vertex.setTaskEnvironment(reduce1Env);
> 
>        MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);
> 
>        final byte[] map2Payload = MRHelpers.createUserPayloadFromConf(map2Conf);
>        final byte[] map2InputPayload =
>            MRHelpers.createMRInputPayloadWithGrouping(map2Payload, Text.class.getName());
> 
>        final Vertex map2Vertex = new Vertex("M2",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload),
>            -1,
>            MRHelpers.getMapResource(map2Conf));
>        map2Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));
> 
>        final Map<String, String> map2Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
>        map2Vertex.setTaskEnvironment(map2Env);
> 
>        final byte[] reduce2Payload = MRHelpers.createUserPayloadFromConf(reduce2Conf);
>        final Vertex reduce2Vertex = new Vertex("R2",
>            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload),
>            1,
>            MRHelpers.getReduceResource(reduce2Conf));
>        reduce2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));
> 
>        final Map<String, String> reduce2Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env, false);
>        reduce2Vertex.setTaskEnvironment(reduce2Env);
> 
>        MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);
> 
>        final byte[] map3Payload = MRHelpers.createUserPayloadFromConf(map3Conf);
>        final byte[] map3InputPayload =
>            MRHelpers.createMRInputPayloadWithGrouping(map3Payload, Text.class.getName());
> 
>        final Vertex map3Vertex = new Vertex("M3",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload),
>            -1,
>            MRHelpers.getMapResource(map3Conf));
>        map3Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));
> 
>        final Map<String, String> map3Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
>        map3Vertex.setTaskEnvironment(map3Env);
>        MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);
> 
>        DAG dag = new DAG();
>        dag.addEdge(new Edge(map1Vertex, reduce1Vertex,
>            new EdgeProperty(DataMovementType.SCATTER_GATHER,
>                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
>                new OutputDescriptor(OnFileSortedOutput.class.getName()),
>                new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(reduce1Vertex, map2Vertex,
>            new EdgeProperty(DataMovementType.SCATTER_GATHER,
>                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
>                new OutputDescriptor(OnFileSortedOutput.class.getName()),
>                new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(reduce1Vertex, map3Vertex,
>            new EdgeProperty(DataMovementType.SCATTER_GATHER,
>                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
>                new OutputDescriptor(OnFileSortedOutput.class.getName()),
>                new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(map2Vertex, reduce2Vertex,
>            new EdgeProperty(DataMovementType.SCATTER_GATHER,
>                DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
>                new OutputDescriptor(OnFileSortedOutput.class.getName()),
>                new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
> (Note the use of "MRHelpers.addMROutputLegacy" - it is placed on each reducer)
> 
> We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1), everything 
> works fine.  But when we add Map-Reduce 2 or Map 3, we start to get errors.
> 
> Is there a way to have multiple vertices write output to HDFS in the same 
> DAG?  Are there code examples of doing this?
> 
> FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.
> 
> Thanks in advance,
> Jeff Hurt
