This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 730ab2a [GOBBLIN-1431][GOBBLIN-1416] Fix Cluster SingleTask test
730ab2a is described below
commit 730ab2ada7500ca7729948260474be4692de3a2e
Author: Lei Sun <[email protected]>
AuthorDate: Fri Apr 23 09:50:49 2021 -0700
[GOBBLIN-1431][GOBBLIN-1416] Fix Cluster SingleTask test
Closes #3265 from autumnust/fixSingleTaskTest
---
.../test/java/org/apache/gobblin/cluster/DummySource.java | 11 +++++++++--
.../java/org/apache/gobblin/cluster/TestSingleTask.java | 14 +++++++++++---
2 files changed, 20 insertions(+), 5 deletions(-)
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
index f78628a..8bc27d4 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/DummySource.java
@@ -20,15 +20,16 @@ package org.apache.gobblin.cluster;
import java.io.IOException;
import java.util.List;
+import com.google.common.collect.Lists;
+
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.runtime.JobShutdownException;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.extractor.extract.AbstractSource;
import org.apache.gobblin.source.workunit.WorkUnit;
-import com.google.common.collect.Lists;
-
/**
* A source implementation that does nothing.
@@ -98,5 +99,11 @@ public class DummySource extends AbstractSource<String,
Integer> {
throws IOException {
// Nothing to do
}
+
+ @Override
+ public void shutdown()
+ throws JobShutdownException {
+ // Nothing to do, but override the unnecessary checking in the base interface.
+ }
}
}
\ No newline at end of file
diff --git
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
index 06b0232..fa218fe 100644
---
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
+++
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
@@ -25,9 +25,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
-import org.apache.gobblin.testing.AssertWithBackoff;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.FileUtils;
import org.junit.Assert;
import org.testng.annotations.Test;
@@ -39,6 +36,11 @@ import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
import static
org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_KEY;
@@ -49,7 +51,9 @@ import static
org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_K
* 1. The workunit is being created in {@link InMemoryWuFailedSingleTask}.
* 2. When needed to reproduce certain errors, replace
org.apache.gobblin.cluster.DummySource.DummyExtractor or
* {@link DummySource} to plug in required logic.
+ * 3. Some of these tests simulate the Helix scenarios where run() and cancel() could be assigned to run in different threads.
*/
+@Slf4j
public class TestSingleTask {
private InMemorySingleTaskRunner createInMemoryTaskRunner()
@@ -160,6 +164,10 @@ public class TestSingleTask {
inMemorySingleTaskRunner.startServices();
inMemorySingleTaskRunner.initClusterSingleTask(false);
final SingleTask task = inMemorySingleTaskRunner.task;
+
+ // The task.cancel() method has the logic to block on the taskAttempt object being initialized before calling
+ // taskAttempt.shutdownTasks(). There have to be at least 2 threads running concurrently here: the run() method
+ // is meant to create the taskAttempt object so that the waiting thread (cancel thread) gets unblocked after that.
ExecutorService executor = Executors.newFixedThreadPool(2);
Runnable cancelRunnable = () -> {