This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new 905d0f2 KAFKA-8126: Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475) 905d0f2 is described below commit 905d0f244021836196520b22ef6990691a19fd0b Author: Doroszlai, Attila <6454655+adorosz...@users.noreply.github.com> AuthorDate: Wed Apr 3 22:00:05 2019 +0200 KAFKA-8126: Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475) Changed the WorkerTest to use a mock Executor. Author: Attila Doroszlai <adorosz...@apache.org> Reviewer: Randall Hauch <rha...@gmail.com> --- .../java/org/apache/kafka/connect/runtime/Worker.java | 15 +++++++++++++-- .../org/apache/kafka/connect/runtime/WorkerTest.java | 17 ++++++++--------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 673bd4e..e867983 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -98,7 +98,6 @@ public class Worker { private SourceTaskOffsetCommitter sourceTaskOffsetCommitter; private WorkerConfigTransformer workerConfigTransformer; - @SuppressWarnings("deprecation") public Worker( String workerId, Time time, @@ -106,8 +105,20 @@ public class Worker { WorkerConfig config, OffsetBackingStore offsetBackingStore ) { + this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool()); + } + + @SuppressWarnings("deprecation") + Worker( + String workerId, + Time time, + Plugins plugins, + WorkerConfig config, + OffsetBackingStore offsetBackingStore, + ExecutorService executorService + ) { this.metrics = new ConnectMetrics(workerId, config, time); - this.executor = Executors.newCachedThreadPool(); + this.executor = executorService; this.workerId = workerId; this.time = time; this.plugins = plugins; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 8f15c87..eef10f0 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -71,6 +71,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR; import static org.easymock.EasyMock.anyObject; @@ -118,6 +119,7 @@ public class WorkerTest extends ThreadedTest { @Mock private Converter taskKeyConverter; @Mock private Converter taskValueConverter; @Mock private HeaderConverter taskHeaderConverter; + @Mock private ExecutorService executorService; @Before public void setup() { @@ -543,8 +545,7 @@ public class WorkerTest extends ThreadedTest { expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskValueConverter); expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter); - workerTask.run(); - EasyMock.expectLastCall(); + EasyMock.expect(executorService.submit(workerTask)).andReturn(null); EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) @@ -568,7 +569,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService); worker.start(); assertStatistics(worker, 0, 0); assertStartupStatistics(worker, 0, 0, 0, 0); @@ -685,8 +686,7 @@ public class WorkerTest extends ThreadedTest { expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); - workerTask.run(); - EasyMock.expectLastCall(); + EasyMock.expect(executorService.submit(workerTask)).andReturn(null); EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) @@ -712,7 +712,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService); worker.start(); assertStatistics(worker, 0, 0); worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED); @@ -778,8 +778,7 @@ public class WorkerTest extends ThreadedTest { expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null); expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter); - workerTask.run(); - EasyMock.expectLastCall(); + EasyMock.expect(executorService.submit(workerTask)).andReturn(null); EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader); EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName())) @@ -803,7 +802,7 @@ public class WorkerTest extends ThreadedTest { PowerMock.replayAll(); - worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore); + worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService); worker.start(); assertStatistics(worker, 0, 0); assertEquals(Collections.emptySet(), worker.taskIds());