Repository: incubator-gobblin Updated Branches: refs/heads/master 6232b416a -> 45fc9dfca
[GOBBLIN-535] Add second hop for distributed job launcher Closes #2398 from yukuai518/hop2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/45fc9dfc Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/45fc9dfc Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/45fc9dfc Branch: refs/heads/master Commit: 45fc9dfca69dd8479504ee6b9878d196c722a329 Parents: 6232b41 Author: Kuai Yu <[email protected]> Authored: Fri Jul 13 17:36:51 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Jul 13 17:36:51 2018 -0700 ---------------------------------------------------------------------- ...blinHelixDistributeJobExecutionLauncher.java | 20 +++++- .../gobblin/cluster/GobblinHelixJobFactory.java | 17 +++-- .../cluster/GobblinHelixJobLauncher.java | 10 ++- .../gobblin/cluster/GobblinHelixJobTask.java | 74 ++++++++++++++------ .../gobblin/cluster/GobblinHelixTask.java | 13 ++-- .../gobblin/cluster/GobblinTaskRunner.java | 12 ++-- .../cluster/HelixRetriggeringJobCallable.java | 52 +++++++++++--- .../org/apache/gobblin/cluster/HelixUtils.java | 1 + .../gobblin/cluster/TaskRunnerSuiteBase.java | 24 +++++++ .../cluster/TaskRunnerSuiteThreadModel.java | 4 +- .../TaskRunnerSuiteForJobFactoryTest.java | 10 +-- .../suite/IntegrationJobFactorySuite.java | 3 +- .../gobblin/runtime/api/ExecutionResult.java | 2 +- 13 files changed, 184 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index 87613a6..7ef24fc 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -17,6 +17,7 @@ package org.apache.gobblin.cluster; +import java.io.Closeable; import java.io.IOException; import java.net.URI; import java.util.List; @@ -85,7 +86,7 @@ import org.apache.gobblin.util.PropertiesUtils; */ @Alpha @Slf4j -class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher { +class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher, Closeable { protected HelixManager helixManager; protected TaskDriver helixTaskDriver; protected Properties sysProperties; @@ -100,12 +101,14 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher private final long jobQueueDeleteTimeoutSeconds; + private boolean jobSubmitted; + public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception { this.helixManager = builder.manager; this.helixTaskDriver = new TaskDriver(this.helixManager); this.sysProperties = builder.sysProperties; this.jobProperties = builder.jobProperties; - + this.jobSubmitted = false; Config combined = ConfigUtils.propertiesToConfig(jobProperties) .withFallback(ConfigUtils.propertiesToConfig(sysProperties)); @@ -124,6 +127,17 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS); } + @Override + public void close() + throws IOException { + // we should delete the planning job at the end. 
+ if (this.jobSubmitted) { + String planningName = getPlanningJobName(this.jobProperties); + log.info("[DELETE] workflow {} in the close.", planningName); + this.helixTaskDriver.delete(planningName); + } + } + @Setter public static class Builder { Properties sysProperties; @@ -191,6 +205,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher taskDriver, this.helixManager, this.jobQueueDeleteTimeoutSeconds); + this.jobSubmitted = true; } @Override @@ -232,6 +247,7 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher return getResultFromUserContent(); } catch (TimeoutException te) { helixTaskDriver.waitToStop(planningName, 10L); + log.info("[DELETE] workflow {} timeout.", planningName); this.helixTaskDriver.delete(planningName); this.helixTaskDriver.resume(planningName); log.info("stopped the queue, deleted the job"); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java index 2f7ced2..83821d4 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java @@ -39,11 +39,12 @@ import org.apache.gobblin.util.PathUtils; */ @Slf4j public class GobblinHelixJobFactory implements TaskFactory { - protected Config sysConfig; protected StateStores stateStores; - public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder) { - this.sysConfig = builder.getConfig(); + protected TaskRunnerSuiteBase.Builder builder; + + private void initializeStateStore(TaskRunnerSuiteBase.Builder builder) { + Config sysConfig = builder.getConfig(); Path 
appWorkDir = builder.getAppWorkPath(); URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri(); Config stateStoreJobConfig = sysConfig @@ -56,8 +57,16 @@ public class GobblinHelixJobFactory implements TaskFactory { appWorkDir, GobblinHelixDistributeJobExecutionLauncher.PLANNING_JOB_STATE_DIR_NAME); } + public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder) { + this.builder = builder; + // TODO: We can remove below initialization once Helix allow us to persist job resut in userContentStore + initializeStateStore(this.builder); + } + @Override public Task createNewTask(TaskCallbackContext context) { - return new GobblinHelixJobTask(context, this.sysConfig, stateStores); + return new GobblinHelixJobTask(context, + this.stateStores, + this.builder); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 39c6e5b..8794a34 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -45,6 +45,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -96,6 +97,7 @@ import org.apache.gobblin.util.SerializationUtils; * @author Yinan Li */ @Alpha +@Slf4j public class GobblinHelixJobLauncher extends AbstractJobLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobLauncher.class); @@ -136,8 +138,8 @@ public class GobblinHelixJobLauncher 
extends AbstractJobLauncher { this.runningMap = runningMap; this.appWorkDir = appWorkDir; this.inputWorkUnitDir = new Path(appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME); - this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME + - Path.SEPARATOR + this.jobContext.getJobId()); + this.outputTaskStateDir = new Path(this.appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME + + Path.SEPARATOR + this.jobContext.getJobId()); this.helixQueueName = this.jobContext.getJobName(); this.jobResourceName = TaskUtil.getNamespacedJobName(this.helixQueueName, this.jobContext.getJobId()); @@ -149,7 +151,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { jobConfig = ConfigUtils.propertiesToConfig(jobProps); - this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig, GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS, + this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(jobConfig, + GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS, GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS); Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps) @@ -211,6 +214,7 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { protected void executeCancellation() { if (this.jobSubmitted) { try { + log.info("[DELETE] workflow {}", this.helixQueueName); this.helixTaskDriver.delete(this.helixQueueName); } catch (IllegalArgumentException e) { LOGGER.warn("Failed to cancel job {} in Helix", this.jobContext.getJobId(), e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java index f60852e..9ede090 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java @@ -18,9 +18,14 @@ package org.apache.gobblin.cluster; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.hadoop.fs.Path; +import org.apache.helix.HelixManager; import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskConfig; @@ -28,61 +33,85 @@ import org.apache.helix.task.TaskResult; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import com.google.common.io.Closer; import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.source.extractor.partition.Partitioner; import org.apache.gobblin.util.ConfigUtils; /** - * An implementation of Helix's {@link org.apache.helix.task.Task} that runs original {@link GobblinHelixJobLauncher} + * An implementation of Helix's {@link org.apache.helix.task.Task} that runs original {@link GobblinHelixJobLauncher}. 
*/ @Slf4j public class GobblinHelixJobTask implements Task { private final TaskConfig taskConfig; - private Config sysConfig; - private Properties jobConfig; - private StateStores stateStores; - private String planningJobId; - + private final Config sysConfig; + private final Properties jobPlusSysConfig; + private final StateStores stateStores; + private final String planningJobId; + private final HelixManager helixManager; + private final Path appWorkDir; + private final List<? extends Tag<?>> metadataTags; + + private GobblinHelixJobLauncher launcher; public GobblinHelixJobTask(TaskCallbackContext context, - Config sysConfig, - StateStores stateStores) { + StateStores stateStores, + TaskRunnerSuiteBase.Builder builder) { this.taskConfig = context.getTaskConfig(); - this.sysConfig = sysConfig; - this.jobConfig = ConfigUtils.configToProperties(sysConfig); + this.sysConfig = builder.getConfig(); + this.helixManager = builder.getHelixManager(); + this.jobPlusSysConfig = ConfigUtils.configToProperties(sysConfig); Map<String, String> configMap = this.taskConfig.getConfigMap(); for (Map.Entry<String, String> entry: configMap.entrySet()) { if (entry.getKey().startsWith(GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX)) { String key = entry.getKey().replaceFirst(GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX, ""); - jobConfig.put(key, entry.getValue()); + jobPlusSysConfig.put(key, entry.getValue()); } } - if (!jobConfig.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) { + if (!jobPlusSysConfig.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) { throw new RuntimeException("Job doesn't have plannning ID"); } - this.planningJobId = jobConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); + this.planningJobId = jobPlusSysConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); this.stateStores = stateStores; + this.appWorkDir = builder.getAppWorkPath(); + this.metadataTags = Tag.fromMap(new 
ImmutableMap.Builder<String, Object>() + .put(GobblinClusterMetricTagNames.APPLICATION_NAME, builder.getApplicationName()) + .put(GobblinClusterMetricTagNames.APPLICATION_ID, builder.getApplicationId()) + .build()); } - @Override - public TaskResult run() { - log.info("We will run planning job " + this.planningJobId); - // TODO: We should run GobblinHelixJobLauncher#launchJob() here + private GobblinHelixJobLauncher createJobLauncher() + throws Exception { + return new GobblinHelixJobLauncher(jobPlusSysConfig, + this.helixManager, + this.appWorkDir, + this.metadataTags, + new ConcurrentHashMap<>()); + } - try { + @Override + public TaskResult run() { + log.info("Running planning job {}", this.planningJobId); + // Launch the job + try (Closer closer = Closer.create()) { + this.launcher = createJobLauncher(); + //TODO: we will provide additional listener + closer.register(launcher).launchJob(null); setResultToUserContent(ImmutableMap.of(Partitioner.IS_EARLY_STOPPED, "false")); - } catch (IOException e) { - return new TaskResult(TaskResult.Status.FAILED, "State store cannot be persisted for job " + planningJobId); + } catch (Exception e) { + return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils + .getFullStackTrace(e)); } return new TaskResult(TaskResult.Status.COMPLETED, ""); } @@ -102,6 +131,9 @@ public class GobblinHelixJobTask implements Task { @Override public void cancel() { - // TODO: We should delete the real job. 
+ log.info("Cancelling planning job {}", this.planningJobId); + if (launcher != null) { + launcher.executeCancellation(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java index e651b8e..a2516eb 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java @@ -26,13 +26,13 @@ import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskResult; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.slf4j.MDC; import com.google.common.base.Throwables; import com.google.common.io.Closer; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.TaskState; @@ -58,14 +58,14 @@ import org.apache.gobblin.util.Id; * </p> */ @Alpha +@Slf4j public class GobblinHelixTask implements Task { - private static final Logger _logger = LoggerFactory.getLogger(GobblinHelixTask.class); - private final TaskConfig taskConfig; private String jobName; private String jobId; private String jobKey; + private String taskId; private Path workUnitFilePath; private SingleTask task; @@ -90,22 +90,25 @@ public class GobblinHelixTask implements Task { this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY); this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY); this.jobKey = Long.toString(Id.parse(this.jobId).getSequence()); + this.taskId = configMap.get(ConfigurationKeys.TASK_ID_KEY); this.workUnitFilePath = 
new Path(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH)); } @Override public TaskResult run() { + log.info("Actual task {} started.", this.taskId); try (Closer closer = Closer.create()) { closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, this.jobName)); closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, this.jobKey)); this.task.run(); + log.info("Actual task {} finished.", this.taskId); return new TaskResult(TaskResult.Status.COMPLETED, ""); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return new TaskResult(TaskResult.Status.CANCELED, ""); } catch (Throwable t) { - _logger.error("GobblinHelixTask failed due to " + t.getMessage(), t); + log.error("GobblinHelixTask " + taskId + " failed due to " + t.getMessage(), t); return new TaskResult(TaskResult.Status.FAILED, Throwables.getStackTraceAsString(t)); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index fb7136d..9d0e6cc 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -89,9 +89,10 @@ import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER * {@link org.apache.gobblin.source.workunit.WorkUnit}s. * * <p> - * This class serves as a Helix participant and it uses a {@link HelixManager} to work with Helix. - * This class also uses the Helix task execution framework and {@link GobblinHelixTaskFactory} class - * for creating {@link GobblinHelixTask}s that Helix manages to run Gobblin data ingestion tasks. 
+ * This class presents a Helix participant and uses a {@link HelixManager} to communicate with Helix. + * It also uses Helix task execution framework and {@link GobblinHelixTaskFactory} class to generate + * {@link GobblinHelixTask}s which handles real Gobblin tasks. All the Helix related task framework is + * encapsulated in {@link TaskRunnerSuiteBase}. * </p> * * <p> @@ -174,7 +175,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge { TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath) .setContainerMetrics(this.containerMetrics) .setFileSystem(this.fs) - .setHelixManager(this.helixManager).build(); + .setHelixManager(this.helixManager) + .setApplicationId(applicationId) + .setApplicationName(applicationName) + .build(); this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap()); this.metrics = suite.getTaskMetrics(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java index ce3619b..7b9fd3c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java @@ -23,6 +23,8 @@ import java.util.concurrent.Callable; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; +import com.google.common.io.Closer; + import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; @@ -38,7 +40,31 @@ import org.apache.gobblin.util.reflection.GobblinConstructorUtils; /** - * A {@link Callable} that runs {@link JobLauncher} multiple times iff re-triggering is enabled and job stops early. 
+ * A {@link Callable} that can run a given job multiple times iff: + * 1) Re-triggering is enabled and + * 2) Job stops early. + * + * Moreover, based on the job properties, a job can be processed immediately (non-distributed) or forwarded to a remote + * node (distributed) for handling. Details are illustrated as follows: + * + * <p> + * If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is false, the job will be handled + * by {@link HelixRetriggeringJobCallable#launchJobLauncherLoop()}, which simply submits the job to Helix for execution. + * + * See {@link GobblinHelixJobLauncher} for job launcher details. + * </p> + * + * <p> + * If {@link GobblinClusterConfigurationKeys#DISTRIBUTED_JOB_LAUNCHER_ENABLED} is true, the job will be handled + * by {@link HelixRetriggeringJobCallable#launchJobExecutionLauncherLoop()}. It will first create a planning job with + * {@link GobblinTaskRunner#GOBBLIN_JOB_FACTORY_NAME} pre-configured, so that Helix can forward this planning job to + * any node that has implemented the Helix task factory model matching the same name. See {@link TaskRunnerSuiteThreadModel} + * implementation of how the task factory model is set up. + * + * Once the planning job reaches the remote end, it will be handled by {@link GobblinHelixJobTask} which is + * created by {@link GobblinHelixJobFactory}. The actual handling is similar to the non-distributed mode, where + * {@link GobblinHelixJobLauncher} is invoked. 
+ * </p> */ @Slf4j @Alpha @@ -123,16 +149,22 @@ class HelixRetriggeringJobCallable implements Callable { builder.setManager(this.helixManager); builder.setAppWorkDir(this.appWorkDir); - this.currentJobMonitor = builder.build().launchJob(null); - ExecutionResult result = this.currentJobMonitor.get(); - boolean isEarlyStopped = ((GobblinHelixDistributeJobExecutionLauncher.DistributeJobResult) result).isEarlyStopped(); - boolean isRetriggerEnabled = this.isRetriggeringEnabled(); - if (isEarlyStopped && isRetriggerEnabled) { - log.info("DistributeJob {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); - } else { - break; + try (Closer closer = Closer.create()) { + GobblinHelixDistributeJobExecutionLauncher launcher = builder.build(); + closer.register(launcher); + this.currentJobMonitor = launcher.launchJob(null); + ExecutionResult result = this.currentJobMonitor.get(); + boolean isEarlyStopped = ((GobblinHelixDistributeJobExecutionLauncher.DistributeJobResult) result).isEarlyStopped(); + boolean isRetriggerEnabled = this.isRetriggeringEnabled(); + if (isEarlyStopped && isRetriggerEnabled) { + log.info("DistributeJob {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); + } else { + break; + } + currentJobMonitor = null; + } catch (Throwable t) { + throw new JobException("Failed to launch and run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t); } - currentJobMonitor = null; } } catch (Exception e) { log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index ea4e6f7..8b9d5af 100644 --- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -99,6 +99,7 @@ public class HelixUtils { WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, queueName); + log.info("[DELETE] workflow {} in the beginning", queueName); // If the queue is present, but in delete state then wait for cleanup before recreating the queue if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) { new TaskDriver(helixManager).deleteAndWaitForCompletion(queueName, jobQueueDeleteTimeoutSeconds); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java index 03d4d42..e65b968 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java @@ -53,11 +53,15 @@ public abstract class TaskRunnerSuiteBase { protected TaskFactory taskFactory; protected TaskFactory jobFactory; protected MetricContext metricContext; + protected String applicationId; + protected String applicationName; protected StandardMetricsBridge.StandardMetrics taskMetrics; protected List<Service> services = Lists.newArrayList(); protected TaskRunnerSuiteBase(Builder builder) { this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(builder.config), this.getClass()); + this.applicationId = builder.getApplicationId(); + this.applicationName = builder.getApplicationName(); } protected MetricContext getMetricContext() { @@ -70,6 +74,14 @@ public abstract class TaskRunnerSuiteBase { protected 
abstract List<Service> getServices(); + protected String getApplicationId() { + return this.applicationId; + } + + protected String getApplicationName() { + return this.applicationName; + } + @Getter public static class Builder { private Config config; @@ -77,6 +89,8 @@ public abstract class TaskRunnerSuiteBase { private Optional<ContainerMetrics> containerMetrics; private FileSystem fs; private Path appWorkPath; + private String applicationId; + private String applicationName; public Builder(Config config) { this.config = config; @@ -87,6 +101,16 @@ public abstract class TaskRunnerSuiteBase { return this; } + public Builder setApplicationName(String applicationName) { + this.applicationName = applicationName; + return this; + } + + public Builder setApplicationId(String applicationId) { + this.applicationId = applicationId; + return this; + } + public Builder setContainerMetrics(Optional<ContainerMetrics> containerMetrics) { this.containerMetrics = containerMetrics; return this; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java index 4f3a1e0..cddf519 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java @@ -47,7 +47,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase { TaskRunnerSuiteThreadModel(TaskRunnerSuiteBase.Builder builder) { super(builder); this.taskExecutor = new TaskExecutor(ConfigUtils.configToProperties(builder.getConfig())); - this.taskFactory = getInProcessTaskFactory(taskExecutor, builder); + this.taskFactory = 
generateTaskFactory(taskExecutor, builder); this.jobFactory = new GobblinHelixJobFactory(builder); this.taskMetrics = new GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, metricContext); } @@ -70,7 +70,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase { return this.services; } - private TaskFactory getInProcessTaskFactory(TaskExecutor taskExecutor, Builder builder) { + private TaskFactory generateTaskFactory(TaskExecutor taskExecutor, Builder builder) { Properties properties = ConfigUtils.configToProperties(builder.getConfig()); URI rootPathUri = PathUtils.getRootPath(builder.getAppWorkPath()).toUri(); Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties) http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java index 1fdcda5..476747d 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java @@ -27,7 +27,6 @@ import org.apache.helix.task.TaskFactory; import org.testng.Assert; import com.google.common.collect.Maps; -import com.typesafe.config.Config; import lombok.extern.slf4j.Slf4j; @@ -56,19 +55,20 @@ public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel public class TestJobFactory extends GobblinHelixJobFactory { public TestJobFactory(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) { super (builder); + this.builder = builder; } @Override public Task createNewTask(TaskCallbackContext context) { - return new TestHelixJobTask(context, this.sysConfig, 
stateStores); + return new TestHelixJobTask(context, stateStores, builder); } } public class TestHelixJobTask extends GobblinHelixJobTask { public TestHelixJobTask(TaskCallbackContext context, - Config sysConfig, - StateStores stateStores) { - super(context, sysConfig, stateStores); + StateStores stateStores, + TaskRunnerSuiteBase.Builder builder) { + super(context, stateStores, builder); } //TODO: change below to Helix UserConentStore http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java index 0487c9c..5166395 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java @@ -43,7 +43,8 @@ public class IntegrationJobFactorySuite extends IntegrationBasicSuite { protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) { Config newConfig = ConfigFactory.parseMap(ImmutableMap.of( GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, true, - GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, "TestDistributedExecutionLauncherBuilder")); + GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, "TestDistributedExecutionLauncherBuilder")) + .withFallback(rawJobConfig); return ImmutableMap.of("HelloWorldJob", newConfig); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/45fc9dfc/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java ---------------------------------------------------------------------- diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java index 3f77493..3b990f9 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/ExecutionResult.java @@ -18,7 +18,7 @@ package org.apache.gobblin.runtime.api; /** - * An object which describes the result after job completion. This can be retrieved by {@link JobExecutionFuture#get()} + * An object which describes the result after job completion. This can be retrieved by {@link JobExecutionMonitor#get()} * * @see JobExecutionResult as a derived class. */
