Repository: crunch
Updated Branches:
  refs/heads/master 6a4f9977c -> 8a61e3a62
CRUNCH-342: Add start and end time info to StageResult for both the job itself and the pre and post-job hooks. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8a61e3a6 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8a61e3a6 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8a61e3a6 Branch: refs/heads/master Commit: 8a61e3a62a58d461cd002c2145814ea56fdcbd29 Parents: 6a4f997 Author: Josh Wills <[email protected]> Authored: Thu Feb 13 18:08:36 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Fri Feb 21 09:04:05 2014 -0800 ---------------------------------------------------------------------- .../java/org/apache/crunch/PipelineResult.java | 48 ++++++++++++++++++-- .../lib/jobcontrol/CrunchControlledJob.java | 24 ++++++++++ .../apache/crunch/impl/mr/exec/MRExecutor.java | 3 +- .../apache/crunch/impl/spark/SparkRuntime.java | 4 +- 4 files changed, 74 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/8a61e3a6/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java index c1ecdd3..5325bf3 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java +++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java @@ -41,26 +41,68 @@ public class PipelineResult { private final String stageName; private final String stageId; private final Counters counters; + private final long startTimeMsec; + private final long jobStartTimeMsec; + private final long jobEndTimeMsec; + private final long endTimeMsec; public StageResult(String stageName, Counters counters) { - this(stageName, stageName, counters); + this(stageName, counters, 
System.currentTimeMillis(), System.currentTimeMillis()); } - public StageResult(String stageName, String stageId, Counters counters) { + public StageResult(String stageName, Counters counters, long startTimeMsec, long endTimeMsec) { + this(stageName, stageName, counters, startTimeMsec, startTimeMsec, endTimeMsec, endTimeMsec); + } + + public StageResult(String stageName, String stageId, Counters counters, long startTimeMsec, + long jobStartTimeMsec, long jobEndTimeMsec, long endTimeMsec) { this.stageName = stageName; this.stageId = stageId; this.counters = counters; + this.startTimeMsec = startTimeMsec; + this.jobStartTimeMsec = jobStartTimeMsec; + this.jobEndTimeMsec = jobEndTimeMsec; + this.endTimeMsec = endTimeMsec; } public String getStageName() { return stageName; } - public String getStageId(){ + public String getStageId() { return stageId; } /** + * @return the overall start time for this stage, that is, the time at which any pre-job hooks were + * started. + */ + public long getStartTimeMsec() { + return startTimeMsec; + } + + /** + * @return the time that the work for this stage was submitted to the cluster for execution, if applicable. + */ + public long getJobStartTimeMsec() { + return jobStartTimeMsec; + } + + /** + * @return the time that the work for this stage finished processing on the cluster, if applicable. + */ + public long getJobEndTimeMsec() { + return jobEndTimeMsec; + } + + /** + * @return the overall end time for this stage, that is, the time at which any post-job hooks completed. + */ + public long getEndTimeMsec() { + return endTimeMsec; + } + + /** * @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2 * (from a class to an interface) so user programs should avoid this method and use * {@link #getCounterNames()}. 
http://git-wip-us.apache.org/repos/asf/crunch/blob/8a61e3a6/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java index aaf9f04..5dbb43e 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -62,6 +62,10 @@ public class CrunchControlledJob implements MRJob { private String message; private String lastKnownProgress; private Counters counters; + private long preHookStartTimeMsec; + private long jobStartTimeMsec; + private long jobEndTimeMsec; + private long postHookEndTimeMsec; /** * Construct a job. 
@@ -138,6 +142,22 @@ public class CrunchControlledJob implements MRJob { return this.job.getJobID(); } + public long getStartTimeMsec() { + return preHookStartTimeMsec; + } + + public long getJobStartTimeMsec() { + return jobStartTimeMsec; + } + + public long getJobEndTimeMsec() { + return jobEndTimeMsec; + } + + public long getEndTimeMsec() { + return postHookEndTimeMsec; + } + public Counters getCounters() { return counters; } @@ -231,6 +251,7 @@ public class CrunchControlledJob implements MRJob { private void checkRunningState() throws IOException, InterruptedException { try { if (job.isComplete()) { + this.jobEndTimeMsec = System.currentTimeMillis(); this.counters = job.getCounters(); if (job.isSuccessful()) { this.state = State.SUCCESS; @@ -256,6 +277,7 @@ public class CrunchControlledJob implements MRJob { } if (isCompleted()) { completionHook.run(); + this.postHookEndTimeMsec = System.currentTimeMillis(); } } @@ -303,7 +325,9 @@ public class CrunchControlledJob implements MRJob { */ protected synchronized void submit() { try { + this.preHookStartTimeMsec = System.currentTimeMillis(); prepareHook.run(); + this.jobStartTimeMsec = System.currentTimeMillis(); job.submit(); this.state = State.RUNNING; LOG.info("Running job \"" + getJobName() + "\""); http://git-wip-us.apache.org/repos/asf/crunch/blob/8a61e3a6/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index ce6fffa..1137498 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -123,7 +123,8 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe } List<PipelineResult.StageResult> stages = Lists.newArrayList(); for 
(CrunchControlledJob job : control.getSuccessfulJobList()) { - stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters())); + stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters(), + job.getStartTimeMsec(), job.getJobStartTimeMsec(), job.getJobEndTimeMsec(), job.getEndTimeMsec())); } for (PCollectionImpl<?> c : outputTargets.keySet()) { http://git-wip-us.apache.org/repos/asf/crunch/blob/8a61e3a6/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java index 99268cc..ecc7023 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java @@ -198,6 +198,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe private void monitorLoop() { status.set(Status.RUNNING); + long start = System.currentTimeMillis(); Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.<PCollectionImpl<?>, PCollectionImpl<?>, Set<SourceTarget<?>>>newTreeMap(DEPTH_COMPARATOR); for (PCollectionImpl<?> pcollect : outputTargets.keySet()) { targetDeps.put(pcollect, pcollect.getTargetDependencies()); @@ -280,7 +281,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe } if (status.get() != Status.FAILED || status.get() != Status.KILLED) { status.set(Status.SUCCEEDED); - result = new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("Spark", null)), + result = new PipelineResult( + ImmutableList.of(new PipelineResult.StageResult("Spark", null, start, System.currentTimeMillis())), Status.SUCCEEDED); set(result); } else {
