Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 65123a606 -> 0412b7ee7


[GOBBLIN-644][GOBBLIN-645] Add metrics reporting config dynamically to compiled 
flows in MultiHopFlowCompiler.

Closes #2514 from sv2000/metricsConfig


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0412b7ee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0412b7ee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0412b7ee

Branch: refs/heads/master
Commit: 0412b7ee71c10f0127a484f08b5ff4093d7f146a
Parents: 65123a6
Author: suvasude <[email protected]>
Authored: Wed Dec 5 15:30:19 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Wed Dec 5 15:30:19 2018 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |  1 +
 .../gobblin/service/ServiceConfigKeys.java      |  1 -
 .../service/modules/flow/FlowGraphPath.java     | 17 ++++++--
 .../modules/flow/MultiHopFlowCompiler.java      |  2 +-
 .../service/modules/spec/JobExecutionPlan.java  | 43 +++++++++++++++++---
 5 files changed, 53 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0412b7ee/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index b1a525e..510f7cf 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -657,6 +657,7 @@ public class ConfigurationKeys {
   public static final String METRIC_CONTEXT_NAME_KEY = "metrics.context.name";
   public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 
METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes";
   public static final int DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 15;
+  public static final String METRICS_REPORTING_CONFIGURATIONS_PREFIX = 
"metrics.reporting";
 
   // File-based reporting
   public static final String METRICS_REPORTING_FILE_ENABLED_KEY =

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0412b7ee/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 255b1f7..42baadb 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -114,5 +114,4 @@ public class ServiceConfigKeys {
   public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = 
"log4j-service.properties";
   // GAAS Listerning Port
   public static final String SERVICE_PORT = GOBBLIN_SERVICE_PREFIX + "port";
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0412b7ee/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
index 84be3b4..03bbbf9 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java
@@ -75,14 +75,22 @@ public class FlowGraphPath {
     this.paths.add(path);
   }
 
-  public Dag<JobExecutionPlan> asDag() throws SpecNotFoundException, 
JobTemplate.TemplateException, URISyntaxException {
+  /**
+   * A method to convert a path of {@link FlowEdgeContext}s into a {@link 
Dag<JobExecutionPlan>}.
+   * @param sysConfig containing environment config (e.g. metric/tracking 
event config) to be added to each {@link JobSpec}.
+   * @return a {@link Dag<JobExecutionPlan>}
+   * @throws SpecNotFoundException
+   * @throws JobTemplate.TemplateException
+   * @throws URISyntaxException
+   */
+  public Dag<JobExecutionPlan> asDag(Config sysConfig) throws 
SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException {
     Dag<JobExecutionPlan> flowDag = new Dag<>(new ArrayList<>());
 
     for (List<FlowEdgeContext> path: paths) {
       Dag<JobExecutionPlan> pathDag = new Dag<>(new ArrayList<>());
       Iterator<FlowEdgeContext> pathIterator = path.iterator();
       while (pathIterator.hasNext()) {
-        Dag<JobExecutionPlan> flowEdgeDag = 
convertHopToDag(pathIterator.next());
+        Dag<JobExecutionPlan> flowEdgeDag = 
convertHopToDag(pathIterator.next(), sysConfig);
         pathDag = concatenate(pathDag, flowEdgeDag);
       }
       flowDag = flowDag.merge(pathDag);
@@ -129,9 +137,10 @@ public class FlowGraphPath {
    * Given an instance of {@link FlowEdge}, this method returns a {@link Dag < 
JobExecutionPlan >} that moves data
    * from the source of the {@link FlowEdge} to the destination of the {@link 
FlowEdge}.
    * @param flowEdgeContext an instance of {@link FlowEdgeContext}.
+   * @param sysConfig environment config.
    * @return a {@link Dag} of {@link JobExecutionPlan}s associated with the 
{@link FlowEdge}.
    */
-   private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext 
flowEdgeContext)
+   private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext 
flowEdgeContext, Config sysConfig)
       throws SpecNotFoundException, JobTemplate.TemplateException, 
URISyntaxException {
     FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate();
     DatasetDescriptor inputDatasetDescriptor = 
flowEdgeContext.getInputDatasetDescriptor();
@@ -146,7 +155,7 @@ public class FlowGraphPath {
     List<Config> resolvedJobConfigs = 
flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, 
outputDatasetDescriptor);
     //Iterate over each resolved job config and convert the config to a 
JobSpec.
     for (Config resolvedJobConfig : resolvedJobConfigs) {
-      JobExecutionPlan jobExecutionPlan = new 
JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, 
specExecutor, flowExecutionId);
+      JobExecutionPlan jobExecutionPlan = new 
JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, 
specExecutor, flowExecutionId, sysConfig);
       jobExecutionPlans.add(jobExecutionPlan);
       templateToJobNameMap.put(getJobTemplateName(jobExecutionPlan), 
jobExecutionPlan.getJobSpec().getConfig().getString(
           ConfigurationKeys.JOB_NAME_KEY));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0412b7ee/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 32418e4..50da32a 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -142,7 +142,7 @@ public class MultiHopFlowCompiler extends 
BaseFlowToJobSpecCompiler {
       FlowGraphPath flowGraphPath = flowGraph.findPath(flowSpec);
       //Convert the path into a Dag of JobExecutionPlans.
       if (flowGraphPath != null) {
-        jobExecutionPlanDag = flowGraphPath.asDag();
+        jobExecutionPlanDag = flowGraphPath.asDag(this.config);
       } else {
         Instrumented.markMeter(flowCompilationFailedMeter);
         log.info(String.format("No path found from source: %s and destination: 
%s", source, destination));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0412b7ee/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
----------------------------------------------------------------------
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
index 685513c..6d62591 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java
@@ -19,22 +19,26 @@ package org.apache.gobblin.service.modules.spec;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Random;
 
 import org.apache.commons.lang3.StringUtils;
 
 import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.service.ExecutionStatus;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
 import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog;
@@ -54,11 +58,10 @@ public class JobExecutionPlan {
 
   public static class Factory {
     public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_";
-    private static final Random random = new Random();
 
-    public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, 
SpecExecutor specExecutor, Long flowExecutionId)
+    public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, 
SpecExecutor specExecutor, Long flowExecutionId, Config sysConfig)
         throws URISyntaxException {
-        JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId);
+        JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId, 
sysConfig);
         return new JobExecutionPlan(jobSpec, specExecutor);
     }
 
@@ -68,7 +71,7 @@ public class JobExecutionPlan {
      * @param flowSpec input FlowSpec.
      * @return a {@link JobSpec} corresponding to the resolved job config.
      */
-    private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, 
Long flowExecutionId) throws URISyntaxException {
+    private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, 
Long flowExecutionId, Config sysConfig) throws URISyntaxException {
       Config flowConfig = flowSpec.getConfig();
 
       String flowName = ConfigUtils.getString(flowConfig, 
ConfigurationKeys.FLOW_NAME_KEY, "");
@@ -115,12 +118,42 @@ public class JobExecutionPlan {
       
jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_FAILURE_OPTION,
           ConfigValueFactory.fromAnyRef(flowFailureOption)));
 
+      //Add tracking config to JobSpec.
+      addTrackingEventConfig(jobSpec, sysConfig);
+
       // Reset properties in Spec from Config
       
jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig()));
 
       return jobSpec;
     }
 
+    /**
+     * A method to add tracking event configurations to a JobSpec.
+     * This enables {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s
+     * to be emitted from each Gobblin job orchestrated by 
Gobblin-as-a-Service, which will then be used for tracking the
+     * execution status of the job.
+     * @param jobSpec representing a fully resolved {@link JobSpec}.
+     */
+    private static void addTrackingEventConfig(JobSpec jobSpec, Config 
sysConfig) {
+      Config reportingConfig = ConfigUtils.getConfig(sysConfig, 
ConfigurationKeys.METRICS_REPORTING_CONFIGURATIONS_PREFIX, 
ConfigFactory.empty());
+      if (!reportingConfig.isEmpty()) {
+        Config jobConfig = 
jobSpec.getConfig().withFallback(reportingConfig.atPath(ConfigurationKeys.METRICS_REPORTING_CONFIGURATIONS_PREFIX));
+        boolean isSchemaRegistryEnabled = ConfigUtils.getBoolean(sysConfig, 
ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false);
+        if (isSchemaRegistryEnabled) {
+          String schemaRegistryUrl = ConfigUtils.getString(sysConfig, 
KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL, "");
+          if (!Strings.isNullOrEmpty(schemaRegistryUrl)) {
+            jobConfig = 
jobConfig.withValue(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL, 
ConfigValueFactory.fromAnyRef(schemaRegistryUrl));
+          }
+          String schemaOverrideNamespace = ConfigUtils
+              .getString(sysConfig, 
KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE, 
"");
+          if (!Strings.isNullOrEmpty(schemaOverrideNamespace)) {
+            jobConfig = 
jobConfig.withValue(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE,
+                ConfigValueFactory.fromAnyRef(schemaOverrideNamespace));
+          }
+        }
+        jobSpec.setConfig(jobConfig);
+      }
+    }
 
     /**
      * A naive implementation of generating a jobSpec's URI within a multi-hop 
flow that follows the convention:

Reply via email to