This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 7929b16d [FLINK-36210] Optimize the logic for fetching topic metadata
in the TopicPatternSubscriber mode (#117)
7929b16d is described below
commit 7929b16dcfe648da30b6cc9755f63de2ed3d5319
Author: xiaochen <[email protected]>
AuthorDate: Thu Sep 12 14:10:47 2024 +0800
[FLINK-36210] Optimize the logic for fetching topic metadata in the
TopicPatternSubscriber mode (#117)
In TopicPatternSubscriber mode, our current logic fetches topic metadata
for all topics and then filters it. We can optimize this by first filtering
the topic names and then fetching metadata only for the filtered topics.
Co-authored-by: ClownXC <[email protected]>
---
.../enumerator/subscriber/KafkaSubscriberUtils.java | 18 ++++++++++++++++++
.../enumerator/subscriber/TopicPatternSubscriber.java | 18 ++++++++----------
2 files changed, 26 insertions(+), 10 deletions(-)
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
index 404ffaef..72e7f64d 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/KafkaSubscriberUtils.java
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.admin.TopicDescription;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
/** The base implementations of {@link KafkaSubscriber}. */
class KafkaSubscriberUtils {
@@ -38,6 +40,22 @@ class KafkaSubscriberUtils {
}
}
+ static Map<String, TopicDescription> getTopicMetadata(
+ AdminClient adminClient, Pattern topicPattern) {
+ try {
+ Set<String> allTopicNames = adminClient.listTopics().names().get();
+ Set<String> matchedTopicNames =
+ allTopicNames.stream()
+ .filter(name ->
topicPattern.matcher(name).matches())
+ .collect(Collectors.toSet());
+ return getTopicMetadata(adminClient, matchedTopicNames);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format("Failed to get metadata for %s topics.",
topicPattern.pattern()),
+ e);
+ }
+ }
+
static Map<String, TopicDescription> getTopicMetadata(
AdminClient adminClient, Set<String> topicNames) {
try {
diff --git
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
index 2a9a7533..985ca713 100644
---
a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
+++
b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/subscriber/TopicPatternSubscriber.java
@@ -30,7 +30,7 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
-import static
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getAllTopicMetadata;
+import static
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata;
/** A subscriber to a topic pattern. */
class TopicPatternSubscriber implements KafkaSubscriber {
@@ -44,19 +44,17 @@ class TopicPatternSubscriber implements KafkaSubscriber {
@Override
public Set<TopicPartition> getSubscribedTopicPartitions(AdminClient
adminClient) {
- LOG.debug("Fetching descriptions for all topics on Kafka cluster");
- final Map<String, TopicDescription> allTopicMetadata =
getAllTopicMetadata(adminClient);
+ LOG.debug("Fetching descriptions for {} topics on Kafka cluster",
topicPattern.pattern());
+ final Map<String, TopicDescription> matchedTopicMetadata =
+ getTopicMetadata(adminClient, topicPattern);
Set<TopicPartition> subscribedTopicPartitions = new HashSet<>();
- allTopicMetadata.forEach(
+ matchedTopicMetadata.forEach(
(topicName, topicDescription) -> {
- if (topicPattern.matcher(topicName).matches()) {
- for (TopicPartitionInfo partition :
topicDescription.partitions()) {
- subscribedTopicPartitions.add(
- new TopicPartition(
- topicDescription.name(),
partition.partition()));
- }
+ for (TopicPartitionInfo partition :
topicDescription.partitions()) {
+ subscribedTopicPartitions.add(
+ new TopicPartition(topicDescription.name(),
partition.partition()));
}
});