Brian Sang created KAFKA-20232:
----------------------------------
Summary: KafkaConsumer#commitAsync throws unexpected
WakeupException in awaitMetadataUpdate()
Key: KAFKA-20232
URL: https://issues.apache.org/jira/browse/KAFKA-20232
Project: Kafka
Issue Type: Bug
Reporter: Brian Sang
This is basically the same problem reported in
https://issues.apache.org/jira/browse/KAFKA-14208, but it occurs in a slightly
different code path:
{code:java}
Caused by: org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:277)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReadyAsync(AbstractCoordinator.java:251)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnreadyAsync(ConsumerCoordinator.java:501)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1080)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1574)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.notifyCheckpointComplete(KafkaPartitionSplitReader.java:258)
    at org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager$1.run(KafkaSourceFetcherManager.java:104)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more
{code}
That is, the exception is thrown on the code path that starts at
[https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L309]
The earlier fix was to use a new poll() invocation with a disableWakeup flag,
and I think a similar fix would work here, applied to the poll() that
awaitMetadataUpdate() performs. I will create a PR and attach it to this ticket.
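
To illustrate the idea, here is a toy model of the wakeup behaviour. It is only a sketch under assumptions, not the actual ConsumerNetworkClient code and not the eventual patch: WakeupSketch, Client, pollsUntilMetadata and the local WakeupException are all made-up names for illustration. It shows a pending wakeup aborting an interruptible metadata wait, while the same wait with wakeups disabled completes and leaves the wakeup pending for a later poll.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class WakeupSketch {
    /** Local stand-in for org.apache.kafka.common.errors.WakeupException (illustration only). */
    static final class WakeupException extends RuntimeException {}

    /** Hypothetical, heavily simplified model of a network client that supports wakeup(). */
    static final class Client {
        private final AtomicBoolean wakeup = new AtomicBoolean(false);
        private int pollsUntilMetadata; // polls still needed before "metadata" counts as updated

        Client(int pollsUntilMetadata) {
            this.pollsUntilMetadata = pollsUntilMetadata;
        }

        /** Request that the next interruptible poll aborts. */
        void wakeup() {
            wakeup.set(true);
        }

        boolean wakeupPending() {
            return wakeup.get();
        }

        /** One poll; throws on a pending wakeup unless the caller disabled wakeups. */
        void poll(boolean disableWakeup) {
            if (!disableWakeup && wakeup.getAndSet(false)) {
                throw new WakeupException();
            }
            if (pollsUntilMetadata > 0) {
                pollsUntilMetadata--;
            }
        }

        /** Poll until the simulated metadata update has arrived. */
        boolean awaitMetadataUpdate(boolean disableWakeup) {
            while (pollsUntilMetadata > 0) {
                poll(disableWakeup);
            }
            return true;
        }
    }

    public static void main(String[] args) {
        // Interruptible wait: the pending wakeup surfaces as an exception.
        Client interruptible = new Client(2);
        interruptible.wakeup();
        boolean thrown = false;
        try {
            interruptible.awaitMetadataUpdate(false);
        } catch (WakeupException e) {
            thrown = true;
        }
        System.out.println("interruptible wait threw: " + thrown);

        // Wait with wakeups disabled: it completes and the wakeup stays pending.
        Client guarded = new Client(2);
        guarded.wakeup();
        boolean updated = guarded.awaitMetadataUpdate(true);
        System.out.println("guarded wait completed: " + updated
                + ", wakeup still pending: " + guarded.wakeupPending());
    }
}
```

In this model the wakeup is not lost when it is disabled for one call; it is delivered by the next poll that allows it, which is the behaviour a caller of commitAsync would expect.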