RivenSun2 commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r908114549


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String 
memberId) {
         boolean onJoinPrepareAsyncCommitCompleted = false;
         // async commit offsets prior to rebalance if auto-commit enabled
         RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // wait for commit offset response if future exist
+        if (future != null) {
+            client.poll(future, 
time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        }

Review Comment:
   If `client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));` is 
still called here, the problem maybe will recur, and the user maybe still block 
the `poll` method of kafkaConsumer. ` rebalanceConfig.rebalanceTimeoutMs` may 
be much larger than `pollDuration`
   
   Suggest:
   1. Promote the `future` variable in `onJoinPrepare` to the instance variable 
of `ConsumerCoordinator`. The variable name can be tentatively 
`rebalanceAutoCommitFuture`, and the initial value is `null`.
       ` private RequestFuture<Void> rebalanceAutoCommitFuture =null;`
   
   2. Refactor the `onJoinPrepare` method. The `rebalanceAutoCommitFuture` can 
be completed after the user has called the `poll` method multiple times without 
blocking the user's `poll` method.
   ```
           boolean onJoinPrepareAsyncCommitCompleted = false;
           if(autoCommitEnabled && rebalanceAutoCommitFuture == null){
               // async commit offsets prior to rebalance if auto-commit enabled
               rebalanceAutoCommitFuture = maybeAutoCommitOffsetsAsync();
           }
           if (rebalanceAutoCommitFuture != null) {
               client.poll(rebalanceAutoCommitFuture, time.timer(0));
           }
           // return true when
           // 1. future is null, which means no commit request sent, so it is 
still considered completed
           // 2. offset commit completed
           // 3. offset commit failed with non-retriable exception
           if (rebalanceAutoCommitFuture == null)
               onJoinPrepareAsyncCommitCompleted = true;
           else if (rebalanceAutoCommitFuture.succeeded()) {
               onJoinPrepareAsyncCommitCompleted = true;
               rebalanceAutoCommitFuture = null;
           } else if (rebalanceAutoCommitFuture.failed() && 
!rebalanceAutoCommitFuture.isRetriable()) {
               log.error("Asynchronous auto-commit of offsets failed: {}", 
rebalanceAutoCommitFuture.exception().getMessage());
               onJoinPrepareAsyncCommitCompleted = true;
               rebalanceAutoCommitFuture = null;
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to