This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a195f729ba4d1ffe657d82888b26ab9db9121ab8
Author: Yufan Sheng <yu...@streamnative.io>
AuthorDate: Wed Feb 9 14:47:28 2022 +0800

    [FLINK-26021][connector/pulsar] Add the ability to merge the partitioned Pulsar topics.
---
 .../pulsar/source/PulsarSourceBuilder.java         |  4 +-
 .../source/enumerator/topic/TopicNameUtils.java    | 45 ++++++++++++++++++++++
 .../enumerator/topic/TopicNameUtilsTest.java       | 16 ++++++++
 3 files changed, 64 insertions(+), 1 deletion(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
index 0959b1b..b1f6250 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
 import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
@@ -195,7 +196,8 @@ public final class PulsarSourceBuilder<OUT> {
      */
     public PulsarSourceBuilder<OUT> setTopics(List<String> topics) {
         ensureSubscriberIsNull("topics");
-        this.subscriber = PulsarSubscriber.getTopicListSubscriber(topics);
+        List<String> distinctTopics = TopicNameUtils.distinctTopics(topics);
+        this.subscriber = 
PulsarSubscriber.getTopicListSubscriber(distinctTopics);
         return this;
     }
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
index 446622c..b5d814a 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
@@ -20,8 +20,17 @@ package 
org.apache.flink.connector.pulsar.source.enumerator.topic;
 
 import org.apache.flink.annotation.Internal;
 
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
 import org.apache.pulsar.common.naming.TopicName;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** util for topic name. */
@@ -42,4 +51,40 @@ public final class TopicNameUtils {
         checkArgument(partitionId >= 0, "Illegal partition id %s", 
partitionId);
         return TopicName.get(topic).getPartition(partitionId).toString();
     }
+
+    /**
+     * Parses the given topic name and tells whether Pulsar treats it as the name of a single
+     * partition (such as {@code short-topic-partition-8}) instead of a whole topic.
+     *
+     * @param topic the topic name, it's parsed by {@link TopicName#get(String)}.
+     * @return the result of {@link TopicName#isPartitioned()} for the parsed name.
+     */
+    public static boolean isPartitioned(String topic) {
+        TopicName parsedName = TopicName.get(topic);
+        return parsedName.isPartitioned();
+    }
+
+    /**
+     * Merges the given topic names into a list without duplicates.
+     *
+     * <p>Every name is parsed by {@link TopicName#get(String)}, so the returned names are in the
+     * parsed form, e.g. {@code persistent://public/default/short-topic}. A topic given without a
+     * partition suffix covers all of its partitions: every partition of that topic is dropped
+     * from the result, no matter where it appears in the input. Partitions whose parent topic
+     * isn't listed are kept one by one, and a partition listed more than once is returned once.
+     *
+     * @param topics the names to merge, whole topics and single partitions can be mixed.
+     * @return an immutable list of the merged topic names, in no particular order.
+     */
+    public static List<String> distinctTopics(List<String> topics) {
+        Set<String> fullTopics = new HashSet<>();
+        Map<String, List<Integer>> partitionedTopics = new HashMap<>();
+
+        for (String topic : topics) {
+            TopicName topicName = TopicName.get(topic);
+            String partitionedTopicName = topicName.getPartitionedTopicName();
+
+            if (!topicName.isPartitioned()) {
+                // The whole topic is requested, its partitions collected so far are redundant.
+                fullTopics.add(partitionedTopicName);
+                partitionedTopics.remove(partitionedTopicName);
+            } else if (!fullTopics.contains(partitionedTopicName)) {
+                List<Integer> partitionIds =
+                        partitionedTopics.computeIfAbsent(
+                                partitionedTopicName, k -> new ArrayList<>());
+                // Keep every partition only once, a repeated name must not be returned twice.
+                Integer partitionId = topicName.getPartitionIndex();
+                if (!partitionIds.contains(partitionId)) {
+                    partitionIds.add(partitionId);
+                }
+            }
+        }
+
+        ImmutableList.Builder<String> builder = ImmutableList.<String>builder().addAll(fullTopics);
+
+        for (Map.Entry<String, List<Integer>> topicSet : partitionedTopics.entrySet()) {
+            String topicName = topicSet.getKey();
+            for (Integer partitionId : topicSet.getValue()) {
+                builder.add(topicNameWithPartition(topicName, partitionId));
+            }
+        }
+
+        return builder.build();
+    }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java
index 54e5e4e..0abacc4 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java
@@ -20,6 +20,10 @@ package 
org.apache.flink.connector.pulsar.source.enumerator.topic;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -68,4 +72,16 @@ class TopicNameUtilsTest {
         String name4 = 
TopicNameUtils.topicNameWithPartition(topicNameWithoutCluster, 8);
         assertEquals(name4, topicNameWithoutCluster + "-partition-8");
     }
+
+    @Test
+    void mergeTheTopicNamesIntoOneSet() {
+        // "short-topic" absorbs its own partition 8, the lone "long-topic" partition is kept.
+        String shortTopicPartition = "short-topic-partition-8";
+        String shortTopic = "short-topic";
+        String longTopicPartition = "long-topic-partition-1";
+
+        List<String> merged =
+                TopicNameUtils.distinctTopics(
+                        Arrays.asList(shortTopicPartition, shortTopic, longTopicPartition));
+
+        assertThat(merged)
+                .containsExactlyInAnyOrder(
+                        "persistent://public/default/short-topic",
+                        "persistent://public/default/long-topic-partition-1");
+    }
 }

Reply via email to