Brian Sang created KAFKA-20232:
----------------------------------

             Summary: KafkaConsumer#commitAsync throws unexpected 
WakeupException in awaitMetadataUpdate()
                 Key: KAFKA-20232
                 URL: https://issues.apache.org/jira/browse/KAFKA-20232
             Project: Kafka
          Issue Type: Bug
            Reporter: Brian Sang


This is basically the same problem reported in 
https://issues.apache.org/jira/browse/KAFKA-14208, but it occurs in a slightly 
different code path:


{code:java}
Caused by: org.apache.kafka.common.errors.WakeupException
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:277)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReadyAsync(AbstractCoordinator.java:251)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnreadyAsync(ConsumerCoordinator.java:501)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1080)
        at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1574)
        at 
org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.notifyCheckpointComplete(KafkaPartitionSplitReader.java:258)
        at 
org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager$1.run(KafkaSourceFetcherManager.java:104)
        at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        ... 6 more {code}
i.e. starting from 
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L309]

The fix before was to use a new poll() invocation with a disableWakeup flag, 
and I think a similar fix would work here. I will create a PR and attach it to 
this ticket.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to