This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 24ebc02  [GOBBLIN-1416] Fix a race condition caused by Helix task cancellation being invoked before Gobblin task creation
24ebc02 is described below

commit 24ebc02852f70ffaaef97139ff0ae61cf6dba2ce
Author: suvasude <[email protected]>
AuthorDate: Mon Mar 22 15:12:37 2021 -0700

    [GOBBLIN-1416] Fix a race condition caused by Helix task cancellation being invoked before Gobblin task creation
    
    Closes #3250 from sv2000/taskCancelRaceCondition
---
 .../gobblin/runtime/GobblinMultiTaskAttempt.java   | 28 +++++++++++++++++---
 .../runtime/GobblinMultiTaskAttemptTest.java       | 30 ++++++++++++++++++++--
 2 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
index 86fe146..9b39b95 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/GobblinMultiTaskAttempt.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
@@ -46,6 +47,7 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
 import javax.annotation.Nullable;
+import lombok.Getter;
 import lombok.Setter;
 
 import org.apache.gobblin.annotation.Alpha;
@@ -115,6 +117,8 @@ public class GobblinMultiTaskAttempt {
   @Setter
   private Predicate<GobblinMultiTaskAttempt> interruptionPredicate = (gmta) -> 
false;
   private List<Task> tasks;
+  @Getter
+  private volatile AtomicBoolean stopped = new AtomicBoolean(false);
 
   /**
    * Additional commit steps that may be added by different launcher, and can 
be environment specific.
@@ -157,6 +161,13 @@ public class GobblinMultiTaskAttempt {
     Pair<List<Task>, Boolean> executionResult = runWorkUnits(countDownLatch);
     this.tasks = executionResult.getFirst();
 
+    // The task attempt has already been stopped and the task list is empty. 
This indicates that a cancel has been
+    // invoked prior to creation of underlying Gobblin tasks. In a normal 
scenario, where a cancel is invoked after
+    // successful task creation, the task list is guaranteed to be non-empty 
and we shouldn't enter the following block.
+    if (this.tasks.isEmpty() && this.stopped.get()) {
+      return;
+    }
+
     // Indicating task submission failure, propagating exception as it should 
be noticeable to job launcher.
     // Submission failure could be task-creation failure, or state-tracker 
failed to be scheduled so that the actual
     // task isn't submitted into the executor.
@@ -250,7 +261,7 @@ public class GobblinMultiTaskAttempt {
    * A method that shuts down all running tasks managed by this instance.
    * TODO: Call this from the right place.
    */
-  public void shutdownTasks()
+  public synchronized void shutdownTasks()
       throws InterruptedException {
     log.info("Shutting down tasks");
     for (Task task : this.tasks) {
@@ -268,6 +279,7 @@ public class GobblinMultiTaskAttempt {
         log.info("Task {} could not be cancelled.", task.getTaskId());
       }
     }
+    this.stopped.set(true);
   }
 
   private void persistTaskStateStore()
@@ -384,10 +396,14 @@ public class GobblinMultiTaskAttempt {
    * @return a list of {@link Task}s from the {@link WorkUnit}s, as well as if 
there's a failure in task creation
    * which should be handled separately to avoid silently starving on certain 
workunit.
    */
-  private Pair<List<Task>, Boolean> runWorkUnits(CountUpAndDownLatch 
countDownLatch) {
-
+  private synchronized Pair<List<Task>, Boolean> 
runWorkUnits(CountUpAndDownLatch countDownLatch) {
     List<Task> tasks = Lists.newArrayList();
-
+    //Has the task-attempt already been cancelled? This can happen for 
instance when a cancellation has been invoked on
+    // the GobblinMultiTaskAttempt instance (e.g. in the case of Helix task 
cancellation) before the Gobblin tasks
+    // have been submitted to the underlying task executor.
+    if (this.stopped.get()) {
+      return new Pair<>(tasks, false);
+    }
     // A flag indicating if there are any tasks not submitted successfully.
     // Caller of this method should handle tasks with submission failures 
accordingly.
     boolean areAllTasksSubmitted = true;
@@ -600,4 +616,8 @@ public class GobblinMultiTaskAttempt {
     
multiTaskAttempt.runAndOptionallyCommitTaskAttempt(multiTaskAttemptCommitPolicy);
     return multiTaskAttempt;
   }
+
+  public int getNumTasksCreated() {
+    return this.tasks.size();
+  }
 }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
index 79ed7f4..390c20f 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/GobblinMultiTaskAttemptTest.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.runtime;
 
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
 import org.mockito.Mockito;
@@ -65,7 +66,7 @@ public class GobblinMultiTaskAttemptTest {
   public void testRunWithTaskCreationFailure()
       throws Exception {
     // Preparing Instance of TaskAttempt with designed failure on task creation
-    WorkUnit tmpWU = new WorkUnit();
+    WorkUnit tmpWU = WorkUnit.createEmpty();
     // Put necessary attributes in workunit
     tmpWU.setProp(ConfigurationKeys.TASK_ID_KEY, "task_test");
     List<WorkUnit> workUnit = ImmutableList.of(tmpWU);
@@ -96,7 +97,7 @@ public class GobblinMultiTaskAttemptTest {
       throws Exception {
     TaskStateTracker stateTracker = new DummyTestStateTracker(new 
Properties(), log);
     // Preparing Instance of TaskAttempt with designed failure on task creation
-    WorkUnit tmpWU = new WorkUnit();
+    WorkUnit tmpWU = WorkUnit.createEmpty();
     // Put necessary attributes in workunit
     tmpWU.setProp(ConfigurationKeys.TASK_ID_KEY, "task_test");
     List<WorkUnit> workUnit = ImmutableList.of(tmpWU);
@@ -121,6 +122,31 @@ public class GobblinMultiTaskAttemptTest {
     Assert.fail();
   }
 
+  @Test
+  public void testRunAfterCancellation() throws Exception {
+    WorkUnit tmpWU = WorkUnit.createEmpty();
+    // Put necessary attributes in workunit
+    tmpWU.setProp(ConfigurationKeys.TASK_ID_KEY, "task_test");
+    List<WorkUnit> workUnit = ImmutableList.of(tmpWU);
+    JobState jobState = new JobState();
+    // Limit the number of times of retry in task-creation.
+    jobState.setProp(RETRY_TIME_OUT_MS, 1000);
+    jobState.setProp(ConfigurationKeys.SOURCE_CLASS_KEY, 
DatasetStateStoreTest.DummySource.class.getName());
+
+    TaskStateTracker stateTrackerMock = Mockito.mock(TaskStateTracker.class);
+
+    taskAttempt =
+        new GobblinMultiTaskAttempt(workUnit.iterator(), "testJob1", jobState, 
stateTrackerMock, taskExecutorMock,
+            Optional.absent(), Optional.absent(), jobBroker);
+
+    //Call shutdown() before creation of underlying Gobblin tasks.
+    taskAttempt.shutdownTasks();
+    
taskAttempt.runAndOptionallyCommitTaskAttempt(GobblinMultiTaskAttempt.CommitPolicy.IMMEDIATE);
+    Assert.assertEquals(taskAttempt.getNumTasksCreated(), 0);
+    Assert.assertTrue(taskAttempt.getStopped().get());
+  }
+
+
   public static class DummyTestStateTracker extends AbstractTaskStateTracker {
     public DummyTestStateTracker(Properties properties, Logger logger) {
       super(properties, logger);

Reply via email to