This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 1efbbb8ec16 KAFKA-18117; KAFKA-18729: Use assigned topic IDs to avoid full metadata requests on broker-side regex (#19814)
1efbbb8ec16 is described below

commit 1efbbb8ec16b28e37457f5976a4d1c6ca820c3db
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Jun 12 10:50:31 2025 -0400

    KAFKA-18117; KAFKA-18729: Use assigned topic IDs to avoid full metadata requests on broker-side regex (#19814)
    
    This PR uses the topic IDs received in assignments (under the new
    consumer protocol) to ensure that only those assigned topics are
    included in the metadata requests the consumer performs when the
    user subscribes to a broker-side regex (RE2J).
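
    For example, the consumer-facing usage (exercised by the new
    integration test below) is simply:

        consumer.subscribe(new SubscriptionPattern("topic.*"));
        // Metadata requests now include only the topic IDs assigned by
        // the coordinator for this regex, rather than all cluster topics.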
    
    To handle the edge case where the consumer needs metadata for both
    topic IDs (from RE2J) and topic names (from transient topics), the
    approach is to first send a request for the transient topics, which
    are only needed temporarily, and once those are resolved, follow up
    with the request for the topic IDs needed for RE2J. (This is because
    the broker does not accept requests for names and IDs at the same
    time.)
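
    In sketch form, the request selection this adds to
    ConsumerMetadata.newMetadataRequestBuilder (all names as in the
    diff below) is roughly:

        if (subscription.hasPatternSubscription()) {
            // Client-side regex: all topics are needed to evaluate the pattern.
            return MetadataRequest.Builder.allTopics();
        }
        if (subscription.hasRe2JPatternSubscription() && transientTopics.isEmpty()) {
            // Broker-side regex (RE2J): request only the assigned topic IDs.
            return MetadataRequest.Builder.forTopicIds(subscription.assignedTopicIds());
        }
        // Explicit names and/or transient topics: request by name first; ID
        // requests resume once the transient topics are cleared.
        List<String> topics = new ArrayList<>(subscription.metadataTopics());
        topics.addAll(transientTopics);
        return MetadataRequest.Builder.forTopicNames(topics, allowAutoTopicCreation);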
    
    These changes also fix another issue (KAFKA-18729) by avoiding
    iteration over the full set of assigned partitions when checking
    whether a topic from a metadata response should be retained when
    using RE2J.
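
    In sketch form, the retention check in SubscriptionState (see
    isAssignedFromRe2j in the diff below) goes from a linear scan to a
    set lookup:

        // Before: scan every assigned partition for a matching topic name.
        for (TopicPartition tp : assignment.partitionSet()) {
            if (tp.topic().equals(topic)) return true;
        }
        return false;

        // After: constant-time lookup against the assigned topic IDs.
        return assignedTopicIds.contains(topicId);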
    
    Reviewers: David Jacot <[email protected]>
---
 .../PlaintextConsumerSubscriptionTest.java         | 43 +++++++++++++
 .../java/org/apache/kafka/clients/Metadata.java    | 12 +++-
 .../internals/AbstractMembershipManager.java       | 12 ++--
 .../consumer/internals/ConsumerMetadata.java       | 51 ++++++++++++++-
 .../consumer/internals/SubscriptionState.java      | 41 +++++++++---
 .../consumer/internals/TopicIdPartitionSet.java    | 13 +++-
 .../kafka/common/requests/MetadataRequest.java     | 45 +++++++++----
 .../consumer/internals/ConsumerMetadataTest.java   | 73 +++++++++++++++++++++
 .../consumer/internals/SubscriptionStateTest.java  | 74 +++++++++++++++++++++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  2 +-
 10 files changed, 332 insertions(+), 34 deletions(-)

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
index 7e014537cd9..e8c32d37908 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
@@ -374,6 +374,49 @@ public class PlaintextConsumerSubscriptionTest {
         }
     }
 
+    @ClusterTest
+    public void testTopicIdSubscriptionWithRe2JRegexAndOffsetsFetch() throws InterruptedException {
+        var topic1 = "topic1"; // matches subscribed pattern
+        cluster.createTopic(topic1, 2, (short) BROKER_COUNT);
+
+        Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
+        try (
+                Producer<byte[], byte[]> producer = cluster.producer();
+                Consumer<byte[], byte[]> consumer = cluster.consumer(config)
+        ) {
+            assertEquals(0, consumer.assignment().size());
+
+            // Subscribe to broker-side regex and fetch. This will require metadata for topic IDs.
+            var pattern = new SubscriptionPattern("topic.*");
+            consumer.subscribe(pattern);
+            var assignment = Set.of(
+                    new TopicPartition(topic, 0),
+                    new TopicPartition(topic, 1),
+                    new TopicPartition(topic1, 0),
+                    new TopicPartition(topic1, 1));
+            awaitAssignment(consumer, assignment);
+            var totalRecords = 10;
+            var startingTimestamp = System.currentTimeMillis();
+            var tp = new TopicPartition(topic1, 0);
+            sendRecords(producer, tp, totalRecords, startingTimestamp);
+            consumeAndVerifyRecords(consumer, tp, totalRecords, 0, 0, startingTimestamp);
+
+            // Fetch offsets for known and unknown topics. This will require metadata for topic names temporarily (transient topics)
+            var topic2 = "newTopic2";
+            cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+            var unassignedPartition = new TopicPartition(topic2, 0);
+            var offsets = consumer.endOffsets(List.of(unassignedPartition, tp));
+            var expectedOffsets = Map.of(
+                    unassignedPartition, 0L,
+                    tp, (long) totalRecords);
+            assertEquals(expectedOffsets, offsets);
+
+            // Fetch records again with the regex subscription. This will require metadata for topic IDs again.
+            sendRecords(producer, tp, totalRecords, startingTimestamp);
+            consumeAndVerifyRecords(consumer, tp, totalRecords, totalRecords, 0, startingTimestamp);
+        }
+    }
+
     @ClusterTest
     public void testRe2JPatternSubscriptionAndTopicSubscription() throws InterruptedException {
         Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 4c567b7d466..0986d8a67bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -508,7 +508,7 @@ public class Metadata implements Closeable {
                 topicId = null;
             }
 
-            if (!retainTopic(topicName, metadata.isInternal(), nowMs))
+            if (!retainTopic(topicName, topicId, metadata.isInternal(), nowMs))
                 continue;
 
             if (metadata.isInternal())
@@ -758,10 +758,20 @@ public class Metadata implements Closeable {
         return metadataSnapshot.topicNames();
     }
 
+    /**
+     * Based on the topic name, check if the topic metadata should be kept when received in a metadata response.
+     */
     protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         return true;
     }
 
+    /**
+     * Based on the topic name and topic ID, check if the topic metadata should be kept when received in a metadata response.
+     */
+    protected boolean retainTopic(String topicName, Uuid topicId, boolean isInternal, long nowMs) {
+        return retainTopic(topicName, isInternal, nowMs);
+    }
+
     public static class MetadataRequestAndVersion {
         public final MetadataRequest.Builder requestBuilder;
         public final int requestVersion;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 30d28fb722f..524a0a4a8dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -373,6 +372,9 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
             log.debug("Member {} updated its target assignment from {} to {}. 
Member will reconcile it on the next poll.",
                 memberId, currentTargetAssignment, updatedAssignment);
             currentTargetAssignment = updatedAssignment;
+            // Register the assigned topic IDs on the subscription state.
+            // This will be used to ensure they are included in metadata requests (even though they may not be reconciled yet).
+            subscriptions.setAssignedTopicIds(currentTargetAssignment.partitions.keySet());
         });
     }
 
@@ -830,8 +832,8 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
             return;
         }
         if (reconciliationInProgress) {
-            log.trace("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. Assignment " +
-                currentTargetAssignment + " will be handled in the next 
reconciliation loop.");
+            log.trace("Ignoring reconciliation attempt. Another reconciliation 
is already in progress. " +
+                 "Assignment {} will be handled in the next reconciliation 
loop.", currentTargetAssignment);
             return;
         }
 
@@ -1077,9 +1079,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
             Optional<String> nameFromMetadata = findTopicNameInGlobalOrLocalCache(topicId);
             nameFromMetadata.ifPresent(resolvedTopicName -> {
                 // Name resolved, so assignment is ready for reconciliation.
-                topicPartitions.forEach(tp ->
-                    assignmentReadyToReconcile.add(new TopicIdPartition(topicId, tp, resolvedTopicName))
-                );
+                assignmentReadyToReconcile.addAll(topicId, resolvedTopicName, topicPartitions);
                 it.remove();
             });
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index 434e989f068..677beaa5fa1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.utils.LogContext;
@@ -66,14 +67,34 @@ public class ConsumerMetadata extends Metadata {
         return allowAutoTopicCreation;
     }
 
+    /**
+     * Constructs a metadata request builder for fetching cluster metadata for the topics the consumer needs.
+     * This will include:
+     * <ul>
+     *     <li>topics the consumer is subscribed to using topic names (calls to subscribe with topic name list or client-side regex)</li>
+     *     <li>topics the consumer is subscribed to using topic IDs (calls to subscribe with broker-side regex RE2J)</li>
+     *     <li>topics involved in calls for fetching offsets (transient topics)</li>
+     * </ul>
+     * Note that this will generate a request for all topics in the cluster only when the consumer is subscribed to a client-side regex.
+     */
     @Override
     public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
-        if (subscription.hasPatternSubscription() || subscription.hasRe2JPatternSubscription())
+        if (subscription.hasPatternSubscription()) {
+            // Consumer subscribed to client-side regex => request all topics to compute regex
             return MetadataRequest.Builder.allTopics();
+        }
+        if (subscription.hasRe2JPatternSubscription() && transientTopics.isEmpty()) {
+            // Consumer subscribed to broker-side regex and no need for transient topic names metadata => request topic IDs
+            return MetadataRequest.Builder.forTopicIds(subscription.assignedTopicIds());
+        }
+        // Subscription to explicit topic names or transient topics present.
+        // Note that in the case of RE2J broker-side regex subscription, we may end up in this path
+        // if there are transient topics. They are just needed temporarily (lifetime of offsets-related API calls),
+        // so we'll request them to unblock their APIs, then go back to requesting assigned topic IDs as needed
         List<String> topics = new ArrayList<>();
         topics.addAll(subscription.metadataTopics());
         topics.addAll(transientTopics);
-        return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
+        return MetadataRequest.Builder.forTopicNames(topics, allowAutoTopicCreation);
     }
 
     synchronized void addTransientTopics(Set<String> topics) {
@@ -86,6 +107,15 @@ public class ConsumerMetadata extends Metadata {
         this.transientTopics.clear();
     }
 
+    /**
+     * Check if the metadata for the topic should be retained, based on the topic name.
+     * It will return true for:
+     * <ul>
+     *     <li>topic names the consumer subscribed to</li>
+     *     <li>topic names that match a client-side regex the consumer subscribed to</li>
+     *     <li>topics involved in fetching offsets</li>
+     * </ul>
+     */
     @Override
     protected synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
         if (transientTopics.contains(topic) || subscription.needsMetadata(topic))
@@ -94,6 +124,21 @@ public class ConsumerMetadata extends Metadata {
         if (isInternal && !includeInternalTopics)
             return false;
 
-        return subscription.matchesSubscribedPattern(topic) || subscription.isAssignedFromRe2j(topic);
+        return subscription.matchesSubscribedPattern(topic);
+    }
+
+    /**
+     * Check if the metadata for the topic should be retained, based on topic name and topic ID.
+     * This will return true for:
+     * <ul>
+     *     <li>topic names the consumer subscribed to</li>
+     *     <li>topic names that match a client-side regex the consumer subscribed to</li>
+     *     <li>topic IDs that have been received in an assignment from the broker after the consumer subscribed to a broker-side regex</li>
+     *     <li>topics involved in fetching offsets</li>
+     * </ul>
+     */
+    @Override
+    protected synchronized boolean retainTopic(String topicName, Uuid topicId, boolean isInternal, long nowMs) {
+        return retainTopic(topicName, isInternal, nowMs) || subscription.isAssignedFromRe2j(topicId);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index e237165f5b7..e048ab90b1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.internals.PartitionStates;
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
 import org.apache.kafka.common.utils.LogContext;
@@ -91,6 +92,13 @@ public class SubscriptionState {
     /* the list of topics the user has requested */
     private Set<String> subscription;
 
+    /**
+     * Topic IDs received in an assignment from the coordinator when using the Consumer rebalance protocol.
+     * This will be used to include assigned topic IDs in metadata requests when the consumer
+     * does not know the topic names (ex. when the user subscribes to a RE2J regex computed on the broker)
+     */
+    private Set<Uuid> assignedTopicIds;
+
     /* The list of topics the group has subscribed to. This may include some topics which are not part
      * of `subscription` for the leader of a group since it is responsible for detecting metadata changes
      * which require a group rebalance. */
@@ -149,6 +157,7 @@ public class SubscriptionState {
         this.log = logContext.logger(this.getClass());
         this.defaultResetStrategy = defaultResetStrategy;
         this.subscription = new TreeSet<>(); // use a sorted set for better logging
+        this.assignedTopicIds = new TreeSet<>();
         this.assignment = new PartitionStates<>();
         this.groupSubscription = new HashSet<>();
         this.subscribedPattern = null;
@@ -338,6 +347,7 @@ public class SubscriptionState {
         this.subscription = Collections.emptySet();
         this.groupSubscription = Collections.emptySet();
         this.assignment.clear();
+        this.assignedTopicIds = Collections.emptySet();
         this.subscribedPattern = null;
         this.subscriptionType = SubscriptionType.NONE;
         this.assignmentId++;
@@ -490,18 +500,16 @@ public class SubscriptionState {
                 || this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE || this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
     }
 
-    public synchronized boolean isAssignedFromRe2j(String topic) {
-        if (!hasRe2JPatternSubscription()) {
+    /**
+     * Check if the topic ID has been received in an assignment
+     * from the coordinator after subscribing to a broker-side regex.
+     */
+    public synchronized boolean isAssignedFromRe2j(Uuid topicId) {
+        if (topicId == null || !hasRe2JPatternSubscription()) {
             return false;
         }
 
-        for (TopicPartition topicPartition : assignment.partitionSet()) {
-            if (topicPartition.topic().equals(topic)) {
-                return true;
-            }
-        }
-
-        return false;
+        return this.assignedTopicIds.contains(topicId);
     }
 
     public synchronized void position(TopicPartition tp, FetchPosition position) {
@@ -911,6 +919,21 @@ public class SubscriptionState {
         markPendingOnAssignedCallback(addedPartitions, true);
     }
 
+    /**
+     * @return Topic IDs received in an assignment that have not been reconciled yet, so we need metadata for them.
+     */
+    public synchronized Set<Uuid> assignedTopicIds() {
+        return assignedTopicIds;
+    }
+
+    /**
+     * Set the set of topic IDs that have been assigned to the consumer by the coordinator.
+     * This is used for topic IDs received in an assignment when using the new consumer rebalance protocol (KIP-848).
+     */
+    public synchronized void setAssignedTopicIds(Set<Uuid> assignedTopicIds) {
+        this.assignedTopicIds = assignedTopicIds;
+    }
+
     /**
      * Enable fetching and updating positions for the given partitions that were assigned to the
      * consumer, but waiting for the onPartitionsAssigned callback to complete. This is
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java
index 920fb63515d..6bf708d9354 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java
@@ -61,15 +61,24 @@ public class TopicIdPartitionSet {
     }
 
     /**
-     * Add a new topic (id+name) and partition. This will keep it, and also save references to the topic ID, topic name and partition.
+     * Add a single partition to the assignment, along with its topic ID and name.
+     * This will keep it, and also save references to the topic ID, topic name and partition.
+     * Visible for testing.
      */
-    public void add(TopicIdPartition topicIdPartition) {
+    void add(TopicIdPartition topicIdPartition) {
         topicIdPartitions.add(topicIdPartition);
         topicPartitions.add(topicIdPartition.topicPartition());
         topicIds.add(topicIdPartition.topicId());
         topicNames.add(topicIdPartition.topicPartition().topic());
     }
 
+    /**
+     * Add a set of partitions to the assignment, along with the topic ID and name.
+     */
+    public void addAll(Uuid topicId, String topicName, Set<Integer> partitions) {
+        partitions.forEach(tp -> add(new TopicIdPartition(topicId, tp, topicName)));
+    }
+
     public boolean isEmpty() {
         return this.topicIdPartitions.isEmpty();
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index e0a213fa62e..d3dcabfb4f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -27,7 +27,9 @@ import org.apache.kafka.common.protocol.Readable;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 public class MetadataRequest extends AbstractRequest {
@@ -49,33 +51,33 @@ public class MetadataRequest extends AbstractRequest {
 
         public Builder(List<String> topics, boolean allowAutoTopicCreation, short minVersion, short maxVersion) {
             super(ApiKeys.METADATA, minVersion, maxVersion);
+            this.data = requestTopicNamesOrAllTopics(topics, allowAutoTopicCreation);
+        }
+
+        private MetadataRequestData requestTopicNamesOrAllTopics(List<String> topics, boolean allowAutoTopicCreation) {
             MetadataRequestData data = new MetadataRequestData();
             if (topics == null)
                 data.setTopics(null);
             else {
                 topics.forEach(topic -> data.topics().add(new MetadataRequestTopic().setName(topic)));
             }
-
             data.setAllowAutoTopicCreation(allowAutoTopicCreation);
-            this.data = data;
+            return data;
         }
 
-        public Builder(List<String> topics, boolean allowAutoTopicCreation) {
-            this(topics, allowAutoTopicCreation, ApiKeys.METADATA.oldestVersion(),  ApiKeys.METADATA.latestVersion());
-        }
-
-        public Builder(List<Uuid> topicIds) {
-            super(ApiKeys.METADATA, ApiKeys.METADATA.oldestVersion(), ApiKeys.METADATA.latestVersion());
+        private static MetadataRequestData requestTopicIds(Set<Uuid> topicIds) {
             MetadataRequestData data = new MetadataRequestData();
             if (topicIds == null)
                 data.setTopics(null);
             else {
                 topicIds.forEach(topicId -> data.topics().add(new MetadataRequestTopic().setTopicId(topicId)));
             }
+            data.setAllowAutoTopicCreation(false); // can't auto-create without topic name
+            return data;
+        }
 
-            // It's impossible to create topic with topicId
-            data.setAllowAutoTopicCreation(false);
-            this.data = data;
+        public Builder(List<String> topics, boolean allowAutoTopicCreation) {
+            this(topics, allowAutoTopicCreation, ApiKeys.METADATA.oldestVersion(),  ApiKeys.METADATA.latestVersion());
         }
 
         public static Builder allTopics() {
@@ -84,6 +86,20 @@ public class MetadataRequest extends AbstractRequest {
             return new Builder(ALL_TOPICS_REQUEST_DATA);
         }
 
+        /**
+         * @return Builder for metadata request using topic names.
+         */
+        public static Builder forTopicNames(List<String> topicNames, boolean allowAutoTopicCreation) {
+            return new MetadataRequest.Builder(topicNames, allowAutoTopicCreation);
+        }
+
+        /**
+         * @return Builder for metadata request using topic IDs.
+         */
+        public static Builder forTopicIds(Set<Uuid> topicIds) {
+            return new MetadataRequest.Builder(requestTopicIds(new HashSet<>(topicIds)));
+        }
+
         public boolean emptyTopicList() {
             return data.topics().isEmpty();
         }
@@ -92,6 +108,13 @@ public class MetadataRequest extends AbstractRequest {
             return data.topics() == null;
         }
 
+        public List<Uuid> topicIds() {
+            return data.topics()
+                .stream()
+                .map(MetadataRequestTopic::topicId)
+                .collect(Collectors.toList());
+        }
+
         public List<String> topics() {
             return data.topics()
                 .stream()
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 949bdc9aa72..f57e93a2a15 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.ClusterResourceListener;
 import org.apache.kafka.common.Node;
@@ -100,6 +101,78 @@ public class ConsumerMetadataTest {
             assertEquals(Collections.singleton("__matching_topic"), metadata.fetch().topics());
     }
 
+    @Test
+    public void testSubscriptionToBrokerRegexDoesNotRequestAllTopicsMetadata() {
+        // Subscribe to broker-side regex
+        subscription.subscribe(new SubscriptionPattern("__.*"), 
Optional.empty());
+
+        // Receive assignment from coordinator with topic IDs only
+        Uuid assignedTopicId = Uuid.randomUuid();
+        subscription.setAssignedTopicIds(Set.of(assignedTopicId));
+
+        // Metadata request should only include the assigned topic IDs
+        try (ConsumerMetadata metadata = newConsumerMetadata(false)) {
+            MetadataRequest.Builder builder = metadata.newMetadataRequestBuilder();
+            assertFalse(builder.isAllTopics(), "Should not request all topics when using broker-side regex");
+            assertEquals(List.of(assignedTopicId), builder.topicIds(), "Should only request assigned topic IDs when using broker-side regex");
+        }
+    }
+
+    @Test
+    public void testSubscriptionToBrokerRegexRetainsAssignedTopics() {
+        // Subscribe to broker-side regex
+        subscription.subscribe(new SubscriptionPattern("__.*"), 
Optional.empty());
+
+        // Receive assignment from coordinator with topic IDs only
+        Uuid assignedTopicId = Uuid.randomUuid();
+        subscription.setAssignedTopicIds(Set.of(assignedTopicId));
+
+        // Metadata request for assigned topic IDs
+        try (ConsumerMetadata metadata = newConsumerMetadata(false)) {
+            MetadataRequest.Builder builder = metadata.newMetadataRequestBuilder();
+            assertEquals(List.of(assignedTopicId), builder.topicIds());
+
+            // Metadata response with the assigned topic ID and name
+            Map<String, Uuid> topicIds = Map.of("__matching_topic", assignedTopicId);
+            MetadataResponse response = RequestTestUtils.metadataUpdateWithIds(1, singletonMap("__matching_topic", 1), topicIds);
+            metadata.updateWithCurrentRequestVersion(response, false, time.milliseconds());
+
+            assertEquals(Set.of("__matching_topic"), new 
HashSet<>(metadata.fetch().topics()));
+            assertEquals(Set.of("__matching_topic"), 
metadata.fetch().topics());
+        }
+    }
+
+    @Test
+    public void testSubscriptionToBrokerRegexAllowsTransientTopics() {
+        // Subscribe to broker-side regex
+        subscription.subscribe(new SubscriptionPattern("__.*"), 
Optional.empty());
+
+        // Receive assignment from coordinator with topic IDs only
+        Uuid assignedTopicId = Uuid.randomUuid();
+        subscription.setAssignedTopicIds(Set.of(assignedTopicId));
+
+        // Metadata request should only include the assigned topic IDs
+        try (ConsumerMetadata metadata = newConsumerMetadata(false)) {
+            MetadataRequest.Builder builder = metadata.newMetadataRequestBuilder();
+            assertFalse(builder.isAllTopics());
+            assertEquals(List.of(assignedTopicId), builder.topicIds());
+
+            // Call to offsets-related APIs starts. Metadata requests should move to requesting topic names temporarily.
+            String transientTopic = "__transient_topic";
+            metadata.addTransientTopics(Set.of(transientTopic));
+            builder = metadata.newMetadataRequestBuilder();
+            assertFalse(builder.isAllTopics());
+            // assertTrue(builder.topicIds().isEmpty());
+            assertEquals(List.of(transientTopic), builder.topics());
+
+            // Call to offsets-related APIs ends. Metadata requests should move back to requesting topic IDs for RE2J.
+            metadata.clearTransientTopics();
+            builder = metadata.newMetadataRequestBuilder();
+            assertFalse(builder.isAllTopics());
+            assertEquals(List.of(assignedTopicId), builder.topicIds());
+        }
+    }
+
     @Test
     public void testUserAssignment() {
         subscription.assignFromUser(Set.of(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index f697990b544..064664d2f4d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncati
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.utils.LogContext;
@@ -273,6 +274,7 @@ public class SubscriptionStateTest {
         state.subscribe(singleton(topic), Optional.of(rebalanceListener));
         state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
         assertAssignmentAppliedAwaitingCallback(tp0);
+        assertEquals(singleton(tp0.topic()), state.subscription());
 
         // Simulate callback setting position to start fetching from
         state.seek(tp0, 100);
@@ -292,6 +294,7 @@ public class SubscriptionStateTest {
         state.subscribe(singleton(topic), Optional.of(rebalanceListener));
         state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
         assertAssignmentAppliedAwaitingCallback(tp0);
+        assertEquals(singleton(tp0.topic()), state.subscription());
 
         // Callback completed (without updating positions). Partition should require initializing
         // positions, and start fetching once a valid position is set.
@@ -309,6 +312,7 @@ public class SubscriptionStateTest {
         state.subscribe(singleton(topic), Optional.of(rebalanceListener));
         state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
         assertAssignmentAppliedAwaitingCallback(tp0);
+        assertEquals(singleton(tp0.topic()), state.subscription());
         state.enablePartitionsAwaitingCallback(singleton(tp0));
         state.seek(tp0, 100);
         assertTrue(state.isFetchable(tp0));
@@ -331,7 +335,6 @@ public class SubscriptionStateTest {
     private void assertAssignmentAppliedAwaitingCallback(TopicPartition topicPartition) {
         assertEquals(singleton(topicPartition), state.assignedPartitions());
         assertEquals(1, state.numAssignedPartitions());
-        assertEquals(singleton(topicPartition.topic()), state.subscription());
 
         assertFalse(state.isFetchable(topicPartition));
         assertEquals(1, state.initializingPartitions().size());
@@ -404,6 +407,75 @@ public class SubscriptionStateTest {
         state.subscribe(new SubscriptionPattern(pattern), Optional.of(rebalanceListener));
         assertTrue(state.toString().contains("type=AUTO_PATTERN_RE2J"));
         assertTrue(state.toString().contains("subscribedPattern=" + pattern));
+        assertTrue(state.assignedTopicIds().isEmpty());
+    }
+
+    @Test
+    public void testIsAssignedFromRe2j() {
+        assertFalse(state.isAssignedFromRe2j(null));
+        Uuid assignedUuid = Uuid.randomUuid();
+        assertFalse(state.isAssignedFromRe2j(assignedUuid));
+
+        state.subscribe(new SubscriptionPattern("foo.*"), Optional.empty());
+        assertTrue(state.hasRe2JPatternSubscription());
+        assertFalse(state.isAssignedFromRe2j(assignedUuid));
+
+        state.setAssignedTopicIds(Set.of(assignedUuid));
+        assertTrue(state.isAssignedFromRe2j(assignedUuid));
+
+        state.unsubscribe();
+        assertFalse(state.isAssignedFromRe2j(assignedUuid));
+        assertFalse(state.hasRe2JPatternSubscription());
+    }
+
+    @Test
+    public void testAssignedPartitionsWithTopicIdsForRe2Pattern() {
+        state.subscribe(new SubscriptionPattern("t.*"), 
Optional.of(rebalanceListener));
+        assertTrue(state.assignedTopicIds().isEmpty());
+
+        TopicIdPartitionSet reconciledAssignmentFromRegex = new TopicIdPartitionSet();
+        reconciledAssignmentFromRegex.addAll(Uuid.randomUuid(), topic, Set.of(0));
+        state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
+        assertAssignmentAppliedAwaitingCallback(tp0);
+
+        // Simulate callback setting position to start fetching from
+        state.seek(tp0, 100);
+
+        // Callback completed. Partition should be fetchable, from the position previously defined
+        state.enablePartitionsAwaitingCallback(singleton(tp0));
+        assertEquals(0, state.initializingPartitions().size());
+        assertTrue(state.isFetchable(tp0));
+        assertTrue(state.hasAllFetchPositions());
+        assertEquals(100L, state.position(tp0).offset);
+    }
+
+    @Test
+    public void testAssignedTopicIdsPreservedWhenReconciliationCompletes() {
+        state.subscribe(new SubscriptionPattern("t.*"), 
Optional.of(rebalanceListener));
+        assertTrue(state.assignedTopicIds().isEmpty());
+
+        // First assignment received from coordinator
+        Uuid firstAssignedUuid = Uuid.randomUuid();
+        state.setAssignedTopicIds(Set.of(firstAssignedUuid));
+
+        // Second assignment received from coordinator (while the 1st is still being reconciled)
+        Uuid secondAssignedUuid = Uuid.randomUuid();
+        state.setAssignedTopicIds(Set.of(firstAssignedUuid, secondAssignedUuid));
+
+        // First reconciliation completes and updates the subscription state
+        state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
+
+        // First assignment should have been applied
+        assertAssignmentAppliedAwaitingCallback(tp0);
+
+        // Assigned topic IDs should still have both topics (one reconciled, one not reconciled yet)
+        assertEquals(
+                Set.of(firstAssignedUuid, secondAssignedUuid),
+                state.assignedTopicIds(),
+                "Updating the subscription state when a reconciliation 
completes " +
+                        "should not overwrite assigned topics that have not 
been reconciled yet"
+        );
     }
 
     @Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 034c49c5c07..a6c26589635 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3924,7 +3924,7 @@ class KafkaApisTest extends Logging {
     MetadataCacheTest.updateCache(metadataCache, partitionRecords)
 
     // 4. Send TopicMetadataReq using topicId
-    val metadataReqByTopicId = new MetadataRequest.Builder(util.List.of(authorizedTopicId, unauthorizedTopicId)).build()
+    val metadataReqByTopicId = MetadataRequest.Builder.forTopicIds(util.Set.of(authorizedTopicId, unauthorizedTopicId)).build()
     val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
     when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
       any[Long])).thenReturn(0)

