[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399573#comment-16399573 ]

Randall Hauch commented on KAFKA-6661:
--------------------------------------

The {{WorkerSinkTask}} and {{WorkerSinkTaskContext}} both deal with pausing and resuming the consumer. Most of this logic is correct:

* {{WorkerSinkTaskContext}} is already tracking the topic partitions that have been explicitly paused or resumed by the connector
* {{WorkerSinkTaskContext.pause(TopicPartition...)}} adds the topic partitions to its paused set and always pauses those topic partitions in the consumer
* {{WorkerSinkTaskContext.resume(TopicPartition...)}} removes the topic partitions from its paused set and *_always_* resumes those topic partitions in the consumer. *The _always_ part is what is incorrect; it should still remove the topic partitions from its set but should only tell the consumer to resume the topic partitions _when the consumer is not paused_.*
* {{WorkerSinkTask.pauseAll()}} currently pauses all of the partitions, but does not use or change the context's set of paused partitions
* {{WorkerSinkTask.resumeAll()}} currently resumes all topic partitions *_except_* those that are still explicitly paused in the context

So, I think the only change that needs to be made is that {{WorkerSinkTaskContext.resume(TopicPartition...)}} should still remove the topic partitions from its set but should only tell the consumer to resume the topic partitions _when the consumer is not paused_.

> Sink connectors that explicitly 'resume' topic partitions can resume a paused
> task
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6661
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6661
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Critical
>
> Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to
> explicitly pause and resume topic partitions.
> This is useful when connectors need additional time to process the records
> for specific topic partitions (e.g., the external system has an outage).
>
> However, when the sink connector has been paused via the REST API, the worker
> for the sink tasks pauses the consumer. When the worker polls the paused
> consumer, the poll request might time out and return no records. Connect then
> calls the task's {{put(...)}} method (with no records), and this allows the
> task to optionally call any of the {{SinkTaskContext}}'s pause or resume
> methods. If it calls resume, this will unexpectedly resume the paused
> consumer, causing the consumer to return messages and the connector to
> process those messages -- despite the connector still being paused.
>
> This is reported against 1.0, but the affected code has not been changed
> since at least 0.9.0.0.
>
> A workaround is to remove rather than pause a connector. It's inconvenient,
> but it works.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
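A minimal, self-contained sketch of the change proposed in the comment, written against hypothetical stand-ins rather than the real Connect classes. `FakeConsumer`, `Context`, `Task`, and the `taskPaused` flag are illustrative assumptions (the flag stands in for however the worker actually exposes its paused state), and partitions are plain strings instead of `TopicPartition`. `Context.resume(...)` always removes partitions from the connector's paused set but only resumes them in the consumer when the task is not paused, while `Task.resumeAll()` mirrors the existing behavior of skipping partitions the connector still has explicitly paused.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class PauseResumeSketch {

    /** Hypothetical stand-in for the consumer: tracks assigned and paused partitions. */
    static class FakeConsumer {
        final Set<String> assignment = new HashSet<>();
        final Set<String> paused = new HashSet<>();

        void pause(Collection<String> partitions) { paused.addAll(partitions); }

        void resume(Collection<String> partitions) { paused.removeAll(partitions); }

        /** Partitions that would return records on the next poll. */
        Set<String> active() {
            Set<String> result = new HashSet<>(assignment);
            result.removeAll(paused);
            return result;
        }
    }

    /** Hypothetical simplification of WorkerSinkTaskContext. */
    static class Context {
        final FakeConsumer consumer;
        final Set<String> pausedByConnector = new HashSet<>();
        boolean taskPaused = false; // assumed flag: set while the connector is paused via REST

        Context(FakeConsumer consumer) { this.consumer = consumer; }

        void pause(String... partitions) {
            List<String> list = Arrays.asList(partitions);
            pausedByConnector.addAll(list);
            consumer.pause(list); // always pause in the consumer
        }

        void resume(String... partitions) {
            List<String> list = Arrays.asList(partitions);
            pausedByConnector.removeAll(list); // always update the context's set
            if (!taskPaused) {
                consumer.resume(list); // the fix: only resume the consumer when not paused
            }
        }
    }

    /** Hypothetical simplification of WorkerSinkTask.pauseAll()/resumeAll(). */
    static class Task {
        final Context context;

        Task(Context context) { this.context = context; }

        void pauseAll() {
            context.taskPaused = true;
            context.consumer.pause(context.consumer.assignment);
        }

        void resumeAll() {
            context.taskPaused = false;
            List<String> toResume = new ArrayList<>();
            for (String tp : context.consumer.assignment) {
                if (!context.pausedByConnector.contains(tp)) { // skip explicitly paused partitions
                    toResume.add(tp);
                }
            }
            context.consumer.resume(toResume);
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.assignment.addAll(Arrays.asList("topic-0", "topic-1"));
        Context context = new Context(consumer);
        Task task = new Task(context);

        context.pause("topic-0");  // connector explicitly pauses one partition
        task.pauseAll();           // connector paused via the REST API
        context.resume("topic-0"); // task calls resume() from put() with no records
        System.out.println("active while paused: " + new TreeSet<>(consumer.active()));

        task.resumeAll();          // connector resumed via the REST API
        System.out.println("active after resume: " + new TreeSet<>(consumer.active()));
    }
}
```

Running `main` replays the scenario from the issue: the `resume(...)` call made while the connector is paused no longer reactivates the partition, so the first line prints `active while paused: []`, and both partitions become active again only after `resumeAll()`, printing `active after resume: [topic-0, topic-1]`.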