[ https://issues.apache.org/jira/browse/KAFKA-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch resolved KAFKA-4632.
----------------------------------
Resolution: Fixed
Fix Version/s: 0.10.0.1, 0.10.1.0
I'm going to close this as fixed in 0.10.0.1. [~ScottReynolds], if you
disagree, please feel free to reopen with more detail.
> Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
> ---------------------------------------------------------------------------
>
> Key: KAFKA-4632
> URL: https://issues.apache.org/jira/browse/KAFKA-4632
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0
> Reporter: Scott Reynolds
> Priority: Major
> Fix For: 0.10.1.0, 0.10.0.1
>
>
> WorkerSinkTask's closePartitions method does not handle the WakeupException
> that can be thrown from commitSync.
> {code}
> org.apache.kafka.common.errors.WakeupException
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup (ConsumerNetworkClient.java:404)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:245)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:180)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync (ConsumerCoordinator.java:499)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1104)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:245)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit (WorkerSinkTask.java:264)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:305)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions (WorkerSinkTask.java:435)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:147)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140)
>     at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175)
>     at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
>     at java.util.concurrent.FutureTask.run (FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run (Thread.java:745)
> {code}
> I believe closePartitions should catch the exception and ignore it, since
> that is what the poll method does when isStopping() returns true:
> {code:java}
> } catch (WakeupException we) {
>     log.trace("{} consumer woken up", id);
>     if (isStopping())
>         return;
>     if (shouldPause()) {
>         pauseAll();
>     } else if (!pausedForRedelivery) {
>         resumeAll();
>     }
> }
> {code}
> But I'm unsure, and would love some insight into this.
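
For readers of this thread, the suggestion in the description (catch the WakeupException around the commit and ignore it when the task is stopping) can be sketched roughly as follows. This is only an illustration under stated assumptions and not necessarily what the committed fix does: `WakeupSafeCloseSketch`, `OffsetCommitter`, and `commitOnClose` are hypothetical names, and the nested `WakeupException` is a local stand-in for `org.apache.kafka.common.errors.WakeupException` so the example compiles without Kafka on the classpath.

```java
// Hypothetical, self-contained sketch; none of these types are Kafka's own classes.
class WakeupSafeCloseSketch {

    // Local stand-in for org.apache.kafka.common.errors.WakeupException (assumption).
    static class WakeupException extends RuntimeException {}

    // Hypothetical stand-in for the consumer call that commits offsets synchronously.
    interface OffsetCommitter {
        void commitSync();
    }

    // Commits offsets while closing partitions.
    // Returns true if the commit completed, false if a wakeup interrupted it
    // while the task was stopping. A wakeup at any other time is rethrown.
    static boolean commitOnClose(OffsetCommitter committer, boolean stopping) {
        try {
            committer.commitSync();
            return true;
        } catch (WakeupException we) {
            if (stopping) {
                // The task is shutting down anyway, so the wakeup is expected
                // and is swallowed instead of failing the task.
                return false;
            }
            // Not stopping: leave the decision to the caller.
            throw we;
        }
    }

    public static void main(String[] args) {
        OffsetCommitter wokenUp = () -> { throw new WakeupException(); };
        OffsetCommitter healthy = () -> { };

        System.out.println(commitOnClose(wokenUp, true));  // prints "false"
        System.out.println(commitOnClose(healthy, true));  // prints "true"
    }
}
```

Note that swallowing the wakeup means the final offset commit may not have gone through; whether to retry the commit before shutting down is a separate design decision that this sketch leaves open.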