This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0a7b16c501b MINOR: fix share poll event to call the share membership
manager (#21701)
0a7b16c501b is described below
commit 0a7b16c501b2d4e7eb981521a0235f8fa1836753
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Mar 10 15:28:40 2026 -0400
MINOR: fix share poll event to call the share membership manager (#21701)
Fix the processing of a share poll event to call maybeReconcile on the
shareMembershipManager (not on the consumerMembershipManager).
No changes in logic because maybeReconcile is implemented in the parent
class AbstractMembershipMgr, but the previous call was confusing and could
lead to errors if maybeReconcile is ever overridden.
Reviewers: Andrew Schofield <[email protected]>
---
.../events/ApplicationEventProcessor.java | 10 +++----
.../events/ApplicationEventProcessorTest.java | 34 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 5 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index daa9fe6a3a9..314684b6129 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -228,12 +228,12 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
}
private void process(final SharePollEvent event) {
- requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
- consumerMembershipManager.maybeReconcile(true));
- requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
- hrm.membershipManager().onConsumerPoll();
- hrm.resetPollTimer(event.pollTimeMs());
+ requestManagers.shareMembershipManager.ifPresent(shareMembershipManager -> {
+ shareMembershipManager.maybeReconcile(true);
+ shareMembershipManager.onConsumerPoll();
});
+ requestManagers.shareHeartbeatRequestManager.ifPresent(hrm ->
+ hrm.resetPollTimer(event.pollTimeMs()));
}
private void process(final CreateFetchRequestsEvent event) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index f07a9da5ab3..938a4b86618 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -31,6 +31,9 @@ import
org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
+import
org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager;
+import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
import
org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
@@ -95,6 +98,8 @@ public class ApplicationEventProcessorTest {
private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
private final StreamsGroupHeartbeatRequestManager
streamsGroupHeartbeatRequestManager =
mock(StreamsGroupHeartbeatRequestManager.class);
private final StreamsMembershipManager streamsMembershipManager =
mock(StreamsMembershipManager.class);
+ private final ShareHeartbeatRequestManager shareHeartbeatRequestManager =
mock(ShareHeartbeatRequestManager.class);
+ private final ShareMembershipManager shareMembershipManager =
mock(ShareMembershipManager.class);
private ApplicationEventProcessor processor;
private void setupProcessor(boolean withGroupId) {
@@ -139,6 +144,22 @@ public class ApplicationEventProcessorTest {
);
}
+ private void setupShareProcessor() {
+ RequestManagers requestManagers = new RequestManagers(
+ new LogContext(),
+ mock(ShareConsumeRequestManager.class),
+ Optional.of(mock(CoordinatorRequestManager.class)),
+ Optional.of(shareHeartbeatRequestManager),
+ Optional.of(shareMembershipManager)
+ );
+ processor = new ApplicationEventProcessor(
+ new LogContext(),
+ requestManagers,
+ metadata,
+ subscriptionState
+ );
+ }
+
@Test
public void testPrepClosingCommitEvents() {
setupProcessor(true);
@@ -283,6 +304,19 @@ public class ApplicationEventProcessorTest {
verify(fetchRequestManager).createFetchRequests();
}
+ @Test
+ public void testSharePollEventCallsShareManagers() {
+ SharePollEvent event = new SharePollEvent(12345);
+
+ setupShareProcessor();
+ processor.process(event);
+
+ verify(shareMembershipManager).maybeReconcile(true);
+ verify(shareMembershipManager).onConsumerPoll();
+
+ verify(shareHeartbeatRequestManager).resetPollTimer(event.pollTimeMs());
+ }
+
@Test
public void testTopicSubscriptionChangeEvent() {
Set<String> topics = Set.of("topic1", "topic2");