This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b99c22770aa regex integration tests (#18079)
b99c22770aa is described below

commit b99c22770aafb873935d6daed4b942d326c05aba
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Dec 10 09:01:02 2024 -0500

    regex integration tests (#18079)
    
    Reviewers: David Jacot <[email protected]>
---
 .../api/PlaintextConsumerSubscriptionTest.scala    | 84 +++++++++++++++++++++-
 1 file changed, 82 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
index f9c6d18b470..5eea54b23d1 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
@@ -193,10 +193,10 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
     val consumer = createConsumer()
     assertEquals(0, consumer.assignment().size)
 
-    val pattern = new SubscriptionPattern("t.*c")
+    var pattern = new SubscriptionPattern("t.*c")
     consumer.subscribe(pattern)
 
-    val assignment = Set(
+    var assignment = Set(
       new TopicPartition(topic, 0),
       new TopicPartition(topic, 1),
       new TopicPartition(topic1, 0),
@@ -204,6 +204,86 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
     awaitAssignment(consumer, assignment)
     consumer.unsubscribe()
     assertEquals(0, consumer.assignment().size)
+
+    // Subscribe to a different pattern to match topic2 (that did not match before)
+    pattern = new SubscriptionPattern(topic2 + ".*")
+    consumer.subscribe(pattern)
+
+    assignment = Set(
+      new TopicPartition(topic2, 0),
+      new TopicPartition(topic2, 1))
+    awaitAssignment(consumer, assignment)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRe2JPatternExpandSubscription(quorum: String, groupProtocol: String): Unit = {
+    val topic1 = "topic1" // matches first pattern
+    createTopic(topic1, 2, brokerCount)
+
+    val topic2 = "topic2" // does not match first pattern
+    createTopic(topic2, 2, brokerCount)
+
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    var pattern = new SubscriptionPattern("topic1.*")
+    consumer.subscribe(pattern)
+    val assignment = Set(
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+    awaitAssignment(consumer, assignment)
+
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
+
+    // Subscribe to a different pattern that should match
+    // the same topics the member already had plus new ones
+    pattern = new SubscriptionPattern("topic1|topic2")
+    consumer.subscribe(pattern)
+
+    val expandedAssignment = assignment ++ Set(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))
+    awaitAssignment(consumer, expandedAssignment)
+  }
+
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRe2JPatternSubscriptionAndTopicSubscription(quorum: String, groupProtocol: String): Unit = {
+    val topic1 = "topic1" // matches subscribed pattern
+    createTopic(topic1, 2, brokerCount)
+
+    val topic11 = "topic11" // matches subscribed pattern
+    createTopic(topic11, 2, brokerCount)
+
+    val topic2 = "topic2" // does not match subscribed pattern
+    createTopic(topic2, 2, brokerCount)
+
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    // Subscribe to pattern
+    val pattern = new SubscriptionPattern("topic1.*")
+    consumer.subscribe(pattern)
+    val patternAssignment = Set(
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1),
+      new TopicPartition(topic11, 0),
+      new TopicPartition(topic11, 1))
+    awaitAssignment(consumer, patternAssignment)
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
+
+    // Subscribe to explicit topic names
+    consumer.subscribe(List(topic2).asJava)
+    val assignment = Set(
+      new TopicPartition(topic2, 0),
+      new TopicPartition(topic2, 1))
+    awaitAssignment(consumer, assignment)
+    consumer.unsubscribe()
+
+    // Subscribe to pattern again
+    consumer.subscribe(pattern)
+    awaitAssignment(consumer, patternAssignment)
   }
 
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)

Reply via email to