[ https://issues.apache.org/jira/browse/FLINK-36630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17896592#comment-17896592 ]

Hongshun Wang commented on FLINK-36630:
---------------------------------------

[~alago] Feel free to make a PR.

> kafka-connector random WakeupException after enabling watermark alignment
> -------------------------------------------------------------------------
>
>                 Key: FLINK-36630
>                 URL: https://issues.apache.org/jira/browse/FLINK-36630
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.19.1
>            Reporter: Alberto Lago
>            Priority: Major
>         Attachments: image-2024-10-31-12-03-14-115.png
>
>
> Hello,
> I am running Flink 1.19 on AWS Managed Flink with
> flink-connector-kafka:3.3.0-1.19.
> After I enable watermark alignment on the KafkaSource, it starts throwing
> uncaught WakeupExceptions.
> It happens:
> * on every checkpoint, unless I disable offset committing:
>    setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
> * randomly.
> Watermark alignment is added with:
> wmStrategy.withWatermarkAlignment("kafkaSource", Duration.ofMinutes(wa))
> The stack trace is the same in both cases:
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.WakeupException
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
>     at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:127)
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     ... 6 more
> Watermark alignment stops working after recovery.
> Checking the code, I see that
> long consumerPosition = consumer.position(tp);
> at KafkaPartitionSplitReader.java:127 is the only call to
> consumer.position in the whole file that is not wrapped in retryOnWakeup
> (the file contains several such calls).
> I tested wrapping it, and with that change my app runs without any exceptions.
> I could make a PR; I am waiting for an ASF Self-Service account.
> However, I don't really understand what the race condition is here, and I am
> not able to reproduce it in tests.
> Hints and help would be appreciated.
> Thanks
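A minimal, self-contained Java sketch of the wrap-and-retry pattern described in the report, together with one plausible cause of the race. It relies on the documented KafkaConsumer.wakeup() behaviour: if no thread is blocked in a call that can throw WakeupException when wakeup() is invoked, the next such call throws instead. The assumption (not confirmed in this thread) is that checkpoint offset commits and watermark-alignment pause/resume requests wake up the split fetcher's consumer while it is outside poll(), so the pending wakeup surfaces in the unwrapped consumer.position(tp) call. To run without Kafka on the classpath, FakeConsumer and the local WakeupException are hypothetical stand-ins, and retryOnWakeup here only approximates the helper in KafkaPartitionSplitReader; its real signature may differ.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class WakeupRetrySketch {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException. */
    static class WakeupException extends RuntimeException {}

    /**
     * Hypothetical fake consumer mimicking the documented wakeup() semantics:
     * a wakeup issued while no call is blocking stays pending, and the NEXT
     * blocking call throws WakeupException instead of running normally.
     */
    static class FakeConsumer {
        private final AtomicBoolean pendingWakeup = new AtomicBoolean(false);
        private final long position = 42L;

        void wakeup() {
            pendingWakeup.set(true);
        }

        /** Stand-in for a blocking call such as KafkaConsumer.position(tp). */
        long position() {
            // Consume the pending wakeup (if any) and fail this call.
            if (pendingWakeup.getAndSet(false)) {
                throw new WakeupException();
            }
            return position;
        }
    }

    /** Runs the call; if a stale wakeup interrupts it, retries exactly once. */
    static <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
        try {
            return consumerCall.get();
        } catch (WakeupException e) {
            System.out.println("Caught WakeupException during " + description + ", retrying once");
            return consumerCall.get();
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();

        // Unwrapped call: a wakeup issued while the fetcher was not blocked
        // surfaces on the next position() call.
        consumer.wakeup();
        try {
            consumer.position();
        } catch (WakeupException e) {
            System.out.println("unwrapped position() threw WakeupException");
        }

        // Wrapped call: the first attempt consumes the stale wakeup, the retry succeeds.
        consumer.wakeup();
        Long pos = retryOnWakeup(consumer::position, "getting the consumer position");
        System.out.println("wrapped position() = " + pos);
    }
}
```

The sketch retries only once, so a second wakeup arriving during the retry would still propagate; absorbing repeated wakeups would need a loop.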



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
