This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new d783a9f KAFKA-9815; Ensure consumer always re-joins if JoinGroup
fails (#8420)
d783a9f is described below
commit d783a9f9f697c4df74abdc2a8ee18c533aa0716e
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue Apr 7 01:00:11 2020 +0100
KAFKA-9815; Ensure consumer always re-joins if JoinGroup fails (#8420)
On a metadata change for assigned topics, we trigger a rebalance, revoke
partitions and send JoinGroup. If the metadata then reverts to its original value
and the JoinGroup fails, we don't resend JoinGroup because `rejoinNeeded` was never set.
This PR sets `rejoinNeeded=true` when a rebalance is triggered due to a metadata
change to ensure that we retry on failure.
Reviewers: Boyang Chen <[email protected]>, Chia-Ping Tsai
<[email protected]>, Jason Gustafson <[email protected]>
---
.../consumer/internals/ConsumerCoordinator.java | 5 +-
.../internals/ConsumerCoordinatorTest.java | 58 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b4d9fa5..6b905c5 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -727,11 +727,14 @@ public final class ConsumerCoordinator extends
AbstractCoordinator {
// we need to rejoin if we performed the assignment and metadata has
changed;
// also for those owned-but-no-longer-existed partitions we should
drop them as lost
- if (assignmentSnapshot != null &&
!assignmentSnapshot.matches(metadataSnapshot))
+ if (assignmentSnapshot != null &&
!assignmentSnapshot.matches(metadataSnapshot)) {
+ requestRejoin();
return true;
+ }
// we need to join if our subscription has changed since the last join
if (joinedSubscription != null &&
!joinedSubscription.equals(subscriptions.subscription())) {
+ requestRejoin();
return true;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9e6128b..afef951 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -843,6 +843,64 @@ public class ConsumerCoordinatorTest {
assertFalse(coordinator.rejoinNeededOrPending());
}
+ /**
+ * Verifies that the consumer re-joins after a metadata change. If
JoinGroup fails
+ * and metadata reverts to its original value, the consumer should still
retry JoinGroup.
+ */
+ @Test
+ public void testRebalanceWithMetadataChange() {
+ final String consumerId = "leader";
+ final List<String> topics = Arrays.asList(topic1, topic2);
+ final List<TopicPartition> partitions = Arrays.asList(t1p, t2p);
+ subscriptions.subscribe(toSet(topics), rebalanceListener);
+ client.updateMetadata(TestUtils.metadataUpdateWith(1,
+ Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2,
1))));
+ coordinator.maybeUpdateSubscriptionMetadata();
+
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+ Map<String, List<String>> initialSubscription =
singletonMap(consumerId, topics);
+ partitionAssignor.prepare(singletonMap(consumerId, partitions));
+
+ client.prepareResponse(joinGroupLeaderResponse(1, consumerId,
initialSubscription, Errors.NONE));
+ client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+
+ // rejoin will only be set in the next poll call
+ assertFalse(coordinator.rejoinNeededOrPending());
+ assertEquals(toSet(topics), subscriptions.subscription());
+ assertEquals(toSet(partitions), subscriptions.assignedPartitions());
+ assertEquals(0, rebalanceListener.revokedCount);
+ assertNull(rebalanceListener.revoked);
+ assertEquals(1, rebalanceListener.assignedCount);
+
+ // Change metadata to trigger rebalance.
+ client.updateMetadata(TestUtils.metadataUpdateWith(1,
singletonMap(topic1, 1)));
+ coordinator.poll(time.timer(0));
+
+ // Revert metadata to original value. Fail pending JoinGroup. Another
+ // JoinGroup should be sent, which will be completed successfully.
+ client.updateMetadata(TestUtils.metadataUpdateWith(1,
+ Utils.mkMap(Utils.mkEntry(topic1, 1), Utils.mkEntry(topic2,
1))));
+ client.respond(joinGroupFollowerResponse(1, consumerId, "leader",
Errors.NOT_COORDINATOR));
+ client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+ coordinator.poll(time.timer(0));
+ assertTrue(coordinator.rejoinNeededOrPending());
+
+ client.respond(joinGroupLeaderResponse(2, consumerId,
initialSubscription, Errors.NONE));
+ client.prepareResponse(syncGroupResponse(partitions, Errors.NONE));
+ coordinator.poll(time.timer(Long.MAX_VALUE));
+
+ assertFalse(coordinator.rejoinNeededOrPending());
+ Collection<TopicPartition> revoked = getRevoked(partitions,
partitions);
+ assertEquals(revoked.isEmpty() ? 0 : 1,
rebalanceListener.revokedCount);
+ assertEquals(revoked.isEmpty() ? null : revoked,
rebalanceListener.revoked);
+ assertEquals(2, rebalanceListener.assignedCount);
+ assertEquals(getAdded(partitions, partitions),
rebalanceListener.assigned);
+ assertEquals(toSet(partitions), subscriptions.assignedPartitions());
+ }
+
@Test
public void testWakeupDuringJoin() {
final String consumerId = "leader";