Updated Branches: refs/heads/master bf1a094f8 -> c10996a4d
Crunch-254: Added access to the underlying JobId for MRPipeline jobs Signed-off-by: Micah Whitacre <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/c10996a4 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/c10996a4 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/c10996a4 Branch: refs/heads/master Commit: c10996a4d995d72a0ae7dab78a2da4aca6a56af3 Parents: bf1a094 Author: Micah Whitacre <[email protected]> Authored: Mon Aug 19 21:17:06 2013 -0500 Committer: Micah Whitacre <[email protected]> Committed: Tue Aug 20 19:50:57 2013 -0500 ---------------------------------------------------------------------- .../src/it/java/org/apache/crunch/MRPipelineIT.java | 7 ++++++- .../src/main/java/org/apache/crunch/PipelineResult.java | 10 ++++++++++ .../java/org/apache/crunch/impl/mr/exec/MRExecutor.java | 2 +- 3 files changed, 17 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/c10996a4/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java index 7670e88..25c85c8 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java @@ -23,6 +23,7 @@ import java.io.File; import java.io.IOException; import java.io.Serializable; +import org.apache.crunch.PipelineResult.StageResult; import org.apache.crunch.fn.FilterFns; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.mr.MRPipeline; @@ -68,7 +69,11 @@ public class MRPipelineIT implements Serializable { pipeline.writeTextFile(ungroupedTableA, outputDirA.getAbsolutePath()); pipeline.writeTextFile(ungroupedTableB, outputDirB.getAbsolutePath()); - pipeline.done(); + PipelineResult result = pipeline.done(); + for(StageResult stageResult : result.getStageResults()){ + assertTrue(stageResult.getStageName().length() > 1); + assertTrue(stageResult.getStageId().length() > 1); + } // Verify that output from a single PGroupedTable can be sent to multiple collections assertTrue(new File(outputDirA, "part-r-00000").exists()); http://git-wip-us.apache.org/repos/asf/crunch/blob/c10996a4/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java index 74a073f..bd29999 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java +++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java @@ -38,10 +38,16 @@ public class PipelineResult { public static class StageResult { private final String stageName; + private final String stageId; private final Counters counters; public StageResult(String stageName, Counters counters) { + this(stageName, stageName, counters); + } + + public StageResult(String stageName, String stageId, Counters counters){ this.stageName = stageName; + this.stageId = stageId; this.counters = counters; } @@ -49,6 +55,10 @@ public class PipelineResult { return stageName; } + public String getStageId(){ + return stageId; + } + /** * @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2 * (from a class to an interface) so user programs should avoid this method and use http://git-wip-us.apache.org/repos/asf/crunch/blob/c10996a4/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index 1e03ff2..e223e5f 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -112,7 +112,7 @@ public class MRExecutor implements MRPipelineExecution { } List<PipelineResult.StageResult> stages = Lists.newArrayList(); for (CrunchControlledJob job : control.getSuccessfulJobList()) { - stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters())); + stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getJob().getCounters())); } for (PCollectionImpl<?> c : outputTargets.keySet()) {
