This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 37b4d9b01d5 KAFKA-15561 [3/N]: Client support for SubscriptionPattern 
in HB (#17951)
37b4d9b01d5 is described below

commit 37b4d9b01d5790f307d9c5877b7252306780da37
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Nov 27 12:01:12 2024 -0500

    KAFKA-15561 [3/N]: Client support for SubscriptionPattern in HB (#17951)
    
    Reviewers: David Jacot <[email protected]>
---
 .../internals/AbstractHeartbeatRequestManager.java |  6 +++
 .../internals/ConsumerHeartbeatRequestManager.java | 15 +++++-
 .../consumer/internals/SubscriptionState.java      | 20 +++++++-
 .../ConsumerHeartbeatRequestManagerTest.java       | 56 ++++++++++++++++++++++
 .../consumer/internals/SubscriptionStateTest.java  | 19 ++++++--
 5 files changed, 110 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 031ccee70ae..9c1a63cd79a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -406,6 +406,12 @@ public abstract class AbstractHeartbeatRequestManager<R 
extends AbstractResponse
                 heartbeatRequestState.reset();
                 break;
 
+            case INVALID_REGULAR_EXPRESSION:
+                logger.error("{} failed due to {}: {}", 
heartbeatRequestName(), error, errorMessage);
+                handleFatalFailure(error.exception("Invalid RE2J 
SubscriptionPattern provided in the call to " +
+                    "subscribe. " + errorMessage));
+                break;
+
             default:
                 if (!handleSpecificError(response, currentTimeMs)) {
                     // If the manager receives an unknown error - there could 
be a bug in the code or a new error code
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
index a87ffcd07eb..4fe0e7085a2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
 import org.apache.kafka.common.Uuid;
@@ -32,6 +33,7 @@ import org.apache.kafka.common.utils.Timer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
@@ -231,6 +233,15 @@ public class ConsumerHeartbeatRequestManager extends 
AbstractHeartbeatRequestMan
                 sentFields.subscribedTopicNames = subscribedTopicNames;
             }
 
+            // SubscribedTopicRegex - only sent if it has changed since the 
last heartbeat.
+            // Send empty string to indicate that a subscribed pattern needs 
to be removed.
+            SubscriptionPattern pattern = subscriptions.subscriptionPattern();
+            boolean patternUpdated = !Objects.equals(pattern, 
sentFields.pattern);
+            if ((sendAllFields && pattern != null) || patternUpdated) {
+                data.setSubscribedTopicRegex((pattern != null) ? 
pattern.pattern() : "");
+                sentFields.pattern = pattern;
+            }
+
             // ServerAssignor - sent when joining or if it has changed since 
the last heartbeat
             this.membershipManager.serverAssignor().ifPresent(serverAssignor 
-> {
                 if (sendAllFields || 
!serverAssignor.equals(sentFields.serverAssignor)) {
@@ -239,8 +250,6 @@ public class ConsumerHeartbeatRequestManager extends 
AbstractHeartbeatRequestMan
                 }
             });
 
-            // ClientAssignors - not supported yet
-
             // TopicPartitions - sent when joining or with the first heartbeat 
after a new assignment from
             // the server was reconciled. This is ensured by resending the 
topic partitions whenever the
             // local assignment, including its local epoch is changed 
(although the local epoch is not sent
@@ -268,6 +277,7 @@ public class ConsumerHeartbeatRequestManager extends 
AbstractHeartbeatRequestMan
         static class SentFields {
             private int rebalanceTimeoutMs = -1;
             private TreeSet<String> subscribedTopicNames = null;
+            private SubscriptionPattern pattern = null;
             private String serverAssignor = null;
             private AbstractMembershipManager.LocalAssignment localAssignment 
= null;
 
@@ -278,6 +288,7 @@ public class ConsumerHeartbeatRequestManager extends 
AbstractHeartbeatRequestMan
                 rebalanceTimeoutMs = -1;
                 serverAssignor = null;
                 localAssignment = null;
+                pattern = null;
             }
         }
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index a51383d39fa..0df99301e9e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -361,6 +361,24 @@ public class SubscriptionState {
         return Collections.emptySet();
     }
 
+    /**
+     * @return The RE2J compatible pattern in use, provided via a call to
+     * {@link #subscribe(SubscriptionPattern, Optional)}.
+     * Null if there is no SubscriptionPattern in use.
+     */
+    public synchronized SubscriptionPattern subscriptionPattern() {
+        if (hasRe2JPatternSubscription())
+            return this.subscribedRe2JPattern;
+        return null;
+    }
+
+    /**
+     * @return True if subscribed using RE2J pattern. False otherwise.
+     */
+    public synchronized boolean hasRe2JPatternSubscription() {
+        return this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
+    }
+
     public synchronized Set<TopicPartition> pausedPartitions() {
         return collectPartitions(TopicPartitionState::isPaused);
     }
@@ -469,7 +487,7 @@ public class SubscriptionState {
 
     public synchronized boolean hasAutoAssignedPartitions() {
         return this.subscriptionType == SubscriptionType.AUTO_TOPICS || 
this.subscriptionType == SubscriptionType.AUTO_PATTERN
-                || this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE;
+                || this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE 
|| this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
     }
 
     public synchronized void position(TopicPartition tp, FetchPosition 
position) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 0d76fc88274..47630c4e9d8 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import 
org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager.HeartbeatRequestState;
 import 
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
 import 
org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager.HeartbeatState;
@@ -864,6 +865,61 @@ public class ConsumerHeartbeatRequestManagerTest {
             "No requests should be generated on close if the member is not 
leaving when closing the manager");
     }
 
+    @Test
+    public void testRegexInHeartbeatLifecycle() {
+        heartbeatState = new HeartbeatState(subscriptions, membershipManager, 
DEFAULT_MAX_POLL_INTERVAL_MS);
+        createHeartbeatRequestStateWithZeroHeartbeatInterval();
+
+        // Initial heartbeat with regex
+        mockJoiningMemberData(null);
+        when(subscriptions.subscriptionPattern()).thenReturn(new 
SubscriptionPattern("t1.*"));
+        ConsumerGroupHeartbeatRequestData data = 
heartbeatState.buildRequestData();
+        assertEquals("t1.*", data.subscribedTopicRegex());
+
+        // Regex not included in HB if not updated
+        when(membershipManager.state()).thenReturn(MemberState.STABLE);
+        data = heartbeatState.buildRequestData();
+        assertNull(data.subscribedTopicRegex());
+
+        // Regex included in HB if updated
+        when(subscriptions.subscriptionPattern()).thenReturn(new 
SubscriptionPattern("t2.*"));
+        data = heartbeatState.buildRequestData();
+        assertEquals("t2.*", data.subscribedTopicRegex());
+
+        // Empty regex included in HB to remove pattern subscription
+        when(subscriptions.subscriptionPattern()).thenReturn(null);
+        data = heartbeatState.buildRequestData();
+        assertEquals("", data.subscribedTopicRegex());
+
+        // Regex not included in HB after pattern subscription removed
+        when(subscriptions.subscriptionPattern()).thenReturn(null);
+        data = heartbeatState.buildRequestData();
+        assertNull(data.subscribedTopicRegex());
+    }
+
+    @Test
+    public void testRegexInJoiningHeartbeat() {
+        heartbeatState = new HeartbeatState(subscriptions, membershipManager, 
DEFAULT_MAX_POLL_INTERVAL_MS);
+        createHeartbeatRequestStateWithZeroHeartbeatInterval();
+
+        // Initial heartbeat with regex
+        mockJoiningMemberData(null);
+        when(subscriptions.subscriptionPattern()).thenReturn(new 
SubscriptionPattern("t1.*"));
+        ConsumerGroupHeartbeatRequestData data = 
heartbeatState.buildRequestData();
+        assertEquals("t1.*", data.subscribedTopicRegex());
+
+        // Members unsubscribes from regex (empty regex included in HB)
+        when(subscriptions.subscriptionPattern()).thenReturn(null);
+        data = heartbeatState.buildRequestData();
+        assertEquals("", data.subscribedTopicRegex());
+
+        // Member rejoins (ie. fenced) should not include regex field in HB
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+        when(subscriptions.subscriptionPattern()).thenReturn(null);
+        data = heartbeatState.buildRequestData();
+        assertNull(data.subscribedTopicRegex());
+    }
+
     private void assertHeartbeat(ConsumerHeartbeatRequestManager hrm, int 
nextPollMs) {
         NetworkClientDelegate.PollResult pollResult = 
hrm.poll(time.milliseconds());
         assertEquals(1, pollResult.unsentRequests.size());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index a4a020a22f3..f697990b544 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -400,7 +400,7 @@ public class SubscriptionStateTest {
 
     @Test
     public void testSubscribeToRe2JPattern() {
-        String pattern = "t*";
+        String pattern = "t.*";
         state.subscribe(new SubscriptionPattern(pattern), 
Optional.of(rebalanceListener));
         assertTrue(state.toString().contains("type=AUTO_PATTERN_RE2J"));
         assertTrue(state.toString().contains("subscribedPattern=" + pattern));
@@ -409,15 +409,28 @@ public class SubscriptionStateTest {
     @Test
     public void testMixedPatternSubscriptionNotAllowed() {
         state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
-        assertThrows(IllegalStateException.class, () -> state.subscribe(new 
SubscriptionPattern("t*"),
+        assertThrows(IllegalStateException.class, () -> state.subscribe(new 
SubscriptionPattern("t.*"),
             Optional.of(rebalanceListener)));
 
         state.unsubscribe();
 
-        state.subscribe(new SubscriptionPattern("t*"), 
Optional.of(rebalanceListener));
+        state.subscribe(new SubscriptionPattern("t.*"), 
Optional.of(rebalanceListener));
         assertThrows(IllegalStateException.class, () -> 
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
     }
 
+    @Test
+    public void testSubscriptionPattern() {
+        SubscriptionPattern pattern = new SubscriptionPattern("t.*");
+        state.subscribe(pattern, Optional.of(rebalanceListener));
+        assertTrue(state.hasRe2JPatternSubscription());
+        assertEquals(pattern, state.subscriptionPattern());
+        assertTrue(state.hasAutoAssignedPartitions());
+
+        state.unsubscribe();
+        assertFalse(state.hasRe2JPatternSubscription());
+        assertNull(state.subscriptionPattern());
+    }
+
 
     @Test
     public void unsubscribeUserAssignment() {

Reply via email to