Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 94bdc6f39 -> 25d0a7d1a


[GOBBLIN-638] Submit more timing events from GaaS to accurately track flow/job 
status.

Closes #2508 from sv2000/gaasTimingEvents


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/25d0a7d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/25d0a7d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/25d0a7d1

Branch: refs/heads/master
Commit: 25d0a7d1a89bd781c35afb6081a17937cf9c1b3e
Parents: 94bdc6f
Author: suvasude <[email protected]>
Authored: Tue Nov 27 14:17:53 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Tue Nov 27 14:17:53 2018 -0800

----------------------------------------------------------------------
 .../gobblin/metrics/event/TimingEvent.java      |  6 +-
 .../gobblin/service/ServiceMetricNames.java     |  3 +
 .../modules/orchestration/DagManager.java       | 37 ++++++----
 .../modules/orchestration/Orchestrator.java     | 74 ++++++++------------
 .../modules/orchestration/TimingEventUtils.java | 62 ++++++++++++++++
 5 files changed, 120 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25d0a7d1/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index c3e6259..8fd1462 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -56,11 +56,7 @@ public class TimingEvent {
 
   public static class FlowTimings {
     public static final String FLOW_COMPILED = "FlowCompiled";
-  }
-
-  public static class JobStatusTimings {
-    public static final String JOB_STATUS_POLLED = "JobStatusPolled";
-    public static final String ALL_JOB_STATUSES_POLLED = 
"AllJobStatusesPolled";
+    public static final String FLOW_COMPILE_FAILED = "FlowCompileFailed";
   }
 
   public static class FlowEventConstants {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25d0a7d1/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
index d1fb711..f72093b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java
@@ -28,4 +28,7 @@ public class ServiceMetricNames {
   public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = 
GOBBLIN_SERVICE_PREFIX + "flowOrchestration.successful";
   public static final String FLOW_ORCHESTRATION_FAILED_METER = 
GOBBLIN_SERVICE_PREFIX + "flowOrchestration.failed";
   public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX 
+ "flowOrchestration.time";
+
+  //Job status poll timer
+  public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX 
+ "jobStatusPoll.time";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25d0a7d1/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 2d0c555..0c94b36 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -30,9 +30,11 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
@@ -49,6 +51,7 @@ import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecProducer;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceMetricNames;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
 import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -142,7 +145,7 @@ public class DagManager extends AbstractIdleService {
     try {
       Class jobStatusRetrieverClass = 
Class.forName(config.getString(JOB_STATUS_RETRIEVER_KEY));
       this.jobStatusRetriever =
-          (JobStatusRetriever) 
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass);
+          (JobStatusRetriever) 
GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, 
config);
       Class dagStateStoreClass = 
Class.forName(config.getString(DAG_STORE_CLASS_KEY));
       this.dagStateStore = (DagStateStore) 
GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config);
     } catch (ReflectiveOperationException e) {
@@ -215,6 +218,7 @@ public class DagManager extends AbstractIdleService {
     private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>();
     private final MetricContext metricContext;
     private final Optional<EventSubmitter> eventSubmitter;
+    private final Optional<Timer> jobStatusPolledTimer;
 
     private JobStatusRetriever jobStatusRetriever;
     private DagStateStore dagStateStore;
@@ -231,9 +235,12 @@ public class DagManager extends AbstractIdleService {
       if (instrumentationEnabled) {
         this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), 
getClass());
         this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
+        this.jobStatusPolledTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER));
+
       } else {
         this.metricContext = null;
         this.eventSubmitter = Optional.absent();
+        this.jobStatusPolledTimer = Optional.absent();
       }
     }
 
@@ -255,15 +262,9 @@ public class DagManager extends AbstractIdleService {
           initialize(dag);
         }
         log.info("Polling job statuses..");
-        TimingEvent jobStatusPollTimer = this.eventSubmitter.isPresent()
-            ? 
eventSubmitter.get().getTimingEvent(TimingEvent.JobStatusTimings.ALL_JOB_STATUSES_POLLED)
-            : null;
         //Poll and update the job statuses of running jobs.
         pollJobStatuses();
         log.info("Poll done.");
-        if (jobStatusPollTimer != null) {
-          jobStatusPollTimer.stop();
-        }
         //Clean up any finished dags
         log.info("Cleaning up finished dags..");
         cleanUp();
@@ -310,13 +311,10 @@ public class DagManager extends AbstractIdleService {
         throws IOException {
       this.failedDagIdsFinishRunning.clear();
       for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
-        TimingEvent jobStatusPollTimer = this.eventSubmitter.isPresent()
-            ? 
eventSubmitter.get().getTimingEvent(TimingEvent.JobStatusTimings.JOB_STATUS_POLLED)
-            : null;
+        long pollStartTime = System.nanoTime();
         JobStatus jobStatus = pollJobStatus(node);
-        if (jobStatusPollTimer != null) {
-          jobStatusPollTimer.stop();
-        }
+        Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() 
- pollStartTime, TimeUnit.NANOSECONDS);
+
         Preconditions.checkNotNull(jobStatus, "Received null job status for a 
running job " + DagManagerUtils.getJobName(node));
         JobExecutionPlan jobExecutionPlan = 
DagManagerUtils.getJobExecutionPlan(node);
 
@@ -388,7 +386,20 @@ public class DagManager extends AbstractIdleService {
           log.warn("JobSpec does not contain flowExecutionId.");
         }
         log.info("Submitting job: {} on executor: {}", jobSpec, producer);
+
+        Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
+        log.info("Going to orchestrate JobSpec: {} on Executor: {}", jobSpec, 
producer);
+
+        TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get().
+            getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : 
null;
+
         producer.addSpec(jobSpec);
+
+        if (jobOrchestrationTimer != null) {
+          jobOrchestrationTimer.stop(jobMetadata);
+        }
+
+        log.info("Orchestrated JobSpec: {} on Executor: {}", jobSpec, 
producer);
       } catch (Exception e) {
         log.error("Cannot submit job: {} on executor: {}", jobSpec, producer, 
e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25d0a7d1/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index ea0d0bd..3dd7538 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -91,20 +91,6 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log,
       boolean instrumentationEnabled) {
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
-    if (instrumentationEnabled) {
-      this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
IdentityFlowToJobSpecCompiler.class);
-      this.flowOrchestrationSuccessFulMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
-      this.flowOrchestrationFailedMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
-      this.flowOrchestrationTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
-      this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
-    } else {
-      this.metricContext = null;
-      this.flowOrchestrationSuccessFulMeter = Optional.absent();
-      this.flowOrchestrationFailedMeter = Optional.absent();
-      this.flowOrchestrationTimer = Optional.absent();
-      this.eventSubmitter = Optional.absent();
-    }
-
     this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
     this.topologyCatalog = topologyCatalog;
     this.dagManager = dagManager;
@@ -121,6 +107,20 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         | ClassNotFoundException e) {
       throw new RuntimeException(e);
     }
+
+    if (instrumentationEnabled) {
+      this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
this.specCompiler.getClass());
+      this.flowOrchestrationSuccessFulMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
+      this.flowOrchestrationFailedMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
+      this.flowOrchestrationTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
+      this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
+    } else {
+      this.metricContext = null;
+      this.flowOrchestrationSuccessFulMeter = Optional.absent();
+      this.flowOrchestrationFailedMeter = Optional.absent();
+      this.flowOrchestrationTimer = Optional.absent();
+      this.eventSubmitter = Optional.absent();
+    }
   }
 
   public Orchestrator(Config config, Optional<TopologyCatalog> 
topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log) {
@@ -199,18 +199,31 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
     long startTime = System.nanoTime();
     if (spec instanceof FlowSpec) {
-      Map<String, String> flowMetadata = getFlowMetadata((FlowSpec) spec);
       TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
           ? 
this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
           : null;
+
       Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
 
+      Map<String, String> flowMetadata = 
TimingEventUtils.getFlowMetadata((FlowSpec) spec);
       if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+        // For scheduled flows, we do not insert the flowExecutionId into the 
FlowSpec. As a result, if the flow
+        // compilation fails (i.e. we are unable to find a path), the metadata 
will not have flowExecutionId.
+        // In this case, the current time is used as the flow executionId.
+        
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+            Long.toString(System.currentTimeMillis()));
+        TimingEvent flowCompileFailedTimer = this.eventSubmitter.isPresent() ? 
this.eventSubmitter.get()
+            .getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILE_FAILED) : 
null;
         Instrumented.markMeter(this.flowOrchestrationFailedMeter);
         _log.warn("Cannot determine an executor to run on for Spec: " + spec);
+        if (flowCompileFailedTimer != null) {
+          flowCompileFailedTimer.stop(flowMetadata);
+        }
         return;
       }
 
+      //If it is a scheduled flow (and hence, does not have flowExecutionId in 
the FlowSpec) and the flow compilation is successful,
+      // retrieve the flowExecutionId from the JobSpec.
       
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
           
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
 
@@ -218,6 +231,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
         flowCompilationTimer.stop(flowMetadata);
       }
 
+
       if (this.dagManager.isPresent()) {
         //Send the dag to the DagManager.
         this.dagManager.get().offer(jobExecutionPlanDag);
@@ -236,7 +250,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
               _log.warn("JobSpec does not contain flowExecutionId.");
             }
 
-            Map<String, String> jobMetadata = getJobMetadata(flowMetadata, 
jobExecutionPlan);
+            Map<String, String> jobMetadata = 
TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan);
             _log.info(String.format("Going to orchestrate JobSpec: %s on 
Executor: %s", jobSpec, producer));
 
             TimingEvent jobOrchestrationTimer = 
this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
@@ -261,34 +275,6 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
-  private Map<String, String> getFlowMetadata(FlowSpec flowSpec) {
-    Map<String, String> metadata = Maps.newHashMap();
-
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY));
-    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY));
-    if 
(metadata.containsKey(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)) {
-      metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
flowSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
-    }
-
-    return metadata;
-  }
-
-  private Map<String, String> getJobMetadata(Map<String, String> flowMetadata, 
JobExecutionPlan jobExecutionPlan) {
-    Map<String, String> jobMetadata = Maps.newHashMap();
-    JobSpec jobSpec = jobExecutionPlan.getJobSpec();
-    SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
-
-    jobMetadata.putAll(flowMetadata);
-    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY));
-    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY));
-    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
-    jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
-    jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY));
-    jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
specExecutor.getClass().getCanonicalName());
-
-    return jobMetadata;
-  }
-
   public void remove(Spec spec, Properties headers) {
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25d0a7d1/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
new file mode 100644
index 0000000..38a9d07
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.SpecExecutor;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+
+
+class TimingEventUtils {
+  static Map<String, String> getFlowMetadata(FlowSpec flowSpec) {
+    Map<String, String> metadata = Maps.newHashMap();
+
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY));
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY));
+
+    Config flowConfig = flowSpec.getConfig();
+    if (flowConfig.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
+      metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
flowConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    }
+    return metadata;
+  }
+
+  static Map<String, String> getJobMetadata(Map<String, String> flowMetadata, 
JobExecutionPlan jobExecutionPlan) {
+    Map<String, String> jobMetadata = Maps.newHashMap();
+    JobSpec jobSpec = jobExecutionPlan.getJobSpec();
+    SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
+
+    jobMetadata.putAll(flowMetadata);
+    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
specExecutor.getClass().getCanonicalName());
+
+    return jobMetadata;
+  }
+
+}

Reply via email to