This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab0df20489a KAFKA-17592; Support for SubscribedTopicsRegex in 
ConsumerGroupHeartbeat RPC (#17257)
ab0df20489a is described below

commit ab0df20489a8d6079cc576f4d9f7326db4b333c9
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Sep 25 03:52:05 2024 -0400

    KAFKA-17592; Support for SubscribedTopicsRegex in ConsumerGroupHeartbeat 
RPC (#17257)
    
    This patch includes:
    - Bump ConsumerGroupHeartbeatRequest version to include 
subscribedTopicRegex field
    - Introduce new error code for InvalidRegularExpression
    - Bump ConsumerGroupHeartbeatResponse version to support new regex error
    - Wire the new field into the GroupMetadataManager when processing HB
    
    Reviewers: David Jacot <[email protected]>
---
 .../common/errors/InvalidRegularExpression.java    | 27 ++++++++++++++++++++++
 .../org/apache/kafka/common/protocol/Errors.java   |  4 +++-
 .../requests/ConsumerGroupHeartbeatResponse.java   |  1 +
 .../message/ConsumerGroupHeartbeatRequest.json     |  6 ++++-
 .../message/ConsumerGroupHeartbeatResponse.json    |  3 ++-
 .../coordinator/group/GroupMetadataManager.java    |  8 ++++++-
 .../group/GroupMetadataManagerTest.java            | 17 ++++++++++++++
 7 files changed, 62 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/InvalidRegularExpression.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRegularExpression.java
new file mode 100644
index 00000000000..f1ea4d19abc
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/InvalidRegularExpression.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * Thrown when a regular expression received in a request is not valid.
+ */
+public class InvalidRegularExpression extends ApiException {
+    public InvalidRegularExpression(String message) {
+        super(message);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index b304d7e08ff..a80ec308ebb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -67,6 +67,7 @@ import 
org.apache.kafka.common.errors.InvalidPrincipalTypeException;
 import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.errors.InvalidRecordStateException;
 import org.apache.kafka.common.errors.InvalidRegistrationException;
+import org.apache.kafka.common.errors.InvalidRegularExpression;
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -409,7 +410,8 @@ public enum Errors {
     FENCED_STATE_EPOCH(124, "The share coordinator rejected the request 
because the share-group state epoch did not match.", 
FencedStateEpochException::new),
     INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving 
replica's key.", InvalidVoterKeyException::new),
     DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", 
DuplicateVoterException::new),
-    VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", 
VoterNotFoundException::new);
+    VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", 
VoterNotFoundException::new),
+    INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", 
InvalidRegularExpression::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
index 6f881cc8efa..76f89ed4df8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatResponse.java
@@ -38,6 +38,7 @@ import java.util.Map;
  * - {@link Errors#UNSUPPORTED_ASSIGNOR}
  * - {@link Errors#UNRELEASED_INSTANCE_ID}
  * - {@link Errors#GROUP_MAX_SIZE_REACHED}
+ * - {@link Errors#INVALID_REGULAR_EXPRESSION}
  */
 public class ConsumerGroupHeartbeatResponse extends AbstractResponse {
     private final ConsumerGroupHeartbeatResponseData data;
diff --git 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
index 71c6e2e2502..669e2547758 100644
--- 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
+++ 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json
@@ -18,8 +18,10 @@
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "ConsumerGroupHeartbeatRequest",
-  "validVersions": "0",
+  // Version 1 adds SubscribedTopicRegex (KIP-848).
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
+  "latestVersionUnstable": true,
   "fields": [
     { "name": "GroupId", "type": "string", "versions": "0+", "entityType": 
"groupId",
       "about": "The group identifier." },
@@ -35,6 +37,8 @@
       "about": "-1 if it didn't change since the last heartbeat; the maximum 
time in milliseconds that the coordinator will wait on the member to revoke its 
partitions otherwise." },
     { "name": "SubscribedTopicNames", "type": "[]string", "versions": "0+", 
"nullableVersions": "0+", "default": "null", "entityType": "topicName",
       "about": "null if it didn't change since the last heartbeat; the 
subscribed topic names otherwise." },
+    { "name": "SubscribedTopicRegex", "type": "string", "versions": "1+", 
"nullableVersions": "1+", "default": "null",
+      "about": "null if it didn't change since the last heartbeat; the 
subscribed topic regex otherwise." },
     { "name": "ServerAssignor", "type": "string", "versions": "0+", 
"nullableVersions": "0+", "default": "null",
       "about": "null if not used or if it didn't change since the last 
heartbeat; the server side assignor to use otherwise." },
     { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": 
"0+", "nullableVersions": "0+", "default": "null",
diff --git 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
index fb55f80bd40..cda757cb32f 100644
--- 
a/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
+++ 
b/clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json
@@ -17,7 +17,7 @@
   "apiKey": 68,
   "type": "response",
   "name": "ConsumerGroupHeartbeatResponse",
-  "validVersions": "0",
+  "validVersions": "0-1",
   "flexibleVersions": "0+",
   // Supported errors:
   // - GROUP_AUTHORIZATION_FAILED (version 0+)
@@ -30,6 +30,7 @@
   // - UNSUPPORTED_ASSIGNOR (version 0+)
   // - UNRELEASED_INSTANCE_ID (version 0+)
   // - GROUP_MAX_SIZE_REACHED (version 0+)
+  // - INVALID_REGULAR_EXPRESSION (version 1+)
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 2a48c87c32d..ecf7efaad49 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1280,6 +1280,7 @@ public class GroupMetadataManager {
         throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
         throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
         throwIfEmptyString(request.rackId(), "RackId can't be empty.");
+        throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex 
is not supported yet.");
 
         if (request.memberEpoch() > 0 || request.memberEpoch() == 
LEAVE_GROUP_MEMBER_EPOCH) {
             throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
@@ -1685,7 +1686,9 @@ public class GroupMetadataManager {
      * @param clientId              The client id.
      * @param clientHost            The client host.
      * @param subscribedTopicNames  The list of subscribed topic names from 
the request
-     *                              of null.
+     *                              or null.
+     * @param subscribedTopicRegex  The regular expression based subscription 
from the request
+     *                              or null.
      * @param assignorName          The assignor name from the request or null.
      * @param ownedTopicPartitions  The list of owned partitions from the 
request or null.
      *
@@ -1702,6 +1705,7 @@ public class GroupMetadataManager {
         String clientId,
         String clientHost,
         List<String> subscribedTopicNames,
+        String subscribedTopicRegex,
         String assignorName,
         List<ConsumerGroupHeartbeatRequestData.TopicPartitions> 
ownedTopicPartitions
     ) throws ApiException {
@@ -1749,6 +1753,7 @@ public class GroupMetadataManager {
             .maybeUpdateRebalanceTimeoutMs(ofSentinel(rebalanceTimeoutMs))
             .maybeUpdateServerAssignorName(Optional.ofNullable(assignorName))
             
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscribedTopicNames))
+            
.maybeUpdateSubscribedTopicRegex(Optional.ofNullable(subscribedTopicRegex))
             .setClientId(clientId)
             .setClientHost(clientHost)
             .setClassicMemberMetadata(null)
@@ -3159,6 +3164,7 @@ public class GroupMetadataManager {
                 context.clientId(),
                 context.clientAddress.toString(),
                 request.subscribedTopicNames(),
+                request.subscribedTopicRegex(),
                 request.serverAssignor(),
                 request.topicPartitions()
             );
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 2a56cd912cd..653662b859b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -248,6 +248,23 @@ public class GroupMetadataManagerTest {
         assertEquals("InstanceId can't be null.", ex.getMessage());
     }
 
+    @Test
+    public void testConsumerHeartbeatRegexValidation() {
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupAssignors(Collections.singletonList(assignor))
+            .build();
+        Exception ex;
+        // Regex not supported for now. This test will evolve to actually 
validate the regex when it's supported.
+        ex = assertThrows(InvalidRequestException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("t*")));
+        assertEquals("SubscribedTopicRegex is not supported yet.", 
ex.getMessage());
+    }
+
     @Test
     public void testMemberIdGeneration() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");

Reply via email to