Repository: incubator-gobblin Updated Branches: refs/heads/master ccaa02c6e -> 11a1c46ab
[GOBBLIN-647] Move early stop logic to task driver instance. Closes #2517 from kyuamazon/td Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/11a1c46a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/11a1c46a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/11a1c46a Branch: refs/heads/master Commit: 11a1c46ab9907c403f384b51a81f56c2d281ccaf Parents: ccaa02c Author: Kuai Yu <[email protected]> Authored: Fri Dec 7 18:35:54 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Fri Dec 7 18:35:54 2018 -0800 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 6 +- ...blinHelixDistributeJobExecutionLauncher.java | 58 ++----- .../gobblin/cluster/GobblinHelixJobFactory.java | 26 ++-- .../cluster/GobblinHelixJobLauncher.java | 6 +- .../gobblin/cluster/GobblinHelixJobTask.java | 80 ++++++---- .../gobblin/cluster/HelixJobsMapping.java | 150 +++++++++++++++++++ .../cluster/HelixRetriggeringJobCallable.java | 130 +++++++++------- .../org/apache/gobblin/cluster/HelixUtils.java | 25 ++++ .../TaskRunnerSuiteForJobFactoryTest.java | 41 ++--- 9 files changed, 356 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 00a2e91..873793f 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -126,5 +126,9 @@ public class GobblinClusterConfigurationKeys { public static final String HELIX_JOB_TIMEOUT_SECONDS = "helix.job.timeout.seconds"; public static final String DEFAULT_HELIX_JOB_TIMEOUT_SECONDS = "10800"; public static final String HELIX_TASK_TIMEOUT_SECONDS = "helix.task.timeout.seconds"; - public static final String HELIX_MAX_TASK_RETRIES_KEY = "helix.task.maxretries"; + public static final String HELIX_TASK_MAX_ATTEMPTS_KEY = "helix.task.maxAttempts"; + + public static final String HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "workflowDeleteTimeoutSeconds"; + public static final long DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS = 300; + } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index e01ab28..609639e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -19,7 +19,6 @@ package org.apache.gobblin.cluster; import java.io.Closeable; import java.io.IOException; -import java.net.URI; import java.util.List; import java.util.Map; import java.util.Optional; @@ -36,11 +35,9 @@ import org.apache.helix.task.JobConfig; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskDriver; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import 
com.typesafe.config.Config; -import com.typesafe.config.ConfigValueFactory; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -50,19 +47,16 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; -import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.api.ExecutionResult; import org.apache.gobblin.runtime.api.JobExecutionLauncher; import org.apache.gobblin.runtime.api.JobExecutionMonitor; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.MonitoredObject; -import org.apache.gobblin.runtime.util.StateStores; -import org.apache.gobblin.source.extractor.partition.Partitioner; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.PropertiesUtils; @@ -92,11 +86,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher protected TaskDriver helixTaskDriver; protected Properties sysProps; protected Properties jobPlanningProps; - protected StateStores stateStores; - - protected static final String PLANNING_WORK_UNIT_DIR_NAME = "_plan_workunits"; - protected static final String PLANNING_TASK_STATE_DIR_NAME = "_plan_taskstates"; - protected static final String PLANNING_JOB_STATE_DIR_NAME = "_plan_jobstates"; + protected HelixJobsMapping jobsMapping; protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps."; @@ -123,15 +113,9 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher Config combined = ConfigUtils.propertiesToConfig(jobPlanningProps) .withFallback(ConfigUtils.propertiesToConfig(sysProps)); - Config stateStoreJobConfig = combined - .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef( - new 
URI(builder.appWorkDir.toUri().getScheme(), null, builder.appWorkDir.toUri().getHost(), - builder.appWorkDir.toUri().getPort(), null, null, null).toString())); - - this.stateStores = new StateStores(stateStoreJobConfig, - builder.appWorkDir, PLANNING_TASK_STATE_DIR_NAME, - builder.appWorkDir, PLANNING_WORK_UNIT_DIR_NAME, - builder.appWorkDir, PLANNING_JOB_STATE_DIR_NAME); + this.jobsMapping = new HelixJobsMapping(combined, + PathUtils.getRootPath(builder.appWorkDir).toUri(), + builder.appWorkDir.toString()); this.workFlowExpiryTimeSeconds = ConfigUtils.getLong(combined, GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, @@ -144,17 +128,17 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher private void executeCancellation() { if (this.jobSubmitted) { - String planningName = getPlanningJobId(this.jobPlanningProps); + String planningJobId = getPlanningJobId(this.jobPlanningProps); try { if (this.cancellationRequested && !this.cancellationExecuted) { // TODO : fix this when HELIX-1180 is completed // work flow should never be deleted explicitly because it has a expiry time // If cancellation is requested, we should set the job state to CANCELLED/ABORT - this.helixTaskDriver.waitToStop(planningName, 10000L); - log.info("Stopped the workflow ", planningName); + this.helixTaskDriver.waitToStop(planningJobId, 10000L); + log.info("Stopped the workflow ", planningJobId); } } catch (InterruptedException e) { - throw new RuntimeException("Failed to stop workflow " + planningName + " in Helix", e); + throw new RuntimeException("Failed to stop workflow " + planningJobId + " in Helix", e); } } } @@ -262,7 +246,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher return waitForJobCompletion(planningId, planningId); } catch (Exception e) { log.error(planningId + " is not able to submit."); - return new DistributeJobResult(Optional.empty(), Optional.of(e)); + return new DistributeJobResult(false); } } 
} @@ -283,35 +267,19 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher } catch (TimeoutException te) { HelixUtils.handleJobTimeout(workFlowName, jobName, helixManager, this, null); - return new DistributeJobResult(Optional.empty(), Optional.of(te)); + return new DistributeJobResult(false); } } - //TODO: change below to Helix UserConentStore - @VisibleForTesting + //TODO: we will remove this logic after we change to polling model. protected DistributeJobResult getResultFromUserContent() { - String planningId = getPlanningJobId(this.jobPlanningProps); - try { - TaskState taskState = this.stateStores.getTaskStateStore().get(planningId, planningId, planningId); - return new DistributeJobResult(Optional.of(taskState.getProperties()), Optional.empty()); - } catch (IOException e) { - return new DistributeJobResult(Optional.empty(), Optional.of(e)); - } + return new DistributeJobResult(false); } @Getter @AllArgsConstructor static class DistributeJobResult implements ExecutionResult { boolean isEarlyStopped = false; - Optional<Properties> properties; - Optional<Throwable> throwable; - public DistributeJobResult(Optional<Properties> properties, Optional<Throwable> throwable) { - this.properties = properties; - this.throwable = throwable; - if (properties.isPresent()) { - isEarlyStopped = PropertiesUtils.getPropAsBoolean(this.properties.get(), Partitioner.IS_EARLY_STOPPED, "false"); - } - } } private class DistributeJobMonitor extends FutureTask<ExecutionResult> implements JobExecutionMonitor { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java index 5bee4e0..f631abf 100644 --- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java @@ -25,24 +25,23 @@ import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskFactory; import com.typesafe.config.Config; -import com.typesafe.config.ConfigValueFactory; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; /** - * An implementation of Helix's {@link TaskFactory} for {@link GobblinHelixJobTask}s. + * <p> A {@link TaskFactory} that creates {@link GobblinHelixJobTask} + * to run task driver logic. */ @Slf4j class GobblinHelixJobFactory implements TaskFactory { - protected StateStores stateStores; + protected HelixJobsMapping jobsMapping; protected TaskRunnerSuiteBase.Builder builder; @Getter @@ -50,25 +49,20 @@ class GobblinHelixJobFactory implements TaskFactory { @Getter protected GobblinHelixJobTask.GobblinHelixJobTaskMetrics jobTaskMetrics; - private void initializeStateStore(TaskRunnerSuiteBase.Builder builder) { + private void initJobMapping(TaskRunnerSuiteBase.Builder builder) { Config sysConfig = builder.getConfig(); Path appWorkDir = builder.getAppWorkPath(); URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri(); - Config stateStoreJobConfig = sysConfig - .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, - ConfigValueFactory.fromAnyRef(rootPathUri.toString())); - - this.stateStores = new StateStores(stateStoreJobConfig, - appWorkDir, GobblinHelixDistributeJobExecutionLauncher.PLANNING_TASK_STATE_DIR_NAME, - appWorkDir, GobblinHelixDistributeJobExecutionLauncher.PLANNING_WORK_UNIT_DIR_NAME, - appWorkDir, GobblinHelixDistributeJobExecutionLauncher.PLANNING_JOB_STATE_DIR_NAME); + 
this.jobsMapping = new HelixJobsMapping(sysConfig, + rootPathUri, + appWorkDir.toString()); } public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder, MetricContext metricContext) { this.builder = builder; - // TODO: We can remove below initialization once Helix allow us to persist job resut in userContentStore - initializeStateStore(this.builder); + initJobMapping(this.builder); + // initialize job related metrics (planning jobs) int metricsWindowSizeInMin = ConfigUtils.getInt(builder.getConfig(), ConfigurationKeys.METRIC_TIMER_WINDOW_SIZE_IN_MINUTES, @@ -84,7 +78,7 @@ class GobblinHelixJobFactory implements TaskFactory { @Override public Task createNewTask(TaskCallbackContext context) { return new GobblinHelixJobTask(context, - this.stateStores, + this.jobsMapping, this.builder, this.launcherMetrics, this.jobTaskMetrics); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 8d6d7b2..5798da0 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -184,6 +184,10 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { } } + public String getJobId() { + return this.jobContext.getJobId(); + } + @Override protected void runWorkUnits(List<WorkUnit> workUnits) throws Exception { try { @@ -270,7 +274,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { // Helix task attempts = retries + 1 (fallback to general task retry for backward compatibility) jobConfigBuilder.setMaxAttemptsPerTask(this.jobContext.getJobState().getPropAsInt( - 
GobblinClusterConfigurationKeys.HELIX_MAX_TASK_RETRIES_KEY, + GobblinClusterConfigurationKeys.HELIX_TASK_MAX_ATTEMPTS_KEY, this.jobContext.getJobState().getPropAsInt( ConfigurationKeys.MAX_TASK_RETRIES_KEY, ConfigurationKeys.DEFAULT_MAX_TASK_RETRIES)) + 1); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java index 2f5bcdf..0266413 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java @@ -17,23 +17,22 @@ package org.apache.gobblin.cluster; -import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.fs.Path; +import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskResult; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.io.Closer; import com.typesafe.config.Config; @@ -41,17 +40,15 @@ import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.instrumented.Instrumented; import 
org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.ContextAwareTimer; import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.JobException; -import org.apache.gobblin.runtime.TaskState; -import org.apache.gobblin.runtime.util.StateStores; -import org.apache.gobblin.source.extractor.partition.Partitioner; import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PropertiesUtils; + /** * An implementation of Helix's {@link org.apache.helix.task.Task} that runs original {@link GobblinHelixJobLauncher}. @@ -62,7 +59,7 @@ class GobblinHelixJobTask implements Task { private final TaskConfig taskConfig; private final Config sysConfig; private final Properties jobPlusSysConfig; - private final StateStores stateStores; + private final HelixJobsMapping jobsMapping; private final String planningJobId; private final HelixManager helixManager; private final Path appWorkDir; @@ -72,7 +69,7 @@ class GobblinHelixJobTask implements Task { private GobblinHelixJobLauncherListener jobLauncherListener; public GobblinHelixJobTask (TaskCallbackContext context, - StateStores stateStores, + HelixJobsMapping jobsMapping, TaskRunnerSuiteBase.Builder builder, GobblinHelixJobLauncherMetrics launcherMetrics, GobblinHelixJobTaskMetrics jobTaskMetrics) { @@ -96,7 +93,7 @@ class GobblinHelixJobTask implements Task { } this.planningJobId = jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); - this.stateStores = stateStores; + this.jobsMapping = jobsMapping; this.appWorkDir = builder.getAppWorkPath(); this.metadataTags = Tag.fromMap(new ImmutableMap.Builder<String, Object>() .put(GobblinClusterMetricTagNames.APPLICATION_NAME, builder.getApplicationName()) @@ -117,7 +114,7 @@ class GobblinHelixJobTask implements Task { public void updateTimeBetweenJobSubmissionAndExecution(Properties jobProps) { long jobSubmitTime = 
Long.parseLong(jobProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME, "0")); if (jobSubmitTime != 0) { - Instrumented.updateTimer(Optional.of(this.timeBetweenJobSubmissionAndExecution), + Instrumented.updateTimer(com.google.common.base.Optional.of(this.timeBetweenJobSubmissionAndExecution), System.currentTimeMillis() - jobSubmitTime, TimeUnit.MILLISECONDS); } @@ -140,10 +137,52 @@ class GobblinHelixJobTask implements Task { public TaskResult run() { log.info("Running planning job {}", this.planningJobId); this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig); + try (Closer closer = Closer.create()) { - this.launcher = createJobLauncher(); - closer.register(launcher).launchJob(this.jobLauncherListener); - setResultToUserContent(ImmutableMap.of(Partitioner.IS_EARLY_STOPPED, "false")); + String jobName = jobPlusSysConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY); + + Optional<String> planningIdFromStateStore = this.jobsMapping.getPlanningJobId(jobName); + + long timeOut = PropertiesUtils.getPropAsLong(jobPlusSysConfig, + GobblinClusterConfigurationKeys.HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_DELETE_TIMEOUT_SECONDS) * 1000; + + if (planningIdFromStateStore.isPresent() && !planningIdFromStateStore.get().equals(this.planningJobId)) { + return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + + ": because planning job in state store has different id (" + planningIdFromStateStore.get() + ")"); + } + + while (true) { + Optional<String> actualJobIdFromStateStore = this.jobsMapping.getActualJobId(jobName); + if (actualJobIdFromStateStore.isPresent()) { + String previousActualJobId = actualJobIdFromStateStore.get(); + if (HelixUtils.isJobFinished(previousActualJobId, previousActualJobId, this.helixManager)) { + log.info("Previous actual job {} [plan: {}] finished, will launch a new job.", previousActualJobId, 
this.planningJobId); + } else { + log.info("Previous actual job {} [plan: {}] not finished, kill it now.", previousActualJobId, this.planningJobId); + try { + HelixUtils.deleteWorkflow(previousActualJobId, this.helixManager, timeOut); + } catch (HelixException e) { + log.error("Helix cannot delete previous actual job id {} within 5 min.", previousActualJobId); + return new TaskResult(TaskResult.Status.FAILED, ExceptionUtils.getFullStackTrace(e)); + } + } + } else { + log.info("Actual job {} does not exist. First time run.", this.planningJobId); + } + + this.launcher = createJobLauncher(); + + this.jobsMapping.setActualJobId(jobName, this.planningJobId, this.launcher.getJobId()); + + closer.register(launcher).launchJob(this.jobLauncherListener); + + if (!this.launcher.isEarlyStopped()) { + break; + } else { + log.info("Planning job {} has more runs due to early stop.", this.planningJobId); + } + } } catch (Exception e) { return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils .getFullStackTrace(e)); @@ -151,19 +190,6 @@ class GobblinHelixJobTask implements Task { return new TaskResult(TaskResult.Status.COMPLETED, ""); } - //TODO: change below to Helix UserConentStore - @VisibleForTesting - protected void setResultToUserContent(Map<String, String> keyValues) throws IOException { - WorkUnitState wus = new WorkUnitState(); - wus.setProp(ConfigurationKeys.JOB_ID_KEY, this.planningJobId); - wus.setProp(ConfigurationKeys.TASK_ID_KEY, this.planningJobId); - wus.setProp(ConfigurationKeys.TASK_KEY_KEY, this.planningJobId); - keyValues.forEach((key, value)->wus.setProp(key, value)); - TaskState taskState = new TaskState(wus); - - this.stateStores.getTaskStateStore().put(this.planningJobId, this.planningJobId, taskState); - } - @Override public void cancel() { log.info("Cancelling planning job {}", this.planningJobId); 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java new file mode 100644 index 0000000..8c2a836 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixJobsMapping.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.io.IOException; +import java.net.URI; +import java.util.Optional; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import javax.annotation.Nullable; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metastore.FsStateStore; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.ConfigUtils; + + +/** + * <p> Any job that is submitted to Helix will have a unique id.
+ * We need some mapping between the job name and the job id, + * in order to perform: + * + * 1) cancel a running job + * 2) delete a running job + * 3) block any incoming job with the same name. + * + * <p> More complexity comes into the picture in the distributed + * task driver mode, where we will have a job name, which maps to a + * planning job id and further maps to a real job id. + * + * <p> We will leverage the state store functionality. The job name is used + * as the tableName in a single configurable store. The planning job id and + * real job id will be saved in the state object. + */ +public class HelixJobsMapping { + + public static final String JOBS_MAPPING_DB_TABLE_KEY = "jobs.mapping.db.table.key"; + public static final String DEFAULT_JOBS_MAPPING_DB_TABLE_KEY_NAME = "JobsMapping"; + + public static final String DISTRIBUTED_STATE_STORE_NAME_KEY = "jobs.mapping.distributed.state.store.name"; + public static final String DEFAULT_DISTRIBUTED_STATE_STORE_NAME = "distributedState"; + + private StateStore stateStore; + private String distributedStateStoreName; + + public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) { + String stateStoreType = ConfigUtils.getString(sysConfig, + ConfigurationKeys.INTERMEDIATE_STATE_STORE_TYPE_KEY, + ConfigUtils.getString(sysConfig, + ConfigurationKeys.STATE_STORE_TYPE_KEY, + ConfigurationKeys.DEFAULT_STATE_STORE_TYPE)); + + ClassAliasResolver<StateStore.Factory> resolver = + new ClassAliasResolver<>(StateStore.Factory.class); + StateStore.Factory stateStoreFactory; + + try { + stateStoreFactory = resolver.resolveClass(stateStoreType).newInstance(); + } catch (ClassNotFoundException cnfe) { + throw new RuntimeException(cnfe); + } catch (InstantiationException ie) { + throw new RuntimeException(ie); + } catch (IllegalAccessException iae) { + throw new RuntimeException(iae); + } + + String dbTableKey = ConfigUtils.getString(sysConfig, JOBS_MAPPING_DB_TABLE_KEY, DEFAULT_JOBS_MAPPING_DB_TABLE_KEY_NAME); +
this.distributedStateStoreName = ConfigUtils.getString(sysConfig, DISTRIBUTED_STATE_STORE_NAME_KEY, DEFAULT_DISTRIBUTED_STATE_STORE_NAME); + + Config stateStoreJobConfig = sysConfig + .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef(fsUri.toString())) + .withValue(ConfigurationKeys.STATE_STORE_ROOT_DIR_KEY, ConfigValueFactory.fromAnyRef(rootDir)) + .withValue(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, ConfigValueFactory.fromAnyRef(dbTableKey)); + + this.stateStore = stateStoreFactory.createStateStore(stateStoreJobConfig, State.class); + } + + @Nullable + private State getOrCreate (String storeName, String jobName) throws IOException { + if (this.stateStore.exists(storeName, jobName)) { + return this.stateStore.get(storeName, jobName, jobName); + } + return new State(); + } + + private void delete (String storeName, String jobName) throws IOException { + this.stateStore.delete(storeName, jobName); + } + + public void setPlanningJobId (String jobName, String planningJobId) throws IOException { + State state = getOrCreate(distributedStateStoreName, jobName); + state.setId(jobName); + state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, planningJobId); + // fs state store use hdfs rename, which assumes the target file doesn't exist. + if (stateStore instanceof FsStateStore) { + this.delete(distributedStateStoreName, jobName); + } + this.stateStore.put(distributedStateStoreName, jobName, state); + } + + public void setActualJobId (String jobName, String planningJobId, String actualJobId) throws IOException { + State state = getOrCreate(distributedStateStoreName, jobName); + state.setId(jobName); + state.setProp(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, planningJobId); + state.setProp(ConfigurationKeys.JOB_ID_KEY, actualJobId); + // fs state store use hdfs rename, which assumes the target file doesn't exist. 
+ if (stateStore instanceof FsStateStore) { + this.delete(distributedStateStoreName, jobName); + } + this.stateStore.put(distributedStateStoreName, jobName, state); + } + + private Optional<String> getId (String jobName, String idName) throws IOException { + State state = this.stateStore.get(distributedStateStoreName, jobName, jobName); + if (state == null) { + return Optional.empty(); + } + + String id = state.getProp(idName); + + return id == null? Optional.empty() : Optional.of(id); + } + + public Optional<String> getActualJobId (String jobName) throws IOException { + return getId(jobName, ConfigurationKeys.JOB_ID_KEY); + } + + public Optional<String> getPlanningJobId (String jobName) throws IOException { + return getId(jobName, GobblinClusterConfigurationKeys.PLANNING_ID_KEY); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java index 6b400d3..f9c70f7 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java @@ -17,6 +17,7 @@ package org.apache.gobblin.cluster; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.Callable; @@ -31,11 +32,12 @@ import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobState; -import org.apache.gobblin.runtime.api.ExecutionResult; import org.apache.gobblin.runtime.api.JobExecutionMonitor; import org.apache.gobblin.runtime.listeners.JobListener; import 
org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.JobLauncherUtils; +import org.apache.gobblin.util.PathUtils; import org.apache.gobblin.util.PropertiesUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; @@ -50,9 +52,9 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; * * <p> Non-Distribution Mode: * If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is false, the job will be handled - * by {@link HelixRetriggeringJobCallable#launchJobLauncherLoop()}, which simply launches {@link GobblinHelixJobLauncher} + * by {@link HelixRetriggeringJobCallable#runJobLauncherLoop()}, which simply launches {@link GobblinHelixJobLauncher} * and submit the work units to Helix. Helix will dispatch the work units to different worker nodes. The worker node will - * handle the work units by {@link GobblinHelixTask}. + * handle the work units via launching {@link GobblinHelixTask}. * * See {@link GobblinHelixJobLauncher} for job launcher details. * See {@link GobblinHelixTask} for work unit handling details. @@ -60,13 +62,14 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; * * <p> Distribution Mode: * If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is true, the job will be handled - * by {@link HelixRetriggeringJobCallable#launchJobExecutionLauncherLoop()}}, which simply launches + * by {@link HelixRetriggeringJobCallable#runJobExecutionLauncher()}, which simply launches * {@link GobblinHelixDistributeJobExecutionLauncher} and submit a planning job to Helix. Helix will dispatch this - * planning job to a worker node. The worker node will handle this planning job by {@link GobblinHelixJobTask}. + * planning job to a task-driver node. The task-driver node will handle this planning job via launching + * {@link GobblinHelixJobTask}. 
+ * - * The {@link GobblinHelixJobTask} will launch {@link GobblinHelixJobLauncher} and it will again submit the actual - * work units to Helix. Helix will dispatch the work units to other worker nodes. Similar to Non-Distribution Node, - * some worker nodes will handle those work units by {@link GobblinHelixTask}. + * The {@link GobblinHelixJobTask} will again launch {@link GobblinHelixJobLauncher} to submit the actual job + * to Helix. Helix will dispatch the work units to other worker nodes. Similar to Non-Distribution Mode, + * some worker nodes will handle those work units by launching {@link GobblinHelixTask}. * * See {@link GobblinHelixDistributeJobExecutionLauncher} for planning job launcher details. * See {@link GobblinHelixJobTask} for planning job handling details. @@ -83,7 +86,7 @@ class HelixRetriggeringJobCallable implements Callable { private final JobListener jobListener; private final Path appWorkDir; private final HelixManager helixManager; - + protected HelixJobsMapping jobsMapping; private GobblinHelixJobLauncher currentJobLauncher = null; private JobExecutionMonitor currentJobMonitor = null; private boolean isDistributeJobEnabled = false; @@ -102,6 +105,9 @@ class HelixRetriggeringJobCallable implements Callable { this.appWorkDir = appWorkDir; this.helixManager = helixManager; this.isDistributeJobEnabled = isDistributeJobEnabled(); + this.jobsMapping = new HelixJobsMapping(ConfigUtils.propertiesToConfig(sysProps), + PathUtils.getRootPath(appWorkDir).toUri(), + appWorkDir.toString()); } private boolean isRetriggeringEnabled() { @@ -121,18 +127,25 @@ class HelixRetriggeringJobCallable implements Callable { @Override public Void call() throws JobException { if (this.isDistributeJobEnabled) { - launchJobExecutionLauncherLoop(); + runJobExecutionLauncher(); } else { - launchJobLauncherLoop(); + runJobLauncherLoop(); } return null; } - private void launchJobLauncherLoop() throws JobException { + /** + * <p> In some cases, the job launcher will be early
stopped. + * It can be due to the large volume of input source data. + * In such case, we need to re-launch the same job until + * the job launcher determines it is safe to stop. + */ + private void runJobLauncherLoop() throws JobException { try { while (true) { currentJobLauncher = this.jobScheduler.buildJobLauncher(jobProps); + // in "run once" case, job scheduler will remove current job from the scheduler boolean isEarlyStopped = this.jobScheduler.runJob(jobProps, jobListener, currentJobLauncher); boolean isRetriggerEnabled = this.isRetriggeringEnabled(); if (isEarlyStopped && isRetriggerEnabled) { @@ -148,47 +161,62 @@ class HelixRetriggeringJobCallable implements Callable { } } - private void launchJobExecutionLauncherLoop() throws JobException { + /** + * <p> Launch a planning job. The actual job will be launched + * on task driver instance, which will handle the early-stop case + * by a single while-loop. + * + * @see {@link GobblinHelixJobTask#run()} for the task driver logic. + */ + private void runJobExecutionLauncher() throws JobException { try { - while (true) { - String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, - GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName()); - - GobblinHelixDistributeJobExecutionLauncher.Builder builder = GobblinConstructorUtils - .<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(new ClassAliasResolver( - GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr)); - - // Make a separate copy because we could update some of attributes in job properties (like adding planning id). 
- Properties jobPlanningProps = new Properties(); - jobPlanningProps.putAll(this.jobProps); - - // Inject planning id and start time - String planningId = JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX - + JobState.getJobNameFromProps(jobPlanningProps)); - jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, planningId); - jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME, String.valueOf(System.currentTimeMillis())); - - builder.setSysProps(this.sysProps); - builder.setJobPlanningProps(jobPlanningProps); - builder.setManager(this.helixManager); - builder.setAppWorkDir(this.appWorkDir); - - try (Closer closer = Closer.create()) { - GobblinHelixDistributeJobExecutionLauncher launcher = builder.build(); - closer.register(launcher); - this.currentJobMonitor = launcher.launchJob(null); - ExecutionResult result = this.currentJobMonitor.get(); - boolean isEarlyStopped = ((GobblinHelixDistributeJobExecutionLauncher.DistributeJobResult) result).isEarlyStopped(); - boolean isRetriggerEnabled = this.isRetriggeringEnabled(); - if (isEarlyStopped && isRetriggerEnabled) { - log.info("DistributeJob {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); - } else { - break; - } - currentJobMonitor = null; - } catch (Throwable t) { - throw new JobException("Failed to launch and run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t); + String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, + GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName()); + + // Check if any existing planning job is running + String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY); + Optional<String> planningJobIdFromStore = jobsMapping.getPlanningJobId(jobName); + + if (planningJobIdFromStore.isPresent()) { + String previousPlanningJobId = planningJobIdFromStore.get(); + if 
(HelixUtils.isJobFinished(previousPlanningJobId, previousPlanningJobId, this.helixManager)) { + log.info("Previous planning job {} has reached to the final state. Start a new one.", previousPlanningJobId); + } else { + log.info("Previous planning job {} has not finished yet. Skip it.", previousPlanningJobId); + return; } + } else { + log.info("Planning job for {} does not exist. First time run.", jobName); + } + + GobblinHelixDistributeJobExecutionLauncher.Builder builder = GobblinConstructorUtils + .<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor(new ClassAliasResolver( + GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr)); + + // Make a separate copy because we could update some of attributes in job properties (like adding planning id). + Properties jobPlanningProps = new Properties(); + jobPlanningProps.putAll(this.jobProps); + + // Inject planning id and start time + String planningId = JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX + + JobState.getJobNameFromProps(jobPlanningProps)); + jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, planningId); + jobPlanningProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_JOB_CREATE_TIME, String.valueOf(System.currentTimeMillis())); + + builder.setSysProps(this.sysProps); + builder.setJobPlanningProps(jobPlanningProps); + builder.setManager(this.helixManager); + builder.setAppWorkDir(this.appWorkDir); + + try (Closer closer = Closer.create()) { + GobblinHelixDistributeJobExecutionLauncher launcher = builder.build(); + closer.register(launcher); + this.jobsMapping.setPlanningJobId(jobName, planningId); + this.currentJobMonitor = launcher.launchJob(null); + this.currentJobMonitor.get(); + this.currentJobMonitor = null; + } catch (Throwable t) { + throw new JobException("Failed to launch and run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t); } } catch (Exception e) { 
log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index 6fbc7c5..29539f0 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -173,6 +173,31 @@ public class HelixUtils { throw new TimeoutException("task driver wait time [" + timeoutInSeconds + " sec] is expired."); } + static boolean isJobFinished(String workflowName, String jobName, HelixManager helixManager) { + WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, workflowName); + if (workflowContext == null) { + // this workflow context doesn't exist, considered as finished. 
+ return true; + } + + TaskState jobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(workflowName, jobName)); + switch (jobState) { + case STOPPED: + case FAILED: + case COMPLETED: + case ABORTED: + case TIMED_OUT: + return true; + default: + return false; + } + } + + static void deleteWorkflow (String workflowName, HelixManager helixManager, long timeOut) throws InterruptedException { + TaskDriver taskDriver = new TaskDriver(helixManager); + taskDriver.deleteAndWaitForCompletion(workflowName, timeOut); + } + static void handleJobTimeout(String workFlowName, String jobName, HelixManager helixManager, Object jobLauncher, JobListener jobListener) throws InterruptedException { try { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/11a1c46a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java index 155304a..2e98cfb 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java @@ -17,9 +17,7 @@ package org.apache.gobblin.cluster; -import java.io.IOException; import java.util.Map; -import java.util.Properties; import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; @@ -32,14 +30,14 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alias; import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite; +import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.runtime.util.StateStores; -import org.apache.gobblin.source.extractor.partition.Partitioner; 
-import org.apache.gobblin.util.PropertiesUtils; public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel { + private TaskFactory testJobFactory; + public TaskRunnerSuiteForJobFactoryTest(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) { super(builder); this.testJobFactory = new TestJobFactory(builder, this.metricContext); @@ -62,7 +60,7 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel @Override public Task createNewTask(TaskCallbackContext context) { return new TestHelixJobTask(context, - stateStores, + jobsMapping, builder, new GobblinHelixJobLauncherMetrics("launcherInJobFactory", metricContext, 5), new GobblinHelixJobTask.GobblinHelixJobTaskMetrics(metricContext, 5)); @@ -71,25 +69,16 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel public class TestHelixJobTask extends GobblinHelixJobTask { public TestHelixJobTask(TaskCallbackContext context, - StateStores stateStores, + HelixJobsMapping jobsMapping, TaskRunnerSuiteBase.Builder builder, GobblinHelixJobLauncherMetrics launcherMetrics, GobblinHelixJobTaskMetrics jobTaskMetrics) { super(context, - stateStores, + jobsMapping, builder, launcherMetrics, jobTaskMetrics); } - - //TODO: change below to Helix UserConentStore - protected void setResultToUserContent(Map<String, String> keyValues) throws IOException { - Map<String, String> customizedKVs = Maps.newHashMap(keyValues); - customizedKVs.put("customizedKey_1", "customizedVal_1"); - customizedKVs.put("customizedKey_2", "customizedVal_2"); - customizedKVs.put("customizedKey_3", "customizedVal_3"); - super.setResultToUserContent(customizedKVs); - } } @Slf4j @@ -99,21 +88,23 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel super(builder); } - //TODO: change below to Helix UserConentStore protected DistributeJobResult getResultFromUserContent() { DistributeJobResult rst = super.getResultFromUserContent(); - Properties 
properties = rst.getProperties().get(); - Assert.assertTrue(properties.containsKey(Partitioner.IS_EARLY_STOPPED)); - Assert.assertFalse(PropertiesUtils.getPropAsBoolean(properties, Partitioner.IS_EARLY_STOPPED, "false")); + Assert.assertTrue(!rst.isEarlyStopped()); + String jobName = this.jobPlanningProps.getProperty(ConfigurationKeys.JOB_NAME_KEY); + String planningJobId = this.jobPlanningProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); - Assert.assertTrue(properties.getProperty("customizedKey_1").equals("customizedVal_1")); - Assert.assertTrue(properties.getProperty("customizedKey_2").equals("customizedVal_2")); - Assert.assertTrue(properties.getProperty("customizedKey_3").equals("customizedVal_3")); + try { + String planningJobFromStore = this.jobsMapping.getPlanningJobId(jobName).get(); + Assert.assertTrue(planningJobFromStore.equals(planningJobId)); + + } catch (Exception e) { + Assert.fail(e.toString()); + } IntegrationJobFactorySuite.completed.set(true); return rst; } - @Alias("TestDistributedExecutionLauncherBuilder") public static class Builder extends GobblinHelixDistributeJobExecutionLauncher.Builder { public TestDistributedExecutionLauncher build() throws Exception {
