Repository: incubator-gobblin
Updated Branches:
  refs/heads/master ec2fe0487 -> 1fe5f952a


[GOBBLIN-336] Rename SingleHelixTask to SingleTask

The class has no reference to Helix. It's used in
the child process.

Plan to create a new class named SingleHelixTask
in the next PR
to represent the Helix task instance.

Testing:

Integration tests passed.

The class has no reference to Helix. It's used in
the child process.

Plan to create a new class named SingleHelixTask
in the next PR
to represent the Helix task instance.

Testing:

Integration tests passed.

Closes #2218 from HappyRay/rename-helix-single-
task-class


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1fe5f952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1fe5f952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1fe5f952

Branch: refs/heads/master
Commit: 1fe5f952a71673646f17ccb3214769611f3c33f3
Parents: ec2fe04
Author: Ray Yang <[email protected]>
Authored: Mon Dec 18 14:52:41 2017 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Mon Dec 18 14:52:41 2017 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinHelixTask.java       |   8 +-
 .../apache/gobblin/cluster/SingleHelixTask.java | 140 -------------------
 .../org/apache/gobblin/cluster/SingleTask.java  | 140 +++++++++++++++++++
 .../gobblin/cluster/SingleTaskRunner.java       |   7 +-
 4 files changed, 148 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1fe5f952/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index bf74f17..6a6e60d 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -69,7 +69,7 @@ public class GobblinHelixTask implements Task {
   private String jobKey;
   private Path workUnitFilePath;
 
-  private SingleHelixTask task;
+  private SingleTask task;
 
   public GobblinHelixTask(TaskCallbackContext taskCallbackContext, FileSystem 
fs, Path appWorkDir,
       TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores)
@@ -81,7 +81,8 @@ public class GobblinHelixTask implements Task {
     Path jobStateFilePath = constructJobStateFilePath(appWorkDir);
 
     this.task =
-        new SingleHelixTask(this.jobId, workUnitFilePath, jobStateFilePath, 
fs, taskAttemptBuilder, stateStores);
+        new SingleTask(this.jobId, workUnitFilePath, jobStateFilePath, fs, 
taskAttemptBuilder,
+            stateStores);
   }
 
   private Path constructJobStateFilePath(Path appWorkDir) {
@@ -93,7 +94,8 @@ public class GobblinHelixTask implements Task {
     this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY);
     this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY);
     this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
-    this.workUnitFilePath = new 
Path(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH));
+    this.workUnitFilePath =
+        new 
Path(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1fe5f952/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java
deleted file mode 100644
index 04f9253..0000000
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.cluster;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
-import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
-import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
-import org.apache.gobblin.broker.iface.SharedResourcesBroker;
-import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
-import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.util.StateStores;
-import org.apache.gobblin.source.workunit.MultiWorkUnit;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.util.JobLauncherUtils;
-import org.apache.gobblin.util.SerializationUtils;
-
-
-public class SingleHelixTask {
-
-  private static final Logger _logger = 
LoggerFactory.getLogger(SingleHelixTask.class);
-
-  private GobblinMultiTaskAttempt _taskattempt;
-  private String _jobId;
-  private Path _workUnitFilePath;
-  private Path _jobStateFilePath;
-  private FileSystem _fs;
-  private TaskAttemptBuilder _taskAttemptBuilder;
-  private StateStores _stateStores;
-
-  SingleHelixTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, 
FileSystem fs,
-      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) {
-    _jobId = jobId;
-    _workUnitFilePath = workUnitFilePath;
-    _jobStateFilePath = jobStateFilePath;
-    _fs = fs;
-    _taskAttemptBuilder = taskAttemptBuilder;
-    _stateStores = stateStores;
-  }
-
-  public void run()
-      throws IOException, InterruptedException {
-    List<WorkUnit> workUnits = getWorkUnits();
-
-    JobState jobState = getJobState();
-    Config jobConfig = getConfigFromJobState(jobState);
-
-    try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = 
SharedResourcesBrokerFactory
-        .createDefaultTopLevelBroker(jobConfig, 
GobblinScopeTypes.GLOBAL.defaultScopeInstance())) {
-      SharedResourcesBroker<GobblinScopeTypes> jobBroker = 
getJobBroker(jobState, globalBroker);
-
-      _taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, 
jobState, jobBroker);
-      
_taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
-    }
-  }
-
-  private SharedResourcesBroker<GobblinScopeTypes> getJobBroker(JobState 
jobState,
-      SharedResourcesBroker<GobblinScopeTypes> globalBroker) {
-    return globalBroker.newSubscopedBuilder(new 
JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build();
-  }
-
-  private Config getConfigFromJobState(JobState jobState) {
-    Properties jobProperties = jobState.getProperties();
-    return ConfigFactory.parseProperties(jobProperties);
-  }
-
-  private JobState getJobState()
-      throws java.io.IOException {
-    JobState jobState = new JobState();
-    SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState);
-    return jobState;
-  }
-
-  private List<WorkUnit> getWorkUnits()
-      throws IOException {
-    String fileName = _workUnitFilePath.getName();
-    String storeName = _workUnitFilePath.getParent().getName();
-    WorkUnit workUnit;
-
-    if 
(_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
 {
-      workUnit = _stateStores.mwuStateStore.getAll(storeName, fileName).get(0);
-    } else {
-      workUnit = _stateStores.wuStateStore.getAll(storeName, fileName).get(0);
-    }
-
-    // The list of individual WorkUnits (flattened) to run
-    List<WorkUnit> workUnits = Lists.newArrayList();
-
-    if (workUnit instanceof MultiWorkUnit) {
-      // Flatten the MultiWorkUnit so the job configuration properties can be 
added to each individual WorkUnits
-      List<WorkUnit> flattenedWorkUnits = 
JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits());
-      workUnits.addAll(flattenedWorkUnits);
-    } else {
-      workUnits.add(workUnit);
-    }
-    return workUnits;
-  }
-
-  public void cancel() {
-    if (_taskattempt != null) {
-      try {
-        _logger.info("Task cancelled: Shutdown starting for tasks with jobId: 
{}", _jobId);
-        _taskattempt.shutdownTasks();
-        _logger.info("Task cancelled: Shutdown complete for tasks with jobId: 
{}", _jobId);
-      } catch (InterruptedException e) {
-        throw new RuntimeException("Interrupted while shutting down task with 
jobId: " + _jobId, e);
-      }
-    } else {
-      _logger.error("Task cancelled but _taskattempt is null, so ignoring.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1fe5f952/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
new file mode 100644
index 0000000..da0c633
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.SerializationUtils;
+
+
+public class SingleTask {
+
+  private static final Logger _logger = 
LoggerFactory.getLogger(SingleTask.class);
+
+  private GobblinMultiTaskAttempt _taskattempt;
+  private String _jobId;
+  private Path _workUnitFilePath;
+  private Path _jobStateFilePath;
+  private FileSystem _fs;
+  private TaskAttemptBuilder _taskAttemptBuilder;
+  private StateStores _stateStores;
+
+  SingleTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, 
FileSystem fs,
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) {
+    _jobId = jobId;
+    _workUnitFilePath = workUnitFilePath;
+    _jobStateFilePath = jobStateFilePath;
+    _fs = fs;
+    _taskAttemptBuilder = taskAttemptBuilder;
+    _stateStores = stateStores;
+  }
+
+  public void run()
+      throws IOException, InterruptedException {
+    List<WorkUnit> workUnits = getWorkUnits();
+
+    JobState jobState = getJobState();
+    Config jobConfig = getConfigFromJobState(jobState);
+
+    try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = 
SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(jobConfig, 
GobblinScopeTypes.GLOBAL.defaultScopeInstance())) {
+      SharedResourcesBroker<GobblinScopeTypes> jobBroker = 
getJobBroker(jobState, globalBroker);
+
+      _taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, 
jobState, jobBroker);
+      
_taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+    }
+  }
+
+  private SharedResourcesBroker<GobblinScopeTypes> getJobBroker(JobState 
jobState,
+      SharedResourcesBroker<GobblinScopeTypes> globalBroker) {
+    return globalBroker.newSubscopedBuilder(new 
JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build();
+  }
+
+  private Config getConfigFromJobState(JobState jobState) {
+    Properties jobProperties = jobState.getProperties();
+    return ConfigFactory.parseProperties(jobProperties);
+  }
+
+  private JobState getJobState()
+      throws java.io.IOException {
+    JobState jobState = new JobState();
+    SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState);
+    return jobState;
+  }
+
+  private List<WorkUnit> getWorkUnits()
+      throws IOException {
+    String fileName = _workUnitFilePath.getName();
+    String storeName = _workUnitFilePath.getParent().getName();
+    WorkUnit workUnit;
+
+    if 
(_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
 {
+      workUnit = _stateStores.mwuStateStore.getAll(storeName, fileName).get(0);
+    } else {
+      workUnit = _stateStores.wuStateStore.getAll(storeName, fileName).get(0);
+    }
+
+    // The list of individual WorkUnits (flattened) to run
+    List<WorkUnit> workUnits = Lists.newArrayList();
+
+    if (workUnit instanceof MultiWorkUnit) {
+      // Flatten the MultiWorkUnit so the job configuration properties can be 
added to each individual WorkUnits
+      List<WorkUnit> flattenedWorkUnits = 
JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits());
+      workUnits.addAll(flattenedWorkUnits);
+    } else {
+      workUnits.add(workUnit);
+    }
+    return workUnits;
+  }
+
+  public void cancel() {
+    if (_taskattempt != null) {
+      try {
+        _logger.info("Task cancelled: Shutdown starting for tasks with jobId: 
{}", _jobId);
+        _taskattempt.shutdownTasks();
+        _logger.info("Task cancelled: Shutdown complete for tasks with jobId: 
{}", _jobId);
+      } catch (InterruptedException e) {
+        throw new RuntimeException("Interrupted while shutting down task with 
jobId: " + _jobId, e);
+      }
+    } else {
+      _logger.error("Task cancelled but _taskattempt is null, so ignoring.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1fe5f952/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index 20226af..9cc4733 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -54,7 +54,7 @@ class SingleTaskRunner {
   private final String workUnitFilePath;
   private final Config clusterConfig;
   private final Path appWorkPath;
-  private SingleHelixTask task;
+  private SingleTask task;
   private TaskExecutor taskExecutor;
   private GobblinHelixTaskStateTracker taskStateTracker;
   private ServiceManager serviceManager;
@@ -114,9 +114,8 @@ class SingleTaskRunner {
 
     final TaskAttemptBuilder taskAttemptBuilder = 
getTaskAttemptBuilder(stateStores);
 
-    this.task =
-        new SingleHelixTask(this.jobId, new Path(this.workUnitFilePath), 
jobStateFilePath, fs,
-            taskAttemptBuilder, stateStores);
+    this.task = new SingleTask(this.jobId, new Path(this.workUnitFilePath), 
jobStateFilePath, fs,
+        taskAttemptBuilder, stateStores);
   }
 
   private TaskAttemptBuilder getTaskAttemptBuilder(final StateStores 
stateStores) {

Reply via email to