Repository: kafka Updated Branches: refs/heads/trunk 77683c3cb -> f8598f96d
HOTFIX: fix broken WorkerSourceTask test Author: Jason Gustafson <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #859 from hachikuji/hotfix-worker-source-test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f8598f96 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f8598f96 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f8598f96 Branch: refs/heads/trunk Commit: f8598f96df3500cdea15a913d78de201469244b0 Parents: 77683c3 Author: Jason Gustafson <[email protected]> Authored: Thu Feb 4 11:11:41 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu Feb 4 11:11:41 2016 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java | 5 +++++ 1 file changed, 5 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/f8598f96/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 3888534..1f557e4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -260,14 +260,18 @@ public class WorkerSourceTaskTest extends ThreadedTest { @Test public void testSlowTaskStart() throws Exception { + final CountDownLatch startupLatch = new CountDownLatch(1); + createWorkerTask(); sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class)); EasyMock.expectLastCall(); sourceTask.start(EMPTY_TASK_PROPS); + EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() { @Override public Object answer() throws Throwable { + startupLatch.countDown(); Utils.sleep(100); return null; } @@ -282,6 +286,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { // Stopping immediately while the other thread has work to do should result in no polling, no offset commits, // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it // cannot be invoked immediately in the thread trying to stop the task. + startupLatch.await(1000, TimeUnit.MILLISECONDS); workerTask.stop(); assertEquals(true, workerTask.awaitStop(1000));
