This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 1efbbb8ec16 KAFKA-18117; KAFKA-18729: Use assigned topic IDs to avoid full metadata requests on broker-side regex (#19814)
1efbbb8ec16 is described below
commit 1efbbb8ec16b28e37457f5976a4d1c6ca820c3db
Author: Lianet Magrans <[email protected]>
AuthorDate: Thu Jun 12 10:50:31 2025 -0400
KAFKA-18117; KAFKA-18729: Use assigned topic IDs to avoid full metadata requests on broker-side regex (#19814)
This PR uses the topic IDs received in assignments (under the new consumer
protocol) to ensure that only those assigned topics are included in the
metadata requests the consumer performs when the user subscribes to a
broker-side regex (RE2J).
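For illustration, a minimal sketch of the client-side flow this change optimizes. The subscribe/poll calls, SubscriptionPattern, and GROUP_PROTOCOL_CONFIG are the public consumer APIs exercised by the tests below; the bootstrap address, group ID, and pattern are hypothetical:

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.SubscriptionPattern;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "re2j-group");              // hypothetical group
    props.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer");          // new rebalance protocol
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
        // The broker evaluates the RE2J pattern and returns the assignment as topic IDs.
        // With this change, the consumer's follow-up metadata requests are scoped to those
        // assigned IDs rather than requesting all topics in the cluster.
        consumer.subscribe(new SubscriptionPattern("topic.*"));
        consumer.poll(Duration.ofMillis(500));
    }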
To handle the edge case where the consumer needs metadata for both topic
IDs (from RE2J) and topic names (from transient topics), the approach is
to first send a request for the transient topics, which are only needed
temporarily, and once those are resolved, follow with the request for the
topic IDs needed for RE2J. (This ordering is required because the broker
does not accept requests that mix names and IDs.)
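The resulting request sequence, sketched with the ConsumerMetadata methods changed below (the transient topic name is illustrative):

    // Subscribed to a broker-side regex; assignment arrives as topic IDs.
    metadata.newMetadataRequestBuilder();             // -> Builder.forTopicIds(assigned IDs)

    // An offsets-related API (e.g. endOffsets) needs a name-based lookup:
    metadata.addTransientTopics(Set.of("newTopic2"));
    metadata.newMetadataRequestBuilder();             // -> Builder.forTopicNames(..., "newTopic2")

    // The offsets call completes; requests go back to the ID-based shape:
    metadata.clearTransientTopics();
    metadata.newMetadataRequestBuilder();             // -> Builder.forTopicIds(assigned IDs)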
These changes also fix a related issue (KAFKA-18729) by avoiding iteration
over the full set of assigned partitions when checking whether a topic from
a metadata response should be retained while using RE2J.
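Concretely, the retain check moves from a per-topic scan of the assigned partitions to a constant-time set lookup, condensed here from the SubscriptionState change below:

    // Before (KAFKA-18729): scan all assigned partitions for every topic in a metadata response
    for (TopicPartition topicPartition : assignment.partitionSet()) {
        if (topicPartition.topic().equals(topic)) {
            return true;
        }
    }
    return false;

    // After: O(1) membership test on the topic IDs registered from the assignment
    return assignedTopicIds.contains(topicId);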
Reviewers: David Jacot <[email protected]>
---
.../PlaintextConsumerSubscriptionTest.java | 43 +++++++++++++
.../java/org/apache/kafka/clients/Metadata.java | 12 +++-
.../internals/AbstractMembershipManager.java | 12 ++--
.../consumer/internals/ConsumerMetadata.java | 51 ++++++++++++++-
.../consumer/internals/SubscriptionState.java | 41 +++++++++---
.../consumer/internals/TopicIdPartitionSet.java | 13 +++-
.../kafka/common/requests/MetadataRequest.java | 45 +++++++++----
.../consumer/internals/ConsumerMetadataTest.java | 73 +++++++++++++++++++++
.../consumer/internals/SubscriptionStateTest.java | 74 +++++++++++++++++++++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 2 +-
10 files changed, 332 insertions(+), 34 deletions(-)
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
index 7e014537cd9..e8c32d37908 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerSubscriptionTest.java
@@ -374,6 +374,49 @@ public class PlaintextConsumerSubscriptionTest {
}
}
+ @ClusterTest
+ public void testTopicIdSubscriptionWithRe2JRegexAndOffsetsFetch() throws InterruptedException {
+ var topic1 = "topic1"; // matches subscribed pattern
+ cluster.createTopic(topic1, 2, (short) BROKER_COUNT);
+
+ Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
+ try (
+ Producer<byte[], byte[]> producer = cluster.producer();
+ Consumer<byte[], byte[]> consumer = cluster.consumer(config)
+ ) {
+ assertEquals(0, consumer.assignment().size());
+
+ // Subscribe to broker-side regex and fetch. This will require metadata for topic IDs.
+ var pattern = new SubscriptionPattern("topic.*");
+ consumer.subscribe(pattern);
+ var assignment = Set.of(
+ new TopicPartition(topic, 0),
+ new TopicPartition(topic, 1),
+ new TopicPartition(topic1, 0),
+ new TopicPartition(topic1, 1));
+ awaitAssignment(consumer, assignment);
+ var totalRecords = 10;
+ var startingTimestamp = System.currentTimeMillis();
+ var tp = new TopicPartition(topic1, 0);
+ sendRecords(producer, tp, totalRecords, startingTimestamp);
+ consumeAndVerifyRecords(consumer, tp, totalRecords, 0, 0, startingTimestamp);
+
+ // Fetch offsets for known and unknown topics. This will require metadata for topic names temporarily (transient topics)
+ var topic2 = "newTopic2";
+ cluster.createTopic(topic2, 2, (short) BROKER_COUNT);
+ var unassignedPartition = new TopicPartition(topic2, 0);
+ var offsets = consumer.endOffsets(List.of(unassignedPartition, tp));
+ var expectedOffsets = Map.of(
+ unassignedPartition, 0L,
+ tp, (long) totalRecords);
+ assertEquals(expectedOffsets, offsets);
+
+ // Fetch records again with the regex subscription. This will require metadata for topic IDs again.
+ sendRecords(producer, tp, totalRecords, startingTimestamp);
+ consumeAndVerifyRecords(consumer, tp, totalRecords, totalRecords, 0, startingTimestamp);
+ }
+ }
+
@ClusterTest
public void testRe2JPatternSubscriptionAndTopicSubscription() throws InterruptedException {
Map<String, Object> config = Map.of(GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 4c567b7d466..0986d8a67bc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -508,7 +508,7 @@ public class Metadata implements Closeable {
topicId = null;
}
- if (!retainTopic(topicName, metadata.isInternal(), nowMs))
+ if (!retainTopic(topicName, topicId, metadata.isInternal(), nowMs))
continue;
if (metadata.isInternal())
@@ -758,10 +758,20 @@ public class Metadata implements Closeable {
return metadataSnapshot.topicNames();
}
+ /**
+ * Based on the topic name, check if the topic metadata should be kept when received in a metadata response.
+ */
protected boolean retainTopic(String topic, boolean isInternal, long nowMs) {
return true;
}
+ /**
+ * Based on the topic name and topic ID, check if the topic metadata should be kept when received in a metadata response.
+ */
+ protected boolean retainTopic(String topicName, Uuid topicId, boolean isInternal, long nowMs) {
+ return retainTopic(topicName, isInternal, nowMs);
+ }
+
public static class MetadataRequestAndVersion {
public final MetadataRequest.Builder requestBuilder;
public final int requestVersion;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 30d28fb722f..524a0a4a8dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -21,7 +21,6 @@ import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.requests.AbstractResponse;
@@ -373,6 +372,9 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
log.debug("Member {} updated its target assignment from {} to {}. Member will reconcile it on the next poll.",
memberId, currentTargetAssignment, updatedAssignment);
currentTargetAssignment = updatedAssignment;
+ // Register the assigned topic IDs on the subscription state.
+ // This will be used to ensure they are included in metadata requests (even though they may not be reconciled yet).
+ subscriptions.setAssignedTopicIds(currentTargetAssignment.partitions.keySet());
});
}
@@ -830,8 +832,8 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
return;
}
if (reconciliationInProgress) {
- log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. Assignment " +
- currentTargetAssignment + " will be handled in the next reconciliation loop.");
+ log.trace("Ignoring reconciliation attempt. Another reconciliation is already in progress. " +
+ "Assignment {} will be handled in the next reconciliation loop.", currentTargetAssignment);
return;
}
@@ -1077,9 +1079,7 @@ public abstract class AbstractMembershipManager<R extends AbstractResponse> impl
Optional<String> nameFromMetadata = findTopicNameInGlobalOrLocalCache(topicId);
nameFromMetadata.ifPresent(resolvedTopicName -> {
// Name resolved, so assignment is ready for reconciliation.
- topicPartitions.forEach(tp ->
- assignmentReadyToReconcile.add(new TopicIdPartition(topicId, tp, resolvedTopicName))
- );
+ assignmentReadyToReconcile.addAll(topicId, resolvedTopicName, topicPartitions);
it.remove();
});
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
index 434e989f068..677beaa5fa1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadata.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.utils.LogContext;
@@ -66,14 +67,34 @@ public class ConsumerMetadata extends Metadata {
return allowAutoTopicCreation;
}
+ /**
+ * Constructs a metadata request builder for fetching cluster metadata for the topics the consumer needs.
+ * This will include:
+ * <ul>
+ * <li>topics the consumer is subscribed to using topic names (calls to subscribe with topic name list or client-side regex)</li>
+ * <li>topics the consumer is subscribed to using topic IDs (calls to subscribe with broker-side regex RE2J)</li>
+ * <li>topics involved in calls for fetching offsets (transient topics)</li>
+ * </ul>
+ * Note that this will generate a request for all topics in the cluster only when the consumer is subscribed to a client-side regex.
+ */
@Override
public synchronized MetadataRequest.Builder newMetadataRequestBuilder() {
- if (subscription.hasPatternSubscription() || subscription.hasRe2JPatternSubscription())
+ if (subscription.hasPatternSubscription()) {
+ // Consumer subscribed to client-side regex => request all topics to compute regex
return MetadataRequest.Builder.allTopics();
+ }
+ if (subscription.hasRe2JPatternSubscription() && transientTopics.isEmpty()) {
+ // Consumer subscribed to broker-side regex and no need for transient topic names metadata => request topic IDs
+ return MetadataRequest.Builder.forTopicIds(subscription.assignedTopicIds());
+ }
+ // Subscription to explicit topic names or transient topics present.
+ // Note that in the case of RE2J broker-side regex subscription, we may end up in this path
+ // if there are transient topics. They are just needed temporarily (lifetime of offsets-related API calls),
+ // so we'll request them to unblock their APIs, then go back to requesting assigned topic IDs as needed
List<String> topics = new ArrayList<>();
topics.addAll(subscription.metadataTopics());
topics.addAll(transientTopics);
- return new MetadataRequest.Builder(topics, allowAutoTopicCreation);
+ return MetadataRequest.Builder.forTopicNames(topics, allowAutoTopicCreation);
}
synchronized void addTransientTopics(Set<String> topics) {
@@ -86,6 +107,15 @@ public class ConsumerMetadata extends Metadata {
this.transientTopics.clear();
}
+ /**
+ * Check if the metadata for the topic should be retained, based on the topic name.
+ * It will return true for:
+ * <ul>
+ * <li>topic names the consumer subscribed to</li>
+ * <li>topic names that match a client-side regex the consumer subscribed to</li>
+ * <li>topics involved in fetching offsets</li>
+ * </ul>
+ */
@Override
protected synchronized boolean retainTopic(String topic, boolean isInternal, long nowMs) {
if (transientTopics.contains(topic) || subscription.needsMetadata(topic))
@@ -94,6 +124,21 @@ public class ConsumerMetadata extends Metadata {
if (isInternal && !includeInternalTopics)
return false;
- return subscription.matchesSubscribedPattern(topic) || subscription.isAssignedFromRe2j(topic);
+ return subscription.matchesSubscribedPattern(topic);
+ }
+
+ /**
+ * Check if the metadata for the topic should be retained, based on topic name and topic ID.
+ * This will return true for:
+ * <ul>
+ * <li>topic names the consumer subscribed to</li>
+ * <li>topic names that match a client-side regex the consumer subscribed to</li>
+ * <li>topic IDs that have been received in an assignment from the broker after the consumer subscribed to a broker-side regex</li>
+ * <li>topics involved in fetching offsets</li>
+ * </ul>
+ */
+ @Override
+ protected synchronized boolean retainTopic(String topicName, Uuid topicId, boolean isInternal, long nowMs) {
+ return retainTopic(topicName, isInternal, nowMs) || subscription.isAssignedFromRe2j(topicId);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index e237165f5b7..e048ab90b1c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.internals.PartitionStates;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
import org.apache.kafka.common.utils.LogContext;
@@ -91,6 +92,13 @@ public class SubscriptionState {
/* the list of topics the user has requested */
private Set<String> subscription;
+ /**
+ * Topic IDs received in an assignment from the coordinator when using the Consumer rebalance protocol.
+ * This will be used to include assigned topic IDs in metadata requests when the consumer
+ * does not know the topic names (ex. when the user subscribes to a RE2J regex computed on the broker)
+ */
+ private Set<Uuid> assignedTopicIds;
+
/* The list of topics the group has subscribed to. This may include some topics which are not part
* of `subscription` for the leader of a group since it is responsible for detecting metadata changes
* which require a group rebalance. */
@@ -149,6 +157,7 @@ public class SubscriptionState {
this.log = logContext.logger(this.getClass());
this.defaultResetStrategy = defaultResetStrategy;
this.subscription = new TreeSet<>(); // use a sorted set for better logging
+ this.assignedTopicIds = new TreeSet<>();
this.assignment = new PartitionStates<>();
this.groupSubscription = new HashSet<>();
this.subscribedPattern = null;
@@ -338,6 +347,7 @@ public class SubscriptionState {
this.subscription = Collections.emptySet();
this.groupSubscription = Collections.emptySet();
this.assignment.clear();
+ this.assignedTopicIds = Collections.emptySet();
this.subscribedPattern = null;
this.subscriptionType = SubscriptionType.NONE;
this.assignmentId++;
@@ -490,18 +500,16 @@ public class SubscriptionState {
|| this.subscriptionType == SubscriptionType.AUTO_TOPICS_SHARE
|| this.subscriptionType == SubscriptionType.AUTO_PATTERN_RE2J;
}
- public synchronized boolean isAssignedFromRe2j(String topic) {
- if (!hasRe2JPatternSubscription()) {
+ /**
+ * Check if the topic ID has been received in an assignment
+ * from the coordinator after subscribing to a broker-side regex.
+ */
+ public synchronized boolean isAssignedFromRe2j(Uuid topicId) {
+ if (topicId == null || !hasRe2JPatternSubscription()) {
return false;
}
- for (TopicPartition topicPartition : assignment.partitionSet()) {
- if (topicPartition.topic().equals(topic)) {
- return true;
- }
- }
-
- return false;
+ return this.assignedTopicIds.contains(topicId);
}
public synchronized void position(TopicPartition tp, FetchPosition position) {
@@ -911,6 +919,21 @@ public class SubscriptionState {
markPendingOnAssignedCallback(addedPartitions, true);
}
+ /**
+ * @return Topic IDs received in an assignment that have not been reconciled yet, so we need metadata for them.
+ */
+ public synchronized Set<Uuid> assignedTopicIds() {
+ return assignedTopicIds;
+ }
+
+ /**
+ * Set the set of topic IDs that have been assigned to the consumer by the coordinator.
+ * This is used for topic IDs received in an assignment when using the new consumer rebalance protocol (KIP-848).
+ */
+ public synchronized void setAssignedTopicIds(Set<Uuid> assignedTopicIds) {
+ this.assignedTopicIds = assignedTopicIds;
+ }
+
/**
* Enable fetching and updating positions for the given partitions that were assigned to the
* consumer, but waiting for the onPartitionsAssigned callback to complete. This is
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java
index 920fb63515d..6bf708d9354 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/TopicIdPartitionSet.java
@@ -61,15 +61,24 @@ public class TopicIdPartitionSet {
}
/**
- * Add a new topic (id+name) and partition. This will keep it, and also save references to the topic ID, topic name and partition.
+ * Add a single partition to the assignment, along with its topic ID and name.
+ * This will keep it, and also save references to the topic ID, topic name and partition.
+ * Visible for testing.
*/
- public void add(TopicIdPartition topicIdPartition) {
+ void add(TopicIdPartition topicIdPartition) {
topicIdPartitions.add(topicIdPartition);
topicPartitions.add(topicIdPartition.topicPartition());
topicIds.add(topicIdPartition.topicId());
topicNames.add(topicIdPartition.topicPartition().topic());
}
+ /**
+ * Add a set of partitions to the assignment, along with the topic ID and name.
+ */
+ public void addAll(Uuid topicId, String topicName, Set<Integer> partitions) {
+ partitions.forEach(tp -> add(new TopicIdPartition(topicId, tp, topicName)));
+ }
+
public boolean isEmpty() {
return this.topicIdPartitions.isEmpty();
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index e0a213fa62e..d3dcabfb4f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -27,7 +27,9 @@ import org.apache.kafka.common.protocol.Readable;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
public class MetadataRequest extends AbstractRequest {
@@ -49,33 +51,33 @@ public class MetadataRequest extends AbstractRequest {
public Builder(List<String> topics, boolean allowAutoTopicCreation, short minVersion, short maxVersion) {
super(ApiKeys.METADATA, minVersion, maxVersion);
+ this.data = requestTopicNamesOrAllTopics(topics, allowAutoTopicCreation);
+ }
+
+ private MetadataRequestData requestTopicNamesOrAllTopics(List<String> topics, boolean allowAutoTopicCreation) {
MetadataRequestData data = new MetadataRequestData();
if (topics == null)
data.setTopics(null);
else {
topics.forEach(topic -> data.topics().add(new MetadataRequestTopic().setName(topic)));
}
-
data.setAllowAutoTopicCreation(allowAutoTopicCreation);
- this.data = data;
+ return data;
}
- public Builder(List<String> topics, boolean allowAutoTopicCreation) {
- this(topics, allowAutoTopicCreation, ApiKeys.METADATA.oldestVersion(), ApiKeys.METADATA.latestVersion());
- }
-
- public Builder(List<Uuid> topicIds) {
- super(ApiKeys.METADATA, ApiKeys.METADATA.oldestVersion(), ApiKeys.METADATA.latestVersion());
+ private static MetadataRequestData requestTopicIds(Set<Uuid> topicIds) {
MetadataRequestData data = new MetadataRequestData();
if (topicIds == null)
data.setTopics(null);
else {
topicIds.forEach(topicId -> data.topics().add(new MetadataRequestTopic().setTopicId(topicId)));
}
+ data.setAllowAutoTopicCreation(false); // can't auto-create without topic name
+ return data;
+ }
- // It's impossible to create topic with topicId
- data.setAllowAutoTopicCreation(false);
- this.data = data;
+ public Builder(List<String> topics, boolean allowAutoTopicCreation) {
+ this(topics, allowAutoTopicCreation, ApiKeys.METADATA.oldestVersion(), ApiKeys.METADATA.latestVersion());
}
public static Builder allTopics() {
@@ -84,6 +86,20 @@ public class MetadataRequest extends AbstractRequest {
return new Builder(ALL_TOPICS_REQUEST_DATA);
}
+ /**
+ * @return Builder for metadata request using topic names.
+ */
+ public static Builder forTopicNames(List<String> topicNames, boolean allowAutoTopicCreation) {
+ return new MetadataRequest.Builder(topicNames, allowAutoTopicCreation);
+ }
+
+ /**
+ * @return Builder for metadata request using topic IDs.
+ */
+ public static Builder forTopicIds(Set<Uuid> topicIds) {
+ return new MetadataRequest.Builder(requestTopicIds(new HashSet<>(topicIds)));
+ }
+
public boolean emptyTopicList() {
return data.topics().isEmpty();
}
@@ -92,6 +108,13 @@ public class MetadataRequest extends AbstractRequest {
return data.topics() == null;
}
+ public List<Uuid> topicIds() {
+ return data.topics()
+ .stream()
+ .map(MetadataRequestTopic::topicId)
+ .collect(Collectors.toList());
+ }
+
public List<String> topics() {
return data.topics()
.stream()
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
index 949bdc9aa72..f57e93a2a15 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMetadataTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ClusterResourceListener;
import org.apache.kafka.common.Node;
@@ -100,6 +101,78 @@ public class ConsumerMetadataTest {
assertEquals(Collections.singleton("__matching_topic"), metadata.fetch().topics());
}
+ @Test
+ public void testSubscriptionToBrokerRegexDoesNotRequestAllTopicsMetadata() {
+ // Subscribe to broker-side regex
+ subscription.subscribe(new SubscriptionPattern("__.*"), Optional.empty());
+
+ // Receive assignment from coordinator with topic IDs only
+ Uuid assignedTopicId = Uuid.randomUuid();
+ subscription.setAssignedTopicIds(Set.of(assignedTopicId));
+
+ // Metadata request should only include the assigned topic IDs
+ try (ConsumerMetadata metadata = newConsumerMetadata(false)) {
+ MetadataRequest.Builder builder = metadata.newMetadataRequestBuilder();
+ assertFalse(builder.isAllTopics(), "Should not request all topics when using broker-side regex");
+ assertEquals(List.of(assignedTopicId), builder.topicIds(), "Should only request assigned topic IDs when using broker-side regex");
+ }
+ }
+
+ @Test
+ public void testSubscriptionToBrokerRegexRetainsAssignedTopics() {
+ // Subscribe to broker-side regex
+ subscription.subscribe(new SubscriptionPattern("__.*"), Optional.empty());
+
+ // Receive assignment from coordinator with topic IDs only
+ Uuid assignedTopicId = Uuid.randomUuid();
+ subscription.setAssignedTopicIds(Set.of(assignedTopicId));
+
+ // Metadata request for assigned topic IDs
+ try (ConsumerMetadata metadata = newConsumerMetadata(false)) {
+ MetadataRequest.Builder builder = metadata.newMetadataRequestBuilder();
+ assertEquals(List.of(assignedTopicId), builder.topicIds());
+
+ // Metadata response with the assigned topic ID and name
+ Map<String, Uuid> topicIds = Map.of("__matching_topic", assignedTopicId);
+ MetadataResponse response = RequestTestUtils.metadataUpdateWithIds(1, singletonMap("__matching_topic", 1), topicIds);
+ metadata.updateWithCurrentRequestVersion(response, false, time.milliseconds());
+
+ assertEquals(Set.of("__matching_topic"), new HashSet<>(metadata.fetch().topics()));
+ assertEquals(Set.of("__matching_topic"), metadata.fetch().topics());
+ }
+ }
+
+ @Test
+ public void testSubscriptionToBrokerRegexAllowsTransientTopics() {
+ // Subscribe to broker-side regex
+ subscription.subscribe(new SubscriptionPattern("__.*"), Optional.empty());
+
+ // Receive assignment from coordinator with topic IDs only
+ Uuid assignedTopicId = Uuid.randomUuid();
+ subscription.setAssignedTopicIds(Set.of(assignedTopicId));
+
+ // Metadata request should only include the assigned topic IDs
+ try (ConsumerMetadata metadata = newConsumerMetadata(false)) {
+ MetadataRequest.Builder builder = metadata.newMetadataRequestBuilder();
+ assertFalse(builder.isAllTopics());
+ assertEquals(List.of(assignedTopicId), builder.topicIds());
+
+ // Call to offsets-related APIs starts. Metadata requests should move to requesting topic names temporarily.
+ String transientTopic = "__transient_topic";
+ metadata.addTransientTopics(Set.of(transientTopic));
+ builder = metadata.newMetadataRequestBuilder();
+ assertFalse(builder.isAllTopics());
+ // assertTrue(builder.topicIds().isEmpty());
+ assertEquals(List.of(transientTopic), builder.topics());
+
+ // Call to offsets-related APIs ends. Metadata requests should move back to requesting topic IDs for RE2J.
+ metadata.clearTransientTopics();
+ builder = metadata.newMetadataRequestBuilder();
+ assertFalse(builder.isAllTopics());
+ assertEquals(List.of(assignedTopicId), builder.topicIds());
+ }
+ }
+
@Test
public void testUserAssignment() {
subscription.assignFromUser(Set.of(
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index f697990b544..064664d2f4d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.consumer.internals.SubscriptionState.LogTruncati
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.utils.LogContext;
@@ -273,6 +274,7 @@ public class SubscriptionStateTest {
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
+ assertEquals(singleton(tp0.topic()), state.subscription());
// Simulate callback setting position to start fetching from
state.seek(tp0, 100);
@@ -292,6 +294,7 @@ public class SubscriptionStateTest {
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
+ assertEquals(singleton(tp0.topic()), state.subscription());
// Callback completed (without updating positions). Partition should require initializing
// positions, and start fetching once a valid position is set.
@@ -309,6 +312,7 @@ public class SubscriptionStateTest {
state.subscribe(singleton(topic), Optional.of(rebalanceListener));
state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
assertAssignmentAppliedAwaitingCallback(tp0);
+ assertEquals(singleton(tp0.topic()), state.subscription());
state.enablePartitionsAwaitingCallback(singleton(tp0));
state.seek(tp0, 100);
assertTrue(state.isFetchable(tp0));
@@ -331,7 +335,6 @@ public class SubscriptionStateTest {
private void assertAssignmentAppliedAwaitingCallback(TopicPartition topicPartition) {
assertEquals(singleton(topicPartition), state.assignedPartitions());
assertEquals(1, state.numAssignedPartitions());
- assertEquals(singleton(topicPartition.topic()), state.subscription());
assertFalse(state.isFetchable(topicPartition));
assertEquals(1, state.initializingPartitions().size());
@@ -404,6 +407,75 @@ public class SubscriptionStateTest {
state.subscribe(new SubscriptionPattern(pattern), Optional.of(rebalanceListener));
assertTrue(state.toString().contains("type=AUTO_PATTERN_RE2J"));
assertTrue(state.toString().contains("subscribedPattern=" + pattern));
+ assertTrue(state.assignedTopicIds().isEmpty());
+ }
+
+ @Test
+ public void testIsAssignedFromRe2j() {
+ assertFalse(state.isAssignedFromRe2j(null));
+ Uuid assignedUuid = Uuid.randomUuid();
+ assertFalse(state.isAssignedFromRe2j(assignedUuid));
+
+ state.subscribe(new SubscriptionPattern("foo.*"), Optional.empty());
+ assertTrue(state.hasRe2JPatternSubscription());
+ assertFalse(state.isAssignedFromRe2j(assignedUuid));
+
+ state.setAssignedTopicIds(Set.of(assignedUuid));
+ assertTrue(state.isAssignedFromRe2j(assignedUuid));
+
+ state.unsubscribe();
+ assertFalse(state.isAssignedFromRe2j(assignedUuid));
+ assertFalse(state.hasRe2JPatternSubscription());
+
+ }
+
+ @Test
+ public void testAssignedPartitionsWithTopicIdsForRe2Pattern() {
+ state.subscribe(new SubscriptionPattern("t.*"), Optional.of(rebalanceListener));
+ assertTrue(state.assignedTopicIds().isEmpty());
+
+ TopicIdPartitionSet reconciledAssignmentFromRegex = new TopicIdPartitionSet();
+ reconciledAssignmentFromRegex.addAll(Uuid.randomUuid(), topic, Set.of(0));
+ state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
+ assertAssignmentAppliedAwaitingCallback(tp0);
+
+ // Simulate callback setting position to start fetching from
+ state.seek(tp0, 100);
+
+ // Callback completed. Partition should be fetchable, from the
position previously defined
+ state.enablePartitionsAwaitingCallback(singleton(tp0));
+ assertEquals(0, state.initializingPartitions().size());
+ assertTrue(state.isFetchable(tp0));
+ assertTrue(state.hasAllFetchPositions());
+ assertEquals(100L, state.position(tp0).offset);
+ }
+
+ @Test
+ public void testAssignedTopicIdsPreservedWhenReconciliationCompletes() {
+ state.subscribe(new SubscriptionPattern("t.*"), Optional.of(rebalanceListener));
+ assertTrue(state.assignedTopicIds().isEmpty());
+
+ // First assignment received from coordinator
+ Uuid firstAssignedUuid = Uuid.randomUuid();
+ state.setAssignedTopicIds(Set.of(firstAssignedUuid));
+
+ // Second assignment received from coordinator (while the 1st is still being reconciled)
+ Uuid secondAssignedUuid = Uuid.randomUuid();
+ state.setAssignedTopicIds(Set.of(firstAssignedUuid, secondAssignedUuid));
+
+ // First reconciliation completes and updates the subscription state
+ state.assignFromSubscribedAwaitingCallback(singleton(tp0), singleton(tp0));
+
+ // First assignment should have been applied
+ assertAssignmentAppliedAwaitingCallback(tp0);
+
+ // Assigned topic IDs should still have both topics (one reconciled, one not reconciled yet)
+ assertEquals(
+ Set.of(firstAssignedUuid, secondAssignedUuid),
+ state.assignedTopicIds(),
+ "Updating the subscription state when a reconciliation
completes " +
+ "should not overwrite assigned topics that have not
been reconciled yet"
+ );
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 034c49c5c07..a6c26589635 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3924,7 +3924,7 @@ class KafkaApisTest extends Logging {
MetadataCacheTest.updateCache(metadataCache, partitionRecords)
// 4. Send TopicMetadataReq using topicId
- val metadataReqByTopicId = new MetadataRequest.Builder(util.List.of(authorizedTopicId, unauthorizedTopicId)).build()
+ val metadataReqByTopicId = MetadataRequest.Builder.forTopicIds(util.Set.of(authorizedTopicId, unauthorizedTopicId)).build()
val repByTopicId = buildRequest(metadataReqByTopicId, plaintextListener)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), any[Long])).thenReturn(0)