syhily commented on a change in pull request #15304:
URL: https://github.com/apache/flink/pull/15304#discussion_r684572500



##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTO_CERT_REFRESH_TIME;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONCURRENT_LOOKUP_REQUEST;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTIONS_PER_BROKER;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECT_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_BUSY_WAIT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_KEEP_ALIVE_INTERVAL_SECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_LISTENER_NAME;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_BACKOFF_INTERVAL_NANOS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REDIRECTS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REQUEST;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_IO_THREADS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_LISTENER_THREADS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_OPERATION_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_PROTOCOL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_READ_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SSL_PROVIDER;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_ALLOW_INSECURE_CONNECTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_CIPHERS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_PROTOCOLS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PASSWORD;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PATH;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_TYPE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_KEY_STORE_TLS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TCP_NO_DELAY;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TLS;
+import static 
org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils.configMap;
+
+/** The util for creating pulsar configuration class from flink's {@link 
Configuration}. */
+public final class PulsarConfigUtils {
+
+    private PulsarConfigUtils() {
+        // No need to create instance.
+    }
+
+    /**
+     * Create a common {@link ClientConfigurationData} for both pulsar source 
and sink connector by
+     * using the given {@link Configuration}. Add the configuration was listed 
in {@link
+     * PulsarSourceOptions}, and we would parse it one by one.
+     *
+     * @param configuration The flink configuration
+     */
+    public static ClientConfigurationData createClientConfig(

Review comment:
       Each config option definition already declares a default value, so any
option the user doesn't set falls back to that default. Required config
options are validated in `PulsarSourceConfigUtils.checkConfigurations`.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.utils;
+
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.client.impl.transaction.TransactionImpl;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+import static 
org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Transaction was introduced into pulsar since 2.7.0, but the interface 
{@link Transaction} didn't
+ * provide a id method until 2.8.1. We have to add this util for acquiring the 
{@link TxnID} for
+ * compatible consideration.
+ */
+@SuppressWarnings("java:S3011")
+public final class PulsarTransactionUtils {

Review comment:
       Yes, this hack must be removed once Pulsar 2.8.1 is released.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
+import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaInitializationContext;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createAdmin;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createClient;
+import static 
org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky;
+
+/**
+ * The Source implementation of Pulsar. Please use a {@link 
PulsarSourceBuilder} to construct a
+ * {@link PulsarSource}. The following example shows how to create a 
PulsarSource emitting records
+ * of <code>String</code> type.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource.<byte[], String>builder()
+ *     .setTopics(TOPIC1, TOPIC2)
+ *     .setServiceUrl(getServiceUrl())
+ *     .setAdminUrl(getAdminUrl())
+ *     .setSubscriptionName("test")
+ *     .setDeserializationSchema(flinkSchema(new SimpleStringSchema()))
+ *     .setBounded(StopCursor::defaultStopCursor)
+ *     .build();
+ * }</pre>
+ *
+ * <p>See {@link PulsarSourceBuilder} for more details.
+ *
+ * @param <IN> The input type of the pulsar {@link Message}.
+ * @param <OUT> The output type of the source.
+ */
+@PublicEvolving
+public final class PulsarSource<IN, OUT>
+        implements Source<OUT, PulsarPartitionSplit, PulsarSourceEnumState>,
+                ResultTypeQueryable<OUT> {
+    private static final long serialVersionUID = 7773108631275567433L;
+
+    /**
+     * The common configuration for pulsar source, we don't support the 
pulsar's configuration class
+     * directly.
+     */
+    private final Configuration configuration;
+
+    private final SourceConfiguration sourceConfiguration;
+
+    private final PulsarSubscriber subscriber;
+
+    private final RangeGenerator rangeGenerator;
+
+    private final SerializableSupplier<StartCursor> startCursorSupplier;
+
+    private final SerializableSupplier<StopCursor> stopCursorSupplier;
+
+    /**
+     * Boundedness for source, we only support {@link 
Boundedness#CONTINUOUS_UNBOUNDED} currently.
+     */
+    private final Boundedness boundedness;
+
+    /** The pulsar deserialization schema used for deserialize message. */
+    private final PulsarDeserializationSchema<IN, OUT> deserializationSchema;
+
+    /** Modify the flink generated {@link ClientConfigurationData}. */
+    private final ConfigurationDataCustomizer<ClientConfigurationData>
+            clientConfigurationCustomizer;
+
+    /** Modify the flink generated {@link ConsumerConfigurationData}. */
+    private final ConfigurationDataCustomizer<ConsumerConfigurationData<IN>>
+            consumerConfigurationCustomizer;
+
+    /** A lazy evaluated pulsar client for split reader. */
+    private transient volatile PulsarClient pulsarClient;
+
+    /** A lazy evaluated pulsar admin for source enumerator. */
+    private transient volatile PulsarAdmin pulsarAdmin;

Review comment:
       OK, that means we can just remove the `volatile` modifier and the
synchronization. Since the Pulsar client has an internal thread pool that can
consume a lot of resources, I prefer to cache the client and admin instances here.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTO_CERT_REFRESH_TIME;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONCURRENT_LOOKUP_REQUEST;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTIONS_PER_BROKER;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECT_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_BUSY_WAIT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_KEEP_ALIVE_INTERVAL_SECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_LISTENER_NAME;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_BACKOFF_INTERVAL_NANOS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REDIRECTS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REQUEST;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_IO_THREADS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_LISTENER_THREADS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_OPERATION_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_PROTOCOL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_READ_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SSL_PROVIDER;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_ALLOW_INSECURE_CONNECTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_CIPHERS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_PROTOCOLS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PASSWORD;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PATH;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_TYPE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_KEY_STORE_TLS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TCP_NO_DELAY;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TLS;
+import static 
org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils.configMap;
+
+/** The util for creating pulsar configuration class from flink's {@link 
Configuration}. */
+public final class PulsarConfigUtils {
+
+    private PulsarConfigUtils() {
+        // No need to create instance.
+    }
+
+    /**
+     * Create a common {@link ClientConfigurationData} for both pulsar source 
and sink connector by
+     * using the given {@link Configuration}. Add the configuration was listed 
in {@link
+     * PulsarSourceOptions}, and we would parse it one by one.
+     *
+     * @param configuration The flink configuration
+     */
+    public static ClientConfigurationData createClientConfig(
+            Configuration configuration,
+            @Nullable ConfigurationDataCustomizer<ClientConfigurationData> 
customizer) {
+        ClientConfigurationData data = new ClientConfigurationData();
+
+        // Set the properties one by one.
+        data.setServiceUrl(configuration.get(PULSAR_SERVICE_URL));
+        
data.setAuthPluginClassName(configuration.get(PULSAR_AUTH_PLUGIN_CLASS_NAME));
+        data.setAuthParams(configuration.get(PULSAR_AUTH_PARAMS));
+        data.setAuthParamMap(
+                getOptionValue(
+                        configuration,
+                        PULSAR_AUTH_PARAM_MAP,
+                        v -> PulsarJsonUtils.toMap(String.class, String.class, 
v)));
+        
data.setOperationTimeoutMs(configuration.get(PULSAR_OPERATION_TIMEOUT_MS));
+        
data.setStatsIntervalSeconds(configuration.get(PULSAR_STATS_INTERVAL_SECONDS));
+        data.setNumIoThreads(configuration.get(PULSAR_NUM_IO_THREADS));
+        
data.setNumListenerThreads(configuration.get(PULSAR_NUM_LISTENER_THREADS));
+        
data.setConnectionsPerBroker(configuration.get(PULSAR_CONNECTIONS_PER_BROKER));
+        data.setUseTcpNoDelay(configuration.get(PULSAR_USE_TCP_NO_DELAY));
+        data.setUseTls(configuration.get(PULSAR_USE_TLS));
+        
data.setTlsTrustCertsFilePath(configuration.get(PULSAR_TLS_TRUST_CERTS_FILE_PATH));
+        
data.setTlsAllowInsecureConnection(configuration.get(PULSAR_TLS_ALLOW_INSECURE_CONNECTION));
+        data.setTlsHostnameVerificationEnable(
+                configuration.get(PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE));
+        
data.setConcurrentLookupRequest(configuration.get(PULSAR_CONCURRENT_LOOKUP_REQUEST));
+        data.setMaxLookupRequest(configuration.get(PULSAR_MAX_LOOKUP_REQUEST));
+        
data.setMaxLookupRedirects(configuration.get(PULSAR_MAX_LOOKUP_REDIRECTS));
+        data.setMaxNumberOfRejectedRequestPerConnection(
+                
configuration.get(PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION));
+        
data.setKeepAliveIntervalSeconds(configuration.get(PULSAR_KEEP_ALIVE_INTERVAL_SECONDS));
+        
data.setConnectionTimeoutMs(configuration.get(PULSAR_CONNECTION_TIMEOUT_MS));
+        data.setRequestTimeoutMs(configuration.get(PULSAR_REQUEST_TIMEOUT_MS));
+        data.setInitialBackoffIntervalNanos(
+                configuration.get(PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS));
+        
data.setMaxBackoffIntervalNanos(configuration.get(PULSAR_MAX_BACKOFF_INTERVAL_NANOS));
+        data.setEnableBusyWait(configuration.get(PULSAR_ENABLE_BUSY_WAIT));
+        data.setListenerName(configuration.get(PULSAR_LISTENER_NAME));
+        data.setUseKeyStoreTls(configuration.get(PULSAR_USE_KEY_STORE_TLS));
+        data.setSslProvider(configuration.get(PULSAR_SSL_PROVIDER));
+        
data.setTlsTrustStoreType(configuration.get(PULSAR_TLS_TRUST_STORE_TYPE));
+        
data.setTlsTrustStorePath(configuration.get(PULSAR_TLS_TRUST_STORE_PATH));
+        
data.setTlsTrustStorePassword(configuration.get(PULSAR_TLS_TRUST_STORE_PASSWORD));
+        data.setTlsCiphers(
+                getOptionValue(
+                        configuration,
+                        PULSAR_TLS_CIPHERS,
+                        v -> PulsarJsonUtils.toSet(String.class, v)));
+        data.setTlsProtocols(
+                getOptionValue(
+                        configuration,
+                        PULSAR_TLS_PROTOCOLS,
+                        v -> PulsarJsonUtils.toSet(String.class, v)));
+        data.setMemoryLimitBytes(configuration.get(PULSAR_MEMORY_LIMIT_BYTES));
+        data.setProxyServiceUrl(configuration.get(PULSAR_PROXY_SERVICE_URL));
+        data.setProxyProtocol(configuration.get(PULSAR_PROXY_PROTOCOL));
+        
data.setEnableTransaction(configuration.get(PULSAR_ENABLE_TRANSACTION));
+
+        // Create Authentication instance like pulsar command line tools.
+        // The token and TLS authentication should be created by the below 
factory method.

Review comment:
       Yep, you are right. I have removed it.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.deserializer;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.Collector;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import java.io.Serializable;
+
+/**
+ * A schema bridge for deserializing the pulsar's <code>Message<?></code> into 
a flink managed
+ * instance. We support both the pulsar's self managed schema and flink 
managed schema.
+ *
+ * @param <M> The decode message type from pulsar client, which would create a 
message {@link
+ *     SchemaInfo} from this type.
+ * @param <T> The output message type for sinking to downstream flink operator.
+ */
+@PublicEvolving
+public interface PulsarDeserializationSchema<M, T> extends Serializable, 
ResultTypeQueryable<T> {

Review comment:
       I think you are right, after reading this review comment carefully.
`Schema` is quite an important feature inside Pulsar, but decoding the message
in the Pulsar client gives us no benefit, only extra complexity. Wrapping the
`Schema` into a Flink `TypeInformation` and consuming the message as raw bytes
looks better.
   
   `Schema` also provides extra [evolution and compatibility](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/)
 features in Pulsar, which are quite important for experienced Pulsar users. We
can implement support for them in the future; for now we receive a `Message<byte[]>`.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.topic;
+
+import org.apache.pulsar.common.naming.TopicName;
+
+/** util for topic name. */
+public final class TopicNameUtils {

Review comment:
       Nope.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java
##########
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.config;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils;
+import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.function.Function;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTO_CERT_REFRESH_TIME;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONCURRENT_LOOKUP_REQUEST;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTIONS_PER_BROKER;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECT_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_BUSY_WAIT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_KEEP_ALIVE_INTERVAL_SECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_LISTENER_NAME;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_BACKOFF_INTERVAL_NANOS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REDIRECTS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REQUEST;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_IO_THREADS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_LISTENER_THREADS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_OPERATION_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_PROTOCOL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_READ_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT_MS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SSL_PROVIDER;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_ALLOW_INSECURE_CONNECTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_CIPHERS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_PROTOCOLS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PASSWORD;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PATH;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_TYPE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_KEY_STORE_TLS;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TCP_NO_DELAY;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TLS;
+import static 
org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky;
+import static 
org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils.configMap;
+
+/** The util for creating pulsar configuration class from flink's {@link 
Configuration}. */
+public final class PulsarConfigUtils {
+
+    private PulsarConfigUtils() {
+        // No need to create instance.
+    }
+
+    /**
+     * Create a common {@link ClientConfigurationData} for both pulsar source 
and sink connector by
+     * using the given {@link Configuration}. Add the configuration was listed 
in {@link
+     * PulsarSourceOptions}, and we would parse it one by one.
+     *
+     * @param configuration The flink configuration
+     */
+    public static ClientConfigurationData createClientConfig(
+            Configuration configuration,
+            @Nullable ConfigurationDataCustomizer<ClientConfigurationData> 
customizer) {
+        ClientConfigurationData data = new ClientConfigurationData();
+
+        // Set the properties one by one.
+        data.setServiceUrl(configuration.get(PULSAR_SERVICE_URL));
+        
data.setAuthPluginClassName(configuration.get(PULSAR_AUTH_PLUGIN_CLASS_NAME));
+        data.setAuthParams(configuration.get(PULSAR_AUTH_PARAMS));
+        data.setAuthParamMap(
+                getOptionValue(
+                        configuration,
+                        PULSAR_AUTH_PARAM_MAP,
+                        v -> PulsarJsonUtils.toMap(String.class, String.class, 
v)));
+        
data.setOperationTimeoutMs(configuration.get(PULSAR_OPERATION_TIMEOUT_MS));
+        
data.setStatsIntervalSeconds(configuration.get(PULSAR_STATS_INTERVAL_SECONDS));
+        data.setNumIoThreads(configuration.get(PULSAR_NUM_IO_THREADS));
+        
data.setNumListenerThreads(configuration.get(PULSAR_NUM_LISTENER_THREADS));
+        
data.setConnectionsPerBroker(configuration.get(PULSAR_CONNECTIONS_PER_BROKER));
+        data.setUseTcpNoDelay(configuration.get(PULSAR_USE_TCP_NO_DELAY));
+        data.setUseTls(configuration.get(PULSAR_USE_TLS));
+        
data.setTlsTrustCertsFilePath(configuration.get(PULSAR_TLS_TRUST_CERTS_FILE_PATH));
+        
data.setTlsAllowInsecureConnection(configuration.get(PULSAR_TLS_ALLOW_INSECURE_CONNECTION));
+        data.setTlsHostnameVerificationEnable(
+                configuration.get(PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE));
+        
data.setConcurrentLookupRequest(configuration.get(PULSAR_CONCURRENT_LOOKUP_REQUEST));
+        data.setMaxLookupRequest(configuration.get(PULSAR_MAX_LOOKUP_REQUEST));
+        
data.setMaxLookupRedirects(configuration.get(PULSAR_MAX_LOOKUP_REDIRECTS));
+        data.setMaxNumberOfRejectedRequestPerConnection(
+                
configuration.get(PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION));
+        
data.setKeepAliveIntervalSeconds(configuration.get(PULSAR_KEEP_ALIVE_INTERVAL_SECONDS));
+        
data.setConnectionTimeoutMs(configuration.get(PULSAR_CONNECTION_TIMEOUT_MS));
+        data.setRequestTimeoutMs(configuration.get(PULSAR_REQUEST_TIMEOUT_MS));
+        data.setInitialBackoffIntervalNanos(
+                configuration.get(PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS));
+        
data.setMaxBackoffIntervalNanos(configuration.get(PULSAR_MAX_BACKOFF_INTERVAL_NANOS));
+        data.setEnableBusyWait(configuration.get(PULSAR_ENABLE_BUSY_WAIT));
+        data.setListenerName(configuration.get(PULSAR_LISTENER_NAME));
+        data.setUseKeyStoreTls(configuration.get(PULSAR_USE_KEY_STORE_TLS));
+        data.setSslProvider(configuration.get(PULSAR_SSL_PROVIDER));
+        
data.setTlsTrustStoreType(configuration.get(PULSAR_TLS_TRUST_STORE_TYPE));
+        
data.setTlsTrustStorePath(configuration.get(PULSAR_TLS_TRUST_STORE_PATH));
+        
data.setTlsTrustStorePassword(configuration.get(PULSAR_TLS_TRUST_STORE_PASSWORD));
+        data.setTlsCiphers(
+                getOptionValue(
+                        configuration,
+                        PULSAR_TLS_CIPHERS,
+                        v -> PulsarJsonUtils.toSet(String.class, v)));
+        data.setTlsProtocols(
+                getOptionValue(
+                        configuration,
+                        PULSAR_TLS_PROTOCOLS,
+                        v -> PulsarJsonUtils.toSet(String.class, v)));
+        data.setMemoryLimitBytes(configuration.get(PULSAR_MEMORY_LIMIT_BYTES));
+        data.setProxyServiceUrl(configuration.get(PULSAR_PROXY_SERVICE_URL));
+        data.setProxyProtocol(configuration.get(PULSAR_PROXY_PROTOCOL));
+        
data.setEnableTransaction(configuration.get(PULSAR_ENABLE_TRANSACTION));
+
+        // Create Authentication instance like pulsar command line tools.
+        // The token and TLS authentication should be created by the below 
factory method.
+        data.setAuthentication(createAuthentication(configuration));
+
+        if (customizer != null) {
+            customizer.customize(data);
+        }
+
+        return data;
+    }
+
+    /**
+     * Create the {@link Authentication} instance for both {@code 
PulsarClient} and {@code
+     * PulsarAdmin}. If the user didn't provide configuration, a {@link 
AuthenticationDisabled}
+     * instance would be returned.
+     */
+    public static Authentication createAuthentication(Configuration 
configuration) {
+        if (configuration.contains(PULSAR_AUTH_PLUGIN_CLASS_NAME)) {
+            String authPluginClassName = 
configuration.get(PULSAR_AUTH_PLUGIN_CLASS_NAME);
+
+            if (configuration.contains(PULSAR_AUTH_PARAMS)) {
+                String authParamsString = 
configuration.get(PULSAR_AUTH_PARAMS);
+                return sneaky(

Review comment:
       `UnsupportedAuthenticationException` is the exception that
`AuthenticationFactory` throws. There is no need to recover if the authentication
config is incorrect, so we can just sneakily rethrow this exception and let
Flink crash.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
##########
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.common.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+
+import org.apache.pulsar.client.api.ProxyProtocol;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Configuration for Pulsar Client, these config options would be used for 
both source, sink and
+ * table.
+ */
+@PublicEvolving
+public final class PulsarOptions {

Review comment:
       I have exposed the required options through the source builder methods;
these options are intended for experienced users. Documentation should be
provided to clarify them, and I'll submit it next Monday.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
+import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaInitializationContext;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createAdmin;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createClient;
+import static 
org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky;
+
+/**
+ * The Source implementation of Pulsar. Please use a {@link 
PulsarSourceBuilder} to construct a
+ * {@link PulsarSource}. The following example shows how to create a 
PulsarSource emitting records
+ * of <code>String</code> type.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource.<byte[], String>builder()
+ *     .setTopics(TOPIC1, TOPIC2)
+ *     .setServiceUrl(getServiceUrl())
+ *     .setAdminUrl(getAdminUrl())
+ *     .setSubscriptionName("test")
+ *     .setDeserializationSchema(flinkSchema(new SimpleStringSchema()))
+ *     .setBounded(StopCursor::defaultStopCursor)
+ *     .build();
+ * }</pre>
+ *
+ * <p>See {@link PulsarSourceBuilder} for more details.
+ *
+ * @param <IN> The input type of the pulsar {@link Message}.
+ * @param <OUT> The output type of the source.
+ */
+@PublicEvolving
+public final class PulsarSource<IN, OUT>
+        implements Source<OUT, PulsarPartitionSplit, PulsarSourceEnumState>,
+                ResultTypeQueryable<OUT> {
+    private static final long serialVersionUID = 7773108631275567433L;
+
+    /**
+     * The common configuration for pulsar source, we don't support the 
pulsar's configuration class
+     * directly.
+     */
+    private final Configuration configuration;
+
+    private final SourceConfiguration sourceConfiguration;
+
+    private final PulsarSubscriber subscriber;
+
+    private final RangeGenerator rangeGenerator;
+
+    private final SerializableSupplier<StartCursor> startCursorSupplier;
+
+    private final SerializableSupplier<StopCursor> stopCursorSupplier;
+
+    /**
+     * Boundedness for source, we only support {@link 
Boundedness#CONTINUOUS_UNBOUNDED} currently.
+     */
+    private final Boundedness boundedness;
+
+    /** The pulsar deserialization schema used for deserialize message. */
+    private final PulsarDeserializationSchema<IN, OUT> deserializationSchema;
+
+    /** Modify the flink generated {@link ClientConfigurationData}. */
+    private final ConfigurationDataCustomizer<ClientConfigurationData>
+            clientConfigurationCustomizer;
+
+    /** Modify the flink generated {@link ConsumerConfigurationData}. */
+    private final ConfigurationDataCustomizer<ConsumerConfigurationData<IN>>
+            consumerConfigurationCustomizer;
+
+    /** A lazy evaluated pulsar client for split reader. */
+    private transient volatile PulsarClient pulsarClient;
+
+    /** A lazy evaluated pulsar admin for source enumerator. */
+    private transient volatile PulsarAdmin pulsarAdmin;
+
+    /**
+     * The constructor for PulsarSource, it's package protected for forcing 
using {@link
+     * PulsarSourceBuilder}.
+     */
+    PulsarSource(
+            Configuration configuration,
+            PulsarSubscriber subscriber,
+            RangeGenerator rangeGenerator,
+            SerializableSupplier<StartCursor> startCursorSupplier,
+            SerializableSupplier<StopCursor> stopCursorSupplier,
+            Boundedness boundedness,
+            PulsarDeserializationSchema<IN, OUT> deserializationSchema,
+            ConfigurationDataCustomizer<ClientConfigurationData> 
clientConfigurationCustomizer,
+            ConfigurationDataCustomizer<ConsumerConfigurationData<IN>>
+                    consumerConfigurationCustomizer) {
+        // Since these implementation could be a lambda, make sure they are 
serializable.
+        ClosureCleaner.clean(
+                startCursorSupplier, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+        ClosureCleaner.clean(
+                stopCursorSupplier, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+        ClosureCleaner.clean(
+                clientConfigurationCustomizer, 
ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
+        ClosureCleaner.clean(
+                consumerConfigurationCustomizer,
+                ExecutionConfig.ClosureCleanerLevel.RECURSIVE,
+                true);

Review comment:
       Yep, you are right.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
##########
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import 
org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.DivideRangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
+
+import static java.lang.Boolean.FALSE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.getOptionValue;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_COMMIT_CURSOR;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES;
+import static 
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users 
to construct a {@link
+ * PulsarSource}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSource 
that reads the String
+ * values from a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource
+ *     .<byte[], String>builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING))
+ *     .build();
+ * }</pre>
+ *
+ * <p>The service url, admin url, subscription name, topics to consume, and 
the record deserializer
+ * are required fields that must be set.
+ *
+ * <p>To specify the starting position of PulsarSource, one can call {@link
+ * #setStartCursorSupplier(SerializableSupplier)}.
+ *
+ * <p>By default the PulsarSource runs in an {@link 
Boundedness#CONTINUOUS_UNBOUNDED} mode and never
+ * stop until the Flink job is canceled or fails. To let the PulsarSource run 
in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can 
call {@link
+ * #setUnbounded(SerializableSupplier)}. For example the following 
PulsarSource stops after it
+ * consumes up to a event time when the Flink started.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource
+ *     .<byte[], String>builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING))
+ *     .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis()))
+ *     .build();
+ * }</pre>
+ *
+ * @param <IN> The input type of the pulsar {@link Message <?>}
+ * @param <OUT> The output type of the source.
+ */
+@PublicEvolving
+public final class PulsarSourceBuilder<IN, OUT> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSourceBuilder.class);
+
+    private final Configuration configuration;
+    private PulsarSubscriber subscriber;
+    private RangeGenerator rangeGenerator;
+    private SerializableSupplier<StartCursor> startCursorSupplier;
+    private SerializableSupplier<StopCursor> stopCursorSupplier;
+    private Boundedness boundedness;
+    private PulsarDeserializationSchema<IN, OUT> deserializationSchema;
+    private ConfigurationDataCustomizer<ClientConfigurationData> 
clientConfigurationCustomizer;
+    private ConfigurationDataCustomizer<ConsumerConfigurationData<IN>>
+            consumerConfigurationCustomizer;
+
+    // private builder constructor.
+    PulsarSourceBuilder() {
+        // The default configuration holder.
+        this.configuration = new Configuration();
+    }
+
+    /**
+     * Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
+     *
+     * @param adminUrl the url for the PulsarAdmin.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setAdminUrl(String adminUrl) {
+        configuration.set(PULSAR_ADMIN_URL, adminUrl);
+        return this;
+    }
+
+    /**
+     * Sets the server's link for the PulsarConsumer of the PulsarSource.
+     *
+     * @param serviceUrl the server url of the Pulsar cluster.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setServiceUrl(String serviceUrl) {
+        configuration.set(PULSAR_SERVICE_URL, serviceUrl);
+        return this;
+    }
+
+    /**
+     * Sets the name for this pulsar subscription.
+     *
+     * @param subscriptionName the server url of the Pulsar cluster.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setSubscriptionName(String 
subscriptionName) {
+        configuration.set(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
+        return this;
+    }
+
+    /**
+     * {@link SubscriptionType} is the consuming behavior for pulsar, we would 
generator different
+     * split by the given subscription type, it's required and quite important 
for end-user.
+     *
+     * @param subscriptionType The type of subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setSubscriptionType(SubscriptionType 
subscriptionType) {
+        configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);

Review comment:
       Yep, this is quite important. I've added it.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
##########
@@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.connector.pulsar.source.config.CursorVerification;
+
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+import static org.apache.flink.configuration.description.TextElement.text;
+
+/**
+ * Configurations for PulsarSource. All these options would be listed in 
pulsar flink connector
+ * documentation.
+ */
+@PublicEvolving
+public final class PulsarSourceOptions {

Review comment:
       `PulsarOptions` is used by both the Pulsar source and sink, while
`PulsarSourceOptions` can only be used by the Pulsar source. I'll add some
clarification in the Javadoc.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.deserializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.ReflectionUtil;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+/**
+ * A deserialization schema that wraps a native Pulsar {@link Schema}. Pulsar deserializes the
+ * message, and its value is passed to Flink with either an automatically derived or a
+ * user-provided {@link TypeInformation}.
+ *
+ * @param <T> The output type of the message.
+ */
+@Internal
+class PulsarSchemaWrapper<T> extends PulsarDeserializationSchemaBase<T, T> {
+    private static final long serialVersionUID = -4864701207257059158L;
+
+    /**
+     * Creates a wrapper whose {@link TypeInformation} is derived from the type of the {@link
+     * Schema} that {@code schemaFactory} creates.
+     *
+     * @param schemaFactory the factory providing the Pulsar {@link Schema}.
+     */
+    public PulsarSchemaWrapper(PulsarSchemaFactory<T> schemaFactory) {
+        this(schemaFactory, createTypeInformation(schemaFactory));
+    }
+
+    /**
+     * Creates a wrapper from a {@link PulsarSchemaFactory} class, instantiated reflectively via
+     * {@link ReflectionUtil#newInstance(Class)}.
+     *
+     * <p>The factory is instantiated exactly once. The same instance backs this wrapper and is
+     * also used to derive the {@link TypeInformation}, so no second throw-away factory is created.
+     *
+     * @param factoryClass the factory class to instantiate.
+     */
+    public PulsarSchemaWrapper(Class<? extends PulsarSchemaFactory<T>> factoryClass) {
+        this(ReflectionUtil.newInstance(factoryClass));
+    }
+
+    /**
+     * Creates a wrapper with an explicitly provided {@link TypeInformation}. The value of each
+     * message ({@code Message#getValue()}) is emitted as a single record.
+     *
+     * @param schemaFactory the factory providing the Pulsar {@link Schema}.
+     * @param typeInformation the Flink type information of the produced records.
+     */
+    public PulsarSchemaWrapper(
+            PulsarSchemaFactory<T> schemaFactory, TypeInformation<T> typeInformation) {
+        super(schemaFactory, msg -> Collections.singletonList(msg.getValue()), typeInformation);
+    }
+
+    /**
+     * Converts the Pulsar {@link Schema} into a Flink {@link TypeInformation}. The primitive types
+     * of Pulsar's built-in schemas are mapped directly; for other schema types, an extraction with
+     * Flink's type utilities is attempted.
+     *
+     * <p>Please visit <a
+     * href="https://pulsar.apache.org/docs/en/schema-understand/#schema-type">the Pulsar
+     * documentation</a> for details on each schema type.
+     */
+    @SuppressWarnings("unchecked")
+    private static <T> TypeInformation<T> createTypeInformation(
+            PulsarSchemaFactory<T> schemaFactory) {
+        Schema<T> schema = schemaFactory.create();
+        // SchemaInfo contains all the required information for deserializing.
+        SchemaInfo schemaInfo = schema.getSchemaInfo();
+        SchemaType schemaType = schemaInfo.getType();
+
+        TypeInformation<?> information = null;
+        switch (schemaType) {
+            case STRING:
+                information = Types.STRING;
+                break;
+            case BOOLEAN:
+                information = Types.BOOLEAN;
+                break;
+            case INT8:
+                information = Types.BYTE;
+                break;
+            case INT16:
+                information = Types.SHORT;
+                break;
+            case INT32:
+                information = Types.INT;
+                break;
+            case INT64:
+                information = Types.LONG;
+                break;
+            case FLOAT:
+                information = Types.FLOAT;
+                break;
+            case DOUBLE:
+                information = Types.DOUBLE;
+                break;
+            case DATE:
+                // Since pulsar use this type for both util.Date and sql.Date,
+                // we just choose util.Date here.
+                information = BasicTypeInfo.DATE_TYPE_INFO;
+                break;
+            case TIME:
+                information = Types.SQL_TIME;
+                break;
+            case TIMESTAMP:
+                information = Types.SQL_TIMESTAMP;
+                break;
+            case INSTANT:
+                information = Types.INSTANT;
+                break;
+            case LOCAL_DATE:
+                information = Types.LOCAL_DATE;
+                break;
+            case LOCAL_TIME:
+                information = Types.LOCAL_TIME;
+                break;
+            case LOCAL_DATE_TIME:
+                information = Types.LOCAL_DATE_TIME;
+                break;
+            case BYTES:
+                information = Types.PRIMITIVE_ARRAY(Types.BYTE);
+                break;
+        }
+
+        if (information == null) {
+            // Try to extract the type info by using flink provided utils.
+            try {
+                // Support protobuf class after flink support it natively.

Review comment:
       Yep, a ticket should be created for that.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
##########
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import 
org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.DivideRangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
+
+import static java.lang.Boolean.FALSE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.getOptionValue;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_COMMIT_CURSOR;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES;
+import static 
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users 
to construct a {@link
+ * PulsarSource}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSource 
that reads the String
+ * values from a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource
+ *     .<byte[], String>builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING))
+ *     .build();
+ * }</pre>
+ *
+ * <p>The service url, admin url, subscription name, topics to consume, and 
the record deserializer
+ * are required fields that must be set.
+ *
+ * <p>To specify the starting position of PulsarSource, one can call {@link
+ * #setStartCursorSupplier(SerializableSupplier)}.
+ *
+ * <p>By default the PulsarSource runs in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never
+ * stops until the Flink job is canceled or fails. To let the PulsarSource run in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} mode but stop at a given position, one can call {@link
+ * #setUnbounded(SerializableSupplier)}. For example, the following PulsarSource stops once it has
+ * consumed messages up to the event time at which the source was built.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource
+ *     .<byte[], String>builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING))
+ *     .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis()))
+ *     .build();
+ * }</pre>
+ *
+ * @param <IN> The input type of the Pulsar {@link Message}.
+ * @param <OUT> The output type of the source.
+ */
+@PublicEvolving
+public final class PulsarSourceBuilder<IN, OUT> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSourceBuilder.class);
+
+    private final Configuration configuration;
+    private PulsarSubscriber subscriber;
+    private RangeGenerator rangeGenerator;
+    private SerializableSupplier<StartCursor> startCursorSupplier;
+    private SerializableSupplier<StopCursor> stopCursorSupplier;
+    private Boundedness boundedness;
+    private PulsarDeserializationSchema<IN, OUT> deserializationSchema;
+    private ConfigurationDataCustomizer<ClientConfigurationData> 
clientConfigurationCustomizer;
+    private ConfigurationDataCustomizer<ConsumerConfigurationData<IN>>
+            consumerConfigurationCustomizer;
+
+    /**
+     * Creates a builder backed by an empty {@link Configuration}. The constructor is
+     * package-private (not private); users obtain a builder through {@code PulsarSource.builder()}.
+     */
+    PulsarSourceBuilder() {
+        this.configuration = new Configuration();
+    }
+
+    /**
+     * Configures the endpoint that the PulsarAdmin client of this source connects to. The value is
+     * stored under {@code PULSAR_ADMIN_URL}.
+     *
+     * @param adminUrl the url for the PulsarAdmin.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setAdminUrl(String adminUrl) {
+        this.configuration.set(PULSAR_ADMIN_URL, adminUrl);
+        return this;
+    }
+
+    /**
+     * Configures the service url of the Pulsar cluster that the consumers of this source connect
+     * to. The value is stored under {@code PULSAR_SERVICE_URL}.
+     *
+     * @param serviceUrl the server url of the Pulsar cluster.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setServiceUrl(String serviceUrl) {
+        this.configuration.set(PULSAR_SERVICE_URL, serviceUrl);
+        return this;
+    }
+
+    /**
+     * Sets the name of the Pulsar subscription used by this source. The value is stored under
+     * {@code PULSAR_SUBSCRIPTION_NAME}.
+     *
+     * @param subscriptionName the name of the Pulsar subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setSubscriptionName(String subscriptionName) {
+        configuration.set(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
+        return this;
+    }
+
+    /**
+     * {@link SubscriptionType} defines the consuming behavior of Pulsar; different splits are
+     * generated depending on the given subscription type. Setting it is optional (the default is
+     * {@link SubscriptionType#Shared}), but choosing it carefully matters for end users.

Review comment:
       `SubscriptionType` is a very important Pulsar feature, and the 
consuming behavior differs among the subscription types. The default 
subscription type is `Shared`, so users are not required to set a subscription 
type in the source builder.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicMetadata.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.topic;
+
+import static 
org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED;
+
+/** The pojo class for pulsar topic metadata information. */
+public final class TopicMetadata {
+
+    /**
+     * The name of the topic, as produced by {@link TopicNameUtils#topicName(String)}; it does not
+     * contain any partition information.
+     */
+    private final String name;
+
+    /** If this topic is a partitioned topic. */
+    private final boolean partitioned;
+
+    /** The number of partitions of a partitioned topic; zero for a non-partitioned topic. */
+    private final int partitionSize;
+
+    public TopicMetadata(String name, int partitionSize) {
+        this.name = name;
+        this.partitioned = partitionSize != NON_PARTITIONED;

Review comment:
       👍 That sounds better.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl;
+
+import 
org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static java.util.stream.Collectors.toList;
+
+/** PulsarSubscriber abstract class to simplify Pulsar admin related 
operations. */
+public abstract class BasePulsarSubscriber implements PulsarSubscriber {
+    private static final long serialVersionUID = 2053021503331058888L;
+
+    /**
+     * Looks up the partitioned-topic metadata of the given topic through the Pulsar admin API.
+     *
+     * @param pulsarAdmin the admin client used for the lookup.
+     * @param topicName the topic name as provided by the caller.
+     * @return the topic metadata, or {@code null} when the admin API answers with status 404, so
+     *     that the caller can skip this topic.
+     * @throws PulsarRuntimeException for any other admin failure.
+     */
+    protected TopicMetadata queryTopicMetadata(PulsarAdmin pulsarAdmin, String topicName) {
+        // Normalize the name with TopicNameUtils before asking the admin API.
+        // NOTE(review): the returned TopicMetadata keeps the caller's raw name rather than this
+        // normalized one, although TopicMetadata documents its name as a
+        // TopicNameUtils#topicName(String) result — confirm which is intended.
+        String fullTopicName = TopicNameUtils.topicName(topicName);
+        PartitionedTopicMetadata partitionedMetadata;
+        try {
+            partitionedMetadata = pulsarAdmin.topics().getPartitionedTopicMetadata(fullTopicName);
+        } catch (PulsarAdminException e) {
+            if (e.getStatusCode() != 404) {
+                // Any other admin failure is fatal for the subscriber.
+                throw new PulsarRuntimeException(e);
+            }
+            // 404: presumably the topic does not exist; signal "skip" with null.
+            return null;
+        }
+        return new TopicMetadata(topicName, partitionedMetadata.partitions);
+    }
+
+    protected List<TopicPartition> createPartitions(

Review comment:
       👍 

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.topic;
+
+import org.apache.pulsar.client.api.Range;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName;
+import static 
org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Metadata of a partitioned topic. */

Review comment:
       It's an internal API. The Javadoc is confusing; I'll rewrite it.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
##########
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import 
org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.DivideRangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
+
+import static java.lang.Boolean.FALSE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.getOptionValue;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_COMMIT_CURSOR;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES;
+import static 
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users 
to construct a {@link
+ * PulsarSource}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSource 
that reads the String
+ * values from a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource
+ *     .<byte[], String>builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING))
+ *     .build();
+ * }</pre>
+ *
+ * <p>The service url, admin url, subscription name, topics to consume, and 
the record deserializer
+ * are required fields that must be set.
+ *
+ * <p>To specify the starting position of PulsarSource, one can call {@link
+ * #setStartCursorSupplier(SerializableSupplier)}.
+ *
+ * <p>By default the PulsarSource runs in {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never
+ * stops until the Flink job is canceled or fails. To let the PulsarSource run in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} mode but stop at a given position, one can call {@link
+ * #setUnbounded(SerializableSupplier)}. For example, the following PulsarSource stops once it has
+ * consumed messages up to the event time at which the source was built.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource
+ *     .<byte[], String>builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING))
+ *     .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis()))
+ *     .build();
+ * }</pre>
+ *
+ * @param <IN> The input type of the Pulsar {@link Message}.
+ * @param <OUT> The output type of the source.
+ */
+@PublicEvolving
+public final class PulsarSourceBuilder<IN, OUT> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSourceBuilder.class);
+
+    private final Configuration configuration;
+    private PulsarSubscriber subscriber;
+    private RangeGenerator rangeGenerator;
+    private SerializableSupplier<StartCursor> startCursorSupplier;
+    private SerializableSupplier<StopCursor> stopCursorSupplier;
+    private Boundedness boundedness;
+    private PulsarDeserializationSchema<IN, OUT> deserializationSchema;
+    private ConfigurationDataCustomizer<ClientConfigurationData> 
clientConfigurationCustomizer;
+    private ConfigurationDataCustomizer<ConsumerConfigurationData<IN>>
+            consumerConfigurationCustomizer;
+
+    /**
+     * Creates a builder backed by an empty {@link Configuration}. The constructor is
+     * package-private (not private); users obtain a builder through {@code PulsarSource.builder()}.
+     */
+    PulsarSourceBuilder() {
+        this.configuration = new Configuration();
+    }
+
+    /**
+     * Configures the endpoint that the PulsarAdmin client of this source connects to. The value is
+     * stored under {@code PULSAR_ADMIN_URL}.
+     *
+     * @param adminUrl the url for the PulsarAdmin.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setAdminUrl(String adminUrl) {
+        this.configuration.set(PULSAR_ADMIN_URL, adminUrl);
+        return this;
+    }
+
+    /**
+     * Configures the service url of the Pulsar cluster that the consumers of this source connect
+     * to. The value is stored under {@code PULSAR_SERVICE_URL}.
+     *
+     * @param serviceUrl the server url of the Pulsar cluster.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setServiceUrl(String serviceUrl) {
+        this.configuration.set(PULSAR_SERVICE_URL, serviceUrl);
+        return this;
+    }
+
+    /**
+     * Sets the name of the Pulsar subscription used by this source. The value is stored under
+     * {@code PULSAR_SUBSCRIPTION_NAME}.
+     *
+     * @param subscriptionName the name of the Pulsar subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setSubscriptionName(String subscriptionName) {
+        configuration.set(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
+        return this;
+    }
+
+    /**
+     * Sets the {@link SubscriptionType}, which defines the consuming behavior of Pulsar. Different
+     * splits are generated depending on the subscription type, so it should be chosen carefully.
+     * Setting it is optional; the default subscription type is {@link SubscriptionType#Shared}. A
+     * custom {@link #setRangeGenerator(RangeGenerator) range generator} is meant to be used with
+     * {@link SubscriptionType#Key_Shared}.
+     *
+     * @param subscriptionType The type of subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setSubscriptionType(SubscriptionType subscriptionType) {
+        configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
+        return this;
+    }
+
+    /**
+     * Sets the Pulsar topics to consume, given as varargs. Topics that do not exist yet are
+     * accepted; consuming such a topic does not throw. When the set of topics changes over time,
+     * subscribing by a topic regex is usually the better choice. Mutually exclusive with {@code
+     * setTopicPattern}.
+     *
+     * @param topics The topic list you would like to consume message.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopics(String... topics) {
+        List<String> topicList = Arrays.asList(topics);
+        return setTopics(topicList);
+    }
+
+    /**
+     * Sets the list of Pulsar topics to consume. Topics that do not exist yet are accepted;
+     * consuming such a topic does not throw. When the set of topics changes over time, subscribing
+     * by a topic regex is usually the better choice. Mutually exclusive with {@code
+     * setTopicPattern}.
+     *
+     * @param topics The topic list you would like to consume message.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopics(List<String> topics) {
+        // ensureSubscriberIsNull is expected to reject an already configured subscriber — confirm.
+        ensureSubscriberIsNull("topics");
+        // The topic list is kept in the configuration in the form produced by PulsarJsonUtils.
+        configuration.set(PULSAR_TOPIC_NAMES, PulsarJsonUtils.toString(topics));
+        subscriber = PulsarSubscriber.getTopicListSubscriber(topics);
+        return this;
+    }
+
+    /**
+     * Set a topic pattern to consume from use the java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(String topicsPattern) {
+        return setTopicPattern(Pattern.compile(topicsPattern));
+    }
+
+    /**
+     * Set a topic pattern to consume from use the java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(Pattern topicsPattern) 
{
+        return setTopicPattern(topicsPattern, RegexSubscriptionMode.AllTopics);
+    }
+
+    /**
+     * Set a topic pattern to consume from use the java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @param regexSubscriptionMode The topic filter for regex subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(
+            String topicsPattern, RegexSubscriptionMode regexSubscriptionMode) 
{
+        return setTopicPattern(Pattern.compile(topicsPattern), 
regexSubscriptionMode);
+    }
+
+    /**
+     * Set a topic pattern to consume from use the java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @param regexSubscriptionMode The topic filter for regex subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(
+            Pattern topicsPattern, RegexSubscriptionMode 
regexSubscriptionMode) {
+        ensureSubscriberIsNull("topic pattern");
+        configuration.set(PULSAR_TOPICS_PATTERN, topicsPattern.toString());
+        this.subscriber =
+                PulsarSubscriber.getTopicPatternSubscriber(topicsPattern, 
regexSubscriptionMode);
+
+        return setRegexSubscriptionMode(regexSubscriptionMode);
+    }
+
+    /**
+     * Chooses which kinds of topics are matched when subscribing with a regular expression. The
+     * value is stored under {@code PULSAR_REGEX_SUBSCRIPTION_MODE}.
+     *
+     * @param regexSubscriptionMode The topic type.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setRegexSubscriptionMode(
+            RegexSubscriptionMode regexSubscriptionMode) {
+        this.configuration.set(PULSAR_REGEX_SUBSCRIPTION_MODE, regexSubscriptionMode);
+        return this;
+    }
+
+    /**
+     * Sets the generator that produces the {@link TopicRange}s of each topic, intended for a
+     * Key_Shared subscription. If a subscription type other than Key_Shared has already been
+     * configured, a warning is logged. The check only sees a type set before this call.
+     *
+     * @param rangeGenerator A generator which would generate a set of {@link TopicRange} for given
+     *     topic.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setRangeGenerator(RangeGenerator rangeGenerator) {
+        this.rangeGenerator = rangeGenerator;
+        if (!configuration.contains(PULSAR_SUBSCRIPTION_TYPE)) {
+            return this;
+        }
+        SubscriptionType configuredType = configuration.get(PULSAR_SUBSCRIPTION_TYPE);
+        if (configuredType != SubscriptionType.Key_Shared) {
+            LOG.warn(
+                    "Key_Shared subscription should be used for custom rangeGenerator instead of {}",
+                    configuredType);
+        }
+        return this;
+    }
+
+    /**
+     * Specify from which offsets the PulsarSource should start consume from 
by providing an {@link
+     * StartCursor}.
+     *
+     * @param startCursorSupplier the supplier providing a {@link StartCursor} 
which set the
+     *     starting offsets for the Source.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setStartCursorSupplier(
+            SerializableSupplier<StartCursor> startCursorSupplier) {

Review comment:
       `StartCursor` can be implemented by end users, and it may hold 
independent state in `PulsarPartitionSplitState`.
   So the same cursor instance can't be shared; each split needs its own, which is why a supplier is used.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java
##########
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import 
org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException;
+import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.DivideRangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.regex.Pattern;
+
+import static java.lang.Boolean.FALSE;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.getOptionValue;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_COMMIT_CURSOR;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN;
+import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES;
+import static 
org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The builder class for {@link PulsarSource} to make it easier for the users 
to construct a {@link
+ * PulsarSource}.
+ *
+ * <p>The following example shows the minimum setup to create a PulsarSource 
that reads the String
+ * values from a Pulsar topic.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource
+ *     .<byte[], String>builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING))
+ *     .build();
+ * }</pre>
+ *
+ * <p>The service url, admin url, subscription name, topics to consume, and 
the record deserializer
+ * are required fields that must be set.
+ *
+ * <p>To specify the starting position of PulsarSource, one can call {@link
+ * #setStartCursorSupplier(SerializableSupplier)}.
+ *
+ * <p>By default the PulsarSource runs in an {@link 
Boundedness#CONTINUOUS_UNBOUNDED} mode and never
+ * stop until the Flink job is canceled or fails. To let the PulsarSource run 
in {@link
+ * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can 
call {@link
+ * #setUnbounded(SerializableSupplier)}. For example the following 
PulsarSource stops after it
+ * consumes up to a event time when the Flink started.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource
+ *     .<byte[], String>builder()
+ *     .setServiceUrl(PULSAR_BROKER_URL)
+ *     .setAdminUrl(PULSAR_BROKER_HTTP_URL)
+ *     .setSubscriptionName("flink-source-1")
+ *     .setTopics(Arrays.asList(TOPIC1, TOPIC2))
+ *     
.setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING))
+ *     .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis()))
+ *     .build();
+ * }</pre>
+ *
+ * @param <IN> The input type of the pulsar {@link Message <?>}
+ * @param <OUT> The output type of the source.
+ */
+@PublicEvolving
+public final class PulsarSourceBuilder<IN, OUT> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PulsarSourceBuilder.class);
+
+    private final Configuration configuration;
+    private PulsarSubscriber subscriber;
+    private RangeGenerator rangeGenerator;
+    private SerializableSupplier<StartCursor> startCursorSupplier;
+    private SerializableSupplier<StopCursor> stopCursorSupplier;
+    private Boundedness boundedness;
+    private PulsarDeserializationSchema<IN, OUT> deserializationSchema;
+    private ConfigurationDataCustomizer<ClientConfigurationData> 
clientConfigurationCustomizer;
+    private ConfigurationDataCustomizer<ConsumerConfigurationData<IN>>
+            consumerConfigurationCustomizer;
+
+    // private builder constructor.
+    PulsarSourceBuilder() {
+        // The default configuration holder.
+        this.configuration = new Configuration();
+    }
+
+    /**
+     * Sets the admin endpoint for the PulsarAdmin of the PulsarSource.
+     *
+     * @param adminUrl the url for the PulsarAdmin.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setAdminUrl(String adminUrl) {
+        configuration.set(PULSAR_ADMIN_URL, adminUrl);
+        return this;
+    }
+
+    /**
+     * Sets the server's link for the PulsarConsumer of the PulsarSource.
+     *
+     * @param serviceUrl the server url of the Pulsar cluster.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setServiceUrl(String serviceUrl) {
+        configuration.set(PULSAR_SERVICE_URL, serviceUrl);
+        return this;
+    }
+
+    /**
+     * Sets the name for this pulsar subscription.
+     *
+     * @param subscriptionName the server url of the Pulsar cluster.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setSubscriptionName(String 
subscriptionName) {
+        configuration.set(PULSAR_SUBSCRIPTION_NAME, subscriptionName);
+        return this;
+    }
+
+    /**
+     * {@link SubscriptionType} is the consuming behavior for pulsar, we would 
generator different
+     * split by the given subscription type, it's required and quite important 
for end-user.
+     *
+     * @param subscriptionType The type of subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setSubscriptionType(SubscriptionType 
subscriptionType) {
+        configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType);
+        return this;
+    }
+
+    /**
+     * Set a pulsar topic list for flink source. Some topic may not exist 
currently, consuming this
+     * non-existed topic wouldn't throw any exception. But the best solution 
is just consuming by
+     * using a topic regex. This method is conflict with {@code 
setTopicPattern}.
+     *
+     * @param topics The topic list you would like to consume message.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopics(String... topics) {
+        return setTopics(Arrays.asList(topics));
+    }
+
+    /**
+     * Set a pulsar topic list for flink source. Some topic may not exist 
currently, consuming this
+     * non-existed topic wouldn't throw any exception. But the best solution 
is just consuming by
+     * using a topic regex. This method is conflict with {@code 
setTopicPattern}.
+     *
+     * @param topics The topic list you would like to consume message.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopics(List<String> topics) {
+        ensureSubscriberIsNull("topics");
+        configuration.set(PULSAR_TOPIC_NAMES, 
PulsarJsonUtils.toString(topics));
+
+        this.subscriber = PulsarSubscriber.getTopicListSubscriber(topics);
+
+        return this;
+    }
+
+    /**
+     * Set a topic pattern to consume from use the java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(String topicsPattern) {
+        return setTopicPattern(Pattern.compile(topicsPattern));
+    }
+
+    /**
+     * Set a topic pattern to consume from use the java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(Pattern topicsPattern) 
{
+        return setTopicPattern(topicsPattern, RegexSubscriptionMode.AllTopics);
+    }
+
+    /**
+     * Set a topic pattern to consume from use the java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @param regexSubscriptionMode The topic filter for regex subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(
+            String topicsPattern, RegexSubscriptionMode regexSubscriptionMode) 
{
+        return setTopicPattern(Pattern.compile(topicsPattern), 
regexSubscriptionMode);
+    }
+
+    /**
+     * Set a topic pattern to consume from use the java {@link Pattern}.
+     *
+     * @param topicsPattern the pattern of the topic name to consume from.
+     * @param regexSubscriptionMode The topic filter for regex subscription.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setTopicPattern(
+            Pattern topicsPattern, RegexSubscriptionMode 
regexSubscriptionMode) {
+        ensureSubscriberIsNull("topic pattern");
+        configuration.set(PULSAR_TOPICS_PATTERN, topicsPattern.toString());
+        this.subscriber =
+                PulsarSubscriber.getTopicPatternSubscriber(topicsPattern, 
regexSubscriptionMode);
+
+        return setRegexSubscriptionMode(regexSubscriptionMode);
+    }
+
+    /**
+     * When subscribing to a topic using a regular expression, you can pick a 
certain type of
+     * topics.
+     *
+     * @param regexSubscriptionMode The topic type.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setRegexSubscriptionMode(
+            RegexSubscriptionMode regexSubscriptionMode) {
+        configuration.set(PULSAR_REGEX_SUBSCRIPTION_MODE, 
regexSubscriptionMode);
+        return this;
+    }
+
+    /**
+     * Set a topic range generator for Key_Shared subscription.
+     *
+     * @param rangeGenerator A generator which would generate a set of {@link 
TopicRange} for given
+     *     topic.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setRangeGenerator(RangeGenerator 
rangeGenerator) {
+        this.rangeGenerator = rangeGenerator;
+        if (configuration.contains(PULSAR_SUBSCRIPTION_TYPE)) {
+            SubscriptionType subscriptionType = 
configuration.get(PULSAR_SUBSCRIPTION_TYPE);
+            if (subscriptionType != SubscriptionType.Key_Shared) {
+                LOG.warn(
+                        "Key_Shared subscription should be used for custom 
rangeGenerator instead of {}",
+                        subscriptionType);
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * Specify from which offsets the PulsarSource should start consume from 
by providing an {@link
+     * StartCursor}.
+     *
+     * @param startCursorSupplier the supplier providing a {@link StartCursor} 
which set the
+     *     starting offsets for the Source.
+     * @return this PulsarSourceBuilder.
+     */
+    public PulsarSourceBuilder<IN, OUT> setStartCursorSupplier(
+            SerializableSupplier<StartCursor> startCursorSupplier) {
+        this.startCursorSupplier = startCursorSupplier;
+        return this;
+    }
+
+    /**
+     * By default the PulsarSource is set to run in {@link 
Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * and thus never stops until the Flink job fails or is canceled. To let 
the PulsarSource run as
+     * a streaming source but still stops at some point, one can set an {@link 
StartCursor} to
+     * specify the stopping offsets for each partition. When all the 
partitions have reached their
+     * stopping offsets, the PulsarSource will then exit.
+     *
+     * <p>This method is different from {@link 
#setBounded(SerializableSupplier)} that after setting
+     * the stopping offsets with this method, {@link 
PulsarSource#getBoundedness()} will still
+     * return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will 
stop at the stopping
+     * offsets specified by the stopping offsets {@link StartCursor}.
+     *
+     * @param stopCursorSupplier The {@link StopCursor} to specify the 
stopping offset.
+     * @return this PulsarSourceBuilder.
+     * @see #setBounded(SerializableSupplier)
+     */
+    public PulsarSourceBuilder<IN, OUT> setUnbounded(
+            SerializableSupplier<StopCursor> stopCursorSupplier) {
+        this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED;
+        this.stopCursorSupplier = stopCursorSupplier;
+        return this;
+    }
+
+    /**
+     * By default the PulsarSource is set to run in {@link 
Boundedness#CONTINUOUS_UNBOUNDED} manner
+     * and thus never stops until the Flink job fails or is canceled. To let 
the PulsarSource run in
+     * {@link Boundedness#BOUNDED} manner and stops at some point, one can set 
an {@link
+     * StartCursor} to specify the stopping offsets for each partition. When 
all the partitions have
+     * reached their stopping offsets, the PulsarSource will then exit.
+     *
+     * <p>This method is different from {@link 
#setUnbounded(SerializableSupplier)} that after
+     * setting the stopping offsets with this method, {@link 
PulsarSource#getBoundedness()} will
+     * return {@link Boundedness#BOUNDED} instead of {@link 
Boundedness#CONTINUOUS_UNBOUNDED}.
+     *
+     * @param stopCursorSupplier the {@link StopCursor} to specify the 
stopping offsets.
+     * @return this PulsarSourceBuilder.
+     * @see #setUnbounded(SerializableSupplier)
+     */
+    public PulsarSourceBuilder<IN, OUT> setBounded(
+            SerializableSupplier<StopCursor> stopCursorSupplier) {
+        this.boundedness = Boundedness.BOUNDED;
+        this.stopCursorSupplier = stopCursorSupplier;
+        return this;
+    }
+
+    /**
+     * DeserializationSchema is required for getting the {@link Schema} for 
deserialize message from
+     * pulsar and getting the {@link TypeInformation} for message 
serialization in flink.
+     *
+     * <p>We have defined a set of implementations, using {@code
+     * PulsarDeserializationSchema#pulsarSchema} or {@code 
PulsarDeserializationSchema#flinkSchema}
+     * for creating the desired schema.
+     */
+    public PulsarSourceBuilder<IN, OUT> setDeserializationSchema(

Review comment:
       I don't quite follow your suggestion. Could you provide a more detailed 
example?

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer;
+import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
+import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState;
+import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer;
+import 
org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import 
org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber;
+import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator;
+import 
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
+import 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaInitializationContext;
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
+import 
org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.function.SerializableSupplier;
+
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createAdmin;
+import static 
org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createClient;
+import static 
org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky;
+
+/**
+ * The Source implementation of Pulsar. Please use a {@link 
PulsarSourceBuilder} to construct a
+ * {@link PulsarSource}. The following example shows how to create a 
PulsarSource emitting records
+ * of <code>String</code> type.
+ *
+ * <pre>{@code
+ * PulsarSource<byte[], String> source = PulsarSource.<byte[], String>builder()
+ *     .setTopics(TOPIC1, TOPIC2)
+ *     .setServiceUrl(getServiceUrl())
+ *     .setAdminUrl(getAdminUrl())
+ *     .setSubscriptionName("test")
+ *     .setDeserializationSchema(flinkSchema(new SimpleStringSchema()))
+ *     .setBounded(StopCursor::defaultStopCursor)
+ *     .build();
+ * }</pre>
+ *
+ * <p>See {@link PulsarSourceBuilder} for more details.
+ *
+ * @param <IN> The input type of the pulsar {@link Message}.

Review comment:
       Yes, `IN` is needed to support Pulsar's built-in `Schema`. Extra 
tests will be added for this.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.reader.deserializer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.ReflectionUtil;
+
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+import org.apache.pulsar.common.schema.SchemaType;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+/**
+ * The deserialization schema wrapper for pulsar original {@link Schema}. 
Pulsar would deserialize
+ * the message and pass it to flink with a auto generate or given {@link 
TypeInformation}.
+ *
+ * @param <T> The output type of the message.
+ */
+@Internal
+class PulsarSchemaWrapper<T> extends PulsarDeserializationSchemaBase<T, T> {
+    private static final long serialVersionUID = -4864701207257059158L;
+
+    public PulsarSchemaWrapper(PulsarSchemaFactory<T> schemaFactory) {
+        this(schemaFactory, createTypeInformation(schemaFactory));
+    }
+
+    public PulsarSchemaWrapper(Class<? extends PulsarSchemaFactory<T>> 
factoryClass) {
+        this(
+                ReflectionUtil.newInstance(factoryClass),
+                
createTypeInformation(ReflectionUtil.newInstance(factoryClass)));
+    }
+
+    public PulsarSchemaWrapper(
+            PulsarSchemaFactory<T> schemaFactory, TypeInformation<T> 
typeInformation) {
+        super(
+                schemaFactory,
+                message -> Collections.singletonList(message.getValue()),
+                typeInformation);
+    }
+
+    /**
+     * Convert the {@link Schema} into a flink readable {@link 
TypeInformation}. We only support all
+     * the primitive types in pulsar built-in schema.

Review comment:
       Since this pull request is urgent, we only provide fundamental 
support for now and will add the missing schemas in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to