Repository: incubator-gobblin Updated Branches: refs/heads/master 95e15f003 -> 0eeccde1a
[GOBBLIN-336] Encapsulate the non-Helix specific task execution logic Put the logic in its own class. Also changed: * Use a try-with-resources statement to close the global broker. * Fix a Helix warning: ERROR is replaced with FAILED. Testing: The integration test org.apache.gobblin.cluster.ClusterIntegrationTest passed. Also inlined a method. The old code has a bug: the globalBroker variable will stay null. Closes #2193 from HappyRay/encapsulate-non-helix-job-launch-logic Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0eeccde1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0eeccde1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0eeccde1 Branch: refs/heads/master Commit: 0eeccde1a8251514874b515c90a81c3c55aaf675 Parents: 95e15f0 Author: Ray Yang <[email protected]> Authored: Sat Dec 9 18:13:34 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Sat Dec 9 18:14:41 2017 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinHelixTask.java | 114 ++++---------- .../apache/gobblin/cluster/SingleHelixTask.java | 148 +++++++++++++++++++ 2 files changed, 175 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0eeccde1/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java index 808914d..8166cfe 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java @@ -18,7 +18,7 @@ package 
org.apache.gobblin.cluster; import java.io.IOException; -import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -31,26 +31,16 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; import com.google.common.base.Throwables; -import com.google.common.collect.Lists; import com.google.common.io.Closer; -import com.typesafe.config.ConfigFactory; import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.broker.SharedResourcesBrokerFactory; -import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; -import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; -import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.AbstractJobLauncher; -import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; -import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.TaskState; import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.util.Id; -import org.apache.gobblin.util.JobLauncherUtils; -import org.apache.gobblin.util.SerializationUtils; /** @@ -67,113 +57,63 @@ import org.apache.gobblin.util.SerializationUtils; * {@link org.apache.gobblin.runtime.Task}(s), it persists the {@link TaskState} of each {@link org.apache.gobblin.runtime.Task} to * a file that will be collected by the {@link GobblinHelixJobLauncher} later upon completion of the job. 
* </p> - * - * @author Yinan Li */ @Alpha public class GobblinHelixTask implements Task { - private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixTask.class); + private static final Logger _logger = LoggerFactory.getLogger(GobblinHelixTask.class); private final TaskConfig taskConfig; - // An empty JobState instance that will be filled with values read from the serialized JobState - private final JobState jobState = new JobState(); - private final String jobName; - private final String jobId; - private final String jobKey; - - private final FileSystem fs; - private final StateStores stateStores; - private final TaskAttemptBuilder taskAttemptBuilder; + private String jobName; + private String jobId; + private String jobKey; + private Path workUnitFilePath; - private GobblinMultiTaskAttempt taskAttempt; + private SingleHelixTask task; public GobblinHelixTask(TaskCallbackContext taskCallbackContext, FileSystem fs, Path appWorkDir, TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) throws IOException { this.taskConfig = taskCallbackContext.getTaskConfig(); - this.stateStores = stateStores; - this.taskAttemptBuilder = taskAttemptBuilder; - this.jobName = this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY); - this.jobId = this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_ID_KEY); - this.jobKey = Long.toString(Id.parse(this.jobId).getSequence()); + getInfoFromTaskConfig(); + + Path jobStateFilePath = constructJobStateFilePath(appWorkDir); - this.fs = fs; + this.task = + new SingleHelixTask(this.jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, stateStores); + } - Path jobStateFilePath = new Path(appWorkDir, this.jobId + "." + AbstractJobLauncher.JOB_STATE_FILE_NAME); - SerializationUtils.deserializeState(this.fs, jobStateFilePath, this.jobState); + private Path constructJobStateFilePath(Path appWorkDir) { + return new Path(appWorkDir, this.jobId + "." 
+ AbstractJobLauncher.JOB_STATE_FILE_NAME); + } + private void getInfoFromTaskConfig() { + Map<String, String> configMap = this.taskConfig.getConfigMap(); + this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY); + this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY); + this.jobKey = Long.toString(Id.parse(this.jobId).getSequence()); + this.workUnitFilePath = new Path(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH)); } @Override public TaskResult run() { - SharedResourcesBroker<GobblinScopeTypes> globalBroker = null; try (Closer closer = Closer.create()) { closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, this.jobName)); closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, this.jobKey)); - Path workUnitFilePath = - new Path(this.taskConfig.getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH)); - - String fileName = workUnitFilePath.getName(); - String storeName = workUnitFilePath.getParent().getName(); - WorkUnit workUnit; - - if (workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) { - workUnit = stateStores.mwuStateStore.getAll(storeName, fileName).get(0); - } else { - workUnit = stateStores.wuStateStore.getAll(storeName, fileName).get(0); - } - - // The list of individual WorkUnits (flattened) to run - List<WorkUnit> workUnits = Lists.newArrayList(); - - if (workUnit instanceof MultiWorkUnit) { - // Flatten the MultiWorkUnit so the job configuration properties can be added to each individual WorkUnits - List<WorkUnit> flattenedWorkUnits = - JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits()); - workUnits.addAll(flattenedWorkUnits); - } else { - workUnits.add(workUnit); - } - - globalBroker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker( - ConfigFactory.parseProperties(this.jobState.getProperties()), GobblinScopeTypes.GLOBAL.defaultScopeInstance()); - SharedResourcesBroker<GobblinScopeTypes> jobBroker = - 
globalBroker.newSubscopedBuilder(new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())).build(); - - this.taskAttempt = this.taskAttemptBuilder.build(workUnits.iterator(), this.jobId, this.jobState, jobBroker); - this.taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE); - return new TaskResult(TaskResult.Status.COMPLETED, String.format("completed tasks: %d", workUnits.size())); + int workUnitSize = this.task.run(); + return new TaskResult(TaskResult.Status.COMPLETED, String.format("completed tasks: %d", workUnitSize)); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return new TaskResult(TaskResult.Status.CANCELED, ""); } catch (Throwable t) { - LOGGER.error("GobblinHelixTask failed due to " + t.getMessage(), t); - return new TaskResult(TaskResult.Status.ERROR, Throwables.getStackTraceAsString(t)); - } finally { - if (globalBroker != null) { - try { - globalBroker.close(); - } catch (IOException ioe) { - LOGGER.error("Could not close shared resources broker.", ioe); - } - } + _logger.error("GobblinHelixTask failed due to " + t.getMessage(), t); + return new TaskResult(TaskResult.Status.FAILED, Throwables.getStackTraceAsString(t)); } } @Override public void cancel() { - if (this.taskAttempt != null) { - try { - LOGGER.info("Task cancelled: Shutdown starting for tasks with jobId: {}", this.jobId); - this.taskAttempt.shutdownTasks(); - LOGGER.info("Task cancelled: Shutdown complete for tasks with jobId: {}", this.jobId); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while shutting down task with jobId: " + this.jobId, e); - } - } else { - LOGGER.error("Task cancelled but taskAttempt is null, so ignoring."); - } + this.task.cancel(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0eeccde1/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java 
---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java new file mode 100644 index 0000000..9817a4f --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.runtime.AbstractJobLauncher; +import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.util.StateStores; +import org.apache.gobblin.source.workunit.MultiWorkUnit; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.util.JobLauncherUtils; +import org.apache.gobblin.util.SerializationUtils; + + +public class SingleHelixTask { + + private static final Logger _logger = LoggerFactory.getLogger(SingleHelixTask.class); + + private GobblinMultiTaskAttempt _taskattempt; + private String _jobId; + private Path _workUnitFilePath; + private Path _jobStateFilePath; + private FileSystem _fs; + private TaskAttemptBuilder _taskAttemptBuilder; + private StateStores _stateStores; + + SingleHelixTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs, + TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) { + _jobId = jobId; + _workUnitFilePath = workUnitFilePath; + _jobStateFilePath = jobStateFilePath; + _fs = fs; + _taskAttemptBuilder = taskAttemptBuilder; + _stateStores = stateStores; + } + + /** + * + * @return the number of work-units processed + * @throws IOException + * @throws InterruptedException + */ + public int run() + throws IOException, InterruptedException { + 
List<WorkUnit> workUnits = getWorkUnits(); + int workUnitSize = workUnits.size(); + + JobState jobState = getJobState(); + Config jobConfig = getConfigFromJobState(jobState); + + try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory + .createDefaultTopLevelBroker(jobConfig, GobblinScopeTypes.GLOBAL.defaultScopeInstance())) { + SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(jobState, globalBroker); + + _taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, jobState, jobBroker); + _taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE); + return workUnitSize; + } + } + + private SharedResourcesBroker<GobblinScopeTypes> getJobBroker(JobState jobState, + SharedResourcesBroker<GobblinScopeTypes> globalBroker) { + return globalBroker.newSubscopedBuilder(new JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build(); + } + + private Config getConfigFromJobState(JobState jobState) { + Properties jobProperties = jobState.getProperties(); + return ConfigFactory.parseProperties(jobProperties); + } + + private JobState getJobState() + throws java.io.IOException { + JobState jobState = new JobState(); + SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState); + return jobState; + } + + private List<WorkUnit> getWorkUnits() + throws IOException { + String fileName = _workUnitFilePath.getName(); + String storeName = _workUnitFilePath.getParent().getName(); + WorkUnit workUnit; + + if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) { + workUnit = _stateStores.mwuStateStore.getAll(storeName, fileName).get(0); + } else { + workUnit = _stateStores.wuStateStore.getAll(storeName, fileName).get(0); + } + + // The list of individual WorkUnits (flattened) to run + List<WorkUnit> workUnits = Lists.newArrayList(); + + if (workUnit instanceof MultiWorkUnit) { + // Flatten the MultiWorkUnit so the job configuration 
properties can be added to each individual WorkUnits + List<WorkUnit> flattenedWorkUnits = JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits()); + workUnits.addAll(flattenedWorkUnits); + } else { + workUnits.add(workUnit); + } + return workUnits; + } + + public void cancel() { + if (_taskattempt != null) { + try { + _logger.info("Task cancelled: Shutdown starting for tasks with jobId: {}", _jobId); + _taskattempt.shutdownTasks(); + _logger.info("Task cancelled: Shutdown complete for tasks with jobId: {}", _jobId); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while shutting down task with jobId: " + _jobId, e); + } + } else { + _logger.error("Task cancelled but _taskattempt is null, so ignoring."); + } + } +}
