This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ef1159  [GOBBLIN-792] submit a GobblinTrackingEvent when jobs are compiled but not yet orchestrated
8ef1159 is described below

commit 8ef115992eaa5eb4e1c47a2e0eab726f387ea0b8
Author: Arjun <[email protected]>
AuthorDate: Fri Jul 19 14:36:22 2019 -0700

    [GOBBLIN-792] submit a GobblinTrackingEvent when jobs are compiled but not yet orchestrated
    
    Closes #2657 from arjun4084346/preOrchestrationEvent
---
 .../apache/gobblin/metrics/event/TimingEvent.java  |  1 +
 .../apache/gobblin/service/ExecutionStatus.pdsc    |  5 +++--
 ...ache.gobblin.service.flowstatuses.snapshot.json |  5 +++--
 .../apache/gobblin/service/FlowStatusResource.java |  8 +++++++-
 .../service/modules/orchestration/DagManager.java  | 23 ++++++++++++++++++++++
 .../modules/orchestration/DagManagerUtils.java     |  2 +-
 .../service/modules/spec/JobExecutionPlan.java     |  2 +-
 .../monitoring/KafkaAvroJobStatusMonitor.java      |  3 +++
 8 files changed, 42 insertions(+), 7 deletions(-)

diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index d699238..db30229 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -39,6 +39,7 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
     public static final String FULL_JOB_EXECUTION = "FullJobExecutionTimer";
     public static final String WORK_UNITS_CREATION = "WorkUnitsCreationTimer";
     public static final String WORK_UNITS_PREPARATION = 
"WorkUnitsPreparationTimer";
+    public static final String JOB_PENDING = "JobPending";
     public static final String JOB_ORCHESTRATED = "JobOrchestrated";
     public static final String JOB_PREPARE = "JobPrepareTimer";
     public static final String JOB_START = "JobStartTimer";
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
index c87c484..5a9635e 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/pegasus/org/apache/gobblin/service/ExecutionStatus.pdsc
@@ -3,13 +3,14 @@
   "name" : "ExecutionStatus",
   "namespace" : "org.apache.gobblin.service",
   "doc" : "Execution status for a flow or job",
-  "symbols" : ["COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", "CANCELLED", 
"COMPLETE"],
+  "symbols" : ["COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", "CANCELLED", 
"COMPLETE", "PENDING"],
   "symbolDocs" : {
     "COMPILED":"Flow compiled to jobs.",
     "ORCHESTRATED":"Job(s) orchestrated to spec executors.",
     "RUNNING": "Flow or job is currently executing",
     "FAILED":"Flow or job failed",
     "CANCELLED":"Flow cancelled.",
-    "COMPLETE":"Flow or job completed execution"
+    "COMPLETE":"Flow or job completed execution",
+    "PENDING":"Flow or job is in pending state."
   }
 }
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
index 74d906b..d84ea73 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowstatuses.snapshot.json
@@ -36,14 +36,15 @@
     "name" : "ExecutionStatus",
     "namespace" : "org.apache.gobblin.service",
     "doc" : "Execution status for a flow or job",
-    "symbols" : [ "COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", 
"CANCELLED", "COMPLETE" ],
+    "symbols" : [ "COMPILED", "ORCHESTRATED", "RUNNING", "FAILED", 
"CANCELLED", "COMPLETE", "PENDING" ],
     "symbolDocs" : {
       "COMPILED" : "Flow compiled to jobs.",
       "ORCHESTRATED" : "Job(s) orchestrated to spec executors.",
       "RUNNING" : "Flow or job is currently executing",
       "FAILED" : "Flow or job failed",
       "CANCELLED" : "Flow cancelled.",
-      "COMPLETE" : "Flow or job completed execution"
+      "COMPLETE" : "Flow or job completed execution",
+      "PENDING" : "Flow or job is in pending state."
     }
   }, {
     "type" : "record",
diff --git 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
index 1f371d6..78f28f8 100644
--- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
+++ 
b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowStatusResource.java
@@ -183,10 +183,16 @@ public class FlowStatusResource extends 
ComplexKeyResourceTemplate<FlowStatusId,
       return ExecutionStatus.CANCELLED;
     }
 
+    if (currentFlowExecutionStatus == ExecutionStatus.COMPLETE &&
+        jobExecutionStatus == ExecutionStatus.PENDING) {
+      return ExecutionStatus.PENDING;
+    }
+
     if (currentFlowExecutionStatus == ExecutionStatus.RUNNING ||
         jobExecutionStatus == ExecutionStatus.RUNNING ||
         jobExecutionStatus == ExecutionStatus.ORCHESTRATED ||
-        jobExecutionStatus == ExecutionStatus.COMPILED) {
+        jobExecutionStatus == ExecutionStatus.COMPILED ||
+        jobExecutionStatus == ExecutionStatus.PENDING) {
       return ExecutionStatus.RUNNING;
     }
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 43f1ee8..2fbb796 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -72,6 +72,7 @@ import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import static org.apache.gobblin.service.ExecutionStatus.COMPLETE;
 import static org.apache.gobblin.service.ExecutionStatus.FAILED;
 import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
+import static org.apache.gobblin.service.ExecutionStatus.PENDING;
 import static org.apache.gobblin.service.ExecutionStatus.valueOf;
 
 
@@ -146,6 +147,7 @@ public class DagManager extends AbstractIdleService {
   private final JobStatusRetriever jobStatusRetriever;
   private final KafkaJobStatusMonitor jobStatusMonitor;
   private final Config config;
+  private final Optional<EventSubmitter> eventSubmitter;
 
   private volatile boolean isActive = false;
 
@@ -156,6 +158,12 @@ public class DagManager extends AbstractIdleService {
     this.scheduledExecutorPool = Executors.newScheduledThreadPool(numThreads);
     this.pollingInterval = ConfigUtils.getInt(config, 
JOB_STATUS_POLLING_INTERVAL_KEY, DEFAULT_JOB_STATUS_POLLING_INTERVAL);
     this.instrumentationEnabled = instrumentationEnabled;
+    if (instrumentationEnabled) {
+      MetricContext metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
+      this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(metricContext, "org.apache.gobblin.service").build());
+    } else {
+      this.eventSubmitter = Optional.absent();
+    }
 
     try {
       Class jobStatusRetrieverClass = 
Class.forName(ConfigUtils.getString(config, JOB_STATUS_RETRIEVER_CLASS_KEY, 
DEFAULT_JOB_STATUS_RETRIEVER_CLASS));
@@ -187,12 +195,24 @@ public class DagManager extends AbstractIdleService {
   synchronized void offer(Dag<JobExecutionPlan> dag) throws IOException {
     //Persist the dag
     this.dagStateStore.writeCheckpoint(dag);
+    submitEventsAndSetStatus(dag);
     //Add it to the queue of dags
     if (!this.queue.offer(dag)) {
       throw new IOException("Could not add dag" + 
DagManagerUtils.generateDagId(dag) + "to queue");
     }
   }
 
+  private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
+    if (this.eventSubmitter.isPresent()) {
+      for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
+        JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(dagNode);
+        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+        
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
+        jobExecutionPlan.setExecutionStatus(PENDING);
+      }
+    }
+  }
+
   public synchronized void setTopologySpecMap(Map<URI, TopologySpec> 
topologySpecMap) {
     this.topologySpecMap = topologySpecMap;
   }
@@ -380,6 +400,9 @@ public class DagManager extends AbstractIdleService {
             nextSubmitted.putAll(onJobFinish(node));
             nodesToCleanUp.add(node);
             break;
+          case PENDING:
+            jobExecutionPlan.setExecutionStatus(PENDING);
+            break;
           default:
             jobExecutionPlan.setExecutionStatus(RUNNING);
             break;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 14fc2bc..110ae0e 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -141,7 +141,7 @@ public class DagManagerUtils {
       DagNode<JobExecutionPlan> node = nodesToExpand.poll();
       ExecutionStatus executionStatus = getExecutionStatus(node);
       boolean addFlag = true;
-      if (executionStatus == ExecutionStatus.$UNKNOWN) {
+      if (executionStatus == ExecutionStatus.PENDING) {
         //Add a node to be executed next, only if all of its parent nodes are 
COMPLETE.
         List<DagNode<JobExecutionPlan>> parentNodes = dag.getParents(node);
         for (DagNode<JobExecutionPlan> parentNode : parentNodes) {
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 485f555..acdafea 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -58,7 +58,7 @@ public class JobExecutionPlan {
 
   private final JobSpec jobSpec;
   private final SpecExecutor specExecutor;
-  private ExecutionStatus executionStatus = ExecutionStatus.$UNKNOWN;
+  private ExecutionStatus executionStatus = ExecutionStatus.PENDING;
   private final int maxAttempts;
   private int currentAttempts = 0;
 
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
index fe4d171..e8611cf 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaAvroJobStatusMonitor.java
@@ -126,6 +126,9 @@ public class KafkaAvroJobStatusMonitor extends 
KafkaJobStatusMonitor {
       case TimingEvent.FlowTimings.FLOW_COMPILE_FAILED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.FAILED.name());
         break;
+      case TimingEvent.LauncherTimings.JOB_PENDING:
+        properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.PENDING.name());
+        break;
       case TimingEvent.LauncherTimings.JOB_ORCHESTRATED:
         properties.put(JobStatusRetriever.EVENT_NAME_FIELD, 
ExecutionStatus.ORCHESTRATED.name());
         break;

Reply via email to