[ 
https://issues.apache.org/jira/browse/KAFKA-6566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16476251#comment-16476251
 ] 

ASF GitHub Bot commented on KAFKA-6566:
---------------------------------------

rayokota opened a new pull request #5020: KAFKA-6566: Improve ConnectRresource 
Cleanup
URL: https://github.com/apache/kafka/pull/5020
 
 
   This is a change to improve resource cleanup for sink tasks and source 
tasks.  Now `Task.stop()` may be called from both 
`WorkerSinkTask`/`WorkerSourceTask` `stop()` as well as `close()`.  However, if 
it has already been called in `close()` it is not called in `stop()`, and vice 
versa.
   
   It is called from `WorkerXXXTask.close()` since this method is called in the 
`finally` block of `WorkerTask.run()`, and Connect developers use `stop()` to 
clean up resources.
   
   It is called from `WorkerXXXTask.stop()` since it makes sense to attempt a 
stop on the Task in this call.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> SourceTask#stop() not called after exception raised in poll()
> -------------------------------------------------------------
>
>                 Key: KAFKA-6566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6566
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>            Reporter: Gunnar Morling
>            Assignee: Robert Yokota
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> Having discussed this with [~rhauch], it has been my assumption that 
> {{SourceTask#stop()}} will be called by the Kafka Connect framework in case 
> an exception has been raised in {{poll()}}. That's not the case, though. As 
> an example see the connector and task below.
> Calling {{stop()}} after an exception in {{poll()}} seems like a very useful 
> action to take, as it'll allow the task to clean up any resources such as 
> releasing any database connections, right after that failure and not only 
> once the connector is stopped.
> {code}
> package com.example;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.common.config.ConfigDef;
> import org.apache.kafka.connect.connector.Task;
> import org.apache.kafka.connect.source.SourceConnector;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> public class TestConnector extends SourceConnector {
>     @Override
>     public String version() {
>         return null;
>     }
>     @Override
>     public void start(Map<String, String> props) {
>     }
>     @Override
>     public Class<? extends Task> taskClass() {
>         return TestTask.class;
>     }
>     @Override
>     public List<Map<String, String>> taskConfigs(int maxTasks) {
>         return Collections.singletonList(Collections.singletonMap("foo", 
> "bar"));
>     }
>     @Override
>     public void stop() {
>     }
>     @Override
>     public ConfigDef config() {
>         return new ConfigDef();
>     }
>     public static class TestTask extends SourceTask {
>         @Override
>         public String version() {
>             return null;
>         }
>         @Override
>         public void start(Map<String, String> props) {
>         }
>         @Override
>         public List<SourceRecord> poll() throws InterruptedException {
>             throw new RuntimeException();
>         }
>         @Override
>         public void stop() {
>             System.out.println("stop() called");
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to