Scott Reynolds created KAFKA-4632:
-------------------------------------
Summary: Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
Key: KAFKA-4632
URL: https://issues.apache.org/jira/browse/KAFKA-4632
Project: Kafka
Issue Type: Bug
Affects Versions: 0.10.0.1, 0.10.0.0, 0.10.1.0
Reporter: Scott Reynolds
WorkerSinkTask's closePartitions method does not handle the WakeupException
that can be thrown from commitSync.
{code}
org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:404)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:245)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1104)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:245)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:264)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:305)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:435)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:147)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}
I believe closePartitions should catch the exception and ignore it, since that
is what the poll method does when isStopping is true:
{code:java}
} catch (WakeupException we) {
    log.trace("{} consumer woken up", id);
    if (isStopping())
        return;
    if (shouldPause()) {
        pauseAll();
    } else if (!pausedForRedelivery) {
        resumeAll();
    }
}
{code}
But I am unsure, and would love some insight into this.
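
To make the proposal concrete, here is a rough, self-contained sketch of the idea; it is not the actual WorkerSinkTask code. The WakeupException class below is a local stand-in for org.apache.kafka.common.errors.WakeupException so the example compiles without Kafka on the classpath, and OffsetCommitter and ClosePartitionsSketch are hypothetical names invented for illustration. The sketch swallows the wakeup only when the task is stopping and rethrows it otherwise.

```java
// Local stand-in for org.apache.kafka.common.errors.WakeupException,
// declared here only so the sketch compiles without the Kafka client jar.
class WakeupException extends RuntimeException {}

// Hypothetical minimal view of the consumer: just the call that can be woken up.
interface OffsetCommitter {
    void commitSync();
}

// Hypothetical reduction of the sink task to the part under discussion.
class ClosePartitionsSketch {
    private final OffsetCommitter consumer;
    private volatile boolean stopping;

    ClosePartitionsSketch(OffsetCommitter consumer) {
        this.consumer = consumer;
    }

    // Marks the task as stopping, as a shutdown request would.
    void stop() {
        stopping = true;
    }

    boolean isStopping() {
        return stopping;
    }

    // Returns true if the commit completed, false if a wakeup was
    // swallowed because the task is stopping. A wakeup that arrives
    // while the task is not stopping is rethrown to the caller.
    boolean closePartitions() {
        try {
            consumer.commitSync();
            return true;
        } catch (WakeupException we) {
            if (isStopping()) {
                return false; // same choice poll makes when isStopping() is true
            }
            throw we;
        }
    }
}
```

One caveat with this approach: when the wakeup is swallowed, the interrupted commit may not have completed, so the last offsets might not be committed. Whether that is acceptable during shutdown is part of the open question.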