This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new dbcf949f89b [FLINK-29153][connector/kafka] Retry 
KafkaConsumer#commitAsync on WakeupException in KafkaConsumerThread
dbcf949f89b is described below

commit dbcf949f89b5bead6c2bec4de77ca68bc8614fe6
Author: Qingsheng Ren <[email protected]>
AuthorDate: Tue Sep 6 17:01:45 2022 +0800

    [FLINK-29153][connector/kafka] Retry KafkaConsumer#commitAsync on 
WakeupException in KafkaConsumerThread
    
    KafkaConsumerThread makes a wakeup on the KafkaConsumer on offset commit to 
wakeup the potential blocking KafkaConsumer.poll(). However the wakeup might 
happen when the consumer is not polling. The wakeup will be remembered by the 
consumer and re-examined while committing the offset asynchronously, which 
leads to an unnecessary WakeupException.
---
 .../kafka/internals/KafkaConsumerThread.java         | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
index eba9f3b0387..4c4cc907944 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaConsumerThread.java
@@ -223,9 +223,12 @@ public class KafkaConsumerThread<T> extends Thread {
                         // also record that a commit is already in progress
                         // the order here matters! first set the flag, then 
send the commit command.
                         commitInProgress = true;
-                        consumer.commitAsync(
-                                commitOffsetsAndCallback.f0,
-                                new 
CommitCallback(commitOffsetsAndCallback.f1));
+                        retryOnceOnWakeup(
+                                () ->
+                                        consumer.commitAsync(
+                                                commitOffsetsAndCallback.f0,
+                                                new 
CommitCallback(commitOffsetsAndCallback.f1)),
+                                "commitAsync");
                     }
                 }
 
@@ -503,6 +506,17 @@ public class KafkaConsumerThread<T> extends Thread {
         return new KafkaConsumer<>(kafkaProperties);
     }
 
+    private void retryOnceOnWakeup(Runnable consumerCall, String description) {
+        try {
+            consumerCall.run();
+        } catch (WakeupException we) {
+            log.info(
+                    "Caught WakeupException while executing Kafka consumer 
call for {}. Will retry it once.",
+                    description);
+            consumerCall.run();
+        }
+    }
+
     // ------------------------------------------------------------------------
     //  Utilities
     // ------------------------------------------------------------------------

Reply via email to