This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 7944d7b3280 KAFKA-13891: reset generation when syncgroup failed with 
REBALANCE_IN_PROGRESS (#12140)
7944d7b3280 is described below

commit 7944d7b32809358bd2c1d4150e9d74c11dd5f320
Author: Shawn <[email protected]>
AuthorDate: Mon Jun 13 10:39:54 2022 +0800

    KAFKA-13891: reset generation when syncgroup failed with 
REBALANCE_IN_PROGRESS (#12140)
    
    Reviewers: Luke Chen <[email protected]>
---
 .../consumer/internals/AbstractCoordinator.java    |  3 +
 .../internals/AbstractCoordinatorTest.java         | 65 ++++++++++++++++++++++
 2 files changed, 68 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c59629f4ca7..8579a269f63 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -825,6 +825,9 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                     log.info("SyncGroup failed: The group began another 
rebalance. Need to re-join the group. " +
                                  "Sent generation was {}", sentGeneration);
+                    // consumer didn't get assignment in this generation, so 
we need to reset generation
+                    // to avoid joinGroup with out-of-data ownedPartitions in 
cooperative rebalance
+                    resetStateOnResponseError(ApiKeys.SYNC_GROUP, error, 
false);
                     future.raise(error);
                 } else if (error == Errors.FENCED_INSTANCE_ID) {
                     // for sync-group request, even if the generation has 
changed we would not expect the instance id
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 0745b99749f..4471b6f88cf 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -67,6 +67,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -503,6 +504,54 @@ public class AbstractCoordinatorTest {
         ensureActiveGroup(rejoinedGeneration, memberId);
     }
 
+    @Test
+    public void 
testResetGenerationIdAfterSyncGroupFailedWithRebalanceInProgress() throws 
InterruptedException, ExecutionException {
+        setupCoordinator();
+
+        String memberId = "memberId";
+        int generation = 5;
+
+        // Rebalance once to initialize the generation and memberId
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        expectJoinGroup("", generation, memberId);
+        expectSyncGroup(generation, memberId);
+        ensureActiveGroup(generation, memberId);
+
+        // Force a rebalance
+        coordinator.requestRejoin("Manual test trigger");
+        assertTrue(coordinator.rejoinNeededOrPending());
+
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            // Return RebalanceInProgress in syncGroup
+            int rejoinedGeneration = 10;
+            expectJoinGroup(memberId, rejoinedGeneration, memberId);
+            expectRebalanceInProgressForSyncGroup(rejoinedGeneration, 
memberId);
+            Future<Boolean> secondJoin = executor.submit(() ->
+                
coordinator.ensureActiveGroup(mockTime.timer(Integer.MAX_VALUE)));
+
+            TestUtils.waitForCondition(() -> {
+                AbstractCoordinator.Generation currentGeneration = 
coordinator.generation();
+                return currentGeneration.generationId == 
AbstractCoordinator.Generation.NO_GENERATION.generationId &&
+                        currentGeneration.memberId.equals(memberId);
+            }, 2000, "Generation should be reset");
+
+            rejoinedGeneration = 20;
+            expectSyncGroup(rejoinedGeneration, memberId);
+            mockClient.respond(joinGroupFollowerResponse(
+                    rejoinedGeneration,
+                    memberId,
+                    "leaderId",
+                    Errors.NONE,
+                    PROTOCOL_TYPE
+            ));
+            assertTrue(secondJoin.get());
+        } finally {
+            executor.shutdownNow();
+            executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+        }
+    }
+
     @Test
     public void testRejoinReason() {
         setupCoordinator();
@@ -590,6 +639,22 @@ public class AbstractCoordinatorTest {
         }, null, true);
     }
 
+    private void expectRebalanceInProgressForSyncGroup(
+            int expectedGeneration,
+            String expectedMemberId
+    ) {
+        mockClient.prepareResponse(body -> {
+            if (!(body instanceof SyncGroupRequest)) {
+                return false;
+            }
+            SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest) 
body).data();
+            return syncGroupRequest.generationId() == expectedGeneration
+                    && syncGroupRequest.memberId().equals(expectedMemberId)
+                    && syncGroupRequest.protocolType().equals(PROTOCOL_TYPE)
+                    && syncGroupRequest.protocolName().equals(PROTOCOL_NAME);
+        }, syncGroupResponse(Errors.REBALANCE_IN_PROGRESS, PROTOCOL_TYPE, 
PROTOCOL_NAME));
+    }
+
     private void expectDisconnectInJoinGroup(
         String expectedMemberId
     ) {

Reply via email to