This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 3c6161a Revert "KAFKA-10792: Prevent source task shutdown from blocking herder thread (#9669)"
3c6161a is described below
commit 3c6161ac23f9093c46f347ed68beaa20567e2160
Author: Randall Hauch <[email protected]>
AuthorDate: Fri Dec 4 12:07:46 2020 -0600
Revert "KAFKA-10792: Prevent source task shutdown from blocking herder thread (#9669)"
This reverts commit d8b60939b6e14b8cd47e92520b9299ce5dfde5e5.
---
.../kafka/connect/runtime/WorkerSourceTask.java | 44 +-
.../connect/integration/BlockingConnectorTest.java | 566 ++++-----------------
.../connect/runtime/ErrorHandlingTaskTest.java | 6 +
.../ErrorHandlingTaskWithTopicCreationTest.java | 6 +
4 files changed, 150 insertions(+), 472 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 57f6f98..1febd7f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -102,7 +102,9 @@ class WorkerSourceTask extends WorkerTask {
private CountDownLatch stopRequestedLatch;
private Map<String, String> taskConfig;
- private boolean started = false;
+ private boolean finishedStart = false;
+ private boolean startedShutdownBeforeStartCompleted = false;
+ private boolean stopped = false;
public WorkerSourceTask(ConnectorTaskId id,
SourceTask task,
@@ -164,12 +166,8 @@ class WorkerSourceTask extends WorkerTask {
@Override
protected void close() {
- if (started) {
- try {
- task.stop();
- } catch (Throwable t) {
- log.warn("Could not stop task", t);
- }
+ if (!shouldPause()) {
+ tryStop();
}
if (producer != null) {
try {
@@ -208,21 +206,39 @@ class WorkerSourceTask extends WorkerTask {
public void stop() {
super.stop();
stopRequestedLatch.countDown();
+ synchronized (this) {
+ if (finishedStart)
+ tryStop();
+ else
+ startedShutdownBeforeStartCompleted = true;
+ }
+ }
+
+ private synchronized void tryStop() {
+ if (!stopped) {
+ try {
+ task.stop();
+ stopped = true;
+ } catch (Throwable t) {
+ log.warn("Could not stop task", t);
+ }
+ }
}
@Override
public void execute() {
try {
- // If we try to start the task at all by invoking initialize, then count this as
- // "started" and expect a subsequent call to the task's stop() method
- // to properly clean up any resources allocated by its initialize() or
- // start() methods. If the task throws an exception during stop(),
- // the worst thing that happens is another exception gets logged for an already-
- // failed task
- started = true;
task.initialize(new WorkerSourceTaskContext(offsetReader, this,
configState));
task.start(taskConfig);
log.info("{} Source task finished initialization and start", this);
+ synchronized (this) {
+ if (startedShutdownBeforeStartCompleted) {
+ tryStop();
+ return;
+ }
+ finishedStart = true;
+ }
+
while (!isStopping()) {
if (shouldPause()) {
onPause();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index abc9a93..9a8e1fa 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -16,26 +16,17 @@
*/
package org.apache.kafka.connect.integration;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
-import org.apache.kafka.connect.sink.SinkConnector;
-import org.apache.kafka.connect.sink.SinkRecord;
-import org.apache.kafka.connect.sink.SinkTask;
-import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
-import org.apache.kafka.connect.source.SourceTaskContext;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
@@ -45,21 +36,16 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static
org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
-import static
org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertThrows;
@@ -81,33 +67,6 @@ public class BlockingConnectorTest {
private static final long RECORD_TRANSFER_DURATION_MS =
TimeUnit.SECONDS.toMillis(30);
private static final long REST_REQUEST_TIMEOUT =
Worker.CONNECTOR_GRACEFUL_SHUTDOWN_TIMEOUT_MS * 2;
- private static final String CONNECTOR_INITIALIZE = "Connector::initialize";
- private static final String CONNECTOR_INITIALIZE_WITH_TASK_CONFIGS =
"Connector::initializeWithTaskConfigs";
- private static final String CONNECTOR_START = "Connector::start";
- private static final String CONNECTOR_RECONFIGURE =
"Connector::reconfigure";
- private static final String CONNECTOR_TASK_CLASS = "Connector::taskClass";
- private static final String CONNECTOR_TASK_CONFIGS =
"Connector::taskConfigs";
- private static final String CONNECTOR_STOP = "Connector::stop";
- private static final String CONNECTOR_VALIDATE = "Connector::validate";
- private static final String CONNECTOR_CONFIG = "Connector::config";
- private static final String CONNECTOR_VERSION = "Connector::version";
- private static final String TASK_START = "Task::start";
- private static final String TASK_STOP = "Task::stop";
- private static final String TASK_VERSION = "Task::version";
- private static final String SINK_TASK_INITIALIZE = "SinkTask::initialize";
- private static final String SINK_TASK_PUT = "SinkTask::put";
- private static final String SINK_TASK_FLUSH = "SinkTask::flush";
- private static final String SINK_TASK_PRE_COMMIT = "SinkTask::preCommit";
- private static final String SINK_TASK_OPEN = "SinkTask::open";
- private static final String SINK_TASK_ON_PARTITIONS_ASSIGNED =
"SinkTask::onPartitionsAssigned";
- private static final String SINK_TASK_CLOSE = "SinkTask::close";
- private static final String SINK_TASK_ON_PARTITIONS_REVOKED =
"SinkTask::onPartitionsRevoked";
- private static final String SOURCE_TASK_INITIALIZE =
"SourceTask::initialize";
- private static final String SOURCE_TASK_POLL = "SourceTask::poll";
- private static final String SOURCE_TASK_COMMIT = "SourceTask::commit";
- private static final String SOURCE_TASK_COMMIT_RECORD =
"SourceTask::commitRecord";
- private static final String SOURCE_TASK_COMMIT_RECORD_WITH_METADATA =
"SourceTask::commitRecordWithMetadata";
-
private EmbeddedConnectCluster connect;
private ConnectorHandle normalConnectorHandle;
@@ -142,18 +101,18 @@ public class BlockingConnectorTest {
// stop all Connect, Kafka and Zk threads.
connect.stop();
ConnectorsResource.resetRequestTimeout();
- Block.resetBlockLatch();
+ BlockingConnector.resetBlockLatch();
}
@Test
public void testBlockInConnectorValidate() throws Exception {
log.info("Starting test testBlockInConnectorValidate");
- assertThrows(ConnectRestException.class, () ->
createConnectorWithBlock(ValidateBlockingConnector.class, CONNECTOR_VALIDATE));
+ assertThrows(ConnectRestException.class, () ->
createConnectorWithBlock(ValidateBlockingConnector.class));
// Will NOT assert that connector has failed, since the request should fail before it's even created
// Connector should already be blocked so this should return immediately, but check just to
// make sure that it actually did block
- Block.waitForBlock();
+ BlockingConnector.waitForBlock();
createNormalConnector();
verifyNormalConnector();
@@ -162,12 +121,12 @@ public class BlockingConnectorTest {
@Test
public void testBlockInConnectorConfig() throws Exception {
log.info("Starting test testBlockInConnectorConfig");
- assertThrows(ConnectRestException.class, () ->
createConnectorWithBlock(ConfigBlockingConnector.class, CONNECTOR_CONFIG));
+ assertThrows(ConnectRestException.class, () ->
createConnectorWithBlock(ConfigBlockingConnector.class));
// Will NOT assert that connector has failed, since the request should fail before it's even created
// Connector should already be blocked so this should return immediately, but check just to
// make sure that it actually did block
- Block.waitForBlock();
+ BlockingConnector.waitForBlock();
createNormalConnector();
verifyNormalConnector();
@@ -176,8 +135,8 @@ public class BlockingConnectorTest {
@Test
public void testBlockInConnectorInitialize() throws Exception {
log.info("Starting test testBlockInConnectorInitialize");
- createConnectorWithBlock(InitializeBlockingConnector.class,
CONNECTOR_INITIALIZE);
- Block.waitForBlock();
+ createConnectorWithBlock(InitializeBlockingConnector.class);
+ BlockingConnector.waitForBlock();
createNormalConnector();
verifyNormalConnector();
@@ -186,8 +145,8 @@ public class BlockingConnectorTest {
@Test
public void testBlockInConnectorStart() throws Exception {
log.info("Starting test testBlockInConnectorStart");
- createConnectorWithBlock(BlockingConnector.class, CONNECTOR_START);
- Block.waitForBlock();
+ createConnectorWithBlock(BlockingConnector.START);
+ BlockingConnector.waitForBlock();
createNormalConnector();
verifyNormalConnector();
@@ -196,54 +155,10 @@ public class BlockingConnectorTest {
@Test
public void testBlockInConnectorStop() throws Exception {
log.info("Starting test testBlockInConnectorStop");
- createConnectorWithBlock(BlockingConnector.class, CONNECTOR_STOP);
- waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
- connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
- Block.waitForBlock();
-
- createNormalConnector();
- verifyNormalConnector();
- }
-
- @Test
- public void testBlockInSourceTaskStart() throws Exception {
- log.info("Starting test testBlockInSourceTaskStart");
- createConnectorWithBlock(BlockingSourceConnector.class, TASK_START);
- Block.waitForBlock();
-
- createNormalConnector();
- verifyNormalConnector();
- }
-
- @Test
- public void testBlockInSourceTaskStop() throws Exception {
- log.info("Starting test testBlockInSourceTaskStop");
- createConnectorWithBlock(BlockingSourceConnector.class, TASK_STOP);
+ createConnectorWithBlock(BlockingConnector.STOP);
waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
- Block.waitForBlock();
-
- createNormalConnector();
- verifyNormalConnector();
- }
-
- @Test
- public void testBlockInSinkTaskStart() throws Exception {
- log.info("Starting test testBlockInSinkTaskStart");
- createConnectorWithBlock(BlockingSinkConnector.class, TASK_START);
- Block.waitForBlock();
-
- createNormalConnector();
- verifyNormalConnector();
- }
-
- @Test
- public void testBlockInSinkTaskStop() throws Exception {
- log.info("Starting test testBlockInSinkTaskStop");
- createConnectorWithBlock(BlockingSinkConnector.class, TASK_STOP);
- waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
- connect.deleteConnector(BLOCKING_CONNECTOR_NAME);
- Block.waitForBlock();
+ BlockingConnector.waitForBlock();
createNormalConnector();
verifyNormalConnector();
@@ -252,41 +167,50 @@ public class BlockingConnectorTest {
@Test
public void testWorkerRestartWithBlockInConnectorStart() throws Exception {
log.info("Starting test testWorkerRestartWithBlockInConnectorStart");
- createConnectorWithBlock(BlockingConnector.class, CONNECTOR_START);
+ createConnectorWithBlock(BlockingConnector.START);
// First instance of the connector should block on startup
- Block.waitForBlock();
+ BlockingConnector.waitForBlock();
createNormalConnector();
connect.removeWorker();
connect.addWorker();
// After stopping the only worker and restarting it, a new instance of the blocking
// connector should be created and we can ensure that it blocks again
- Block.waitForBlock();
+ BlockingConnector.waitForBlock();
verifyNormalConnector();
}
@Test
public void testWorkerRestartWithBlockInConnectorStop() throws Exception {
log.info("Starting test testWorkerRestartWithBlockInConnectorStop");
- createConnectorWithBlock(BlockingConnector.class, CONNECTOR_STOP);
+ createConnectorWithBlock(BlockingConnector.STOP);
waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
createNormalConnector();
waitForConnectorStart(NORMAL_CONNECTOR_NAME);
connect.removeWorker();
- Block.waitForBlock();
+ BlockingConnector.waitForBlock();
connect.addWorker();
waitForConnectorStart(BLOCKING_CONNECTOR_NAME);
verifyNormalConnector();
}
- private void createConnectorWithBlock(Class<? extends Connector>
connectorClass, String block) {
- Map<String, String> props = new HashMap<>();
+ private void createConnectorWithBlock(String block) {
+ Map<String, String> props = baseBlockingConnectorProps();
+ props.put(BlockingConnector.BLOCK_CONFIG, block);
+ log.info("Creating connector with block during {}", block);
+ try {
+ connect.configureConnector(BLOCKING_CONNECTOR_NAME, props);
+ } catch (RuntimeException e) {
+ log.info("Failed to create connector", e);
+ throw e;
+ }
+ }
+
+ private void createConnectorWithBlock(Class<? extends BlockingConnector>
connectorClass) {
+ Map<String, String> props = baseBlockingConnectorProps();
props.put(CONNECTOR_CLASS_CONFIG, connectorClass.getName());
- props.put(TASKS_MAX_CONFIG, "1");
- props.put(TOPICS_CONFIG, "t1"); // Required for sink connectors
- props.put(Block.BLOCK_CONFIG, Objects.requireNonNull(block));
- log.info("Creating blocking connector of type {} with block in {}",
connectorClass.getSimpleName(), block);
+ log.info("Creating blocking connector of type {}",
connectorClass.getSimpleName());
try {
connect.configureConnector(BLOCKING_CONNECTOR_NAME, props);
} catch (RuntimeException e) {
@@ -295,6 +219,13 @@ public class BlockingConnectorTest {
}
}
+ private Map<String, String> baseBlockingConnectorProps() {
+ Map<String, String> result = new HashMap<>();
+ result.put(CONNECTOR_CLASS_CONFIG, BlockingConnector.class.getName());
+ result.put(TASKS_MAX_CONFIG, "1");
+ return result;
+ }
+
private void createNormalConnector() {
connect.kafka().createTopic(TEST_TOPIC, 3);
@@ -332,43 +263,63 @@ public class BlockingConnectorTest {
normalConnectorHandle.awaitCommits(RECORD_TRANSFER_DURATION_MS);
}
- private static class Block {
+ public static class BlockingConnector extends SourceConnector {
+
private static CountDownLatch blockLatch;
- private final String block;
+ private String block;
public static final String BLOCK_CONFIG = "block";
- private static ConfigDef config() {
- return new ConfigDef()
- .define(
- BLOCK_CONFIG,
- ConfigDef.Type.STRING,
- "",
- ConfigDef.Importance.MEDIUM,
- "Where to block indefinitely, e.g., 'Connector::start',
'Connector::initialize', "
- + "'Connector::taskConfigs', 'Task::version',
'SinkTask::put', 'SourceTask::poll'"
- );
+ public static final String INITIALIZE = "initialize";
+ public static final String INITIALIZE_WITH_TASK_CONFIGS =
"initializeWithTaskConfigs";
+ public static final String START = "start";
+ public static final String RECONFIGURE = "reconfigure";
+ public static final String TASK_CLASS = "taskClass";
+ public static final String TASK_CONFIGS = "taskConfigs";
+ public static final String STOP = "stop";
+ public static final String VALIDATE = "validate";
+ public static final String CONFIG = "config";
+ public static final String VERSION = "version";
+
+ private static final ConfigDef CONFIG_DEF = new ConfigDef()
+ .define(
+ BLOCK_CONFIG,
+ ConfigDef.Type.STRING,
+ "",
+ ConfigDef.Importance.MEDIUM,
+ "Where to block indefinitely, e.g., 'start', 'initialize',
'taskConfigs', 'version'"
+ );
+
+ // No-args constructor required by the framework
+ public BlockingConnector() {
+ this(null);
+ }
+
+ protected BlockingConnector(String block) {
+ this.block = block;
+ synchronized (BlockingConnector.class) {
+ if (blockLatch != null) {
+ blockLatch.countDown();
+ }
+ blockLatch = new CountDownLatch(1);
+ }
}
public static void waitForBlock() throws InterruptedException {
- synchronized (Block.class) {
+ synchronized (BlockingConnector.class) {
if (blockLatch == null) {
throw new IllegalArgumentException("No connector has been
created yet");
}
}
-
+
log.debug("Waiting for connector to block");
blockLatch.await();
log.debug("Connector should now be blocked");
}
- // Note that there is only ever at most one global block latch at a time, which makes tests that
- // use blocks in multiple places impossible. If necessary, this can be addressed in the future by
- // adding support for multiple block latches at a time, possibly identifiable by a connector/task
- // ID, the location of the expected block, or both.
public static void resetBlockLatch() {
- synchronized (Block.class) {
+ synchronized (BlockingConnector.class) {
if (blockLatch != null) {
blockLatch.countDown();
blockLatch = null;
@@ -376,114 +327,81 @@ public class BlockingConnectorTest {
}
}
- public Block(Map<String, String> props) {
- this(new AbstractConfig(config(), props).getString(BLOCK_CONFIG));
- }
-
- public Block(String block) {
- this.block = block;
- synchronized (Block.class) {
- if (blockLatch != null) {
- blockLatch.countDown();
- }
- blockLatch = new CountDownLatch(1);
- }
- }
-
- public Map<String, String> taskConfig() {
- return Collections.singletonMap(BLOCK_CONFIG, block);
- }
-
- public void maybeBlockOn(String block) {
- if (block.equals(this.block)) {
- log.info("Will block on {}", block);
- blockLatch.countDown();
- while (true) {
- try {
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- // No-op. Just keep blocking.
- }
- }
- } else {
- log.debug("Will not block on {}", block);
- }
- }
- }
-
- // Used to test blocks in Connector (as opposed to Task) methods
- public static class BlockingConnector extends SourceConnector {
-
- private Block block;
-
- // No-args constructor required by the framework
- public BlockingConnector() {
- this(null);
- }
-
- protected BlockingConnector(String block) {
- this.block = new Block(block);
- }
-
@Override
public void initialize(ConnectorContext ctx) {
- block.maybeBlockOn(CONNECTOR_INITIALIZE);
+ maybeBlockOn(INITIALIZE);
super.initialize(ctx);
}
@Override
public void initialize(ConnectorContext ctx, List<Map<String, String>>
taskConfigs) {
- block.maybeBlockOn(CONNECTOR_INITIALIZE_WITH_TASK_CONFIGS);
+ maybeBlockOn(INITIALIZE_WITH_TASK_CONFIGS);
super.initialize(ctx, taskConfigs);
}
@Override
public void start(Map<String, String> props) {
- this.block = new Block(props);
- block.maybeBlockOn(CONNECTOR_START);
+ this.block = new AbstractConfig(CONFIG_DEF,
props).getString(BLOCK_CONFIG);
+ maybeBlockOn(START);
}
@Override
public void reconfigure(Map<String, String> props) {
- block.maybeBlockOn(CONNECTOR_RECONFIGURE);
super.reconfigure(props);
+ maybeBlockOn(RECONFIGURE);
}
@Override
public Class<? extends Task> taskClass() {
- block.maybeBlockOn(CONNECTOR_TASK_CLASS);
+ maybeBlockOn(TASK_CLASS);
return BlockingTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
- block.maybeBlockOn(CONNECTOR_TASK_CONFIGS);
+ maybeBlockOn(TASK_CONFIGS);
return Collections.singletonList(Collections.emptyMap());
}
@Override
public void stop() {
- block.maybeBlockOn(CONNECTOR_STOP);
+ maybeBlockOn(STOP);
}
@Override
public Config validate(Map<String, String> connectorConfigs) {
- block.maybeBlockOn(CONNECTOR_VALIDATE);
+ maybeBlockOn(VALIDATE);
return super.validate(connectorConfigs);
}
@Override
public ConfigDef config() {
- block.maybeBlockOn(CONNECTOR_CONFIG);
- return Block.config();
+ maybeBlockOn(CONFIG);
+ return CONFIG_DEF;
}
@Override
public String version() {
- block.maybeBlockOn(CONNECTOR_VERSION);
+ maybeBlockOn(VERSION);
return "0.0.0";
}
+ protected void maybeBlockOn(String block) {
+ if (block.equals(this.block)) {
+ log.info("Will block on {}", block);
+ blockLatch.countDown();
+ while (true) {
+ try {
+ Thread.sleep(Long.MAX_VALUE);
+ } catch (InterruptedException e) {
+ // No-op. Just keep blocking.
+ }
+ }
+ } else {
+ log.debug("Will not block on {}", block);
+ }
+ }
+
public static class BlockingTask extends SourceTask {
@Override
public void start(Map<String, String> props) {
@@ -508,287 +426,19 @@ public class BlockingConnectorTest {
// Some methods are called before Connector::start, so we use this as a workaround
public static class InitializeBlockingConnector extends BlockingConnector {
public InitializeBlockingConnector() {
- super(CONNECTOR_INITIALIZE);
+ super(INITIALIZE);
}
}
public static class ConfigBlockingConnector extends BlockingConnector {
public ConfigBlockingConnector() {
- super(CONNECTOR_CONFIG);
+ super(CONFIG);
}
}
public static class ValidateBlockingConnector extends BlockingConnector {
public ValidateBlockingConnector() {
- super(CONNECTOR_VALIDATE);
- }
- }
-
- // Used to test blocks in SourceTask methods
- public static class BlockingSourceConnector extends SourceConnector {
-
- private Map<String, String> props;
- private final Class<? extends BlockingSourceTask> taskClass;
-
- // No-args constructor required by the framework
- public BlockingSourceConnector() {
- this(BlockingSourceTask.class);
- }
-
- protected BlockingSourceConnector(Class<? extends BlockingSourceTask>
taskClass) {
- this.taskClass = taskClass;
- }
-
- @Override
- public void start(Map<String, String> props) {
- this.props = props;
- }
-
- @Override
- public Class<? extends Task> taskClass() {
- return taskClass;
- }
-
- @Override
- public List<Map<String, String>> taskConfigs(int maxTasks) {
- return IntStream.range(0, maxTasks)
- .mapToObj(i -> new HashMap<>(props))
- .collect(Collectors.toList());
- }
-
- @Override
- public void stop() {
- }
-
- @Override
- public Config validate(Map<String, String> connectorConfigs) {
- return super.validate(connectorConfigs);
- }
-
- @Override
- public ConfigDef config() {
- return Block.config();
- }
-
- @Override
- public String version() {
- return "0.0.0";
- }
-
- public static class BlockingSourceTask extends SourceTask {
- private Block block;
-
- // No-args constructor required by the framework
- public BlockingSourceTask() {
- this(null);
- }
-
- protected BlockingSourceTask(String block) {
- this.block = new Block(block);
- }
-
- @Override
- public void start(Map<String, String> props) {
- this.block = new Block(props);
- block.maybeBlockOn(TASK_START);
- }
-
- @Override
- public List<SourceRecord> poll() {
- block.maybeBlockOn(SOURCE_TASK_POLL);
- return null;
- }
-
- @Override
- public void stop() {
- block.maybeBlockOn(TASK_STOP);
- }
-
- @Override
- public String version() {
- block.maybeBlockOn(TASK_VERSION);
- return "0.0.0";
- }
-
- @Override
- public void initialize(SourceTaskContext context) {
- block.maybeBlockOn(SOURCE_TASK_INITIALIZE);
- super.initialize(context);
- }
-
- @Override
- public void commit() throws InterruptedException {
- block.maybeBlockOn(SOURCE_TASK_COMMIT);
- super.commit();
- }
-
- @Override
- @SuppressWarnings("deprecation")
- public void commitRecord(SourceRecord record) throws
InterruptedException {
- block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD);
- super.commitRecord(record);
- }
-
- @Override
- public void commitRecord(SourceRecord record, RecordMetadata
metadata) throws InterruptedException {
- block.maybeBlockOn(SOURCE_TASK_COMMIT_RECORD_WITH_METADATA);
- super.commitRecord(record, metadata);
- }
- }
- }
-
- public static class TaskInitializeBlockingSourceConnector extends
BlockingSourceConnector {
- public TaskInitializeBlockingSourceConnector() {
- super(InitializeBlockingSourceTask.class);
- }
-
- public static class InitializeBlockingSourceTask extends
BlockingSourceTask {
- public InitializeBlockingSourceTask() {
- super(SOURCE_TASK_INITIALIZE);
- }
- }
- }
-
- // Used to test blocks in SinkTask methods
- public static class BlockingSinkConnector extends SinkConnector {
-
- private Map<String, String> props;
- private final Class<? extends BlockingSinkTask> taskClass;
-
- // No-args constructor required by the framework
- public BlockingSinkConnector() {
- this(BlockingSinkTask.class);
- }
-
- protected BlockingSinkConnector(Class<? extends BlockingSinkTask>
taskClass) {
- this.taskClass = taskClass;
- }
-
- @Override
- public void start(Map<String, String> props) {
- this.props = props;
- }
-
- @Override
- public Class<? extends Task> taskClass() {
- return taskClass;
- }
-
- @Override
- public List<Map<String, String>> taskConfigs(int maxTasks) {
- return IntStream.rangeClosed(0, maxTasks)
- .mapToObj(i -> new HashMap<>(props))
- .collect(Collectors.toList());
- }
-
- @Override
- public void stop() {
- }
-
- @Override
- public Config validate(Map<String, String> connectorConfigs) {
- return super.validate(connectorConfigs);
- }
-
- @Override
- public ConfigDef config() {
- return Block.config();
- }
-
- @Override
- public String version() {
- return "0.0.0";
- }
-
- public static class BlockingSinkTask extends SinkTask {
- private Block block;
-
- // No-args constructor required by the framework
- public BlockingSinkTask() {
- this(null);
- }
-
- protected BlockingSinkTask(String block) {
- this.block = new Block(block);
- }
-
- @Override
- public void start(Map<String, String> props) {
- this.block = new Block(props);
- block.maybeBlockOn(TASK_START);
- }
-
- @Override
- public void put(Collection<SinkRecord> records) {
- block.maybeBlockOn(SINK_TASK_PUT);
- }
-
- @Override
- public void stop() {
- block.maybeBlockOn(TASK_STOP);
- }
-
- @Override
- public String version() {
- block.maybeBlockOn(TASK_VERSION);
- return "0.0.0";
- }
-
- @Override
- public void initialize(SinkTaskContext context) {
- block.maybeBlockOn(SINK_TASK_INITIALIZE);
- super.initialize(context);
- }
-
- @Override
- public void flush(Map<TopicPartition, OffsetAndMetadata>
currentOffsets) {
- block.maybeBlockOn(SINK_TASK_FLUSH);
- super.flush(currentOffsets);
- }
-
- @Override
- public Map<TopicPartition, OffsetAndMetadata>
preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
- block.maybeBlockOn(SINK_TASK_PRE_COMMIT);
- return super.preCommit(currentOffsets);
- }
-
- @Override
- public void open(Collection<TopicPartition> partitions) {
- block.maybeBlockOn(SINK_TASK_OPEN);
- super.open(partitions);
- }
-
- @Override
- @SuppressWarnings("deprecation")
- public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
- block.maybeBlockOn(SINK_TASK_ON_PARTITIONS_ASSIGNED);
- super.onPartitionsAssigned(partitions);
- }
-
- @Override
- public void close(Collection<TopicPartition> partitions) {
- block.maybeBlockOn(SINK_TASK_CLOSE);
- super.close(partitions);
- }
-
- @Override
- @SuppressWarnings("deprecation")
- public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
- block.maybeBlockOn(SINK_TASK_ON_PARTITIONS_REVOKED);
- super.onPartitionsRevoked(partitions);
- }
- }
- }
-
- public static class TaskInitializeBlockingSinkConnector extends
BlockingSinkConnector {
- public TaskInitializeBlockingSinkConnector() {
- super(InitializeBlockingSinkTask.class);
- }
-
- public static class InitializeBlockingSinkTask extends
BlockingSinkTask {
- public InitializeBlockingSinkTask() {
- super(SINK_TASK_INITIALIZE);
- }
+ super(VALIDATE);
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 70bbfc6..c471b03 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -236,6 +236,9 @@ public class ErrorHandlingTaskTest {
createSourceTask(initialState, retryWithToleranceOperator);
+ sourceTask.stop();
+ PowerMock.expectLastCall();
+
expectClose();
reporter.close();
@@ -260,6 +263,9 @@ public class ErrorHandlingTaskTest {
createSourceTask(initialState, retryWithToleranceOperator);
+ sourceTask.stop();
+ PowerMock.expectLastCall();
+
expectClose();
// Even though the reporters throw exceptions, they should both still be closed.
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
index 9c115ac..aba6445 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
@@ -250,6 +250,9 @@ public class ErrorHandlingTaskWithTopicCreationTest {
createSourceTask(initialState, retryWithToleranceOperator);
+ sourceTask.stop();
+ PowerMock.expectLastCall();
+
expectClose();
reporter.close();
@@ -274,6 +277,9 @@ public class ErrorHandlingTaskWithTopicCreationTest {
createSourceTask(initialState, retryWithToleranceOperator);
+ sourceTask.stop();
+ PowerMock.expectLastCall();
+
expectClose();
// Even though the reporters throw exceptions, they should both still be closed.