This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 654ebe10f4a KAFKA-18071: Avoid event to refresh regex if no pattern subscription (#17917)
654ebe10f4a is described below

commit 654ebe10f4a5c31e449b2a2ef6c284254ed7dceb
Author: Lianet Magrans <[email protected]>
AuthorDate: Sun Nov 24 21:39:11 2024 -0500

    KAFKA-18071: Avoid event to refresh regex if no pattern subscription (#17917)
    
    Reviewers: David Jacot <[email protected]>, Andrew Schofield <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     | 14 +++++++------
 .../events/ApplicationEventProcessor.java          |  7 ++++---
 .../consumer/internals/AsyncKafkaConsumerTest.java | 23 ++++++++++++++++++++++
 .../events/ApplicationEventProcessorTest.java      |  4 +---
 4 files changed, 36 insertions(+), 12 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 033b16ba7b6..646087ee58d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1766,12 +1766,14 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     @Override
     public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
         offsetCommitCallbackInvoker.executeCallbacks();
-        try {
-            applicationEventHandler.addAndGet(new 
UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
-        } catch (TimeoutException e) {
-            return false;
-        } finally {
-            timer.update();
+        if (subscriptions.hasPatternSubscription()) {
+            try {
+                applicationEventHandler.addAndGet(new 
UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
+            } catch (TimeoutException e) {
+                return false;
+            } finally {
+                timer.update();
+            }
         }
         processBackgroundEvents();
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index d2e45370c66..c9735617abb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -320,11 +320,12 @@ public class ApplicationEventProcessor implements 
EventProcessor<ApplicationEven
      * This will make the consumer send the updated subscription on the next 
poll.
      */
     private void process(final UpdatePatternSubscriptionEvent event) {
+        if (!subscriptions.hasPatternSubscription()) {
+            return;
+        }
         if (this.metadataVersionSnapshot < metadata.updateVersion()) {
             this.metadataVersionSnapshot = metadata.updateVersion();
-            if (subscriptions.hasPatternSubscription()) {
-                updatePatternSubscription(metadata.fetch());
-            }
+            updatePatternSubscription(metadata.fetch());
         }
         event.future().complete(null);
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 91e4cae0f98..940d9cbaef3 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -54,6 +54,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.TopicPartition;
@@ -1830,6 +1831,27 @@ public class AsyncKafkaConsumerTest {
         assertEquals(OffsetResetStrategy.LATEST, 
resetOffsetEvent.offsetResetStrategy());
     }
 
+    @Test
+    public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() 
{
+        consumer = newConsumer();
+        
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+        
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
+        
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+        completeAssignmentChangeEventSuccessfully();
+        completeTopicPatternSubscriptionChangeEventSuccessfully();
+        completeUnsubscribeApplicationEventSuccessfully();
+
+        consumer.assign(singleton(new TopicPartition("topic1", 0)));
+        consumer.poll(Duration.ZERO);
+        verify(applicationEventHandler, 
never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
+
+        consumer.unsubscribe();
+
+        consumer.subscribe(Pattern.compile("t*"));
+        consumer.poll(Duration.ZERO);
+        
verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class));
+    }
+
     @Test
     public void testSubscribeToRe2JPatternValidation() {
         consumer = newConsumer();
@@ -1927,6 +1949,7 @@ public class AsyncKafkaConsumerTest {
     private void completeUnsubscribeApplicationEventSuccessfully() {
         doAnswer(invocation -> {
             UnsubscribeEvent event = invocation.getArgument(0);
+            consumer.subscriptions().unsubscribe();
             event.future().complete(null);
             return null;
         
}).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index a16f9612c74..ea09fc2ae8b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -355,11 +355,10 @@ public class ApplicationEventProcessorTest {
         UpdatePatternSubscriptionEvent event1 = new 
UpdatePatternSubscriptionEvent(12345);
 
         setupProcessor(true);
-
+        when(subscriptionState.hasPatternSubscription()).thenReturn(true);
         when(metadata.updateVersion()).thenReturn(0);
 
         processor.process(event1);
-        verify(subscriptionState, never()).hasPatternSubscription();
         assertDoesNotThrow(() -> event1.future().get());
 
         Cluster cluster = mock(Cluster.class);
@@ -377,7 +376,6 @@ public class ApplicationEventProcessorTest {
         UpdatePatternSubscriptionEvent event2 = new 
UpdatePatternSubscriptionEvent(12345);
         processor.process(event2);
         verify(metadata).requestUpdateForNewTopics();
-        verify(subscriptionState).hasPatternSubscription();
         verify(subscriptionState).subscribeFromPattern(topics);
         assertEquals(1, processor.metadataVersionSnapshot());
         verify(membershipManager).onSubscriptionUpdated();

Reply via email to