This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new a7ad0b4  KAFKA-12983: reset needsJoinPrepare flag before rejoining the group (#10986)
a7ad0b4 is described below

commit a7ad0b4f636ed364c9195834cd38046a3a965e2a
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Jul 13 12:14:39 2021 -0700

    KAFKA-12983: reset needsJoinPrepare flag before rejoining the group (#10986)
    
    The #onJoinPrepare callback is not invoked every time a member (re)joins
    the group, but only once, when it first enters the rebalance. This means
    that any updates or events that occur during the join phase can be lost
    from the internal state: for example, the clearing of the SubscriptionState
    (and thus of the "ownedPartitions" that are used for cooperative
    rebalancing) after the member loses its memberId during a rebalance. We
    should reset the needsJoinPrepare flag inside the resetStateAndRejoin()
    method.
    
    Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson <[email protected]>, David Jacot <[email protected]>
---
 .../consumer/internals/AbstractCoordinator.java        |  1 +
 .../consumer/internals/ConsumerCoordinatorTest.java    | 18 +++++++++++++++++-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 9608b8f..6ce083d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -972,6 +972,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
     private synchronized void resetStateAndRejoin(final String reason) {
         resetStateAndGeneration(reason);
         requestRejoin(reason);
+        needsJoinPrepare = true;
     }
 
     synchronized void resetGenerationOnResponseError(ApiKeys api, Errors 
error) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 9eb1736..89bd68e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -100,6 +100,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
+import static java.util.Collections.emptySet;
 import static java.util.Collections.singleton;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
@@ -1044,8 +1045,12 @@ public abstract class ConsumerCoordinatorTest {
         Collection<TopicPartition> revoked = getRevoked(partitions, 
partitions);
         assertEquals(revoked.isEmpty() ? 0 : 1, 
rebalanceListener.revokedCount);
         assertEquals(revoked.isEmpty() ? null : revoked, 
rebalanceListener.revoked);
+        Collection<TopicPartition> lost = getLost(partitions);
+        assertEquals(lost.isEmpty() ? 0 : 1, rebalanceListener.lostCount);
+        assertEquals(lost.isEmpty() ? null : lost, rebalanceListener.lost);
         assertEquals(2, rebalanceListener.assignedCount);
-        assertEquals(getAdded(partitions, partitions), 
rebalanceListener.assigned);
+        // Since onPartitionsLost is invoked when the JoinGroup failed, all 
owned partitions have to be re-added
+        assertEquals(toSet(partitions), rebalanceListener.assigned);
         assertEquals(toSet(partitions), subscriptions.assignedPartitions());
     }
 
@@ -3014,6 +3019,17 @@ public abstract class ConsumerCoordinatorTest {
         }
     }
 
+    private Collection<TopicPartition> getLost(final List<TopicPartition> 
owned) {
+        switch (protocol) {
+            case EAGER:
+                return emptySet();
+            case COOPERATIVE:
+                return toSet(owned);
+            default:
+                throw new IllegalStateException("This should not happen");
+        }
+    }
+
     private Collection<TopicPartition> getAdded(final List<TopicPartition> 
owned,
                                                 final List<TopicPartition> 
assigned) {
         switch (protocol) {

Reply via email to