[
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16366341#comment-16366341
]
Ted Yu commented on KAFKA-6566:
-------------------------------
How about the following change ?
{code}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6ef5ae3..f90c0ac 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -196,6 +196,9 @@ class WorkerSourceTask extends WorkerTask {
}
} catch (InterruptedException e) {
// Ignore and allow to exit.
+ } catch (RuntimeException re) {
+ task.stop();
+ throw re;
} finally {
// It should still be safe to commit offsets since any exception
would have
// simply resulted in not getting more records but all the
existing records should be ok to flush
{code}
> SourceTask#stop() not called after exception raised in poll()
> -------------------------------------------------------------
>
> Key: KAFKA-6566
> URL: https://issues.apache.org/jira/browse/KAFKA-6566
> Project: Kafka
> Issue Type: Bug
> Reporter: Gunnar Morling
> Priority: Major
>
> Having discussed this with [~rhauch], it has been my assumption that
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case
> an exception has been raised in {{poll()}}. That's not the case, though. As
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful
> action to take, as it'll allow the task to clean up any resources such as
> releasing any database connections, right after that failure and not only
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public Class<? extends Task> taskClass() {
> return TestTask.class;
> }
> @Override
> public List<Map<String, String>> taskConfigs(int maxTasks) {
> return Collections.singletonList(Collections.singletonMap("foo",
> "bar"));
> }
> @Override
> public void stop() {
> }
> @Override
> public ConfigDef config() {
> return new ConfigDef();
> }
> public static class TestTask extends SourceTask {
> @Override
> public String version() {
> return null;
> }
> @Override
> public void start(Map<String, String> props) {
> }
> @Override
> public List<SourceRecord> poll() throws InterruptedException {
> throw new RuntimeException();
> }
> @Override
> public void stop() {
> System.out.println("stop() called");
> }
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)