Alberto Lago created FLINK-36630:
------------------------------------

             Summary: kafka-connector random WakeupException after enabling watermark alignment
                 Key: FLINK-36630
                 URL: https://issues.apache.org/jira/browse/FLINK-36630
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.19.1
            Reporter: Alberto Lago

Hello,

I am on Flink 1.19 (AWS Managed Flink) with flink-connector-kafka:3.3.0-1.19.

After I enable watermark alignment on the KafkaSource, it starts throwing an uncaught WakeupException. It happens:
 * On every checkpoint, unless I disable offset committing:
   setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
 * Randomly

Watermark alignment is added with:
wmStrategy.withWatermarkAlignment("kafkaSource", Duration.ofMinutes(wa))

The stack trace is the same in both cases:

java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:127)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more

Watermark alignment stops working after recovery.

Checking the code, I see that

long consumerPosition = consumer.position(tp);

at KafkaPartitionSplitReader.java:127 is the only call to consumer.position in the whole file that is not wrapped in retryOnWakeup (there are a few such calls in there). I tested wrapping it, and my app now runs without any exception.

I could make a PR; I am waiting for an ASF Self-Service account. However, I don't really understand what the race condition is here, and I am not able to reproduce it in tests.

Hints and help would be appreciated.

Thanks
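
For reference, here is a minimal, dependency-free sketch of the retry-once-on-wakeup pattern I mean. It is not the connector's code: WakeupException below is a local stand-in for org.apache.kafka.common.errors.WakeupException, FakeConsumer is a hypothetical test double, and the helper's signature is an assumption modeled on how retryOnWakeup is used in KafkaPartitionSplitReader.

```java
import java.util.function.Supplier;

public class RetryOnWakeupSketch {

    /** Local stand-in for Kafka's WakeupException (hypothetical, keeps the sketch dependency-free). */
    static class WakeupException extends RuntimeException {}

    /** Runs the call; if it is interrupted by a wakeup, retries it exactly once. */
    static <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
        try {
            return consumerCall.get();
        } catch (WakeupException we) {
            System.out.println(
                    "Caught WakeupException while executing " + description + ", retrying once.");
            // A second WakeupException here would propagate to the caller.
            return consumerCall.get();
        }
    }

    /** Hypothetical consumer whose position() throws on the first call only, simulating a pending wakeup. */
    static class FakeConsumer {
        private boolean wakeupPending = true;
        int calls = 0;

        long position() {
            calls++;
            if (wakeupPending) {
                wakeupPending = false; // the pending wakeup is consumed by the call that throws
                throw new WakeupException();
            }
            return 42L;
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        // Unwrapped, the first position() call would throw; wrapped, it is retried.
        long pos = retryOnWakeup(consumer::position, "getting position");
        System.out.println("position = " + pos + " after " + consumer.calls + " calls");
    }
}
```

In the real reader, the change I tested amounts to passing the consumer.position(tp) call at line 127 through the existing helper in the same way. The sketch only shows why a single stray wakeup stops being fatal; it does not explain where the wakeup comes from.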