[ 
https://issues.apache.org/jira/browse/KAFKA-15090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751214#comment-17751214
 ] 

Chris Egerton commented on KAFKA-15090:
---------------------------------------

This is a tricky one. First off, although we're almost certainly not going to 
move calls to {{SourceTask::stop}} back onto the herder tick thread, it's not 
unreasonable to try to restore some of the original behavior by spinning up a 
separate thread that can be used to call that method (so that we don't block on 
tasks returning from {{SourceTask::poll}} before being able to stop them).
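
As a rough illustration of that idea (not the actual runtime code), the sketch below calls a stop method from a dedicated thread while another thread is blocked in poll. {{BlockingTask}} and its sentinel value are hypothetical stand-ins, not the real Connect {{SourceTask}} API, and the five-second waits are arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class SeparateStopThreadSketch {

    /** Hypothetical stand-in for a source task; not the real Connect SourceTask. */
    static class BlockingTask {
        private static final String STOP_SIGNAL = "__stop__";
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        /** Blocks until a record or the stop signal arrives; returns null once stopped. */
        String poll() throws InterruptedException {
            String next = queue.take();
            return STOP_SIGNAL.equals(next) ? null : next;
        }

        /** Safe to call from another thread: wakes up a blocked poll(). */
        void stop() {
            queue.add(STOP_SIGNAL);
        }
    }

    /** Returns true if the blocked poll() returned after stop() ran on its own thread. */
    static boolean run() {
        BlockingTask task = new BlockingTask();
        ExecutorService pollThread = Executors.newSingleThreadExecutor();
        ExecutorService stopThread = Executors.newSingleThreadExecutor();
        try {
            Callable<String> pollCall = task::poll;
            Runnable stopCall = task::stop;
            // poll() blocks on its own thread because the queue is empty.
            Future<String> polled = pollThread.submit(pollCall);
            // stop() runs on a separate thread, so nothing waits on poll() first.
            stopThread.submit(stopCall).get(5, TimeUnit.SECONDS);
            return polled.get(5, TimeUnit.SECONDS) == null;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pollThread.shutdownNow();
            stopThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("poll unblocked by stop: " + run());
    }
}
```

Note that neither the caller nor the polling thread plays the role of the herder tick thread here; the point is only that stop is not queued behind poll.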

However, this complicates the logic for task cleanup. Currently, tasks can 
deallocate all of their resources in {{SourceTask::stop}}, secure in the 
knowledge that the runtime will never poll them again or notify them of 
committed offsets. If we go back to potentially invoking {{SourceTask::stop}} 
and {{SourceTask::poll}} concurrently, then it becomes difficult for tasks to 
know when exactly they can deallocate resources. This is the motivation behind 
the now-abandoned 
[KIP-419|https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped].
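
To make the difficulty concrete, here is a minimal sketch of the bookkeeping a task might need if stop and poll can overlap. {{GuardedTask}} and all of its fields are hypothetical and are not part of the Connect API; this is one possible approach, not a recommended pattern.

```java
class ConcurrentStopCleanupSketch {

    /** Hypothetical task that defers cleanup until no poll() is in flight. */
    static class GuardedTask {
        private final Object lock = new Object();
        private boolean stopRequested = false;  // guarded by lock
        private boolean pollInProgress = false; // guarded by lock
        private boolean resourceOpen = true;    // guarded by lock

        String poll() {
            synchronized (lock) {
                if (stopRequested) {
                    return null; // never touch the resource after stop()
                }
                pollInProgress = true;
            }
            try {
                return "record"; // stand-in for reading from the resource
            } finally {
                synchronized (lock) {
                    pollInProgress = false;
                    if (stopRequested) {
                        closeResource(); // stop() arrived mid-poll, so clean up here
                    }
                }
            }
        }

        void stop() {
            synchronized (lock) {
                stopRequested = true;
                if (!pollInProgress) {
                    closeResource(); // no poll in flight, safe to clean up now
                }
            }
        }

        boolean isResourceOpen() {
            synchronized (lock) {
                return resourceOpen;
            }
        }

        // Must be called with the lock held; calling it twice is harmless.
        private void closeResource() {
            resourceOpen = false;
        }
    }

    /** Single-threaded walk through the state transitions. */
    static boolean run() {
        GuardedTask task = new GuardedTask();
        boolean polledWhileOpen = "record".equals(task.poll()) && task.isResourceOpen();
        task.stop();
        boolean closedAfterStop = !task.isResourceOpen();
        boolean noPollAfterStop = task.poll() == null;
        return polledWhileOpen && closedAfterStop && noPollAfterStop;
    }

    public static void main(String[] args) {
        System.out.println("cleanup bookkeeping behaved as expected: " + run());
    }
}
```

Every connector would have to get this kind of state tracking right on its own, which is the burden the current single-threaded behavior avoids.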

 

One alternative is to keep most of the current behavior, but with stronger 
semantics for cancelling source tasks. This is described in KAFKA-14725; to 
summarize: we would continue invoking {{SourceTask::stop}} and 
{{SourceTask::poll}} on the same thread like we do today, but would start 
interrupting the poll-convert-produce thread when tasks exceed the graceful 
shutdown timeout.
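
A minimal sketch of that interrupt-on-timeout flow, using plain threads rather than the real worker classes. The thread name, {{GRACEFUL_TIMEOUT_MS}}, and the sleep standing in for a blocked poll are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class InterruptOnTimeoutSketch {

    // Hypothetical value; stands in for the configured graceful shutdown timeout.
    static final long GRACEFUL_TIMEOUT_MS = 200;

    /** Returns true if the task thread was interrupted and then exited. */
    static boolean run() {
        CountDownLatch polling = new CountDownLatch(1);
        AtomicBoolean wasInterrupted = new AtomicBoolean(false);

        Thread taskThread = new Thread(() -> {
            polling.countDown();
            try {
                Thread.sleep(60_000); // stand-in for a poll() that blocks too long
            } catch (InterruptedException e) {
                wasInterrupted.set(true);
            }
            // In this alternative, stop() would run here, on the same thread as poll().
        }, "poll-convert-produce");
        taskThread.start();

        try {
            polling.await();
            // Give the task a chance to finish on its own first.
            taskThread.join(GRACEFUL_TIMEOUT_MS);
            if (taskThread.isAlive()) {
                // Graceful timeout exceeded: interrupt instead of abandoning the thread.
                taskThread.interrupt();
                taskThread.join(5_000);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return wasInterrupted.get() && !taskThread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("task interrupted after timeout: " + run());
    }
}
```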

Advantages of this alternative are that it preserves behavior that has been 
around in the Connect runtime for the last five releases, and makes it easier 
for developers to correctly implement resource deallocation logic in their 
source connectors. Disadvantages are that it does not make graceful task 
shutdown easier (increasing the likelihood of [ERROR-level log 
messages|https://github.com/apache/kafka/blob/b3db905b27ff4133f4018ac922c9ce2beb2d6087/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1035]),
 and may not work for all connectors (interrupting a thread in Java is not 
guaranteed to actually interrupt some operations, even if they are blocking).
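
One standard-library example of the interruption caveat: {{ReentrantLock.lock()}} keeps waiting when the thread is interrupted, whereas {{lockInterruptibly()}} throws {{InterruptedException}}. The sketch below contrasts the two; the class name, thread names, and wait durations are arbitrary choices for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

class InterruptNotGuaranteedSketch {

    /** Returns true if lock() ignored the interrupt while lockInterruptibly() honored it. */
    static boolean run() {
        ReentrantLock lock = new ReentrantLock();
        CountDownLatch started = new CountDownLatch(2);
        AtomicBoolean sawInterruptedException = new AtomicBoolean(false);

        Thread stubborn = new Thread(() -> {
            started.countDown();
            lock.lock(); // keeps waiting even if the thread is interrupted
            lock.unlock();
        }, "stubborn");
        Thread responsive = new Thread(() -> {
            started.countDown();
            try {
                lock.lockInterruptibly(); // throws if the thread is interrupted
                lock.unlock();
            } catch (InterruptedException e) {
                sawInterruptedException.set(true);
            }
        }, "responsive");

        boolean stubbornStillBlocked;
        lock.lock(); // the calling thread holds the lock so both threads must wait
        try {
            stubborn.start();
            responsive.start();
            started.await();
            stubborn.interrupt();
            responsive.interrupt();
            responsive.join(5_000);
            stubborn.join(300);
            stubbornStillBlocked = stubborn.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            lock.unlock(); // only now can the stubborn thread make progress
        }

        try {
            stubborn.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return stubbornStillBlocked && sawInterruptedException.get() && !responsive.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("interrupt ignored by lock(): " + run());
    }
}
```

A task blocked in a call like the first one would not be unblocked by the runtime interrupting its thread.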

 

Finally, we could pursue something identical or similar to 
[KIP-419|https://cwiki.apache.org/confluence/display/KAFKA/KIP-419%3A+Safely+notify+Kafka+Connect+SourceTask+is+stopped],
 by modifying the source task API to have separate methods for "triggering" a 
stop (which signals that the task should immediately return from any future or 
in-progress calls to {{SourceTask::poll}}) and "performing" a stop (wherein 
the task deallocates resources).
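
A rough sketch of what such a split could look like. The interface and the method names {{triggerStop}} and {{performStop}} are hypothetical, chosen only for illustration; they are not taken from KIP-419 or from the existing Connect API.

```java
import java.util.concurrent.CountDownLatch;

class TwoPhaseStopSketch {

    /** Hypothetical API shape with separate "trigger" and "perform" steps. */
    interface TwoPhaseTask {
        String poll() throws InterruptedException;

        /** May be called concurrently with poll(); must not release resources. */
        void triggerStop();

        /** Called only after the final poll() has returned; releases resources. */
        void performStop();
    }

    static class LatchTask implements TwoPhaseTask {
        private final CountDownLatch stopTriggered = new CountDownLatch(1);
        private volatile boolean resourcesReleased = false;

        @Override
        public String poll() throws InterruptedException {
            stopTriggered.await(); // stand-in for blocking on an external system
            return null;           // null signals "no more records" in this sketch
        }

        @Override
        public void triggerStop() {
            stopTriggered.countDown();
        }

        @Override
        public void performStop() {
            resourcesReleased = true;
        }

        boolean resourcesReleased() {
            return resourcesReleased;
        }
    }

    /** Returns true if poll() was unblocked and cleanup then ran on the task thread. */
    static boolean run() {
        LatchTask task = new LatchTask();
        Thread taskThread = new Thread(() -> {
            try {
                while (task.poll() != null) {
                    // records would be converted and produced here
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                task.performStop(); // never overlaps with poll()
            }
        }, "task-thread");
        taskThread.start();

        task.triggerStop(); // called from a different thread than poll()
        try {
            taskThread.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return task.resourcesReleased() && !taskThread.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("two-phase stop completed: " + run());
    }
}
```

With this shape, the task author only has to make the trigger step thread-safe, while resource deallocation keeps the single-threaded guarantee it has today.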

> Source tasks are no longer stopped on a separate thread
> -------------------------------------------------------
>
>                 Key: KAFKA-15090
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15090
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1, 3.4.0, 3.2.2, 3.2.3, 3.3.1, 3.3.2, 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 
> 3.3.3, 3.6.0, 3.5.1
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Major
>
> Before [https://github.com/apache/kafka/pull/9669], in distributed mode, the 
> {{SourceTask::stop}} method would be invoked on the herder tick thread, which 
> is a separate thread from the dedicated thread which was responsible for 
> polling data from the task and producing it to Kafka.
> This aligned with the Javadocs for {{SourceTask::poll}}, which state:
> {quote}The task will be stopped on a separate thread, and when that happens 
> this method is expected to unblock, quickly finish up any remaining 
> processing, and return.
> {quote}
> However, it came with the downside that the herder's tick thread would be 
> blocked until the invocation of {{SourceTask::stop}} completed, which could 
> result in major parts of the worker's REST API becoming unavailable and even 
> the worker falling out of the cluster.
> As a result, in [https://github.com/apache/kafka/pull/9669], we changed the 
> logic for task shutdown to cause {{SourceTask::stop}} to be invoked on the 
> dedicated thread for the task (i.e., the one responsible for polling data 
> from it and producing that data to Kafka).
> This altered the semantics for {{SourceTask::poll}} and {{SourceTask::stop}} 
> and may have broken connectors that block during {{poll}} with the 
> expectation that {{stop}} can and will be invoked concurrently as a signal 
> that any ongoing polls should be interrupted immediately.
> Although reverting the fix is likely not a viable option (blocking the herder 
> thread on interactions with user-written plugins is high-risk and we have 
> tried to eliminate all instances of this where feasible), we may try to 
> restore the expected contract by spinning up a separate thread exclusively 
> for invoking {{SourceTask::stop}} separately from the dedicated thread for 
> the task and the herder's thread.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
