Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 8c338be3d -> ba909f1fc


[GOBBLIN-336] Refactor HelixTask to create taskAttempt with a builder

This allows the task attempt logic to be mocked out in unit tests in the future.

Testing:

The integration test org.apache.gobblin.cluster.ClusterIntegrationTest passed.

Closes #2191 from HappyRay/add-task-attempt-builder


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ba909f1f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ba909f1f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ba909f1f

Branch: refs/heads/master
Commit: ba909f1fc81ff1994c0f24f8f51a98b4a3299d3a
Parents: 8c338be
Author: Ray Yang <[email protected]>
Authored: Fri Dec 8 02:43:26 2017 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Fri Dec 8 02:43:26 2017 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinHelixTask.java       | 42 +++----------
 .../cluster/GobblinHelixTaskFactory.java        | 25 +++++---
 .../gobblin/cluster/GobblinTaskRunner.java      |  2 +-
 .../gobblin/cluster/TaskAttemptBuilder.java     | 64 ++++++++++++++++++++
 .../gobblin/cluster/GobblinHelixTaskTest.java   |  2 +-
 5 files changed, 93 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba909f1f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
index 7c7a0f9..808914d 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java
@@ -17,9 +17,6 @@
 
 package org.apache.gobblin.cluster;
 
-import com.google.common.io.Closer;
-import org.apache.gobblin.metastore.StateStore;
-import org.apache.gobblin.runtime.util.StateStores;
 import java.io.IOException;
 import java.util.List;
 
@@ -33,29 +30,27 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
 
-import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
 import com.typesafe.config.ConfigFactory;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
 import org.apache.gobblin.runtime.JobState;
-import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskState;
-import org.apache.gobblin.runtime.TaskStateTracker;
-import org.apache.gobblin.runtime.util.JobMetrics;
+import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.Id;
 import org.apache.gobblin.util.JobLauncherUtils;
 import org.apache.gobblin.util.SerializationUtils;
-import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
-import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance;
 
 
 /**
@@ -80,50 +75,35 @@ public class GobblinHelixTask implements Task {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GobblinHelixTask.class);
 
-  @SuppressWarnings({"unused", "FieldCanBeLocal"})
-  private final Optional<JobMetrics> jobMetrics;
-  private final TaskExecutor taskExecutor;
-  private final TaskStateTracker taskStateTracker;
-
   private final TaskConfig taskConfig;
   // An empty JobState instance that will be filled with values read from the 
serialized JobState
   private final JobState jobState = new JobState();
   private final String jobName;
   private final String jobId;
   private final String jobKey;
-  private final String participantId;
 
   private final FileSystem fs;
   private final StateStores stateStores;
+  private final TaskAttemptBuilder taskAttemptBuilder;
 
   private GobblinMultiTaskAttempt taskAttempt;
 
-  public GobblinHelixTask(TaskCallbackContext taskCallbackContext, 
Optional<ContainerMetrics> containerMetrics,
-      TaskExecutor taskExecutor, TaskStateTracker taskStateTracker, FileSystem 
fs, Path appWorkDir,
-      StateStores stateStores)
+  public GobblinHelixTask(TaskCallbackContext taskCallbackContext, FileSystem 
fs, Path appWorkDir,
+      TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores)
       throws IOException {
-    this.taskExecutor = taskExecutor;
-    this.taskStateTracker = taskStateTracker;
 
     this.taskConfig = taskCallbackContext.getTaskConfig();
+    this.stateStores = stateStores;
+    this.taskAttemptBuilder = taskAttemptBuilder;
     this.jobName = 
this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY);
     this.jobId = 
this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_ID_KEY);
     this.jobKey = Long.toString(Id.parse(this.jobId).getSequence());
-    this.participantId = taskCallbackContext.getManager().getInstanceName();
 
     this.fs = fs;
-    this.stateStores = stateStores;
 
     Path jobStateFilePath = new Path(appWorkDir, this.jobId + "." + 
AbstractJobLauncher.JOB_STATE_FILE_NAME);
     SerializationUtils.deserializeState(this.fs, jobStateFilePath, 
this.jobState);
 
-    if (containerMetrics.isPresent()) {
-      // This must be done after the jobState is deserialized from the 
jobStateFilePath
-      // A reference to jobMetrics is required to ensure it is not evicted 
from the GobblinMetricsRegistry Cache
-      this.jobMetrics = Optional.of(JobMetrics.get(this.jobState, 
containerMetrics.get().getMetricContext()));
-    } else {
-      this.jobMetrics = Optional.absent();
-    }
   }
 
   @Override
@@ -162,9 +142,7 @@ public class GobblinHelixTask implements Task {
       SharedResourcesBroker<GobblinScopeTypes> jobBroker =
           globalBroker.newSubscopedBuilder(new 
JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())).build();
 
-      this.taskAttempt = new GobblinMultiTaskAttempt(workUnits.iterator(), 
this.jobId, this.jobState, this.taskStateTracker,
-          this.taskExecutor, Optional.of(this.participantId), 
Optional.of(this.stateStores.taskStateStore), jobBroker);
-
+      this.taskAttempt = this.taskAttemptBuilder.build(workUnits.iterator(), 
this.jobId, this.jobState, jobBroker);
       
this.taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
       return new TaskResult(TaskResult.Status.COMPLETED, 
String.format("completed tasks: %d", workUnits.size()));
     } catch (InterruptedException ie) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba909f1f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
index e66756f..b8e55d8 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java
@@ -17,28 +17,26 @@
 
 package org.apache.gobblin.cluster;
 
-import com.typesafe.config.Config;
-import org.apache.gobblin.runtime.util.StateStores;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
+import org.apache.helix.HelixManager;
 import org.apache.helix.task.Task;
 import org.apache.helix.task.TaskCallbackContext;
 import org.apache.helix.task.TaskFactory;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.codahale.metrics.Counter;
-
 import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.TaskExecutor;
 import org.apache.gobblin.runtime.TaskStateTracker;
+import org.apache.gobblin.runtime.util.StateStores;
 
 
 /**
@@ -54,6 +52,7 @@ public class GobblinHelixTaskFactory implements TaskFactory {
   private static final String GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER = 
"gobblin.cluster.new.helix.task";
 
   private final Optional<ContainerMetrics> containerMetrics;
+  private final HelixManager helixManager;
 
   /**
    * A {@link Counter} to count the number of new {@link GobblinHelixTask}s 
that are created.
@@ -64,10 +63,12 @@ public class GobblinHelixTaskFactory implements TaskFactory {
   private final FileSystem fs;
   private final Path appWorkDir;
   private final StateStores stateStores;
+  private final TaskAttemptBuilder taskAttemptBuilder;
 
   public GobblinHelixTaskFactory(Optional<ContainerMetrics> containerMetrics, 
TaskExecutor taskExecutor,
-      TaskStateTracker taskStateTracker, FileSystem fs, Path appWorkDir, 
Config config) {
+      TaskStateTracker taskStateTracker, FileSystem fs, Path appWorkDir, 
Config config, HelixManager helixManager) {
     this.containerMetrics = containerMetrics;
+    this.helixManager = helixManager;
     if (this.containerMetrics.isPresent()) {
       this.newTasksCounter = 
Optional.of(this.containerMetrics.get().getCounter(GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER));
     } else {
@@ -79,6 +80,15 @@ public class GobblinHelixTaskFactory implements TaskFactory {
     this.appWorkDir = appWorkDir;
     this.stateStores = new StateStores(config, appWorkDir, 
GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME,
         appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME);
+    this.taskAttemptBuilder = createTaskAttemptBuilder();
+  }
+
+  private TaskAttemptBuilder createTaskAttemptBuilder() {
+    TaskAttemptBuilder builder = new TaskAttemptBuilder(this.taskStateTracker, 
this.taskExecutor);
+    builder.setContainerId(this.helixManager.getInstanceName());
+    builder.setTaskStateStore(this.stateStores.taskStateStore);
+
+    return builder;
   }
 
   @Override
@@ -87,8 +97,7 @@ public class GobblinHelixTaskFactory implements TaskFactory {
       if (this.newTasksCounter.isPresent()) {
         this.newTasksCounter.get().inc();
       }
-      return new GobblinHelixTask(context, this.containerMetrics, 
this.taskExecutor, this.taskStateTracker,
-          this.fs, this.appWorkDir, stateStores);
+      return new GobblinHelixTask(context, this.fs, this.appWorkDir, 
this.taskAttemptBuilder, this.stateStores);
     } catch (IOException ioe) {
       LOGGER.error("Failed to create a new GobblinHelixTask", ioe);
       throw Throwables.propagate(ioe);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba909f1f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index 1de9bb1..76d9098 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -178,7 +178,7 @@ public class GobblinTaskRunner {
     Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap();
     taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME,
         new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, 
taskStateTracker, this.fs, appWorkDir,
-            stateStoreJobConfig));
+            stateStoreJobConfig, this.helixManager));
     this.taskStateModelFactory = new TaskStateModelFactory(this.helixManager, 
taskFactoryMap);
     
this.helixManager.getStateMachineEngine().registerStateModelFactory("Task", 
this.taskStateModelFactory);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba909f1f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskAttemptBuilder.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskAttemptBuilder.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskAttemptBuilder.java
new file mode 100644
index 0000000..4dbed25
--- /dev/null
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskAttemptBuilder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import java.util.Iterator;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.broker.iface.SharedResourcesBroker;
+import org.apache.gobblin.metastore.StateStore;
+import org.apache.gobblin.runtime.GobblinMultiTaskAttempt;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskExecutor;
+import org.apache.gobblin.runtime.TaskState;
+import org.apache.gobblin.runtime.TaskStateTracker;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+public class TaskAttemptBuilder {
+  private final TaskStateTracker _taskStateTracker;
+  private final TaskExecutor _taskExecutor;
+  private String _containerId;
+  private StateStore<TaskState> _taskStateStore;
+
+  public TaskAttemptBuilder(TaskStateTracker taskStateTracker, TaskExecutor 
taskExecutor) {
+    _taskStateTracker = taskStateTracker;
+    _taskExecutor = taskExecutor;
+  }
+
+  public TaskAttemptBuilder setContainerId(String containerId) {
+    _containerId = containerId;
+    return this;
+  }
+
+  public TaskAttemptBuilder setTaskStateStore(StateStore<TaskState> 
taskStateStore) {
+    _taskStateStore = taskStateStore;
+    return this;
+  }
+
+  public GobblinMultiTaskAttempt build(Iterator<WorkUnit> workUnits, String 
jobId, JobState jobState,
+      SharedResourcesBroker<GobblinScopeTypes> jobBroker) {
+    GobblinMultiTaskAttempt attemptInstance =
+        new GobblinMultiTaskAttempt(workUnits, jobId, jobState, 
_taskStateTracker, _taskExecutor,
+            Optional.fromNullable(_containerId), 
Optional.fromNullable(_taskStateStore), jobBroker);
+
+    return attemptInstance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba909f1f/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
index d1197b0..171115a 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java
@@ -146,7 +146,7 @@ public class GobblinHelixTaskTest {
 
     GobblinHelixTaskFactory gobblinHelixTaskFactory =
         new GobblinHelixTaskFactory(Optional.<ContainerMetrics>absent(), 
this.taskExecutor, this.taskStateTracker,
-            this.localFs, this.appWorkDir, ConfigFactory.empty());
+            this.localFs, this.appWorkDir, ConfigFactory.empty(), 
this.helixManager);
     this.gobblinHelixTask = (GobblinHelixTask) 
gobblinHelixTaskFactory.createNewTask(taskCallbackContext);
   }
 

Reply via email to