This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e52a126ee3 KAFKA-17154: New consumer subscribe may join group without 
a call to consumer.poll (#17165)
4e52a126ee3 is described below

commit 4e52a126ee3b3d6e07b16a705b45ee1cf4849fe8
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Sep 27 02:55:21 2024 +0800

    KAFKA-17154: New consumer subscribe may join group without a call to 
consumer.poll (#17165)
    
    Reviewers: Lianet Magrans <[email protected]>, TaiJuWu 
<[email protected]>
---
 .../internals/AbstractMembershipManager.java       | 31 +++++++++++++++++++---
 .../events/ApplicationEventProcessor.java          | 10 +++++--
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 28 +++++++++++++++++++
 .../internals/ConsumerMembershipManagerTest.java   | 17 +++++++++++-
 .../internals/ShareMembershipManagerTest.java      | 17 +++++++++++-
 .../events/ApplicationEventProcessorTest.java      | 25 ++++++++++++++++-
 6 files changed, 120 insertions(+), 8 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 455e48a0c89..2b4bd8efab7 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Time;
 
 import org.slf4j.Logger;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -43,6 +44,7 @@ import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.unmodifiableList;
@@ -189,6 +191,12 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
 
     private final Time time;
 
+    /**
+     * AtomicBoolean to track whether the subscription is updated.
+     * If it's true and subscription state is UNSUBSCRIBED, the next {@link 
#onConsumerPoll()} will change member state to JOINING.
+     */
+    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);
+
     /**
      * True if the poll timer has expired, signaled by a call to
      * {@link #transitionToSendingLeaveGroup(boolean)} with 
dueToExpiredPollTimer param true. This
@@ -458,14 +466,24 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
     }
 
     /**
-     * Join the group with the updated subscription, if the member is not part 
of it yet. If the
-     * member is already part of the group, this will only ensure that the 
updated subscription
+     * Set {@link #subscriptionUpdated} to true to indicate that the 
subscription has been updated.
+     * The next {@link #onConsumerPoll()} will join the group with the updated 
subscription, if the member is not part of it yet.
+     * If the member is already part of the group, this will only ensure that 
the updated subscription
      * is included in the next heartbeat request.
      * <p/>
      * Note that list of topics of the subscription is taken from the shared 
subscription state.
      */
     public void onSubscriptionUpdated() {
-        if (state == MemberState.UNSUBSCRIBED) {
+        subscriptionUpdated.compareAndSet(false, true);
+    }
+
+    /**
+     * Join the group if the member is not part of it yet. This function 
separates {@link #transitionToJoining}
+     * from the {@link #onSubscriptionUpdated} to fulfill the requirement of 
the "rebalances will only occur during an
+     * active call to {@link 
org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
+     */
+    public void onConsumerPoll() {
+        if (subscriptionUpdated.compareAndSet(true, false) && state == 
MemberState.UNSUBSCRIBED) {
             transitionToJoining();
         }
     }
@@ -1408,4 +1426,11 @@ public abstract class AbstractMembershipManager<R 
extends AbstractResponse> impl
             return Optional.of(new LocalAssignment(nextLocalEpoch, 
assignment));
         }
     }
+
+    /*
+     * Visible for testing.
+     */
+    boolean subscriptionUpdated() {
+        return subscriptionUpdated.get();
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 6ae6c23f6ee..6008e850324 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -155,9 +155,15 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     private void process(final PollEvent event) {
         if (requestManagers.commitRequestManager.isPresent()) {
             requestManagers.commitRequestManager.ifPresent(m -> 
m.updateAutoCommitTimer(event.pollTimeMs()));
-            requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> 
hrm.resetPollTimer(event.pollTimeMs()));
+            requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+                hrm.membershipManager().onConsumerPoll();
+                hrm.resetPollTimer(event.pollTimeMs());
+            });
         } else {
-            requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> 
hrm.resetPollTimer(event.pollTimeMs()));
+            requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
+                hrm.membershipManager().onConsumerPoll();
+                hrm.resetPollTimer(event.pollTimeMs());
+            });
         }
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8d3e955e799..164ea05113e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -3473,6 +3473,34 @@ public void 
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
         }
     }
 
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class)
+    public void testPollSendsRequestToJoin(GroupProtocol groupProtocol) throws 
InterruptedException {
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+        initMetadata(client, Collections.singletonMap(topic, 1));
+        Node node = metadata.fetch().nodes().get(0);
+
+        
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
groupId, node), node);
+        KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, 
time, client, subscription, metadata,
+                assignor, true, groupInstanceId);
+        consumer.subscribe(singletonList(topic));
+        assertFalse(groupProtocol == GroupProtocol.CLASSIC ?
+                        requestGenerated(client, ApiKeys.JOIN_GROUP) :
+                        requestGenerated(client, 
ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+                "KafkaConsumer#subscribe should not send " + (groupProtocol == 
GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request");
+
+        consumer.poll(Duration.ZERO);
+        TestUtils.waitForCondition(() -> groupProtocol == 
GroupProtocol.CLASSIC ?
+                requestGenerated(client, ApiKeys.JOIN_GROUP) :
+                requestGenerated(client, ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+                "Expected " + (groupProtocol == GroupProtocol.CLASSIC ? 
"JoinGroup" : "Heartbeat") + " request");
+    }
+
+    private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
+        return client.requests().stream().anyMatch(request -> 
request.requestBuilder().apiKey().equals(apiKey));
+    }
+
     private static final List<String> CLIENT_IDS = new ArrayList<>();
     public static class DeserializerForClientId implements 
Deserializer<byte[]> {
         @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 75c98235188..3ca7cf2bd2b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -1617,10 +1617,25 @@ public class ConsumerMembershipManagerTest {
     }
 
     @Test
-    public void 
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
+    public void testOnSubscriptionUpdatedDoesNotTransitionToJoiningIfInGroup() 
{
         ConsumerMembershipManager membershipManager = 
createMemberInStableState();
         membershipManager.onSubscriptionUpdated();
+        assertTrue(membershipManager.subscriptionUpdated());
+        membershipManager.onConsumerPoll();
         verify(membershipManager, never()).transitionToJoining();
+        assertFalse(membershipManager.subscriptionUpdated());
+    }
+
+    @Test
+    public void 
testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup() {
+        ConsumerMembershipManager membershipManager = 
createMembershipManager(null);
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        membershipManager.onSubscriptionUpdated();
+        verify(membershipManager, never()).transitionToJoining();
+        assertTrue(membershipManager.subscriptionUpdated());
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        membershipManager.onConsumerPoll();
+        verify(membershipManager).transitionToJoining();
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
index 96a2b98ce61..eca8db88852 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
@@ -1137,10 +1137,25 @@ public class ShareMembershipManagerTest {
     }
 
     @Test
-    public void 
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
+    public void testOnSubscriptionUpdatedDoesNotTransitionToJoiningIfInGroup() 
{
         ShareMembershipManager membershipManager = createMemberInStableState();
         membershipManager.onSubscriptionUpdated();
+        assertTrue(membershipManager.subscriptionUpdated());
+        membershipManager.onConsumerPoll();
         verify(membershipManager, never()).transitionToJoining();
+        assertFalse(membershipManager.subscriptionUpdated());
+    }
+
+    @Test
+    public void 
testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup() {
+        ShareMembershipManager membershipManager = createMembershipManager();
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        membershipManager.onSubscriptionUpdated();
+        verify(membershipManager, never()).transitionToJoining();
+        assertTrue(membershipManager.subscriptionUpdated());
+        assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+        membershipManager.onConsumerPoll();
+        verify(membershipManager).transitionToJoining();
     }
 
     private void 
assertLeaveGroupDueToExpiredPollAndTransitionToStale(ShareMembershipManager 
membershipManager) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 80315099fb8..db7d623c59e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -126,7 +126,6 @@ public class ApplicationEventProcessorTest {
 
     private static Stream<Arguments> applicationEvents() {
         return Stream.of(
-                Arguments.of(new PollEvent(100)),
                 Arguments.of(new AsyncCommitEvent(new HashMap<>())),
                 Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
                 Arguments.of(new CheckAndUpdatePositionsEvent(500)),
@@ -208,6 +207,30 @@ public class ApplicationEventProcessorTest {
         assertInstanceOf(IllegalStateException.class, e.getCause());
     }
 
+    @Test
+    public void testPollEvent() {
+        PollEvent event = new PollEvent(12345);
+
+        setupProcessor(true);
+        
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        processor.process(event);
+        verify(commitRequestManager).updateAutoCommitTimer(12345);
+        verify(membershipManager).onConsumerPoll();
+        verify(heartbeatRequestManager).resetPollTimer(12345);
+    }
+
+    @Test
+    public void testSubscriptionChangeEvent() {
+        SubscriptionChangeEvent event = new SubscriptionChangeEvent();
+
+        setupProcessor(true);
+        
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+        processor.process(event);
+        verify(membershipManager).onSubscriptionUpdated();
+        // verify member state doesn't transition to JOINING.
+        verify(membershipManager, never()).onConsumerPoll();
+    }
+
     private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
         return 
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
     }

Reply via email to