[
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16472374#comment-16472374
]
Robert Yokota commented on KAFKA-6566:
--------------------------------------
I've just started looking at this, but it looks like the correct place to put
the `task.stop()` call is in `WorkerSourceTask.close()`. This would mirror the
call to `task.stop()` in `WorkerSinkTask.close()`. `close()` is called in a
finally block in `WorkerTask.doRun()` here:
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L176
I am also looking at another possible change: putting a call to
`task.stop()` in `WorkerSinkTask.stop()`. This would mirror the call to
`task.stop()` in `WorkerSourceTask.stop()`.
Ideally the source and sink would be symmetrical, which would make them easier
to reason about, especially for Connect developers. The above changes assume
that `task.stop()` is idempotent for both the source and sink.
> SourceTask#stop() not called after exception raised in poll()
> -------------------------------------------------------------
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 1.0.0
> Reporter: Gunnar Morling
> Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case
> an exception has been raised in {{poll()}}. That's not the case, though. As
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful
> action to take, as it'll allow the task to clean up any resources such as
> releasing any database connections, right after that failure and not only
> once the connector is stopped.
> {code}
> package com.example;
>
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
>
> public class TestConnector extends SourceConnector {
>
>     @Override
>     public String version() {
>         return null;
>     }
>
>     @Override
>     public void start(Map<String, String> props) {
>     }
>
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", "bar"));
>     }
>
>     @Override
>     public void stop() {
>     }
>
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>
>     public static class TestTask extends SourceTask {
>
>         @Override
>         public String version() {
>             return null;
>         }
>
>         @Override
>         public void start(Map<String, String> props) {
>         }
>
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}