[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480970#comment-16480970
 ] 

ASF GitHub Bot commented on KAFKA-6566:
---------------------------------------

ewencp closed pull request #5020: KAFKA-6566: Improve Connect Resource Cleanup
URL: https://github.com/apache/kafka/pull/5020
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 2ba785c4668..6edcfd41886 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -148,10 +148,23 @@ public void stop() {
     protected void close() {
         // FIXME Kafka needs to add a timeout parameter here for us to 
properly obey the timeout
         // passed in
-        task.stop();
-        if (consumer != null)
-            consumer.close();
-        transformationChain.close();
+        try {
+            task.stop();
+        } catch (Throwable t) {
+            log.warn("Could not stop task", t);
+        }
+        if (consumer != null) {
+            try {
+                consumer.close();
+            } catch (Throwable t) {
+                log.warn("Could not close consumer", t);
+            }
+        }
+        try {
+            transformationChain.close();
+        } catch (Throwable t) {
+            log.warn("Could not close transformation chain", t);
+        }
     }
 
     @Override
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index f2cef5a63f7..f17475dacfc 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -87,6 +87,7 @@
     private Map<String, String> taskConfig;
     private boolean finishedStart = false;
     private boolean startedShutdownBeforeStartCompleted = false;
+    private boolean stopped = false;
 
     public WorkerSourceTask(ConnectorTaskId id,
                             SourceTask task,
@@ -137,8 +138,21 @@ public void initialize(TaskConfig taskConfig) {
 
     @Override
     protected void close() {
-        producer.close(30, TimeUnit.SECONDS);
-        transformationChain.close();
+        if (!shouldPause()) {
+            tryStop();
+        }
+        if (producer != null) {
+            try {
+                producer.close(30, TimeUnit.SECONDS);
+            } catch (Throwable t) {
+                log.warn("Could not close producer", t);
+            }
+        }
+        try {
+            transformationChain.close();
+        } catch (Throwable t) {
+            log.warn("Could not close transformation chain", t);
+        }
     }
 
     @Override
@@ -152,12 +166,23 @@ public void stop() {
         stopRequestedLatch.countDown();
         synchronized (this) {
             if (finishedStart)
-                task.stop();
+                tryStop();
             else
                 startedShutdownBeforeStartCompleted = true;
         }
     }
 
+    private synchronized void tryStop() {
+        if (!stopped) {
+            try {
+                task.stop();
+                stopped = true;
+            } catch (Throwable t) {
+                log.warn("Could not stop task", t);
+            }
+        }
+    }
+
     @Override
     public void execute() {
         try {
@@ -166,7 +191,7 @@ public void execute() {
             log.info("{} Source task finished initialization and start", this);
             synchronized (this) {
                 if (startedShutdownBeforeStartCompleted) {
-                    task.stop();
+                    tryStop();
                     return;
                 }
                 finishedStart = true;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SourceTask#stop() not called after exception raised in poll()
> -------------------------------------------------------------
>
>                 Key: KAFKA-6566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6566
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Gunnar Morling
>            Assignee: Robert Yokota
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
>     @Override
>     public String version() {
>         return null;
>     }
>     @Override
>     public void start(Map<String, String> props) {
>     }
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
>     }
>     @Override
>     public void stop() {
>     }
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>     public static class TestTask extends SourceTask {
>         @Override
>         public String version() {
>             return null;
>         }
>         @Override
>         public void start(Map<String, String> props) {
>         }
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to