This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4e52a126ee3 KAFKA-17154: New consumer subscribe may join group without
a call to consumer.poll (#17165)
4e52a126ee3 is described below
commit 4e52a126ee3b3d6e07b16a705b45ee1cf4849fe8
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Sep 27 02:55:21 2024 +0800
KAFKA-17154: New consumer subscribe may join group without a call to
consumer.poll (#17165)
Reviewers: Lianet Magrans <[email protected]>, TaiJuWu
<[email protected]>
---
.../internals/AbstractMembershipManager.java | 31 +++++++++++++++++++---
.../events/ApplicationEventProcessor.java | 10 +++++--
.../kafka/clients/consumer/KafkaConsumerTest.java | 28 +++++++++++++++++++
.../internals/ConsumerMembershipManagerTest.java | 17 +++++++++++-
.../internals/ShareMembershipManagerTest.java | 17 +++++++++++-
.../events/ApplicationEventProcessorTest.java | 25 ++++++++++++++++-
6 files changed, 120 insertions(+), 8 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 455e48a0c89..2b4bd8efab7 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -43,6 +44,7 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableList;
@@ -189,6 +191,12 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
private final Time time;
+ /**
+ * AtomicBoolean to track whether the subscription is updated.
+ * If it's true and subscription state is UNSUBSCRIBED, the next {@link
#onConsumerPoll()} will change member state to JOINING.
+ */
+ private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);
+
/**
* True if the poll timer has expired, signaled by a call to
* {@link #transitionToSendingLeaveGroup(boolean)} with
dueToExpiredPollTimer param true. This
@@ -458,14 +466,24 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
}
/**
- * Join the group with the updated subscription, if the member is not part
of it yet. If the
- * member is already part of the group, this will only ensure that the
updated subscription
+ * Set {@link #subscriptionUpdated} to true to indicate that the
subscription has been updated.
+ * The next {@link #onConsumerPoll()} will join the group with the updated
subscription, if the member is not part of it yet.
+ * If the member is already part of the group, this will only ensure that
the updated subscription
* is included in the next heartbeat request.
* <p/>
* Note that list of topics of the subscription is taken from the shared
subscription state.
*/
public void onSubscriptionUpdated() {
- if (state == MemberState.UNSUBSCRIBED) {
+ subscriptionUpdated.compareAndSet(false, true);
+ }
+
+ /**
+ * Join the group if the member is not part of it yet. This function
separates {@link #transitionToJoining}
+ * from the {@link #onSubscriptionUpdated} to fulfill the requirement of
the "rebalances will only occur during an
+ * active call to {@link
org.apache.kafka.clients.consumer.KafkaConsumer#poll(Duration)}"
+ */
+ public void onConsumerPoll() {
+ if (subscriptionUpdated.compareAndSet(true, false) && state ==
MemberState.UNSUBSCRIBED) {
transitionToJoining();
}
}
@@ -1408,4 +1426,11 @@ public abstract class AbstractMembershipManager<R
extends AbstractResponse> impl
return Optional.of(new LocalAssignment(nextLocalEpoch,
assignment));
}
}
+
+ /*
+ * Visible for testing.
+ */
+ boolean subscriptionUpdated() {
+ return subscriptionUpdated.get();
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index 6ae6c23f6ee..6008e850324 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -155,9 +155,15 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
private void process(final PollEvent event) {
if (requestManagers.commitRequestManager.isPresent()) {
requestManagers.commitRequestManager.ifPresent(m ->
m.updateAutoCommitTimer(event.pollTimeMs()));
- requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm ->
hrm.resetPollTimer(event.pollTimeMs()));
+ requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+ hrm.membershipManager().onConsumerPoll();
+ hrm.resetPollTimer(event.pollTimeMs());
+ });
} else {
- requestManagers.shareHeartbeatRequestManager.ifPresent(hrm ->
hrm.resetPollTimer(event.pollTimeMs()));
+ requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
+ hrm.membershipManager().onConsumerPoll();
+ hrm.resetPollTimer(event.pollTimeMs());
+ });
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 8d3e955e799..164ea05113e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -3473,6 +3473,34 @@ public void
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
}
}
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class)
+ public void testPollSendsRequestToJoin(GroupProtocol groupProtocol) throws
InterruptedException {
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+ initMetadata(client, Collections.singletonMap(topic, 1));
+ Node node = metadata.fetch().nodes().get(0);
+
+
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE,
groupId, node), node);
+ KafkaConsumer<String, String> consumer = newConsumer(groupProtocol,
time, client, subscription, metadata,
+ assignor, true, groupInstanceId);
+ consumer.subscribe(singletonList(topic));
+ assertFalse(groupProtocol == GroupProtocol.CLASSIC ?
+ requestGenerated(client, ApiKeys.JOIN_GROUP) :
+ requestGenerated(client,
ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+ "KafkaConsumer#subscribe should not send " + (groupProtocol ==
GroupProtocol.CLASSIC ? "JoinGroup" : "Heartbeat") + " request");
+
+ consumer.poll(Duration.ZERO);
+ TestUtils.waitForCondition(() -> groupProtocol ==
GroupProtocol.CLASSIC ?
+ requestGenerated(client, ApiKeys.JOIN_GROUP) :
+ requestGenerated(client, ApiKeys.CONSUMER_GROUP_HEARTBEAT),
+ "Expected " + (groupProtocol == GroupProtocol.CLASSIC ?
"JoinGroup" : "Heartbeat") + " request");
+ }
+
+ private boolean requestGenerated(MockClient client, ApiKeys apiKey) {
+ return client.requests().stream().anyMatch(request ->
request.requestBuilder().apiKey().equals(apiKey));
+ }
+
private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements
Deserializer<byte[]> {
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 75c98235188..3ca7cf2bd2b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -1617,10 +1617,25 @@ public class ConsumerMembershipManagerTest {
}
@Test
- public void
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
+ public void testOnSubscriptionUpdatedDoesNotTransitionToJoiningIfInGroup()
{
ConsumerMembershipManager membershipManager =
createMemberInStableState();
membershipManager.onSubscriptionUpdated();
+ assertTrue(membershipManager.subscriptionUpdated());
+ membershipManager.onConsumerPoll();
verify(membershipManager, never()).transitionToJoining();
+ assertFalse(membershipManager.subscriptionUpdated());
+ }
+
+ @Test
+ public void
testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup() {
+ ConsumerMembershipManager membershipManager =
createMembershipManager(null);
+ assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+ membershipManager.onSubscriptionUpdated();
+ verify(membershipManager, never()).transitionToJoining();
+ assertTrue(membershipManager.subscriptionUpdated());
+ assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+ membershipManager.onConsumerPoll();
+ verify(membershipManager).transitionToJoining();
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
index 96a2b98ce61..eca8db88852 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
@@ -1137,10 +1137,25 @@ public class ShareMembershipManagerTest {
}
@Test
- public void
testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
+ public void testOnSubscriptionUpdatedDoesNotTransitionToJoiningIfInGroup()
{
ShareMembershipManager membershipManager = createMemberInStableState();
membershipManager.onSubscriptionUpdated();
+ assertTrue(membershipManager.subscriptionUpdated());
+ membershipManager.onConsumerPoll();
verify(membershipManager, never()).transitionToJoining();
+ assertFalse(membershipManager.subscriptionUpdated());
+ }
+
+ @Test
+ public void
testOnSubscriptionUpdatedTransitionsToJoiningOnPollIfNotInGroup() {
+ ShareMembershipManager membershipManager = createMembershipManager();
+ assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+ membershipManager.onSubscriptionUpdated();
+ verify(membershipManager, never()).transitionToJoining();
+ assertTrue(membershipManager.subscriptionUpdated());
+ assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
+ membershipManager.onConsumerPoll();
+ verify(membershipManager).transitionToJoining();
}
private void
assertLeaveGroupDueToExpiredPollAndTransitionToStale(ShareMembershipManager
membershipManager) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index 80315099fb8..db7d623c59e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -126,7 +126,6 @@ public class ApplicationEventProcessorTest {
private static Stream<Arguments> applicationEvents() {
return Stream.of(
- Arguments.of(new PollEvent(100)),
Arguments.of(new AsyncCommitEvent(new HashMap<>())),
Arguments.of(new SyncCommitEvent(new HashMap<>(), 500)),
Arguments.of(new CheckAndUpdatePositionsEvent(500)),
@@ -208,6 +207,30 @@ public class ApplicationEventProcessorTest {
assertInstanceOf(IllegalStateException.class, e.getCause());
}
+ @Test
+ public void testPollEvent() {
+ PollEvent event = new PollEvent(12345);
+
+ setupProcessor(true);
+
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+ processor.process(event);
+ verify(commitRequestManager).updateAutoCommitTimer(12345);
+ verify(membershipManager).onConsumerPoll();
+ verify(heartbeatRequestManager).resetPollTimer(12345);
+ }
+
+ @Test
+ public void testSubscriptionChangeEvent() {
+ SubscriptionChangeEvent event = new SubscriptionChangeEvent();
+
+ setupProcessor(true);
+
when(heartbeatRequestManager.membershipManager()).thenReturn(membershipManager);
+ processor.process(event);
+ verify(membershipManager).onSubscriptionUpdated();
+ // verify member state doesn't transition to JOINING.
+ verify(membershipManager, never()).onConsumerPoll();
+ }
+
private List<NetworkClientDelegate.UnsentRequest> mockCommitResults() {
return
Collections.singletonList(mock(NetworkClientDelegate.UnsentRequest.class));
}