[ https://issues.apache.org/jira/browse/FLINK-36630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17896592#comment-17896592 ]
Hongshun Wang commented on FLINK-36630:
---------------------------------------

[~alago] Feel free to make a PR.

> kafka-connector random WakeupException after enabling watermark alignment
> -------------------------------------------------------------------------
>
>                 Key: FLINK-36630
>                 URL: https://issues.apache.org/jira/browse/FLINK-36630
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>    Affects Versions: 1.19.1
>            Reporter: Alberto Lago
>            Priority: Major
>         Attachments: image-2024-10-31-12-03-14-115.png
>
>
> Hello,
> Environment: Flink 1.19 on AWS Managed Flink, with
> flink-connector-kafka:3.3.0-1.19.
> After I enable watermark alignment on KafkaSource, it starts throwing an
> uncaught WakeupException.
> It happens:
> * on every checkpoint, unless I disable offset committing with
>   setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
> * randomly
> Watermark alignment was added with
>   wmStrategy.withWatermarkAlignment("kafkaSource", Duration.ofMinutes(wa))
> The stack trace is the same in both cases:
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.WakeupException
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
>     at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:127)
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
>     ... 6 more
>
> Watermark alignment stops working after recovery.
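The trace ends with KafkaConsumer.position() throwing WakeupException. According to the Kafka client javadoc, KafkaConsumer.wakeup() is thread-safe: a thread currently blocked in a consumer call throws WakeupException, and if no thread is blocked, the next such call throws it instead. A wakeup issued from another thread can therefore surface in whichever consumer call runs next, including an unwrapped position(tp). Which Flink code path issues the wakeup here (for example, the fetcher being woken for split pause/resume or offset-commit work) is an assumption; the report does not confirm it. The self-contained Java sketch below models only the Kafka-side behavior. FakeConsumer and the local WakeupException are hypothetical stand-ins, not the real Kafka classes, and retryOnWakeup is written in the spirit of the helper the reporter mentions, not copied from the connector.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class RetryOnWakeupSketch {

    // Hypothetical stand-in for org.apache.kafka.common.errors.WakeupException.
    static class WakeupException extends RuntimeException {
        WakeupException() {
            super("consumer was woken up");
        }
    }

    // Hypothetical stand-in for KafkaConsumer: wakeup() may be called from any thread
    // and arms a flag; the next "blocking" call throws WakeupException and clears the flag.
    static class FakeConsumer {
        private final AtomicBoolean wakeupPending = new AtomicBoolean(false);
        private final long currentPosition;

        FakeConsumer(long currentPosition) {
            this.currentPosition = currentPosition;
        }

        void wakeup() {
            wakeupPending.set(true);
        }

        long position(String partition) {
            if (wakeupPending.getAndSet(false)) {
                throw new WakeupException();
            }
            return currentPosition;
        }
    }

    // Retry the consumer call once if a pending wakeup interrupted it; a second
    // WakeupException (a new wakeup arriving during the retry) is allowed to propagate.
    static <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
        try {
            return consumerCall.get();
        } catch (WakeupException e) {
            System.out.println("Caught WakeupException while " + description + ", retrying once");
            return consumerCall.get();
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(42L);

        // Simulate another thread having called wakeup() just before the fetch runs.
        consumer.wakeup();

        // Unwrapped call: the pending wakeup surfaces as an exception, as in the trace above.
        try {
            consumer.position("topic-0");
        } catch (WakeupException e) {
            System.out.println("Unwrapped position() threw: " + e.getMessage());
        }

        // Wrapped call: the first attempt consumes the wakeup and the retry succeeds.
        consumer.wakeup();
        long pos = retryOnWakeup(() -> consumer.position("topic-0"), "getting position of topic-0");
        System.out.println("position = " + pos);
    }
}
```

Running main prints "Unwrapped position() threw: consumer was woken up", then the retry message, then "position = 42". Retrying exactly once works because a single pending wakeup is cleared by the call that throws it; a fresh wakeup that arrives during the retry would still propagate.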
> Checking the code, I see that
>   long consumerPosition = consumer.position(tp);
> at KafkaPartitionSplitReader.java:127 is the only call to consumer.position
> in the whole file that is not wrapped in retryOnWakeup (there are a few
> such calls in there).
> I tested wrapping it, and with that change my app runs without any exception.
> I could make a PR; I am waiting for an ASF Self-Service account.
> But I don't really understand what the race condition is here, and I am
> not able to reproduce it in tests.
> Hints and help would be appreciated.
> Thanks

--
This message was sent by Atlassian Jira
(v8.20.10#820010)