AHeise commented on a change in pull request #15304: URL: https://github.com/apache/flink/pull/15304#discussion_r685432496
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.DivideRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.util.function.SerializableSupplier; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Consumer; +import java.util.regex.Pattern; + +import static java.lang.Boolean.FALSE; +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.getOptionValue; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_COMMIT_CURSOR; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; +import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES; +import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The builder class for {@link PulsarSource} to make it easier for the users to construct a {@link + * PulsarSource}. + * + * <p>The following example shows the minimum setup to create a PulsarSource that reads the String + * values from a Pulsar topic. + * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource + * .<byte[], String>builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setAdminUrl(PULSAR_BROKER_HTTP_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING)) + * .build(); + * }</pre> + * + * <p>The service url, admin url, subscription name, topics to consume, and the record deserializer + * are required fields that must be set. + * + * <p>To specify the starting position of PulsarSource, one can call {@link + * #setStartCursorSupplier(SerializableSupplier)}. + * + * <p>By default the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never + * stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link + * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link + * #setUnbounded(SerializableSupplier)}. For example the following PulsarSource stops after it + * consumes up to a event time when the Flink started. 
+ * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource + * .<byte[], String>builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setAdminUrl(PULSAR_BROKER_HTTP_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING)) + * .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis())) + * .build(); + * }</pre> + * + * @param <IN> The input type of the pulsar {@link Message <?>} + * @param <OUT> The output type of the source. + */ +@PublicEvolving +public final class PulsarSourceBuilder<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class); + + private final Configuration configuration; + private PulsarSubscriber subscriber; + private RangeGenerator rangeGenerator; + private SerializableSupplier<StartCursor> startCursorSupplier; + private SerializableSupplier<StopCursor> stopCursorSupplier; + private Boundedness boundedness; + private PulsarDeserializationSchema<IN, OUT> deserializationSchema; + private ConfigurationDataCustomizer<ClientConfigurationData> clientConfigurationCustomizer; + private ConfigurationDataCustomizer<ConsumerConfigurationData<IN>> + consumerConfigurationCustomizer; + + // private builder constructor. + PulsarSourceBuilder() { + // The default configuration holder. + this.configuration = new Configuration(); + } + + /** + * Sets the admin endpoint for the PulsarAdmin of the PulsarSource. + * + * @param adminUrl the url for the PulsarAdmin. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setAdminUrl(String adminUrl) { + configuration.set(PULSAR_ADMIN_URL, adminUrl); + return this; + } + + /** + * Sets the server's link for the PulsarConsumer of the PulsarSource. + * + * @param serviceUrl the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<IN, OUT> setServiceUrl(String serviceUrl) { + configuration.set(PULSAR_SERVICE_URL, serviceUrl); + return this; + } + + /** + * Sets the name for this pulsar subscription. + * + * @param subscriptionName the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setSubscriptionName(String subscriptionName) { + configuration.set(PULSAR_SUBSCRIPTION_NAME, subscriptionName); + return this; + } + + /** + * {@link SubscriptionType} is the consuming behavior for pulsar, we would generator different + * split by the given subscription type, it's required and quite important for end-user. Review comment: Okay, I understand, but strictly speaking it's not required to be set if it has a default. How about `{@link SubscriptionType} is the consuming behavior for Pulsar; we generate different splits based on the given subscription type. Please take time to consider which subscription type best matches your application. Default is Shared.` Any external link to better understand the implications would be very welcome. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
