Gunnar Morling created KAFKA-6566:
-------------------------------------
Summary: SourceTask#stop() not called after exception raised in poll()
Key: KAFKA-6566
URL: https://issues.apache.org/jira/browse/KAFKA-6566
Project: Kafka
Issue Type: Bug
Reporter: Gunnar Morling
After discussing this with [~rhauch], I had assumed that the Kafka Connect
framework calls {{SourceTask#stop()}} when an exception is raised in
{{poll()}}. That's not the case, though; the connector and task below
demonstrate this.
Calling {{stop()}} after an exception in {{poll()}} would be very useful: it
would let the task clean up its resources, for example by releasing database
connections, right after the failure rather than only once the connector is
stopped.
{code}
package com.example;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class TestConnector extends SourceConnector {

    @Override
    public String version() {
        return null;
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public Class<? extends Task> taskClass() {
        return TestTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        return Collections.singletonList(Collections.singletonMap("foo", "bar"));
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    public static class TestTask extends SourceTask {

        @Override
        public String version() {
            return null;
        }

        @Override
        public void start(Map<String, String> props) {
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            // Simulate a failure while polling the source system.
            throw new RuntimeException();
        }

        @Override
        public void stop() {
            // Expected to run right after poll() fails; per this report it does not.
            System.out.println("stop() called");
        }
    }
}
{code}
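To make the requested behavior concrete, here is a minimal, self-contained sketch of a runner that calls {{stop()}} as soon as {{poll()}} throws. It is only a simplified model and not the actual Kafka Connect worker code: the {{PollingTask}} interface, the {{FailingTask}} class and the {{pollOnce()}} helper are hypothetical names introduced purely for illustration, and records are modeled as plain strings.

```java
import java.util.ArrayList;
import java.util.List;

public class StopAfterPollFailure {

    // Hypothetical stand-in for the poll()/stop() part of a source task's
    // lifecycle. This is NOT org.apache.kafka.connect.source.SourceTask.
    interface PollingTask {
        List<String> poll() throws InterruptedException;
        void stop();
    }

    // Task that always fails in poll(), mirroring TestTask from the report,
    // and records which lifecycle methods were invoked.
    static class FailingTask implements PollingTask {
        final List<String> events = new ArrayList<>();

        @Override
        public List<String> poll() {
            events.add("poll");
            throw new RuntimeException("simulated failure in poll()");
        }

        @Override
        public void stop() {
            // A real task would release database connections etc. here.
            events.add("stop");
        }
    }

    // Assumed runner behavior: poll once, and if poll() raises a
    // RuntimeException, call stop() immediately so the task can clean up.
    // Returns true if poll() completed normally, false otherwise.
    static boolean pollOnce(PollingTask task) {
        try {
            task.poll();
            return true;
        } catch (RuntimeException e) {
            task.stop();
            return false;
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report the poll as unsuccessful.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FailingTask task = new FailingTask();
        boolean succeeded = pollOnce(task);
        System.out.println("poll succeeded: " + succeeded);
        System.out.println("events: " + task.events);
    }
}
```

In this sketch the task's recorded events end with the {{stop()}} call directly after the failed {{poll()}}, which is the ordering this issue asks the framework to guarantee.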