This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b99c22770aa regex integration tests (#18079)
b99c22770aa is described below
commit b99c22770aafb873935d6daed4b942d326c05aba
Author: Lianet Magrans <[email protected]>
AuthorDate: Tue Dec 10 09:01:02 2024 -0500
regex integration tests (#18079)
Reviewers: David Jacot <[email protected]>
---
.../api/PlaintextConsumerSubscriptionTest.scala | 84 +++++++++++++++++++++-
1 file changed, 82 insertions(+), 2 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
index f9c6d18b470..5eea54b23d1 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
@@ -193,10 +193,10 @@ class PlaintextConsumerSubscriptionTest extends
AbstractConsumerTest {
val consumer = createConsumer()
assertEquals(0, consumer.assignment().size)
- val pattern = new SubscriptionPattern("t.*c")
+ var pattern = new SubscriptionPattern("t.*c")
consumer.subscribe(pattern)
- val assignment = Set(
+ var assignment = Set(
new TopicPartition(topic, 0),
new TopicPartition(topic, 1),
new TopicPartition(topic1, 0),
@@ -204,6 +204,86 @@ class PlaintextConsumerSubscriptionTest extends
AbstractConsumerTest {
awaitAssignment(consumer, assignment)
consumer.unsubscribe()
assertEquals(0, consumer.assignment().size)
+
+ // Subscribe to a different pattern to match topic2 (that did not match
before)
+ pattern = new SubscriptionPattern(topic2 + ".*")
+ consumer.subscribe(pattern)
+
+ assignment = Set(
+ new TopicPartition(topic2, 0),
+ new TopicPartition(topic2, 1))
+ awaitAssignment(consumer, assignment)
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testRe2JPatternExpandSubscription(quorum: String, groupProtocol:
String): Unit = {
+ val topic1 = "topic1" // matches first pattern
+ createTopic(topic1, 2, brokerCount)
+
+ val topic2 = "topic2" // does not match first pattern
+ createTopic(topic2, 2, brokerCount)
+
+ val consumer = createConsumer()
+ assertEquals(0, consumer.assignment().size)
+
+ var pattern = new SubscriptionPattern("topic1.*")
+ consumer.subscribe(pattern)
+ val assignment = Set(
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1))
+ awaitAssignment(consumer, assignment)
+
+ consumer.unsubscribe()
+ assertEquals(0, consumer.assignment().size)
+
+ // Subscribe to a different pattern that should match
+ // the same topics the member already had plus new ones
+ pattern = new SubscriptionPattern("topic1|topic2")
+ consumer.subscribe(pattern)
+
+ val expandedAssignment = assignment ++ Set(new TopicPartition(topic2, 0),
new TopicPartition(topic2, 1))
+ awaitAssignment(consumer, expandedAssignment)
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testRe2JPatternSubscriptionAndTopicSubscription(quorum: String,
groupProtocol: String): Unit = {
+ val topic1 = "topic1" // matches subscribed pattern
+ createTopic(topic1, 2, brokerCount)
+
+ val topic11 = "topic11" // matches subscribed pattern
+ createTopic(topic11, 2, brokerCount)
+
+ val topic2 = "topic2" // does not match subscribed pattern
+ createTopic(topic2, 2, brokerCount)
+
+ val consumer = createConsumer()
+ assertEquals(0, consumer.assignment().size)
+
+ // Subscribe to pattern
+ val pattern = new SubscriptionPattern("topic1.*")
+ consumer.subscribe(pattern)
+ val patternAssignment = Set(
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1),
+ new TopicPartition(topic11, 0),
+ new TopicPartition(topic11, 1))
+ awaitAssignment(consumer, patternAssignment)
+ consumer.unsubscribe()
+ assertEquals(0, consumer.assignment().size)
+
+ // Subscribe to explicit topic names
+ consumer.subscribe(List(topic2).asJava)
+ val assignment = Set(
+ new TopicPartition(topic2, 0),
+ new TopicPartition(topic2, 1))
+ awaitAssignment(consumer, assignment)
+ consumer.unsubscribe()
+
+ // Subscribe to pattern again
+ consumer.subscribe(pattern)
+ awaitAssignment(consumer, patternAssignment)
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)