This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 730ab2a  [GOBBLIN-1431][GOBBLIN-1416] Fix Cluster SingleTask test
730ab2a is described below

commit 730ab2ada7500ca7729948260474be4692de3a2e
Author: Lei Sun <[email protected]>
AuthorDate: Fri Apr 23 09:50:49 2021 -0700

    [GOBBLIN-1431][GOBBLIN-1416] Fix Cluster SingleTask test
    
    Closes #3265 from autumnust/fixSingleTaskTest
---
 .../test/java/org/apache/gobblin/cluster/DummySource.java  | 11 +++++++++--
 .../java/org/apache/gobblin/cluster/TestSingleTask.java    | 14 +++++++++++---
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
index f78628a..8bc27d4 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
@@ -20,15 +20,16 @@ package org.apache.gobblin.cluster;
 import java.io.IOException;
 import java.util.List;
 
+import com.google.common.collect.Lists;
+
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.runtime.JobShutdownException;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.extractor.extract.AbstractSource;
 import org.apache.gobblin.source.workunit.WorkUnit;
 
-import com.google.common.collect.Lists;
-
 
 /**
  * A source implementation that does nothing.
@@ -98,5 +99,11 @@ public class DummySource extends AbstractSource<String, 
Integer> {
         throws IOException {
       // Nothing to do
     }
+
+    @Override
+    public void shutdown()
+        throws JobShutdownException {
+      // Nothing to do but overwrite unnecessary checking in the base 
interface.
+    }
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
index 06b0232..fa218fe 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
@@ -25,9 +25,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 
-import org.apache.gobblin.testing.AssertWithBackoff;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.FileUtils;
 import org.junit.Assert;
 import org.testng.annotations.Test;
 
@@ -39,6 +36,11 @@ import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
 import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
 
 import static 
org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_KEY;
 
@@ -49,7 +51,9 @@ import static 
org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_K
  * 1. The workunit is being created in {@link InMemoryWuFailedSingleTask}.
  * 2. When needed to reproduce certain errors, replace 
org.apache.gobblin.cluster.DummySource.DummyExtractor or
  * {@link DummySource} to plug in required logic.
+ * 3. Some of this tests simulate the Helix scenarios where run() and cancel() 
coule be assigned to run in different threads.
  */
+@Slf4j
 public class TestSingleTask {
 
   private InMemorySingleTaskRunner createInMemoryTaskRunner()
@@ -160,6 +164,10 @@ public class TestSingleTask {
     inMemorySingleTaskRunner.startServices();
     inMemorySingleTaskRunner.initClusterSingleTask(false);
     final SingleTask task = inMemorySingleTaskRunner.task;
+
+    // The task.cancel() method has the logic to block on taskAttempt object 
to be initialized before calling
+    // taskAttempt.shutdownTasks(). Here there has to be at least 2 threads 
running concurrently, the run() method
+    // is meant to create the taskAttempt object so that the waiting thread 
(cancel thread) got unblocked after that.
     ExecutorService executor = Executors.newFixedThreadPool(2);
 
     Runnable cancelRunnable = () -> {

Reply via email to