[GOBBLIN-617] Add distributed job launcher metrics and some refactoring. Closes #2484 from yukuai518/distMetrics
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1155cdc5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1155cdc5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1155cdc5 Branch: refs/heads/master Commit: 1155cdc5e1944c536ef6c9707bf947dd1a57d67b Parents: c103a8f Author: Kuai Yu <[email protected]> Authored: Tue Oct 23 16:04:44 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Oct 23 16:04:44 2018 -0700 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 1 + .../gobblin/cluster/GobblinClusterManager.java | 22 +- ...blinHelixDistributeJobExecutionLauncher.java | 70 +++--- .../gobblin/cluster/GobblinHelixJobFactory.java | 26 +- .../cluster/GobblinHelixJobLauncher.java | 28 +-- .../GobblinHelixJobLauncherListener.java | 80 +++++++ .../cluster/GobblinHelixJobLauncherMetrics.java | 74 ++++++ .../cluster/GobblinHelixJobScheduler.java | 240 ++++++------------- .../GobblinHelixJobSchedulerMetrics.java | 96 ++++++++ .../gobblin/cluster/GobblinHelixJobTask.java | 54 ++++- .../cluster/GobblinHelixMultiManager.java | 6 +- .../gobblin/cluster/GobblinTaskRunner.java | 38 ++- .../cluster/GobblinTaskRunnerMetrics.java | 14 +- .../cluster/HelixRetriggeringJobCallable.java | 98 +++++--- .../gobblin/cluster/TaskRunnerSuiteBase.java | 6 +- .../cluster/TaskRunnerSuiteProcessModel.java | 7 +- .../cluster/TaskRunnerSuiteThreadModel.java | 21 +- .../TaskRunnerSuiteForJobFactoryTest.java | 25 +- .../instrumented/StandardMetricsBridge.java | 16 +- .../apache/gobblin/runtime/api/JobCatalog.java | 14 +- .../runtime/api/JobExecutionLauncher.java | 17 +- .../apache/gobblin/runtime/api/SpecCatalog.java | 11 +- .../modules/core/GobblinServiceManager.java | 6 + 23 files changed, 629 insertions(+), 341 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 2b1ba09..3fb665e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -77,6 +77,7 @@ public class GobblinClusterConfigurationKeys { public static final String PLANNING_JOB_NAME_PREFIX = "PlanningJob"; public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + "planning."; public static final String PLANNING_ID_KEY = PLANNING_CONF_PREFIX + "idKey"; + public static final String PLANNING_JOB_CREATE_TIME = PLANNING_CONF_PREFIX + "createTime"; // job spec operation public static final String JOB_ALWAYS_DELETE = GOBBLIN_CLUSTER_PREFIX + "job.alwaysDelete"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index b1d0f43..e90a65a 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -20,6 +20,8 @@ package org.apache.gobblin.cluster; import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.URI; +import java.util.ArrayList; +import 
java.util.Collection; import java.util.List; import java.util.Properties; import java.util.UUID; @@ -54,7 +56,6 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; -import javax.annotation.Nonnull; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -105,9 +106,6 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri private static final Logger LOGGER = LoggerFactory.getLogger(GobblinClusterManager.class); - @VisibleForTesting - protected GobblinHelixMultiManager multiManager; - private StopStatus stopStatus = new StopStatus(false); protected ServiceBasedAppLauncher applicationLauncher; @@ -127,12 +125,11 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri // set to true to stop the idle process thread private volatile boolean stopIdleProcessThread = false; - // flag to keep track of leader and avoid processing duplicate leadership change notifications - private boolean isLeader = false; - private final boolean isStandaloneMode; @Getter + protected GobblinHelixMultiManager multiManager; + @Getter private MutableJobCatalog jobCatalog; @Getter private GobblinHelixJobScheduler jobScheduler; @@ -142,14 +139,12 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri private final String clusterName; private final Config config; private final MetricContext metricContext; - private final StandardMetrics metrics; public GobblinClusterManager(String clusterName, String applicationId, Config config, Optional<Path> appWorkDirOptional) throws Exception { this.clusterName = clusterName; this.config = config; this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); - this.metrics = new StandardMetrics(); this.isStandaloneMode = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.STANDALONE_CLUSTER_MODE_KEY, 
GobblinClusterConfigurationKeys.DEFAULT_STANDALONE_CLUSTER_MODE); @@ -446,11 +441,14 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri } @Override - public StandardMetrics getStandardMetrics() { - return this.metrics; + public Collection<StandardMetrics> getStandardMetricsCollection() { + List<StandardMetrics> list = new ArrayList(); + list.addAll(this.jobScheduler.getStandardMetricsCollection()); + list.addAll(this.multiManager.getStandardMetricsCollection()); + list.addAll(this.jobCatalog.getStandardMetricsCollection()); + return list; } - @Nonnull @Override public MetricContext getMetricContext() { return this.metricContext; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index eb78938..f7b0a83 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -43,6 +43,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; @@ -53,7 +54,6 @@ import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; -import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.api.ExecutionResult; 
import org.apache.gobblin.runtime.api.JobExecutionLauncher; @@ -63,19 +63,19 @@ import org.apache.gobblin.runtime.api.MonitoredObject; import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.source.extractor.partition.Partitioner; import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.JobLauncherUtils; import org.apache.gobblin.util.PropertiesUtils; /** - * To avoid all the task driver logic ({@link GobblinHelixJobLauncher}) runs on the same instance (node), this - * {@link JobExecutionLauncher} can distribute the original job (called planning job) to Helix. Helix will - * assign this job to one participant. The participant can parse the original job properties and run the task driver. + * To avoid running all the task driver logic ({@link GobblinHelixJobLauncher}) on the same + * instance (manager), this {@link JobExecutionLauncher} will distribute the original job + * to one of the worker (participant) nodes. The original job will be launched there. * * <p> * For job submission, the Helix workflow name will be the original job name with prefix - * {@link GobblinClusterConfigurationKeys#PLANNING_JOB_NAME_PREFIX}. The Helix job name will be the auto-generated planning - * job ID with prefix {@link GobblinClusterConfigurationKeys#PLANNING_ID_KEY}. + * {@link GobblinClusterConfigurationKeys#PLANNING_JOB_NAME_PREFIX}. The Helix job name + * will be the auto-generated planning job ID with prefix + * {@link GobblinClusterConfigurationKeys#PLANNING_ID_KEY}.
* </p> * * <p> @@ -87,10 +87,11 @@ import org.apache.gobblin.util.PropertiesUtils; @Alpha @Slf4j class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher, Closeable { + protected HelixManager helixManager; protected TaskDriver helixTaskDriver; - protected Properties sysProperties; - protected Properties jobProperties; + protected Properties sysProps; + protected Properties jobPlanningProps; protected StateStores stateStores; protected static final String PLANNING_WORK_UNIT_DIR_NAME = "_plan_workunits"; @@ -115,11 +116,12 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception { this.helixManager = builder.manager; this.helixTaskDriver = new TaskDriver(this.helixManager); - this.sysProperties = builder.sysProperties; - this.jobProperties = builder.jobProperties; + this.sysProps = builder.sysProps; + this.jobPlanningProps = builder.jobPlanningProps; this.jobSubmitted = false; - Config combined = ConfigUtils.propertiesToConfig(jobProperties) - .withFallback(ConfigUtils.propertiesToConfig(sysProperties)); + + Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps) + .withFallback(ConfigUtils.propertiesToConfig(sysProps)); Config stateStoreJobConfig = combined .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef( @@ -141,8 +143,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher } private void executeCancellation() { - String planningName = getPlanningJobId(this.jobProperties); if (this.jobSubmitted) { + String planningName = getPlanningJobId(this.jobPlanningProps); try { if (this.cancellationRequested && !this.cancellationExecuted) { // TODO : fix this when HELIX-1180 is completed @@ -159,8 +161,8 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher @Setter public static class Builder { - Properties sysProperties; - Properties 
jobProperties; + Properties sysProps; + Properties jobPlanningProps; HelixManager manager; Path appWorkDir; public GobblinHelixDistributeJobExecutionLauncher build() throws Exception { @@ -168,18 +170,12 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher } } - private String getPlanningJobName (Properties jobProps) { - String jobName = JobState.getJobNameFromProps(jobProps); - return GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX + jobName; - } - - protected String getPlanningJobId (Properties jobProps) { - if (jobProps.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) { - return jobProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); + protected String getPlanningJobId (Properties jobPlanningProps) { + if (jobPlanningProps.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) { + return jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); + } else { + throw new RuntimeException("Cannot find planning id"); } - String planningId = JobLauncherUtils.newJobId(getPlanningJobName(jobProps)); - jobProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, planningId); - return planningId; } /** @@ -193,7 +189,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher * * In short, the planning job will run once and requires no timeout. */ - private JobConfig.Builder createPlanningJob (Properties jobProps) { + private JobConfig.Builder createJobBuilder (Properties jobProps) { // Create a single task for job planning String planningId = getPlanningJobId(jobProps); Map<String, TaskConfig> taskConfigMap = Maps.newHashMap(); @@ -224,7 +220,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher } /** - * Submit job to helix so that it can be re-assigned to one of its participants. + * Submit a planning job to helix so that it can launched from a remote node. 
* @param jobName A planning job name which has prefix {@link GobblinClusterConfigurationKeys#PLANNING_JOB_NAME_PREFIX}. * @param jobId A planning job id created by {@link GobblinHelixDistributeJobExecutionLauncher#getPlanningJobId}. * @param jobConfigBuilder A job config builder which contains a single task. @@ -241,19 +237,19 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher } @Override - public DistributeJobMonitor launchJob(JobSpec jobSpec) { - this.jobMonitor = new DistributeJobMonitor(new DistributeJobCallable(this.jobProperties)); + public DistributeJobMonitor launchJob(@Nullable JobSpec jobSpec) { + this.jobMonitor = new DistributeJobMonitor(new DistributeJobCallable(this.jobPlanningProps)); return this.jobMonitor; } @AllArgsConstructor private class DistributeJobCallable implements Callable<ExecutionResult> { - Properties jobProps; + Properties jobPlanningProps; @Override public DistributeJobResult call() throws Exception { - String planningId = getPlanningJobId(this.jobProps); - JobConfig.Builder builder = createPlanningJob(this.jobProps); + String planningId = getPlanningJobId(this.jobPlanningProps); + JobConfig.Builder builder = createJobBuilder(this.jobPlanningProps); try { submitJobToHelix(planningId, planningId, builder); return waitForJobCompletion(planningId, planningId); @@ -265,9 +261,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher } private DistributeJobResult waitForJobCompletion(String workFlowName, String jobName) throws InterruptedException { - boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, + boolean timeoutEnabled = Boolean.parseBoolean(this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); - long timeoutInSeconds = 
Long.parseLong(this.jobProperties.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, + long timeoutInSeconds = Long.parseLong(this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS)); try { @@ -287,7 +283,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher //TODO: change below to Helix UserConentStore @VisibleForTesting protected DistributeJobResult getResultFromUserContent() { - String planningId = getPlanningJobId(this.jobProperties); + String planningId = getPlanningJobId(this.jobPlanningProps); try { TaskState taskState = this.stateStores.getTaskStateStore().get(planningId, planningId, planningId); return new DistributeJobResult(Optional.of(taskState.getProperties()), Optional.empty()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java index 83821d4..5bee4e0 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java @@ -27,10 +27,13 @@ import org.apache.helix.task.TaskFactory; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.runtime.util.StateStores; +import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; @@ -38,10 +41,14 @@ import org.apache.gobblin.util.PathUtils; * An 
implementation of Helix's {@link TaskFactory} for {@link GobblinHelixJobTask}s. */ @Slf4j -public class GobblinHelixJobFactory implements TaskFactory { +class GobblinHelixJobFactory implements TaskFactory { protected StateStores stateStores; protected TaskRunnerSuiteBase.Builder builder; + @Getter + protected GobblinHelixJobLauncherMetrics launcherMetrics; + @Getter + protected GobblinHelixJobTask.GobblinHelixJobTaskMetrics jobTaskMetrics; private void initializeStateStore(TaskRunnerSuiteBase.Builder builder) { Config sysConfig = builder.getConfig(); @@ -57,16 +64,29 @@ public class GobblinHelixJobFactory implements TaskFactory { appWorkDir, GobblinHelixDistributeJobExecutionLauncher.PLANNING_JOB_STATE_DIR_NAME); } - public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder) { + public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder, MetricContext metricContext) { + this.builder = builder; // TODO: We can remove below initialization once Helix allow us to persist job resut in userContentStore initializeStateStore(this.builder); + // initialize job related metrics (planning jobs) + int metricsWindowSizeInMin = ConfigUtils.getInt(builder.getConfig(), + ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, + ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); + + this.launcherMetrics = new GobblinHelixJobLauncherMetrics("launcherInJobFactory", + metricContext, + metricsWindowSizeInMin); + this.jobTaskMetrics = new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext, + metricsWindowSizeInMin); } @Override public Task createNewTask(TaskCallbackContext context) { return new GobblinHelixJobTask(context, this.stateStores, - this.builder); + this.builder, + this.launcherMetrics, + this.jobTaskMetrics); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 8523e21..8d6d7b2 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -35,7 +35,6 @@ import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobQueue; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskDriver; -import org.apache.helix.task.TaskUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +44,6 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; import javax.annotation.Nullable; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; @@ -108,8 +106,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private final HelixManager helixManager; private final TaskDriver helixTaskDriver; private final String helixWorkFlowName; - private final String jobResourceName; - @Getter private JobListener jobListener; private final FileSystem fs; @@ -123,15 +119,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { private final TaskStateCollectorService taskStateCollectorService; private volatile boolean jobSubmitted = false; - private volatile boolean jobComplete = false; private final ConcurrentHashMap<String, Boolean> runningMap; private final StateStores stateStores; private final Config jobConfig; private final long workFlowExpiryTimeSeconds; - public GobblinHelixJobLauncher(Properties jobProps, final HelixManager helixManager, Path appWorkDir, - List<? extends Tag<?>> metadataTags, ConcurrentHashMap<String, Boolean> runningMap) - throws Exception { + public GobblinHelixJobLauncher (Properties jobProps, + final HelixManager helixManager, + Path appWorkDir, + List<? 
extends Tag<?>> metadataTags, + ConcurrentHashMap<String, Boolean> runningMap) throws Exception { + super(jobProps, addAdditionalMetadataTags(jobProps, metadataTags)); LOGGER.debug("GobblinHelixJobLauncher: jobProps {}, appWorkDir {}", jobProps, appWorkDir); @@ -144,8 +142,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { + Path.SEPARATOR + this.jobContext.getJobId()); this.helixWorkFlowName = this.jobContext.getJobId(); - this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixWorkFlowName, this.jobContext.getJobId()); - this.jobContext.getJobState().setJobLauncherType(LauncherTypeEnum.CLUSTER); this.stateSerDeRunnerThreads = Integer.parseInt(jobProps.getProperty(ParallelRunner.PARALLEL_RUNNER_THREADS_KEY, @@ -170,8 +166,11 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { URI fsUri = URI.create(jobProps.getProperty(ConfigurationKeys.FS_URI_KEY, ConfigurationKeys.LOCAL_FS_URI)); this.fs = FileSystem.get(fsUri, new Configuration()); - this.taskStateCollectorService = new TaskStateCollectorService(jobProps, this.jobContext.getJobState(), - this.eventBus, this.stateStores.getTaskStateStore(), outputTaskStateDir); + this.taskStateCollectorService = new TaskStateCollectorService(jobProps, + this.jobContext.getJobState(), + this.eventBus, + this.stateStores.getTaskStateStore(), + this.outputTaskStateDir); startCancellationExecutor(); } @@ -212,7 +211,6 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { waitForJobCompletion(); jobRunTimer.stop(); LOGGER.info(String.format("Job %s completed", this.jobContext.getJobId())); - this.jobComplete = true; } finally { // The last iteration of output TaskState collecting will run when the collector service gets stopped this.taskStateCollectorService.stopAsync().awaitTerminated(); @@ -291,7 +289,9 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT)); if 
(this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY)) { - jobConfigBuilder.setInstanceGroupTag(this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY)); + String jobTag = this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY); + log.info("Job {} has tags associated : {}", this.jobContext.getJobId(), jobTag); + jobConfigBuilder.setInstanceGroupTag(jobTag); } if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherListener.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherListener.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherListener.java new file mode 100644 index 0000000..b19d301 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherListener.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Optional; + +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.runtime.JobContext; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.listeners.AbstractJobListener; +import org.apache.gobblin.runtime.listeners.JobListener; + + +/** + * A job listener used when {@link GobblinHelixJobLauncher} launches a job. + * The {@link GobblinHelixJobLauncherMetrics} must always be passed in because + * it is updated on job prepare, completion and cancellation. + */ +class GobblinHelixJobLauncherListener extends AbstractJobListener { + + private final GobblinHelixJobLauncherMetrics jobLauncherMetrics; + private static final String JOB_START_TIME = "jobStartTime"; + + GobblinHelixJobLauncherListener(GobblinHelixJobLauncherMetrics jobLauncherMetrics) { + this.jobLauncherMetrics = jobLauncherMetrics; + } + + @Override + public void onJobPrepare(JobContext jobContext) + throws Exception { + super.onJobPrepare(jobContext); + jobContext.getJobState().setProp(JOB_START_TIME, Long.toString(System.nanoTime())); + jobLauncherMetrics.totalJobsLaunched.incrementAndGet(); + } + + /** + * From {@link org.apache.gobblin.runtime.AbstractJobLauncher#launchJob(JobListener)}, the final + * job state should only be FAILED or COMMITTED. This means the completed-jobs metrics cover + * both failed jobs and committed jobs.
+ */ + @Override + public void onJobCompletion(JobContext jobContext) + throws Exception { + super.onJobCompletion(jobContext); + long startTime = jobContext.getJobState().getPropAsLong(JOB_START_TIME); + jobLauncherMetrics.totalJobsCompleted.incrementAndGet(); + Instrumented.updateTimer(Optional.of(jobLauncherMetrics.timeForCompletedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) { + jobLauncherMetrics.totalJobsFailed.incrementAndGet(); + Instrumented.updateTimer(Optional.of(jobLauncherMetrics.timeForFailedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + } else { + jobLauncherMetrics.totalJobsCommitted.incrementAndGet(); + Instrumented.updateTimer(Optional.of(jobLauncherMetrics.timeForCommittedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); + } + } + + @Override + public void onJobCancellation(JobContext jobContext) + throws Exception { + super.onJobCancellation(jobContext); + jobLauncherMetrics.totalJobsCancelled.incrementAndGet(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherMetrics.java new file mode 100644 index 0000000..b33edae --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherMetrics.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareTimer; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.runtime.api.JobExecutionLauncher; + +/** + * Metrics that relate to jobs launched by {@link GobblinHelixJobLauncher}.
+ */ +class GobblinHelixJobLauncherMetrics extends StandardMetricsBridge.StandardMetrics { + private final String metricsName; + final AtomicLong totalJobsLaunched; + final AtomicLong totalJobsCompleted; + final AtomicLong totalJobsCommitted; + final AtomicLong totalJobsFailed; + final AtomicLong totalJobsCancelled; + + final ContextAwareTimer timeForCompletedJobs; + final ContextAwareTimer timeForFailedJobs; + final ContextAwareTimer timeForCommittedJobs; + + public GobblinHelixJobLauncherMetrics(String metricsName, final MetricContext metricContext, int windowSizeInMin) { + this.metricsName = metricsName; + + // All historical counters + this.totalJobsLaunched = new AtomicLong(0); + this.totalJobsCompleted = new AtomicLong(0); + this.totalJobsCommitted = new AtomicLong(0); + this.totalJobsFailed = new AtomicLong(0); + this.totalJobsCancelled = new AtomicLong(0); + + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING, + ()->(int)(GobblinHelixJobLauncherMetrics.this.totalJobsLaunched.get() - GobblinHelixJobLauncherMetrics.this.totalJobsCompleted.get()))); + + this.timeForCompletedJobs = 
metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMPLETED_JOBS, windowSizeInMin, TimeUnit.MINUTES); + this.timeForFailedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_FAILED_JOBS, windowSizeInMin, TimeUnit.MINUTES); + this.timeForCommittedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMMITTED_JOBS, windowSizeInMin, TimeUnit.MINUTES); + + this.contextAwareMetrics.add(timeForCommittedJobs); + this.contextAwareMetrics.add(timeForCompletedJobs); + this.contextAwareMetrics.add(timeForFailedJobs); + } + + @Override + public String getName() { + return this.metricsName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index e29fe61..a215ffa 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -19,22 +19,21 @@ package org.apache.gobblin.cluster; import java.net.URI; import java.net.URISyntaxException; +import java.util.Collection; import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Optional; 
+import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; @@ -48,16 +47,11 @@ import org.apache.gobblin.cluster.event.UpdateJobConfigArrivalEvent; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; -import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; -import org.apache.gobblin.runtime.JobContext; import org.apache.gobblin.runtime.JobException; -import org.apache.gobblin.runtime.JobState; -import org.apache.gobblin.runtime.api.JobExecutionLauncher; import org.apache.gobblin.runtime.api.MutableJobCatalog; -import org.apache.gobblin.runtime.listeners.AbstractJobListener; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.scheduler.JobScheduler; import org.apache.gobblin.scheduler.SchedulerService; @@ -66,10 +60,19 @@ import org.apache.gobblin.util.PropertiesUtils; /** - * An extension to {@link JobScheduler} that schedules and runs Gobblin jobs on Helix using - * {@link GobblinHelixJobLauncher}s. + * An extension to {@link JobScheduler} that schedules and runs + * Gobblin jobs on Helix. * - * @author Yinan Li + * <p> The actual job running logic is handled by + * {@link HelixRetriggeringJobCallable}. This callable will first + * determine if this job should be launched from the same node + * where the scheduler is running, or from a remote node. + * + * <p> If the job should be launched from the scheduler node, + * {@link GobblinHelixJobLauncher} is invoked. Else the + * {@link GobblinHelixDistributeJobExecutionLauncher} is invoked. + * + * <p> More details can be found at {@link HelixRetriggeringJobCallable}. 
*/ @Alpha public class GobblinHelixJobScheduler extends JobScheduler implements StandardMetricsBridge{ @@ -84,12 +87,19 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe private final ConcurrentHashMap<String, Boolean> jobRunningMap; private final MutableJobCatalog jobCatalog; private final MetricContext metricContext; - private final Metrics metrics; + + final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics; + final GobblinHelixJobLauncherMetrics launcherMetrics; + private boolean startServicesCompleted; - public GobblinHelixJobScheduler(Properties properties, HelixManager helixManager, EventBus eventBus, - Path appWorkDir, List<? extends Tag<?>> metadataTags, SchedulerService schedulerService, - MutableJobCatalog jobCatalog) throws Exception { + public GobblinHelixJobScheduler(Properties properties, + HelixManager helixManager, + EventBus eventBus, + Path appWorkDir, List<? extends Tag<?>> metadataTags, + SchedulerService schedulerService, + MutableJobCatalog jobCatalog) throws Exception { + super(properties, schedulerService); this.properties = properties; this.helixManager = helixManager; @@ -99,7 +109,19 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.metadataTags = metadataTags; this.jobCatalog = jobCatalog; this.metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(properties), this.getClass()); - this.metrics = new Metrics(this.metricContext); + + int metricsWindowSizeInMin = ConfigUtils.getInt(ConfigUtils.propertiesToConfig(this.properties), + ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, + ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); + + this.launcherMetrics = new GobblinHelixJobLauncherMetrics("launcherInScheduler", + this.metricContext, + metricsWindowSizeInMin); + + this.jobSchedulerMetrics = new GobblinHelixJobSchedulerMetrics(this.jobExecutor, + this.metricContext, + metricsWindowSizeInMin); + 
this.startServicesCompleted = false; } @@ -115,137 +137,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe } @Override - public StandardMetrics getStandardMetrics() { - return metrics; - } - - private class Metrics extends StandardMetrics { - - private final AtomicLong totalJobsLaunched; - private final AtomicLong totalJobsCompleted; - private final AtomicLong totalJobsCommitted; - private final AtomicLong totalJobsFailed; - private final AtomicLong totalJobsCancelled; - - private final ContextAwareTimer timeForCompletedJobs; - private final ContextAwareTimer timeForFailedJobs; - private final ContextAwareTimer timeForCommittedJobs; - private final ContextAwareTimer timeBeforeJobScheduling; - private final ContextAwareTimer timeBeforeJobLaunching; - private final ContextAwareTimer timeBetwenJobSchedulingAndLaunching; - - private final ThreadPoolExecutor threadPoolExecutor; - - public Metrics(final MetricContext metricContext) { - // Thread executor reference from job scheduler - this.threadPoolExecutor = (ThreadPoolExecutor)GobblinHelixJobScheduler.this.jobExecutor; - - // timer duration setup - int windowSize = ConfigUtils.getInt(ConfigUtils.propertiesToConfig(GobblinHelixJobScheduler.this.properties), - ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, - ConfigurationKeys.DEFAULT_METRIC_TIMER_WINDOW_SIZE_IN_MINUTES); - - // All historical counters - this.totalJobsLaunched = new AtomicLong(0); - this.totalJobsCompleted = new AtomicLong(0); - this.totalJobsCommitted = new AtomicLong(0); - this.totalJobsFailed = new AtomicLong(0); - this.totalJobsCancelled = new AtomicLong(0); - - this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_LAUNCHED, ()->this.totalJobsLaunched.get())); - this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMPLETED, ()->this.totalJobsCompleted.get())); - 
this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_COMMITTED, ()->this.totalJobsCommitted.get())); - this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_FAILED, ()->this.totalJobsFailed.get())); - this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_CANCELLED, ()->this.totalJobsCancelled.get())); - this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.NUM_JOBS_RUNNING, - ()->(int)(Metrics.this.totalJobsLaunched.get() - Metrics.this.totalJobsCompleted.get()))); - - this.timeForCompletedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMPLETED_JOBS, windowSize, TimeUnit.MINUTES); - this.timeForFailedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_FAILED_JOBS, windowSize, TimeUnit.MINUTES); - this.timeForCommittedJobs = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_FOR_COMMITTED_JOBS, windowSize, TimeUnit.MINUTES); - this.timeBeforeJobScheduling = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_SCHEDULING, windowSize, TimeUnit.MINUTES); - this.timeBeforeJobLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BEFORE_JOB_LAUNCHING, windowSize, TimeUnit.MINUTES); - this.timeBetwenJobSchedulingAndLaunching = metricContext.contextAwareTimer(JobExecutionLauncher.StandardMetrics.TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING, windowSize, TimeUnit.MINUTES); - - // executor metrics - this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount())); - this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, 
()->this.threadPoolExecutor.getMaximumPoolSize())); - this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize())); - this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize())); - this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size())); - - this.contextAwareMetrics.add(timeForCommittedJobs); - this.contextAwareMetrics.add(timeForCompletedJobs); - this.contextAwareMetrics.add(timeForFailedJobs); - this.contextAwareMetrics.add(timeBeforeJobScheduling); - this.contextAwareMetrics.add(timeBeforeJobLaunching); - this.contextAwareMetrics.add(timeBetwenJobSchedulingAndLaunching); - } - - private void updateTimeBeforeJobScheduling (Properties jobConfig) { - long jobCreationTime = Long.parseLong(jobConfig.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0")); - Instrumented.updateTimer(Optional.of(timeBeforeJobScheduling), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS); - } - - private void updateTimeBeforeJobLaunching (Properties jobConfig) { - long jobCreationTime = Long.parseLong(jobConfig.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0")); - Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching), System.currentTimeMillis() - jobCreationTime, TimeUnit.MILLISECONDS); - } - - private void updateTimeBetweenJobSchedulingAndJobLaunching (long scheduledTime, long launchingTime) { - Instrumented.updateTimer(Optional.of(timeBetwenJobSchedulingAndLaunching), launchingTime - scheduledTime, TimeUnit.MILLISECONDS); - } - - @Override - public String getName() { - return GobblinHelixJobScheduler.class.getName(); - } - } - - private class MetricsTrackingListener extends AbstractJobListener { - private final Metrics 
metrics; - private static final String START_TIME = "startTime"; - MetricsTrackingListener(Metrics metrics) { - this.metrics = metrics; - } - - @Override - public void onJobPrepare(JobContext jobContext) - throws Exception { - super.onJobPrepare(jobContext); - jobContext.getJobState().setProp(START_TIME, Long.toString(System.nanoTime())); - if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) { - metrics.totalJobsLaunched.incrementAndGet(); - } - } - - @Override - public void onJobCompletion(JobContext jobContext) - throws Exception { - super.onJobCompletion(jobContext); - long startTime = jobContext.getJobState().getPropAsLong(START_TIME); - if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) { - metrics.totalJobsCompleted.incrementAndGet(); - Instrumented.updateTimer(Optional.of(metrics.timeForCompletedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - if (jobContext.getJobState().getState() == JobState.RunningState.FAILED) { - metrics.totalJobsFailed.incrementAndGet(); - Instrumented.updateTimer(Optional.of(metrics.timeForFailedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - } else { - metrics.totalJobsCommitted.incrementAndGet(); - Instrumented.updateTimer(Optional.of(metrics.timeForCommittedJobs), System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - } - } - } - - @Override - public void onJobCancellation(JobContext jobContext) - throws Exception { - super.onJobCancellation(jobContext); - if (GobblinHelixJobScheduler.this.isInstrumentationEnabled()) { - metrics.totalJobsCancelled.incrementAndGet(); - } - } - + public Collection<StandardMetrics> getStandardMetricsCollection() { + return ImmutableList.of(this.launcherMetrics, this.jobSchedulerMetrics); } @Override @@ -262,7 +155,12 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe LOGGER.info("{} service is not fully up, waiting here...", this.getClass().getName()); Thread.sleep(1000); } - scheduleJob(jobProps, jobListener, 
Maps.newHashMap(), GobblinHelixJob.class); + + scheduleJob(jobProps, + jobListener, + Maps.newHashMap(), + GobblinHelixJob.class); + } catch (Exception e) { throw new JobException("Failed to schedule job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); } @@ -274,7 +172,12 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Override public void runJob(Properties jobProps, JobListener jobListener) throws JobException { - new HelixRetriggeringJobCallable(this, this.properties, jobProps, jobListener, this.appWorkDir, this.helixManager).call(); + new HelixRetriggeringJobCallable(this, + this.properties, + jobProps, + jobListener, + this.appWorkDir, + this.helixManager).call(); } @Override @@ -341,17 +244,19 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe public void handleNewJobConfigArrival(NewJobConfigArrivalEvent newJobArrival) { LOGGER.info("Received new job configuration of job " + newJobArrival.getJobName()); try { - Properties jobConfig = new Properties(); - jobConfig.putAll(newJobArrival.getJobConfig()); + Properties jobProps = new Properties(); + jobProps.putAll(newJobArrival.getJobConfig()); - metrics.updateTimeBeforeJobScheduling(jobConfig); + this.jobSchedulerMetrics.updateTimeBeforeJobScheduling(jobProps); - if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { + if (jobProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) { LOGGER.info("Scheduling job " + newJobArrival.getJobName()); - scheduleJob(jobConfig, new MetricsTrackingListener(metrics)); + scheduleJob(jobProps, + new GobblinHelixJobLauncherListener(this.launcherMetrics)); } else { LOGGER.info("No job schedule found, so running job " + newJobArrival.getJobName()); - this.jobExecutor.execute(new NonScheduledJobRunner(newJobArrival.getJobName(), jobConfig, new MetricsTrackingListener(metrics))); + this.jobExecutor.execute(new NonScheduledJobRunner(newJobArrival.getJobName(), jobProps, + new 
GobblinHelixJobLauncherListener(this.launcherMetrics))); } } catch (JobException je) { LOGGER.error("Failed to schedule or run job " + newJobArrival.getJobName(), je); @@ -391,13 +296,16 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe class NonScheduledJobRunner implements Runnable { private final String jobUri; - private final Properties jobConfig; - private final JobListener jobListener; + private final Properties jobProps; + private final GobblinHelixJobLauncherListener jobListener; private final Long creationTimeInMillis; - public NonScheduledJobRunner(String jobUri, Properties jobConfig, JobListener jobListener) { + public NonScheduledJobRunner(String jobUri, + Properties jobProps, + GobblinHelixJobLauncherListener jobListener) { + this.jobUri = jobUri; - this.jobConfig = jobConfig; + this.jobProps = jobProps; this.jobListener = jobListener; this.creationTimeInMillis = System.currentTimeMillis(); } @@ -414,13 +322,15 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Override public void run() { - boolean alwaysDelete = PropertiesUtils - .getPropAsBoolean(this.jobConfig, GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE, "false"); + boolean alwaysDelete = PropertiesUtils.getPropAsBoolean(this.jobProps, + GobblinClusterConfigurationKeys.JOB_ALWAYS_DELETE, + "false"); boolean isDeleted = false; + try { - ((MetricsTrackingListener)jobListener).metrics.updateTimeBeforeJobLaunching(this.jobConfig); - ((MetricsTrackingListener)jobListener).metrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis()); - GobblinHelixJobScheduler.this.runJob(this.jobConfig, this.jobListener); + GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBeforeJobLaunching(this.jobProps); + GobblinHelixJobScheduler.this.jobSchedulerMetrics.updateTimeBetweenJobSchedulingAndJobLaunching(this.creationTimeInMillis, System.currentTimeMillis()); + 
GobblinHelixJobScheduler.this.runJob(this.jobProps, this.jobListener); // remove non-scheduled job catalog once done so it won't be re-executed if (GobblinHelixJobScheduler.this.jobCatalog != null) { @@ -433,7 +343,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe } } catch (JobException je) { deleteJobSpec(alwaysDelete, isDeleted); - LOGGER.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je); + LOGGER.error("Failed to run job " + this.jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), je); } catch (Exception e) { deleteJobSpec(alwaysDelete, isDeleted); throw e; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerMetrics.java new file mode 100644 index 0000000..1045647 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobSchedulerMetrics.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.base.Optional; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareTimer; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.runtime.api.JobExecutionLauncher; + + +class GobblinHelixJobSchedulerMetrics extends StandardMetricsBridge.StandardMetrics { + public static final String SCHEDULE_CANCELLATION_START = "scheduleCancellationStart"; + public static final String SCHEDULE_CANCELLATION_END = "scheduleCancellationStart"; + public static final String TIMER_BEFORE_JOB_SCHEDULING = "timerBeforeJobScheduling"; + public static final String TIMER_BEFORE_JOB_LAUNCHING = "timerBeforeJobLaunching"; + public static final String TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING = "timerBetwenJobSchedulingAndLaunching"; + + final AtomicLong numCancellationStart; + final AtomicLong numCancellationComplete; + final ContextAwareTimer timeBeforeJobScheduling; + final ContextAwareTimer timeBeforeJobLaunching; + final ContextAwareTimer timeBetwenJobSchedulingAndLaunching; + + final ThreadPoolExecutor threadPoolExecutor; + + public GobblinHelixJobSchedulerMetrics (final ExecutorService jobExecutor, final MetricContext metricContext, int windowSizeInMin) { + this.timeBeforeJobScheduling = metricContext.contextAwareTimer(TIMER_BEFORE_JOB_SCHEDULING, + windowSizeInMin, TimeUnit.MINUTES); + this.timeBeforeJobLaunching = metricContext.contextAwareTimer(TIMER_BEFORE_JOB_LAUNCHING, + windowSizeInMin, 
TimeUnit.MINUTES); + this.timeBetwenJobSchedulingAndLaunching = metricContext.contextAwareTimer(TIMER_BETWEEN_JOB_SCHEDULING_AND_LAUNCHING, + windowSizeInMin, TimeUnit.MINUTES); + this.numCancellationStart = new AtomicLong(0); + this.numCancellationComplete = new AtomicLong(0); + + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(SCHEDULE_CANCELLATION_START, ()->this.numCancellationStart.get())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(SCHEDULE_CANCELLATION_END, ()->this.numCancellationComplete.get())); + this.contextAwareMetrics.add(timeBeforeJobScheduling); + this.contextAwareMetrics.add(timeBeforeJobLaunching); + this.contextAwareMetrics.add(timeBetwenJobSchedulingAndLaunching); + + this.threadPoolExecutor = (ThreadPoolExecutor) jobExecutor; + + // executor metrics + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_ACTIVE_COUNT, ()->this.threadPoolExecutor.getActiveCount())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_MAX_POOL_SIZE, ()->this.threadPoolExecutor.getMaximumPoolSize())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_POOL_SIZE, ()->this.threadPoolExecutor.getPoolSize())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_CORE_POOL_SIZE, ()->this.threadPoolExecutor.getCorePoolSize())); + this.contextAwareMetrics.add(metricContext.newContextAwareGauge(JobExecutionLauncher.StandardMetrics.EXECUTOR_QUEUE_SIZE, ()->this.threadPoolExecutor.getQueue().size())); + } + + void updateTimeBeforeJobScheduling (Properties jobProps) { + long jobCreationTime = Long.parseLong(jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0")); + Instrumented.updateTimer(Optional.of(timeBeforeJobScheduling), + System.currentTimeMillis() - jobCreationTime, + 
TimeUnit.MILLISECONDS); + } + + void updateTimeBeforeJobLaunching (Properties jobProps) { + long jobCreationTime = Long.parseLong(jobProps.getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "0")); + Instrumented.updateTimer(Optional.of(timeBeforeJobLaunching), + System.currentTimeMillis() - jobCreationTime, + TimeUnit.MILLISECONDS); + } + + void updateTimeBetweenJobSchedulingAndJobLaunching (long scheduledTime, long launchingTime) { + Instrumented.updateTimer(Optional.of(timeBetwenJobSchedulingAndLaunching), + launchingTime - scheduledTime, + TimeUnit.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java index 4fb198c..2f5bcdf 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.fs.Path; @@ -32,6 +33,7 @@ import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskResult; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.io.Closer; import com.typesafe.config.Config; @@ -40,6 +42,10 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.instrumented.Instrumented; 
+import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareTimer; +import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.TaskState; @@ -51,7 +57,7 @@ import org.apache.gobblin.util.ConfigUtils; * An implementation of Helix's {@link org.apache.helix.task.Task} that runs original {@link GobblinHelixJobLauncher}. */ @Slf4j -public class GobblinHelixJobTask implements Task { +class GobblinHelixJobTask implements Task { private final TaskConfig taskConfig; private final Config sysConfig; @@ -61,15 +67,22 @@ public class GobblinHelixJobTask implements Task { private final HelixManager helixManager; private final Path appWorkDir; private final List<? extends Tag<?>> metadataTags; - private GobblinHelixJobLauncher launcher; - public GobblinHelixJobTask(TaskCallbackContext context, - StateStores stateStores, - TaskRunnerSuiteBase.Builder builder) { + private GobblinHelixJobTaskMetrics jobTaskMetrics; + private GobblinHelixJobLauncherListener jobLauncherListener; + + public GobblinHelixJobTask (TaskCallbackContext context, + StateStores stateStores, + TaskRunnerSuiteBase.Builder builder, + GobblinHelixJobLauncherMetrics launcherMetrics, + GobblinHelixJobTaskMetrics jobTaskMetrics) { + this.jobTaskMetrics = jobTaskMetrics; this.taskConfig = context.getTaskConfig(); this.sysConfig = builder.getConfig(); this.helixManager = builder.getHelixManager(); this.jobPlusSysConfig = ConfigUtils.configToProperties(sysConfig); + this.jobLauncherListener = new GobblinHelixJobLauncherListener(launcherMetrics); + Map<String, String> configMap = this.taskConfig.getConfigMap(); for (Map.Entry<String, String> entry: configMap.entrySet()) { if (entry.getKey().startsWith(GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX)) { @@ -79,7 +92,7 @@ public class GobblinHelixJobTask implements Task { } if 
(!jobPlusSysConfig.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) { - throw new RuntimeException("Job doesn't have plannning ID"); + throw new RuntimeException("Job doesn't have planning ID"); } this.planningJobId = jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); @@ -91,6 +104,25 @@ public class GobblinHelixJobTask implements Task { .build()); } + static class GobblinHelixJobTaskMetrics extends StandardMetricsBridge.StandardMetrics { + static final String TIME_BETWEEN_JOB_SUBMISSION_AND_EXECUTION = "timeBetweenJobSubmissionAndExecution"; + final ContextAwareTimer timeBetweenJobSubmissionAndExecution; + + public GobblinHelixJobTaskMetrics(MetricContext metricContext, int windowSizeInMin) { + timeBetweenJobSubmissionAndExecution = metricContext.contextAwareTimer(TIME_BETWEEN_JOB_SUBMISSION_AND_EXECUTION, + windowSizeInMin, TimeUnit.MINUTES); + this.contextAwareMetrics.add(timeBetweenJobSubmissionAndExecution); + } + + public void updateTimeBetweenJobSubmissionAndExecution(Properties jobProps) { + long jobSubmitTime = Long.parseLong(jobProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME, "0")); + if (jobSubmitTime != 0) { + Instrumented.updateTimer(Optional.of(this.timeBetweenJobSubmissionAndExecution), + System.currentTimeMillis() - jobSubmitTime, + TimeUnit.MILLISECONDS); + } + } + } private GobblinHelixJobLauncher createJobLauncher() throws Exception { @@ -101,14 +133,16 @@ public class GobblinHelixJobTask implements Task { new ConcurrentHashMap<>()); } + /** + * Launch the actual {@link GobblinHelixJobLauncher}. 
+ */ @Override public TaskResult run() { log.info("Running planning job {}", this.planningJobId); - // Launch the job + this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig); try (Closer closer = Closer.create()) { this.launcher = createJobLauncher(); - //TODO: we will provide additional listener - closer.register(launcher).launchJob(null); + closer.register(launcher).launchJob(this.jobLauncherListener); setResultToUserContent(ImmutableMap.of(Partitioner.IS_EARLY_STOPPED, "false")); } catch (Exception e) { return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils @@ -135,7 +169,7 @@ public class GobblinHelixJobTask implements Task { log.info("Cancelling planning job {}", this.planningJobId); if (launcher != null) { try { - launcher.cancelJob(launcher.getJobListener()); + launcher.cancelJob(this.jobLauncherListener); } catch (JobException e) { throw new RuntimeException("Unable to cancel planning job " + this.planningJobId + ": ", e); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java index f659978..c59625e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java @@ -18,6 +18,7 @@ package org.apache.gobblin.cluster; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -47,6 +48,7 @@ import org.apache.helix.task.WorkflowConfig; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; 
import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.MoreExecutors; @@ -488,7 +490,7 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { } @Override - public StandardMetrics getStandardMetrics() { - return this.metrics; + public Collection<StandardMetrics> getStandardMetricsCollection() { + return ImmutableList.of(this.metrics); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index 9d0e6cc..3f63e2d 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.URI; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -148,11 +149,15 @@ public class GobblinTaskRunner implements StandardMetricsBridge { private final Path appWorkPath; private final MetricContext metricContext; - private final StandardMetricsBridge.StandardMetrics metrics; + private final Collection<StandardMetricsBridge.StandardMetrics> metricsCollection; + + public GobblinTaskRunner(String applicationName, + String helixInstanceName, + String applicationId, + String taskRunnerId, + Config config, + Optional<Path> appWorkDirOptional) throws Exception { - public GobblinTaskRunner(String applicationName, String helixInstanceName, String applicationId, - String taskRunnerId, Config config, 
Optional<Path> appWorkDirOptional) - throws Exception { this.helixInstanceName = helixInstanceName; this.taskRunnerId = taskRunnerId; this.applicationName = applicationName; @@ -168,9 +173,13 @@ public class GobblinTaskRunner implements StandardMetricsBridge { this.containerMetrics = buildContainerMetrics(); - String builderStr = ConfigUtils.getString(this.config, GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER, TaskRunnerSuiteBase.Builder.class.getName()); + String builderStr = ConfigUtils.getString(this.config, + GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER, + TaskRunnerSuiteBase.Builder.class.getName()); + TaskRunnerSuiteBase.Builder builder = GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor( - new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class).resolveClass(builderStr), this.config); + new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class) + .resolveClass(builderStr), this.config); TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath) .setContainerMetrics(this.containerMetrics) @@ -181,7 +190,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge { .build(); this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap()); - this.metrics = suite.getTaskMetrics(); + this.metricsCollection = suite.getMetricsCollection(); this.metricContext = suite.getMetricContext(); this.services.addAll(suite.getServices()); @@ -193,7 +202,12 @@ public class GobblinTaskRunner implements StandardMetricsBridge { } logger.debug("GobblinTaskRunner: applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}", - applicationName, helixInstanceName, applicationId, taskRunnerId, config, appWorkDirOptional); + applicationName, + helixInstanceName, + applicationId, + taskRunnerId, + config, + appWorkDirOptional); } private Path initAppWorkDir(Config config, Optional<Path> appWorkDirOptional) { @@ -231,8 +245,7 @@ public class 
GobblinTaskRunner implements StandardMetricsBridge { * Start this {@link GobblinTaskRunner} instance. */ public void start() { - logger.info( - String.format("Starting %s in container %s", this.helixInstanceName, this.taskRunnerId)); + logger.info(String.format("Starting %s in container %s", this.helixInstanceName, this.taskRunnerId)); // Add a shutdown hook so the task scheduler gets properly shutdown addShutdownHook(); @@ -329,6 +342,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge { private void addInstanceTags() { if (this.helixManager.isConnected()) { List<String> tags = ConfigUtils.getStringList(this.config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY); + logger.info("Adding tags binding " + tags); tags.forEach(tag -> helixManager.getClusterManagmentTool().addInstanceTag(this.clusterName, this.helixInstanceName, tag)); } } @@ -379,8 +393,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge { } @Override - public StandardMetrics getStandardMetrics() { - return this.metrics; + public Collection<StandardMetrics> getStandardMetricsCollection() { + return this.metricsCollection; } @Nonnull http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java index 6435ff4..9fc8bc0 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunnerMetrics.java @@ -25,7 +25,10 @@ import org.apache.gobblin.runtime.TaskExecutor; public class GobblinTaskRunnerMetrics { - static class InProcessTaskRunnerMetrics extends StandardMetricsBridge.StandardMetrics { + /** + * 
This metrics shows the task execution that correlates to a work unit. + */ + static class TaskExecutionMetrics extends StandardMetricsBridge.StandardMetrics { private TaskExecutor taskExecutor; private static String CURRENT_QUEUED_TASK_COUNT = "currentQueuedTaskCount"; private static String HISTORICAL_QUEUED_TASK_COUNT = "historicalQueuedTaskCount"; @@ -37,7 +40,7 @@ public class GobblinTaskRunnerMetrics { private static String SUCCESSFUL_TASK_COUNT = "successfulTaskCount"; private static String RUNNING_TASK_COUNT = "runningTaskCount"; - public InProcessTaskRunnerMetrics (TaskExecutor executor, MetricContext context) { + public TaskExecutionMetrics (TaskExecutor executor, MetricContext context) { taskExecutor = executor; contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_COUNT, ()->this.taskExecutor.getCurrentQueuedTaskCount().longValue())); contextAwareMetrics.add(context.newContextAwareGauge(CURRENT_QUEUED_TASK_TOTAL_TIME, ()->this.taskExecutor.getCurrentQueuedTaskTotalTime().longValue())); @@ -48,21 +51,20 @@ public class GobblinTaskRunnerMetrics { contextAwareMetrics.add(context.newContextAwareGauge(FAILED_TASK_COUNT, ()->this.taskExecutor.getFailedTaskCount().getCount())); contextAwareMetrics.add(context.newContextAwareGauge(SUCCESSFUL_TASK_COUNT, ()->this.taskExecutor.getSuccessfulTaskCount().getCount())); contextAwareMetrics.add(context.newContextAwareGauge(RUNNING_TASK_COUNT, ()->this.taskExecutor.getRunningTaskCount().getCount())); - this.rawMetrics.put(ConfigurationKeys.WORK_UNIT_CREATION_AND_RUN_INTERVAL, this.taskExecutor.getTaskCreateAndRunTimer()); } @Override public String getName() { - return InProcessTaskRunnerMetrics.class.getName(); + return TaskExecutionMetrics.class.getName(); } } - static class JvmTaskRunnerMetrics extends StandardMetricsBridge.StandardMetrics { + static class JvmTaskMetrics extends StandardMetricsBridge.StandardMetrics { //TODO: add metrics to monitor the process execution status (will be revisited after 
process isolation work is done) @Override public String getName() { - return JvmTaskRunnerMetrics.class.getName(); + return JvmTaskMetrics.class.getName(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java index 7b9fd3c..6b400d3 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java @@ -30,11 +30,12 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.JobException; -import org.apache.gobblin.runtime.JobLauncher; +import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.api.ExecutionResult; import org.apache.gobblin.runtime.api.JobExecutionMonitor; import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.JobLauncherUtils; import org.apache.gobblin.util.PropertiesUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @@ -44,39 +45,48 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; * 1) Re-triggering is enabled and * 2) Job stops early. * - * Moreover based on the job properties, a job can be processed immediately (non-distributed) or forwarded to a remote - * node (distributed) for handling. Details are illustrated as follows: + * Based on the job properties, a job can be processed immediately (non-distribution mode) or forwarded to a remote + * node (distribution mode). 
Details are as follows: * - * <p> - * If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is false, the job will be handled - * by {@link HelixRetriggeringJobCallable#launchJobLauncherLoop()}, which simply submits the job to Helix for execution. + * <p> Non-Distribution Mode: + * If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is false, the job will be handled + * by {@link HelixRetriggeringJobCallable#launchJobLauncherLoop()}, which simply launches {@link GobblinHelixJobLauncher} + * and submit the work units to Helix. Helix will dispatch the work units to different worker nodes. The worker node will + * handle the work units by {@link GobblinHelixTask}. * - * See {@link GobblinHelixJobLauncher} for job launcher details. + * See {@link GobblinHelixJobLauncher} for job launcher details. + * See {@link GobblinHelixTask} for work unit handling details. * </p> * - * <p> + * <p> Distribution Mode: * If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is true, the job will be handled - * by {@link HelixRetriggeringJobCallable#launchJobExecutionLauncherLoop()}}. It will first create a planning job with - * {@link GobblinTaskRunner#GOBBLIN_JOB_FACTORY_NAME} pre-configured, so that Helix can forward this planning job to - * any nodes that has implemented the Helix task factory model matching the same name. See {@link TaskRunnerSuiteThreadModel} - * implementation of how task factory model is setup. + * by {@link HelixRetriggeringJobCallable#launchJobExecutionLauncherLoop()}}, which simply launches + * {@link GobblinHelixDistributeJobExecutionLauncher} and submit a planning job to Helix. Helix will dispatch this + * planning job to a worker node. The worker node will handle this planning job by {@link GobblinHelixJobTask}. * - * Once the planning job reaches to the remote end, it will be handled by {@link GobblinHelixJobTask} which is - * created by {@link GobblinHelixJobTask}. 
The actual handling is similar to the non-distributed mode, where - * {@link GobblinHelixJobLauncher} is invoked. + * The {@link GobblinHelixJobTask} will launch {@link GobblinHelixJobLauncher} and it will again submit the actual + * work units to Helix. Helix will dispatch the work units to other worker nodes. Similar to Non-Distribution Node, + * some worker nodes will handle those work units by {@link GobblinHelixTask}. + * + * See {@link GobblinHelixDistributeJobExecutionLauncher} for planning job launcher details. + * See {@link GobblinHelixJobTask} for planning job handling details. + * See {@link GobblinHelixJobLauncher} for job launcher details. + * See {@link GobblinHelixTask} for work unit handling details. * </p> */ @Slf4j @Alpha class HelixRetriggeringJobCallable implements Callable { - private GobblinHelixJobScheduler jobScheduler; - private Properties sysProps; - private Properties jobProps; - private JobListener jobListener; - private JobLauncher currentJobLauncher = null; + private final GobblinHelixJobScheduler jobScheduler; + private final Properties sysProps; + private final Properties jobProps; + private final JobListener jobListener; + private final Path appWorkDir; + private final HelixManager helixManager; + + private GobblinHelixJobLauncher currentJobLauncher = null; private JobExecutionMonitor currentJobMonitor = null; - private Path appWorkDir; - private HelixManager helixManager; + private boolean isDistributeJobEnabled = false; public HelixRetriggeringJobCallable( GobblinHelixJobScheduler jobScheduler, @@ -91,6 +101,7 @@ class HelixRetriggeringJobCallable implements Callable { this.jobListener = jobListener; this.appWorkDir = appWorkDir; this.helixManager = helixManager; + this.isDistributeJobEnabled = isDistributeJobEnabled(); } private boolean isRetriggeringEnabled() { @@ -109,7 +120,7 @@ class HelixRetriggeringJobCallable implements Callable { @Override public Void call() throws JobException { - if (isDistributeJobEnabled()) { + if 
(this.isDistributeJobEnabled) { launchJobExecutionLauncherLoop(); } else { launchJobLauncherLoop(); @@ -140,12 +151,25 @@ class HelixRetriggeringJobCallable implements Callable { private void launchJobExecutionLauncherLoop() throws JobException { try { while (true) { - String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName()); - GobblinHelixDistributeJobExecutionLauncher.Builder builder = GobblinConstructorUtils.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor( - new ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr)); + String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, + GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName()); + + GobblinHelixDistributeJobExecutionLauncher.Builder builder = GobblinConstructorUtils + .<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(new ClassAliasResolver( + GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr)); + + // Make a separate copy because we could update some of attributes in job properties (like adding planning id). 
+ Properties jobPlanningProps = new Properties(); + jobPlanningProps.putAll(this.jobProps); - builder.setSysProperties(this.sysProps); - builder.setJobProperties(this.jobProps); + // Inject planning id and start time + String planningId = JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX + + JobState.getJobNameFromProps(jobPlanningProps)); + jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, planningId); + jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME, String.valueOf(System.currentTimeMillis())); + + builder.setSysProps(this.sysProps); + builder.setJobPlanningProps(jobPlanningProps); builder.setManager(this.helixManager); builder.setAppWorkDir(this.appWorkDir); @@ -172,11 +196,19 @@ class HelixRetriggeringJobCallable implements Callable { } } - public void cancel() throws JobException { - if (currentJobLauncher != null) { - currentJobLauncher.cancelJob(this.jobListener); - } else if (currentJobMonitor != null) { - currentJobMonitor.cancel(false); + void cancel() throws JobException { + this.jobScheduler.jobSchedulerMetrics.numCancellationStart.incrementAndGet(); + + if (isDistributeJobEnabled) { + if (currentJobMonitor != null) { + currentJobMonitor.cancel(false); + } + } else { + if (currentJobLauncher != null) { + currentJobLauncher.cancelJob(this.jobListener); + } } + + this.jobScheduler.jobSchedulerMetrics.numCancellationComplete.incrementAndGet(); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java index e65b968..182cd07 100644 --- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java @@ -17,6 +17,7 @@ package org.apache.gobblin.cluster; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -51,11 +52,10 @@ import org.apache.gobblin.util.ConfigUtils; @Alpha public abstract class TaskRunnerSuiteBase { protected TaskFactory taskFactory; - protected TaskFactory jobFactory; + protected GobblinHelixJobFactory jobFactory; protected MetricContext metricContext; protected String applicationId; protected String applicationName; - protected StandardMetricsBridge.StandardMetrics taskMetrics; protected List<Service> services = Lists.newArrayList(); protected TaskRunnerSuiteBase(Builder builder) { @@ -68,7 +68,7 @@ public abstract class TaskRunnerSuiteBase { return this.metricContext; } - protected abstract StandardMetricsBridge.StandardMetrics getTaskMetrics(); + protected abstract Collection<StandardMetricsBridge.StandardMetrics> getMetricsCollection(); protected abstract Map<String, TaskFactory> getTaskFactoryMap(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1155cdc5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java index bf21a4a..72ed17b 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java @@ -17,12 +17,14 @@ package org.apache.gobblin.cluster; +import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.helix.task.TaskCallbackContext; import 
org.apache.helix.task.TaskFactory; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Service; @@ -44,12 +46,11 @@ class TaskRunnerSuiteProcessModel extends TaskRunnerSuiteBase { taskFactory = new HelixTaskFactory(builder.getContainerMetrics(), GobblinTaskRunner.CLUSTER_CONF_PATH, builder.getConfig()); - taskMetrics = new GobblinTaskRunnerMetrics.JvmTaskRunnerMetrics(); } @Override - protected StandardMetricsBridge.StandardMetrics getTaskMetrics() { - return this.taskMetrics; + protected Collection<StandardMetricsBridge.StandardMetrics> getMetricsCollection() { + return ImmutableList.of(new GobblinTaskRunnerMetrics.JvmTaskMetrics()); } @Override
