AHeise commented on a change in pull request #15304:
URL: https://github.com/apache/flink/pull/15304#discussion_r685430757



##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
##########
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import 
org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.DivideRangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
+
+import static java.lang.Boolean.FALSE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.getOptionValue;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_COMMIT_CURSOR;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES;
+import static 
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users 
to construct a {@link
+ * PulsarSource}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSource 
that reads the String
+ * values from a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource
+ *     .<byte[], String>builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING))
+ *     .build();
+ * }</pre>
+ *
+ * <p>The service url, admin url, subscription name, topics to consume, and 
the record deserializer
+ * are required fields that must be set.
+ *
+ * <p>To specify the starting position of PulsarSource, one can call {@link
+ * #setStartCursorSupplier(SerializableSupplier)}.
+ *
+ * <p>By default the PulsarSource runs in {@link 
Boundedness#CONTINUOUS_UNBOUNDED} mode and never
+ * stops until the Flink job is canceled or fails. To let the PulsarSource run 
in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} mode but stop at some given offsets, one can 
call {@link
+ * #setUnbounded(SerializableSupplier)}. For example, the following 
PulsarSource stops after it
+ * consumes up to the event time when the Flink job started.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource
+ *     .<byte[], String>builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING))
+ *     .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis()))
+ *     .build();
+ * }</pre>
+ *
+ * @param <IN> The input type of the pulsar {@link Message <?>}
+ * @param <OUT> The output type of the source.
+ */
+@PublicEvolving
+public final class PulsarSourceBuilder<IN, OUT> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSourceBuilder.class);
+
+    private final Configuration configuration;
+    private PulsarSubscriber subscriber;
+    private RangeGenerator rangeGenerator;
+    private SerializableSupplier<StartCursor> startCursorSupplier;
+    private SerializableSupplier<StopCursor> stopCursorSupplier;
+    private Boundedness boundedness;
+    private PulsarDeserializationSchema<IN, OUT> deserializationSchema;
+    private ConfigurationDataCustomizer<ClientConfigurationData> 
clientConfigurationCustomizer;
+    private ConfigurationDataCustomizer<ConsumerConfigurationData<IN>>
+            consumerConfigurationCustomizer;
+
+    // Package-private builder constructor.
+    PulsarSourceBuilder() {
+        // The default configuration holder.
+        this.configuration = new Configuration();
+    }
+
+    /**
+     * Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
+     *
+     * @param adminUrl the url for the PulsarAdmin.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setAdminUrl(String adminUrl) {
+        configuration.set(PULSAR_ADMIN_URL, adminUrl);
+        return this;
+    }
+
+    /**
+     * Sets the server's link for the PulsarConsumer of the PulsarSource.
+     *
+     * @param serviceUrl the server url of the Pulsar cluster.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setServiceUrl(String serviceUrl) {
+        configuration.set(PULSAR_SERVICE_URL, serviceUrl);
+        return this;
+    }
+
+    /**
+     * Sets the name for this pulsar subscription.
+     *
+     * @param subscriptionName the name of the Pulsar subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setSubscriptionName(String 
subscriptionName) {
+        configuration.set(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
+        return this;
+    }
+
+    /**
+     * {@link SubscriptionType} is the consuming behavior for Pulsar. We would 
generate different
+     * splits by the given subscription type, so it's required and quite important 
for the end user.
+     *
+     * @param subscriptionType The type of subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setSubscriptionType(SubscriptionType 
subscriptionType) {
+        configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
+        return this;
+    }
+
+    /**
+     * Set a Pulsar topic list for the Flink source. Some topics may not exist 
currently; consuming such a
+     * non-existent topic wouldn't throw any exception. But the best solution 
is to consume by
+     * using a topic regex. This method conflicts with {@code 
setTopicPattern}.
+     *
+     * @param topics The topic list you would like to consume message.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopics(String... topics) {
+        return setTopics(Arrays.asList(topics));
+    }
+
+    /**
+     * Set a Pulsar topic list for the Flink source. Some topics may not exist 
currently; consuming such a
+     * non-existent topic wouldn't throw any exception. But the best solution 
is to consume by
+     * using a topic regex. This method conflicts with {@code 
setTopicPattern}.
+     *
+     * @param topics The topic list you would like to consume message.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopics(List<String> topics) {
+        ensureSubscriberIsNull("topics");
+        configuration.set(PULSAR_TOPIC_NAMES, 
PulsarJsonUtils.toString(topics));
+
+        this.subscriber = PulsarSubscriber.getTopicListSubscriber(topics);
+
+        return this;
+    }
+
+    /**
+     * Set a topic pattern to consume from, using the Java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(String topicsPattern) {
+        return setTopicPattern(Pattern.compile(topicsPattern));
+    }
+
+    /**
+     * Set a topic pattern to consume from, using the Java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(Pattern topicsPattern) 
{
+        return setTopicPattern(topicsPattern, RegexSubscriptionMode.AllTopics);
+    }
+
+    /**
+     * Set a topic pattern to consume from, using the Java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @param regexSubscriptionMode The topic filter for regex subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(
+            String topicsPattern, RegexSubscriptionMode regexSubscriptionMode) 
{
+        return setTopicPattern(Pattern.compile(topicsPattern), 
regexSubscriptionMode);
+    }
+
+    /**
+     * Set a topic pattern to consume from, using the Java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @param regexSubscriptionMode The topic filter for regex subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(
+            Pattern topicsPattern, RegexSubscriptionMode 
regexSubscriptionMode) {
+        ensureSubscriberIsNull("topic pattern");
+        configuration.set(PULSAR_TOPICS_PATTERN, topicsPattern.toString());
+        this.subscriber =
+                PulsarSubscriber.getTopicPatternSubscriber(topicsPattern, 
regexSubscriptionMode);
+
+        return setRegexSubscriptionMode(regexSubscriptionMode);
+    }
+
+    /**
+     * When subscribing to a topic using a regular expression, you can pick a 
certain type of
+     * topics.
+     *
+     * @param regexSubscriptionMode The topic type.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setRegexSubscriptionMode(
+            RegexSubscriptionMode regexSubscriptionMode) {
+        configuration.set(PULSAR_REGEX_SUBSCRIPTION_MODE, 
regexSubscriptionMode);
+        return this;
+    }
+
+    /**
+     * Set a topic range generator for Key_Shared subscription.
+     *
+     * @param rangeGenerator A generator which would generate a set of {@link 
TopicRange} for given
+     *     topic.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setRangeGenerator(RangeGenerator 
rangeGenerator) {
+        this.rangeGenerator = rangeGenerator;
+        if (configuration.contains(PULSAR_SUBSCRIPTION_TYPE)) {
+            SubscriptionType subscriptionType = 
configuration.get(PULSAR_SUBSCRIPTION_TYPE);
+            if (subscriptionType != SubscriptionType.Key_Shared) {
+                LOG.warn(
+                        "Key_Shared subscription should be used for custom 
rangeGenerator instead of {}",
+                        subscriptionType);
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * Specify from which offsets the PulsarSource should start consuming 
by providing a {@link
+     * StartCursor}.
+     *
+     * @param startCursorSupplier the supplier providing a {@link StartCursor} 
which set the
+     *     starting offsets for the Source.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setStartCursorSupplier(
+            SerializableSupplier<StartCursor> startCursorSupplier) {
+        this.startCursorSupplier = startCursorSupplier;
+        return this;
+    }
+
+    /**
+     * By default the PulsarSource is set to run in {@link 
Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * and thus never stops until the Flink job fails or is canceled. To let 
the PulsarSource run as
+     * a streaming source but still stops at some point, one can set a {@link 
StopCursor} to
+     * specify the stopping offsets for each partition. When all the 
partitions have reached their
+     * stopping offsets, the PulsarSource will then exit.
+     *
+     * <p>This method is different from {@link 
#setBounded(SerializableSupplier)} that after setting
+     * the stopping offsets with this method, {@link 
PulsarSource#getBoundedness()} will still
+     * return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will 
stop at the stopping
+     * offsets specified by the {@link StopCursor}.
+     *
+     * @param stopCursorSupplier The {@link StopCursor} to specify the 
stopping offset.
+     * @return this PulsarSourceBuilder.
+     * @see #setBounded(SerializableSupplier)
+     */
+    public PulsarSourceBuilder<IN, OUT> setUnbounded(
+            SerializableSupplier<StopCursor> stopCursorSupplier) {
+        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+        this.stopCursorSupplier = stopCursorSupplier;
+        return this;
+    }
+
+    /**
+     * By default the PulsarSource is set to run in {@link 
Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * and thus never stops until the Flink job fails or is canceled. To let 
the PulsarSource run in
+     * {@link Boundedness#BOUNDED} manner and stops at some point, one can set 
a {@link
+     * StopCursor} to specify the stopping offsets for each partition. When 
all the partitions have
+     * reached their stopping offsets, the PulsarSource will then exit.
+     *
+     * <p>This method is different from {@link 
#setUnbounded(SerializableSupplier)} that after
+     * setting the stopping offsets with this method, {@link 
PulsarSource#getBoundedness()} will
+     * return {@link Boundedness#BOUNDED} instead of {@link 
Boundedness#CONTINUOUS_UNBOUNDED}.
+     *
+     * @param stopCursorSupplier the {@link StopCursor} to specify the 
stopping offsets.
+     * @return this PulsarSourceBuilder.
+     * @see #setUnbounded(SerializableSupplier)
+     */
+    public PulsarSourceBuilder<IN, OUT> setBounded(
+            SerializableSupplier<StopCursor> stopCursorSupplier) {
+        this.boundedness = Boundedness.BOUNDED;
+        this.stopCursorSupplier = stopCursorSupplier;
+        return this;
+    }
+
+    /**
+     * DeserializationSchema is required for getting the {@link Schema} for 
deserialize message from
+     * pulsar and getting the {@link TypeInformation} for message 
serialization in flink.
+     *
+     * <p>We have defined a set of implementations, using {@code
+     * PulsarDeserializationSchema#pulsarSchema} or {@code 
PulsarDeserializationSchema#flinkSchema}
+     * for creating the desired schema.
+     */
+    public PulsarSourceBuilder<IN, OUT> setDeserializationSchema(

Review comment:
       Here would be an implementation sketch.
   ```
       public <T extends OUT> PulsarSourceBuilder<T> setDeserializationSchema(
               PulsarDeserializationSchema<T> deserializationSchema) {
           PulsarSourceBuilder<T> self = specialized();
           self.deserializationSchema = deserializationSchema;
           return self;
       }
   
       @SuppressWarnings("unchecked")
       private <T extends OUT> PulsarSourceBuilder<T> specialized() {
           return (PulsarSourceBuilder<T>) this;
       }
   ```
   
   This allows your first SourceITCase to be written as
   ```
               PulsarSource<String> source =
                       PulsarSource.builder()
                               .setTopics("tp")
                               .setServiceUrl(randomAlphanumeric(8))
                               .setAdminUrl(getAdminUrl())
                               .setSubscriptionName("test")
                               .setDeserializationSchema(flinkSchema(new 
SimpleStringSchema()))
                               
.setBoundedStopCursor(StopCursor::defaultStopCursor)
                               .build();
   ```
   (note the plain `builder` call)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to