aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924670900
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition } @Override - protected boolean onJoinPrepare(int generation, String memberId) { + protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId); - boolean onJoinPrepareAsyncCommitCompleted = false; + if (joinPrepareTimer == null) { + joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs); Review Comment: added, thanks ########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ########## @@ -809,11 +850,13 @@ else if (future.failed() && !future.isRetriable()) { isLeader = false; subscriptions.resetGroupSubscription(); + joinPrepareTimer = null; + autoCommitOffsetRequestFuture = null; Review Comment: added, thanks. ########## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ########## @@ -3540,7 +3353,7 @@ public void testPrepareJoinAndRejoinAfterFailedRebalance() { MockTime time = new MockTime(1); // onJoinPrepare will be executed and onJoinComplete will not. - boolean res = coordinator.joinGroupIfNeeded(time.timer(2)); + boolean res = coordinator.joinGroupIfNeeded(time.timer(100)); Review Comment: because we add a `timer.update();` at end of onJoinPrepare, this test will be failed on my local machine. So i have to increase the timeout. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org