[
https://issues.apache.org/jira/browse/FLINK-36630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17894479#comment-17894479
]
Alberto Lago commented on FLINK-36630:
--------------------------------------
I think I see what you mean: we should handle WakeupException and IllegalStateException differently, since right now a WakeupException also marks the splits as finished?
{code:java}
try {
    consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
} catch (WakeupException | IllegalStateException e) {
    // IllegalStateException will be thrown if the consumer is not assigned any partitions.
    // This happens if all assigned partitions are invalid or empty (starting offset >=
    // stopping offset). We just mark empty partitions as finished and return an empty
    // record container, and this consumer will be closed by SplitFetcherManager.
    KafkaPartitionSplitRecords recordsBySplits =
            new KafkaPartitionSplitRecords(
                    ConsumerRecords.empty(), kafkaSourceReaderMetrics);
    markEmptySplitsAsFinished(recordsBySplits);
    return recordsBySplits;
}
KafkaPartitionSplitRecords recordsBySplits =
        new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);
List<TopicPartition> finishedPartitions = new ArrayList<>();
for (TopicPartition tp : consumer.assignment()) {
    long stoppingOffset = getStoppingOffset(tp);
    long consumerPosition = consumer.position(tp);
{code}
to
{code:java}
try {
    consumerRecords =
            retryOnWakeup(
                    // <----- This is where you mean to also add the retryOnWakeup?
                    () -> consumer.poll(Duration.ofMillis(POLL_TIMEOUT)),
                    "polling from consumer");
} catch (WakeupException | IllegalStateException e) {
    // IllegalStateException will be thrown if the consumer is not assigned any partitions.
    // This happens if all assigned partitions are invalid or empty (starting offset >=
    // stopping offset). We just mark empty partitions as finished and return an empty
    // record container, and this consumer will be closed by SplitFetcherManager.
    KafkaPartitionSplitRecords recordsBySplits =
            new KafkaPartitionSplitRecords(
                    ConsumerRecords.empty(), kafkaSourceReaderMetrics);
    markEmptySplitsAsFinished(recordsBySplits);
    return recordsBySplits;
}
KafkaPartitionSplitRecords recordsBySplits =
        new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);
List<TopicPartition> finishedPartitions = new ArrayList<>();
for (TopicPartition tp : consumer.assignment()) {
    long stoppingOffset = getStoppingOffset(tp);
    long consumerPosition =
            retryOnWakeup( // <----- Here is what I have running and it fixed my issue
                    () -> consumer.position(tp),
                    "getting starting offset to check if it has reached the stopping offset");
{code}
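For reference, my understanding of the existing retryOnWakeup helper in KafkaPartitionSplitReader is roughly the following (paraphrased from memory, not a verbatim copy of the class), which is why wrapping the position() call keeps a stray wakeup from escaping fetch():
{code:java}
// Rough sketch of the retryOnWakeup helper already used elsewhere in
// KafkaPartitionSplitReader (paraphrased, not verbatim): a WakeupException
// triggered by an unrelated wakeup() call is logged and the consumer call is
// retried once, instead of bubbling up out of fetch() as an uncaught error.
private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
    try {
        return consumerCall.get();
    } catch (WakeupException we) {
        LOG.info(
                "Caught WakeupException while executing Kafka consumer call for {}. "
                        + "The consumer call will be retried.",
                description);
        return consumerCall.get();
    }
}
{code}
(Supplier here is java.util.function.Supplier and LOG is the class logger.)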
Yeah, I'd need help with the test, as I wasn't able to reproduce it yet.
> kafka-connector random WakeupException after enabling watermark alignment
> -------------------------------------------------------------------------
>
> Key: FLINK-36630
> URL: https://issues.apache.org/jira/browse/FLINK-36630
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.19.1
> Reporter: Alberto Lago
> Priority: Major
> Attachments: image-2024-10-31-12-03-14-115.png
>
>
> Hello,
> in Flink 1.19 (AWS Managed Flink), with
> flink-connector-kafka:3.3.0-1.19,
> after I enable watermark alignment on the KafkaSource, it starts throwing
> an uncaught WakeupException.
> It happens:
> * on every checkpoint, unless I disable offset committing with
> setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "false")
> * randomly
> Watermark alignment is added with
> wmStrategy.withWatermarkAlignment("kafkaSource", Duration.ofMinutes(wa))
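> For illustration only, a minimal sketch of the kind of setup described above (broker, topic, group id, and the bounded-out-of-orderness base strategy are placeholders/assumptions, not the real application code; wa is the alignment window in minutes from above; env is a StreamExecutionEnvironment):
> {code:java}
> // Sketch only: a KafkaSource with offset commit on checkpoint left enabled
> // (the default) and a watermark strategy with alignment, attached via fromSource.
> KafkaSource<String> source =
>         KafkaSource.<String>builder()
>                 .setBootstrapServers("broker:9092")   // placeholder
>                 .setTopics("input-topic")             // placeholder
>                 .setGroupId("my-group")               // placeholder
>                 .setValueOnlyDeserializer(new SimpleStringSchema())
>                 // setting this to "false" makes the checkpoint-time failures go away
>                 .setProperty(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT.key(), "true")
>                 .build();
> WatermarkStrategy<String> wmStrategy =
>         WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
>                 .withWatermarkAlignment("kafkaSource", Duration.ofMinutes(wa));
> env.fromSource(source, wmStrategy, "kafkaSource");
> {code}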
> The following stack trace applies to both cases.
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.kafka.common.errors.WakeupException
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:529)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:293)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:251)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
> at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1759)
> at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1717)
> at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:127)
> at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
> ... 6 more
> Watermark alignment stops working after recovery.
> Checking the code, I see that
> long consumerPosition = consumer.position(tp);
> at KafkaPartitionSplitReader.java:127 is the only call to
> consumer.position in the whole file that is not wrapped in retryOnWakeup
> (there are a few other calls in there).
> I tested wrapping it and was able to make my app work without any exception.
> I could make a PR; I am waiting for an ASF Self-Service account.
> But I don't really understand what the race condition here is, and I was not
> able to reproduce it in tests.
> Hints and help would be appreciated,
> Thanks