[ 
https://issues.apache.org/jira/browse/KAFKA-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380686#comment-16380686
 ] 

Andrey Bratus commented on KAFKA-4632:
--------------------------------------

I am experiencing a similar issue when pausing a connector (version 1.0.0):

 
WorkerSinkTask{id=cerved-balance-sheets-collaudo-1} Task threw an uncaught and unrecoverable exception
 

The stack trace is:
org.apache.kafka.common.errors.WakeupException: null
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:441)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1242)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:299)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:327)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:398)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:547)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
 

It looks like the exception is thrown from closePartitions() in the finally clause of WorkerSinkTask.execute():
{code:java}
@Override
public void execute() {
    initializeAndStart();
    try {
        while (!isStopping())
            iteration();
    } finally {
        // Make sure any uncommitted data has been committed and the task has
        // a chance to clean up its state
        closePartitions();
    }
}
{code}
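
To illustrate the failure mode, here is a minimal, self-contained sketch. It is not Kafka code: WakeupException and FakeConsumer are hypothetical stand-ins, and the "retry once, then continue" handling is only one possible approach, not necessarily what the project does or should do. It assumes a pending wakeup is consumed once it has been thrown.

```java
public class CloseOnWakeupSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
    static class WakeupException extends RuntimeException {}

    // Hypothetical stand-in for the consumer. If a wakeup is pending, the next
    // commitSync() throws instead of committing (assumption: the wakeup is
    // consumed once it has been thrown).
    static class FakeConsumer {
        boolean wakeupPending;
        int commits = 0;

        FakeConsumer(boolean wakeupPending) {
            this.wakeupPending = wakeupPending;
        }

        void commitSync() {
            if (wakeupPending) {
                wakeupPending = false;
                throw new WakeupException();
            }
            commits++;
        }
    }

    // Cleanup step, loosely mirroring the role of closePartitions().
    static void closePartitions(FakeConsumer consumer, boolean tolerateWakeup) {
        if (!tolerateWakeup) {
            consumer.commitSync(); // a pending wakeup escapes from here
            return;
        }
        try {
            consumer.commitSync();
        } catch (WakeupException e) {
            // The task is already shutting down, so the wakeup carries no new
            // information: retry the commit once so offsets are not lost.
            consumer.commitSync();
        }
    }

    // Mirrors the shape of execute(): work in try, cleanup in finally.
    static String run(FakeConsumer consumer, boolean tolerateWakeup) {
        try {
            try {
                // iteration loop omitted: the task is already stopping
            } finally {
                closePartitions(consumer, tolerateWakeup);
            }
            return "closed cleanly";
        } catch (WakeupException e) {
            // In the real task this would surface as an unrecoverable exception.
            return "task failed: WakeupException";
        }
    }

    public static void main(String[] args) {
        System.out.println(run(new FakeConsumer(true), false)); // task failed: WakeupException
        System.out.println(run(new FakeConsumer(true), true));  // closed cleanly
    }
}
```

Nothing in execute() catches an exception raised inside the finally block, so a wakeup that arrives during the final commit fails the task instead of letting it stop cleanly.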
 

Should we reopen this?

> Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4632
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4632
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0
>            Reporter: Scott Reynolds
>            Priority: Major
>             Fix For: 0.10.0.1, 0.10.1.0
>
>
> WorkerSinkTask's closePartitions method isn't handling WakeupException that 
> can be thrown from commitSync.
> {code}
> org.apache.kafka.common.errors.WakeupException
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup (ConsumerNetworkClient.java:404)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:245)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll (ConsumerNetworkClient.java:180)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync (ConsumerCoordinator.java:499)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync (KafkaConsumer.java:1104)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync (WorkerSinkTask.java:245)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit (WorkerSinkTask.java:264)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets (WorkerSinkTask.java:305)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions (WorkerSinkTask.java:435)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute (WorkerSinkTask.java:147)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140)
> at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175)
> at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
> at java.util.concurrent.FutureTask.run (FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
> at java.lang.Thread.run (Thread.java:745)
> {code}
> I believe it should catch the exception and ignore it, as that is what the 
> poll method does when isStopping is true:
> {code:java}
>         } catch (WakeupException we) {
>             log.trace("{} consumer woken up", id);
>             if (isStopping())
>                 return;
>             if (shouldPause()) {
>                 pauseAll();
>             } else if (!pausedForRedelivery) {
>                 resumeAll();
>             }
>         }
> {code}
> But I am unsure and would love some insight into this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
