showuon commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r909314930
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -745,6 +745,10 @@ protected boolean onJoinPrepare(int generation, String memberId) {
         boolean onJoinPrepareAsyncCommitCompleted = false;
         // async commit offsets prior to rebalance if auto-commit enabled
         RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
+        // wait for commit offset response if future exist
+        if (future != null) {
+            client.poll(future, time.timer(rebalanceConfig.rebalanceTimeoutMs));
+        }

Review Comment:
   Thanks @RivenSun2, I had forgotten the purpose of our previous change. The offset commit in `onJoinPrepare` exists so that, after the rebalance, the other consumer can start from the correct offset. So this offset commit must complete, and with the correct offset.

   With @RivenSun2's suggestion, we store the future as an instance variable and make sure there is only one in-flight future, which fixes the infinite offset commit issue in this PR. And since the group's consumers cannot consume messages while the rebalance is in progress, we can be sure this offset commit is the only one, and the up-to-date one, before rebalancing. So I think this is a good solution.

   For the comment:
   > it will cause KafkaConsumer.poll() to be in a busy loop between the time when commitOffsetRequest is in flight.

   I agree. I'm thinking we can back off when `onJoinPrepare` returns false, like this:

   ```java
   // onJoinPrepare returns false while it is still waiting for the offset commit
   if (!onJoinPrepare(generation.generationId, generation.memberId)) {
       needsJoinPrepare = true;
       // back off for some time  <-- newly added
       timer.sleep(Math.min(timer.remainingMs(), rebalanceConfig.retryBackoffMs));
       // should not initiateJoinGroup if needsJoinPrepare is still true
       return false;
   }
   ```

   WDYT? @RivenSun2 @aiquestion ?
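
   To make the combined idea concrete (a single in-flight commit future kept as an instance variable, plus a backoff in the caller), here is a minimal, self-contained sketch. It is not the `ConsumerCoordinator` code: `CompletableFuture` stands in for `RequestFuture`, and `JoinPrepareSketch`, `sendCommitAsync`, `completeCommit`, and `joinWithBackoff` are hypothetical names used only for illustration.

   ```java
   import java.util.concurrent.CompletableFuture;

   // Hypothetical stand-in for the coordinator; none of these names are Kafka APIs.
   class JoinPrepareSketch {
       // Kept as an instance variable so repeated onJoinPrepare() calls reuse it.
       private CompletableFuture<Void> inflightCommit = null;
       int commitsSent = 0;

       // Stand-in for maybeAutoCommitOffsetsAsync(): pretends to send one commit request.
       private CompletableFuture<Void> sendCommitAsync() {
           commitsSent++;
           return new CompletableFuture<>();
       }

       // Returns true only once the single in-flight commit has completed.
       boolean onJoinPrepare() {
           if (inflightCommit == null) {
               inflightCommit = sendCommitAsync();
           }
           if (!inflightCommit.isDone()) {
               return false; // still waiting; the caller should back off and retry
           }
           inflightCommit = null; // reset for the next rebalance
           return true;
       }

       // Simulates the broker's commit response arriving.
       void completeCommit() {
           if (inflightCommit != null) {
               inflightCommit.complete(null);
           }
       }

       // Caller loop: sleeps between attempts instead of spinning.
       // The simulated response arrives after `respondAfterAttempts` failed attempts.
       static int joinWithBackoff(JoinPrepareSketch c, long backoffMs, int respondAfterAttempts) {
           int attempts = 0;
           while (true) {
               attempts++;
               if (c.onJoinPrepare()) {
                   return attempts;
               }
               if (attempts == respondAfterAttempts) {
                   c.completeCommit();
               }
               try {
                   Thread.sleep(backoffMs);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new IllegalStateException("interrupted while backing off", e);
               }
           }
       }
   }
   ```

   In this sketch, however many times `onJoinPrepare()` is retried, only one commit request is ever sent, and the caller sleeps between retries rather than busy-looping. The real change would also need to bound the wait by the rebalance timeout, which is left out here.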