This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 905d0f2  KAFKA-8126: Flaky Test 
org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475)
905d0f2 is described below

commit 905d0f244021836196520b22ef6990691a19fd0b
Author: Doroszlai, Attila <6454655+adorosz...@users.noreply.github.com>
AuthorDate: Wed Apr 3 22:00:05 2019 +0200

    KAFKA-8126: Flaky Test 
org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475)
    
    Changed the WorkerTest to use a mock Executor.
    
    Author: Attila Doroszlai <adorosz...@apache.org>
    Reviewer: Randall Hauch <rha...@gmail.com>
---
 .../java/org/apache/kafka/connect/runtime/Worker.java   | 15 +++++++++++++--
 .../org/apache/kafka/connect/runtime/WorkerTest.java    | 17 ++++++++---------
 2 files changed, 21 insertions(+), 11 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 673bd4e..e867983 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -98,7 +98,6 @@ public class Worker {
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
     private WorkerConfigTransformer workerConfigTransformer;
 
-    @SuppressWarnings("deprecation")
     public Worker(
             String workerId,
             Time time,
@@ -106,8 +105,20 @@ public class Worker {
             WorkerConfig config,
             OffsetBackingStore offsetBackingStore
     ) {
+        this(workerId, time, plugins, config, offsetBackingStore, 
Executors.newCachedThreadPool());
+    }
+
+    @SuppressWarnings("deprecation")
+    Worker(
+            String workerId,
+            Time time,
+            Plugins plugins,
+            WorkerConfig config,
+            OffsetBackingStore offsetBackingStore,
+            ExecutorService executorService
+    ) {
         this.metrics = new ConnectMetrics(workerId, config, time);
-        this.executor = Executors.newCachedThreadPool();
+        this.executor = executorService;
         this.workerId = workerId;
         this.time = time;
         this.plugins = plugins;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 8f15c87..eef10f0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -71,6 +71,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR;
 import static org.easymock.EasyMock.anyObject;
@@ -118,6 +119,7 @@ public class WorkerTest extends ThreadedTest {
     @Mock private Converter taskKeyConverter;
     @Mock private Converter taskValueConverter;
     @Mock private HeaderConverter taskHeaderConverter;
+    @Mock private ExecutorService executorService;
 
     @Before
     public void setup() {
@@ -543,8 +545,7 @@ public class WorkerTest extends ThreadedTest {
         expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, 
taskValueConverter);
         expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, 
taskHeaderConverter);
 
-        workerTask.run();
-        EasyMock.expectLastCall();
+        EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
 
         
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
         
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@@ -568,7 +569,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, executorService);
         worker.start();
         assertStatistics(worker, 0, 0);
         assertStartupStatistics(worker, 0, 0, 0, 0);
@@ -685,8 +686,7 @@ public class WorkerTest extends ThreadedTest {
         expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
         expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, 
taskHeaderConverter);
 
-        workerTask.run();
-        EasyMock.expectLastCall();
+        EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
 
         
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
         
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@@ -712,7 +712,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, executorService);
         worker.start();
         assertStatistics(worker, 0, 0);
         worker.startTask(TASK_ID, ClusterConfigState.EMPTY, 
anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
@@ -778,8 +778,7 @@ public class WorkerTest extends ThreadedTest {
         expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
         expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, 
taskHeaderConverter);
 
-        workerTask.run();
-        EasyMock.expectLastCall();
+        EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
 
         
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
         
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@@ -803,7 +802,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), plugins, config, 
offsetBackingStore, executorService);
         worker.start();
         assertStatistics(worker, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());

Reply via email to