This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2ec0cde [GOBBLIN-963] Remove duplicated copies of
TaskContext/TaskState when constructing TaskIFaceWrapper
2ec0cde is described below
commit 2ec0cde77870a24e2efba5acc9529fbd30ec0710
Author: Chen Guo <[email protected]>
AuthorDate: Tue Nov 19 16:46:23 2019 -0800
[GOBBLIN-963] Remove duplicated copies of TaskContext/TaskState when
constructing TaskIFaceWrapper
Closes #2818 from enjoyear/GOBBLIN-963
---
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 45 +++++++++++++---------
1 file changed, 26 insertions(+), 19 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index fc98beb..ebd3be9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -38,6 +38,9 @@ import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+import lombok.Setter;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.gobblin_scopes.TaskScopeInstance;
@@ -60,9 +63,6 @@ import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
-import javax.annotation.Nullable;
-import lombok.Setter;
-
/**
* Attempt of running multiple {@link Task}s generated from a list of{@link
WorkUnit}s.
@@ -79,6 +79,7 @@ public class GobblinMultiTaskAttempt {
* Commit {@link GobblinMultiTaskAttempt} immediately after running is
done.
*/
IMMEDIATE,
+
/**
* Not committing {@link GobblinMultiTaskAttempt} but leaving it to user
customized launcher.
*/
@@ -129,7 +130,8 @@ public class GobblinMultiTaskAttempt {
* @throws IOException
* @throws InterruptedException
*/
- public void run() throws IOException, InterruptedException {
+ public void run()
+ throws IOException, InterruptedException {
if (!this.workUnits.hasNext()) {
log.warn("No work units to run in container " +
containerIdOptional.or(""));
return;
@@ -158,7 +160,8 @@ public class GobblinMultiTaskAttempt {
log.info("All assigned tasks of job {} have completed in container {}",
jobId, containerIdOptional.or(""));
}
- private void interruptTaskExecution(CountDownLatch countDownLatch) throws InterruptedException {
+ private void interruptTaskExecution(CountDownLatch countDownLatch)
+ throws InterruptedException {
log.info("Job interrupted. Attempting a graceful shutdown of the job.");
this.tasks.forEach(Task::shutdown);
if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
@@ -176,7 +179,8 @@ public class GobblinMultiTaskAttempt {
* 3. persist task statestore.
* @throws IOException
*/
- public void commit() throws IOException {
+ public void commit()
+ throws IOException {
if (this.tasks == null || this.tasks.isEmpty()) {
log.warn("No tasks to be committed in container " +
containerIdOptional.or(""));
return;
@@ -188,7 +192,8 @@ public class GobblinMultiTaskAttempt {
return new Callable<Void>() {
@Nullable
@Override
- public Void call() throws Exception {
+ public Void call()
+ throws Exception {
task.commit();
return null;
}
@@ -199,8 +204,8 @@ public class GobblinMultiTaskAttempt {
try {
List<Either<Void, ExecutionException>> executionResults =
new IteratorExecutor<>(callableIterator,
this.getTaskCommitThreadPoolSize(),
- ExecutorsUtils.newDaemonThreadFactory(Optional.of(log),
- Optional.of("Task-committing-pool-%d"))).executeAndGetResults();
+ ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of("Task-committing-pool-%d")))
+ .executeAndGetResults();
IteratorExecutor.logFailures(executionResults, log, 10);
} catch (InterruptedException ie) {
log.error("Committing of tasks interrupted. Aborting.");
@@ -220,7 +225,8 @@ public class GobblinMultiTaskAttempt {
* A method that shuts down all running tasks managed by this instance.
* TODO: Call this from the right place.
*/
- public void shutdownTasks() throws InterruptedException {
+ public void shutdownTasks()
+ throws InterruptedException {
log.info("Shutting down tasks");
for (Task task : this.tasks) {
task.shutdown();
@@ -239,7 +245,8 @@ public class GobblinMultiTaskAttempt {
}
}
- private void persistTaskStateStore() throws IOException {
+ private void persistTaskStateStore()
+ throws IOException {
if (!this.taskStateStoreOptional.isPresent()) {
log.info("Task state store does not exist.");
return;
@@ -277,8 +284,8 @@ public class GobblinMultiTaskAttempt {
// to filter out successful tasks on subsequent attempts.
if (task.getTaskState().getWorkingState() ==
WorkUnitState.WorkingState.SUCCESSFUL
|| task.getTaskState().getWorkingState() ==
WorkUnitState.WorkingState.COMMITTED) {
- taskStateStore.put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX,
- task.getTaskState());
+ taskStateStore
+ .put(task.getJobId(), task.getTaskId() + TASK_STATE_STORE_SUCCESS_MARKER_SUFFIX, task.getTaskState());
}
}
@@ -421,8 +428,8 @@ public class GobblinMultiTaskAttempt {
this.log.info("Heap Memory");
this.log.info(String.format(format, "init", "used", "Committed", "max"));
- this.log.info(String.format(format, heapMemory.getInit(), heapMemory.getUsed(), heapMemory.getCommitted(),
- heapMemory.getMax()));
+ this.log.info(String
+ .format(format, heapMemory.getInit(), heapMemory.getUsed(), heapMemory.getCommitted(), heapMemory.getMax()));
this.log.info("Non-heap Memory");
this.log.info(String.format(format, "init", "used", "Committed", "max"));
@@ -432,12 +439,12 @@ public class GobblinMultiTaskAttempt {
private Task createTaskRunnable(WorkUnitState workUnitState, CountDownLatch
countDownLatch) {
Optional<TaskFactory> taskFactoryOpt =
TaskUtils.getTaskFactory(workUnitState);
+ final TaskContext taskContext = new TaskContext(workUnitState);
if (taskFactoryOpt.isPresent()) {
- return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(new TaskContext(workUnitState)),
- new TaskContext(workUnitState), countDownLatch, this.taskStateTracker);
+ return new TaskIFaceWrapper(taskFactoryOpt.get().createTask(taskContext), taskContext, countDownLatch,
+ this.taskStateTracker);
} else {
- return new Task(new TaskContext(workUnitState), this.taskStateTracker, this.taskExecutor,
- Optional.of(countDownLatch));
+ return new Task(taskContext, this.taskStateTracker, this.taskExecutor, Optional.of(countDownLatch));
}
}