AHeise commented on a change in pull request #15304: URL: https://github.com/apache/flink/pull/15304#discussion_r685441462
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl; + +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +import static java.util.stream.Collectors.toSet; + +/** Subscribe to matching topics based on topic pattern. 
*/ +public class TopicPatternSubscriber extends BasePulsarSubscriber { + private static final long serialVersionUID = 3307710093243745104L; + + private final Pattern topicPattern; + private final RegexSubscriptionMode subscriptionMode; + private final String namespace; + + public TopicPatternSubscriber(Pattern topicPattern, RegexSubscriptionMode subscriptionMode) { + this.topicPattern = topicPattern; + this.subscriptionMode = subscriptionMode; + + // Extract the namespace from topic pattern regex. + // If no namespace provided in the regex, we would directly use "default" as the namespace. + TopicName destination = TopicName.get(topicPattern.toString()); + NamespaceName namespaceName = destination.getNamespaceObject(); + this.namespace = namespaceName.toString(); + } + + @Override + public Set<TopicPartition> getSubscribedTopicPartitions( + PulsarAdmin pulsarAdmin, + RangeGenerator rangeGenerator, + int parallelism, + SourceConfiguration sourceConfiguration) { + try { + return pulsarAdmin + .namespaces() + .getTopics(namespace) + .parallelStream() + .filter(topicFilter()) + .filter(topic -> topicPattern.matcher(topic).find()) + .map(topic -> queryTopicMetadata(pulsarAdmin, topic)) + .filter(Objects::nonNull) + .flatMap( + metadata -> + toTopicPartitions( + metadata, + parallelism, + sourceConfiguration, + rangeGenerator) + .stream()) + .collect(toSet()); + } catch (PulsarAdminException e) { + if (e.getStatusCode() == 404) { + // Skip the topic metadata query. + return Collections.emptySet(); + } else { + // This method would cause the failure for subscriber. + throw new IllegalStateException(e); + } + } + } + + /** + * Filter the topic by regex subscription mode. This logic is the same as pulsar consumer's + * regex subscription. + */ + private Predicate<String> topicFilter() { + return topic -> { + TopicName topicName = TopicName.get(topic); + // Filter the topic persistence. 
+ switch (subscriptionMode) { + case PersistentOnly: + return topicName.isPersistent(); + case NonPersistentOnly: + return !topicName.isPersistent(); + default: + // RegexSubscriptionMode.AllTopics + return true; + } + }; + } Review comment: ```suggestion /** * Filter the topic by regex subscription mode. This logic is the same as pulsar consumer's * regex subscription. */ private boolean matchesSubscriptionMode(String topic) { // Filter the topic persistence. switch (subscriptionMode) { case PersistentOnly: return TopicName.get(topic).isPersistent(); case NonPersistentOnly: return !TopicName.get(topic).isPersistent(); default: // RegexSubscriptionMode.AllTopics return true; } } ``` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java ########## @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl; + +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; + +import java.util.Collections; +import java.util.Objects; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; + +import static java.util.stream.Collectors.toSet; + +/** Subscribe to matching topics based on topic pattern. */ +public class TopicPatternSubscriber extends BasePulsarSubscriber { + private static final long serialVersionUID = 3307710093243745104L; + + private final Pattern topicPattern; + private final RegexSubscriptionMode subscriptionMode; + private final String namespace; + + public TopicPatternSubscriber(Pattern topicPattern, RegexSubscriptionMode subscriptionMode) { + this.topicPattern = topicPattern; + this.subscriptionMode = subscriptionMode; + + // Extract the namespace from topic pattern regex. + // If no namespace provided in the regex, we would directly use "default" as the namespace. 
+ TopicName destination = TopicName.get(topicPattern.toString()); + NamespaceName namespaceName = destination.getNamespaceObject(); + this.namespace = namespaceName.toString(); + } + + @Override + public Set<TopicPartition> getSubscribedTopicPartitions( + PulsarAdmin pulsarAdmin, + RangeGenerator rangeGenerator, + int parallelism, + SourceConfiguration sourceConfiguration) { + try { + return pulsarAdmin + .namespaces() + .getTopics(namespace) + .parallelStream() + .filter(topicFilter()) Review comment: ```suggestion .filter(this::matchesSubscriptionMode) ``` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java ########## @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; + +import org.apache.pulsar.client.api.Range; +import org.apache.pulsar.client.api.SubscriptionType; + +import java.io.Serializable; +import java.util.Objects; + +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Topic partition is the basic topic information used by {@link SplitReader}, we create this topic + * metas for a specified topic by subscription type and convert it into a partition split. + */ +@Internal +public class TopicPartition implements Serializable { + private static final long serialVersionUID = -1474354741550810953L; + + /** + * The topic name of the pulsar. It would be a full topic name, if your don't provide the tenant + * and namespace, we would add them automatically. + */ + private final String topic; + + /** + * Index of partition for the topic. It would be natural number for partitioned topic with a + * non-key_shared subscription. + */ + private final int partitionId; + + /** + * The ranges for this topic, used for limiting consume scope. It would be a {@link + * TopicRange#createFullRange()} full range for all the subscription type except {@link + * SubscriptionType#Key_Shared}. + */ + private final TopicRange range; + + public TopicPartition(String topic, int partitionId, TopicRange range) { + checkNotNull(topic); + checkNotNull(range); + + this.topic = topicName(topic); Review comment: nit: You could inline the null check (`checkNotNull` is the only check that passes the value through). 
```suggestion this.topic = topicName(checkNotNull(topic)); ``` ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicMetadata.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED; + +/** The pojo class for pulsar topic metadata information. */ +public final class TopicMetadata { + + /** + * The name of the topic, it would be a {@link TopicNameUtils#topicName(String)} which don't + * contains partition information. + */ + private final String name; + + /** If this topic is a partitioned topic. */ + private final boolean partitioned; + + /** The size for a partitioned topic. It would be zero for non-partitioned topic. */ + private final int partitionSize; + + public TopicMetadata(String name, int partitionSize) { + this.name = name; + this.partitioned = partitionSize != NON_PARTITIONED; Review comment: I meant it differently, but the current version is actually also good. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected]. For queries about this service, please contact Infrastructure at: [email protected]
