Repository: kafka
Updated Branches:
  refs/heads/trunk db8d6f02c -> dc662776c
KAFKA-3211: Handle WorkerTask stop before start correctly

Author: Jason Gustafson <[email protected]>

Reviewers: Ewen Cheslack-Postava <[email protected]>

Closes #874 from hachikuji/KAFKA-3211

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dc662776
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dc662776
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dc662776

Branch: refs/heads/trunk
Commit: dc662776cde8e980a3f978041adaf961edf0fe7d
Parents: db8d6f0
Author: Jason Gustafson <[email protected]>
Authored: Thu Feb 4 18:00:45 2016 -0800
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Thu Feb 4 18:00:45 2016 -0800

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerTask.java       |  3 +
 .../kafka/connect/runtime/WorkerTaskTest.java   | 92 ++++++++++++++++++++
 2 files changed, 95 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/dc662776/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index b4d427a..ecaeb7b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -102,6 +102,9 @@ abstract class WorkerTask implements Runnable {
             throw new IllegalStateException("The task cannot be started while still running");
 
         try {
+            if (stopping.get())
+                return;
+
             execute();
         } catch (Throwable t) {
             log.error("Unhandled exception in task {}", id, t);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dc662776/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
new file mode 100644
index 0000000..f5213a6
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.easymock.EasyMock.partialMockBuilder;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class WorkerTaskTest {
+
+    private static final Map<String, String> EMPTY_TASK_PROPS = Collections.emptyMap();
+
+    @Test
+    public void standardStartup() {
+        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
+                .withConstructor(ConnectorTaskId.class)
+                .withArgs(new ConnectorTaskId("foo", 0))
+                .addMockedMethod("initialize")
+                .addMockedMethod("execute")
+                .addMockedMethod("close")
+                .createStrictMock();
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+
+        workerTask.execute();
+        EasyMock.expectLastCall();
+
+        workerTask.close();
+        EasyMock.expectLastCall();
+
+        replay(workerTask);
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.run();
+        workerTask.stop();
+        workerTask.awaitStop(1000L);
+
+        verify(workerTask);
+    }
+
+    @Test
+    public void stopBeforeStarting() {
+        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
+                .withConstructor(ConnectorTaskId.class)
+                .withArgs(new ConnectorTaskId("foo", 0))
+                .addMockedMethod("initialize")
+                .addMockedMethod("execute")
+                .addMockedMethod("close")
+                .createStrictMock();
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+
+        workerTask.close();
+        EasyMock.expectLastCall();
+
+        replay(workerTask);
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.stop();
+        workerTask.awaitStop(1000L);
+
+        // now run should not do anything
+        workerTask.run();
+
+        verify(workerTask);
+    }
+
+
+}
