This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 779a814fcfa6efbb61951c6b40c24272b5f470f2 Author: Yufan Sheng <[email protected]> AuthorDate: Tue Sep 6 00:01:17 2022 +0800 [FLINK-27400][Connector/pulsar] Filter system topics for Pulsar connector. --- .../subscriber/impl/TopicPatternSubscriber.java | 3 ++ .../source/enumerator/topic/TopicNameUtils.java | 52 +++++++++++++++++++--- .../enumerator/topic/TopicNameUtilsTest.java | 25 +++++++++++ 3 files changed, 75 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java index 472dbde3e35..fae2bac7b3e 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; @@ -35,6 +36,7 @@ import java.util.Set; import java.util.regex.Pattern; import static java.util.stream.Collectors.toSet; +import static org.apache.flink.shaded.guava30.com.google.common.base.Predicates.not; /** Subscribe to matching topics based on topic pattern. 
*/ public class TopicPatternSubscriber extends BasePulsarSubscriber { @@ -64,6 +66,7 @@ public class TopicPatternSubscriber extends BasePulsarSubscriber { .getTopics(namespace) .parallelStream() .filter(this::matchesSubscriptionMode) + .filter(not(TopicNameUtils::isInternal)) .filter(topic -> topicPattern.matcher(topic).find()) .map(topic -> queryTopicMetadata(pulsarAdmin, topic)) .filter(Objects::nonNull) diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java index 41c54892838..efa31227499 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java @@ -21,6 +21,7 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic; import org.apache.flink.annotation.Internal; import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet; import org.apache.pulsar.common.naming.TopicName; @@ -30,13 +31,32 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE; +import static org.apache.pulsar.common.naming.TopicDomain.persistent; /** util for topic name. 
*/ @Internal public final class TopicNameUtils { + private static final Pattern HEARTBEAT_NAMESPACE_PATTERN = + Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)"); + private static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = + Pattern.compile("pulsar/([^:]+:\\d+)"); + private static final Pattern SLA_NAMESPACE_PATTERN = + Pattern.compile("sla-monitor" + "/[^/]+/([^:]+:\\d+)"); + private static final Set<String> EVENTS_TOPIC_NAMES = + ImmutableSet.of("__change_events", "__transaction_buffer_snapshot"); + private static final String TRANSACTION_COORDINATOR_ASSIGN_PREFIX = + TopicName.get(persistent.value(), SYSTEM_NAMESPACE, "transaction_coordinator_assign") + .toString(); + private static final String TRANSACTION_COORDINATOR_LOG_PREFIX = + TopicName.get(persistent.value(), SYSTEM_NAMESPACE, "__transaction_log_").toString(); + private static final String PENDING_ACK_STORE_SUFFIX = "__transaction_pending_ack"; + private static final String PENDING_ACK_STORE_CURSOR_SUFFIX = "__pending_ack_state"; + private TopicNameUtils() { // No public constructor. } @@ -52,11 +72,6 @@ public final class TopicNameUtils { return TopicName.get(topic).getPartition(partitionId).toString(); } - /** Get a non-partitioned topic name that does not belong to any partitioned topic. */ - public static String topicNameWithoutPartition(String topic) { - return TopicName.get(topic).toString(); - } - public static boolean isPartition(String topic) { return TopicName.get(topic).isPartitioned(); } @@ -92,4 +107,31 @@ public final class TopicNameUtils { return builder.build(); } + + /** + * This method is adapted from {@code BrokerService} in pulsar-broker, which is not available + * in the Pulsar client, so we keep a copy here and maintain it ourselves. Since these topic + * names will never change, for backward compatibility, we only need to add new topic names + * after a version bump.
+ * + * @see <a + * href="https://github.com/apache/pulsar/blob/7075a5ce0d4a70f52625ac8c3d0c48894442b72a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L3024">BrokerService#isSystemTopic</a> + */ + public static boolean isInternal(String topic) { + // A topic name instance without partition information. + String topicName = topicName(topic); + TopicName topicInstance = TopicName.get(topicName); + String localName = topicInstance.getLocalName(); + String namespace = topicInstance.getNamespace(); + + return namespace.equals(SYSTEM_NAMESPACE.toString()) + || SLA_NAMESPACE_PATTERN.matcher(namespace).matches() + || HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches() + || HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches() + || EVENTS_TOPIC_NAMES.contains(localName) + || topicName.startsWith(TRANSACTION_COORDINATOR_ASSIGN_PREFIX) + || topicName.startsWith(TRANSACTION_COORDINATOR_LOG_PREFIX) + || localName.endsWith(PENDING_ACK_STORE_SUFFIX) + || localName.endsWith(PENDING_ACK_STORE_CURSOR_SUFFIX); + } } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java index 54e5e4e53b3..8f6b3ff7615 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtilsTest.java @@ -20,8 +20,13 @@ package org.apache.flink.connector.pulsar.source.enumerator.topic; import org.junit.jupiter.api.Test; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** Unit tests for {@link TopicNameUtils}. */ class TopicNameUtilsTest { @@ -68,4 +73,24 @@ class TopicNameUtilsTest { String name4 = TopicNameUtils.topicNameWithPartition(topicNameWithoutCluster, 8); assertEquals(name4, topicNameWithoutCluster + "-partition-8"); } + + @Test + void mergeTheTopicNamesIntoOneSet() { + List<String> topics = + Arrays.asList("short-topic-partition-8", "short-topic", "long-topic-partition-1"); + List<String> results = TopicNameUtils.distinctTopics(topics); + + assertThat(results) + .containsExactlyInAnyOrder( + "persistent://public/default/short-topic", + "persistent://public/default/long-topic-partition-1"); + } + + @Test + void internalTopicAssertion() { + boolean internal = + TopicNameUtils.isInternal( + "persistent://public/default/topic__transaction_pending_ack"); + assertTrue(internal); + } }
