Repository: incubator-gobblin Updated Branches: refs/heads/master 94bdc6f39 -> 25d0a7d1a
[GOBBLIN-638] Submit more timing events from GaaS to accurately track flow/job status. Closes #2508 from sv2000/gaasTimingEvents Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/25d0a7d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/25d0a7d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/25d0a7d1 Branch: refs/heads/master Commit: 25d0a7d1a89bd781c35afb6081a17937cf9c1b3e Parents: 94bdc6f Author: suvasude <[email protected]> Authored: Tue Nov 27 14:17:53 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Nov 27 14:17:53 2018 -0800 ---------------------------------------------------------------------- .../gobblin/metrics/event/TimingEvent.java | 6 +- .../gobblin/service/ServiceMetricNames.java | 3 + .../modules/orchestration/DagManager.java | 37 ++++++---- .../modules/orchestration/Orchestrator.java | 74 ++++++++------------ .../modules/orchestration/TimingEventUtils.java | 62 ++++++++++++++++ 5 files changed, 120 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25d0a7d1/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java ---------------------------------------------------------------------- diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java index c3e6259..8fd1462 100644 --- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java +++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java @@ -56,11 +56,7 @@ public class TimingEvent { public static class FlowTimings { public static final String FLOW_COMPILED = "FlowCompiled"; - } - - public static class JobStatusTimings { - public static final String JOB_STATUS_POLLED = "JobStatusPolled"; - public static final String ALL_JOB_STATUSES_POLLED = "AllJobStatusesPolled"; + public static final String FLOW_COMPILE_FAILED = "FlowCompileFailed"; } public static class FlowEventConstants { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25d0a7d1/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java index d1fb711..f72093b 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceMetricNames.java @@ -28,4 +28,7 @@ public class ServiceMetricNames { public static final String FLOW_ORCHESTRATION_SUCCESSFUL_METER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.successful"; public static final String FLOW_ORCHESTRATION_FAILED_METER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.failed"; public static final String FLOW_ORCHESTRATION_TIMER = GOBBLIN_SERVICE_PREFIX + "flowOrchestration.time"; + + //Job status poll timer + public static final String JOB_STATUS_POLLED_TIMER = GOBBLIN_SERVICE_PREFIX + "jobStatusPoll.time"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25d0a7d1/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java index 2d0c555..0c94b36 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java @@ -30,9 +30,11 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import com.codahale.metrics.Timer; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.AbstractIdleService; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; @@ -49,6 +51,7 @@ import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.Spec; import org.apache.gobblin.runtime.api.SpecProducer; import org.apache.gobblin.service.ExecutionStatus; +import org.apache.gobblin.service.ServiceMetricNames; import org.apache.gobblin.service.modules.flowgraph.Dag; import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode; import org.apache.gobblin.service.modules.spec.JobExecutionPlan; @@ -142,7 +145,7 @@ public class DagManager extends AbstractIdleService { try { Class jobStatusRetrieverClass = Class.forName(config.getString(JOB_STATUS_RETRIEVER_KEY)); this.jobStatusRetriever = - (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass); + (JobStatusRetriever) GobblinConstructorUtils.invokeLongestConstructor(jobStatusRetrieverClass, config); Class dagStateStoreClass = Class.forName(config.getString(DAG_STORE_CLASS_KEY)); this.dagStateStore = (DagStateStore) GobblinConstructorUtils.invokeLongestConstructor(dagStateStoreClass, config); } catch (ReflectiveOperationException e) { @@ -215,6 +218,7 @@ public class DagManager extends AbstractIdleService { private final Set<String> failedDagIdsFinishAllPossible = new HashSet<>(); private final MetricContext metricContext; private final Optional<EventSubmitter> eventSubmitter; + private final Optional<Timer> jobStatusPolledTimer; private JobStatusRetriever jobStatusRetriever; private DagStateStore dagStateStore; @@ -231,9 +235,12 @@ public class DagManager extends AbstractIdleService { if (instrumentationEnabled) { this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass()); this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build()); + this.jobStatusPolledTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.JOB_STATUS_POLLED_TIMER)); + } else { this.metricContext = null; this.eventSubmitter = Optional.absent(); + this.jobStatusPolledTimer = Optional.absent(); } } @@ -255,15 +262,9 @@ public class DagManager extends AbstractIdleService { initialize(dag); } log.info("Polling job statuses.."); - TimingEvent jobStatusPollTimer = this.eventSubmitter.isPresent() - ? eventSubmitter.get().getTimingEvent(TimingEvent.JobStatusTimings.ALL_JOB_STATUSES_POLLED) - : null; //Poll and update the job statuses of running jobs. pollJobStatuses(); log.info("Poll done."); - if (jobStatusPollTimer != null) { - jobStatusPollTimer.stop(); - } //Clean up any finished dags log.info("Cleaning up finished dags.."); cleanUp(); @@ -310,13 +311,10 @@ public class DagManager extends AbstractIdleService { throws IOException { this.failedDagIdsFinishRunning.clear(); for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) { - TimingEvent jobStatusPollTimer = this.eventSubmitter.isPresent() - ? eventSubmitter.get().getTimingEvent(TimingEvent.JobStatusTimings.JOB_STATUS_POLLED) - : null; + long pollStartTime = System.nanoTime(); JobStatus jobStatus = pollJobStatus(node); - if (jobStatusPollTimer != null) { - jobStatusPollTimer.stop(); - } + Instrumented.updateTimer(this.jobStatusPolledTimer, System.nanoTime() - pollStartTime, TimeUnit.NANOSECONDS); + Preconditions.checkNotNull(jobStatus, "Received null job status for a running job " + DagManagerUtils.getJobName(node)); JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(node); @@ -388,7 +386,20 @@ public class DagManager extends AbstractIdleService { log.warn("JobSpec does not contain flowExecutionId."); } log.info("Submitting job: {} on executor: {}", jobSpec, producer); + + Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan); + log.info("Going to orchestrate JobSpec: {} on Executor: {}", jobSpec, producer); + + TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get(). + getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null; + producer.addSpec(jobSpec); + + if (jobOrchestrationTimer != null) { + jobOrchestrationTimer.stop(jobMetadata); + } + + log.info("Orchestrated JobSpec: {} on Executor: {}", jobSpec, producer); } catch (Exception e) { log.error("Cannot submit job: {} on executor: {}", jobSpec, producer, e); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25d0a7d1/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java index ea0d0bd..3dd7538 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java @@ -91,20 +91,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log, boolean instrumentationEnabled) { _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); - if (instrumentationEnabled) { - this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); - this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER)); - this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER)); - this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER)); - this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build()); - } else { - this.metricContext = null; - this.flowOrchestrationSuccessFulMeter = Optional.absent(); - this.flowOrchestrationFailedMeter = Optional.absent(); - this.flowOrchestrationTimer = Optional.absent(); - this.eventSubmitter = Optional.absent(); - } - this.aliasResolver = new ClassAliasResolver<>(SpecCompiler.class); this.topologyCatalog = topologyCatalog; this.dagManager = dagManager; @@ -121,6 +107,20 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { | ClassNotFoundException e) { throw new RuntimeException(e); } + + if (instrumentationEnabled) { + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.specCompiler.getClass()); + this.flowOrchestrationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_SUCCESSFUL_METER)); + this.flowOrchestrationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_ORCHESTRATION_FAILED_METER)); + this.flowOrchestrationTimer = Optional.of(this.metricContext.timer(ServiceMetricNames.FLOW_ORCHESTRATION_TIMER)); + this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build()); + } else { + this.metricContext = null; + this.flowOrchestrationSuccessFulMeter = Optional.absent(); + this.flowOrchestrationFailedMeter = Optional.absent(); + this.flowOrchestrationTimer = Optional.absent(); + this.eventSubmitter = Optional.absent(); + } } public Orchestrator(Config config, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<Logger> log) { @@ -199,18 +199,31 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { long startTime = System.nanoTime(); if (spec instanceof FlowSpec) { - Map<String, String> flowMetadata = getFlowMetadata((FlowSpec) spec); TimingEvent flowCompilationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILED) : null; + Dag<JobExecutionPlan> jobExecutionPlanDag = specCompiler.compileFlow(spec); + Map<String, String> flowMetadata = TimingEventUtils.getFlowMetadata((FlowSpec) spec); if (jobExecutionPlanDag == null || jobExecutionPlanDag.isEmpty()) { + // For scheduled flows, we do not insert the flowExecutionId into the FlowSpec. As a result, if the flow + // compilation fails (i.e. we are unable to find a path), the metadata will not have flowExecutionId. + // In this case, the current time is used as the flow executionId. + flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, + Long.toString(System.currentTimeMillis())); + TimingEvent flowCompileFailedTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get() + .getTimingEvent(TimingEvent.FlowTimings.FLOW_COMPILE_FAILED) : null; Instrumented.markMeter(this.flowOrchestrationFailedMeter); _log.warn("Cannot determine an executor to run on for Spec: " + spec); + if (flowCompileFailedTimer != null) { + flowCompileFailedTimer.stop(flowMetadata); + } return; } + //If it is a scheduled flow (and hence, does not have flowExecutionId in the FlowSpec) and the flow compilation is successful, + // retrieve the flowExecutionId from the JobSpec. flowMetadata.putIfAbsent(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobExecutionPlanDag.getNodes().get(0).getValue().getJobSpec().getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); @@ -218,6 +231,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { flowCompilationTimer.stop(flowMetadata); } + if (this.dagManager.isPresent()) { //Send the dag to the DagManager. this.dagManager.get().offer(jobExecutionPlanDag); @@ -236,7 +250,7 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { _log.warn("JobSpec does not contain flowExecutionId."); } - Map<String, String> jobMetadata = getJobMetadata(flowMetadata, jobExecutionPlan); + Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(flowMetadata, jobExecutionPlan); _log.info(String.format("Going to orchestrate JobSpec: %s on Executor: %s", jobSpec, producer)); TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get(). @@ -261,34 +275,6 @@ public class Orchestrator implements SpecCatalogListener, Instrumentable { Instrumented.updateTimer(this.flowOrchestrationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); } - private Map<String, String> getFlowMetadata(FlowSpec flowSpec) { - Map<String, String> metadata = Maps.newHashMap(); - - metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY)); - metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY)); - if (metadata.containsKey(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD)) { - metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); - } - - return metadata; - } - - private Map<String, String> getJobMetadata(Map<String, String> flowMetadata, JobExecutionPlan jobExecutionPlan) { - Map<String, String> jobMetadata = Maps.newHashMap(); - JobSpec jobSpec = jobExecutionPlan.getJobSpec(); - SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor(); - - jobMetadata.putAll(flowMetadata); - jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY)); - jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY)); - jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); - jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY)); - jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY)); - jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName()); - - return jobMetadata; - } - public void remove(Spec spec, Properties headers) { // TODO: Evolve logic to cache and reuse previously compiled JobSpecs // .. this will work for Identity compiler but not always for multi-hop. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/25d0a7d1/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java new file mode 100644 index 0000000..38a9d07 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import java.util.Map; + +import com.google.common.collect.Maps; +import com.typesafe.config.Config; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.event.TimingEvent; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; + + +class TimingEventUtils { + static Map<String, String> getFlowMetadata(FlowSpec flowSpec) { + Map<String, String> metadata = Maps.newHashMap(); + + metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY)); + metadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY)); + + Config flowConfig = flowSpec.getConfig(); + if (flowConfig.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) { + metadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, flowConfig.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); + } + return metadata; + } + + static Map<String, String> getJobMetadata(Map<String, String> flowMetadata, JobExecutionPlan jobExecutionPlan) { + Map<String, String> jobMetadata = Maps.newHashMap(); + JobSpec jobSpec = jobExecutionPlan.getJobSpec(); + SpecExecutor specExecutor = jobExecutionPlan.getSpecExecutor(); + + jobMetadata.putAll(flowMetadata); + jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY)); + jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY)); + jobMetadata.put(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)); + jobMetadata.put(TimingEvent.FlowEventConstants.JOB_NAME_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_NAME_KEY)); + jobMetadata.put(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD, jobSpec.getConfig().getString(ConfigurationKeys.JOB_GROUP_KEY)); + jobMetadata.put(TimingEvent.FlowEventConstants.SPEC_EXECUTOR_FIELD, specExecutor.getClass().getCanonicalName()); + + return jobMetadata; + } + +}
