This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0a7b16c501b MINOR: fix share poll event to call the share membership 
manager (#21701)
0a7b16c501b is described below

commit 0a7b16c501b2d4e7eb981521a0235f8fa1836753
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Mar 10 15:28:40 2026 -0400

    MINOR: fix share poll event to call the share membership manager (#21701)
    
    Fix to call the shareMembershipMgr reconcile when processing a share
    poll event (not the consumerMembershipManager)
    
    No changes in logic because maybeReconcile is implemented in the parent
    class AbstractMembershipMgr, but it's confusing and could lead to errors
    if ever we override the maybeReconcile.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../events/ApplicationEventProcessor.java          | 10 +++----
 .../events/ApplicationEventProcessorTest.java      | 34 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index daa9fe6a3a9..314684b6129 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -228,12 +228,12 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
     }
 
     private void process(final SharePollEvent event) {
-        
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
-            consumerMembershipManager.maybeReconcile(true));
-        requestManagers.shareHeartbeatRequestManager.ifPresent(hrm -> {
-            hrm.membershipManager().onConsumerPoll();
-            hrm.resetPollTimer(event.pollTimeMs());
+        
requestManagers.shareMembershipManager.ifPresent(shareMembershipManager -> {
+            shareMembershipManager.maybeReconcile(true);
+            shareMembershipManager.onConsumerPoll();
         });
+        requestManagers.shareHeartbeatRequestManager.ifPresent(hrm ->
+            hrm.resetPollTimer(event.pollTimeMs()));
     }
 
     private void process(final CreateFetchRequestsEvent event) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index f07a9da5ab3..938a4b86618 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -31,6 +31,9 @@ import 
org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
+import 
org.apache.kafka.clients.consumer.internals.ShareHeartbeatRequestManager;
+import org.apache.kafka.clients.consumer.internals.ShareMembershipManager;
 import 
org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager;
 import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
@@ -95,6 +98,8 @@ public class ApplicationEventProcessorTest {
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
     private final StreamsGroupHeartbeatRequestManager 
streamsGroupHeartbeatRequestManager = 
mock(StreamsGroupHeartbeatRequestManager.class);
     private final StreamsMembershipManager streamsMembershipManager = 
mock(StreamsMembershipManager.class);
+    private final ShareHeartbeatRequestManager shareHeartbeatRequestManager = 
mock(ShareHeartbeatRequestManager.class);
+    private final ShareMembershipManager shareMembershipManager = 
mock(ShareMembershipManager.class);
     private ApplicationEventProcessor processor;
 
     private void setupProcessor(boolean withGroupId) {
@@ -139,6 +144,22 @@ public class ApplicationEventProcessorTest {
         );
     }
 
+    private void setupShareProcessor() {
+        RequestManagers requestManagers = new RequestManagers(
+            new LogContext(),
+            mock(ShareConsumeRequestManager.class),
+            Optional.of(mock(CoordinatorRequestManager.class)),
+            Optional.of(shareHeartbeatRequestManager),
+            Optional.of(shareMembershipManager)
+        );
+        processor = new ApplicationEventProcessor(
+            new LogContext(),
+            requestManagers,
+            metadata,
+            subscriptionState
+        );
+    }
+
     @Test
     public void testPrepClosingCommitEvents() {
         setupProcessor(true);
@@ -283,6 +304,19 @@ public class ApplicationEventProcessorTest {
         verify(fetchRequestManager).createFetchRequests();
     }
 
+    @Test
+    public void testSharePollEventCallsShareManagers() {
+        SharePollEvent event = new SharePollEvent(12345);
+
+        setupShareProcessor();
+        processor.process(event);
+
+        verify(shareMembershipManager).maybeReconcile(true);
+        verify(shareMembershipManager).onConsumerPoll();
+
+        
verify(shareHeartbeatRequestManager).resetPollTimer(event.pollTimeMs());
+    }
+
     @Test
     public void testTopicSubscriptionChangeEvent() {
         Set<String> topics = Set.of("topic1", "topic2");

Reply via email to