This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new dbcf949f89b [FLINK-29153][connector/kafka] Retry KafkaConsumer#commitAsync on WakeupException in KafkaConsumerThread
dbcf949f89b is described below
commit dbcf949f89b5bead6c2bec4de77ca68bc8614fe6
Author: Qingsheng Ren <[email protected]>
AuthorDate: Tue Sep 6 17:01:45 2022 +0800
[FLINK-29153][connector/kafka] Retry KafkaConsumer#commitAsync on WakeupException in KafkaConsumerThread
KafkaConsumerThread calls wakeup() on the KafkaConsumer when committing
offsets, in order to interrupt a potentially blocking KafkaConsumer.poll().
However, the wakeup might happen when the consumer is not polling. In that
case the consumer remembers the wakeup and re-examines it when committing the
offset asynchronously, which leads to an unnecessary WakeupException. This
commit retries the commitAsync call once when that exception is caught.
---
.../kafka/internals/KafkaConsumerThread.java | 20 +++++++++++++++++---
1 file changed, 17 insertions(+), 3 deletions(-)
diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
index eba9f3b0387..4c4cc907944 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
@@ -223,9 +223,12 @@ public class KafkaConsumerThread<T> extends Thread {
                         // also record that a commit is already in progress
                         // the order here matters! first set the flag, then send the commit command.
                         commitInProgress = true;
-                        consumer.commitAsync(
-                                commitOffsetsAndCallback.f0,
-                                new CommitCallback(commitOffsetsAndCallback.f1));
+                        retryOnceOnWakeup(
+                                () ->
+                                        consumer.commitAsync(
+                                                commitOffsetsAndCallback.f0,
+                                                new CommitCallback(commitOffsetsAndCallback.f1)),
+                                "commitAsync");
                     }
                 }
@@ -503,6 +506,17 @@ public class KafkaConsumerThread<T> extends Thread {
         return new KafkaConsumer<>(kafkaProperties);
     }

+    private void retryOnceOnWakeup(Runnable consumerCall, String description) {
+        try {
+            consumerCall.run();
+        } catch (WakeupException we) {
+            log.info(
+                    "Caught WakeupException while executing Kafka consumer call for {}. Will retry it once.",
+                    description);
+            consumerCall.run();
+        }
+    }
+
     // ------------------------------------------------------------------------
     // Utilities
     // ------------------------------------------------------------------------
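
The following is a minimal, self-contained sketch of the behavior the commit message describes. It is not part of the commit. FakeConsumer and the local WakeupException are hypothetical stand-ins (assumptions for illustration) so the example runs without a Kafka dependency; the fake only models the one property that matters here, namely that a wakeup issued while nothing is blocking is remembered and raised by the next call. The retryOnceOnWakeup method has the same shape as the helper in the diff, with logging replaced by a print.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryOnceOnWakeupSketch {

    /** Hypothetical stand-in for Kafka's WakeupException. */
    static class WakeupException extends RuntimeException {}

    /** Hypothetical consumer that remembers a wakeup until its next call. */
    static class FakeConsumer {
        private final AtomicBoolean wakeupPending = new AtomicBoolean(false);
        final AtomicInteger commits = new AtomicInteger();

        void wakeup() {
            // Nothing is blocking, so the wakeup is only recorded.
            wakeupPending.set(true);
        }

        void commitAsync() {
            // A pending wakeup is consumed by the next call, which then fails.
            if (wakeupPending.getAndSet(false)) {
                throw new WakeupException();
            }
            commits.incrementAndGet();
        }
    }

    /** Runs the call; if it is interrupted by a wakeup, runs it exactly once more. */
    static void retryOnceOnWakeup(Runnable consumerCall, String description) {
        try {
            consumerCall.run();
        } catch (WakeupException we) {
            System.out.println(
                    "Caught WakeupException for " + description + ". Will retry it once.");
            // A second WakeupException here is not caught and propagates to the caller.
            consumerCall.run();
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.wakeup(); // wakeup arrives while the consumer is not polling
        retryOnceOnWakeup(consumer::commitAsync, "commitAsync");
        System.out.println("commits = " + consumer.commits.get());
    }
}
```

In this sketch the first commitAsync call consumes the stale wakeup and throws, and the single retry succeeds, so the commit count ends at 1. Retrying only once keeps the helper simple: a second wakeup during the retry would still surface as an exception rather than being swallowed in a loop.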