This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6237325fb14 KAFKA-15561 [5/N]: Integration tests for new subscribe API 
with Re2J pattern (#17964)
6237325fb14 is described below

commit 6237325fb14047fc96ed7ed048540483baa61b0c
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Nov 29 12:02:39 2024 -0500

    KAFKA-15561 [5/N]: Integration tests for new subscribe API with Re2J 
pattern (#17964)
    
    - integration tests for new subscribe api with RE2J pattern
    - fix to ensure all topics are included in metadata requests when consumer 
is subscribed to RE2J pattern
    
    Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 .../consumer/internals/ConsumerMetadata.java       |  2 +-
 .../api/PlaintextConsumerSubscriptionTest.scala    | 45 +++++++++++++++++++++-
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index bf81027d8ea..cb4c7dde6f8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -68,7 +68,7 @@ public class ConsumerMetadata extends Metadata {
 
     @Override
     public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
-        if (subscription.hasPatternSubscription())
+        if (subscription.hasPatternSubscription() || 
subscription.hasRe2JPatternSubscription())
             return MetadataRequest.Builder.allTopics();
         List<String> topics = new ArrayList<>();
         topics.addAll(subscription.metadataTopics());
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
index 2a3a2fcfdf7..f9c6d18b470 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
@@ -15,7 +15,7 @@ package kafka.api
 import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{InvalidRegularExpression, 
InvalidTopicException}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Timeout
 import org.junit.jupiter.api.function.Executable
@@ -178,6 +178,49 @@ class PlaintextConsumerSubscriptionTest extends 
AbstractConsumerTest {
     assertEquals(0, consumer.assignment().size)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRe2JPatternSubscription(quorum: String, groupProtocol: String): Unit 
= {
+    val topic1 = "tblablac" // matches subscribed pattern
+    createTopic(topic1, 2, brokerCount)
+
+    val topic2 = "tblablak" // does not match subscribed pattern
+    createTopic(topic2, 2, brokerCount)
+
+    val topic3 = "tblab1" // does not match subscribed pattern
+    createTopic(topic3, 2, brokerCount)
+
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    val pattern = new SubscriptionPattern("t.*c")
+    consumer.subscribe(pattern)
+
+    val assignment = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+    awaitAssignment(consumer, assignment)
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRe2JPatternSubscriptionInvalidRegex(quorum: String, groupProtocol: 
String): Unit = {
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    val pattern = new SubscriptionPattern("(t.*c")
+    consumer.subscribe(pattern)
+
+    TestUtils.tryUntilNoAssertionError() {
+      assertThrows(classOf[InvalidRegularExpression], () => 
consumer.poll(Duration.ZERO))
+    }
+    consumer.unsubscribe()
+  }
+
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
   def testExpandingTopicSubscriptions(quorum: String, groupProtocol: String): 
Unit = {

Reply via email to