Repository: incubator-gobblin Updated Branches: refs/heads/master 8c338be3d -> ba909f1fc
[GOBBLIN-336] Refactor HelixTask to create taskAttempt with a builder This allows the task attempt logic to be mocked out in unit tests in the future. Testing: The integration test org.apache.gobblin.cluster.ClusterIntegrationTest passed. Closes #2191 from HappyRay/add-task-attempt-builder Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ba909f1f Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ba909f1f Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ba909f1f Branch: refs/heads/master Commit: ba909f1fc81ff1994c0f24f8f51a98b4a3299d3a Parents: 8c338be Author: Ray Yang <[email protected]> Authored: Fri Dec 8 02:43:26 2017 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Fri Dec 8 02:43:26 2017 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinHelixTask.java | 42 +++---------- .../cluster/GobblinHelixTaskFactory.java | 25 +++++--- .../gobblin/cluster/GobblinTaskRunner.java | 2 +- .../gobblin/cluster/TaskAttemptBuilder.java | 64 ++++++++++++++++++++ .../gobblin/cluster/GobblinHelixTaskTest.java | 2 +- 5 files changed, 93 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba909f1f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java index 7c7a0f9..808914d 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java @@ -17,9 +17,6 @@ package org.apache.gobblin.cluster; -import 
com.google.common.io.Closer; -import org.apache.gobblin.metastore.StateStore; -import org.apache.gobblin.runtime.util.StateStores; import java.io.IOException; import java.util.List; @@ -33,29 +30,27 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; -import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.collect.Lists; +import com.google.common.io.Closer; import com.typesafe.config.ConfigFactory; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; import org.apache.gobblin.broker.iface.SharedResourcesBroker; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; import org.apache.gobblin.runtime.JobState; -import org.apache.gobblin.runtime.TaskExecutor; import org.apache.gobblin.runtime.TaskState; -import org.apache.gobblin.runtime.TaskStateTracker; -import org.apache.gobblin.runtime.util.JobMetrics; +import org.apache.gobblin.runtime.util.StateStores; import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.util.Id; import org.apache.gobblin.util.JobLauncherUtils; import org.apache.gobblin.util.SerializationUtils; -import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; -import org.apache.gobblin.broker.gobblin_scopes.JobScopeInstance; /** @@ -80,50 +75,35 @@ public class GobblinHelixTask implements Task { private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixTask.class); - @SuppressWarnings({"unused", "FieldCanBeLocal"}) - private final Optional<JobMetrics> jobMetrics; - private final TaskExecutor taskExecutor; - private final TaskStateTracker taskStateTracker; - private 
final TaskConfig taskConfig; // An empty JobState instance that will be filled with values read from the serialized JobState private final JobState jobState = new JobState(); private final String jobName; private final String jobId; private final String jobKey; - private final String participantId; private final FileSystem fs; private final StateStores stateStores; + private final TaskAttemptBuilder taskAttemptBuilder; private GobblinMultiTaskAttempt taskAttempt; - public GobblinHelixTask(TaskCallbackContext taskCallbackContext, Optional<ContainerMetrics> containerMetrics, - TaskExecutor taskExecutor, TaskStateTracker taskStateTracker, FileSystem fs, Path appWorkDir, - StateStores stateStores) + public GobblinHelixTask(TaskCallbackContext taskCallbackContext, FileSystem fs, Path appWorkDir, + TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) throws IOException { - this.taskExecutor = taskExecutor; - this.taskStateTracker = taskStateTracker; this.taskConfig = taskCallbackContext.getTaskConfig(); + this.stateStores = stateStores; + this.taskAttemptBuilder = taskAttemptBuilder; this.jobName = this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_NAME_KEY); this.jobId = this.taskConfig.getConfigMap().get(ConfigurationKeys.JOB_ID_KEY); this.jobKey = Long.toString(Id.parse(this.jobId).getSequence()); - this.participantId = taskCallbackContext.getManager().getInstanceName(); this.fs = fs; - this.stateStores = stateStores; Path jobStateFilePath = new Path(appWorkDir, this.jobId + "." 
+ AbstractJobLauncher.JOB_STATE_FILE_NAME); SerializationUtils.deserializeState(this.fs, jobStateFilePath, this.jobState); - if (containerMetrics.isPresent()) { - // This must be done after the jobState is deserialized from the jobStateFilePath - // A reference to jobMetrics is required to ensure it is not evicted from the GobblinMetricsRegistry Cache - this.jobMetrics = Optional.of(JobMetrics.get(this.jobState, containerMetrics.get().getMetricContext())); - } else { - this.jobMetrics = Optional.absent(); - } } @Override @@ -162,9 +142,7 @@ public class GobblinHelixTask implements Task { SharedResourcesBroker<GobblinScopeTypes> jobBroker = globalBroker.newSubscopedBuilder(new JobScopeInstance(this.jobState.getJobName(), this.jobState.getJobId())).build(); - this.taskAttempt = new GobblinMultiTaskAttempt(workUnits.iterator(), this.jobId, this.jobState, this.taskStateTracker, - this.taskExecutor, Optional.of(this.participantId), Optional.of(this.stateStores.taskStateStore), jobBroker); - + this.taskAttempt = this.taskAttemptBuilder.build(workUnits.iterator(), this.jobId, this.jobState, jobBroker); this.taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE); return new TaskResult(TaskResult.Status.COMPLETED, String.format("completed tasks: %d", workUnits.size())); } catch (InterruptedException ie) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba909f1f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java index e66756f..b8e55d8 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java @@ -17,28 
+17,26 @@ package org.apache.gobblin.cluster; -import com.typesafe.config.Config; -import org.apache.gobblin.runtime.util.StateStores; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; - +import org.apache.helix.HelixManager; import org.apache.helix.task.Task; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskFactory; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.codahale.metrics.Counter; - import com.google.common.base.Optional; import com.google.common.base.Throwables; +import com.typesafe.config.Config; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.TaskExecutor; import org.apache.gobblin.runtime.TaskStateTracker; +import org.apache.gobblin.runtime.util.StateStores; /** @@ -54,6 +52,7 @@ public class GobblinHelixTaskFactory implements TaskFactory { private static final String GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER = "gobblin.cluster.new.helix.task"; private final Optional<ContainerMetrics> containerMetrics; + private final HelixManager helixManager; /** * A {@link Counter} to count the number of new {@link GobblinHelixTask}s that are created. 
@@ -64,10 +63,12 @@ public class GobblinHelixTaskFactory implements TaskFactory { private final FileSystem fs; private final Path appWorkDir; private final StateStores stateStores; + private final TaskAttemptBuilder taskAttemptBuilder; public GobblinHelixTaskFactory(Optional<ContainerMetrics> containerMetrics, TaskExecutor taskExecutor, - TaskStateTracker taskStateTracker, FileSystem fs, Path appWorkDir, Config config) { + TaskStateTracker taskStateTracker, FileSystem fs, Path appWorkDir, Config config, HelixManager helixManager) { this.containerMetrics = containerMetrics; + this.helixManager = helixManager; if (this.containerMetrics.isPresent()) { this.newTasksCounter = Optional.of(this.containerMetrics.get().getCounter(GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER)); } else { @@ -79,6 +80,15 @@ public class GobblinHelixTaskFactory implements TaskFactory { this.appWorkDir = appWorkDir; this.stateStores = new StateStores(config, appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME); + this.taskAttemptBuilder = createTaskAttemptBuilder(); + } + + private TaskAttemptBuilder createTaskAttemptBuilder() { + TaskAttemptBuilder builder = new TaskAttemptBuilder(this.taskStateTracker, this.taskExecutor); + builder.setContainerId(this.helixManager.getInstanceName()); + builder.setTaskStateStore(this.stateStores.taskStateStore); + + return builder; } @Override @@ -87,8 +97,7 @@ public class GobblinHelixTaskFactory implements TaskFactory { if (this.newTasksCounter.isPresent()) { this.newTasksCounter.get().inc(); } - return new GobblinHelixTask(context, this.containerMetrics, this.taskExecutor, this.taskStateTracker, - this.fs, this.appWorkDir, stateStores); + return new GobblinHelixTask(context, this.fs, this.appWorkDir, this.taskAttemptBuilder, this.stateStores); } catch (IOException ioe) { LOGGER.error("Failed to create a new GobblinHelixTask", ioe); throw Throwables.propagate(ioe); 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba909f1f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index 1de9bb1..76d9098 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -178,7 +178,7 @@ public class GobblinTaskRunner { Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, taskStateTracker, this.fs, appWorkDir, - stateStoreJobConfig)); + stateStoreJobConfig, this.helixManager)); this.taskStateModelFactory = new TaskStateModelFactory(this.helixManager, taskFactoryMap); this.helixManager.getStateMachineEngine().registerStateModelFactory("Task", this.taskStateModelFactory); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba909f1f/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskAttemptBuilder.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskAttemptBuilder.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskAttemptBuilder.java new file mode 100644 index 0000000..4dbed25 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskAttemptBuilder.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.util.Iterator; + +import com.google.common.base.Optional; + +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.broker.iface.SharedResourcesBroker; +import org.apache.gobblin.metastore.StateStore; +import org.apache.gobblin.runtime.GobblinMultiTaskAttempt; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskExecutor; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.runtime.TaskStateTracker; +import org.apache.gobblin.source.workunit.WorkUnit; + + +public class TaskAttemptBuilder { + private final TaskStateTracker _taskStateTracker; + private final TaskExecutor _taskExecutor; + private String _containerId; + private StateStore<TaskState> _taskStateStore; + + public TaskAttemptBuilder(TaskStateTracker taskStateTracker, TaskExecutor taskExecutor) { + _taskStateTracker = taskStateTracker; + _taskExecutor = taskExecutor; + } + + public TaskAttemptBuilder setContainerId(String containerId) { + _containerId = containerId; + return this; + } + + public TaskAttemptBuilder setTaskStateStore(StateStore<TaskState> taskStateStore) { + _taskStateStore = taskStateStore; + return this; + } + + public GobblinMultiTaskAttempt build(Iterator<WorkUnit> workUnits, String jobId, JobState jobState, + SharedResourcesBroker<GobblinScopeTypes> jobBroker) { + 
GobblinMultiTaskAttempt attemptInstance = + new GobblinMultiTaskAttempt(workUnits, jobId, jobState, _taskStateTracker, _taskExecutor, + Optional.fromNullable(_containerId), Optional.fromNullable(_taskStateStore), jobBroker); + + return attemptInstance; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ba909f1f/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java index d1197b0..171115a 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java @@ -146,7 +146,7 @@ public class GobblinHelixTaskTest { GobblinHelixTaskFactory gobblinHelixTaskFactory = new GobblinHelixTaskFactory(Optional.<ContainerMetrics>absent(), this.taskExecutor, this.taskStateTracker, - this.localFs, this.appWorkDir, ConfigFactory.empty()); + this.localFs, this.appWorkDir, ConfigFactory.empty(), this.helixManager); this.gobblinHelixTask = (GobblinHelixTask) gobblinHelixTaskFactory.createNewTask(taskCallbackContext); }
