aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924670900


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
     }
 
     @Override
-    protected boolean onJoinPrepare(int generation, String memberId) {
+    protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
         log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
-        boolean onJoinPrepareAsyncCommitCompleted = false;
+        if (joinPrepareTimer == null) {
+            joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);

Review Comment:
   added, thanks



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -809,11 +850,13 @@ else if (future.failed() && !future.isRetriable()) {
 
         isLeader = false;
         subscriptions.resetGroupSubscription();
+        joinPrepareTimer = null;
+        autoCommitOffsetRequestFuture = null;

Review Comment:
   added, thanks.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3540,7 +3353,7 @@ public void testPrepareJoinAndRejoinAfterFailedRebalance() {
             MockTime time = new MockTime(1);
 
             // onJoinPrepare will be executed and onJoinComplete will not.
-            boolean res = coordinator.joinGroupIfNeeded(time.timer(2));
+            boolean res = coordinator.joinGroupIfNeeded(time.timer(100));

Review Comment:
   Because we added a `timer.update();` at the end of `onJoinPrepare`, this test
fails on my local machine with the old timeout, so I had to increase it.
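
   A minimal sketch of the effect, not the actual Kafka classes: `AutoTickClock` and `SimpleTimer` are hypothetical stand-ins, written on the assumption that the mock clock advances by its auto-tick (1 ms here) on every read and that each timer update reads the clock once. Under that assumption, every extra update consumes 1 ms of the budget, so a 2 ms timer can expire after only a couple of updates while a 100 ms timer does not.

```java
public class AutoTickTimerSketch {

    /** Hypothetical stand-in for a mock clock that advances on every read. */
    static final class AutoTickClock {
        private final long autoTickMs;
        private long nowMs = 0;

        AutoTickClock(long autoTickMs) {
            this.autoTickMs = autoTickMs;
        }

        /** Each read moves the clock forward by the auto-tick amount. */
        long milliseconds() {
            nowMs += autoTickMs;
            return nowMs;
        }
    }

    /** Hypothetical stand-in for a timer that caches the current time. */
    static final class SimpleTimer {
        private final AutoTickClock clock;
        private final long deadlineMs;
        private long currentMs;

        SimpleTimer(AutoTickClock clock, long timeoutMs) {
            this.clock = clock;
            this.currentMs = clock.milliseconds(); // creating the timer reads the clock once
            this.deadlineMs = currentMs + timeoutMs;
        }

        /** Refreshes the cached time; with an auto-tick clock this costs one tick. */
        void update() {
            currentMs = clock.milliseconds();
        }

        boolean isExpired() {
            return currentMs >= deadlineMs;
        }
    }

    /** Reports whether a timer with the given timeout has expired after n updates. */
    static boolean expiredAfterUpdates(long timeoutMs, int updates) {
        AutoTickClock clock = new AutoTickClock(1);
        SimpleTimer timer = new SimpleTimer(clock, timeoutMs);
        for (int i = 0; i < updates; i++) {
            timer.update();
        }
        return timer.isExpired();
    }

    public static void main(String[] args) {
        System.out.println(expiredAfterUpdates(2, 1));   // one update: still within 2 ms
        System.out.println(expiredAfterUpdates(2, 2));   // a second update uses up the 2 ms
        System.out.println(expiredAfterUpdates(100, 2)); // 100 ms leaves plenty of room
    }
}
```

   In this sketch the number of updates, not wall-clock time, decides when the timer expires, which is why one additional `timer.update()` call can be enough to tip a very small timeout over.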



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
