[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399573#comment-16399573 ]

Randall Hauch commented on KAFKA-6661:
--------------------------------------

The {{WorkerSinkTask}} and {{WorkerSinkTaskContext}} both deal with pausing and 
resuming the consumer. Most of this logic is correct:

* {{WorkerSinkTaskContext}} is already tracking the topic partitions that have 
been explicitly paused or resumed by the connector
* {{WorkerSinkTaskContext.pause(TopicPartition...)}} adds the topic partitions 
to its paused set and always pauses those topic partitions in the consumer
* {{WorkerSinkTaskContext.resume(TopicPartition...)}} removes the topic 
partitions from its paused set and *_always_* resumes those topic partitions in 
the consumer. *The _always_ part is what is incorrect; it should still remove 
the topic partitions from its set but should only tell the consumer to resume 
the topic partitions _when the consumer is not paused_.*
* {{WorkerSinkTask.pauseAll()}} currently pauses all of the partitions, but 
does not use or change the context's set of paused partitions
* {{WorkerSinkTask.resumeAll()}} currently resumes all topic partitions 
*_except_* those that are still explicitly paused in the context
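To make the interplay above concrete, here is a minimal, self-contained Java model of the current behavior. It is a sketch, not the actual Connect code: {{FakeConsumer}}, {{ContextModel}} and {{TaskModel}} are hypothetical stand-ins for the consumer, {{WorkerSinkTaskContext}} and {{WorkerSinkTask}}, and topic partitions are plain strings. It shows that a connector calling resume while the whole task is paused re-enables that partition in the consumer.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the consumer: only tracks assigned and paused partitions.
class FakeConsumer {
    final Set<String> assignment = new HashSet<>();
    final Set<String> paused = new HashSet<>();

    void pause(Set<String> partitions) { paused.addAll(partitions); }
    void resume(Set<String> partitions) { paused.removeAll(partitions); }
}

// Simplified model of WorkerSinkTaskContext as described in the comment.
class ContextModel {
    final FakeConsumer consumer;
    final Set<String> pausedByConnector = new HashSet<>();

    ContextModel(FakeConsumer consumer) { this.consumer = consumer; }

    void pause(String... partitions) {
        Set<String> ps = new HashSet<>(Arrays.asList(partitions));
        pausedByConnector.addAll(ps);
        consumer.pause(ps);   // always pauses in the consumer
    }

    void resume(String... partitions) {
        Set<String> ps = new HashSet<>(Arrays.asList(partitions));
        pausedByConnector.removeAll(ps);
        consumer.resume(ps);  // always resumes, even if the whole task is paused
    }
}

// Simplified model of WorkerSinkTask.pauseAll()/resumeAll().
class TaskModel {
    final FakeConsumer consumer;
    final ContextModel context;

    TaskModel(FakeConsumer consumer, ContextModel context) {
        this.consumer = consumer;
        this.context = context;
    }

    // Pauses every assigned partition without touching the context's set.
    void pauseAll() { consumer.pause(consumer.assignment); }

    // Resumes every assigned partition except those the connector still has paused.
    void resumeAll() {
        Set<String> toResume = new HashSet<>(consumer.assignment);
        toResume.removeAll(context.pausedByConnector);
        consumer.resume(toResume);
    }
}

class CurrentBehaviorDemo {
    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.assignment.addAll(Arrays.asList("topic-0", "topic-1"));
        ContextModel context = new ContextModel(consumer);
        TaskModel task = new TaskModel(consumer, context);

        task.pauseAll();            // connector paused via the REST API
        context.resume("topic-0");  // task calls resume() from an empty put()
        System.out.println(consumer.paused);  // prints [topic-1]
    }
}
```

After {{pauseAll()}} both partitions are paused, yet the single {{resume("topic-0")}} call lets {{topic-0}} deliver records again while the task is supposed to be paused.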

So, I think the only change that needs to be made is that 
{{WorkerSinkTaskContext.resume(TopicPartition...)}} should still remove the 
topic partitions from its set but should only tell the consumer to resume the 
topic partitions _when the consumer is not paused_.
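A minimal sketch of the proposed change, under the same simplifying assumptions (partitions as strings, no real consumer). The {{taskPaused}} flag is hypothetical: it stands in for however the worker knows that the connector has been paused. The only behavioral difference from today is in {{contextResume}}: the partitions are always removed from the context's set, but the consumer is only told to resume them while the task is not paused. Removing them from the set still matters, because {{resumeAll()}} would otherwise keep skipping them after the connector is resumed.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the proposed WorkerSinkTaskContext.resume(...) behavior.
// All names are hypothetical; partitions are plain strings.
class FixedSinkTaskModel {
    final Set<String> assignment = new HashSet<>();
    final Set<String> consumerPaused = new HashSet<>();     // what the consumer has paused
    final Set<String> pausedByConnector = new HashSet<>();  // the context's own set
    boolean taskPaused = false;                             // hypothetical task-level flag

    // WorkerSinkTaskContext.pause(...): unchanged.
    void contextPause(String... partitions) {
        List<String> ps = Arrays.asList(partitions);
        pausedByConnector.addAll(ps);
        consumerPaused.addAll(ps);
    }

    // WorkerSinkTaskContext.resume(...): always update the set, but only
    // resume the consumer's partitions when the task itself is not paused.
    void contextResume(String... partitions) {
        List<String> ps = Arrays.asList(partitions);
        pausedByConnector.removeAll(ps);
        if (!taskPaused) {
            consumerPaused.removeAll(ps);
        }
    }

    // WorkerSinkTask.pauseAll(): pauses everything (and sets the hypothetical flag).
    void pauseAll() {
        taskPaused = true;
        consumerPaused.addAll(assignment);
    }

    // WorkerSinkTask.resumeAll(): resumes everything not explicitly paused by the connector.
    void resumeAll() {
        taskPaused = false;
        Set<String> toResume = new HashSet<>(assignment);
        toResume.removeAll(pausedByConnector);
        consumerPaused.removeAll(toResume);
    }

    public static void main(String[] args) {
        FixedSinkTaskModel task = new FixedSinkTaskModel();
        task.assignment.addAll(Arrays.asList("topic-0", "topic-1"));

        task.contextPause("topic-0");   // connector pauses topic-0 (e.g. external outage)
        task.pauseAll();                // connector paused via the REST API
        task.contextResume("topic-0");  // consumer stays paused; only the set changes
        System.out.println(task.consumerPaused.size());  // prints 2

        task.resumeAll();               // connector resumed via the REST API
        System.out.println(task.consumerPaused.size());  // prints 0
    }
}
```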

> Sink connectors that explicitly 'resume' topic partitions can resume a paused 
> task
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6661
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6661
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Critical
>
> Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to 
> explicitly pause and resume topic partitions. This is useful when connectors 
> need additional time processing the records for specific topic partitions 
> (e.g., the external system has an outage).
> However, when the sink connector has been paused via the REST API, the worker
> for the sink tasks pauses the consumer. When the consumer is polled, the poll
> request might time out and return no records. Connect then calls the task's
> {{put(...)}} method (with no records), and this allows the task to optionally 
> call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls 
> resume, this will unexpectedly resume the paused consumer, causing the
> consumer to return messages and the connector to process those messages,
> despite the connector still being paused.
> This is reported against 1.0, but the affected code has not been changed 
> since at least 0.9.0.0.
> A workaround is to remove rather than pause a connector. It's inconvenient, 
> but it works.
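The connector-side pattern described above can be sketched as follows. This is a hypothetical, self-contained model rather than real connector code: {{PartitionControl}} mimics the shape of {{SinkTaskContext}}'s {{assignment()}}, {{pause(...)}} and {{resume(...)}} with string partitions, and the availability check is an assumed {{BooleanSupplier}}. Because {{put(...)}} is also invoked with an empty batch after a poll that returns nothing, the {{resume(...)}} call can run while the connector is paused, which is the path this issue describes. Holding records in memory during an outage is a simplification for the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for SinkTaskContext; partitions are plain strings.
interface PartitionControl {
    Set<String> assignment();
    void pause(String... partitions);
    void resume(String... partitions);
}

// Hypothetical sink task that pauses its partitions while the external
// system is unavailable and resumes them once it is reachable again.
class OutageAwareSinkTask {
    private final PartitionControl context;
    private final BooleanSupplier externalSystemAvailable;  // assumed health check
    private final List<String> pending = new ArrayList<>(); // records held during an outage
    private boolean pausedForOutage = false;
    final List<String> written = new ArrayList<>();         // records "delivered" externally

    OutageAwareSinkTask(PartitionControl context, BooleanSupplier externalSystemAvailable) {
        this.context = context;
        this.externalSystemAvailable = externalSystemAvailable;
    }

    // Called after every poll, possibly with an empty collection.
    void put(Collection<String> records) {
        pending.addAll(records);
        String[] assigned = context.assignment().toArray(new String[0]);
        if (!externalSystemAvailable.getAsBoolean()) {
            if (!pausedForOutage) {
                context.pause(assigned);
                pausedForOutage = true;
            }
            return;
        }
        if (pausedForOutage) {
            // With the current worker code, this call can un-pause a connector
            // that was paused through the REST API.
            context.resume(assigned);
            pausedForOutage = false;
        }
        written.addAll(pending);
        pending.clear();
    }

    public static void main(String[] args) {
        Set<String> paused = new HashSet<>();
        Set<String> assignment = new HashSet<>(Arrays.asList("topic-0"));
        boolean[] up = {false};
        PartitionControl ctx = new PartitionControl() {
            public Set<String> assignment() { return assignment; }
            public void pause(String... ps) { paused.addAll(Arrays.asList(ps)); }
            public void resume(String... ps) { paused.removeAll(Arrays.asList(ps)); }
        };
        OutageAwareSinkTask task = new OutageAwareSinkTask(ctx, () -> up[0]);

        task.put(Arrays.asList("r1"));  // outage: topic-0 is paused, r1 is held
        up[0] = true;
        task.put(new ArrayList<>());    // empty batch: resumes topic-0, writes r1
        System.out.println(paused + " " + task.written);  // prints [] [r1]
    }
}
```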



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
