This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new dbf4cef8522 KAFKA-18690: Keep leader metadata for RE2J-assigned partitions (#18777)
dbf4cef8522 is described below
commit dbf4cef8522ca690e07ceefc6ab684567327fa1f
Author: Sean Quah <[email protected]>
AuthorDate: Tue Feb 4 18:22:28 2025 +0000
KAFKA-18690: Keep leader metadata for RE2J-assigned partitions (#18777)
Reviewers: Lianet Magrans <[email protected]>
---
.../consumer/internals/ConsumerMetadata.java | 2 +-
.../consumer/internals/SubscriptionState.java | 14 +++++++++++
.../api/PlaintextConsumerSubscriptionTest.scala | 27 ++++++++++++++++++++++
3 files changed, 42 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index cb4c7dde6f8..434e989f068 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -94,6 +94,6 @@ public class ConsumerMetadata extends Metadata {
if (isInternal && !includeInternalTopics)
return false;
- return subscription.matchesSubscribedPattern(topic);
+ return subscription.matchesSubscribedPattern(topic) ||
subscription.isAssignedFromRe2j(topic);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index bd45e71c884..e237165f5b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -490,6 +490,20 @@ public class SubscriptionState {
|| this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE
|| this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
}
+ public synchronized boolean isAssignedFromRe2j(String topic) {
+ if (!hasRe2JPatternSubscription()) {
+ return false;
+ }
+
+ for (TopicPartition topicPartition : assignment.partitionSet()) {
+ if (topicPartition.topic().equals(topic)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
public synchronized void position(TopicPartition tp, FetchPosition
position) {
assignedState(tp).position(position);
}
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
index 5eea54b23d1..70abc3f8412 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
@@ -215,6 +215,33 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
awaitAssignment(consumer, assignment)
}
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+ def testRe2JPatternSubscriptionFetch(quorum: String, groupProtocol: String):
Unit = {
+ val topic1 = "topic1" // matches subscribed pattern
+ createTopic(topic1, 2, brokerCount)
+
+ val consumer = createConsumer()
+ assertEquals(0, consumer.assignment().size)
+
+ val pattern = new SubscriptionPattern("topic.*")
+ consumer.subscribe(pattern)
+
+ val assignment = Set(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1),
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1))
+ awaitAssignment(consumer, assignment)
+
+ val producer = createProducer()
+ val totalRecords = 10L
+ val startingTimestamp = System.currentTimeMillis()
+ val tp = new TopicPartition(topic1, 0)
+ sendRecords(producer, totalRecords.toInt, tp, startingTimestamp =
startingTimestamp)
+ consumeAndVerifyRecords(consumer = consumer, numRecords =
totalRecords.toInt, startingOffset = 0, startingTimestamp = startingTimestamp,
tp = tp)
+ }
+
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
def testRe2JPatternExpandSubscription(quorum: String, groupProtocol:
String): Unit = {