Can you post the exceptions/error logs?

+Vinod
Hortonworks Inc.
http://hortonworks.com/


On Mon, Jun 16, 2014 at 10:15 AM, Jeff Hurt <jeff.h...@clickfox.com> wrote:

>  All,
>
> I have a scenario where I have three Map-Reduce jobs, and I would like to
> build this as a Tez DAG.  The basic design is that the first Map-Reduce job
> (M1,R1) should invoke both the second Map-Reduce job (M2,R2) and the third
> Map (M3) job.  In addition, each job should write out its results to a file
> in HDFS.
>
> Graphically, a layout would look like this:
>
>           M1
>           |
>           R1    (R1 writes output to HDFS)
>           |
>       M2 --- M3  (M3 has no reducer, writes output to HDFS)
>       |
>       R2  (R2 writes output to HDFS)
>
> The results of R1 would be written out to HDFS, and would also be used as
> the inputs to both M2 and M3.
>
> But, we have not been able to get this functionality to work.  Errors show
> up whenever our DAG contains more than just the first Map-Reduce job.
>
> Here is the pseudocode:
>
>         final byte[] map1Payload = MRHelpers.createUserPayloadFromConf(map1Conf);
>         final byte[] map1InputPayload = MRHelpers.createMRInputPayloadWithGrouping(
>             map1Payload, Text.class.getName());
>
>         final Vertex map1Vertex = new Vertex("M1",
>             new ProcessorDescriptor(MapProcessor.class.getName())
>                 .setUserPayload(map1Payload),
>             -1, MRHelpers.getMapResource(map1Conf));
>         map1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));
>
>         final Map<String, String> map1Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
>         map1Vertex.setTaskEnvironment(map1Env);
>
>         final Class<? extends TezRootInputInitializer> initializerClazz =
>             MRInputAMSplitGenerator.class;
>         MRHelpers.addMRInput(map1Vertex, map1InputPayload, initializerClazz);
>
>         final byte[] reduce1Payload = MRHelpers.createUserPayloadFromConf(reduce1Conf);
>         final Vertex reduce1Vertex = new Vertex("R1",
>             new ProcessorDescriptor(ReduceProcessor.class.getName())
>                 .setUserPayload(reduce1Payload),
>             1, MRHelpers.getReduceResource(reduce1Conf));
>         reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));
>
>         final Map<String, String> reduce1Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env, false);
>         reduce1Vertex.setTaskEnvironment(reduce1Env);
>
>         MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);
>
>         final byte[] map2Payload = MRHelpers.createUserPayloadFromConf(map2Conf);
>         final byte[] map2InputPayload = MRHelpers.createMRInputPayloadWithGrouping(
>             map2Payload, Text.class.getName());
>
>         final Vertex map2Vertex = new Vertex("M2",
>             new ProcessorDescriptor(MapProcessor.class.getName())
>                 .setUserPayload(map2Payload),
>             -1, MRHelpers.getMapResource(map2Conf));
>         map2Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));
>
>         final Map<String, String> map2Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
>         map2Vertex.setTaskEnvironment(map2Env);
>
>         final byte[] reduce2Payload = MRHelpers.createUserPayloadFromConf(reduce2Conf);
>         final Vertex reduce2Vertex = new Vertex("R2",
>             new ProcessorDescriptor(ReduceProcessor.class.getName())
>                 .setUserPayload(reduce2Payload),
>             1, MRHelpers.getReduceResource(reduce2Conf));
>         reduce2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));
>
>         final Map<String, String> reduce2Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env, false);
>         reduce2Vertex.setTaskEnvironment(reduce2Env);
>
>         MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);
>
>         final byte[] map3Payload = MRHelpers.createUserPayloadFromConf(map3Conf);
>         final byte[] map3InputPayload = MRHelpers.createMRInputPayloadWithGrouping(
>             map3Payload, Text.class.getName());
>
>         final Vertex map3Vertex = new Vertex("M3",
>             new ProcessorDescriptor(MapProcessor.class.getName())
>                 .setUserPayload(map3Payload),
>             -1, MRHelpers.getMapResource(map3Conf));
>         map3Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));
>
>         final Map<String, String> map3Env = new HashMap<String, String>();
>         MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
>         map3Vertex.setTaskEnvironment(map3Env);
>
>         MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);
>
>         final DAG dag = new DAG("M1R1-M2R2-M3");
>         // Vertices must be registered with the DAG before the edges
>         // that connect them.
>         dag.addVertex(map1Vertex);
>         dag.addVertex(reduce1Vertex);
>         dag.addVertex(map2Vertex);
>         dag.addVertex(reduce2Vertex);
>         dag.addVertex(map3Vertex);
>
>         dag.addEdge(new Edge(map1Vertex, reduce1Vertex,
>             new EdgeProperty(DataMovementType.SCATTER_GATHER,
>                 DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
>                 new OutputDescriptor(OnFileSortedOutput.class.getName()),
>                 new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>         dag.addEdge(new Edge(reduce1Vertex, map2Vertex,
>             new EdgeProperty(DataMovementType.SCATTER_GATHER,
>                 DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
>                 new OutputDescriptor(OnFileSortedOutput.class.getName()),
>                 new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>         dag.addEdge(new Edge(reduce1Vertex, map3Vertex,
>             new EdgeProperty(DataMovementType.SCATTER_GATHER,
>                 DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
>                 new OutputDescriptor(OnFileSortedOutput.class.getName()),
>                 new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
>         dag.addEdge(new Edge(map2Vertex, reduce2Vertex,
>             new EdgeProperty(DataMovementType.SCATTER_GATHER,
>                 DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
>                 new OutputDescriptor(OnFileSortedOutput.class.getName()),
>                 new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
>
> (Note the use of "MRHelpers.addMROutputLegacy" - it is attached to each
> vertex that writes to HDFS: R1, R2, and M3.)
>
> We have noticed that whenever we run ONLY Map-Reduce 1 (M1, R1), everything
> works fine.  But when we add Map-Reduce 2 or Map 3, we start to get errors.
>
> Is there a way to have multiple vertices write output to HDFS in the same
> DAG?  Are there code examples of doing this?
>
> FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.
>
> Thanks in advance,
> Jeff Hurt
>
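One likely culprit when more than one vertex writes output is all three MROutputs committing to the same HDFS directory. Below is a minimal, untested sketch, assuming the Tez 0.4 API used in the pseudocode above; the output paths are hypothetical, and the key point is that each writing vertex gets its own output directory, set before its conf is snapshotted into a payload:

```java
// Untested sketch against Tez 0.4-incubating; paths are hypothetical.
// Give every vertex that writes to HDFS a distinct output directory,
// and do this BEFORE createUserPayloadFromConf() snapshots the conf,
// so the three output committers do not collide on one path.
reduce1Conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp/out/r1");
reduce2Conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp/out/r2");
map3Conf.set("mapreduce.output.fileoutputformat.outputdir", "/tmp/out/m3");

// Then attach one MROutput per writing vertex, as in the pseudocode above:
MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);
MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);
MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);
```

If the errors persist with distinct output paths, the exception stack traces would narrow down whether the failure is in DAG validation or in the output commit.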

