This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new dbf4cef8522 KAFKA-18690: Keep leader metadata for RE2J-assigned partitions (#18777)
dbf4cef8522 is described below

commit dbf4cef8522ca690e07ceefc6ab684567327fa1f
Author: Sean Quah <[email protected]>
AuthorDate: Tue Feb 4 18:22:28 2025 +0000

    KAFKA-18690: Keep leader metadata for RE2J-assigned partitions (#18777)
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../consumer/internals/ConsumerMetadata.java       |  2 +-
 .../consumer/internals/SubscriptionState.java      | 14 +++++++++++
 .../api/PlaintextConsumerSubscriptionTest.scala    | 27 ++++++++++++++++++++++
 3 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index cb4c7dde6f8..434e989f068 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -94,6 +94,6 @@ public class ConsumerMetadata extends Metadata {
         if (isInternal && !includeInternalTopics)
             return false;
 
-        return subscription.matchesSubscribedPattern(topic);
+        return subscription.matchesSubscribedPattern(topic) || subscription.isAssignedFromRe2j(topic);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index bd45e71c884..e237165f5b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -490,6 +490,20 @@ public class SubscriptionState {
                 || this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE || this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
     }
 
+    public synchronized boolean isAssignedFromRe2j(String topic) {
+        if (!hasRe2JPatternSubscription()) {
+            return false;
+        }
+
+        for (TopicPartition topicPartition : assignment.partitionSet()) {
+            if (topicPartition.topic().equals(topic)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     public synchronized void position(TopicPartition tp, FetchPosition position) {
         assignedState(tp).position(position);
     }
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
index 5eea54b23d1..70abc3f8412 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala
@@ -215,6 +215,33 @@ class PlaintextConsumerSubscriptionTest extends AbstractConsumerTest {
     awaitAssignment(consumer, assignment)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRe2JPatternSubscriptionFetch(quorum: String, groupProtocol: String): Unit = {
+    val topic1 = "topic1" // matches subscribed pattern
+    createTopic(topic1, 2, brokerCount)
+
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    val pattern = new SubscriptionPattern("topic.*")
+    consumer.subscribe(pattern)
+
+    val assignment = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+    awaitAssignment(consumer, assignment)
+
+    val producer = createProducer()
+    val totalRecords = 10L
+    val startingTimestamp = System.currentTimeMillis()
+    val tp = new TopicPartition(topic1, 0)
+    sendRecords(producer, totalRecords.toInt, tp, startingTimestamp = startingTimestamp)
+    consumeAndVerifyRecords(consumer = consumer, numRecords = totalRecords.toInt, startingOffset = 0, startingTimestamp = startingTimestamp, tp = tp)
+  }
+
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
   def testRe2JPatternExpandSubscription(quorum: String, groupProtocol: String): Unit = {

Reply via email to