This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 779a814fcfa6efbb61951c6b40c24272b5f470f2
Author: Yufan Sheng <[email protected]>
AuthorDate: Tue Sep 6 00:01:17 2022 +0800

    [FLINK-27400][Connector/pulsar] Filter system topics for Pulsar connector.
---
 .../subscriber/impl/TopicPatternSubscriber.java    |  3 ++
 .../source/enumerator/topic/TopicNameUtils.java    | 52 +++++++++++++++++++---
 .../enumerator/topic/TopicNameUtilsTest.java       | 25 +++++++++++
 3 files changed, 75 insertions(+), 5 deletions(-)

diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java
index 472dbde3e35..fae2bac7b3e 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl;
 
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
@@ -35,6 +36,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import static java.util.stream.Collectors.toSet;
+import static 
org.apache.flink.shaded.guava30.com.google.common.base.Predicates.not;
 
 /** Subscribe to matching topics based on topic pattern. */
 public class TopicPatternSubscriber extends BasePulsarSubscriber {
@@ -64,6 +66,7 @@ public class TopicPatternSubscriber extends 
BasePulsarSubscriber {
                     .getTopics(namespace)
                     .parallelStream()
                     .filter(this::matchesSubscriptionMode)
+                    .filter(not(TopicNameUtils::isInternal))
                     .filter(topic -> topicPattern.matcher(topic).find())
                     .map(topic -> queryTopicMetadata(pulsarAdmin, topic))
                     .filter(Objects::nonNull)
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
index 41c54892838..efa31227499 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
@@ -21,6 +21,7 @@ package 
org.apache.flink.connector.pulsar.source.enumerator.topic;
 import org.apache.flink.annotation.Internal;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
 
 import org.apache.pulsar.common.naming.TopicName;
 
@@ -30,13 +31,32 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
+import static org.apache.pulsar.common.naming.TopicDomain.persistent;
 
 /** util for topic name. */
 @Internal
 public final class TopicNameUtils {
 
+    private static final Pattern HEARTBEAT_NAMESPACE_PATTERN =
+            Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
+    private static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 =
+            Pattern.compile("pulsar/([^:]+:\\d+)");
+    private static final Pattern SLA_NAMESPACE_PATTERN =
+            Pattern.compile("sla-monitor" + "/[^/]+/([^:]+:\\d+)");
+    private static final Set<String> EVENTS_TOPIC_NAMES =
+            ImmutableSet.of("__change_events", 
"__transaction_buffer_snapshot");
+    private static final String TRANSACTION_COORDINATOR_ASSIGN_PREFIX =
+            TopicName.get(persistent.value(), SYSTEM_NAMESPACE, 
"transaction_coordinator_assign")
+                    .toString();
+    private static final String TRANSACTION_COORDINATOR_LOG_PREFIX =
+            TopicName.get(persistent.value(), SYSTEM_NAMESPACE, 
"__transaction_log_").toString();
+    private static final String PENDING_ACK_STORE_SUFFIX = 
"__transaction_pending_ack";
+    private static final String PENDING_ACK_STORE_CURSOR_SUFFIX = 
"__pending_ack_state";
+
     private TopicNameUtils() {
         // No public constructor.
     }
@@ -52,11 +72,6 @@ public final class TopicNameUtils {
         return TopicName.get(topic).getPartition(partitionId).toString();
     }
 
-    /** Get a non-partitioned topic name that does not belong to any 
partitioned topic. */
-    public static String topicNameWithoutPartition(String topic) {
-        return TopicName.get(topic).toString();
-    }
-
     public static boolean isPartition(String topic) {
         return TopicName.get(topic).isPartitioned();
     }
@@ -92,4 +107,31 @@ public final class TopicNameUtils {
 
         return builder.build();
     }
+
+    /**
+     * This method is refactored from {@code BrokerService} in pulsar-broker, which is not available
+     * in the Pulsar client, so we have to keep and maintain our own copy here. Since these topic
+     * names will never change, for backward compatibility, we only need to add new topic names
+     * after a version bump.
+     *
+     * @see <a
+     *     
href="https://github.com/apache/pulsar/blob/7075a5ce0d4a70f52625ac8c3d0c48894442b72a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L3024">BrokerService#isSystemTopic</a>
+     */
+    public static boolean isInternal(String topic) {
+        // A topic name instance without partition information.
+        String topicName = topicName(topic);
+        TopicName topicInstance = TopicName.get(topicName);
+        String localName = topicInstance.getLocalName();
+        String namespace = topicInstance.getNamespace();
+
+        return namespace.equals(SYSTEM_NAMESPACE.toString())
+                || SLA_NAMESPACE_PATTERN.matcher(namespace).matches()
+                || HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
+                || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches()
+                || EVENTS_TOPIC_NAMES.contains(localName)
+                || topicName.startsWith(TRANSACTION_COORDINATOR_ASSIGN_PREFIX)
+                || topicName.startsWith(TRANSACTION_COORDINATOR_LOG_PREFIX)
+                || localName.endsWith(PENDING_ACK_STORE_SUFFIX)
+                || localName.endsWith(PENDING_ACK_STORE_CURSOR_SUFFIX);
+    }
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java
index 54e5e4e53b3..8f6b3ff7615 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java
@@ -20,8 +20,13 @@ package 
org.apache.flink.connector.pulsar.source.enumerator.topic;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Unit tests for {@link TopicNameUtils}. */
 class TopicNameUtilsTest {
@@ -68,4 +73,24 @@ class TopicNameUtilsTest {
         String name4 = 
TopicNameUtils.topicNameWithPartition(topicNameWithoutCluster, 8);
         assertEquals(name4, topicNameWithoutCluster + "-partition-8");
     }
+
+    @Test
+    void mergeTheTopicNamesIntoOneSet() {
+        List<String> topics =
+                Arrays.asList("short-topic-partition-8", "short-topic", 
"long-topic-partition-1");
+        List<String> results = TopicNameUtils.distinctTopics(topics);
+
+        assertThat(results)
+                .containsExactlyInAnyOrder(
+                        "persistent://public/default/short-topic",
+                        "persistent://public/default/long-topic-partition-1");
+    }
+
+    @Test
+    void internalTopicAssertion() {
+        boolean internal =
+                TopicNameUtils.isInternal(
+                        
"persistent://public/default/topic__transaction_pending_ack");
+        assertTrue(internal);
+    }
 }

Reply via email to