This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new f814b70 KAFKA-12983: reset needsJoinPrepare flag before rejoining the
group (#10986)
f814b70 is described below
commit f814b70eb9e000d0d68d1eb74cf69848d86aa421
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Jul 13 12:14:39 2021 -0700
KAFKA-12983: reset needsJoinPrepare flag before rejoining the group (#10986)
The #onJoinPrepare callback is not invoked every time a member (re)joins
the group; it is invoked only once, when the member first enters the rebalance.
This means that any updates or events that occur during the join phase can be
lost from the internal state: for example, clearing the SubscriptionState (and
thus the "ownedPartitions" that are used for cooperative rebalancing) after the
member loses its memberId during a rebalance. To fix this, we reset the
needsJoinPrepare flag inside the resetStateAndRejoin() method, so that
#onJoinPrepare is invoked again before the member rejoins the group.
Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson
<[email protected]>, David Jacot <[email protected]>
---
.../consumer/internals/AbstractCoordinator.java | 1 +
.../consumer/internals/ConsumerCoordinatorTest.java | 18 +++++++++++++++++-
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b70b67c..aff5571 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -969,6 +969,7 @@ public abstract class AbstractCoordinator implements Closeable {
private synchronized void resetStateAndRejoin() {
resetState();
rejoinNeeded = true;
+ needsJoinPrepare = true;
}
synchronized void resetGenerationOnResponseError(ApiKeys api, Errors error) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 7310f85..a73f664 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -100,6 +100,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
+import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@@ -1044,8 +1045,12 @@ public abstract class ConsumerCoordinatorTest {
Collection<TopicPartition> revoked = getRevoked(partitions, partitions);
assertEquals(revoked.isEmpty() ? 0 : 1, rebalanceListener.revokedCount);
assertEquals(revoked.isEmpty() ? null : revoked, rebalanceListener.revoked);
+ Collection<TopicPartition> lost = getLost(partitions);
+ assertEquals(lost.isEmpty() ? 0 : 1, rebalanceListener.lostCount);
+ assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
assertEquals(2, rebalanceListener.assignedCount);
- assertEquals(getAdded(partitions, partitions), rebalanceListener.assigned);
+ // Since onPartitionsLost is invoked when the JoinGroup failed, all owned partitions have to be re-added
+ assertEquals(toSet(partitions), rebalanceListener.assigned);
assertEquals(toSet(partitions), subscriptions.assignedPartitions());
}
@@ -3014,6 +3019,17 @@ public abstract class ConsumerCoordinatorTest {
}
}
+ private Collection<TopicPartition> getLost(final List<TopicPartition> owned) {
+ switch (protocol) {
+ case EAGER:
+ return emptySet();
+ case COOPERATIVE:
+ return toSet(owned);
+ default:
+ throw new IllegalStateException("This should not happen");
+ }
+ }
+
private Collection<TopicPartition> getAdded(final List<TopicPartition> owned,
final List<TopicPartition> assigned) {
switch (protocol) {