This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 77012db20dc4728732acff22b3d0da7a4b57c7f4 Author: Yufan Sheng <[email protected]> AuthorDate: Fri Sep 10 12:09:54 2021 +0800 [FLINK-23864][docs] Add flink-connector-pulsar module to flink-docs, auto generate the config document. --- .../generated/pulsar_admin_configuration.html | 42 ++++ .../generated/pulsar_client_configuration.html | 222 ++++++++++++++++++++ .../generated/pulsar_consumer_configuration.html | 174 +++++++++++++++ .../generated/pulsar_source_configuration.html | 54 +++++ .../pulsar/common/config/PulsarOptions.java | 153 ++++++++------ .../pulsar/source/PulsarSourceOptions.java | 233 ++++++++++----------- flink-docs/pom.xml | 6 + .../configuration/ConfigOptionsDocGenerator.java | 8 +- 8 files changed, 698 insertions(+), 194 deletions(-) diff --git a/docs/layouts/shortcodes/generated/pulsar_admin_configuration.html b/docs/layouts/shortcodes/generated/pulsar_admin_configuration.html new file mode 100644 index 0000000..31c7273 --- /dev/null +++ b/docs/layouts/shortcodes/generated/pulsar_admin_configuration.html @@ -0,0 +1,42 @@ +<table class="configuration table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>pulsar.admin.adminUrl</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The Pulsar service HTTP URL for the admin endpoint. 
For example, <code class="highlighter-rouge">http://my-broker.example.com:8080</code>, or <code class="highlighter-rouge">https://my-broker.example.com:8443</code> for TLS.</td> + </tr> + <tr> + <td><h5>pulsar.admin.autoCertRefreshTime</h5></td> + <td style="word-wrap: break-word;">300000</td> + <td>Integer</td> + <td>The auto cert refresh time (in ms) if Pulsar admin supports TLS authentication.</td> + </tr> + <tr> + <td><h5>pulsar.admin.connectTimeout</h5></td> + <td style="word-wrap: break-word;">60000</td> + <td>Integer</td> + <td>The connection time out (in ms) for the PulsarAdmin client.</td> + </tr> + <tr> + <td><h5>pulsar.admin.readTimeout</h5></td> + <td style="word-wrap: break-word;">60000</td> + <td>Integer</td> + <td>The server response read timeout (in ms) for the PulsarAdmin client for any request.</td> + </tr> + <tr> + <td><h5>pulsar.admin.requestTimeout</h5></td> + <td style="word-wrap: break-word;">300000</td> + <td>Integer</td> + <td>The server request timeout (in ms) for the PulsarAdmin client for any request.</td> + </tr> + </tbody> +</table> diff --git a/docs/layouts/shortcodes/generated/pulsar_client_configuration.html b/docs/layouts/shortcodes/generated/pulsar_client_configuration.html new file mode 100644 index 0000000..d44a93c --- /dev/null +++ b/docs/layouts/shortcodes/generated/pulsar_client_configuration.html @@ -0,0 +1,222 @@ +<table class="configuration table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>pulsar.client.authParamMap</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>Map</td> + <td>Parameters for the authentication plugin.</td> + </tr> + <tr> + <td><h5>pulsar.client.authParams</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + 
<td>Parameters for the authentication plugin.<br /><br />Example:<br /><code class="highlighter-rouge">key1:val1,key2:val2</code></td> + </tr> + <tr> + <td><h5>pulsar.client.authPluginClassName</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Name of the authentication plugin.</td> + </tr> + <tr> + <td><h5>pulsar.client.concurrentLookupRequest</h5></td> + <td style="word-wrap: break-word;">5000</td> + <td>Integer</td> + <td>The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on the broker. It should be configured with a higher value only in case of it requires to produce or subscribe on thousands of topic using a created <code class="highlighter-rouge">PulsarClient</code></td> + </tr> + <tr> + <td><h5>pulsar.client.connectionTimeoutMs</h5></td> + <td style="word-wrap: break-word;">10000</td> + <td>Integer</td> + <td>Duration (in ms) of waiting for a connection to a broker to be established.<br />If the duration passes without a response from a broker, the connection attempt is dropped.</td> + </tr> + <tr> + <td><h5>pulsar.client.connectionsPerBroker</h5></td> + <td style="word-wrap: break-word;">1</td> + <td>Integer</td> + <td>The maximum number of connections that the client library will open to a single broker.<br /> By default, the connection pool will use a single connection for all the producers and consumers. Increasing this parameter may improve throughput when using many producers over a high latency connection.</td> + </tr> + <tr> + <td><h5>pulsar.client.enableBusyWait</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Option to enable busy-wait settings.<br />This option will enable spin-waiting on executors and IO threads in order to reduce latency during context switches. The spinning will consume 100% CPU even when the broker is not doing any work. 
It is recommended to reduce the number of IO threads and BookKeeper client threads to only have fewer CPU cores busy.</td> + </tr> + <tr> + <td><h5>pulsar.client.enableTransaction</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>If transaction is enabled, start the <code class="highlighter-rouge">transactionCoordinatorClient</code> with <code class="highlighter-rouge">PulsarClient</code>.</td> + </tr> + <tr> + <td><h5>pulsar.client.initialBackoffIntervalNanos</h5></td> + <td style="word-wrap: break-word;">100000000</td> + <td>Long</td> + <td>Default duration (in nanoseconds) for a backoff interval.</td> + </tr> + <tr> + <td><h5>pulsar.client.keepAliveIntervalSeconds</h5></td> + <td style="word-wrap: break-word;">30</td> + <td>Integer</td> + <td>Interval (in seconds) for keeping connection between the Pulsar client and broker alive.</td> + </tr> + <tr> + <td><h5>pulsar.client.listenerName</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Configure the <code class="highlighter-rouge">listenerName</code> that the broker will return the corresponding <code class="highlighter-rouge">advertisedListener</code>.</td> + </tr> + <tr> + <td><h5>pulsar.client.maxBackoffIntervalNanos</h5></td> + <td style="word-wrap: break-word;">60000000000</td> + <td>Long</td> + <td>The maximum duration (in nanoseconds) for a backoff interval.</td> + </tr> + <tr> + <td><h5>pulsar.client.maxLookupRedirects</h5></td> + <td style="word-wrap: break-word;">20</td> + <td>Integer</td> + <td>The maximum number of times a lookup-request redirections to a broker.</td> + </tr> + <tr> + <td><h5>pulsar.client.maxLookupRequest</h5></td> + <td style="word-wrap: break-word;">50000</td> + <td>Integer</td> + <td>The maximum number of lookup requests allowed on each broker connection to prevent overload on the broker. It should be greater than <code class="highlighter-rouge">maxConcurrentLookupRequests</code>. 
Requests that inside <code class="highlighter-rouge">maxConcurrentLookupRequests</code> are already sent to broker, and requests beyond <code class="highlighter-rouge">maxConcurrentLookupRequests</code> and under <code class="highlighter-rouge">maxLookupRequests</code> will [...] + </tr> + <tr> + <td><h5>pulsar.client.maxNumberOfRejectedRequestPerConnection</h5></td> + <td style="word-wrap: break-word;">50</td> + <td>Integer</td> + <td>The maximum number of rejected requests of a broker in a certain period (30s) after the current connection is closed and the client creates a new connection to connect to a different broker.</td> + </tr> + <tr> + <td><h5>pulsar.client.memoryLimitBytes</h5></td> + <td style="word-wrap: break-word;">0</td> + <td>Long</td> + <td>The limit (in bytes) on the amount of direct memory that will be allocated by this client instance.<br />Note: at this moment this is only limiting the memory for producers. Setting this to <code class="highlighter-rouge">0</code> will disable the limit.</td> + </tr> + <tr> + <td><h5>pulsar.client.numIoThreads</h5></td> + <td style="word-wrap: break-word;">1</td> + <td>Integer</td> + <td>The number of threads used for handling connections to brokers.</td> + </tr> + <tr> + <td><h5>pulsar.client.numListenerThreads</h5></td> + <td style="word-wrap: break-word;">1</td> + <td>Integer</td> + <td>The number of threads used for handling message listeners. The listener thread pool is shared across all the consumers and readers that are using a <code class="highlighter-rouge">listener</code> model to get messages. For a given consumer, the listener is always invoked from the same thread to ensure ordering.</td> + </tr> + <tr> + <td><h5>pulsar.client.operationTimeoutMs</h5></td> + <td style="word-wrap: break-word;">30000</td> + <td>Integer</td> + <td>Operation timeout (in ms). Operations such as creating producers, subscribing or unsubscribing topics are retried during this interval. 
If the operation is not completed during this interval, the operation will be marked as failed.</td> + </tr> + <tr> + <td><h5>pulsar.client.proxyProtocol</h5></td> + <td style="word-wrap: break-word;">SNI</td> + <td><p>Enum</p></td> + <td>Protocol type to determine the type of proxy routing when a client connects to the proxy using <code class="highlighter-rouge">pulsar.client.proxyServiceUrl</code>.<br /><br />Possible values:<ul><li>"SNI"</li></ul></td> + </tr> + <tr> + <td><h5>pulsar.client.proxyServiceUrl</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Proxy-service URL when a client connects to the broker via the proxy. The client can choose the type of proxy-routing.</td> + </tr> + <tr> + <td><h5>pulsar.client.requestTimeoutMs</h5></td> + <td style="word-wrap: break-word;">60000</td> + <td>Integer</td> + <td>Maximum duration (in ms) for completing a request.</td> + </tr> + <tr> + <td><h5>pulsar.client.serviceUrl</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Service URL provider for Pulsar service.<br />To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.<br />You can assign Pulsar protocol URLs to specific clusters and use the <code class="highlighter-rouge">pulsar</code> scheme.<br /><ul><li>This is an example of <code class="highlighter-rouge">localhost</code>: <code class="highlighter-rouge">pulsar://localhost:6650</code>.</li><li>If you have multiple brokers, the URL is as: <code class= [...] + </tr> + <tr> + <td><h5>pulsar.client.sslProvider</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The name of the security provider used for SSL connections. 
The default value is the default security provider of the JVM.</td> + </tr> + <tr> + <td><h5>pulsar.client.statsIntervalSeconds</h5></td> + <td style="word-wrap: break-word;">60</td> + <td>Long</td> + <td>Interval between each stats info.<br /><ul><li>Stats is activated with positive <code class="highlighter-rouge">statsInterval</code></li><li>Set <code class="highlighter-rouge">statsIntervalSeconds</code> to 1 second at least.</li></ul></td> + </tr> + <tr> + <td><h5>pulsar.client.tlsAllowInsecureConnection</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Whether the Pulsar client accepts untrusted TLS certificate from the broker.</td> + </tr> + <tr> + <td><h5>pulsar.client.tlsCiphers</h5></td> + <td style="word-wrap: break-word;"></td> + <td>List<String></td> + <td>A list of cipher suites. This is a named combination of authentication, encryption, MAC and the key exchange algorithm used to negotiate the security settings for a network connection using the TLS or SSL network protocol. By default all the available cipher suites are supported.</td> + </tr> + <tr> + <td><h5>pulsar.client.tlsHostnameVerificationEnable</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Whether to enable TLS hostname verification. It allows to validate hostname verification when a client connects to the broker over TLS. It validates incoming x509 certificate and matches provided hostname (CN/SAN) with the expected broker's host name. It follows RFC 2818, 3.1. Server Identity hostname verification.</td> + </tr> + <tr> + <td><h5>pulsar.client.tlsProtocols</h5></td> + <td style="word-wrap: break-word;"></td> + <td>List<String></td> + <td>The SSL protocol used to generate the SSLContext. By default, it is set TLS, which is fine for most cases. 
Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.</td> + </tr> + <tr> + <td><h5>pulsar.client.tlsTrustCertsFilePath</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Path to the trusted TLS certificate file.</td> + </tr> + <tr> + <td><h5>pulsar.client.tlsTrustStorePassword</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The store password for the key store file.</td> + </tr> + <tr> + <td><h5>pulsar.client.tlsTrustStorePath</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The location of the trust store file.</td> + </tr> + <tr> + <td><h5>pulsar.client.tlsTrustStoreType</h5></td> + <td style="word-wrap: break-word;">"JKS"</td> + <td>String</td> + <td>The file format of the trust store file.</td> + </tr> + <tr> + <td><h5>pulsar.client.useKeyStoreTls</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>If TLS is enabled, whether use the KeyStore type as the TLS configuration parameter. If it is set to <code class="highlighter-rouge">false</code>, it means to use the default pem type configuration.</td> + </tr> + <tr> + <td><h5>pulsar.client.useTcpNoDelay</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Whether to use the TCP no-delay flag on the connection to disable Nagle algorithm.<br />No-delay features ensures that packets are sent out on the network as soon as possible, and it is critical to achieve low latency publishes. On the other hand, sending out a huge number of small packets might limit the overall throughput. Therefore, if latency is not a concern, it is recommended to set the <code class="highlighter-rouge">useTcpNoDelay</code> flag to <code class="highli [...] 
+ </tr> + </tbody> +</table> diff --git a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html new file mode 100644 index 0000000..bc8b6df --- /dev/null +++ b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html @@ -0,0 +1,174 @@ +<table class="configuration table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>pulsar.consumer.ackReceiptEnabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Acknowledgement will return a receipt but this does not mean that the message will not be resent after getting the receipt.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.ackTimeoutMillis</h5></td> + <td style="word-wrap: break-word;">0</td> + <td>Long</td> + <td>The timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. The timeout needs to be greater than 1 second.<br />By default, the acknowledge timeout is disabled and that means that messages delivered to a consumer will not be re-delivered unless the consumer crashes.<br />When the acknowledgement timeout is enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer (possibly to a different consum [...] + </tr> + <tr> + <td><h5>pulsar.consumer.acknowledgementsGroupTimeMicros</h5></td> + <td style="word-wrap: break-word;">100000</td> + <td>Long</td> + <td>Group a consumer acknowledgment for a specified time (in μs). By default, a consumer uses <code class="highlighter-rouge">100ms</code> grouping time to send out acknowledgments to a broker. If the group time is set to <code class="highlighter-rouge">0</code>, acknowledgments are sent out immediately. 
A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this <code class="highlighter-rouge">pulsar.consumer.maxPendingChunkedMessage</code> threshold. Once a consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acknowledging if <code class="highlighter-rouge">pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull</code> is true. Otherwise, it marks them for redel [...] + </tr> + <tr> + <td><h5>pulsar.consumer.autoUpdatePartitionsIntervalSeconds</h5></td> + <td style="word-wrap: break-word;">60</td> + <td>Integer</td> + <td>The interval (in seconds) of updating partitions. This only works if autoUpdatePartitions is enabled.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.consumerName</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The consumer name is informative and it can be used to identify a particular consumer instance from the topic stats.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.cryptoFailureAction</h5></td> + <td style="word-wrap: break-word;">FAIL</td> + <td><p>Enum</p></td> + <td>The consumer should take action when it receives a message that can not be decrypted.<ul><li><code class="highlighter-rouge">FAIL</code>: this is the default option to fail messages until crypto succeeds.</li><li><code class="highlighter-rouge">DISCARD</code>: silently acknowledge but do not deliver messages to an application.</li><li><code class="highlighter-rouge">CONSUME</code>: deliver encrypted messages to applications. It is the application's responsibility to decry [...] 
+ </tr> + <tr> + <td><h5>pulsar.consumer.deadLetterPolicy.deadLetterTopic</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Name of the dead letter topic where the failed messages are sent.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.deadLetterPolicy.maxRedeliverCount</h5></td> + <td style="word-wrap: break-word;">0</td> + <td>Integer</td> + <td>The maximum number of times that a message is redelivered before being sent to the dead letter queue.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.deadLetterPolicy.retryLetterTopic</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Name of the retry topic where the failed messages are sent.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.expireTimeOfIncompleteChunkedMessageMillis</h5></td> + <td style="word-wrap: break-word;">60000</td> + <td>Long</td> + <td>If a producer fails to publish all the chunks of a message, the consumer can expire incomplete chunks if the consumer cannot receive all chunks within the expire time (default 1 minute, in ms).</td> + </tr> + <tr> + <td><h5>pulsar.consumer.maxPendingChunkedMessage</h5></td> + <td style="word-wrap: break-word;">10</td> + <td>Integer</td> + <td>The consumer buffers chunk messages into memory until it receives all the chunks of the original message. While consuming chunk-messages, chunks from the same message might not be contiguous in the stream and they might be mixed with other messages' chunks. So, consumer has to maintain multiple buffers to manage chunks coming from different messages. This mainly happens when multiple publishers are publishing messages on the topic concurrently or publishers failed to publ [...] 
+ </tr> + <tr> + <td><h5>pulsar.consumer.maxTotalReceiverQueueSizeAcrossPartitions</h5></td> + <td style="word-wrap: break-word;">50000</td> + <td>Integer</td> + <td>The maximum total receiver queue size across partitions.<br />This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.negativeAckRedeliveryDelayMicros</h5></td> + <td style="word-wrap: break-word;">60000000</td> + <td>Long</td> + <td>Delay (in μs) to wait before redelivering messages that failed to be processed.<br />When an application uses <code class="highlighter-rouge">Consumer.negativeAcknowledge(Message)</code>, failed messages are redelivered after a fixed timeout.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.poolMessages</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Enable pooling of messages and the underlying data buffers.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.priorityLevel</h5></td> + <td style="word-wrap: break-word;">0</td> + <td>Integer</td> + <td>Priority level for a consumer to which a broker gives more priorities while dispatching messages in the shared subscription type.<br />The broker follows descending priorities. For example, 0=max-priority, 1, 2,...<br />In shared subscription mode, the broker first dispatches messages to the consumers on the highest priority level if they have permits. Otherwise, the broker considers consumers on the next priority level.<br /><br />Example 1<br />If a subscription has con [...] +C1, 0, 2 +C2, 0, 1 +C3, 0, 1 +C4, 1, 2 +C5, 1, 1 +<br />The order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.properties</h5></td> + <td style="word-wrap: break-word;"></td> + <td>Map</td> + <td>A name or value property of this consumer. 
<code class="highlighter-rouge">properties</code> is application defined metadata attached to a consumer. When getting a topic stats, associate this metadata with the consumer stats for easier identification.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.readCompacted</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>If enabling <code class="highlighter-rouge">readCompacted</code>, a consumer reads messages from a compacted topic rather than reading a full message backlog of a topic.<br />A consumer only sees the latest value for each key in the compacted topic, up until reaching the point in the topic message when compacting backlog. Beyond that point, send messages as normal.<br />Only enabling <code class="highlighter-rouge">readCompacted</code> on subscriptions to persistent topic [...] + </tr> + <tr> + <td><h5>pulsar.consumer.receiverQueueSize</h5></td> + <td style="word-wrap: break-word;">1000</td> + <td>Integer</td> + <td>Size of a consumer's receiver queue.<br />For example, the number of messages accumulated by a consumer before an application calls <code class="highlighter-rouge">Receive</code>.<br />A value higher than the default value increases consumer throughput, though at the expense of more memory utilization.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.replicateSubscriptionState</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>If <code class="highlighter-rouge">replicateSubscriptionState</code> is enabled, a subscription state is replicated to geo-replicated clusters.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.retryEnable</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>If enabled, the consumer will automatically retry messages.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.subscriptionInitialPosition</h5></td> + <td style="word-wrap: break-word;">Latest</td> + <td><p>Enum</p></td> + <td>Initial position at which to set cursor when subscribing 
to a topic at first time.<br /><br />Possible values:<ul><li>"Latest"</li><li>"Earliest"</li></ul></td> + </tr> + <tr> + <td><h5>pulsar.consumer.subscriptionMode</h5></td> + <td style="word-wrap: break-word;">Durable</td> + <td><p>Enum</p></td> + <td>Select the subscription mode to be used when subscribing to the topic.<ul><li><code class="highlighter-rouge">Durable</code>: Make the subscription to be backed by a durable cursor that will retain messages and persist the current position.</li><li><code class="highlighter-rouge">NonDurable</code>: Lightweight subscription mode that doesn't have a durable cursor associated</li></ul><br /><br />Possible values:<ul><li>"Durable"</li><li>"NonDurable"</li></ul></td> + </tr> + <tr> + <td><h5>pulsar.consumer.subscriptionName</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>Specify the subscription name for this consumer. This argument is required when constructing the consumer.</td> + </tr> + <tr> + <td><h5>pulsar.consumer.subscriptionType</h5></td> + <td style="word-wrap: break-word;">Shared</td> + <td><p>Enum</p></td> + <td>Subscription type.<br /><br />Four subscription types are available:<ul><li>Exclusive</li><li>Failover</li><li>Shared</li><li>Key_Shared</li></ul><br /><br />Possible values:<ul><li>"Exclusive"</li><li>"Shared"</li><li>"Failover"</li><li>"Key_Shared"</li></ul></td> + </tr> + <tr> + <td><h5>pulsar.consumer.tickDurationMillis</h5></td> + <td style="word-wrap: break-word;">1000</td> + <td>Long</td> + <td>Granularity (in ms) of the ack-timeout redelivery.<br />A greater (for example, 1 hour) <code class="highlighter-rouge">tickDurationMillis</code> reduces the memory overhead to track messages.</td> + </tr> + </tbody> +</table> diff --git a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html new file mode 100644 index 0000000..02e512b --- /dev/null +++ 
b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html @@ -0,0 +1,54 @@ +<table class="configuration table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>pulsar.source.autoCommitCursorInterval</h5></td> + <td style="word-wrap: break-word;">5000</td> + <td>Long</td> + <td>This option is used only when the user disables the checkpoint and uses Exclusive or Failover subscription. We would automatically commit the cursor using the given period (in ms).</td> + </tr> + <tr> + <td><h5>pulsar.source.enableAutoAcknowledgeMessage</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Flink commits the consuming position with pulsar transactions on checkpoint. However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster, ensure that you have set this option to <code class="highlighter-rouge">true</code>.<br />The source would use pulsar client's internal mechanism and commit cursor in two ways.<ul><li>For <code class="highlighter-rouge">Key_Shared</code> and <code class="highlighter-rouge">Shared</code> subscriptio [...] + </tr> + <tr> + <td><h5>pulsar.source.maxFetchRecords</h5></td> + <td style="word-wrap: break-word;">100</td> + <td>Integer</td> + <td>The maximum number of records to fetch when polling. A larger number increases throughput but also latency. A fetch batch might be finished earlier because of <code class="highlighter-rouge">pulsar.source.maxFetchTime</code>.</td> + </tr> + <tr> + <td><h5>pulsar.source.maxFetchTime</h5></td> + <td style="word-wrap: break-word;">10000</td> + <td>Long</td> + <td>The maximum time (in ms) to wait when fetching records. A longer time increases throughput but also latency. 
A fetch batch might be finished earlier because of <code class="highlighter-rouge">pulsar.source.maxFetchRecords</code>.</td> + </tr> + <tr> + <td><h5>pulsar.source.partitionDiscoveryIntervalMs</h5></td> + <td style="word-wrap: break-word;">30000</td> + <td>Long</td> + <td>The interval (in ms) for the Pulsar source to discover the new partitions. A non-positive value disables the partition discovery.</td> + </tr> + <tr> + <td><h5>pulsar.source.transactionTimeoutMillis</h5></td> + <td style="word-wrap: break-word;">10800000</td> + <td>Long</td> + <td>This option is used in <code class="highlighter-rouge">Shared</code> or <code class="highlighter-rouge">Key_Shared</code> subscription. You should configure this option when you do not enable the <code class="highlighter-rouge">pulsar.source.enableAutoAcknowledgeMessage</code> option.<br />The value (in ms) should be greater than the checkpoint interval.</td> + </tr> + <tr> + <td><h5>pulsar.source.verifyInitialOffsets</h5></td> + <td style="word-wrap: break-word;">WARN_ON_MISMATCH</td> + <td><p>Enum</p></td> + <td>Upon (re)starting the source, check whether the expected message can be read. If failure is enabled, the application fails. Otherwise, it logs a warning. 
A possible solution is to adjust the retention settings in Pulsar or ignoring the check result.<br /><br />Possible values:<ul><li>"FAIL_ON_MISMATCH"</li><li>"WARN_ON_MISMATCH"</li></ul></td> + </tr> + </tbody> +</table> diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java index 4c33a70..db90bf1 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java @@ -31,7 +31,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import static java.util.Collections.emptyMap; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.flink.configuration.description.TextElement.code; import static org.apache.flink.configuration.description.TextElement.text; @@ -48,6 +47,7 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.CLIE @ConfigGroup(name = "PulsarClient", keyPrefix = CLIENT_CONFIG_PREFIX), @ConfigGroup(name = "PulsarAdmin", keyPrefix = ADMIN_CONFIG_PREFIX) }) +@SuppressWarnings("java:S1192") public final class PulsarOptions { // Pulsar client API config prefix. 
@@ -112,8 +112,7 @@ public final class PulsarOptions { .noDefaultValue() .withDescription( Description.builder() - .text( - "String represents parameters for the authentication plugin.") + .text("Parameters for the authentication plugin.") .linebreak() .linebreak() .text("Example:") @@ -124,9 +123,8 @@ public final class PulsarOptions { public static final ConfigOption<Map<String, String>> PULSAR_AUTH_PARAM_MAP = ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authParamMap") .mapType() - .defaultValue(emptyMap()) - .withDescription( - "Map which represents parameters for the authentication plugin."); + .noDefaultValue() + .withDescription("Parameters for the authentication plugin."); public static final ConfigOption<Integer> PULSAR_OPERATION_TIMEOUT_MS = ConfigOptions.key(CLIENT_CONFIG_PREFIX + "operationTimeoutMs") @@ -134,10 +132,11 @@ public final class PulsarOptions { .defaultValue(30000) .withDescription( Description.builder() - .text("Operation timeout (in millis).") + .text("Operation timeout (in ms).") .text( - "Producer-create, subscribe and unsubscribe operations will be retried until this interval," - + " after which the operation will be marked as failed.") + " Operations such as creating producers, subscribing or unsubscribing topics are retried during this interval.") + .text( + " If the operation is not completed during this interval, the operation will be marked as failed.") .build()); public static final ConfigOption<Long> PULSAR_STATS_INTERVAL_SECONDS = @@ -153,7 +152,7 @@ public final class PulsarOptions { "Stats is activated with positive %s", code("statsInterval")), text( - "Set %s to 1 second at least", + "Set %s to 1 second at least.", code("statsIntervalSeconds"))) .build()); @@ -173,10 +172,10 @@ public final class PulsarOptions { .text( "The number of threads used for handling message listeners.") .text( - "The listener thread pool is shared across all the consumers and readers that are using a %s model to get messages.", + " The listener thread 
pool is shared across all the consumers and readers that are using a %s model to get messages.", code("listener")) .text( - "For a given consumer, the listener will be always invoked from the same thread, to ensure ordering.") + " For a given consumer, the listener is always invoked from the same thread to ensure ordering.") .build()); public static final ConfigOption<Integer> PULSAR_CONNECTIONS_PER_BROKER = @@ -186,12 +185,12 @@ public final class PulsarOptions { .withDescription( Description.builder() .text( - "Sets the max number of connection that the client library will open to a single broker.") + "The maximum number of connections that the client library will open to a single broker.") .linebreak() .text( - "By default, the connection pool will use a single connection for all the producers and consumers.") + " By default, the connection pool will use a single connection for all the producers and consumers.") .text( - "Increasing this parameter may improve throughput when using many producers over a high latency connection.") + " Increasing this parameter may improve throughput when using many producers over a high latency connection.") .build()); public static final ConfigOption<Boolean> PULSAR_USE_TCP_NO_DELAY = @@ -201,18 +200,18 @@ public final class PulsarOptions { .withDescription( Description.builder() .text( - "Whether to use TCP no-delay flag on the connection to disable Nagle algorithm.") + "Whether to use the TCP no-delay flag on the connection to disable Nagle algorithm.") .linebreak() .text( - "No-delay features make sure packets are sent out on the network as soon as possible,") - .text("and it's critical to achieve low latency publishes.") + "No-delay features ensures that packets are sent out on the network as soon as possible,") + .text(" and it is critical to achieve low latency publishes.") .text( - "On the other hand, sending out a huge number of small packets might limit the overall throughput,") + " On the other hand, sending out a huge 
number of small packets might limit the overall throughput.") .text( - "so if latency is not a concern, it's advisable to set the %s flag to false.", - code("useTcpNoDelay")) + " Therefore, if latency is not a concern, it is recommended to set the %s flag to %s.", + code("useTcpNoDelay"), code("false")) .linebreak() - .text("Default value is true.") + .text("By default, it is set to %s.", code("true")) .build()); public static final ConfigOption<String> PULSAR_TLS_TRUST_CERTS_FILE_PATH = @@ -226,7 +225,7 @@ public final class PulsarOptions { .booleanType() .defaultValue(false) .withDescription( - "Whether the Pulsar client accepts untrusted TLS certificate from broker."); + "Whether the Pulsar client accepts untrusted TLS certificate from the broker."); public static final ConfigOption<Boolean> PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE = ConfigOptions.key(CLIENT_CONFIG_PREFIX + "tlsHostnameVerificationEnable") @@ -236,11 +235,11 @@ public final class PulsarOptions { Description.builder() .text("Whether to enable TLS hostname verification.") .text( - "It allows to validate hostname verification when client connects to broker over tls.") + " It allows to validate hostname verification when a client connects to the broker over TLS.") .text( - "It validates incoming x509 certificate and matches provided hostname(CN/SAN) with expected broker's host name.") + " It validates incoming x509 certificate and matches provided hostname (CN/SAN) with the expected broker's host name.") .text( - "It follows RFC 2818, 3.1. Server Identity hostname verification.") + " It follows RFC 2818, 3.1. 
Server Identity hostname verification.") .build()); public static final ConfigOption<Integer> PULSAR_CONCURRENT_LOOKUP_REQUEST = @@ -250,10 +249,9 @@ public final class PulsarOptions { .withDescription( Description.builder() .text( - "The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on broker.") + "The number of concurrent lookup requests allowed to send on each broker connection to prevent overload on the broker.") .text( - "It should be configured with higher value only in case of it requires" - + " to produce/subscribe on thousands of topic using created %s", + " It should be configured with a higher value only in case of it requires to produce or subscribe on thousands of topic using a created %s", code("PulsarClient")) .build()); @@ -264,12 +262,17 @@ public final class PulsarOptions { .withDescription( Description.builder() .text( - "The maximum number of lookup requests allowed on each broker connection to prevent overload on broker.") - .text("It should be bigger than maxConcurrentLookupRequests.") + "The maximum number of lookup requests allowed on each broker connection to prevent overload on the broker.") + .text( + " It should be greater than %s.", + code("maxConcurrentLookupRequests")) .text( - "Requests that inside maxConcurrentLookupRequests already send to broker,") + " Requests that inside %s are already sent to broker,", + code("maxConcurrentLookupRequests")) .text( - "and requests beyond maxConcurrentLookupRequests and under maxLookupRequests will wait in each client cnx.") + " and requests beyond %s and under %s will wait in each client cnx.", + code("maxConcurrentLookupRequests"), + code("maxLookupRequests")) .build()); public static final ConfigOption<Integer> PULSAR_MAX_LOOKUP_REDIRECTS = @@ -277,23 +280,26 @@ public final class PulsarOptions { .intType() .defaultValue(20) .withDescription( - "Set the maximum number of times a lookup-request to a broker will be redirected."); + "The 
maximum number of times a lookup-request redirections to a broker."); public static final ConfigOption<Integer> PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION = ConfigOptions.key(CLIENT_CONFIG_PREFIX + "maxNumberOfRejectedRequestPerConnection") .intType() .defaultValue(50) .withDescription( - "The maximum number of rejected requests of a broker in a certain time" - + " frame (30 seconds) after the current connection is closed and" - + " the client creates a new connection to connect to a different broker."); + Description.builder() + .text( + "The maximum number of rejected requests of a broker in a certain period (30s) after the current connection is closed") + .text( + " and the client creates a new connection to connect to a different broker.") + .build()); public static final ConfigOption<Integer> PULSAR_KEEP_ALIVE_INTERVAL_SECONDS = ConfigOptions.key(CLIENT_CONFIG_PREFIX + "keepAliveIntervalSeconds") .intType() .defaultValue(30) .withDescription( - "Seconds of keeping alive interval for each client broker connection."); + "Interval (in seconds) for keeping connection between the Pulsar client and broker alive."); public static final ConfigOption<Integer> PULSAR_CONNECTION_TIMEOUT_MS = ConfigOptions.key(CLIENT_CONFIG_PREFIX + "connectionTimeoutMs") @@ -302,18 +308,18 @@ public final class PulsarOptions { .withDescription( Description.builder() .text( - "Duration (in millis) of waiting for a connection to a broker to be established.") + "Duration (in ms) of waiting for a connection to a broker to be established.") .linebreak() .text( "If the duration passes without a response from a broker, the connection attempt is dropped.") .build()); - // TODO This option would be exposed by Pulsar's ClientBuilder in next release. + // TODO This option would be exposed by Pulsar's ClientBuilder in the next Pulsar release. 
public static final ConfigOption<Integer> PULSAR_REQUEST_TIMEOUT_MS = ConfigOptions.key(CLIENT_CONFIG_PREFIX + "requestTimeoutMs") .intType() .defaultValue(60000) - .withDescription("Maximum duration (in millis) for completing a request."); + .withDescription("Maximum duration (in ms) for completing a request."); public static final ConfigOption<Long> PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS = ConfigOptions.key(CLIENT_CONFIG_PREFIX + "initialBackoffIntervalNanos") @@ -325,7 +331,8 @@ public final class PulsarOptions { ConfigOptions.key(CLIENT_CONFIG_PREFIX + "maxBackoffIntervalNanos") .longType() .defaultValue(SECONDS.toNanos(60)) - .withDescription("Maximum duration (in nanoseconds) for a backoff interval."); + .withDescription( + "The maximum duration (in nanoseconds) for a backoff interval."); public static final ConfigOption<Boolean> PULSAR_ENABLE_BUSY_WAIT = ConfigOptions.key(CLIENT_CONFIG_PREFIX + "enableBusyWait") @@ -338,9 +345,9 @@ public final class PulsarOptions { .text( "This option will enable spin-waiting on executors and IO threads in order to reduce latency during context switches.") .text( - "The spinning will consume 100% CPU even when the broker is not doing any work.") + " The spinning will consume 100% CPU even when the broker is not doing any work.") .text( - "It is recommended to reduce the number of IO threads and BK client threads to only have few CPU cores busy.") + " It is recommended to reduce the number of IO threads and BookKeeper client threads to only have fewer CPU cores busy.") .build()); public static final ConfigOption<String> PULSAR_LISTENER_NAME = @@ -350,8 +357,8 @@ public final class PulsarOptions { .withDescription( Description.builder() .text( - "Configure the listenerName that the broker will return the corresponding %s.", - code("advertisedListener")) + "Configure the %s that the broker will return the corresponding %s.", + code("listenerName"), code("advertisedListener")) .build()); public static final ConfigOption<Boolean> 
PULSAR_USE_KEY_STORE_TLS = @@ -361,8 +368,10 @@ public final class PulsarOptions { .withDescription( Description.builder() .text( - "If Tls is enabled, whether use KeyStore type as tls configuration parameter.") - .text("False means use default pem type configuration.") + "If TLS is enabled, whether use the KeyStore type as the TLS configuration parameter.") + .text( + " If it is set to %s, it means to use the default pem type configuration.", + code("false")) .build()); public static final ConfigOption<String> PULSAR_SSL_PROVIDER = @@ -374,7 +383,7 @@ public final class PulsarOptions { .text( "The name of the security provider used for SSL connections.") .text( - "Default value is the default security provider of the JVM.") + " The default value is the default security provider of the JVM.") .build()); public static final ConfigOption<String> PULSAR_TLS_TRUST_STORE_TYPE = @@ -405,11 +414,11 @@ public final class PulsarOptions { Description.builder() .text("A list of cipher suites.") .text( - "This is a named combination of authentication, encryption," - + " MAC and key exchange algorithm used to negotiate the security" - + " settings for a network connection using TLS or SSL network protocol.") + " This is a named combination of authentication, encryption,") .text( - "By default all the available cipher suites are supported.") + " MAC and the key exchange algorithm used to negotiate the security settings for a network connection using the TLS or SSL network protocol.") + .text( + " By default all the available cipher suites are supported.") .build()); public static final ConfigOption<List<String>> PULSAR_TLS_PROTOCOLS = @@ -420,9 +429,10 @@ public final class PulsarOptions { .withDescription( Description.builder() .text("The SSL protocol used to generate the SSLContext.") - .text("Default setting is TLS, which is fine for most cases.") .text( - "Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.") + " By default, it is set TLS, which is fine for 
most cases.") + .text( + " Allowed values in recent JVMs are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.") .build()); public static final ConfigOption<Long> PULSAR_MEMORY_LIMIT_BYTES = @@ -432,11 +442,11 @@ public final class PulsarOptions { .withDescription( Description.builder() .text( - "Configure a limit on the amount of direct memory that will be allocated by this client instance. Its unit is byte.") + "The limit (in bytes) on the amount of direct memory that will be allocated by this client instance.") .linebreak() .text( "Note: at this moment this is only limiting the memory for producers.") - .text("Setting this to 0 will disable the limit.") + .text(" Setting this to %s will disable the limit.", code("0")) .build()); public static final ConfigOption<String> PULSAR_PROXY_SERVICE_URL = @@ -444,8 +454,11 @@ public final class PulsarOptions { .stringType() .noDefaultValue() .withDescription( - "Proxy-service url when client would like to connect to broker via proxy." - + " Client can choose type of proxy-routing."); + Description.builder() + .text( + "Proxy-service URL when a client connects to the broker via the proxy.") + .text(" The client can choose the type of proxy-routing.") + .build()); public static final ConfigOption<ProxyProtocol> PULSAR_PROXY_PROTOCOL = ConfigOptions.key(CLIENT_CONFIG_PREFIX + "proxyProtocol") @@ -454,8 +467,8 @@ public final class PulsarOptions { .withDescription( Description.builder() .text( - "Protocol type to determine type of proxy routing when client connects to proxy using %s.", - code(CLIENT_CONFIG_PREFIX + "proxyServiceUrl")) + "Protocol type to determine the type of proxy routing when a client connects to the proxy using %s.", + code("pulsar.client.proxyServiceUrl")) .build()); public static final ConfigOption<Boolean> PULSAR_ENABLE_TRANSACTION = @@ -463,7 +476,12 @@ public final class PulsarOptions { .booleanType() .defaultValue(false) .withDescription( - "If enable transaction, start the transactionCoordinatorClient with pulsar 
client."); + Description.builder() + .text( + "If transaction is enabled, start the %s with %s.", + code("transactionCoordinatorClient"), + code("PulsarClient")) + .build()); /////////////////////////////////////////////////////////////////////////////// // @@ -479,7 +497,7 @@ public final class PulsarOptions { .withDescription( Description.builder() .text( - "Set the Pulsar service HTTP URL for the admin endpoint. eg. %s, or %s for TLS.", + "The Pulsar service HTTP URL for the admin endpoint. For example, %s, or %s for TLS.", code("http://my-broker.example.com:8080"), code("https://my-broker.example.com:8443")) .build()); @@ -488,27 +506,26 @@ public final class PulsarOptions { ConfigOptions.key(ADMIN_CONFIG_PREFIX + "connectTimeout") .intType() .defaultValue(60000) - .withDescription( - "This sets the connection time out (in millis) for the pulsar admin client."); + .withDescription("The connection time out (in ms) for the PulsarAdmin client."); public static final ConfigOption<Integer> PULSAR_READ_TIMEOUT = ConfigOptions.key(ADMIN_CONFIG_PREFIX + "readTimeout") .intType() .defaultValue(60000) .withDescription( - "This sets the server response read time out (in millis) for the pulsar admin client for any request."); + "The server response read timeout (in ms) for the PulsarAdmin client for any request."); public static final ConfigOption<Integer> PULSAR_REQUEST_TIMEOUT = ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestTimeout") .intType() .defaultValue(300000) .withDescription( - "This sets the server request time out (in millis) for the pulsar admin client for any request."); + "The server request timeout (in ms) for the PulsarAdmin client for any request."); public static final ConfigOption<Integer> PULSAR_AUTO_CERT_REFRESH_TIME = ConfigOptions.key(ADMIN_CONFIG_PREFIX + "autoCertRefreshTime") .intType() .defaultValue(300000) .withDescription( - "This sets auto cert refresh time (in millis) if Pulsar admin uses tls authentication."); + "The auto cert refresh 
time (in ms) if Pulsar admin supports TLS authentication."); } diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java index 2884f5e..c319915 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java @@ -55,6 +55,7 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.SOURC @ConfigGroup(name = "PulsarSource", keyPrefix = SOURCE_CONFIG_PREFIX), @ConfigGroup(name = "PulsarConsumer", keyPrefix = CONSUMER_CONFIG_PREFIX) }) +@SuppressWarnings("java:S1192") public final class PulsarSourceOptions { // Pulsar source connector config prefix. @@ -78,8 +79,11 @@ public final class PulsarSourceOptions { .longType() .defaultValue(Duration.ofSeconds(30).toMillis()) .withDescription( - "The interval in milliseconds for the Pulsar source to discover " - + "the new partitions. 
A non-positive value disables the partition discovery."); + Description.builder() + .text( + "The interval (in ms) for the Pulsar source to discover the new partitions.") + .text(" A non-positive value disables the partition discovery.") + .build()); public static final ConfigOption<Boolean> PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE = ConfigOptions.key(SOURCE_CONFIG_PREFIX + "enableAutoAcknowledgeMessage") @@ -89,18 +93,21 @@ public final class PulsarSourceOptions { Description.builder() .text( "Flink commits the consuming position with pulsar transactions on checkpoint.") - .linebreak() .text( - "However, if you have disabled the flink checkpoint or your pulsar cluster disabled the transaction," - + " make sure you have set this option to %s.", + " However, if you have disabled the Flink checkpoint or disabled transaction for your Pulsar cluster,") + .text( + " ensure that you have set this option to %s.", code("true")) + .linebreak() .text( "The source would use pulsar client's internal mechanism and commit cursor in two ways.") .list( text( - "For Key_Shared and Shared subscription: the cursor would be committed once the message is consumed."), + "For %s and %s subscription, the cursor would be committed once the message is consumed.", + code("Key_Shared"), code("Shared")), text( - "For Exclusive and Failover subscription: the cursor would be committed in a fixed interval.")) + "For %s and %s subscription, the cursor would be committed in a given interval.", + code("Exclusive"), code("Failover"))) .build()); public static final ConfigOption<Long> PULSAR_AUTO_COMMIT_CURSOR_INTERVAL = @@ -110,9 +117,9 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text( - "This option is used only when user disabled checkpoint and using Exclusive or Failover subscription.") + "This option is used only when the user disables the checkpoint and uses Exclusive or Failover subscription.") .text( - "We would automatically commit the cursor using the 
given period (in millis).") + " We would automatically commit the cursor using the given period (in ms).") .build()); public static final ConfigOption<Long> PULSAR_TRANSACTION_TIMEOUT_MILLIS = @@ -122,13 +129,14 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text( - "This option is used for when using Shared or Key_Shared subscription." - + " You should set this option when you didn't enable the %s option.", + "This option is used in %s or %s subscription.", + code("Shared"), code("Key_Shared")) + .text( + " You should configure this option when you do not enable the %s option.", code("pulsar.source.enableAutoAcknowledgeMessage")) .linebreak() .text( - "This value should be greater than the checkpoint interval.") - .text("It uses milliseconds as the unit of time.") + "The value (in ms) should be greater than the checkpoint interval.") .build()); public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME = @@ -137,10 +145,10 @@ public final class PulsarSourceOptions { .defaultValue(Duration.ofSeconds(10).toMillis()) .withDescription( Description.builder() + .text("The maximum time (in ms) to wait when fetching records.") + .text(" A longer time increases throughput but also latency.") .text( - "The max time (in millis) to wait when fetching records. " - + "A longer time increases throughput but also latency. " - + "A fetch batch might be finished earlier because of %s.", + " A fetch batch might be finished earlier because of %s.", code("pulsar.source.maxFetchRecords")) .build()); @@ -151,9 +159,10 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text( - "The max number of records to fetch to wait when polling. " - + "A longer time increases throughput but also latency." 
- + "A fetch batch might be finished earlier because of %s.", + "The maximum number of records to fetch to wait when polling.") + .text(" A longer time increases throughput but also latency.") + .text( + " A fetch batch might be finished earlier because of %s.", code("pulsar.source.maxFetchTime")) .build()); @@ -162,9 +171,14 @@ public final class PulsarSourceOptions { .enumType(CursorVerification.class) .defaultValue(CursorVerification.WARN_ON_MISMATCH) .withDescription( - "Upon (re)starting the source checks whether the expected message can be read. " - + "If failure is enabled the application fails, else it logs a warning. " - + "A possible solution is to adjust the retention settings in pulsar or ignoring the check result."); + Description.builder() + .text( + "Upon (re)starting the source, check whether the expected message can be read.") + .text( + " If failure is enabled, the application fails. Otherwise, it logs a warning.") + .text( + " A possible solution is to adjust the retention settings in Pulsar or ignoring the check result.") + .build()); /////////////////////////////////////////////////////////////////////////////// // @@ -178,8 +192,11 @@ public final class PulsarSourceOptions { .stringType() .noDefaultValue() .withDescription( - "Specify the subscription name for this consumer." 
- + " This argument is required when constructing the consumer."); + Description.builder() + .text("Specify the subscription name for this consumer.") + .text( + " This argument is required when constructing the consumer.") + .build()); public static final ConfigOption<SubscriptionType> PULSAR_SUBSCRIPTION_TYPE = ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionType") @@ -238,16 +255,15 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text( - "Group a consumer acknowledgment for a specified time (in microseconds).") - .linebreak() + "Group a consumer acknowledgment for a specified time (in μs).") .text( - "By default, a consumer uses 100ms grouping time to send out acknowledgments to a broker.") - .linebreak() + " By default, a consumer uses %s grouping time to send out acknowledgments to a broker.", + code("100μs")) .text( - "Setting a group time of 0 sends out acknowledgments immediately.") - .linebreak() + " If the group time is set to %s, acknowledgments are sent out immediately.", + code("0")) .text( - "A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.") + " A longer ack group time is more efficient at the expense of a slight increase in message re-deliveries after a failure.") .build()); public static final ConfigOption<Long> PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS = @@ -257,11 +273,11 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text( - "Delay (in microseconds) to wait before redelivering messages that failed to be processed.") + "Delay (in μs) to wait before redelivering messages that failed to be processed.") .linebreak() .text( "When an application uses %s, failed messages are redelivered after a fixed timeout.", - code("Consumer#negativeAcknowledge(Message)")) + code("Consumer.negativeAcknowledge(Message)")) .build()); public static final ConfigOption<Integer> @@ -274,7 +290,7 @@ public final class 
PulsarSourceOptions { .withDescription( Description.builder() .text( - "The max total receiver queue size across partitions.") + "The maximum total receiver queue size across partitions.") .linebreak() .text( "This setting reduces the receiver queue size for individual partitions if the total receiver queue size exceeds this value.") @@ -285,7 +301,7 @@ public final class PulsarSourceOptions { .stringType() .noDefaultValue() .withDescription( - "Consumer name is informative and it can be used to identify a particular consumer instance from the topic stats."); + "The consumer name is informative and it can be used to identify a particular consumer instance from the topic stats."); public static final ConfigOption<Long> PULSAR_ACK_TIMEOUT_MILLIS = ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "ackTimeoutMillis") @@ -294,17 +310,13 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text( - "Set the timeout (in millis) for unacknowledged messages, truncated to the nearest millisecond." - + " The timeout needs to be greater than 1 second.") + "The timeout (in ms) for unacknowledged messages, truncated to the nearest millisecond. 
The timeout needs to be greater than 1 second.") .linebreak() .text( - "By default, the acknowledge timeout is disabled and that means that messages delivered to a" - + " consumer will not be re-delivered unless the consumer crashes.") + "By default, the acknowledge timeout is disabled and that means that messages delivered to a consumer will not be re-delivered unless the consumer crashes.") .linebreak() .text( - "When enabling ack timeout, if a message is not acknowledged within the specified timeout" - + " it will be re-delivered to the consumer" - + " (possibly to a different consumer in case of a shared subscription).") + "When acknowledgement timeout being enabled, if a message is not acknowledged within the specified timeout it will be re-delivered to the consumer (possibly to a different consumer in case of a shared subscription).") .build()); public static final ConfigOption<Long> PULSAR_TICK_DURATION_MILLIS = @@ -313,10 +325,10 @@ public final class PulsarSourceOptions { .defaultValue(1000L) .withDescription( Description.builder() - .text("Granularity (in millis) of the ack-timeout redelivery.") + .text("Granularity (in ms) of the ack-timeout redelivery.") .linebreak() .text( - "Using an higher %s reduces the memory overhead to track messages when setting ack-timeout to a bigger value (for example, 1 hour).", + "A greater (for example, 1 hour) %s reduces the memory overhead to track messages.", code("tickDurationMillis")) .build()); @@ -327,19 +339,21 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text( - "Priority level for a consumer to which a broker gives more priority while dispatching messages in the shared subscription mode.") + "Priority level for a consumer to which a broker gives more priorities while dispatching messages in the shared subscription type.") .linebreak() .text( "The broker follows descending priorities. 
For example, 0=max-priority, 1, 2,...") .linebreak() .text( - "In shared subscription mode, the broker first dispatches messages to the max priority level consumers if they have permits. Otherwise, the broker considers next priority level consumers.") + "In shared subscription mode, the broker first dispatches messages to the consumers on the highest priority level if they have permits.") + .text( + " Otherwise, the broker considers consumers on the next priority level.") .linebreak() .linebreak() .text("Example 1") .linebreak() .text( - "If a subscription has consumerA with %s 0 and consumerB with %s 1, then the broker only dispatches messages to consumerA until it runs out permits and then starts dispatching messages to consumerB.", + "If a subscription has consumer A with %s 0 and consumer B with %s 1, then the broker only dispatches messages to consumer A until it runs out permits and then starts dispatching messages to consumer B.", code("priorityLevel"), code("priorityLevel")) .linebreak() .text("Example 2") @@ -353,7 +367,7 @@ public final class PulsarSourceOptions { + "C5, 1, 1\n") .linebreak() .text( - "Order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.") + "The order in which a broker dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.") .build()); public static final ConfigOption<Integer> PULSAR_MAX_PENDING_CHUNKED_MESSAGE = @@ -363,32 +377,28 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text( - "Consumer buffers chunk messages into memory until it receives all the chunks of the original message.") + "The consumer buffers chunk messages into memory until it receives all the chunks of the original message.") .text( - "While consuming chunk-messages, chunks from same message might not be contiguous" - + " in the stream and they might be mixed with other messages' chunks.") + " While consuming chunk-messages, chunks from the same message might not be contiguous in the 
stream and they might be mixed with other messages' chunks.") .text( - "So, consumer has to maintain multiple buffers to manage chunks coming from different messages.") + " So, consumer has to maintain multiple buffers to manage chunks coming from different messages.") .text( - "This mainly happens when multiple publishers are publishing messages on the topic" - + " concurrently or publisher failed to publish all chunks of the messages.") + " This mainly happens when multiple publishers are publishing messages on the topic concurrently or publishers failed to publish all chunks of the messages.") .linebreak() - .text("eg: M1-C1, M2-C1, M1-C2, M2-C2") .text( - "Messages M1-C1 and M1-C2 belong to original message M1, M2-C1 and M2-C2 messages belong to M2 message.") + "For example, there are M1-C1, M2-C1, M1-C2, M2-C2 messages.") + .text( + "Messages M1-C1 and M1-C2 belong to the M1 original message while M2-C1 and M2-C2 belong to the M2 message.") .linebreak() .text( - "Buffering large number of outstanding uncompleted chunked messages can create memory" - + " pressure and it can be guarded by providing this %s threshold." - + " Once, consumer reaches this threshold, it drops the outstanding unchunked-messages" - + " by silently acking or asking broker to redeliver later by marking it unacked." 
- + " This behavior can be controlled by configuration %s", - code( - CONSUMER_CONFIG_PREFIX - + "maxPendingChunkedMessage"), + "Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this %s threshold.", + code("pulsar.consumer.maxPendingChunkedMessage")) + .text( + " Once a consumer reaches this threshold, it drops the outstanding unchunked messages by silently acknowledging or asking the broker to redeliver messages later by marking them unacknowledged.") + .text( + " This behavior can be controlled by the %s option.", code( - CONSUMER_CONFIG_PREFIX - + "autoAckOldestChunkedMessageOnQueueFull")) + "pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull")) .build()); public static final ConfigOption<Boolean> PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL = @@ -398,16 +408,12 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text( - "Buffering large number of outstanding uncompleted chunked messages can create memory pressure" - + " and it can be guarded by providing this %s threshold." - + " Once, consumer reaches this threshold, it drops the outstanding unchunked-messages" - + " by silently acking if %s is true else it marks them for redelivery.", - code( - CONSUMER_CONFIG_PREFIX - + "maxPendingChunkedMessage"), + "Buffering a large number of outstanding uncompleted chunked messages can bring memory pressure and it can be guarded by providing this %s threshold.", + code("pulsar.consumer.maxPendingChunkedMessage")) + .text( + " Once a consumer reaches this threshold, it drops the outstanding unchunked-messages by silently acknowledging if %s is true.
Otherwise, it marks them for redelivery.", code( - CONSUMER_CONFIG_PREFIX - + "autoAckOldestChunkedMessageOnQueueFull")) + "pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull")) .build()); public static final ConfigOption<Long> PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS = @@ -415,10 +421,12 @@ public final class PulsarSourceOptions { .longType() .defaultValue(60 * 1000L) .withDescription( - "If producer fails to publish all the chunks of a message then consumer" - + " can expire incomplete chunks if consumer won't be able to" - + " receive all chunks in expire times (default 1 hour)." - + " It uses milliseconds as the unit of time."); + Description.builder() + .text( + "If a producer fails to publish all the chunks of a message,") + .text( + " the consumer can expire incomplete chunks if the consumer cannot receive all chunks within the expire time (default 1 minute, in ms).") + .build()); public static final ConfigOption<ConsumerCryptoFailureAction> PULSAR_CRYPTO_FAILURE_ACTION = ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "cryptoFailureAction") @@ -427,23 +435,28 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text( - "Consumer should take action when it receives a message that can not be decrypted.") + "The consumer should take action when it receives a message that can not be decrypted.") .list( text( - "FAIL: this is the default option to fail messages until crypto succeeds."), + "%s: this is the default option to fail messages until crypto succeeds.", + code("FAIL")), text( - "DISCARD: silently acknowledge and not deliver message to an application."), + "%s: silently acknowledge but do not deliver messages to an application.", + code("DISCARD")), text( - "CONSUME: deliver encrypted messages to applications. It is the application's responsibility to decrypt the message.")) + "%s: deliver encrypted messages to applications.
It is the application's responsibility to decrypt the message.", + code("CONSUME"))) .linebreak() - .text("The decompression of message fails.") + .text("Fail to decompress the messages.") .linebreak() .text( "If messages contain batch messages, a client is not be able to retrieve individual messages in batch.") .linebreak() .text( - "Delivered encrypted message contains %s which contains encryption and compression information in it using which application can decrypt consumed message payload.", + "The delivered encrypted message contains %s which contains encryption and compression information.", code("EncryptionContext")) + .text( + " You can use an application to decrypt the consumed message payload.") .build()); public static final ConfigOption<Map<String, String>> PULSAR_CONSUMER_PROPERTIES = @@ -453,17 +466,15 @@ public final class PulsarSourceOptions { .withDescription( Description.builder() .text("A name or value property of this consumer.") - .linebreak() .text( - "%s is application defined metadata attached to a consumer.", + " %s is application defined metadata attached to a consumer.", code("properties")) - .linebreak() .text( - "When getting a topic stats, associate this metadata with the consumer stats for easier identification.") + " When getting topic stats, associate this metadata with the consumer stats for easier identification.") .build()); public static final ConfigOption<Boolean> PULSAR_READ_COMPACTED = - ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "readCompacted") // NOSONAR + ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "readCompacted") .booleanType() .defaultValue(false) .withDescription( @@ -524,47 +535,31 @@ public final class PulsarSourceOptions { .intType() .defaultValue(0) .withDescription( - "Maximum number of times that a message will be redelivered before being sent to the dead letter queue."); + "The maximum number of times that a message is redelivered before being sent to the dead letter queue."); public static final
ConfigOption<String> PULSAR_RETRY_LETTER_TOPIC = ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "deadLetterPolicy.retryLetterTopic") .stringType() .noDefaultValue() - .withDescription( - "Name of the retry topic where the failing messages will be sent."); + .withDescription("Name of the retry topic where the failed messages are sent."); public static final ConfigOption<String> PULSAR_DEAD_LETTER_TOPIC = ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "deadLetterPolicy.deadLetterTopic") .stringType() .noDefaultValue() - .withDescription( - "Name of the dead topic where the failing messages will be sent."); + .withDescription("Name of the dead topic where the failed messages are sent."); public static final ConfigOption<Boolean> PULSAR_RETRY_ENABLE = ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "retryEnable") .booleanType() .defaultValue(false) - .withDescription("If enabled, the consumer will auto retry messages."); - - public static final ConfigOption<Boolean> PULSAR_AUTO_UPDATE_PARTITIONS = - ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "autoUpdatePartitions") - .booleanType() - .defaultValue(true) - .withDescription( - Description.builder() - .text( - "If %s is enabled, a consumer subscribes to partition increase automatically.", - code("autoUpdatePartitions")) - .linebreak() - .text("Note: this is only for partitioned consumers.\t") - .build()); + .withDescription("If enabled, the consumer will automatically retry messages."); public static final ConfigOption<Integer> PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS = ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "autoUpdatePartitionsIntervalSeconds") .intType() .defaultValue(60) .withDescription( - "Set the interval (in seconds) of updating partitions." - + " This only works if autoUpdatePartitions is enabled."); + "The interval (in seconds) of updating partitions. 
This only works if autoUpdatePartitions is enabled."); public static final ConfigOption<Boolean> PULSAR_REPLICATE_SUBSCRIPTION_STATE = ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "replicateSubscriptionState") @@ -582,23 +577,11 @@ public final class PulsarSourceOptions { .booleanType() .defaultValue(false) .withDescription( - "Ack will return receipt but does not mean that the message will not be resent after get receipt."); + "Acknowledgement will return a receipt but this does not mean that the message will not be resent after getting the receipt."); public static final ConfigOption<Boolean> PULSAR_POOL_MESSAGES = ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "poolMessages") .booleanType() .defaultValue(false) - .withDescription( - Description.builder() - .text( - "Enable pooling of messages and the underlying data buffers.") - .linebreak() - .text( - "When pooling is enabled, the application is responsible for calling" - + " %s after the handling of every received message. If %s" - + " is not called on a received message, there will be a memory leak." - + " If an application attempts to use and already \"released\" message," - + " it might experience undefined behavior (eg: memory corruption, deserialization error, etc.).", - code("Message.release()"), code("release()")) - .build()); + .withDescription("Enable pooling of messages and the underlying data buffers."); } diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml index 68364e0..3791708 100644 --- a/flink-docs/pom.xml +++ b/flink-docs/pom.xml @@ -121,6 +121,12 @@ under the License. 
</dependency> <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-slf4j-impl</artifactId> <scope>compile</scope> diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java index 8f04aea..9272789 100644 --- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java +++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java @@ -95,7 +95,13 @@ public class ConfigOptionsDocGenerator { "flink-kubernetes", "org.apache.flink.kubernetes.configuration"), new OptionsClassLocation("flink-clients", "org.apache.flink.client.cli"), new OptionsClassLocation( - "flink-table/flink-sql-client", "org.apache.flink.table.client.config") + "flink-table/flink-sql-client", "org.apache.flink.table.client.config"), + new OptionsClassLocation( + "flink-connectors/flink-connector-pulsar", + "org.apache.flink.connector.pulsar.common.config"), + new OptionsClassLocation( + "flink-connectors/flink-connector-pulsar", + "org.apache.flink.connector.pulsar.source") }; static final Set<String> EXCLUSIONS =
