This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 654ebe10f4a KAFKA-18071: Avoid event to refresh regex if no pattern
subscription (#17917)
654ebe10f4a is described below
commit 654ebe10f4a5c31e449b2a2ef6c284254ed7dceb
Author: Lianet Magrans <[email protected]>
AuthorDate: Sun Nov 24 21:39:11 2024 -0500
KAFKA-18071: Avoid event to refresh regex if no pattern subscription
(#17917)
Reviewers: David Jacot <[email protected]>, Andrew Schofield
<[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 14 +++++++------
.../events/ApplicationEventProcessor.java | 7 ++++---
.../consumer/internals/AsyncKafkaConsumerTest.java | 23 ++++++++++++++++++++++
.../events/ApplicationEventProcessorTest.java | 4 +---
4 files changed, 36 insertions(+), 12 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 033b16ba7b6..646087ee58d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1766,12 +1766,14 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
@Override
public boolean updateAssignmentMetadataIfNeeded(Timer timer) {
offsetCommitCallbackInvoker.executeCallbacks();
- try {
- applicationEventHandler.addAndGet(new
UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
- } catch (TimeoutException e) {
- return false;
- } finally {
- timer.update();
+ if (subscriptions.hasPatternSubscription()) {
+ try {
+ applicationEventHandler.addAndGet(new
UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer)));
+ } catch (TimeoutException e) {
+ return false;
+ } finally {
+ timer.update();
+ }
}
processBackgroundEvents();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
index d2e45370c66..c9735617abb 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
@@ -320,11 +320,12 @@ public class ApplicationEventProcessor implements
EventProcessor<ApplicationEven
* This will make the consumer send the updated subscription on the next
poll.
*/
private void process(final UpdatePatternSubscriptionEvent event) {
+ if (!subscriptions.hasPatternSubscription()) {
+ return;
+ }
if (this.metadataVersionSnapshot < metadata.updateVersion()) {
this.metadataVersionSnapshot = metadata.updateVersion();
- if (subscriptions.hasPatternSubscription()) {
- updatePatternSubscription(metadata.fetch());
- }
+ updatePatternSubscription(metadata.fetch());
}
event.future().complete(null);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 91e4cae0f98..940d9cbaef3 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -54,6 +54,7 @@ import
org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import
org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
import
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
+import
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.TopicPartition;
@@ -1830,6 +1831,27 @@ public class AsyncKafkaConsumerTest {
assertEquals(OffsetResetStrategy.LATEST,
resetOffsetEvent.offsetResetStrategy());
}
+ @Test
+ public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed()
{
+ consumer = newConsumer();
+
doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class));
+
when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true);
+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
+ completeAssignmentChangeEventSuccessfully();
+ completeTopicPatternSubscriptionChangeEventSuccessfully();
+ completeUnsubscribeApplicationEventSuccessfully();
+
+ consumer.assign(singleton(new TopicPartition("topic1", 0)));
+ consumer.poll(Duration.ZERO);
+ verify(applicationEventHandler,
never()).addAndGet(any(UpdatePatternSubscriptionEvent.class));
+
+ consumer.unsubscribe();
+
+ consumer.subscribe(Pattern.compile("t*"));
+ consumer.poll(Duration.ZERO);
+
verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class));
+ }
+
@Test
public void testSubscribeToRe2JPatternValidation() {
consumer = newConsumer();
@@ -1927,6 +1949,7 @@ public class AsyncKafkaConsumerTest {
private void completeUnsubscribeApplicationEventSuccessfully() {
doAnswer(invocation -> {
UnsubscribeEvent event = invocation.getArgument(0);
+ consumer.subscriptions().unsubscribe();
event.future().complete(null);
return null;
}).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
index a16f9612c74..ea09fc2ae8b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java
@@ -355,11 +355,10 @@ public class ApplicationEventProcessorTest {
UpdatePatternSubscriptionEvent event1 = new
UpdatePatternSubscriptionEvent(12345);
setupProcessor(true);
-
+ when(subscriptionState.hasPatternSubscription()).thenReturn(true);
when(metadata.updateVersion()).thenReturn(0);
processor.process(event1);
- verify(subscriptionState, never()).hasPatternSubscription();
assertDoesNotThrow(() -> event1.future().get());
Cluster cluster = mock(Cluster.class);
@@ -377,7 +376,6 @@ public class ApplicationEventProcessorTest {
UpdatePatternSubscriptionEvent event2 = new
UpdatePatternSubscriptionEvent(12345);
processor.process(event2);
verify(metadata).requestUpdateForNewTopics();
- verify(subscriptionState).hasPatternSubscription();
verify(subscriptionState).subscribeFromPattern(topics);
assertEquals(1, processor.metadataVersionSnapshot());
verify(membershipManager).onSubscriptionUpdated();