Repository: incubator-gobblin Updated Branches: refs/heads/master 65123a606 -> 0412b7ee7
[GOBBLIN-644][GOBBLIN-645] Add metrics reporting config dynamically to compiled flows in MultiHopFlowCompiler. Closes #2514 from sv2000/metricsConfig Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0412b7ee Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0412b7ee Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0412b7ee Branch: refs/heads/master Commit: 0412b7ee71c10f0127a484f08b5ff4093d7f146a Parents: 65123a6 Author: suvasude <[email protected]> Authored: Wed Dec 5 15:30:19 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Wed Dec 5 15:30:19 2018 -0800 ---------------------------------------------------------------------- .../configuration/ConfigurationKeys.java | 1 + .../gobblin/service/ServiceConfigKeys.java | 1 - .../service/modules/flow/FlowGraphPath.java | 17 ++++++-- .../modules/flow/MultiHopFlowCompiler.java | 2 +- .../service/modules/spec/JobExecutionPlan.java | 43 +++++++++++++++++--- 5 files changed, 53 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0412b7ee/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java index b1a525e..510f7cf 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java @@ -657,6 +657,7 @@ public class ConfigurationKeys { public static final String METRIC_CONTEXT_NAME_KEY = "metrics.context.name"; public static final String METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = METRICS_CONFIGURATIONS_PREFIX + "timer.window.size.in.minutes"; public static final int DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES = 15; + public static final String METRICS_REPORTING_CONFIGURATIONS_PREFIX = "metrics.reporting"; // File-based reporting public static final String METRICS_REPORTING_FILE_ENABLED_KEY = http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0412b7ee/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java index 255b1f7..42baadb 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java @@ -114,5 +114,4 @@ public class ServiceConfigKeys { public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = "log4j-service.properties"; // GAAS Listerning Port public static final String SERVICE_PORT = GOBBLIN_SERVICE_PREFIX + "port"; - } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0412b7ee/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java index 84be3b4..03bbbf9 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowGraphPath.java @@ -75,14 +75,22 @@ public class FlowGraphPath { this.paths.add(path); } - public Dag<JobExecutionPlan> asDag() throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { + /** + * A method to convert a path of {@link FlowEdgeContext}s into a {@link Dag<JobExecutionPlan>}. + * @param sysConfig containing environment config (e.g. metric/tracking event config) to be added to each {@link JobSpec}. + * @return a {@link Dag<JobExecutionPlan>} + * @throws SpecNotFoundException + * @throws JobTemplate.TemplateException + * @throws URISyntaxException + */ + public Dag<JobExecutionPlan> asDag(Config sysConfig) throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { Dag<JobExecutionPlan> flowDag = new Dag<>(new ArrayList<>()); for (List<FlowEdgeContext> path: paths) { Dag<JobExecutionPlan> pathDag = new Dag<>(new ArrayList<>()); Iterator<FlowEdgeContext> pathIterator = path.iterator(); while (pathIterator.hasNext()) { - Dag<JobExecutionPlan> flowEdgeDag = convertHopToDag(pathIterator.next()); + Dag<JobExecutionPlan> flowEdgeDag = convertHopToDag(pathIterator.next(), sysConfig); pathDag = concatenate(pathDag, flowEdgeDag); } flowDag = flowDag.merge(pathDag); @@ -129,9 +137,10 @@ public class FlowGraphPath { * Given an instance of {@link FlowEdge}, this method returns a {@link Dag < JobExecutionPlan >} that moves data * from the source of the {@link FlowEdge} to the destination of the {@link FlowEdge}. * @param flowEdgeContext an instance of {@link FlowEdgeContext}. + * @param sysConfig environment config. * @return a {@link Dag} of {@link JobExecutionPlan}s associated with the {@link FlowEdge}. */ - private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext) + private Dag<JobExecutionPlan> convertHopToDag(FlowEdgeContext flowEdgeContext, Config sysConfig) throws SpecNotFoundException, JobTemplate.TemplateException, URISyntaxException { FlowTemplate flowTemplate = flowEdgeContext.getEdge().getFlowTemplate(); DatasetDescriptor inputDatasetDescriptor = flowEdgeContext.getInputDatasetDescriptor(); @@ -146,7 +155,7 @@ public class FlowGraphPath { List<Config> resolvedJobConfigs = flowTemplate.getResolvedJobConfigs(mergedConfig, inputDatasetDescriptor, outputDatasetDescriptor); //Iterate over each resolved job config and convert the config to a JobSpec. for (Config resolvedJobConfig : resolvedJobConfigs) { - JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId); + JobExecutionPlan jobExecutionPlan = new JobExecutionPlan.Factory().createPlan(flowSpec, resolvedJobConfig, specExecutor, flowExecutionId, sysConfig); jobExecutionPlans.add(jobExecutionPlan); templateToJobNameMap.put(getJobTemplateName(jobExecutionPlan), jobExecutionPlan.getJobSpec().getConfig().getString( ConfigurationKeys.JOB_NAME_KEY)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0412b7ee/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java index 32418e4..50da32a 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java @@ -142,7 +142,7 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler { FlowGraphPath flowGraphPath = flowGraph.findPath(flowSpec); //Convert the path into a Dag of JobExecutionPlans. if (flowGraphPath != null) { - jobExecutionPlanDag = flowGraphPath.asDag(); + jobExecutionPlanDag = flowGraphPath.asDag(this.config); } else { Instrumented.markMeter(flowCompilationFailedMeter); log.info(String.format("No path found from source: %s and destination: %s", source, destination)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0412b7ee/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java index 685513c..6d62591 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/JobExecutionPlan.java @@ -19,22 +19,26 @@ package org.apache.gobblin.service.modules.spec; import java.net.URI; import java.net.URISyntaxException; -import java.util.Random; import org.apache.commons.lang3.StringUtils; import com.google.common.base.Joiner; +import com.google.common.base.Strings; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; import lombok.Data; import lombok.EqualsAndHashCode; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryConfigurationKeys; +import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.service.ExecutionStatus; +import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; import org.apache.gobblin.service.modules.orchestration.DagManager; import org.apache.gobblin.service.modules.template_catalog.FSFlowCatalog; @@ -54,11 +58,10 @@ public class JobExecutionPlan { public static class Factory { public static final String JOB_NAME_COMPONENT_SEPARATION_CHAR = "_"; - private static final Random random = new Random(); - public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, SpecExecutor specExecutor, Long flowExecutionId) + public JobExecutionPlan createPlan(FlowSpec flowSpec, Config jobConfig, SpecExecutor specExecutor, Long flowExecutionId, Config sysConfig) throws URISyntaxException { - JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId); + JobSpec jobSpec = buildJobSpec(flowSpec, jobConfig, flowExecutionId, sysConfig); return new JobExecutionPlan(jobSpec, specExecutor); } @@ -68,7 +71,7 @@ public class JobExecutionPlan { * @param flowSpec input FlowSpec. * @return a {@link JobSpec} corresponding to the resolved job config. */ - private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long flowExecutionId) throws URISyntaxException { + private static JobSpec buildJobSpec(FlowSpec flowSpec, Config jobConfig, Long flowExecutionId, Config sysConfig) throws URISyntaxException { Config flowConfig = flowSpec.getConfig(); String flowName = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_NAME_KEY, ""); @@ -115,12 +118,42 @@ public class JobExecutionPlan { jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_FAILURE_OPTION, ConfigValueFactory.fromAnyRef(flowFailureOption))); + //Add tracking config to JobSpec. + addTrackingEventConfig(jobSpec, sysConfig); + // Reset properties in Spec from Config jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig())); return jobSpec; } + /** + * A method to add tracking event configurations to a JobSpec. + * This enables {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s + * to be emitted from each Gobblin job orchestrated by Gobblin-as-a-Service, which will then be used for tracking the + * execution status of the job. + * @param jobSpec representing a fully resolved {@link JobSpec}. + */ + private static void addTrackingEventConfig(JobSpec jobSpec, Config sysConfig) { + Config reportingConfig = ConfigUtils.getConfig(sysConfig, ConfigurationKeys.METRICS_REPORTING_CONFIGURATIONS_PREFIX, ConfigFactory.empty()); + if (!reportingConfig.isEmpty()) { + Config jobConfig = jobSpec.getConfig().withFallback(reportingConfig.atPath(ConfigurationKeys.METRICS_REPORTING_CONFIGURATIONS_PREFIX)); + boolean isSchemaRegistryEnabled = ConfigUtils.getBoolean(sysConfig, ConfigurationKeys.METRICS_REPORTING_KAFKA_USE_SCHEMA_REGISTRY, false); + if (isSchemaRegistryEnabled) { + String schemaRegistryUrl = ConfigUtils.getString(sysConfig, KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL, ""); + if (!Strings.isNullOrEmpty(schemaRegistryUrl)) { + jobConfig = jobConfig.withValue(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL, ConfigValueFactory.fromAnyRef(schemaRegistryUrl)); + } + String schemaOverrideNamespace = ConfigUtils + .getString(sysConfig, KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE, ""); + if (!Strings.isNullOrEmpty(schemaOverrideNamespace)) { + jobConfig = jobConfig.withValue(KafkaSchemaRegistryConfigurationKeys.KAFKA_SCHEMA_REGISTRY_OVERRIDE_NAMESPACE, + ConfigValueFactory.fromAnyRef(schemaOverrideNamespace)); + } + } + jobSpec.setConfig(jobConfig); + } + } /** * A naive implementation of generating a jobSpec's URI within a multi-hop flow that follows the convention:
