syhily commented on a change in pull request #15304: URL: https://github.com/apache/flink/pull/15304#discussion_r684572500
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils; +import org.apache.flink.connector.pulsar.source.PulsarSourceOptions; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.function.Function; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTO_CERT_REFRESH_TIME; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONCURRENT_LOOKUP_REQUEST; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTIONS_PER_BROKER; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_TIMEOUT_MS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECT_TIMEOUT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_BUSY_WAIT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_KEEP_ALIVE_INTERVAL_SECONDS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_LISTENER_NAME; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_BACKOFF_INTERVAL_NANOS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REDIRECTS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REQUEST; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION; +import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_IO_THREADS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_LISTENER_THREADS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_OPERATION_TIMEOUT_MS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_PROTOCOL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_SERVICE_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_READ_TIMEOUT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT_MS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SSL_PROVIDER; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_ALLOW_INSECURE_CONNECTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_CIPHERS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_PROTOCOLS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PASSWORD; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PATH; +import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_TYPE; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_KEY_STORE_TLS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TCP_NO_DELAY; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TLS; +import static org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky; +import static org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils.configMap; + +/** The util for creating pulsar configuration class from flink's {@link Configuration}. */ +public final class PulsarConfigUtils { + + private PulsarConfigUtils() { + // No need to create instance. + } + + /** + * Create a common {@link ClientConfigurationData} for both pulsar source and sink connector by + * using the given {@link Configuration}. Add the configuration was listed in {@link + * PulsarSourceOptions}, and we would parse it one by one. + * + * @param configuration The flink configuration + */ + public static ClientConfigurationData createClientConfig( Review comment: We have set the default value fallback in the config option definitions. Any option that the user doesn't set will use its default value instead. The required config options are checked in `PulsarSourceConfigUtils.checkConfigurations`. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/utils/PulsarTransactionUtils.java ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.utils; + +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.transaction.TransactionImpl; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; + +import static org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Transaction was introduced into pulsar since 2.7.0, but the interface {@link Transaction} didn't + * provide a id method until 2.8.1. We have to add this util for acquiring the {@link TxnID} for + * compatible consideration. + */ +@SuppressWarnings("java:S3011") +public final class PulsarTransactionUtils { Review comment: Yes, this hack must be removed after pulsar 2.8.1 release. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import 
org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaInitializationContext; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.function.SerializableSupplier; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; + +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createAdmin; +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createClient; +import static org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky; + +/** + * The Source implementation of Pulsar. Please use a {@link PulsarSourceBuilder} to construct a + * {@link PulsarSource}. The following example shows how to create a PulsarSource emitting records + * of <code>String</code> type. + * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource.<byte[], String>builder() + * .setTopics(TOPIC1, TOPIC2) + * .setServiceUrl(getServiceUrl()) + * .setAdminUrl(getAdminUrl()) + * .setSubscriptionName("test") + * .setDeserializationSchema(flinkSchema(new SimpleStringSchema())) + * .setBounded(StopCursor::defaultStopCursor) + * .build(); + * }</pre> + * + * <p>See {@link PulsarSourceBuilder} for more details. + * + * @param <IN> The input type of the pulsar {@link Message}. + * @param <OUT> The output type of the source. 
+ */ +@PublicEvolving +public final class PulsarSource<IN, OUT> + implements Source<OUT, PulsarPartitionSplit, PulsarSourceEnumState>, + ResultTypeQueryable<OUT> { + private static final long serialVersionUID = 7773108631275567433L; + + /** + * The common configuration for pulsar source, we don't support the pulsar's configuration class + * directly. + */ + private final Configuration configuration; + + private final SourceConfiguration sourceConfiguration; + + private final PulsarSubscriber subscriber; + + private final RangeGenerator rangeGenerator; + + private final SerializableSupplier<StartCursor> startCursorSupplier; + + private final SerializableSupplier<StopCursor> stopCursorSupplier; + + /** + * Boundedness for source, we only support {@link Boundedness#CONTINUOUS_UNBOUNDED} currently. + */ + private final Boundedness boundedness; + + /** The pulsar deserialization schema used for deserialize message. */ + private final PulsarDeserializationSchema<IN, OUT> deserializationSchema; + + /** Modify the flink generated {@link ClientConfigurationData}. */ + private final ConfigurationDataCustomizer<ClientConfigurationData> + clientConfigurationCustomizer; + + /** Modify the flink generated {@link ConsumerConfigurationData}. */ + private final ConfigurationDataCustomizer<ConsumerConfigurationData<IN>> + consumerConfigurationCustomizer; + + /** A lazy evaluated pulsar client for split reader. */ + private transient volatile PulsarClient pulsarClient; + + /** A lazy evaluated pulsar admin for source enumerator. */ + private transient volatile PulsarAdmin pulsarAdmin; Review comment: OK, that means we could just remove the `volatile` and the synchronization. Since the pulsar client has an inner thread pool which could consume a lot of resources, I prefer to cache them here.
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils; +import org.apache.flink.connector.pulsar.source.PulsarSourceOptions; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.function.Function; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTO_CERT_REFRESH_TIME; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONCURRENT_LOOKUP_REQUEST; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTIONS_PER_BROKER; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_TIMEOUT_MS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECT_TIMEOUT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_BUSY_WAIT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_KEEP_ALIVE_INTERVAL_SECONDS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_LISTENER_NAME; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_BACKOFF_INTERVAL_NANOS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REDIRECTS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REQUEST; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION; +import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_IO_THREADS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_LISTENER_THREADS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_OPERATION_TIMEOUT_MS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_PROTOCOL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_SERVICE_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_READ_TIMEOUT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT_MS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SSL_PROVIDER; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_ALLOW_INSECURE_CONNECTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_CIPHERS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_PROTOCOLS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PASSWORD; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PATH; +import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_TYPE; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_KEY_STORE_TLS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TCP_NO_DELAY; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TLS; +import static org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky; +import static org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils.configMap; + +/** The util for creating pulsar configuration class from flink's {@link Configuration}. */ +public final class PulsarConfigUtils { + + private PulsarConfigUtils() { + // No need to create instance. + } + + /** + * Create a common {@link ClientConfigurationData} for both pulsar source and sink connector by + * using the given {@link Configuration}. Add the configuration was listed in {@link + * PulsarSourceOptions}, and we would parse it one by one. + * + * @param configuration The flink configuration + */ + public static ClientConfigurationData createClientConfig( + Configuration configuration, + @Nullable ConfigurationDataCustomizer<ClientConfigurationData> customizer) { + ClientConfigurationData data = new ClientConfigurationData(); + + // Set the properties one by one. 
+ data.setServiceUrl(configuration.get(PULSAR_SERVICE_URL)); + data.setAuthPluginClassName(configuration.get(PULSAR_AUTH_PLUGIN_CLASS_NAME)); + data.setAuthParams(configuration.get(PULSAR_AUTH_PARAMS)); + data.setAuthParamMap( + getOptionValue( + configuration, + PULSAR_AUTH_PARAM_MAP, + v -> PulsarJsonUtils.toMap(String.class, String.class, v))); + data.setOperationTimeoutMs(configuration.get(PULSAR_OPERATION_TIMEOUT_MS)); + data.setStatsIntervalSeconds(configuration.get(PULSAR_STATS_INTERVAL_SECONDS)); + data.setNumIoThreads(configuration.get(PULSAR_NUM_IO_THREADS)); + data.setNumListenerThreads(configuration.get(PULSAR_NUM_LISTENER_THREADS)); + data.setConnectionsPerBroker(configuration.get(PULSAR_CONNECTIONS_PER_BROKER)); + data.setUseTcpNoDelay(configuration.get(PULSAR_USE_TCP_NO_DELAY)); + data.setUseTls(configuration.get(PULSAR_USE_TLS)); + data.setTlsTrustCertsFilePath(configuration.get(PULSAR_TLS_TRUST_CERTS_FILE_PATH)); + data.setTlsAllowInsecureConnection(configuration.get(PULSAR_TLS_ALLOW_INSECURE_CONNECTION)); + data.setTlsHostnameVerificationEnable( + configuration.get(PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE)); + data.setConcurrentLookupRequest(configuration.get(PULSAR_CONCURRENT_LOOKUP_REQUEST)); + data.setMaxLookupRequest(configuration.get(PULSAR_MAX_LOOKUP_REQUEST)); + data.setMaxLookupRedirects(configuration.get(PULSAR_MAX_LOOKUP_REDIRECTS)); + data.setMaxNumberOfRejectedRequestPerConnection( + configuration.get(PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION)); + data.setKeepAliveIntervalSeconds(configuration.get(PULSAR_KEEP_ALIVE_INTERVAL_SECONDS)); + data.setConnectionTimeoutMs(configuration.get(PULSAR_CONNECTION_TIMEOUT_MS)); + data.setRequestTimeoutMs(configuration.get(PULSAR_REQUEST_TIMEOUT_MS)); + data.setInitialBackoffIntervalNanos( + configuration.get(PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS)); + data.setMaxBackoffIntervalNanos(configuration.get(PULSAR_MAX_BACKOFF_INTERVAL_NANOS)); + 
data.setEnableBusyWait(configuration.get(PULSAR_ENABLE_BUSY_WAIT)); + data.setListenerName(configuration.get(PULSAR_LISTENER_NAME)); + data.setUseKeyStoreTls(configuration.get(PULSAR_USE_KEY_STORE_TLS)); + data.setSslProvider(configuration.get(PULSAR_SSL_PROVIDER)); + data.setTlsTrustStoreType(configuration.get(PULSAR_TLS_TRUST_STORE_TYPE)); + data.setTlsTrustStorePath(configuration.get(PULSAR_TLS_TRUST_STORE_PATH)); + data.setTlsTrustStorePassword(configuration.get(PULSAR_TLS_TRUST_STORE_PASSWORD)); + data.setTlsCiphers( + getOptionValue( + configuration, + PULSAR_TLS_CIPHERS, + v -> PulsarJsonUtils.toSet(String.class, v))); + data.setTlsProtocols( + getOptionValue( + configuration, + PULSAR_TLS_PROTOCOLS, + v -> PulsarJsonUtils.toSet(String.class, v))); + data.setMemoryLimitBytes(configuration.get(PULSAR_MEMORY_LIMIT_BYTES)); + data.setProxyServiceUrl(configuration.get(PULSAR_PROXY_SERVICE_URL)); + data.setProxyProtocol(configuration.get(PULSAR_PROXY_PROTOCOL)); + data.setEnableTransaction(configuration.get(PULSAR_ENABLE_TRANSACTION)); + + // Create Authentication instance like pulsar command line tools. + // The token and TLS authentication should be created by the below factory method. Review comment: Yep, you are right. I have removed. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchema.java ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.Collector; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; + +import java.io.Serializable; + +/** + * A schema bridge for deserializing the pulsar's <code>Message<?></code> into a flink managed + * instance. We support both the pulsar's self managed schema and flink managed schema. + * + * @param <M> The decode message type from pulsar client, which would create a message {@link + * SchemaInfo} from this type. + * @param <T> The output message type for sinking to downstream flink operator. + */ +@PublicEvolving +public interface PulsarDeserializationSchema<M, T> extends Serializable, ResultTypeQueryable<T> { Review comment: I think you are right, after reading this review comment carefully. `Schema` is quite an important internal feature of pulsar. Decoding the message in the pulsar client doesn't give us any benefit, only complexity. Wrapping the `Schema` into a flink `TypeInformation` and consuming the message as bytes would look better.
`Schema` has some extra [evolution and compatibility](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/) features in pulsar. These are quite important for experienced pulsar users. We can implement this feature in the future and receive a `Message<byte[]>` for now. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicNameUtils.java ########## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import org.apache.pulsar.common.naming.TopicName; + +/** util for topic name. */ +public final class TopicNameUtils { Review comment: Nope. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarConfigUtils.java ########## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.config; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils; +import org.apache.flink.connector.pulsar.source.PulsarSourceOptions; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; +import org.apache.pulsar.client.api.Authentication; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.function.Function; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAMS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PARAM_MAP; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTH_PLUGIN_CLASS_NAME; +import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_AUTO_CERT_REFRESH_TIME; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONCURRENT_LOOKUP_REQUEST; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTIONS_PER_BROKER; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECTION_TIMEOUT_MS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_CONNECT_TIMEOUT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_BUSY_WAIT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_KEEP_ALIVE_INTERVAL_SECONDS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_LISTENER_NAME; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_BACKOFF_INTERVAL_NANOS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REDIRECTS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_LOOKUP_REQUEST; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_MEMORY_LIMIT_BYTES; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_IO_THREADS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_NUM_LISTENER_THREADS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_OPERATION_TIMEOUT_MS; +import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_PROTOCOL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_PROXY_SERVICE_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_READ_TIMEOUT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_REQUEST_TIMEOUT_MS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SSL_PROVIDER; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_ALLOW_INSECURE_CONNECTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_CIPHERS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_PROTOCOLS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_CERTS_FILE_PATH; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PASSWORD; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_PATH; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_TLS_TRUST_STORE_TYPE; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_KEY_STORE_TLS; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TCP_NO_DELAY; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_USE_TLS; +import static 
org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky; +import static org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils.configMap; + +/** The util for creating pulsar configuration class from flink's {@link Configuration}. */ +public final class PulsarConfigUtils { + + private PulsarConfigUtils() { + // No need to create instance. + } + + /** + * Create a common {@link ClientConfigurationData} for both pulsar source and sink connector by + * using the given {@link Configuration}. Add the configuration was listed in {@link + * PulsarSourceOptions}, and we would parse it one by one. + * + * @param configuration The flink configuration + */ + public static ClientConfigurationData createClientConfig( + Configuration configuration, + @Nullable ConfigurationDataCustomizer<ClientConfigurationData> customizer) { + ClientConfigurationData data = new ClientConfigurationData(); + + // Set the properties one by one. + data.setServiceUrl(configuration.get(PULSAR_SERVICE_URL)); + data.setAuthPluginClassName(configuration.get(PULSAR_AUTH_PLUGIN_CLASS_NAME)); + data.setAuthParams(configuration.get(PULSAR_AUTH_PARAMS)); + data.setAuthParamMap( + getOptionValue( + configuration, + PULSAR_AUTH_PARAM_MAP, + v -> PulsarJsonUtils.toMap(String.class, String.class, v))); + data.setOperationTimeoutMs(configuration.get(PULSAR_OPERATION_TIMEOUT_MS)); + data.setStatsIntervalSeconds(configuration.get(PULSAR_STATS_INTERVAL_SECONDS)); + data.setNumIoThreads(configuration.get(PULSAR_NUM_IO_THREADS)); + data.setNumListenerThreads(configuration.get(PULSAR_NUM_LISTENER_THREADS)); + data.setConnectionsPerBroker(configuration.get(PULSAR_CONNECTIONS_PER_BROKER)); + data.setUseTcpNoDelay(configuration.get(PULSAR_USE_TCP_NO_DELAY)); + data.setUseTls(configuration.get(PULSAR_USE_TLS)); + data.setTlsTrustCertsFilePath(configuration.get(PULSAR_TLS_TRUST_CERTS_FILE_PATH)); + data.setTlsAllowInsecureConnection(configuration.get(PULSAR_TLS_ALLOW_INSECURE_CONNECTION)); + 
data.setTlsHostnameVerificationEnable( + configuration.get(PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE)); + data.setConcurrentLookupRequest(configuration.get(PULSAR_CONCURRENT_LOOKUP_REQUEST)); + data.setMaxLookupRequest(configuration.get(PULSAR_MAX_LOOKUP_REQUEST)); + data.setMaxLookupRedirects(configuration.get(PULSAR_MAX_LOOKUP_REDIRECTS)); + data.setMaxNumberOfRejectedRequestPerConnection( + configuration.get(PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION)); + data.setKeepAliveIntervalSeconds(configuration.get(PULSAR_KEEP_ALIVE_INTERVAL_SECONDS)); + data.setConnectionTimeoutMs(configuration.get(PULSAR_CONNECTION_TIMEOUT_MS)); + data.setRequestTimeoutMs(configuration.get(PULSAR_REQUEST_TIMEOUT_MS)); + data.setInitialBackoffIntervalNanos( + configuration.get(PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS)); + data.setMaxBackoffIntervalNanos(configuration.get(PULSAR_MAX_BACKOFF_INTERVAL_NANOS)); + data.setEnableBusyWait(configuration.get(PULSAR_ENABLE_BUSY_WAIT)); + data.setListenerName(configuration.get(PULSAR_LISTENER_NAME)); + data.setUseKeyStoreTls(configuration.get(PULSAR_USE_KEY_STORE_TLS)); + data.setSslProvider(configuration.get(PULSAR_SSL_PROVIDER)); + data.setTlsTrustStoreType(configuration.get(PULSAR_TLS_TRUST_STORE_TYPE)); + data.setTlsTrustStorePath(configuration.get(PULSAR_TLS_TRUST_STORE_PATH)); + data.setTlsTrustStorePassword(configuration.get(PULSAR_TLS_TRUST_STORE_PASSWORD)); + data.setTlsCiphers( + getOptionValue( + configuration, + PULSAR_TLS_CIPHERS, + v -> PulsarJsonUtils.toSet(String.class, v))); + data.setTlsProtocols( + getOptionValue( + configuration, + PULSAR_TLS_PROTOCOLS, + v -> PulsarJsonUtils.toSet(String.class, v))); + data.setMemoryLimitBytes(configuration.get(PULSAR_MEMORY_LIMIT_BYTES)); + data.setProxyServiceUrl(configuration.get(PULSAR_PROXY_SERVICE_URL)); + data.setProxyProtocol(configuration.get(PULSAR_PROXY_PROTOCOL)); + data.setEnableTransaction(configuration.get(PULSAR_ENABLE_TRANSACTION)); + + // Create Authentication instance 
like pulsar command line tools. + // The token and TLS authentication should be created by the below factory method. + data.setAuthentication(createAuthentication(configuration)); + + if (customizer != null) { + customizer.customize(data); + } + + return data; + } + + /** + * Create the {@link Authentication} instance for both {@code PulsarClient} and {@code + * PulsarAdmin}. If the user didn't provide configuration, a {@link AuthenticationDisabled} + * instance would be returned. + */ + public static Authentication createAuthentication(Configuration configuration) { + if (configuration.contains(PULSAR_AUTH_PLUGIN_CLASS_NAME)) { + String authPluginClassName = configuration.get(PULSAR_AUTH_PLUGIN_CLASS_NAME); + + if (configuration.contains(PULSAR_AUTH_PARAMS)) { + String authParamsString = configuration.get(PULSAR_AUTH_PARAMS); + return sneaky( Review comment: `UnsupportedAuthenticationException` is the exception that `AuthenticationFactory` uses. There is no need to recover if the authentication config is incorrect, so we can just sneakily throw this exception and let Flink crash. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.common.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.description.Description; + +import org.apache.pulsar.client.api.ProxyProtocol; + +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.configuration.description.TextElement.text; + +/** + * Configuration for Pulsar Client, these config options would be used for both source, sink and + * table. + */ +@PublicEvolving +public final class PulsarOptions { Review comment: I have exposed the required options through the source builder methods; the options in this class are meant for experienced users. A document should be provided to clarify this. I'll submit the document next Monday. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaInitializationContext; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; 
+import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.function.SerializableSupplier; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; + +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createAdmin; +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createClient; +import static org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky; + +/** + * The Source implementation of Pulsar. Please use a {@link PulsarSourceBuilder} to construct a + * {@link PulsarSource}. The following example shows how to create a PulsarSource emitting records + * of <code>String</code> type. + * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource.<byte[], String>builder() + * .setTopics(TOPIC1, TOPIC2) + * .setServiceUrl(getServiceUrl()) + * .setAdminUrl(getAdminUrl()) + * .setSubscriptionName("test") + * .setDeserializationSchema(flinkSchema(new SimpleStringSchema())) + * .setBounded(StopCursor::defaultStopCursor) + * .build(); + * }</pre> + * + * <p>See {@link PulsarSourceBuilder} for more details. + * + * @param <IN> The input type of the pulsar {@link Message}. + * @param <OUT> The output type of the source. + */ +@PublicEvolving +public final class PulsarSource<IN, OUT> + implements Source<OUT, PulsarPartitionSplit, PulsarSourceEnumState>, + ResultTypeQueryable<OUT> { + private static final long serialVersionUID = 7773108631275567433L; + + /** + * The common configuration for pulsar source, we don't support the pulsar's configuration class + * directly. 
+ */ + private final Configuration configuration; + + private final SourceConfiguration sourceConfiguration; + + private final PulsarSubscriber subscriber; + + private final RangeGenerator rangeGenerator; + + private final SerializableSupplier<StartCursor> startCursorSupplier; + + private final SerializableSupplier<StopCursor> stopCursorSupplier; + + /** + * Boundedness for source, we only support {@link Boundedness#CONTINUOUS_UNBOUNDED} currently. + */ + private final Boundedness boundedness; + + /** The pulsar deserialization schema used for deserialize message. */ + private final PulsarDeserializationSchema<IN, OUT> deserializationSchema; + + /** Modify the flink generated {@link ClientConfigurationData}. */ + private final ConfigurationDataCustomizer<ClientConfigurationData> + clientConfigurationCustomizer; + + /** Modify the flink generated {@link ConsumerConfigurationData}. */ + private final ConfigurationDataCustomizer<ConsumerConfigurationData<IN>> + consumerConfigurationCustomizer; + + /** A lazy evaluated pulsar client for split reader. */ + private transient volatile PulsarClient pulsarClient; + + /** A lazy evaluated pulsar admin for source enumerator. */ + private transient volatile PulsarAdmin pulsarAdmin; + + /** + * The constructor for PulsarSource, it's package protected for forcing using {@link + * PulsarSourceBuilder}. + */ + PulsarSource( + Configuration configuration, + PulsarSubscriber subscriber, + RangeGenerator rangeGenerator, + SerializableSupplier<StartCursor> startCursorSupplier, + SerializableSupplier<StopCursor> stopCursorSupplier, + Boundedness boundedness, + PulsarDeserializationSchema<IN, OUT> deserializationSchema, + ConfigurationDataCustomizer<ClientConfigurationData> clientConfigurationCustomizer, + ConfigurationDataCustomizer<ConsumerConfigurationData<IN>> + consumerConfigurationCustomizer) { + // Since these implementation could be a lambda, make sure they are serializable. 
+ ClosureCleaner.clean( + startCursorSupplier, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean( + stopCursorSupplier, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean( + clientConfigurationCustomizer, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true); + ClosureCleaner.clean( + consumerConfigurationCustomizer, + ExecutionConfig.ClosureCleanerLevel.RECURSIVE, + true); Review comment: Yep, you are right. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.DivideRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.util.function.SerializableSupplier; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Consumer; +import java.util.regex.Pattern; + +import static 
java.lang.Boolean.FALSE; +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.getOptionValue; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_COMMIT_CURSOR; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES; +import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The builder class for {@link PulsarSource} to make it easier for the users to construct a {@link + * PulsarSource}. + * + * <p>The following example shows the minimum setup to create a PulsarSource that reads the String + * values from a Pulsar topic. 
+ * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource + * .<byte[], String>builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setAdminUrl(PULSAR_BROKER_HTTP_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING)) + * .build(); + * }</pre> + * + * <p>The service url, admin url, subscription name, topics to consume, and the record deserializer + * are required fields that must be set. + * + * <p>To specify the starting position of PulsarSource, one can call {@link + * #setStartCursorSupplier(SerializableSupplier)}. + * + * <p>By default the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never + * stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link + * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link + * #setUnbounded(SerializableSupplier)}. For example the following PulsarSource stops after it + * consumes up to a event time when the Flink started. + * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource + * .<byte[], String>builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setAdminUrl(PULSAR_BROKER_HTTP_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING)) + * .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis())) + * .build(); + * }</pre> + * + * @param <IN> The input type of the pulsar {@link Message <?>} + * @param <OUT> The output type of the source. 
+ */ +@PublicEvolving +public final class PulsarSourceBuilder<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class); + + private final Configuration configuration; + private PulsarSubscriber subscriber; + private RangeGenerator rangeGenerator; + private SerializableSupplier<StartCursor> startCursorSupplier; + private SerializableSupplier<StopCursor> stopCursorSupplier; + private Boundedness boundedness; + private PulsarDeserializationSchema<IN, OUT> deserializationSchema; + private ConfigurationDataCustomizer<ClientConfigurationData> clientConfigurationCustomizer; + private ConfigurationDataCustomizer<ConsumerConfigurationData<IN>> + consumerConfigurationCustomizer; + + // private builder constructor. + PulsarSourceBuilder() { + // The default configuration holder. + this.configuration = new Configuration(); + } + + /** + * Sets the admin endpoint for the PulsarAdmin of the PulsarSource. + * + * @param adminUrl the url for the PulsarAdmin. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setAdminUrl(String adminUrl) { + configuration.set(PULSAR_ADMIN_URL, adminUrl); + return this; + } + + /** + * Sets the server's link for the PulsarConsumer of the PulsarSource. + * + * @param serviceUrl the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setServiceUrl(String serviceUrl) { + configuration.set(PULSAR_SERVICE_URL, serviceUrl); + return this; + } + + /** + * Sets the name for this pulsar subscription. + * + * @param subscriptionName the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<IN, OUT> setSubscriptionName(String subscriptionName) { + configuration.set(PULSAR_SUBSCRIPTION_NAME, subscriptionName); + return this; + } + + /** + * {@link SubscriptionType} is the consuming behavior for pulsar, we would generator different + * split by the given subscription type, it's required and quite important for end-user. + * + * @param subscriptionType The type of subscription. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setSubscriptionType(SubscriptionType subscriptionType) { + configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType); Review comment: Yep, this is quite important. I have added it. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java ########## @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.connector.pulsar.source.config.CursorVerification; + +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionType; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.configuration.description.TextElement.text; + +/** + * Configurations for PulsarSource. All these options would be listed in pulsar flink connector + * documentation. + */ +@PublicEvolving +public final class PulsarSourceOptions { Review comment: The `PulsarOptions` is used for both pulsar source & sink. The `PulsarSourceOptions` could only be used for pulsar source. I'll add some clarification in java doc. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.ReflectionUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +/** + * The deserialization schema wrapper for pulsar original {@link Schema}. Pulsar would deserialize + * the message and pass it to flink with a auto generate or given {@link TypeInformation}. + * + * @param <T> The output type of the message. + */ +@Internal +class PulsarSchemaWrapper<T> extends PulsarDeserializationSchemaBase<T, T> { + private static final long serialVersionUID = -4864701207257059158L; + + public PulsarSchemaWrapper(PulsarSchemaFactory<T> schemaFactory) { + this(schemaFactory, createTypeInformation(schemaFactory)); + } + + public PulsarSchemaWrapper(Class<? 
extends PulsarSchemaFactory<T>> factoryClass) { + this( + ReflectionUtil.newInstance(factoryClass), + createTypeInformation(ReflectionUtil.newInstance(factoryClass))); + } + + public PulsarSchemaWrapper( + PulsarSchemaFactory<T> schemaFactory, TypeInformation<T> typeInformation) { + super( + schemaFactory, + message -> Collections.singletonList(message.getValue()), + typeInformation); + } + + /** + * Convert the {@link Schema} into a flink readable {@link TypeInformation}. We only support all + * the primitive types in pulsar built-in schema. + * + * <p>Please visit <a + * href="ttp://pulsar.apache.org/docs/en/schema-understand/#schema-type">pulsar + * documentation</a> for detailed schema type clarify. + */ + @SuppressWarnings("unchecked") + private static <T> TypeInformation<T> createTypeInformation( + PulsarSchemaFactory<T> schemaFactory) { + Schema<T> schema = schemaFactory.create(); + // SchemaInfo contains all the required information for deserializing. + SchemaInfo schemaInfo = schema.getSchemaInfo(); + SchemaType schemaType = schemaInfo.getType(); + + TypeInformation<?> information = null; + switch (schemaType) { + case STRING: + information = Types.STRING; + break; + case BOOLEAN: + information = Types.BOOLEAN; + break; + case INT8: + information = Types.BYTE; + break; + case INT16: + information = Types.SHORT; + break; + case INT32: + information = Types.INT; + break; + case INT64: + information = Types.LONG; + break; + case FLOAT: + information = Types.FLOAT; + break; + case DOUBLE: + information = Types.DOUBLE; + break; + case DATE: + // Since pulsar use this type for both util.Date and sql.Date, + // we just choose util.Date here. 
+ information = BasicTypeInfo.DATE_TYPE_INFO; + break; + case TIME: + information = Types.SQL_TIME; + break; + case TIMESTAMP: + information = Types.SQL_TIMESTAMP; + break; + case INSTANT: + information = Types.INSTANT; + break; + case LOCAL_DATE: + information = Types.LOCAL_DATE; + break; + case LOCAL_TIME: + information = Types.LOCAL_TIME; + break; + case LOCAL_DATE_TIME: + information = Types.LOCAL_DATE_TIME; + break; + case BYTES: + information = Types.PRIMITIVE_ARRAY(Types.BYTE); + break; + } + + if (information == null) { + // Try to extract the type info by using flink provided utils. + try { + // Support protobuf class after flink support it natively. Review comment: Yep, the ticket should be created. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.DivideRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.util.function.SerializableSupplier; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Consumer; +import java.util.regex.Pattern; + +import static 
java.lang.Boolean.FALSE; +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.getOptionValue; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_COMMIT_CURSOR; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES; +import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The builder class for {@link PulsarSource} to make it easier for the users to construct a {@link + * PulsarSource}. + * + * <p>The following example shows the minimum setup to create a PulsarSource that reads the String + * values from a Pulsar topic. 
+ * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource + * .<byte[], String>builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setAdminUrl(PULSAR_BROKER_HTTP_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING)) + * .build(); + * }</pre> + * + * <p>The service url, admin url, subscription name, topics to consume, and the record deserializer + * are required fields that must be set. + * + * <p>To specify the starting position of PulsarSource, one can call {@link + * #setStartCursorSupplier(SerializableSupplier)}. + * + * <p>By default the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never + * stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link + * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link + * #setUnbounded(SerializableSupplier)}. For example the following PulsarSource stops after it + * consumes up to a event time when the Flink started. + * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource + * .<byte[], String>builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setAdminUrl(PULSAR_BROKER_HTTP_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING)) + * .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis())) + * .build(); + * }</pre> + * + * @param <IN> The input type of the pulsar {@link Message <?>} + * @param <OUT> The output type of the source. 
+ */ +@PublicEvolving +public final class PulsarSourceBuilder<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class); + + private final Configuration configuration; + private PulsarSubscriber subscriber; + private RangeGenerator rangeGenerator; + private SerializableSupplier<StartCursor> startCursorSupplier; + private SerializableSupplier<StopCursor> stopCursorSupplier; + private Boundedness boundedness; + private PulsarDeserializationSchema<IN, OUT> deserializationSchema; + private ConfigurationDataCustomizer<ClientConfigurationData> clientConfigurationCustomizer; + private ConfigurationDataCustomizer<ConsumerConfigurationData<IN>> + consumerConfigurationCustomizer; + + // private builder constructor. + PulsarSourceBuilder() { + // The default configuration holder. + this.configuration = new Configuration(); + } + + /** + * Sets the admin endpoint for the PulsarAdmin of the PulsarSource. + * + * @param adminUrl the url for the PulsarAdmin. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setAdminUrl(String adminUrl) { + configuration.set(PULSAR_ADMIN_URL, adminUrl); + return this; + } + + /** + * Sets the server's link for the PulsarConsumer of the PulsarSource. + * + * @param serviceUrl the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setServiceUrl(String serviceUrl) { + configuration.set(PULSAR_SERVICE_URL, serviceUrl); + return this; + } + + /** + * Sets the name for this pulsar subscription. + * + * @param subscriptionName the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<IN, OUT> setSubscriptionName(String subscriptionName) { + configuration.set(PULSAR_SUBSCRIPTION_NAME, subscriptionName); + return this; + } + + /** + * {@link SubscriptionType} is the consuming behavior for pulsar, we would generator different + * split by the given subscription type, it's required and quite important for end-user. Review comment: `SubscriptionType` is a very important feature of Pulsar; the consuming behavior differs among the subscription types. The default subscription type is `Shared`, so users are not required to set the subscription type in the source builder. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicMetadata.java ########## @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import static org.apache.pulsar.common.partition.PartitionedTopicMetadata.NON_PARTITIONED; + +/** The pojo class for pulsar topic metadata information. 
*/ +public final class TopicMetadata { + + /** + * The name of the topic, it would be a {@link TopicNameUtils#topicName(String)} which don't + * contains partition information. + */ + private final String name; + + /** If this topic is a partitioned topic. */ + private final boolean partitioned; + + /** The size for a partitioned topic. It would be zero for non-partitioned topic. */ + private final int partitionSize; + + public TopicMetadata(String name, int partitionSize) { + this.name = name; + this.partitioned = partitionSize != NON_PARTITIONED; Review comment: 👍 That sounds better. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/subscriber/impl/BasePulsarSubscriber.java ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source.enumerator.subscriber.impl; + +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.partition.PartitionedTopicMetadata; + +import java.util.List; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toList; + +/** PulsarSubscriber abstract class to simplify Pulsar admin related operations. */ +public abstract class BasePulsarSubscriber implements PulsarSubscriber { + private static final long serialVersionUID = 2053021503331058888L; + + protected TopicMetadata queryTopicMetadata(PulsarAdmin pulsarAdmin, String topicName) { + // Drop the complete topic name for a clean partitioned topic name. + String completeTopicName = TopicNameUtils.topicName(topicName); + try { + PartitionedTopicMetadata metadata = + pulsarAdmin.topics().getPartitionedTopicMetadata(completeTopicName); + return new TopicMetadata(topicName, metadata.partitions); + } catch (PulsarAdminException e) { + if (e.getStatusCode() == 404) { + // Return null for skipping the topic metadata query. + return null; + } else { + // This method would cause the failure for subscriber. 
+ throw new PulsarRuntimeException(e); + } + } + } + + protected List<TopicPartition> createPartitions( Review comment: 👍 ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicPartition.java ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.enumerator.topic; + +import org.apache.pulsar.client.api.Range; +import org.apache.pulsar.client.api.SubscriptionType; + +import java.io.Serializable; +import java.util.Objects; + +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicName; +import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils.topicNameWithPartition; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Metadata of a partitioned topic. */ Review comment: It's an internal API. The javadoc could be confusing; I will rewrite it. 
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import 
org.apache.flink.connector.pulsar.source.enumerator.topic.range.DivideRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.util.function.SerializableSupplier; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Consumer; +import java.util.regex.Pattern; + +import static java.lang.Boolean.FALSE; +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.getOptionValue; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_COMMIT_CURSOR; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; +import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES; +import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The builder class for {@link PulsarSource} to make it easier for the users to construct a {@link + * PulsarSource}. + * + * <p>The following example shows the minimum setup to create a PulsarSource that reads the String + * values from a Pulsar topic. + * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource + * .<byte[], String>builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setAdminUrl(PULSAR_BROKER_HTTP_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING)) + * .build(); + * }</pre> + * + * <p>The service url, admin url, subscription name, topics to consume, and the record deserializer + * are required fields that must be set. + * + * <p>To specify the starting position of PulsarSource, one can call {@link + * #setStartCursorSupplier(SerializableSupplier)}. + * + * <p>By default the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never + * stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link + * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link + * #setUnbounded(SerializableSupplier)}. For example the following PulsarSource stops after it + * consumes up to a event time when the Flink started. 
+ * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource + * .<byte[], String>builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setAdminUrl(PULSAR_BROKER_HTTP_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING)) + * .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis())) + * .build(); + * }</pre> + * + * @param <IN> The input type of the pulsar {@link Message <?>} + * @param <OUT> The output type of the source. + */ +@PublicEvolving +public final class PulsarSourceBuilder<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class); + + private final Configuration configuration; + private PulsarSubscriber subscriber; + private RangeGenerator rangeGenerator; + private SerializableSupplier<StartCursor> startCursorSupplier; + private SerializableSupplier<StopCursor> stopCursorSupplier; + private Boundedness boundedness; + private PulsarDeserializationSchema<IN, OUT> deserializationSchema; + private ConfigurationDataCustomizer<ClientConfigurationData> clientConfigurationCustomizer; + private ConfigurationDataCustomizer<ConsumerConfigurationData<IN>> + consumerConfigurationCustomizer; + + // private builder constructor. + PulsarSourceBuilder() { + // The default configuration holder. + this.configuration = new Configuration(); + } + + /** + * Sets the admin endpoint for the PulsarAdmin of the PulsarSource. + * + * @param adminUrl the url for the PulsarAdmin. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setAdminUrl(String adminUrl) { + configuration.set(PULSAR_ADMIN_URL, adminUrl); + return this; + } + + /** + * Sets the server's link for the PulsarConsumer of the PulsarSource. + * + * @param serviceUrl the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<IN, OUT> setServiceUrl(String serviceUrl) { + configuration.set(PULSAR_SERVICE_URL, serviceUrl); + return this; + } + + /** + * Sets the name for this pulsar subscription. + * + * @param subscriptionName the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setSubscriptionName(String subscriptionName) { + configuration.set(PULSAR_SUBSCRIPTION_NAME, subscriptionName); + return this; + } + + /** + * {@link SubscriptionType} is the consuming behavior for pulsar, we would generator different + * split by the given subscription type, it's required and quite important for end-user. + * + * @param subscriptionType The type of subscription. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setSubscriptionType(SubscriptionType subscriptionType) { + configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType); + return this; + } + + /** + * Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this + * non-existed topic wouldn't throw any exception. But the best solution is just consuming by + * using a topic regex. This method is conflict with {@code setTopicPattern}. + * + * @param topics The topic list you would like to consume message. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setTopics(String... topics) { + return setTopics(Arrays.asList(topics)); + } + + /** + * Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this + * non-existed topic wouldn't throw any exception. But the best solution is just consuming by + * using a topic regex. This method is conflict with {@code setTopicPattern}. + * + * @param topics The topic list you would like to consume message. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<IN, OUT> setTopics(List<String> topics) { + ensureSubscriberIsNull("topics"); + configuration.set(PULSAR_TOPIC_NAMES, PulsarJsonUtils.toString(topics)); + + this.subscriber = PulsarSubscriber.getTopicListSubscriber(topics); + + return this; + } + + /** + * Set a topic pattern to consume from use the java {@link Pattern}. + * + * @param topicsPattern the pattern of the topic name to consume from. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setTopicPattern(String topicsPattern) { + return setTopicPattern(Pattern.compile(topicsPattern)); + } + + /** + * Set a topic pattern to consume from use the java {@link Pattern}. + * + * @param topicsPattern the pattern of the topic name to consume from. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setTopicPattern(Pattern topicsPattern) { + return setTopicPattern(topicsPattern, RegexSubscriptionMode.AllTopics); + } + + /** + * Set a topic pattern to consume from use the java {@link Pattern}. + * + * @param topicsPattern the pattern of the topic name to consume from. + * @param regexSubscriptionMode The topic filter for regex subscription. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setTopicPattern( + String topicsPattern, RegexSubscriptionMode regexSubscriptionMode) { + return setTopicPattern(Pattern.compile(topicsPattern), regexSubscriptionMode); + } + + /** + * Set a topic pattern to consume from use the java {@link Pattern}. + * + * @param topicsPattern the pattern of the topic name to consume from. + * @param regexSubscriptionMode The topic filter for regex subscription. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<IN, OUT> setTopicPattern( + Pattern topicsPattern, RegexSubscriptionMode regexSubscriptionMode) { + ensureSubscriberIsNull("topic pattern"); + configuration.set(PULSAR_TOPICS_PATTERN, topicsPattern.toString()); + this.subscriber = + PulsarSubscriber.getTopicPatternSubscriber(topicsPattern, regexSubscriptionMode); + + return setRegexSubscriptionMode(regexSubscriptionMode); + } + + /** + * When subscribing to a topic using a regular expression, you can pick a certain type of + * topics. + * + * @param regexSubscriptionMode The topic type. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setRegexSubscriptionMode( + RegexSubscriptionMode regexSubscriptionMode) { + configuration.set(PULSAR_REGEX_SUBSCRIPTION_MODE, regexSubscriptionMode); + return this; + } + + /** + * Set a topic range generator for Key_Shared subscription. + * + * @param rangeGenerator A generator which would generate a set of {@link TopicRange} for given + * topic. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setRangeGenerator(RangeGenerator rangeGenerator) { + this.rangeGenerator = rangeGenerator; + if (configuration.contains(PULSAR_SUBSCRIPTION_TYPE)) { + SubscriptionType subscriptionType = configuration.get(PULSAR_SUBSCRIPTION_TYPE); + if (subscriptionType != SubscriptionType.Key_Shared) { + LOG.warn( + "Key_Shared subscription should be used for custom rangeGenerator instead of {}", + subscriptionType); + } + } + + return this; + } + + /** + * Specify from which offsets the PulsarSource should start consume from by providing an {@link + * StartCursor}. + * + * @param startCursorSupplier the supplier providing a {@link StartCursor} which set the + * starting offsets for the Source. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<IN, OUT> setStartCursorSupplier( + SerializableSupplier<StartCursor> startCursorSupplier) { Review comment: `StartCursor` can be implemented by the end user, and it may hold independent state in `PulsarPartitionSplitState`, so different cursors can't share the same instance. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java ########## @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.common.exception.PulsarRuntimeException; +import org.apache.flink.connector.pulsar.common.utils.PulsarJsonUtils; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.DivideRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FullRangeGenerator; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.util.function.SerializableSupplier; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.function.Consumer; +import java.util.regex.Pattern; + +import static 
java.lang.Boolean.FALSE; +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.getOptionValue; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ADMIN_URL; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_ENABLE_TRANSACTION; +import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_COMMIT_CURSOR; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REGEX_SUBSCRIPTION_MODE; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPICS_PATTERN; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_TOPIC_NAMES; +import static org.apache.flink.connector.pulsar.source.config.PulsarSourceConfigUtils.checkConfigurations; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The builder class for {@link PulsarSource} to make it easier for the users to construct a {@link + * PulsarSource}. + * + * <p>The following example shows the minimum setup to create a PulsarSource that reads the String + * values from a Pulsar topic. 
+ * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource + * .<byte[], String>builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setAdminUrl(PULSAR_BROKER_HTTP_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING)) + * .build(); + * }</pre> + * + * <p>The service url, admin url, subscription name, topics to consume, and the record deserializer + * are required fields that must be set. + * + * <p>To specify the starting position of PulsarSource, one can call {@link + * #setStartCursorSupplier(SerializableSupplier)}. + * + * <p>By default the PulsarSource runs in an {@link Boundedness#CONTINUOUS_UNBOUNDED} mode and never + * stop until the Flink job is canceled or fails. To let the PulsarSource run in {@link + * Boundedness#CONTINUOUS_UNBOUNDED} but stops at some given offsets, one can call {@link + * #setUnbounded(SerializableSupplier)}. For example the following PulsarSource stops after it + * consumes up to a event time when the Flink started. + * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource + * .<byte[], String>builder() + * .setServiceUrl(PULSAR_BROKER_URL) + * .setAdminUrl(PULSAR_BROKER_HTTP_URL) + * .setSubscriptionName("flink-source-1") + * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) + * .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(Types.STRING)) + * .setUnbounded(StopCursor.atEventTime(System.currentTimeMillis())) + * .build(); + * }</pre> + * + * @param <IN> The input type of the pulsar {@link Message <?>} + * @param <OUT> The output type of the source. 
+ */ +@PublicEvolving +public final class PulsarSourceBuilder<IN, OUT> { + private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceBuilder.class); + + private final Configuration configuration; + private PulsarSubscriber subscriber; + private RangeGenerator rangeGenerator; + private SerializableSupplier<StartCursor> startCursorSupplier; + private SerializableSupplier<StopCursor> stopCursorSupplier; + private Boundedness boundedness; + private PulsarDeserializationSchema<IN, OUT> deserializationSchema; + private ConfigurationDataCustomizer<ClientConfigurationData> clientConfigurationCustomizer; + private ConfigurationDataCustomizer<ConsumerConfigurationData<IN>> + consumerConfigurationCustomizer; + + // private builder constructor. + PulsarSourceBuilder() { + // The default configuration holder. + this.configuration = new Configuration(); + } + + /** + * Sets the admin endpoint for the PulsarAdmin of the PulsarSource. + * + * @param adminUrl the url for the PulsarAdmin. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setAdminUrl(String adminUrl) { + configuration.set(PULSAR_ADMIN_URL, adminUrl); + return this; + } + + /** + * Sets the server's link for the PulsarConsumer of the PulsarSource. + * + * @param serviceUrl the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setServiceUrl(String serviceUrl) { + configuration.set(PULSAR_SERVICE_URL, serviceUrl); + return this; + } + + /** + * Sets the name for this pulsar subscription. + * + * @param subscriptionName the server url of the Pulsar cluster. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<IN, OUT> setSubscriptionName(String subscriptionName) { + configuration.set(PULSAR_SUBSCRIPTION_NAME, subscriptionName); + return this; + } + + /** + * {@link SubscriptionType} is the consuming behavior for pulsar, we would generator different + * split by the given subscription type, it's required and quite important for end-user. + * + * @param subscriptionType The type of subscription. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setSubscriptionType(SubscriptionType subscriptionType) { + configuration.set(PULSAR_SUBSCRIPTION_TYPE, subscriptionType); + return this; + } + + /** + * Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this + * non-existed topic wouldn't throw any exception. But the best solution is just consuming by + * using a topic regex. This method is conflict with {@code setTopicPattern}. + * + * @param topics The topic list you would like to consume message. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setTopics(String... topics) { + return setTopics(Arrays.asList(topics)); + } + + /** + * Set a pulsar topic list for flink source. Some topic may not exist currently, consuming this + * non-existed topic wouldn't throw any exception. But the best solution is just consuming by + * using a topic regex. This method is conflict with {@code setTopicPattern}. + * + * @param topics The topic list you would like to consume message. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setTopics(List<String> topics) { + ensureSubscriberIsNull("topics"); + configuration.set(PULSAR_TOPIC_NAMES, PulsarJsonUtils.toString(topics)); + + this.subscriber = PulsarSubscriber.getTopicListSubscriber(topics); + + return this; + } + + /** + * Set a topic pattern to consume from use the java {@link Pattern}. + * + * @param topicsPattern the pattern of the topic name to consume from. 
+ * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setTopicPattern(String topicsPattern) { + return setTopicPattern(Pattern.compile(topicsPattern)); + } + + /** + * Set a topic pattern to consume from use the java {@link Pattern}. + * + * @param topicsPattern the pattern of the topic name to consume from. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setTopicPattern(Pattern topicsPattern) { + return setTopicPattern(topicsPattern, RegexSubscriptionMode.AllTopics); + } + + /** + * Set a topic pattern to consume from use the java {@link Pattern}. + * + * @param topicsPattern the pattern of the topic name to consume from. + * @param regexSubscriptionMode The topic filter for regex subscription. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setTopicPattern( + String topicsPattern, RegexSubscriptionMode regexSubscriptionMode) { + return setTopicPattern(Pattern.compile(topicsPattern), regexSubscriptionMode); + } + + /** + * Set a topic pattern to consume from use the java {@link Pattern}. + * + * @param topicsPattern the pattern of the topic name to consume from. + * @param regexSubscriptionMode The topic filter for regex subscription. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setTopicPattern( + Pattern topicsPattern, RegexSubscriptionMode regexSubscriptionMode) { + ensureSubscriberIsNull("topic pattern"); + configuration.set(PULSAR_TOPICS_PATTERN, topicsPattern.toString()); + this.subscriber = + PulsarSubscriber.getTopicPatternSubscriber(topicsPattern, regexSubscriptionMode); + + return setRegexSubscriptionMode(regexSubscriptionMode); + } + + /** + * When subscribing to a topic using a regular expression, you can pick a certain type of + * topics. + * + * @param regexSubscriptionMode The topic type. + * @return this PulsarSourceBuilder. 
+ */ + public PulsarSourceBuilder<IN, OUT> setRegexSubscriptionMode( + RegexSubscriptionMode regexSubscriptionMode) { + configuration.set(PULSAR_REGEX_SUBSCRIPTION_MODE, regexSubscriptionMode); + return this; + } + + /** + * Set a topic range generator for Key_Shared subscription. + * + * @param rangeGenerator A generator which would generate a set of {@link TopicRange} for given + * topic. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setRangeGenerator(RangeGenerator rangeGenerator) { + this.rangeGenerator = rangeGenerator; + if (configuration.contains(PULSAR_SUBSCRIPTION_TYPE)) { + SubscriptionType subscriptionType = configuration.get(PULSAR_SUBSCRIPTION_TYPE); + if (subscriptionType != SubscriptionType.Key_Shared) { + LOG.warn( + "Key_Shared subscription should be used for custom rangeGenerator instead of {}", + subscriptionType); + } + } + + return this; + } + + /** + * Specify from which offsets the PulsarSource should start consume from by providing an {@link + * StartCursor}. + * + * @param startCursorSupplier the supplier providing a {@link StartCursor} which set the + * starting offsets for the Source. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder<IN, OUT> setStartCursorSupplier( + SerializableSupplier<StartCursor> startCursorSupplier) { + this.startCursorSupplier = startCursorSupplier; + return this; + } + + /** + * By default the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner + * and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run as + * a streaming source but still stops at some point, one can set an {@link StartCursor} to + * specify the stopping offsets for each partition. When all the partitions have reached their + * stopping offsets, the PulsarSource will then exit. 
+ * + * <p>This method is different from {@link #setBounded(SerializableSupplier)} that after setting + * the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will still + * return {@link Boundedness#CONTINUOUS_UNBOUNDED} even though it will stop at the stopping + * offsets specified by the stopping offsets {@link StartCursor}. + * + * @param stopCursorSupplier The {@link StopCursor} to specify the stopping offset. + * @return this PulsarSourceBuilder. + * @see #setBounded(SerializableSupplier) + */ + public PulsarSourceBuilder<IN, OUT> setUnbounded( + SerializableSupplier<StopCursor> stopCursorSupplier) { + this.boundedness = Boundedness.CONTINUOUS_UNBOUNDED; + this.stopCursorSupplier = stopCursorSupplier; + return this; + } + + /** + * By default the PulsarSource is set to run in {@link Boundedness#CONTINUOUS_UNBOUNDED} manner + * and thus never stops until the Flink job fails or is canceled. To let the PulsarSource run in + * {@link Boundedness#BOUNDED} manner and stops at some point, one can set an {@link + * StartCursor} to specify the stopping offsets for each partition. When all the partitions have + * reached their stopping offsets, the PulsarSource will then exit. + * + * <p>This method is different from {@link #setUnbounded(SerializableSupplier)} that after + * setting the stopping offsets with this method, {@link PulsarSource#getBoundedness()} will + * return {@link Boundedness#BOUNDED} instead of {@link Boundedness#CONTINUOUS_UNBOUNDED}. + * + * @param stopCursorSupplier the {@link StopCursor} to specify the stopping offsets. + * @return this PulsarSourceBuilder. 
+ * @see #setUnbounded(SerializableSupplier) + */ + public PulsarSourceBuilder<IN, OUT> setBounded( + SerializableSupplier<StopCursor> stopCursorSupplier) { + this.boundedness = Boundedness.BOUNDED; + this.stopCursorSupplier = stopCursorSupplier; + return this; + } + + /** + * DeserializationSchema is required for getting the {@link Schema} for deserialize message from + * pulsar and getting the {@link TypeInformation} for message serialization in flink. + * + * <p>We have defined a set of implementations, using {@code + * PulsarDeserializationSchema#pulsarSchema} or {@code PulsarDeserializationSchema#flinkSchema} + * for creating the desired schema. + */ + public PulsarSourceBuilder<IN, OUT> setDeserializationSchema( Review comment: I didn't succeed with your suggestion. Could you provide a more detailed example? ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java ########## @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.pulsar.common.config.ConfigurationDataCustomizer; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer; +import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumerator; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; +import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; +import org.apache.flink.connector.pulsar.source.enumerator.subscriber.PulsarSubscriber; +import org.apache.flink.connector.pulsar.source.enumerator.topic.range.RangeGenerator; +import org.apache.flink.connector.pulsar.source.reader.PulsarSourceReaderFactory; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaInitializationContext; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit; +import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitSerializer; +import 
org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.function.SerializableSupplier; + +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; + +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createAdmin; +import static org.apache.flink.connector.pulsar.common.config.PulsarConfigUtils.createClient; +import static org.apache.flink.connector.pulsar.common.exception.PulsarExceptionUtils.sneaky; + +/** + * The Source implementation of Pulsar. Please use a {@link PulsarSourceBuilder} to construct a + * {@link PulsarSource}. The following example shows how to create a PulsarSource emitting records + * of <code>String</code> type. + * + * <pre>{@code + * PulsarSource<byte[], String> source = PulsarSource.<byte[], String>builder() + * .setTopics(TOPIC1, TOPIC2) + * .setServiceUrl(getServiceUrl()) + * .setAdminUrl(getAdminUrl()) + * .setSubscriptionName("test") + * .setDeserializationSchema(flinkSchema(new SimpleStringSchema())) + * .setBounded(StopCursor::defaultStopCursor) + * .build(); + * }</pre> + * + * <p>See {@link PulsarSourceBuilder} for more details. + * + * @param <IN> The input type of the pulsar {@link Message}. Review comment: Yep, `IN` is used in order to support Pulsar's built-in `Schema`. Extra tests will be added for this. ########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarSchemaWrapper.java ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.reader.deserializer; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.ReflectionUtil; + +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; + +import java.nio.charset.StandardCharsets; +import java.util.Collections; + +/** + * The deserialization schema wrapper for pulsar original {@link Schema}. Pulsar would deserialize + * the message and pass it to flink with a auto generate or given {@link TypeInformation}. + * + * @param <T> The output type of the message. + */ +@Internal +class PulsarSchemaWrapper<T> extends PulsarDeserializationSchemaBase<T, T> { + private static final long serialVersionUID = -4864701207257059158L; + + public PulsarSchemaWrapper(PulsarSchemaFactory<T> schemaFactory) { + this(schemaFactory, createTypeInformation(schemaFactory)); + } + + public PulsarSchemaWrapper(Class<? 
extends PulsarSchemaFactory<T>> factoryClass) { + this( + ReflectionUtil.newInstance(factoryClass), + createTypeInformation(ReflectionUtil.newInstance(factoryClass))); + } + + public PulsarSchemaWrapper( + PulsarSchemaFactory<T> schemaFactory, TypeInformation<T> typeInformation) { + super( + schemaFactory, + message -> Collections.singletonList(message.getValue()), + typeInformation); + } + + /** + * Convert the {@link Schema} into a flink readable {@link TypeInformation}. We only support all + * the primitive types in pulsar built-in schema. Review comment: Since this merge request is urgent, we will just provide the fundamental support for now and add the missing schemas in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
