AHeise commented on a change in pull request #15304: URL: https://github.com/apache/flink/pull/15304#discussion_r686741468
########## File path: flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java ########## @@ -0,0 +1,538 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.description.Description; +import org.apache.flink.connector.pulsar.common.config.PulsarOptions; +import org.apache.flink.connector.pulsar.source.config.CursorVerification; + +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionType; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.configuration.description.TextElement.code; +import static org.apache.flink.configuration.description.TextElement.text; + +/** + * Configurations for PulsarSource. All the options list here could be configured in {@link + * PulsarSourceBuilder#setProperty(ConfigOption, Object)}. The {@link PulsarOptions} is also + * required for pulsar source. + * + * @see PulsarOptions + */ +@PublicEvolving +public final class PulsarSourceOptions { + + // Pulsar source connector config prefix. + private static final String SOURCE_CONFIG_PREFIX = "pulsar.source."; + // Pulsar consumer API config prefix. + private static final String CONSUMER_CONFIG_PREFIX = "pulsar.consumer."; + + private PulsarSourceOptions() { + // This is a constant class + } + + /////////////////////////////////////////////////////////////////////////////// + // + // The configuration for pulsar source part. + // All the configuration listed below should have the pulsar.source prefix. + // + /////////////////////////////////////////////////////////////////////////////// + + public static final ConfigOption<Long> PULSAR_PARTITION_DISCOVERY_INTERVAL_MS = + ConfigOptions.key(SOURCE_CONFIG_PREFIX + "partitionDiscoveryIntervalMs") + .longType() + .defaultValue(Duration.ofSeconds(30).toMillis()) + .withDescription( + "The interval in milliseconds for the Pulsar source to discover " + + "the new partitions. A non-positive value disables the partition discovery."); + + public static final ConfigOption<Boolean> PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE = + ConfigOptions.key(SOURCE_CONFIG_PREFIX + "enableAutoAcknowledgeMessage") + .booleanType() + .defaultValue(false) + .withDescription( + Description.builder() + .text( + "We would using pulsar transaction for committing " + + "the consuming position when checkpointing. It would be efficient " + + "for supporting different subscription type.") + .linebreak() + .text( + "But if you have disabled the flink checkpoint or your pulsar cluster disabled the transaction," + + " make sure you have set this option to %s.", + code("true")) + .text( + "We would use pulsar client's internal mechanism and commit cursor in two ways.") + .list( + text( + "For Key_Shared and Shared subscription: the cursor would be committed once the message is consumed."), + text( + "For Exclusive and Failover subscription: the cursor would be committed in a fixed interval.")) + .build()); + + public static final ConfigOption<Long> PULSAR_AUTO_COMMIT_CURSOR_INTERVAL = + ConfigOptions.key(SOURCE_CONFIG_PREFIX + "autoCommitCursorInterval") + .longType() + .defaultValue(Duration.ofSeconds(5).toMillis()) + .withDescription( + "This option is used only when user disabled checkpoint" + + " and using Exclusive or Failover subscription"); + + public static final ConfigOption<Long> PULSAR_TRANSACTION_TIMEOUT_MILLIS = + ConfigOptions.key(SOURCE_CONFIG_PREFIX + "transactionTimeoutMillis") + .longType() + .defaultValue(Duration.ofHours(3).toMillis()) + .withDescription( + Description.builder() + .text( + "This option is used for when using Shared or Key_Shared subscription." + + " You should set this option when you didn't enable the %s option.", + code("pulsar.source.enableAutoAcknowledgeMessage")) + .linebreak() + .text( + "This value should be greater than the checkpoint interval.") + .build()); + + public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME = + ConfigOptions.key(SOURCE_CONFIG_PREFIX + "maxFetchTime") + .longType() + .defaultValue(Duration.ofSeconds(10).toMillis()) + .withDescription( + Description.builder() + .text( + "The max time to wait when fetching records. " + + "A longer time increases throughput but also latency. " + + "A fetch batch might be finished earlier because of %s.", + code("pulsar.source.maxFetchRecords")) + .build()); + + public static final ConfigOption<Integer> PULSAR_MAX_FETCH_RECORDS = + ConfigOptions.key(SOURCE_CONFIG_PREFIX + "maxFetchRecords") + .intType() + .defaultValue(100) + .withDescription( + Description.builder() + .text( + "The max number of records to fetch to wait when polling. " + + "A longer time increases throughput but also latency." + + "A fetch batch might be finished earlier because of %s.", + code("pulsar.source.maxFetchTime")) + .build()); + + public static final ConfigOption<CursorVerification> PULSAR_VERIFY_INITIAL_OFFSETS = + ConfigOptions.key(SOURCE_CONFIG_PREFIX + "verifyInitialOffsets") + .enumType(CursorVerification.class) + .defaultValue(CursorVerification.WARN_ON_MISMATCH) + .withDescription( + "Upon (re)starting the source checks whether the expected message can be read. " + + "If failure is enabled the application fails, else it logs a warning. " + + "A possible solution is to adjust the retention settings in pulsar or ignoring the check result."); + + /////////////////////////////////////////////////////////////////////////////// + // + // The configuration for ConsumerConfigurationData part. + // All the configuration listed below should have the pulsar.consumer prefix. + // + /////////////////////////////////////////////////////////////////////////////// + + public static final ConfigOption<String> PULSAR_TOPIC_NAMES = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "topicNames") + .stringType() + .defaultValue("[]") + .withDescription("Topic name."); + + public static final ConfigOption<String> PULSAR_TOPICS_PATTERN = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "topicsPattern") + .stringType() + .noDefaultValue() + .withDescription("Topic pattern."); + + public static final ConfigOption<String> PULSAR_SUBSCRIPTION_NAME = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionName") + .stringType() + .noDefaultValue() + .withDescription("Subscription name."); + + public static final ConfigOption<SubscriptionType> PULSAR_SUBSCRIPTION_TYPE = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionType") + .enumType(SubscriptionType.class) + .defaultValue(SubscriptionType.Shared) + .withDescription( + Description.builder() + .text("Subscription type.") + .linebreak() + .linebreak() + .text("Four subscription types are available:") + .list( + text("Exclusive"), + text("Failover"), + text("Shared"), + text("Key_Shared")) + .build()); + + public static final ConfigOption<SubscriptionMode> PULSAR_SUBSCRIPTION_MODE = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionMode") + .enumType(SubscriptionMode.class) + .defaultValue(SubscriptionMode.Durable); + + public static final ConfigOption<Integer> PULSAR_RECEIVER_QUEUE_SIZE = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "receiverQueueSize") + .intType() + .defaultValue(1000) + .withDescription( + Description.builder() + .text("Size of a consumer's receiver queue.") + .linebreak() + .text( + "For example, the number of messages accumulated by a consumer before an application calls %s.", + code("Receive")) + .linebreak() + .text( + "A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.") + .build()); + + public static final ConfigOption<Long> PULSAR_ACKNOWLEDGEMENTS_GROUP_TIME_MICROS = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "acknowledgementsGroupTimeMicros") + .longType() + .defaultValue(TimeUnit.MILLISECONDS.toMicros(100)) + .withDescription( + Description.builder() + .text("Group a consumer acknowledgment for a specified time.") + .linebreak() + .text( + "By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker.") + .linebreak() + .text( + "Setting a group time of 0 sends out acknowledgments immediately.") + .linebreak() + .text( + "A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.") + .build()); + + public static final ConfigOption<Long> PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "negativeAckRedeliveryDelayMicros") + .longType() + .defaultValue(TimeUnit.MINUTES.toMicros(1)) + .withDescription( + Description.builder() + .text( + "Delay to wait before redelivering messages that failed to be processed.") + .linebreak() + .text( + "When an application uses %s, failed messages are redelivered after a fixed timeout.", + code("Consumer#negativeAcknowledge(Message)")) + .build()); + + public static final ConfigOption<Integer> + PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = + ConfigOptions.key( + CONSUMER_CONFIG_PREFIX + + "maxTotalReceiverQueueSizeAcrossPartitions") + .intType() + .defaultValue(50000) + .withDescription( + Description.builder() + .text( + "The max total receiver queue size across partitions.") + .linebreak() + .text( + "This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.") + .build()); + + public static final ConfigOption<String> PULSAR_CONSUMER_NAME = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "consumerName") + .stringType() + .noDefaultValue() + .withDescription("Consumer name."); + + public static final ConfigOption<Long> PULSAR_ACK_TIMEOUT_MILLIS = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "ackTimeoutMillis") + .longType() + .defaultValue(0L) + .withDescription("Timeout of unacknowledged messages."); + + public static final ConfigOption<Long> PULSAR_TICK_DURATION_MILLIS = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "tickDurationMillis") + .longType() + .defaultValue(1000L) + .withDescription( + Description.builder() + .text("Granularity of the ack-timeout redelivery.") + .linebreak() + .text( + "Using an higher %s reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).", + code("tickDurationMillis")) + .build()); + + public static final ConfigOption<Integer> PULSAR_PRIORITY_LEVEL = + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "priorityLevel") // NOSONAR + .intType() + .defaultValue(0) + .withDescription( + Description.builder() + .text( + "Priority level for a consumer to which a broker gives more priority while dispatching messages in the shared subscription mode.") + .linebreak() + .text( + "The broker follows descending priorities. For example, 0=max-priority, 1, 2,...") + .linebreak() + .text( + "In shared subscription mode, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.") + .linebreak() + .linebreak() + .text("Example 1") Review comment: I was assuming that `Example 1` is just a placeholder where you wanted to add anything. I didn't perceive it as a caption; so please leave as is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
