Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 95e15f003 -> 0eeccde1a


[GOBBLIN-336] Encapsulate the non-Helix-specific task execution logic

Move this logic into its own class, SingleHelixTask.

Also changed:
* Use a try-with-resources statement to close the global broker.
* Fix a Helix warning: a failed task now reports TaskResult.Status.FAILED
  instead of ERROR.

Testing:

The integration test org.apache.gobblin.cluster.ClusterIntegrationTest passed.

Also inlined a method. The old code had a bug: the globalBroker variable
would stay null.

Closes #2193 from HappyRay/encapsulate-non-helix-job-launch-logic


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/0eeccde1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/0eeccde1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/0eeccde1

Branch: refs/heads/master
Commit: 0eeccde1a8251514874b515c90a81c3c55aaf675
Parents: 95e15f0
Author: Ray Yang <[email protected]>
Authored: Sat Dec 9 18:13:34 2017 -0800
Committer: Hung Tran <[email protected]>
Committed: Sat Dec 9 18:14:41 2017 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinHelixTask.java       | 114 ++++----------
 .../apache/gobblin/cluster/SingleHelixTask.java | 148 +++++++++++++++++++
 2 files changed, 175 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0eeccde1/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 808914d..8166cfe 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -18,7 +18,7 @@
 package org.apache.gobblin.cluster;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -31,26 +31,16 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
 import com.google.common.base.Throwables;
-import com.google.common.collect.Lists;
 import com.google.common.io.Closer;
-import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
-import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
-import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
-import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
-import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
-import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.Id;
-import org.apache.gobblin.util.JobLauncherUtils;
-import org.apache.gobblin.util.SerializationUtils;
 
 
 /**
@@ -67,113 +57,63 @@ import org.apache.gobblin.util.SerializationUtils;
  *   {@link org.apache.gobblin.runtime.Task}(s), it persists the {@link 
TaskState} of each {@link org.apache.gobblin.runtime.Task} to
  *   a file that will be collected by the {@link GobblinHelixJobLauncher} 
later upon completion of the job.
  * </p>
- *
- * @author Yinan Li
  */
 @Alpha
 public class GobblinHelixTask implements Task {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinHelixTask.class);
+  private static final Logger _logger = 
LoggerFactory.getLogger(GobblinHelixTask.class);
 
   private final TaskConfig taskConfig;
-  // An empty JobState instance that will be filled with values read from the 
serialized JobState
-  private final JobState jobState = new JobState();
-  private final String jobName;
-  private final String jobId;
-  private final String jobKey;
-
-  private final FileSystem fs;
-  private final StateStores stateStores;
-  private final TaskAttemptBuilder taskAttemptBuilder;
+  private String jobName;  // ConfigurationKeys.JOB_NAME_KEY, set in getInfoFromTaskConfig()
+  private String jobId;  // ConfigurationKeys.JOB_ID_KEY, set in getInfoFromTaskConfig()
+  private String jobKey;  // sequence number parsed from jobId via Id.parse(...)
+  private Path workUnitFilePath;  // GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH
 
-  private GobblinMultiTaskAttempt taskAttempt;
+  private SingleHelixTask task;  // does the non-Helix-specific work; run() and cancel() delegate to it
 
   public GobblinHelixTask(TaskCallbackContext taskCallbackContext, FileSystem 
fs, Path appWorkDir,
       TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores)
       throws IOException {
 
     this.taskConfig = taskCallbackContext.getTaskConfig();
-    this.stateStores = stateStores;
-    this.taskAttemptBuilder = taskAttemptBuilder;
-    this.jobName = 
this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
-    this.jobId = 
this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_ID_KEY);
-    this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
+    getInfoFromTaskConfig();  // must run first: sets jobId and workUnitFilePath, which are used below
+
+    Path jobStateFilePath = constructJobStateFilePath(appWorkDir);
 
-    this.fs = fs;
+    this.task =
+        new SingleHelixTask(this.jobId, workUnitFilePath, jobStateFilePath, 
fs, taskAttemptBuilder, stateStores);
+  }
 
-    Path jobStateFilePath = new Path(appWorkDir, this.jobId + "." + 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
-    SerializationUtils.deserializeState(this.fs, jobStateFilePath, 
this.jobState);
+  private Path constructJobStateFilePath(Path appWorkDir) {  // <appWorkDir>/<jobId>.<JOB_STATE_FILE_NAME>
+    return new Path(appWorkDir, this.jobId + "." + 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
+  }
 
+  private void getInfoFromTaskConfig() {  // fills jobName, jobId, jobKey and workUnitFilePath from the task config map
+    Map<String, String> configMap = this.taskConfig.getConfigMap();
+    this.jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY);
+    this.jobId = configMap.get(ConfigurationKeys.JOB_ID_KEY);
+    this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
+    this.workUnitFilePath = new 
Path(configMap.get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH));
   }
 
   @Override
   public TaskResult run() {
-    SharedResourcesBroker<GobblinScopeTypes> globalBroker = null;
     try (Closer closer = Closer.create()) {
       closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, 
this.jobName));
       closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, 
this.jobKey));
-      Path workUnitFilePath =
-          new 
Path(this.taskConfig.getConfigMap().get(GobblinClusterConfigurationKeys.WORK_UNIT_FILE_PATH));
-
-      String fileName = workUnitFilePath.getName();
-      String storeName = workUnitFilePath.getParent().getName();
-      WorkUnit workUnit;
-
-      if 
(workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
 {
-        workUnit = stateStores.mwuStateStore.getAll(storeName, 
fileName).get(0);
-      } else {
-        workUnit = stateStores.wuStateStore.getAll(storeName, fileName).get(0);
-      }
-
-      // The list of individual WorkUnits (flattened) to run
-      List<WorkUnit> workUnits = Lists.newArrayList();
-
-      if (workUnit instanceof MultiWorkUnit) {
-        // Flatten the MultiWorkUnit so the job configuration properties can 
be added to each individual WorkUnits
-        List<WorkUnit> flattenedWorkUnits =
-            JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) 
workUnit).getWorkUnits());
-        workUnits.addAll(flattenedWorkUnits);
-      } else {
-        workUnits.add(workUnit);
-      }
-
-      globalBroker = SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
-          ConfigFactory.parseProperties(this.jobState.getProperties()), 
GobblinScopeTypes.GLOBAL.defaultScopeInstance());
-      SharedResourcesBroker<GobblinScopeTypes> jobBroker =
-          globalBroker.newSubscopedBuilder(new 
JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())).build();
-
-      this.taskAttempt = this.taskAttemptBuilder.build(workUnits.iterator(), 
this.jobId, this.jobState, jobBroker);
-      
this.taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
-      return new TaskResult(TaskResult.Status.COMPLETED, 
String.format("completed tasks: %d", workUnits.size()));
+      int workUnitSize = this.task.run();  // number of work units SingleHelixTask processed
+      return new TaskResult(TaskResult.Status.COMPLETED, 
String.format("completed tasks: %d", workUnitSize));
     } catch (InterruptedException ie) {
       Thread.currentThread().interrupt();
       return new TaskResult(TaskResult.Status.CANCELED, "");
     } catch (Throwable t) {
-      LOGGER.error("GobblinHelixTask failed due to " + t.getMessage(), t);
-      return new TaskResult(TaskResult.Status.ERROR, 
Throwables.getStackTraceAsString(t));
-    } finally {
-      if (globalBroker != null) {
-        try {
-          globalBroker.close();
-        } catch (IOException ioe) {
-          LOGGER.error("Could not close shared resources broker.", ioe);
-        }
-      }
+      _logger.error("GobblinHelixTask failed due to " + t.getMessage(), t);
+      return new TaskResult(TaskResult.Status.FAILED, 
Throwables.getStackTraceAsString(t));
     }
   }
 
   @Override
   public void cancel() {
-    if (this.taskAttempt != null) {
-      try {
-        LOGGER.info("Task cancelled: Shutdown starting for tasks with jobId: 
{}", this.jobId);
-        this.taskAttempt.shutdownTasks();
-        LOGGER.info("Task cancelled: Shutdown complete for tasks with jobId: 
{}", this.jobId);
-      } catch (InterruptedException e) {
-        throw new RuntimeException("Interrupted while shutting down task with 
jobId: " + this.jobId, e);
-      }
-    } else {
-      LOGGER.error("Task cancelled but taskAttempt is null, so ignoring.");
-    }
+    this.task.cancel();  // SingleHelixTask logs an error and ignores the call if no task attempt was built yet
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/0eeccde1/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java
new file mode 100644
index 0000000..9817a4f
--- /dev/null
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleHelixTask.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.runtime.AbstractJobLauncher;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.util.StateStores;
+import org.apache.gobblin.source.workunit.MultiWorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.util.JobLauncherUtils;
+import org.apache.gobblin.util.SerializationUtils;
+
+
+public class SingleHelixTask {  // runs one Helix task's work units; imports no Helix APIs
+
+  private static final Logger _logger = 
LoggerFactory.getLogger(SingleHelixTask.class);
+
+  private GobblinMultiTaskAttempt _taskattempt;  // null until run() builds it. NOTE(review): not volatile; confirm visibility if cancel() runs on another thread
+  private String _jobId;
+  private Path _workUnitFilePath;
+  private Path _jobStateFilePath;
+  private FileSystem _fs;
+  private TaskAttemptBuilder _taskAttemptBuilder;
+  private StateStores _stateStores;
+
+  SingleHelixTask(String jobId, Path workUnitFilePath, Path jobStateFilePath, 
FileSystem fs,
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) {
+    _jobId = jobId;
+    _workUnitFilePath = workUnitFilePath;
+    _jobStateFilePath = jobStateFilePath;
+    _fs = fs;
+    _taskAttemptBuilder = taskAttemptBuilder;
+    _stateStores = stateStores;
+  }
+
+  /**
+   * Loads the work units and job state, then runs and immediately commits the task attempt.
+   * @return the number of work-units processed
+   * @throws IOException on I/O failure, e.g. while reading the work units or the job state
+   * @throws InterruptedException if the thread is interrupted while the task attempt runs
+   */
+  public int run()
+      throws IOException, InterruptedException {
+    List<WorkUnit> workUnits = getWorkUnits();
+    int workUnitSize = workUnits.size();
+
+    JobState jobState = getJobState();
+    Config jobConfig = getConfigFromJobState(jobState);
+
+    try (SharedResourcesBroker<GobblinScopeTypes> globalBroker = 
SharedResourcesBrokerFactory
+        .createDefaultTopLevelBroker(jobConfig, 
GobblinScopeTypes.GLOBAL.defaultScopeInstance())) {
+      SharedResourcesBroker<GobblinScopeTypes> jobBroker = 
getJobBroker(jobState, globalBroker);
+
+      _taskattempt = _taskAttemptBuilder.build(workUnits.iterator(), _jobId, 
jobState, jobBroker);
+      
_taskattempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+      return workUnitSize;
+    }  // try-with-resources closes globalBroker here, even if the attempt throws
+  }
+
+  private SharedResourcesBroker<GobblinScopeTypes> getJobBroker(JobState 
jobState,
+      SharedResourcesBroker<GobblinScopeTypes> globalBroker) {
+    return globalBroker.newSubscopedBuilder(new 
JobScopeInstance(jobState.getJobName(), jobState.getJobId())).build();
+  }
+
+  private Config getConfigFromJobState(JobState jobState) {  // job properties as a Typesafe Config for the broker factory
+    Properties jobProperties = jobState.getProperties();
+    return ConfigFactory.parseProperties(jobProperties);
+  }
+
+  private JobState getJobState()  // deserialized from _jobStateFilePath on _fs
+      throws java.io.IOException {
+    JobState jobState = new JobState();
+    SerializationUtils.deserializeState(_fs, _jobStateFilePath, jobState);
+    return jobState;
+  }
+
+  private List<WorkUnit> getWorkUnits()  // flattened list of the work units stored at _workUnitFilePath
+      throws IOException {
+    String fileName = _workUnitFilePath.getName();
+    String storeName = _workUnitFilePath.getParent().getName();  // parent directory name is used as the state-store name
+    WorkUnit workUnit;
+
+    if 
(_workUnitFilePath.getName().endsWith(AbstractJobLauncher.MULTI_WORK_UNIT_FILE_EXTENSION))
 {
+      workUnit = _stateStores.mwuStateStore.getAll(storeName, fileName).get(0);  // only the first stored state is used
+    } else {
+      workUnit = _stateStores.wuStateStore.getAll(storeName, fileName).get(0);
+    }
+
+    // The list of individual WorkUnits (flattened) to run
+    List<WorkUnit> workUnits = Lists.newArrayList();
+
+    if (workUnit instanceof MultiWorkUnit) {
+      // Flatten the MultiWorkUnit so the job configuration properties can be 
added to each individual WorkUnit
+      List<WorkUnit> flattenedWorkUnits = 
JobLauncherUtils.flattenWorkUnits(((MultiWorkUnit) workUnit).getWorkUnits());
+      workUnits.addAll(flattenedWorkUnits);
+    } else {
+      workUnits.add(workUnit);
+    }
+    return workUnits;
+  }
+
+  public void cancel() {  // shuts down the task attempt if run() has built one; otherwise logs and ignores
+    if (_taskattempt != null) {
+      try {
+        _logger.info("Task cancelled: Shutdown starting for tasks with jobId: 
{}", _jobId);
+        _taskattempt.shutdownTasks();
+        _logger.info("Task cancelled: Shutdown complete for tasks with jobId: 
{}", _jobId);
+      } catch (InterruptedException e) {  // NOTE(review): interrupt status is not restored before rethrowing
+        throw new RuntimeException("Interrupted while shutting down task with 
jobId: " + _jobId, e);
+      }
+    } else {
+      _logger.error("Task cancelled but _taskattempt is null, so ignoring.");
+    }
+  }
+}

Reply via email to