Alberto Lago created FLINK-36630:
------------------------------------
Summary: kafka-connector random WakeupException after enabling
watermark alignment
Key: FLINK-36630
URL: https://issues.apache.org/jira/browse/FLINK-36630
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.19.1
Reporter: Alberto Lago
Hello,
I am running Flink 1.19 (AWS managed Flink) with
flink-connector-kafka:3.3.0-1.19.
After I enable watermark alignment on the KafkaSource, it starts throwing
uncaught WakeupExceptions.
It happens:
* On every checkpoint, unless I disable offset committing:
setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
* Randomly
Watermark alignment is added with:
wmStrategy.withWatermarkAlignment("kafkaSource", Duration.ofMinutes(wa))
The stack trace is the same in both cases:
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:127)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more
Watermark alignment stops working after recovery.
Checking the code, I see that
long consumerPosition = consumer.position(tp);
at KafkaPartitionSplitReader.java:127 is the only call to
consumer.position in the whole file that is not wrapped in retryOnWakeup
(there are a few other calls in there).
I tested wrapping it, and my app now runs without any exception.
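For illustration, the wrapping could look roughly like the sketch below. It is self-contained and uses stand-ins only: WakeupException and FakeConsumer here are hypothetical substitutes for the Kafka classes, and retryOnWakeup is modelled on the helper of the same name in KafkaPartitionSplitReader, which I assume retries the call once when a wakeup interrupts it. It is not the connector's actual code, and it only shows the retry pattern, not the root cause of the race.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Self-contained sketch with stand-in classes; not the connector's real code. */
final class RetryOnWakeupSketch {

    /** Stand-in for org.apache.kafka.common.errors.WakeupException. */
    static final class WakeupException extends RuntimeException {}

    /**
     * Hypothetical stand-in for the consumer: a pending wakeup() makes the
     * next position() call throw, then the flag is cleared.
     */
    static final class FakeConsumer {
        private final AtomicBoolean wakeupPending = new AtomicBoolean(false);
        private final long position;

        FakeConsumer(long position) {
            this.position = position;
        }

        void wakeup() {
            wakeupPending.set(true);
        }

        long position() {
            // Consume the pending wakeup, if any, by throwing once.
            if (wakeupPending.getAndSet(false)) {
                throw new WakeupException();
            }
            return position;
        }
    }

    /** Runs the call; if a wakeup interrupts it, retries exactly once. */
    static <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
        try {
            return consumerCall.get();
        } catch (WakeupException e) {
            System.out.println(
                    "Caught WakeupException during " + description + ", retrying once");
            return consumerCall.get();
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(42L);
        // Simulate a wakeup arriving from another thread before the call.
        consumer.wakeup();
        // Unwrapped, consumer.position() would throw here; wrapped, it is retried.
        long consumerPosition = retryOnWakeup(consumer::position, "getting position");
        System.out.println("position = " + consumerPosition);
    }
}
```

In the real reader, the equivalent change would be to pass the consumer.position(tp) call at line 127 through retryOnWakeup, the same way the other calls in that file are.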
I could make a PR; I am waiting for an ASF Self-Service account.
But I don't really understand what the race condition is here, and I am not
able to reproduce it in tests.
Hints and help would be appreciated,
Thanks
--
This message was sent by Atlassian Jira
(v8.20.10#820010)