AHeise commented on a change in pull request #15304:
URL: https://github.com/apache/flink/pull/15304#discussion_r685441462



##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl;
+
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+import static java.util.stream.Collectors.toSet;
+
+/** Subscribe to matching topics based on topic pattern. */
+public class TopicPatternSubscriber extends BasePulsarSubscriber {
+    private static final long serialVersionUID = 3307710093243745104L;
+
+    private final Pattern topicPattern;
+    private final RegexSubscriptionMode subscriptionMode;
+    private final String namespace;
+
+    public TopicPatternSubscriber(Pattern topicPattern, RegexSubscriptionMode subscriptionMode) {
+        this.topicPattern = topicPattern;
+        this.subscriptionMode = subscriptionMode;
+
+        // Extract the namespace from topic pattern regex.
+        // If no namespace provided in the regex, we would directly use "default" as the namespace.
+        TopicName destination = TopicName.get(topicPattern.toString());
+        NamespaceName namespaceName = destination.getNamespaceObject();
+        this.namespace = namespaceName.toString();
+    }
+
+    @Override
+    public Set<TopicPartition> getSubscribedTopicPartitions(
+            PulsarAdmin pulsarAdmin,
+            RangeGenerator rangeGenerator,
+            int parallelism,
+            SourceConfiguration sourceConfiguration) {
+        try {
+            return pulsarAdmin
+                    .namespaces()
+                    .getTopics(namespace)
+                    .parallelStream()
+                    .filter(topicFilter())
+                    .filter(topic -> topicPattern.matcher(topic).find())
+                    .map(topic -> queryTopicMetadata(pulsarAdmin, topic))
+                    .filter(Objects::nonNull)
+                    .flatMap(
+                            metadata ->
+                                    toTopicPartitions(
+                                            metadata,
+                                            parallelism,
+                                            sourceConfiguration,
+                                            rangeGenerator)
+                                            .stream())
+                    .collect(toSet());
+        } catch (PulsarAdminException e) {
+            if (e.getStatusCode() == 404) {
+                // Skip the topic metadata query.
+                return Collections.emptySet();
+            } else {
+                // This method would cause the failure for subscriber.
+                throw new IllegalStateException(e);
+            }
+        }
+    }
+
+    /**
+     * Filter the topic by regex subscription mode. This logic is the same as pulsar consumer's
+     * regex subscription.
+     */
+    private Predicate<String> topicFilter() {
+        return topic -> {
+            TopicName topicName = TopicName.get(topic);
+            // Filter the topic persistence.
+            switch (subscriptionMode) {
+                case PersistentOnly:
+                    return topicName.isPersistent();
+                case NonPersistentOnly:
+                    return !topicName.isPersistent();
+                default:
+                    // RegexSubscriptionMode.AllTopics
+                    return true;
+            }
+        };
+    }

Review comment:
       ```suggestion
   
       /**
        * Filter the topic by regex subscription mode. This logic is the same as pulsar consumer's
        * regex subscription.
        */
       private boolean matchesSubscriptionMode(String topic) {
           // Filter the topic persistence.
           switch (subscriptionMode) {
               case PersistentOnly:
                   return TopicName.get(topic).isPersistent();
               case NonPersistentOnly:
                   return !TopicName.get(topic).isPersistent();
               default:
                   // RegexSubscriptionMode.AllTopics
                   return true;
           }
       }
   ```

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/TopicPatternSubscriber.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl;
+
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.regex.Pattern;
+
+import static java.util.stream.Collectors.toSet;
+
+/** Subscribe to matching topics based on topic pattern. */
+public class TopicPatternSubscriber extends BasePulsarSubscriber {
+    private static final long serialVersionUID = 3307710093243745104L;
+
+    private final Pattern topicPattern;
+    private final RegexSubscriptionMode subscriptionMode;
+    private final String namespace;
+
+    public TopicPatternSubscriber(Pattern topicPattern, RegexSubscriptionMode subscriptionMode) {
+        this.topicPattern = topicPattern;
+        this.subscriptionMode = subscriptionMode;
+
+        // Extract the namespace from topic pattern regex.
+        // If no namespace provided in the regex, we would directly use "default" as the namespace.
+        TopicName destination = TopicName.get(topicPattern.toString());
+        NamespaceName namespaceName = destination.getNamespaceObject();
+        this.namespace = namespaceName.toString();
+    }
+
+    @Override
+    public Set<TopicPartition> getSubscribedTopicPartitions(
+            PulsarAdmin pulsarAdmin,
+            RangeGenerator rangeGenerator,
+            int parallelism,
+            SourceConfiguration sourceConfiguration) {
+        try {
+            return pulsarAdmin
+                    .namespaces()
+                    .getTopics(namespace)
+                    .parallelStream()
+                    .filter(topicFilter())

Review comment:
       ```suggestion
                       .filter(this::matchesSubscriptionMode)
   ```

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.topic;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+
+import org.apache.pulsar.client.api.Range;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Topic partition is the basic topic information used by {@link SplitReader}, we create this topic
+ * metas for a specified topic by subscription type and convert it into a partition split.
+ */
+@Internal
+public class TopicPartition implements Serializable {
+    private static final long serialVersionUID = -1474354741550810953L;
+
+    /**
+     * The topic name of the pulsar. It would be a full topic name, if your don't provide the tenant
+     * and namespace, we would add them automatically.
+     */
+    private final String topic;
+
+    /**
+     * Index of partition for the topic. It would be natural number for partitioned topic with a
+     * non-key_shared subscription.
+     */
+    private final int partitionId;
+
+    /**
+     * The ranges for this topic, used for limiting consume scope. It would be a {@link
+     * TopicRange#createFullRange()} full range for all the subscription type except {@link
+     * SubscriptionType#Key_Shared}.
+     */
+    private final TopicRange range;
+
+    public TopicPartition(String topic, int partitionId, TopicRange range) {
+        checkNotNull(topic);
+        checkNotNull(range);
+
+        this.topic = topicName(topic);

Review comment:
       nit: You could inline the null check (`checkNotNull` returns its argument; it is the only precondition check that passes the value through).
   
   ```suggestion
           this.topic = topicName(checkNotNull(topic));
   ```

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicMetadata.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.topic;
+
+import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
+
+/** The pojo class for pulsar topic metadata information. */
+public final class TopicMetadata {
+
+    /**
+     * The name of the topic, it would be a {@link TopicNameUtils#topicName(String)} which don't
+     * contains partition information.
+     */
+    private final String name;
+
+    /** If this topic is a partitioned topic. */
+    private final boolean partitioned;
+
+    /** The size for a partitioned topic. It would be zero for non-partitioned topic. */
+    private final int partitionSize;
+
+    public TopicMetadata(String name, int partitionSize) {
+        this.name = name;
+        this.partitioned = partitionSize != NON_PARTITIONED;

Review comment:
       I meant it differently, but the current version is actually also good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to