This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 7944d7b3280 KAFKA-13891: reset generation when syncgroup failed with
REBALANCE_IN_PROGRESS (#12140)
7944d7b3280 is described below
commit 7944d7b32809358bd2c1d4150e9d74c11dd5f320
Author: Shawn <[email protected]>
AuthorDate: Mon Jun 13 10:39:54 2022 +0800
KAFKA-13891: reset generation when syncgroup failed with
REBALANCE_IN_PROGRESS (#12140)
Reviewers: Luke Chen <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 3 +
.../internals/AbstractCoordinatorTest.java | 65 ++++++++++++++++++++++
2 files changed, 68 insertions(+)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c59629f4ca7..8579a269f63 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -825,6 +825,9 @@ public abstract class AbstractCoordinator implements
Closeable {
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
log.info("SyncGroup failed: The group began another
rebalance. Need to re-join the group. " +
"Sent generation was {}", sentGeneration);
+ // consumer didn't get assignment in this generation, so
we need to reset generation
+ // to avoid joinGroup with out-of-date ownedPartitions in
cooperative rebalance
+ resetStateOnResponseError(ApiKeys.SYNC_GROUP, error,
false);
future.raise(error);
} else if (error == Errors.FENCED_INSTANCE_ID) {
// for sync-group request, even if the generation has
changed we would not expect the instance id
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 0745b99749f..4471b6f88cf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -67,6 +67,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -503,6 +504,54 @@ public class AbstractCoordinatorTest {
ensureActiveGroup(rejoinedGeneration, memberId);
}
+ @Test
+ public void
testResetGenerationIdAfterSyncGroupFailedWithRebalanceInProgress() throws
InterruptedException, ExecutionException {
+ setupCoordinator();
+
+ String memberId = "memberId";
+ int generation = 5;
+
+ // Rebalance once to initialize the generation and memberId
+ mockClient.prepareResponse(groupCoordinatorResponse(node,
Errors.NONE));
+ expectJoinGroup("", generation, memberId);
+ expectSyncGroup(generation, memberId);
+ ensureActiveGroup(generation, memberId);
+
+ // Force a rebalance
+ coordinator.requestRejoin("Manual test trigger");
+ assertTrue(coordinator.rejoinNeededOrPending());
+
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ try {
+ // Return RebalanceInProgress in syncGroup
+ int rejoinedGeneration = 10;
+ expectJoinGroup(memberId, rejoinedGeneration, memberId);
+ expectRebalanceInProgressForSyncGroup(rejoinedGeneration,
memberId);
+ Future<Boolean> secondJoin = executor.submit(() ->
+
coordinator.ensureActiveGroup(mockTime.timer(Integer.MAX_VALUE)));
+
+ TestUtils.waitForCondition(() -> {
+ AbstractCoordinator.Generation currentGeneration =
coordinator.generation();
+ return currentGeneration.generationId ==
AbstractCoordinator.Generation.NO_GENERATION.generationId &&
+ currentGeneration.memberId.equals(memberId);
+ }, 2000, "Generation should be reset");
+
+ rejoinedGeneration = 20;
+ expectSyncGroup(rejoinedGeneration, memberId);
+ mockClient.respond(joinGroupFollowerResponse(
+ rejoinedGeneration,
+ memberId,
+ "leaderId",
+ Errors.NONE,
+ PROTOCOL_TYPE
+ ));
+ assertTrue(secondJoin.get());
+ } finally {
+ executor.shutdownNow();
+ executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
+ }
+ }
+
@Test
public void testRejoinReason() {
setupCoordinator();
@@ -590,6 +639,22 @@ public class AbstractCoordinatorTest {
}, null, true);
}
+ private void expectRebalanceInProgressForSyncGroup(
+ int expectedGeneration,
+ String expectedMemberId
+ ) {
+ mockClient.prepareResponse(body -> {
+ if (!(body instanceof SyncGroupRequest)) {
+ return false;
+ }
+ SyncGroupRequestData syncGroupRequest = ((SyncGroupRequest)
body).data();
+ return syncGroupRequest.generationId() == expectedGeneration
+ && syncGroupRequest.memberId().equals(expectedMemberId)
+ && syncGroupRequest.protocolType().equals(PROTOCOL_TYPE)
+ && syncGroupRequest.protocolName().equals(PROTOCOL_NAME);
+ }, syncGroupResponse(Errors.REBALANCE_IN_PROGRESS, PROTOCOL_TYPE,
PROTOCOL_NAME));
+ }
+
private void expectDisconnectInJoinGroup(
String expectedMemberId
) {