This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 37b4d9b01d5 KAFKA-15561 [3/N]: Client support for SubscriptionPattern in HB (#17951)
37b4d9b01d5 is described below
commit 37b4d9b01d5790f307d9c5877b7252306780da37
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Nov 27 12:01:12 2024 -0500
KAFKA-15561 [3/N]: Client support for SubscriptionPattern in HB (#17951)
Reviewers: David Jacot <[email protected]>
---
.../internals/AbstractHeartbeatRequestManager.java | 6 +++
.../internals/ConsumerHeartbeatRequestManager.java | 15 +++++-
.../consumer/internals/SubscriptionState.java | 20 +++++++-
.../ConsumerHeartbeatRequestManagerTest.java | 56 ++++++++++++++++++++++
.../consumer/internals/SubscriptionStateTest.java | 19 ++++++--
5 files changed, 110 insertions(+), 6 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
index 031ccee70ae..9c1a63cd79a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java
@@ -406,6 +406,12 @@ public abstract class AbstractHeartbeatRequestManager<R
extends AbstractResponse
heartbeatRequestState.reset();
break;
+ case INVALID_REGULAR_EXPRESSION:
+ logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
+ handleFatalFailure(error.exception("Invalid RE2J SubscriptionPattern provided in the call to " +
+ "subscribe. " + errorMessage));
+ break;
+
default:
if (!handleSpecificError(response, currentTimeMs)) {
// If the manager receives an unknown error - there could be a bug in the code or a new error code
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
index a87ffcd07eb..4fe0e7085a2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.Uuid;
@@ -32,6 +33,7 @@ import org.apache.kafka.common.utils.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -231,6 +233,15 @@ public class ConsumerHeartbeatRequestManager extends
AbstractHeartbeatRequestMan
sentFields.subscribedTopicNames = subscribedTopicNames;
}
+ // SubscribedTopicRegex - only sent if it has changed since the last heartbeat.
+ // Send empty string to indicate that a subscribed pattern needs to be removed.
+ SubscriptionPattern pattern = subscriptions.subscriptionPattern();
+ boolean patternUpdated = !Objects.equals(pattern,
sentFields.pattern);
+ if ((sendAllFields && pattern != null) || patternUpdated) {
+ data.setSubscribedTopicRegex((pattern != null) ?
pattern.pattern() : "");
+ sentFields.pattern = pattern;
+ }
+
// ServerAssignor - sent when joining or if it has changed since
the last heartbeat
this.membershipManager.serverAssignor().ifPresent(serverAssignor
-> {
if (sendAllFields ||
!serverAssignor.equals(sentFields.serverAssignor)) {
@@ -239,8 +250,6 @@ public class ConsumerHeartbeatRequestManager extends
AbstractHeartbeatRequestMan
}
});
- // ClientAssignors - not supported yet
-
// TopicPartitions - sent when joining or with the first heartbeat
after a new assignment from
// the server was reconciled. This is ensured by resending the
topic partitions whenever the
// local assignment, including its local epoch is changed
(although the local epoch is not sent
@@ -268,6 +277,7 @@ public class ConsumerHeartbeatRequestManager extends
AbstractHeartbeatRequestMan
static class SentFields {
private int rebalanceTimeoutMs = -1;
private TreeSet<String> subscribedTopicNames = null;
+ private SubscriptionPattern pattern = null;
private String serverAssignor = null;
private AbstractMembershipManager.LocalAssignment localAssignment
= null;
@@ -278,6 +288,7 @@ public class ConsumerHeartbeatRequestManager extends
AbstractHeartbeatRequestMan
rebalanceTimeoutMs = -1;
serverAssignor = null;
localAssignment = null;
+ pattern = null;
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index a51383d39fa..0df99301e9e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -361,6 +361,24 @@ public class SubscriptionState {
return Collections.emptySet();
}
+ /**
+ * @return The RE2J compatible pattern in use, provided via a call to
+ * {@link #subscribe(SubscriptionPattern, Optional)}.
+ * Null if there is no SubscriptionPattern in use.
+ */
+ public synchronized SubscriptionPattern subscriptionPattern() {
+ if (hasRe2JPatternSubscription())
+ return this.subscribedRe2JPattern;
+ return null;
+ }
+
+ /**
+ * @return True if subscribed using RE2J pattern. False otherwise.
+ */
+ public synchronized boolean hasRe2JPatternSubscription() {
+ return this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
+ }
+
public synchronized Set<TopicPartition> pausedPartitions() {
return collectPartitions(TopicPartitionState::isPaused);
}
@@ -469,7 +487,7 @@ public class SubscriptionState {
public synchronized boolean hasAutoAssignedPartitions() {
return this.subscriptionType == SubscriptionType.AUTO_TOPICS ||
this.subscriptionType == SubscriptionType.AUTO_PATTERN
- || this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE;
+ || this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE
|| this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
}
public synchronized void position(TopicPartition tp, FetchPosition
position) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 0d76fc88274..47630c4e9d8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
import
org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager.HeartbeatRequestState;
import
org.apache.kafka.clients.consumer.internals.AbstractMembershipManager.LocalAssignment;
import
org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager.HeartbeatState;
@@ -864,6 +865,61 @@ public class ConsumerHeartbeatRequestManagerTest {
"No requests should be generated on close if the member is not
leaving when closing the manager");
}
+ @Test
+ public void testRegexInHeartbeatLifecycle() {
+ heartbeatState = new HeartbeatState(subscriptions, membershipManager,
DEFAULT_MAX_POLL_INTERVAL_MS);
+ createHeartbeatRequestStateWithZeroHeartbeatInterval();
+
+ // Initial heartbeat with regex
+ mockJoiningMemberData(null);
+ when(subscriptions.subscriptionPattern()).thenReturn(new
SubscriptionPattern("t1.*"));
+ ConsumerGroupHeartbeatRequestData data =
heartbeatState.buildRequestData();
+ assertEquals("t1.*", data.subscribedTopicRegex());
+
+ // Regex not included in HB if not updated
+ when(membershipManager.state()).thenReturn(MemberState.STABLE);
+ data = heartbeatState.buildRequestData();
+ assertNull(data.subscribedTopicRegex());
+
+ // Regex included in HB if updated
+ when(subscriptions.subscriptionPattern()).thenReturn(new
SubscriptionPattern("t2.*"));
+ data = heartbeatState.buildRequestData();
+ assertEquals("t2.*", data.subscribedTopicRegex());
+
+ // Empty regex included in HB to remove pattern subscription
+ when(subscriptions.subscriptionPattern()).thenReturn(null);
+ data = heartbeatState.buildRequestData();
+ assertEquals("", data.subscribedTopicRegex());
+
+ // Regex not included in HB after pattern subscription removed
+ when(subscriptions.subscriptionPattern()).thenReturn(null);
+ data = heartbeatState.buildRequestData();
+ assertNull(data.subscribedTopicRegex());
+ }
+
+ @Test
+ public void testRegexInJoiningHeartbeat() {
+ heartbeatState = new HeartbeatState(subscriptions, membershipManager,
DEFAULT_MAX_POLL_INTERVAL_MS);
+ createHeartbeatRequestStateWithZeroHeartbeatInterval();
+
+ // Initial heartbeat with regex
+ mockJoiningMemberData(null);
+ when(subscriptions.subscriptionPattern()).thenReturn(new
SubscriptionPattern("t1.*"));
+ ConsumerGroupHeartbeatRequestData data =
heartbeatState.buildRequestData();
+ assertEquals("t1.*", data.subscribedTopicRegex());
+
+ // Members unsubscribes from regex (empty regex included in HB)
+ when(subscriptions.subscriptionPattern()).thenReturn(null);
+ data = heartbeatState.buildRequestData();
+ assertEquals("", data.subscribedTopicRegex());
+
+ // Member rejoins (ie. fenced) should not include regex field in HB
+ when(membershipManager.state()).thenReturn(MemberState.JOINING);
+ when(subscriptions.subscriptionPattern()).thenReturn(null);
+ data = heartbeatState.buildRequestData();
+ assertNull(data.subscribedTopicRegex());
+ }
+
private void assertHeartbeat(ConsumerHeartbeatRequestManager hrm, int
nextPollMs) {
NetworkClientDelegate.PollResult pollResult =
hrm.poll(time.milliseconds());
assertEquals(1, pollResult.unsentRequests.size());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index a4a020a22f3..f697990b544 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -400,7 +400,7 @@ public class SubscriptionStateTest {
@Test
public void testSubscribeToRe2JPattern() {
- String pattern = "t*";
+ String pattern = "t.*";
state.subscribe(new SubscriptionPattern(pattern),
Optional.of(rebalanceListener));
assertTrue(state.toString().contains("type=AUTO_PATTERN_RE2J"));
assertTrue(state.toString().contains("subscribedPattern=" + pattern));
@@ -409,15 +409,28 @@ public class SubscriptionStateTest {
@Test
public void testMixedPatternSubscriptionNotAllowed() {
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener));
- assertThrows(IllegalStateException.class, () -> state.subscribe(new
SubscriptionPattern("t*"),
+ assertThrows(IllegalStateException.class, () -> state.subscribe(new
SubscriptionPattern("t.*"),
Optional.of(rebalanceListener)));
state.unsubscribe();
- state.subscribe(new SubscriptionPattern("t*"),
Optional.of(rebalanceListener));
+ state.subscribe(new SubscriptionPattern("t.*"),
Optional.of(rebalanceListener));
assertThrows(IllegalStateException.class, () ->
state.subscribe(Pattern.compile(".*"), Optional.of(rebalanceListener)));
}
+ @Test
+ public void testSubscriptionPattern() {
+ SubscriptionPattern pattern = new SubscriptionPattern("t.*");
+ state.subscribe(pattern, Optional.of(rebalanceListener));
+ assertTrue(state.hasRe2JPatternSubscription());
+ assertEquals(pattern, state.subscriptionPattern());
+ assertTrue(state.hasAutoAssignedPartitions());
+
+ state.unsubscribe();
+ assertFalse(state.hasRe2JPatternSubscription());
+ assertNull(state.subscriptionPattern());
+ }
+
@Test
public void unsubscribeUserAssignment() {