Repository: incubator-gobblin
Updated Branches:
  refs/heads/master cd5222775 -> 4b5f55d08


[GOBBLIN-558] submitting tracking events during flow compilation and 
orchestration in GaaS

Closes #2420 from
arjun4084346/trackingEventsInGaaS


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/4b5f55d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/4b5f55d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/4b5f55d0

Branch: refs/heads/master
Commit: 4b5f55d08742ce4c27e216849f97fbef9da9ac59
Parents: cd52227
Author: Arjun <[email protected]>
Authored: Mon Aug 13 16:28:41 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Aug 13 16:28:41 2018 -0700

----------------------------------------------------------------------
 .../cluster/GobblinClusterMetricTagNames.java   |  9 +--
 .../cluster/GobblinHelixJobLauncher.java        | 12 ++--
 .../gobblin/metrics/event/TimingEvent.java      | 15 ++++
 .../flow/IdentityFlowToJobSpecCompiler.java     |  3 +-
 .../modules/flow/MockedSpecCompiler.java        | 75 ++++++++++++++++++++
 .../service/modules/flow/SpecCompiler.java      |  2 +
 .../modules/orchestration/Orchestrator.java     | 65 ++++++++++++++++-
 7 files changed, 165 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterMetricTagNames.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterMetricTagNames.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterMetricTagNames.java
index 34c865c..ccf1f14 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterMetricTagNames.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterMetricTagNames.java
@@ -18,10 +18,12 @@
 package org.apache.gobblin.cluster;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.metrics.event.TimingEvent;
 
 
 /**
  * A central place for constants of {@link 
org.apache.gobblin.metrics.MetricContext} tag names for a Gobblin cluster.
+ * Some shared constants have been moved to {@link 
TimingEvent.FlowEventConstants}.
  *
  * @author Yinan Li
  */
@@ -32,11 +34,4 @@ public class GobblinClusterMetricTagNames {
   public static final String APPLICATION_ID = "application.id";
   public static final String HELIX_INSTANCE_NAME = "helix.instance.name";
   public static final String TASK_RUNNER_ID = "task.runner.id";
-
-  public static final String FLOW_GROUP = "flowGroup";
-  public static final String FLOW_NAME = "flowName";
-  public static final String FLOW_EXECUTION_ID = "flowExecutionId";
-  public static final String JOB_GROUP = "jobGroup";
-  public static final String JOB_NAME = "jobName";
-  public static final String JOB_EXECUTION_ID = "jobExecutionId";
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index cd37342..e2447a5 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -433,21 +433,21 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
 
     // only inject flow tags if a flow name is defined
     if (jobProps.containsKey(ConfigurationKeys.FLOW_NAME_KEY)) {
-      metadataTags.add(new Tag<>(GobblinClusterMetricTagNames.FLOW_GROUP,
+      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD,
           jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "")));
       metadataTags.add(
-          new Tag<>(GobblinClusterMetricTagNames.FLOW_NAME, 
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
+          new Tag<>(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)));
 
       // use job execution id if flow execution id is not present
-      metadataTags.add(new 
Tag<>(GobblinClusterMetricTagNames.FLOW_EXECUTION_ID,
+      metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
           jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
jobExecutionId)));
     }
 
-    metadataTags.add(new Tag<>(GobblinClusterMetricTagNames.JOB_GROUP,
+    metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD,
         jobProps.getProperty(ConfigurationKeys.JOB_GROUP_KEY, "")));
-    metadataTags.add(new Tag<>(GobblinClusterMetricTagNames.JOB_NAME,
+    metadataTags.add(new Tag<>(TimingEvent.FlowEventConstants.JOB_NAME_FIELD,
         jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY, "")));
-    metadataTags.add(new Tag<>(GobblinClusterMetricTagNames.JOB_EXECUTION_ID, 
jobExecutionId));
+    metadataTags.add(new 
Tag<>(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, jobExecutionId));
 
     LOGGER.debug("GobblinHelixJobLauncher.addAdditionalMetadataTags: 
metadataTags {}", metadataTags);
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
----------------------------------------------------------------------
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index a7ad821..b00465f 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -31,6 +31,7 @@ public class TimingEvent {
     public static final String FULL_JOB_EXECUTION = "FullJobExecutionTimer";
     public static final String WORK_UNITS_CREATION = "WorkUnitsCreationTimer";
     public static final String WORK_UNITS_PREPARATION = 
"WorkUnitsPreparationTimer";
+    public static final String JOB_ORCHESTRATED = "JobOrchestrated";
     public static final String JOB_PREPARE = "JobPrepareTimer";
     public static final String JOB_START = "JobStartTimer";
     public static final String JOB_RUN = "JobRunTimer";
@@ -53,6 +54,20 @@ public class TimingEvent {
     public static final String HELIX_JOB_RUN = "JobHelixRunTimer";
   }
 
+  public static class FlowTimings {
+    public static final String FLOW_COMPILED = "FlowCompiled";
+  }
+
+  public static class FlowEventConstants {
+    public static final String FLOW_NAME_FIELD = "flowName";
+    public static final String FLOW_GROUP_FIELD = "flowGroup";
+    public static final String FLOW_EXECUTION_ID_FIELD = "flowExecutionId";
+    public static final String JOB_NAME_FIELD = "jobName";
+    public static final String JOB_GROUP_FIELD = "jobGroup";
+    public static final String JOB_EXECUTION_ID_FIELD = "jobExecutionId";
+    public static final String SPEC_EXECUTOR_FIELD = "specExecutor";
+  }
+
   public static final String METADATA_START_TIME = "startTime";
   public static final String METADATA_END_TIME = "endTime";
   public static final String METADATA_DURATION = "durationMillis";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
index fa843c5..42c4926 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java
@@ -96,8 +96,7 @@ public class IdentityFlowToJobSpecCompiler extends 
BaseFlowToJobSpecCompiler {
     List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
 
     for (TopologySpec topologySpec : topologySpecMap.values()) {
-      Map<ServiceNode, ServiceNode> capabilities =
-          (Map<ServiceNode, ServiceNode>) 
topologySpec.getSpecExecutor().getCapabilities().get();
+      Map<ServiceNode, ServiceNode> capabilities = 
topologySpec.getSpecExecutor().getCapabilities().get();
       for (Map.Entry<ServiceNode, ServiceNode> capability : 
capabilities.entrySet()) {
         log.info(String.format("Evaluating current JobSpec: %s against 
TopologySpec: %s with "
                 + "capability of source: %s and destination: %s ", 
jobSpec.getUri(), topologySpec.getUri(),

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java
new file mode 100644
index 0000000..7a9e5db
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.flow;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * This mocked SpecCompiler class creates 3 dummy job specs to emulate multi 
hop flow spec compiler.
+ * It uses {@link InMemorySpecExecutor} for these dummy specs.
+ */
+public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler {
+
+  private static final int NUMBER_OF_JOBS = 3;
+
+  public MockedSpecCompiler(Config config) {
+    super(config);
+  }
+
+  @Override
+  public Dag<JobExecutionPlan> compileFlow(Spec spec) {
+    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+
+    long flowExecutionId = System.currentTimeMillis();
+
+    int i = 0;
+    while(i++ < NUMBER_OF_JOBS) {
+      String specUri = "/foo/bar/spec/" + i;
+      Properties properties = new Properties();
+      properties.put(ConfigurationKeys.FLOW_NAME_KEY, 
((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY));
+      properties.put(ConfigurationKeys.FLOW_GROUP_KEY, 
((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_GROUP_KEY));
+      properties.put(ConfigurationKeys.JOB_NAME_KEY, 
((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY) + 
"_" + i);
+      properties.put(ConfigurationKeys.JOB_GROUP_KEY, 
((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_GROUP_KEY) 
+ "_" + i);
+      properties.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
+      JobSpec jobSpec = JobSpec.builder(specUri)
+          .withConfig(ConfigUtils.propertiesToConfig(properties))
+          .withVersion("1")
+          .withDescription("Spec Description")
+          .build();
+      jobExecutionPlans.add(new JobExecutionPlan(jobSpec, new 
InMemorySpecExecutor(ConfigFactory.empty())));
+    }
+
+    return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
index 3ce3b70..53cdf83 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/SpecCompiler.java
@@ -37,6 +37,8 @@ public interface SpecCompiler extends SpecCatalogListener, 
Instrumentable {
   /***
    * Take in a logical {@link Spec} and compile corresponding materialized 
{@link Spec}s
    * and the mapping to {@link SpecExecutor} that they can be run on.
+   * All the specs generated from the compileFlow must have a
+   * {@link 
org.apache.gobblin.configuration.ConfigurationKeys.FLOW_EXECUTION_ID_KEY}
    * @param spec {@link Spec} to compile.
    * @return Map of materialized physical {@link Spec} and {@link 
SpecExecutor}.
    */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/4b5f55d0/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
index 4959b1a..1f7f737 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java
@@ -24,6 +24,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Maps;
 import javax.annotation.Nonnull;
 
 import com.codahale.metrics.Meter;
@@ -33,12 +35,16 @@ import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.instrumented.Instrumentable;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
@@ -71,6 +77,7 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
   protected final Optional<TopologyCatalog> topologyCatalog;
 
   protected final MetricContext metricContext;
+  protected final Optional<EventSubmitter> eventSubmitter;
   @Getter
   private Optional<Meter> flowOrchestrationSuccessFulMeter;
   @Getter
@@ -86,13 +93,15 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
       this.metricContext = 
Instrumented.getMetricContext(ConfigUtils.configToState(config), 
IdentityFlowToJobSpecCompiler.class);
       this.flowOrchestrationSuccessFulMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER));
       this.flowOrchestrationFailedMeter = 
Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER));
-      this.flowOrchestrationTimer = 
Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
+      this.flowOrchestrationTimer = 
Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER));
+      this.eventSubmitter = Optional.of(new 
EventSubmitter.Builder(this.metricContext, 
"org.apache.gobblin.service").build());
     }
     else {
       this.metricContext = null;
       this.flowOrchestrationSuccessFulMeter = Optional.absent();
       this.flowOrchestrationFailedMeter = Optional.absent();
       this.flowOrchestrationTimer = Optional.absent();
+      this.eventSubmitter = Optional.absent();
     }
 
     this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class);
@@ -188,13 +197,25 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
 
     long startTime = System.nanoTime();
     if (spec instanceof FlowSpec) {
+      Map<String, String> flowMetadata = getFlowMetadata((FlowSpec) spec);
+      TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent()
+          ? 
this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED)
+          : null;
       Dag<JobExecutionPlan> jobExecutionPlanDag = 
specCompiler.compileFlow(spec);
 
       if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) {
+        Instrumented.markMeter(this.flowOrchestrationFailedMeter);
         _log.warn("Cannot determine an executor to run on for Spec: " + spec);
         return;
       }
 
+      
flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD,
+          
jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+
+      if (flowCompilationTimer != null) {
+        flowCompilationTimer.stop(flowMetadata);
+      }
+
       // Schedule all compiled JobSpecs on their respective Executor
       for (Dag.DagNode<JobExecutionPlan> dagNode: 
jobExecutionPlanDag.getNodes()) {
         JobExecutionPlan jobExecutionPlan = dagNode.getValue();
@@ -205,8 +226,22 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
           producer = jobExecutionPlan.getSpecExecutor().getProducer().get();
           Spec jobSpec = jobExecutionPlan.getJobSpec();
 
+          if 
(!((JobSpec)jobSpec).getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))
 {
+            _log.warn("JobSpec does not contain flowExecutionId.");
+          }
+
+          Map<String, String> jobMetadata = getJobMetadata(flowMetadata, 
jobExecutionPlan);
           _log.info(String.format("Going to orchestrate JobSpec: %s on 
Executor: %s", jobSpec, producer));
+
+          TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent()
+              ? 
this.eventSubmitter.get().getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED)
+              : null;
+
           producer.addSpec(jobSpec);
+
+          if (jobOrchestrationTimer != null) {
+            jobOrchestrationTimer.stop(jobMetadata);
+          }
         } catch(Exception e) {
           _log.error("Cannot successfully setup spec: " + 
jobExecutionPlan.getJobSpec() + " on executor: " + producer +
               " for flow: " + spec, e);
@@ -220,6 +255,34 @@ public class Orchestrator implements SpecCatalogListener, 
Instrumentable {
     Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - 
startTime, TimeUnit.NANOSECONDS);
   }
 
+  private Map<String,String> getFlowMetadata(FlowSpec flowSpec) {
+    Map<String, String> metadata = Maps.newHashMap();
+
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY));
+    metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY));
+    if 
(metadata.containsKey(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)) {
+      metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
flowSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    }
+
+    return metadata;
+  }
+
+  private Map<String,String> getJobMetadata(Map<String,String> flowMetadata, 
JobExecutionPlan jobExecutionPlan) {
+    Map<String, String> jobMetadata = Maps.newHashMap();
+    JobSpec jobSpec = jobExecutionPlan.getJobSpec();
+    SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor();
+
+    jobMetadata.putAll(flowMetadata);
+    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, 
jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY));
+    jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, 
specExecutor.getClass().getCanonicalName());
+
+    return jobMetadata;
+  }
+
   public void remove(Spec spec, Properties headers) {
     // TODO: Evolve logic to cache and reuse previously compiled JobSpecs
     // .. this will work for Identity compiler but not always for multi-hop.

Reply via email to