This is an automated email from the ASF dual-hosted git repository. fpaul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a195f729ba4d1ffe657d82888b26ab9db9121ab8 Author: Yufan Sheng <yu...@streamnative.io> AuthorDate: Wed Feb 9 14:47:28 2022 +0800 [FLINK-26021][connector/pulsar] Add the ability to merge the partitioned Pulsar topics. --- .../pulsar/source/PulsarSourceBuilder.java | 4 +- .../source/enumerator/topic/TopicNameUtils.java | 45 ++++++++++++++++++++++ .../enumerator/topic/TopicNameUtilsTest.java | 16 ++++++++ 3 files changed, 64 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index 0959b1b..b1f6250 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -29,6 +29,7 @@ import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; @@ -195,7 +196,8 @@ public final class PulsarSourceBuilder<OUT> { */ public PulsarSourceBuilder<OUT> setTopics(List<String> topics) { ensureSubscriberIsNull("topics"); - this.subscriber = PulsarSubscriber.getTopicListSubscriber(topics); + /* Merge the requested names before subscribing: repeated full-topic names collapse into one entry, and partitions of a topic that is also listed in full are dropped (see TopicNameUtils.distinctTopics). */ List<String> distinctTopics = TopicNameUtils.distinctTopics(topics); + this.subscriber
= PulsarSubscriber.getTopicListSubscriber(distinctTopics); return this; } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java index 446622c..b5d814a 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java @@ -20,8 +20,17 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic; import org.apache.flink.annotation.Internal; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; + import org.apache.pulsar.common.naming.TopicName; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import static org.apache.flink.util.Preconditions.checkArgument; /** util for topic name. */ @@ -42,4 +51,40 @@ public final class TopicNameUtils { checkArgument(partitionId >= 0, "Illegal partition id %s", partitionId); return TopicName.get(topic).getPartition(partitionId).toString(); } + + /** Returns whether Pulsar parses the given name as a single partition of a partitioned topic (for example a name such as short-topic-partition-8); delegates to TopicName#isPartitioned. */ public static boolean isPartitioned(String topic) { + return TopicName.get(topic).isPartitioned(); + } + + /** Normalizes a list of topic names and removes redundant entries. A non-partitioned topic subsumes every partition of the same topic, so those partitions are dropped whether they appear before or after it in the input. Partitions of topics that are not listed in full are kept individually. Returned names are fully qualified (for example persistent://public/default/short-topic). NOTE(review): partition ids are collected into a List without a duplicate check, so a partition listed twice appears twice in the result. The result order is also unspecified because it is built from a HashSet and a HashMap. @param topics the topic or partition names to merge @return an immutable list of the merged, fully qualified names
*/ public static List<String> distinctTopics(List<String> topics) { + Set<String> fullTopics = new HashSet<>(); + Map<String, List<Integer>> partitionedTopics = new HashMap<>(); + + for (String topic : topics) { + TopicName topicName = TopicName.get(topic); + String partitionedTopicName = topicName.getPartitionedTopicName(); + + if (!topicName.isPartitioned()) { + /* A full topic subsumes all of its partitions, so any partitions collected for it earlier are discarded. */ fullTopics.add(partitionedTopicName); + partitionedTopics.remove(partitionedTopicName); + } else if (!fullTopics.contains(partitionedTopicName)) { + /* Partition of a topic not listed in full so far; a later full entry for the same topic removes it again. */ List<Integer> partitionIds = + partitionedTopics.computeIfAbsent( + partitionedTopicName, k -> new ArrayList<>()); + /* NOTE(review): no duplicate check, so the same partition listed twice is emitted twice; a Set<Integer> would make the result truly distinct. */ partitionIds.add(topicName.getPartitionIndex()); + } + } + + /* Full topics first, then every surviving partition re-expanded into its qualified partition name via topicNameWithPartition. */ ImmutableList.Builder<String> builder = ImmutableList.<String>builder().addAll(fullTopics); + + for (Map.Entry<String, List<Integer>> topicSet : partitionedTopics.entrySet()) { + String topicName = topicSet.getKey(); + for (Integer partitionId : topicSet.getValue()) { + builder.add(topicNameWithPartition(topicName, partitionId)); + } + } + + return builder.build(); + } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java index 54e5e4e..0abacc4 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java @@ -20,6 +20,10 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static
org.junit.jupiter.api.Assertions.assertThrows; @@ -68,4 +72,16 @@ class TopicNameUtilsTest { String name4 = TopicNameUtils.topicNameWithPartition(topicNameWithoutCluster, 8); assertEquals(name4, topicNameWithoutCluster + "-partition-8"); } + + @Test + void mergeTheTopicNamesIntoOneSet() { + /* short-topic-partition-8 is dropped because short-topic is also listed, even though the partition comes first; long-topic-partition-1 survives as a partition. Order is not asserted because distinctTopics builds its result from hash-based collections. */ List<String> topics = + Arrays.asList("short-topic-partition-8", "short-topic", "long-topic-partition-1"); + List<String> results = TopicNameUtils.distinctTopics(topics); + + assertThat(results) + .containsExactlyInAnyOrder( + "persistent://public/default/short-topic", + "persistent://public/default/long-topic-partition-1"); + } }