[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16399567#comment-16399567 ]
Randall Hauch commented on KAFKA-6661:
--------------------------------------

After a connector is paused, the sink task worker will call {{consumer.poll(long)}}, which blocks and then times out after the configurable timeout period, returning 0 records:
{noformat}
[2018-03-13 18:21:33,756] TRACE WorkerSinkTask{id=s3-sink-0} Polling consumer with timeout 76631 ms (org.apache.kafka.connect.runtime.WorkerSinkTask:282)
...
[2018-03-13 18:21:33,758] DEBUG WorkerSinkTask{id=s3-sink-0} Finished offset commit successfully in 6 ms for sequence number 1: {s3_topic-0=OffsetAndMetadata{offset=27300, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:238)
[2018-03-13 18:21:33,758] DEBUG WorkerSinkTask{id=s3-sink-0} Setting last committed offsets to {s3_topic-0=OffsetAndMetadata{offset=27300, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:241)
...
[2018-03-13 18:22:50,391] TRACE WorkerSinkTask{id=s3-sink-0} Polling returned 0 messages (org.apache.kafka.connect.runtime.WorkerSinkTask:285)
{noformat}
The worker then delivers the empty batch to the task's {{put(...)}} method, at which point the connector might ultimately call a method that calls {{WorkerSinkTaskContext.resume(...)}}, which currently *_resumes the consumer regardless of whether the connector is paused_*.
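The sequence above can be modeled in a few lines of plain Java. This is a hypothetical sketch, not Kafka code: {{ModelConsumer}}, {{NaiveTaskContext}} and {{PausedConnectorScenario}} are invented names that only mimic the pause/resume bookkeeping of {{KafkaConsumer}} and {{WorkerSinkTaskContext}}, and each unpaused partition is assumed to have one record waiting.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the consumer's pause/resume bookkeeping (not the real KafkaConsumer).
class ModelConsumer {
    private final Set<String> paused = new HashSet<>();

    void pause(Set<String> partitions) { paused.addAll(partitions); }

    void resume(Set<String> partitions) { paused.removeAll(partitions); }

    // Paused partitions yield nothing; assume each unpaused partition has one record ready.
    int poll(Set<String> assignment) {
        int count = 0;
        for (String tp : assignment) {
            if (!paused.contains(tp)) {
                count++;
            }
        }
        return count;
    }
}

// Mimics the behavior described above: resume() is forwarded straight to the consumer,
// with no check of whether the connector itself is paused.
class NaiveTaskContext {
    private final ModelConsumer consumer;

    NaiveTaskContext(ModelConsumer consumer) { this.consumer = consumer; }

    void resume(Set<String> partitions) { consumer.resume(partitions); }
}

class PausedConnectorScenario {
    // Returns how many records the next poll delivers while the connector is still paused.
    static int recordsAfterPut() {
        Set<String> assignment = Set.of("s3_topic-0");
        ModelConsumer consumer = new ModelConsumer();
        NaiveTaskContext context = new NaiveTaskContext(consumer);

        consumer.pause(assignment);            // connector paused via REST: worker pauses all partitions
        int first = consumer.poll(assignment); // nothing to return, like the 0-message poll in the log
        if (first == 0) {
            // put(emptyBatch): a connector may "resume" its partitions here, as the S3 sink does
            context.resume(assignment);
        }
        return consumer.poll(assignment);      // connector is still paused, yet records flow again
    }
}
```

In this model {{recordsAfterPut()}} returns 1: the task receives a record even though nothing has resumed the connector.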
Here's a stack trace showing the S3 connector's {{put(...)}} call resuming the consumer:
{noformat}
"pool-1-thread-1@4159" prio=5 tid=0x1b nid=NA runnable
  java.lang.Thread.State: RUNNABLE
      at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1542)
      at org.apache.kafka.connect.runtime.WorkerSinkTaskContext.resume(WorkerSinkTaskContext.java:109)
      at io.confluent.connect.s3.TopicPartitionWriter.resume(TopicPartitionWriter.java:405)
      at io.confluent.connect.s3.TopicPartitionWriter.commitOnTimeIfNoData(TopicPartitionWriter.java:295)
      at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:177)
      at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:195)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
      at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
      at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:748)
{noformat}

> Sink connectors that explicitly 'resume' topic partitions can resume a paused task
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-6661
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6661
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0
>            Reporter: Randall Hauch
>            Assignee: Randall Hauch
>            Priority: Critical
>
> Sink connectors are allowed to use the {{SinkTaskContext}}'s
> methods to explicitly pause and resume topic partitions. This is useful when connectors
> need additional time to process the records for specific topic partitions
> (e.g., when the external system has an outage).
>
> However, when the sink connector has been paused via the REST API, the worker
> for the sink tasks pauses the consumer. When the worker then polls the consumer, the poll
> request might time out and return no records. Connect then calls the task's
> {{put(...)}} method (with no records), and this allows the task to optionally
> call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls
> resume, this will unexpectedly resume the paused consumer, causing the
> consumer to return messages and the connector to process those messages,
> even though the connector is still paused.
>
> This is reported against 1.0, but the affected code has not changed
> since at least 0.9.0.0.
>
> A workaround is to remove rather than pause a connector. It's inconvenient,
> but it works.
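One possible guard can be sketched with a similar hypothetical model: the context remembers which partitions the task itself paused, and a resume requested while the connector is paused only clears that bookkeeping instead of resuming the consumer. When the connector is later resumed through the REST API, the worker resumes every partition except those the task still holds paused. {{GuardedModel}} and its methods are illustrative names, not the real {{WorkerSinkTaskContext}} API, and this is not necessarily how the issue was fixed upstream.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of worker/context state; not the real KafkaConsumer or WorkerSinkTaskContext.
class GuardedModel {
    final Set<String> consumerPaused = new HashSet<>(); // partitions the consumer actually has paused
    final Set<String> taskPaused = new HashSet<>();     // partitions the task paused via its context
    boolean connectorPaused = false;                    // true while paused via the REST API

    // Worker side: the connector is paused, so pause every assigned partition.
    void pauseConnector(Set<String> assignment) {
        connectorPaused = true;
        consumerPaused.addAll(assignment);
    }

    // Worker side: the connector is resumed, so resume everything the task has not paused itself.
    void resumeConnector(Set<String> assignment) {
        connectorPaused = false;
        for (String tp : assignment) {
            if (!taskPaused.contains(tp)) {
                consumerPaused.remove(tp);
            }
        }
    }

    // Task side: equivalent of calling pause(...) on the task context.
    void taskPause(Set<String> partitions) {
        taskPaused.addAll(partitions);
        consumerPaused.addAll(partitions);
    }

    // Task side: equivalent of calling resume(...) on the task context, with the guard.
    void taskResume(Set<String> partitions) {
        taskPaused.removeAll(partitions);
        if (!connectorPaused) {
            consumerPaused.removeAll(partitions);
        }
        // While the connector is paused the consumer stays paused;
        // resumeConnector() will resume these partitions later.
    }
}
```

The design choice is to keep the two sources of pausing (the task and the REST API) in separate state, so neither can silently undo the other.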