Repository: incubator-gobblin Updated Branches: refs/heads/master ec2fe0487 -> 1fe5f952a
[GOBBLIN-336] Rename SingleHelixTask to SingleTask The class has no reference to Helix. It's used in the child process. Plan to create a new class named SingleHelixTask in the next PR to represent the Helix task instance. Testing: Integration tests passed. Closes #2218 from HappyRay/rename-helix-single-task-class Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1fe5f952 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1fe5f952 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1fe5f952 Branch: refs/heads/master Commit: 1fe5f952a71673646f17ccb3214769611f3c33f3 Parents: ec2fe04 Author: Ray Yang <[email protected]> Authored: Mon Dec 18 14:52:41 2017 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Mon Dec 18 14:52:41 2017 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinHelixTask.java | 8 +- .../apache/gobblin/cluster/SingleHelixTask.java | 140 ------------------- .../org/apache/gobblin/cluster/SingleTask.java | 140 +++++++++++++++++++ .../gobblin/cluster/SingleTaskRunner.java | 7 +- 4 files changed, 148 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1fe5f952/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java index bf74f17..6a6e60d 100644 --- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java @@ -69,7 +69,7 @@ public class GobblinHelixTask implements Task { private String jobKey; private Path workUnitFilePath; - private SingleHelixTask task; + private SingleTask task; public GobblinHelixTask(TaskCallbackContext taskCallbackContext, FileSystem fs, Path appWorkDir, TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) @@ -81,7 +81,8 @@ public class GobblinHelixTask implements Task { Path jobStateFilePath = constructJobStateFilePath(appWorkDir); this.task = - new SingleHelixTask(this.jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, stateStores); + new SingleTask(this.jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, + stateStores); } private Path constructJobStateFilePath(Path appWorkDir) { @@ -93,7 +94,8 @@ public class GobblinHelixTask implements Task { this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY); this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY); this.jobKey = Long.toString(Id.parse(this.jobId).getSequence()); - this.workUnitFilePath = new Path(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH)); + this.workUnitFilePath = + new Path(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1fe5f952/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java deleted file mode 100644 index 04f9253..0000000 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.cluster; - -import java.io.IOException; -import java.util.List; -import java.util.Properties; - -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import org.apache.gobblin.broker.SharedResourcesBrokerFactory; -import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; -import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; -import org.apache.gobblin.broker.iface.SharedResourcesBroker; -import org.apache.gobblin.runtime.AbstractJobLauncher; -import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; -import org.apache.gobblin.runtime.JobState; -import org.apache.gobblin.runtime.util.StateStores; -import org.apache.gobblin.source.workunit.MultiWorkUnit; -import org.apache.gobblin.source.workunit.WorkUnit; -import org.apache.gobblin.util.JobLauncherUtils; -import org.apache.gobblin.util.SerializationUtils; - - -public class SingleHelixTask { - - private static final Logger _logger = LoggerFactory.getLogger(SingleHelixTask.class); - - private GobblinMultiTaskAttempt 
_taskattempt; - private String _jobId; - private Path _workUnitFilePath; - private Path _jobStateFilePath; - private FileSystem _fs; - private TaskAttemptBuilder _taskAttemptBuilder; - private StateStores _stateStores; - - SingleHelixTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs, - TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) { - _jobId = jobId; - _workUnitFilePath = workUnitFilePath; - _jobStateFilePath = jobStateFilePath; - _fs = fs; - _taskAttemptBuilder = taskAttemptBuilder; - _stateStores = stateStores; - } - - public void run() - throws IOException, InterruptedException { - List<WorkUnit> workUnits = getWorkUnits(); - - JobState jobState = getJobState(); - Config jobConfig = getConfigFromJobState(jobState); - - try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory - .createDefaultTopLevelBroker(jobConfig, GobblinScopeTypes.GLOBAL.defaultScopeInstance())) { - SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(jobState, globalBroker); - - _taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, jobState, jobBroker); - _taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE); - } - } - - private SharedResourcesBroker<GobblinScopeTypes> getJobBroker(JobState jobState, - SharedResourcesBroker<GobblinScopeTypes> globalBroker) { - return globalBroker.newSubscopedBuilder(new JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build(); - } - - private Config getConfigFromJobState(JobState jobState) { - Properties jobProperties = jobState.getProperties(); - return ConfigFactory.parseProperties(jobProperties); - } - - private JobState getJobState() - throws java.io.IOException { - JobState jobState = new JobState(); - SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState); - return jobState; - } - - private List<WorkUnit> getWorkUnits() - throws IOException { - String fileName = 
_workUnitFilePath.getName(); - String storeName = _workUnitFilePath.getParent().getName(); - WorkUnit workUnit; - - if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) { - workUnit = _stateStores.mwuStateStore.getAll(storeName, fileName).get(0); - } else { - workUnit = _stateStores.wuStateStore.getAll(storeName, fileName).get(0); - } - - // The list of individual WorkUnits (flattened) to run - List<WorkUnit> workUnits = Lists.newArrayList(); - - if (workUnit instanceof MultiWorkUnit) { - // Flatten the MultiWorkUnit so the job configuration properties can be added to each individual WorkUnits - List<WorkUnit> flattenedWorkUnits = JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits()); - workUnits.addAll(flattenedWorkUnits); - } else { - workUnits.add(workUnit); - } - return workUnits; - } - - public void cancel() { - if (_taskattempt != null) { - try { - _logger.info("Task cancelled: Shutdown starting for tasks with jobId: {}", _jobId); - _taskattempt.shutdownTasks(); - _logger.info("Task cancelled: Shutdown complete for tasks with jobId: {}", _jobId); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while shutting down task with jobId: " + _jobId, e); - } - } else { - _logger.error("Task cancelled but _taskattempt is null, so ignoring."); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1fe5f952/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java new file mode 100644 index 0000000..da0c633 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.io.IOException; +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.runtime.AbstractJobLauncher; +import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.util.StateStores; +import org.apache.gobblin.source.workunit.MultiWorkUnit; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.util.JobLauncherUtils; +import org.apache.gobblin.util.SerializationUtils; + + +public class SingleTask { + + private static final Logger _logger = LoggerFactory.getLogger(SingleTask.class); + + private GobblinMultiTaskAttempt _taskattempt; + private String _jobId; + private Path 
_workUnitFilePath; + private Path _jobStateFilePath; + private FileSystem _fs; + private TaskAttemptBuilder _taskAttemptBuilder; + private StateStores _stateStores; + + SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, FileSystem fs, + TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) { + _jobId = jobId; + _workUnitFilePath = workUnitFilePath; + _jobStateFilePath = jobStateFilePath; + _fs = fs; + _taskAttemptBuilder = taskAttemptBuilder; + _stateStores = stateStores; + } + + public void run() + throws IOException, InterruptedException { + List<WorkUnit> workUnits = getWorkUnits(); + + JobState jobState = getJobState(); + Config jobConfig = getConfigFromJobState(jobState); + + try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = SharedResourcesBrokerFactory + .createDefaultTopLevelBroker(jobConfig, GobblinScopeTypes.GLOBAL.defaultScopeInstance())) { + SharedResourcesBroker<GobblinScopeTypes> jobBroker = getJobBroker(jobState, globalBroker); + + _taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, jobState, jobBroker); + _taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE); + } + } + + private SharedResourcesBroker<GobblinScopeTypes> getJobBroker(JobState jobState, + SharedResourcesBroker<GobblinScopeTypes> globalBroker) { + return globalBroker.newSubscopedBuilder(new JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build(); + } + + private Config getConfigFromJobState(JobState jobState) { + Properties jobProperties = jobState.getProperties(); + return ConfigFactory.parseProperties(jobProperties); + } + + private JobState getJobState() + throws java.io.IOException { + JobState jobState = new JobState(); + SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState); + return jobState; + } + + private List<WorkUnit> getWorkUnits() + throws IOException { + String fileName = _workUnitFilePath.getName(); + String storeName = 
_workUnitFilePath.getParent().getName(); + WorkUnit workUnit; + + if (_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION)) { + workUnit = _stateStores.mwuStateStore.getAll(storeName, fileName).get(0); + } else { + workUnit = _stateStores.wuStateStore.getAll(storeName, fileName).get(0); + } + + // The list of individual WorkUnits (flattened) to run + List<WorkUnit> workUnits = Lists.newArrayList(); + + if (workUnit instanceof MultiWorkUnit) { + // Flatten the MultiWorkUnit so the job configuration properties can be added to each individual WorkUnits + List<WorkUnit> flattenedWorkUnits = JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits()); + workUnits.addAll(flattenedWorkUnits); + } else { + workUnits.add(workUnit); + } + return workUnits; + } + + public void cancel() { + if (_taskattempt != null) { + try { + _logger.info("Task cancelled: Shutdown starting for tasks with jobId: {}", _jobId); + _taskattempt.shutdownTasks(); + _logger.info("Task cancelled: Shutdown complete for tasks with jobId: {}", _jobId); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while shutting down task with jobId: " + _jobId, e); + } + } else { + _logger.error("Task cancelled but _taskattempt is null, so ignoring."); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1fe5f952/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java index 20226af..9cc4733 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java @@ -54,7 +54,7 @@ class SingleTaskRunner { private final String workUnitFilePath; private 
final Config clusterConfig; private final Path appWorkPath; - private SingleHelixTask task; + private SingleTask task; private TaskExecutor taskExecutor; private GobblinHelixTaskStateTracker taskStateTracker; private ServiceManager serviceManager; @@ -114,9 +114,8 @@ class SingleTaskRunner { final TaskAttemptBuilder taskAttemptBuilder = getTaskAttemptBuilder(stateStores); - this.task = - new SingleHelixTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs, - taskAttemptBuilder, stateStores); + this.task = new SingleTask(this.jobId, new Path(this.workUnitFilePath), jobStateFilePath, fs, + taskAttemptBuilder, stateStores); } private TaskAttemptBuilder getTaskAttemptBuilder(final StateStores stateStores) {
