Updated Branches: refs/heads/master 16d4d35f7 -> af3df548a
CRUNCH-271: Cache Counters immediately upon Job completion Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/af3df548 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/af3df548 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/af3df548 Branch: refs/heads/master Commit: af3df548a3d8c70cf5b4849f5e73e8a7fbe67f3c Parents: 16d4d35 Author: Josh Wills <[email protected]> Authored: Sat Sep 28 18:56:14 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Sat Sep 28 19:37:24 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/PipelineResult.java | 21 +++++++++++++++++++- .../lib/jobcontrol/CrunchControlledJob.java | 7 +++++++ .../apache/crunch/impl/mr/exec/MRExecutor.java | 2 +- 3 files changed, 28 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/af3df548/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java index bd29999..f98f305 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java +++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java @@ -18,6 +18,7 @@ package org.apache.crunch; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.util.Map; @@ -45,7 +46,7 @@ public class PipelineResult { this(stageName, stageName, counters); } - public StageResult(String stageName, String stageId, Counters counters){ + public StageResult(String stageName, String stageId, Counters counters) { this.stageName = stageName; this.stageId = stageId; this.counters = counters; @@ -73,6 +74,9 @@ public class PipelineResult { * @return a map of group names to counter names. */ public Map<String, Set<String>> getCounterNames() { + if (counters == null) { + return ImmutableMap.of(); + } Map<String, Set<String>> names = Maps.newHashMap(); for (CounterGroup counterGroup : counters) { Set<String> counterNames = Sets.newHashSet(); @@ -91,22 +95,37 @@ public class PipelineResult { */ @Deprecated public Counter findCounter(Enum<?> key) { + if (counters == null) { + return null; + } return counters.findCounter(key); } public long getCounterValue(String groupName, String counterName) { + if (counters == null) { + return 0L; + } return counters.findCounter(groupName, counterName).getValue(); } public String getCounterDisplayName(String groupName, String counterName) { + if (counters == null) { + return null; + } return counters.findCounter(groupName, counterName).getDisplayName(); } public long getCounterValue(Enum<?> key) { + if (counters == null) { + return 0L; + } return counters.findCounter(key).getValue(); } public String getCounterDisplayName(Enum<?> key) { + if (counters == null) { + return null; + } return counters.findCounter(key).getDisplayName(); } } http://git-wip-us.apache.org/repos/asf/crunch/blob/af3df548/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java index 0038ab7..ab46263 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.impl.mr.MRJob; import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.util.StringUtils; @@ -60,6 +61,7 @@ public class CrunchControlledJob implements MRJob { // some info for human consumption, e.g. the reason why the job failed private String message; private String lastKnownProgress; + private Counters counters; /** * Construct a job. @@ -136,6 +138,10 @@ public class CrunchControlledJob implements MRJob { return this.job.getJobID(); } + public Counters getCounters() { + return counters; + } + @Override public synchronized Job getJob() { return this.job; @@ -225,6 +231,7 @@ public class CrunchControlledJob implements MRJob { private void checkRunningState() throws IOException, InterruptedException { try { if (job.isComplete()) { + this.counters = job.getCounters(); if (job.isSuccessful()) { this.state = State.SUCCESS; } else { http://git-wip-us.apache.org/repos/asf/crunch/blob/af3df548/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index e223e5f..532e37c 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -112,7 +112,7 @@ public class MRExecutor implements MRPipelineExecution { } List<PipelineResult.StageResult> stages = Lists.newArrayList(); for (CrunchControlledJob job : control.getSuccessfulJobList()) { - stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getJob().getCounters())); + stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters())); } for (PCollectionImpl<?> c : outputTargets.keySet()) {
