Repository: crunch
Updated Branches:
  refs/heads/master 6a4f9977c -> 8a61e3a62


CRUNCH-342: Add start and end time info to StageResult for both the job itself
and the pre- and post-job hooks.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/8a61e3a6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/8a61e3a6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/8a61e3a6

Branch: refs/heads/master
Commit: 8a61e3a62a58d461cd002c2145814ea56fdcbd29
Parents: 6a4f997
Author: Josh Wills <[email protected]>
Authored: Thu Feb 13 18:08:36 2014 -0800
Committer: Josh Wills <[email protected]>
Committed: Fri Feb 21 09:04:05 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/PipelineResult.java  | 48 ++++++++++++++++++--
 .../lib/jobcontrol/CrunchControlledJob.java     | 24 ++++++++++
 .../apache/crunch/impl/mr/exec/MRExecutor.java  |  3 +-
 .../apache/crunch/impl/spark/SparkRuntime.java  |  4 +-
 4 files changed, 74 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/8a61e3a6/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
index c1ecdd3..5325bf3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PipelineResult.java
@@ -41,26 +41,68 @@ public class PipelineResult {
     private final String stageName;
     private final String stageId;
     private final Counters counters;
+    private final long startTimeMsec;
+    private final long jobStartTimeMsec;
+    private final long jobEndTimeMsec;
+    private final long endTimeMsec;
 
     public StageResult(String stageName, Counters counters) {
-      this(stageName, stageName, counters);
+      this(stageName, counters, System.currentTimeMillis(), System.currentTimeMillis());
     }
 
-    public StageResult(String stageName, String stageId, Counters counters) {
+    public StageResult(String stageName, Counters counters, long startTimeMsec, long endTimeMsec) {
+      this(stageName, stageName, counters, startTimeMsec, startTimeMsec, endTimeMsec, endTimeMsec);
+    }
+
+    public StageResult(String stageName, String stageId, Counters counters, long startTimeMsec,
+        long jobStartTimeMsec, long jobEndTimeMsec, long endTimeMsec) {
       this.stageName = stageName;
       this.stageId = stageId;
       this.counters = counters;
+      this.startTimeMsec = startTimeMsec;
+      this.jobStartTimeMsec = jobStartTimeMsec;
+      this.jobEndTimeMsec = jobEndTimeMsec;
+      this.endTimeMsec = endTimeMsec;
     }
 
     public String getStageName() {
       return stageName;
     }
 
-    public String getStageId(){
+    public String getStageId() {
       return stageId;
     }
 
     /**
+     * @return the overall start time for this stage, that is, the time at which any pre-job hooks were
+     * started.
+     */
+    public long getStartTimeMsec() {
+      return startTimeMsec;
+    }
+
+    /**
+     * @return the time that the work for this stage was submitted to the cluster for execution, if applicable.
+     */
+    public long getJobStartTimeMsec() {
+      return jobStartTimeMsec;
+    }
+
+    /**
+     * @return the time that the work for this stage finished processing on the cluster, if applicable.
+     */
+    public long getJobEndTimeMsec() {
+      return jobEndTimeMsec;
+    }
+
+    /**
+     * @return the overall end time for this stage, that is, the time at which any post-job hooks completed.
+     */
+    public long getEndTimeMsec() {
+      return endTimeMsec;
+    }
+
+    /**
      * @deprecated The {@link Counter} class changed incompatibly between Hadoop 1 and 2
      * (from a class to an interface) so user programs should avoid this method and use
      * {@link #getCounterNames()}.

http://git-wip-us.apache.org/repos/asf/crunch/blob/8a61e3a6/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
index aaf9f04..5dbb43e 100644
--- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
+++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java
@@ -62,6 +62,10 @@ public class CrunchControlledJob implements MRJob {
   private String message;
   private String lastKnownProgress;
   private Counters counters;
+  private long preHookStartTimeMsec;
+  private long jobStartTimeMsec;
+  private long jobEndTimeMsec;
+  private long postHookEndTimeMsec;
 
   /**
    * Construct a job.
@@ -138,6 +142,22 @@ public class CrunchControlledJob implements MRJob {
     return this.job.getJobID();
   }
 
+  public long getStartTimeMsec() {
+    return preHookStartTimeMsec;
+  }
+
+  public long getJobStartTimeMsec() {
+    return jobStartTimeMsec;
+  }
+
+  public long getJobEndTimeMsec() {
+    return jobEndTimeMsec;
+  }
+
+  public long getEndTimeMsec() {
+    return postHookEndTimeMsec;
+  }
+
   public Counters getCounters() {
     return counters;
   }
@@ -231,6 +251,7 @@ public class CrunchControlledJob implements MRJob {
   private void checkRunningState() throws IOException, InterruptedException {
     try {
       if (job.isComplete()) {
+        this.jobEndTimeMsec = System.currentTimeMillis();
         this.counters = job.getCounters();
         if (job.isSuccessful()) {
           this.state = State.SUCCESS;
@@ -256,6 +277,7 @@ public class CrunchControlledJob implements MRJob {
     }
     if (isCompleted()) {
       completionHook.run();
+      this.postHookEndTimeMsec = System.currentTimeMillis();
     }
   }
 
@@ -303,7 +325,9 @@ public class CrunchControlledJob implements MRJob {
    */
   protected synchronized void submit() {
     try {
+      this.preHookStartTimeMsec = System.currentTimeMillis();
       prepareHook.run();
+      this.jobStartTimeMsec = System.currentTimeMillis();
       job.submit();
       this.state = State.RUNNING;
       LOG.info("Running job \"" + getJobName() + "\"");

http://git-wip-us.apache.org/repos/asf/crunch/blob/8a61e3a6/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index ce6fffa..1137498 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -123,7 +123,8 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
       }
       List<PipelineResult.StageResult> stages = Lists.newArrayList();
       for (CrunchControlledJob job : control.getSuccessfulJobList()) {
-        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters()));
+        stages.add(new PipelineResult.StageResult(job.getJobName(), job.getMapredJobID().toString(), job.getCounters(),
+            job.getStartTimeMsec(), job.getJobStartTimeMsec(), job.getJobEndTimeMsec(), job.getEndTimeMsec()));
       }
 
       for (PCollectionImpl<?> c : outputTargets.keySet()) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/8a61e3a6/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
index 99268cc..ecc7023 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java
@@ -198,6 +198,7 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
 
   private void monitorLoop() {
     status.set(Status.RUNNING);
+    long start = System.currentTimeMillis();
     Map<PCollectionImpl<?>, Set<SourceTarget<?>>> targetDeps = Maps.<PCollectionImpl<?>, PCollectionImpl<?>, Set<SourceTarget<?>>>newTreeMap(DEPTH_COMPARATOR);
     for (PCollectionImpl<?> pcollect : outputTargets.keySet()) {
       targetDeps.put(pcollect, pcollect.getTargetDependencies());
@@ -280,7 +281,8 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe
     }
     if (status.get() != Status.FAILED || status.get() != Status.KILLED) {
       status.set(Status.SUCCEEDED);
-      result = new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("Spark", null)),
+      result = new PipelineResult(
+          ImmutableList.of(new PipelineResult.StageResult("Spark", null, start, System.currentTimeMillis())),
           Status.SUCCEEDED);
       set(result);
     } else {

Reply via email to