This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6237325fb14 KAFKA-15561 [5/N]: Integration tests for new subscribe API
with Re2J pattern (#17964)
6237325fb14 is described below
commit 6237325fb14047fc96ed7ed048540483baa61b0c
Author: Lianet Magrans <[email protected]>
AuthorDate: Fri Nov 29 12:02:39 2024 -0500
KAFKA-15561 [5/N]: Integration tests for new subscribe API with Re2J
pattern (#17964)
- integration tests for new subscribe api with RE2J pattern
- fix to ensure all topics are included in metadata requests when consumer
is subscribed to RE2J pattern
Reviewers: David Jacot <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../consumer/internals/ConsumerMetadata.java | 2 +-
.../api/PlaintextConsumerSubscriptionTest.scala | 45 +++++++++++++++++++++-
2 files changed, 45 insertions(+), 2 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index bf81027d8ea..cb4c7dde6f8 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -68,7 +68,7 @@ public class ConsumerMetadata extends Metadata {
@Override
public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
- if (subscription.hasPatternSubscription())
+ if (subscription.hasPatternSubscription() ||
subscription.hasRe2JPatternSubscription())
return MetadataRequest.Builder.allTopics();
List<String> topics = new ArrayList<>();
topics.addAll(subscription.metadataTopics());
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
index 2a3a2fcfdf7..f9c6d18b470 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
@@ -15,7 +15,7 @@ package kafka.api
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.InvalidTopicException
+import org.apache.kafka.common.errors.{InvalidRegularExpression,
InvalidTopicException}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Timeout
import org.junit.jupiter.api.function.Executable
@@ -178,6 +178,49 @@ class PlaintextConsumerSubscriptionTest extends
AbstractConsumerTest {
assertEquals(0, consumer.assignment().size)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testRe2JPatternSubscription(quorum: String, groupProtocol: String): Unit
= {
+ val topic1 = "tblablac" // matches subscribed pattern
+ createTopic(topic1, 2, brokerCount)
+
+ val topic2 = "tblablak" // does not match subscribed pattern
+ createTopic(topic2, 2, brokerCount)
+
+ val topic3 = "tblab1" // does not match subscribed pattern
+ createTopic(topic3, 2, brokerCount)
+
+ val consumer = createConsumer()
+ assertEquals(0, consumer.assignment().size)
+
+ val pattern = new SubscriptionPattern("t.*c")
+ consumer.subscribe(pattern)
+
+ val assignment = Set(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1),
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1))
+ awaitAssignment(consumer, assignment)
+ consumer.unsubscribe()
+ assertEquals(0, consumer.assignment().size)
+ }
+
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testRe2JPatternSubscriptionInvalidRegex(quorum: String, groupProtocol:
String): Unit = {
+ val consumer = createConsumer()
+ assertEquals(0, consumer.assignment().size)
+
+ val pattern = new SubscriptionPattern("(t.*c")
+ consumer.subscribe(pattern)
+
+ TestUtils.tryUntilNoAssertionError() {
+ assertThrows(classOf[InvalidRegularExpression], () =>
consumer.poll(Duration.ZERO))
+ }
+ consumer.unsubscribe()
+ }
+
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testExpandingTopicSubscriptions(quorum: String, groupProtocol: String):
Unit = {