This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 24ebc02 [GOBBLIN-1416] Fix a race condition caused by Helix task
cancellation being invoked before Gobblin task creation
24ebc02 is described below
commit 24ebc02852f70ffaaef97139ff0ae61cf6dba2ce
Author: suvasude <[email protected]>
AuthorDate: Mon Mar 22 15:12:37 2021 -0700
[GOBBLIN-1416] Fix a race condition caused by Helix task cancellation being
invoked before Gobblin task creation
Closes #3250 from sv2000/taskCancelRaceCondition
---
.../gobblin/runtime/GobblinMultiTaskAttempt.java | 28 +++++++++++++++++---
.../runtime/GobblinMultiTaskAttemptTest.java | 30 ++++++++++++++++++++--
2 files changed, 52 insertions(+), 6 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 86fe146..9b39b95 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
@@ -46,6 +47,7 @@ import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import javax.annotation.Nullable;
+import lombok.Getter;
import lombok.Setter;
import org.apache.gobblin.annotation.Alpha;
@@ -115,6 +117,8 @@ public class GobblinMultiTaskAttempt {
@Setter
private Predicate<GobblinMultiTaskAttempt> interruptionPredicate = (gmta) ->
false;
private List<Task> tasks;
+ @Getter
+ private volatile AtomicBoolean stopped = new AtomicBoolean(false);
/**
* Additional commit steps that may be added by different launcher, and can
be environment specific.
@@ -157,6 +161,13 @@ public class GobblinMultiTaskAttempt {
Pair<List<Task>, Boolean> executionResult = runWorkUnits(countDownLatch);
this.tasks = executionResult.getFirst();
+ // The task attempt has already been stopped and the task list is empty. This indicates that a cancel has been
+ // invoked prior to creation of underlying Gobblin tasks. In a normal scenario, where a cancel is invoked after
+ // successful task creation, the task list is guaranteed to be non-empty and we shouldn't enter the following block.
+ if (this.tasks.isEmpty() && this.stopped.get()) {
+ return;
+ }
+
// Indicating task submission failure, propagating exception as it should be noticeable to job launcher.
// Submission failure could be task-creation failure, or state-tracker failed to be scheduled so that the actual
// task isn't submitted into the executor.
@@ -250,7 +261,7 @@ public class GobblinMultiTaskAttempt {
* A method that shuts down all running tasks managed by this instance.
* TODO: Call this from the right place.
*/
- public void shutdownTasks()
+ public synchronized void shutdownTasks()
throws InterruptedException {
log.info("Shutting down tasks");
for (Task task : this.tasks) {
@@ -268,6 +279,7 @@ public class GobblinMultiTaskAttempt {
log.info("Task {} could not be cancelled.", task.getTaskId());
}
}
+ this.stopped.set(true);
}
private void persistTaskStateStore()
@@ -384,10 +396,14 @@ public class GobblinMultiTaskAttempt {
* @return a list of {@link Task}s from the {@link WorkUnit}s, as well as if there's a failure in task creation
* which should be handled separately to avoid silently starving on certain workunit.
*/
- private Pair<List<Task>, Boolean> runWorkUnits(CountUpAndDownLatch countDownLatch) {
-
+ private synchronized Pair<List<Task>, Boolean> runWorkUnits(CountUpAndDownLatch countDownLatch) {
List<Task> tasks = Lists.newArrayList();
-
+ //Has the task-attempt already been cancelled? This can happen for instance when a cancellation has been invoked on
+ // the GobblinMultiTaskAttempt instance (e.g. in the case of Helix task cancellation) before the Gobblin tasks
+ // have been submitted to the underlying task executor.
+ if (this.stopped.get()) {
+ return new Pair<>(tasks, false);
+ }
// A flag indicating if there are any tasks not submitted successfully.
// Caller of this method should handle tasks with submission failures
accordingly.
boolean areAllTasksSubmitted = true;
@@ -600,4 +616,8 @@ public class GobblinMultiTaskAttempt {
multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
return multiTaskAttempt;
}
+
+ public int getNumTasksCreated() {
+ return this.tasks.size();
+ }
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
index 79ed7f4..390c20f 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime;
import java.util.List;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.mockito.Mockito;
@@ -65,7 +66,7 @@ public class GobblinMultiTaskAttemptTest {
public void testRunWithTaskCreationFailure()
throws Exception {
// Preparing Instance of TaskAttempt with designed failure on task creation
- WorkUnit tmpWU = new WorkUnit();
+ WorkUnit tmpWU = WorkUnit.createEmpty();
// Put necessary attributes in workunit
tmpWU.setProp(ConfigurationKeys.TASK_ID_KEY, "task_test");
List<WorkUnit> workUnit = ImmutableList.of(tmpWU);
@@ -96,7 +97,7 @@ public class GobblinMultiTaskAttemptTest {
throws Exception {
TaskStateTracker stateTracker = new DummyTestStateTracker(new
Properties(), log);
// Preparing Instance of TaskAttempt with designed failure on task creation
- WorkUnit tmpWU = new WorkUnit();
+ WorkUnit tmpWU = WorkUnit.createEmpty();
// Put necessary attributes in workunit
tmpWU.setProp(ConfigurationKeys.TASK_ID_KEY, "task_test");
List<WorkUnit> workUnit = ImmutableList.of(tmpWU);
@@ -121,6 +122,31 @@ public class GobblinMultiTaskAttemptTest {
Assert.fail();
}
+ @Test
+ public void testRunAfterCancellation() throws Exception {
+ WorkUnit tmpWU = WorkUnit.createEmpty();
+ // Put necessary attributes in workunit
+ tmpWU.setProp(ConfigurationKeys.TASK_ID_KEY, "task_test");
+ List<WorkUnit> workUnit = ImmutableList.of(tmpWU);
+ JobState jobState = new JobState();
+ // Limit the number of times of retry in task-creation.
+ jobState.setProp(RETRY_TIME_OUT_MS, 1000);
+ jobState.setProp(ConfigurationKeys.SOURCE_CLASS_KEY, DatasetStateStoreTest.DummySource.class.getName());
+
+ TaskStateTracker stateTrackerMock = Mockito.mock(TaskStateTracker.class);
+
+ taskAttempt =
+ new GobblinMultiTaskAttempt(workUnit.iterator(), "testJob1", jobState, stateTrackerMock, taskExecutorMock,
+ Optional.absent(), Optional.absent(), jobBroker);
+
+ //Call shutdown() before creation of underlying Gobblin tasks.
+ taskAttempt.shutdownTasks();
+ taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+ Assert.assertEquals(taskAttempt.getNumTasksCreated(), 0);
+ Assert.assertTrue(taskAttempt.getStopped().get());
+ }
+
+
public static class DummyTestStateTracker extends AbstractTaskStateTracker {
public DummyTestStateTracker(Properties properties, Logger logger) {
super(properties, logger);