This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 64f3ee4c336 KAFKA-17593; [2/N] Update request validation & validate 
regex (#17651)
64f3ee4c336 is described below

commit 64f3ee4c336339ee17163cc742442fef0db7b62c
Author: David Jacot <[email protected]>
AuthorDate: Mon Nov 4 15:38:09 2024 +0100

    KAFKA-17593; [2/N] Update request validation & validate regex (#17651)
    
    This patch does two things:
    1) Change the validation of the ConsumerGroupHeartbeat request to accept 
subscribed topic names and/or subscribed topic regex. At least one of them must 
be set in the first request with epoch 0.
    2) Validate the provided regular expression by compiling it.
    
    Co-authored-by: Lianet Magrans <[email protected]>
    
    Reviewers: Jeff Kim <[email protected]>, Lianet Magrans 
<[email protected]>
---
 checkstyle/import-control-group-coordinator.xml    |  1 +
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 78 ++++++++++++++++++++++
 .../coordinator/group/GroupMetadataManager.java    | 37 ++++++++--
 .../group/GroupMetadataManagerTest.java            | 52 +++++++++++++--
 4 files changed, 157 insertions(+), 11 deletions(-)

diff --git a/checkstyle/import-control-group-coordinator.xml 
b/checkstyle/import-control-group-coordinator.xml
index 55d265343af..0619ea444d5 100644
--- a/checkstyle/import-control-group-coordinator.xml
+++ b/checkstyle/import-control-group-coordinator.xml
@@ -70,6 +70,7 @@
             <allow pkg="org.apache.kafka.timeline" />
             <allow pkg="org.apache.kafka.coordinator.common" />
             <allow pkg="org.apache.kafka.coordinator.common.runtime" />
+            <allow pkg="com.google.re2j" />
             <subpackage name="metrics">
                 <allow pkg="com.yammer.metrics"/>
                 <allow pkg="org.HdrHistogram" />
diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index d869109ab73..63004b99d2a 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -164,6 +164,84 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) {
     assertEquals(-1, consumerGroupHeartbeatResponse.data.memberEpoch)
   }
 
+  @ClusterTest
+  def testConsumerGroupHeartbeatWithRegularExpression(): Unit = {
+    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+    val admin = cluster.createAdminClient()
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    TestUtils.createOffsetsTopicWithAdmin(
+      admin = admin,
+      brokers = raftCluster.brokers.values().asScala.toSeq,
+      controllers = raftCluster.controllers().values().asScala.toSeq
+    )
+
+    // Heartbeat request to join the group. Note that the member subscribes
+    // to an nonexistent topic.
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(Uuid.randomUuid().toString)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicRegex("foo")
+        .setTopicPartitions(List.empty.asJava),
+      true
+    ).build()
+
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == Errors.NONE.code
+    }, msg = s"Could not join the group successfully. Last response 
$consumerGroupHeartbeatResponse.")
+
+    // Verify the response.
+    assertNotNull(consumerGroupHeartbeatResponse.data.memberId)
+    assertEquals(1, consumerGroupHeartbeatResponse.data.memberEpoch)
+    assertEquals(new ConsumerGroupHeartbeatResponseData.Assignment(), 
consumerGroupHeartbeatResponse.data.assignment)
+  }
+
+  @ClusterTest
+  def testConsumerGroupHeartbeatWithInvalidRegularExpression(): Unit = {
+    val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
+    val admin = cluster.createAdminClient()
+
+    // Creates the __consumer_offsets topics because it won't be created 
automatically
+    // in this test because it does not use FindCoordinator API.
+    TestUtils.createOffsetsTopicWithAdmin(
+      admin = admin,
+      brokers = raftCluster.brokers.values().asScala.toSeq,
+      controllers = raftCluster.controllers().values().asScala.toSeq
+    )
+
+    // Heartbeat request to join the group. Note that the member subscribes
+    // to an nonexistent topic.
+    val consumerGroupHeartbeatRequest = new 
ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId("grp")
+        .setMemberId(Uuid.randomUuid().toString)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicRegex("[")
+        .setTopicPartitions(List.empty.asJava),
+      true
+    ).build()
+
+    // Send the request until receiving a successful response. There is a delay
+    // here because the group coordinator is loaded in the background.
+    var consumerGroupHeartbeatResponse: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      consumerGroupHeartbeatResponse = 
connectAndReceive(consumerGroupHeartbeatRequest)
+      consumerGroupHeartbeatResponse.data.errorCode == 
Errors.INVALID_REGULAR_EXPRESSION.code
+    }, msg = s"Did not receive the expected error. Last response 
$consumerGroupHeartbeatResponse.")
+
+    // Verify the response.
+    assertEquals(Errors.INVALID_REGULAR_EXPRESSION.code, 
consumerGroupHeartbeatResponse.data.errorCode)
+  }
+
   @ClusterTest
   def 
testRejoiningStaticMemberGetsAssignmentsBackWhenNewGroupCoordinatorIsEnabled(): 
Unit = {
     val raftCluster = cluster.asInstanceOf[RaftClusterInstance]
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9a5c517a956..3435f3b3d2f 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -27,6 +27,7 @@ import 
org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
+import org.apache.kafka.common.errors.InvalidRegularExpression;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -118,6 +119,9 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.apache.kafka.timeline.TimelineHashSet;
 
+import com.google.re2j.Pattern;
+import com.google.re2j.PatternSyntaxException;
+
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
@@ -1312,7 +1316,6 @@ public class GroupMetadataManager {
         throwIfEmptyString(request.groupId(), "GroupId can't be empty.");
         throwIfEmptyString(request.instanceId(), "InstanceId can't be empty.");
         throwIfEmptyString(request.rackId(), "RackId can't be empty.");
-        throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex 
is not supported yet.");
 
         if (request.memberEpoch() == 0) {
             if (request.rebalanceTimeoutMs() == -1) {
@@ -1321,8 +1324,10 @@ public class GroupMetadataManager {
             if (request.topicPartitions() == null || 
!request.topicPartitions().isEmpty()) {
                 throw new InvalidRequestException("TopicPartitions must be 
empty when (re-)joining.");
             }
-            if (request.subscribedTopicNames() == null || 
request.subscribedTopicNames().isEmpty()) {
-                throw new InvalidRequestException("SubscribedTopicNames must 
be set in first request.");
+            boolean hasSubscribedTopicNames = request.subscribedTopicNames() 
!= null && !request.subscribedTopicNames().isEmpty();
+            boolean hasSubscribedTopicRegex = request.subscribedTopicRegex() 
!= null && !request.subscribedTopicRegex().isEmpty();
+            if (!hasSubscribedTopicNames && !hasSubscribedTopicRegex) {
+                throw new InvalidRequestException("SubscribedTopicNames or 
SubscribedTopicRegex must be set in first request.");
             }
         } else if (request.memberEpoch() == LEAVE_GROUP_STATIC_MEMBER_EPOCH) {
             throwIfNull(request.instanceId(), "InstanceId can't be null.");
@@ -1642,6 +1647,24 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Validates if the provided regular expression is valid.
+     *
+     * @param regex The regular expression to validate.
+     * @throws InvalidRegularExpression if the regular expression is invalid.
+     */
+    private static void throwIfRegularExpressionIsInvalid(
+        String regex
+    ) throws InvalidRegularExpression {
+        try {
+            Pattern.compile(regex);
+        } catch (PatternSyntaxException ex) {
+            throw new InvalidRegularExpression(
+                String.format("SubscribedTopicRegex `%s` is not a valid 
regular expression: %s.",
+                    regex, ex.getDescription()));
+        }
+    }
+
     /**
      * Deserialize the subscription in JoinGroupRequestProtocolCollection.
      * All the protocols have the same subscription, so the method picks a 
random one.
@@ -2390,13 +2413,14 @@ public class GroupMetadataManager {
      * @param records       The list to accumulate any new records.
      * @return A boolean indicating whether the updatedMember has a different
      *         subscribedTopicNames/subscribedTopicRegex from the old member.
+     * @throws InvalidRegularExpression if the regular expression is invalid.
      */
     private boolean hasMemberSubscriptionChanged(
         String groupId,
         ConsumerGroupMember member,
         ConsumerGroupMember updatedMember,
         List<CoordinatorRecord> records
-    ) {
+    ) throws InvalidRegularExpression {
         String memberId = updatedMember.memberId();
         if (!updatedMember.equals(member)) {
             records.add(newConsumerGroupMemberSubscriptionRecord(groupId, 
updatedMember));
@@ -2410,6 +2434,11 @@ public class GroupMetadataManager {
             if 
(!updatedMember.subscribedTopicRegex().equals(member.subscribedTopicRegex())) {
                 log.debug("[GroupId {}] Member {} updated its subscribed regex 
to: {}.",
                     groupId, memberId, updatedMember.subscribedTopicRegex());
+                // If the regular expression has changed, we compile it to 
ensure that
+                // its syntax is valid.
+                if (updatedMember.subscribedTopicRegex() != null) {
+                    
throwIfRegularExpressionIsInvalid(updatedMember.subscribedTopicRegex());
+                }
                 return true;
             }
         }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 5d1962da233..b912b4f18de 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -27,6 +27,7 @@ import 
org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
+import org.apache.kafka.common.errors.InvalidRegularExpression;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
@@ -206,7 +207,7 @@ public class GroupMetadataManagerTest {
                 .setRebalanceTimeoutMs(5000)));
         assertEquals("TopicPartitions must be empty when (re-)joining.", 
ex.getMessage());
 
-        // SubscribedTopicNames must be present and empty in the first request 
(epoch == 0).
+        // SubscribedTopicNames or SubscribedTopicRegex must be present and 
non-empty in the first request (epoch == 0).
         ex = assertThrows(InvalidRequestException.class, () -> 
context.consumerGroupHeartbeat(
             new ConsumerGroupHeartbeatRequestData()
                 .setMemberId(memberId)
@@ -214,7 +215,7 @@ public class GroupMetadataManagerTest {
                 .setMemberEpoch(0)
                 .setRebalanceTimeoutMs(5000)
                 .setTopicPartitions(Collections.emptyList())));
-        assertEquals("SubscribedTopicNames must be set in first request.", 
ex.getMessage());
+        assertEquals("SubscribedTopicNames or SubscribedTopicRegex must be set 
in first request.", ex.getMessage());
 
         // InstanceId must be non-empty if provided in all requests.
         ex = assertThrows(InvalidRequestException.class, () -> 
context.consumerGroupHeartbeat(
@@ -257,20 +258,57 @@ public class GroupMetadataManagerTest {
 
     @Test
     public void testConsumerHeartbeatRegexValidation() {
+        String memberId = Uuid.randomUuid().toString();
         MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        assignor.prepareGroupAssignment(new 
GroupAssignment(Collections.emptyMap()));
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
             .withConsumerGroupAssignors(Collections.singletonList(assignor))
             .build();
-        Exception ex;
-        // Regex not supported for now. This test will evolve to actually 
validate the regex when it's supported
-        ex = assertThrows(InvalidRequestException.class, () -> 
context.consumerGroupHeartbeat(
+
+        // Subscribing with an invalid regular expression fails.
+        Exception ex = assertThrows(InvalidRegularExpression.class, () -> 
context.consumerGroupHeartbeat(
             new ConsumerGroupHeartbeatRequestData()
                 .setMemberId(Uuid.randomUuid().toString())
                 .setGroupId("foo")
+                .setMemberId(memberId)
                 .setMemberEpoch(0)
                 .setRebalanceTimeoutMs(5000)
-                .setSubscribedTopicRegex("t*")));
-        assertEquals("SubscribedTopicRegex is not supported yet.", 
ex.getMessage());
+                .setSubscribedTopicRegex("[")
+                .setTopicPartitions(Collections.emptyList())));
+        assertEquals("SubscribedTopicRegex `[` is not a valid regular 
expression: missing closing ].", ex.getMessage());
+
+        // Subscribing with a valid regular expression succeeds.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex(".*")
+                .setTopicPartitions(Collections.emptyList()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Updating the subscription to an invalid regular expression fails.
+        assertThrows(InvalidRegularExpression.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicRegex("[")
+                .setTopicPartitions(Collections.emptyList())));
+        assertEquals("SubscribedTopicRegex `[` is not a valid regular 
expression: missing closing ].", ex.getMessage());
+
+        // Updating the subscription to topic names succeeds (checking when 
the regex becomes null).
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId("foo")
+                .setMemberId(memberId)
+                .setMemberEpoch(1)
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Collections.singletonList("foo"))
+                .setTopicPartitions(Collections.emptyList()));
+        assertEquals(2, result.response().memberEpoch());
     }
 
     @Test

Reply via email to