This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 77012db20dc4728732acff22b3d0da7a4b57c7f4
Author: Yufan Sheng <[email protected]>
AuthorDate: Fri Sep 10 12:09:54 2021 +0800

    [FLINK-23864][docs] Add flink-connector-pulsar module to flink-docs, auto 
generate the config document.
---
 .../generated/pulsar_admin_configuration.html      |  42 ++++
 .../generated/pulsar_client_configuration.html     | 222 ++++++++++++++++++++
 .../generated/pulsar_consumer_configuration.html   | 174 +++++++++++++++
 .../generated/pulsar_source_configuration.html     |  54 +++++
 .../pulsar/common/config/PulsarOptions.java        | 153 ++++++++------
 .../pulsar/source/PulsarSourceOptions.java         | 233 ++++++++++-----------
 flink-docs/pom.xml                                 |   6 +
 .../configuration/ConfigOptionsDocGenerator.java   |   8 +-
 8 files changed, 698 insertions(+), 194 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/pulsar_admin_configuration.html 
b/docs/layouts/shortcodes/generated/pulsar_admin_configuration.html
new file mode 100644
index 0000000..31c7273
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/pulsar_admin_configuration.html
@@ -0,0 +1,42 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>pulsar.admin.adminUrl</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The Pulsar service HTTP URL for the admin endpoint. For 
example, <code 
class="highlighter-rouge">http://my-broker.example.com:8080</code>, or <code 
class="highlighter-rouge">https://my-broker.example.com:8443</code> for 
TLS.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.admin.autoCertRefreshTime</h5></td>
+            <td style="word-wrap: break-word;">300000</td>
+            <td>Integer</td>
+            <td>The auto cert refresh time (in ms) if Pulsar admin supports 
TLS authentication.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.admin.connectTimeout</h5></td>
+            <td style="word-wrap: break-word;">60000</td>
+            <td>Integer</td>
+            <td>The connection time out (in ms) for the PulsarAdmin 
client.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.admin.readTimeout</h5></td>
+            <td style="word-wrap: break-word;">60000</td>
+            <td>Integer</td>
+            <td>The server response read timeout (in ms) for the PulsarAdmin 
client for any request.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.admin.requestTimeout</h5></td>
+            <td style="word-wrap: break-word;">300000</td>
+            <td>Integer</td>
+            <td>The server request timeout (in ms) for the PulsarAdmin client 
for any request.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/layouts/shortcodes/generated/pulsar_client_configuration.html 
b/docs/layouts/shortcodes/generated/pulsar_client_configuration.html
new file mode 100644
index 0000000..d44a93c
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/pulsar_client_configuration.html
@@ -0,0 +1,222 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>pulsar.client.authParamMap</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Map</td>
+            <td>Parameters for the authentication plugin.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.authParams</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Parameters for the authentication plugin.<br /><br 
/>Example:<br /><code class="highlighter-rouge">key1:val1,key2:val2</code></td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.authPluginClassName</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Name of the authentication plugin.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.concurrentLookupRequest</h5></td>
+            <td style="word-wrap: break-word;">5000</td>
+            <td>Integer</td>
+            <td>The number of concurrent lookup requests that can be sent on 
each broker connection to prevent overload on the broker. It should be 
configured with a higher value only if you need to produce to or 
subscribe to thousands of topics using a created <code 
class="highlighter-rouge">PulsarClient</code>.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.connectionTimeoutMs</h5></td>
+            <td style="word-wrap: break-word;">10000</td>
+            <td>Integer</td>
+            <td>Duration (in ms) of waiting for a connection to a broker to be 
established.<br />If the duration passes without a response from a broker, the 
connection attempt is dropped.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.connectionsPerBroker</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The maximum number of connections that the client library will 
open to a single broker.<br /> By default, the connection pool will use a 
single connection for all the producers and consumers. Increasing this 
parameter may improve throughput when using many producers over a high latency 
connection.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.enableBusyWait</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Option to enable busy-wait settings.<br />This option will 
enable spin-waiting on executors and IO threads in order to reduce latency 
during context switches. The spinning will consume 100% CPU even when the 
broker is not doing any work. It is recommended to reduce the number of IO 
threads and BookKeeper client threads to only have fewer CPU cores busy.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.enableTransaction</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If transaction is enabled, start the <code 
class="highlighter-rouge">transactionCoordinatorClient</code> with <code 
class="highlighter-rouge">PulsarClient</code>.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.initialBackoffIntervalNanos</h5></td>
+            <td style="word-wrap: break-word;">100000000</td>
+            <td>Long</td>
+            <td>Default duration (in nanoseconds) for a backoff interval.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.keepAliveIntervalSeconds</h5></td>
+            <td style="word-wrap: break-word;">30</td>
+            <td>Integer</td>
+            <td>Interval (in seconds) for keeping connection between the 
Pulsar client and broker alive.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.listenerName</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Configure the <code 
class="highlighter-rouge">listenerName</code> so that the broker returns the 
corresponding <code class="highlighter-rouge">advertisedListener</code>.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.maxBackoffIntervalNanos</h5></td>
+            <td style="word-wrap: break-word;">60000000000</td>
+            <td>Long</td>
+            <td>The maximum duration (in nanoseconds) for a backoff 
interval.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.maxLookupRedirects</h5></td>
+            <td style="word-wrap: break-word;">20</td>
+            <td>Integer</td>
+            <td>The maximum number of times a lookup request to a broker can 
be redirected.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.maxLookupRequest</h5></td>
+            <td style="word-wrap: break-word;">50000</td>
+            <td>Integer</td>
+            <td>The maximum number of lookup requests allowed on each broker 
connection to prevent overload on the broker. It should be greater than <code 
class="highlighter-rouge">maxConcurrentLookupRequests</code>. Requests that are 
inside <code class="highlighter-rouge">maxConcurrentLookupRequests</code> are 
already sent to broker, and requests beyond <code 
class="highlighter-rouge">maxConcurrentLookupRequests</code> and under <code 
class="highlighter-rouge">maxLookupRequests</code> will  [...]
+        </tr>
+        <tr>
+            
<td><h5>pulsar.client.maxNumberOfRejectedRequestPerConnection</h5></td>
+            <td style="word-wrap: break-word;">50</td>
+            <td>Integer</td>
+            <td>The maximum number of rejected requests of a broker in a 
certain period (30s) after the current connection is closed and the client 
creates a new connection to connect to a different broker.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.memoryLimitBytes</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Long</td>
+            <td>The limit (in bytes) on the amount of direct memory that will 
be allocated by this client instance.<br />Note: at this moment this is only 
limiting the memory for producers. Setting this to <code 
class="highlighter-rouge">0</code> will disable the limit.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.numIoThreads</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The number of threads used for handling connections to 
brokers.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.numListenerThreads</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The number of threads used for handling message listeners. The 
listener thread pool is shared across all the consumers and readers that are 
using a <code class="highlighter-rouge">listener</code> model to get messages. 
For a given consumer, the listener is always invoked from the same thread to 
ensure ordering.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.operationTimeoutMs</h5></td>
+            <td style="word-wrap: break-word;">30000</td>
+            <td>Integer</td>
+            <td>Operation timeout (in ms). Operations such as creating 
producers, subscribing or unsubscribing topics are retried during this 
interval. If the operation is not completed during this interval, the operation 
will be marked as failed.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.proxyProtocol</h5></td>
+            <td style="word-wrap: break-word;">SNI</td>
+            <td><p>Enum</p></td>
+            <td>Protocol type to determine the type of proxy routing when a 
client connects to the proxy using <code 
class="highlighter-rouge">pulsar.client.proxyServiceUrl</code>.<br /><br 
/>Possible values:<ul><li>"SNI"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.proxyServiceUrl</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Proxy-service URL when a client connects to the broker via the 
proxy. The client can choose the type of proxy-routing.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.requestTimeoutMs</h5></td>
+            <td style="word-wrap: break-word;">60000</td>
+            <td>Integer</td>
+            <td>Maximum duration (in ms) for completing a request.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.serviceUrl</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Service URL provider for Pulsar service.<br />To connect to 
Pulsar using client libraries, you need to specify a Pulsar protocol URL.<br 
/>You can assign Pulsar protocol URLs to specific clusters and use the <code 
class="highlighter-rouge">pulsar</code> scheme.<br /><ul><li>This is an example 
of <code class="highlighter-rouge">localhost</code>: <code 
class="highlighter-rouge">pulsar://localhost:6650</code>.</li><li>If you have 
multiple brokers, the URL is as: <code class= [...]
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.sslProvider</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The name of the security provider used for SSL connections. 
The default value is the default security provider of the JVM.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.statsIntervalSeconds</h5></td>
+            <td style="word-wrap: break-word;">60</td>
+            <td>Long</td>
+            <td>Interval between each stats info.<br /><ul><li>Stats is 
activated with positive <code 
class="highlighter-rouge">statsInterval</code></li><li>Set <code 
class="highlighter-rouge">statsIntervalSeconds</code> to 1 second at 
least.</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.tlsAllowInsecureConnection</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether the Pulsar client accepts untrusted TLS certificate 
from the broker.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.tlsCiphers</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>List&lt;String&gt;</td>
+            <td>A list of cipher suites. This is a named combination of 
authentication, encryption, MAC and the key exchange algorithm used to 
negotiate the security settings for a network connection using the TLS or SSL 
network protocol. By default all the available cipher suites are supported.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.tlsHostnameVerificationEnable</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable TLS hostname verification. It allows to 
validate hostname verification when a client connects to the broker over TLS. 
It validates incoming x509 certificate and matches provided hostname (CN/SAN) 
with the expected broker's host name. It follows RFC 2818, 3.1. Server Identity 
hostname verification.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.tlsProtocols</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>List&lt;String&gt;</td>
+            <td>The SSL protocol used to generate the SSLContext. By default, 
it is set to TLS, which is fine for most cases. Allowed values in recent JVMs are 
TLS, TLSv1.3, TLSv1.2 and TLSv1.1.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.tlsTrustCertsFilePath</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Path to the trusted TLS certificate file.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.tlsTrustStorePassword</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The store password for the trust store file.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.tlsTrustStorePath</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The location of the trust store file.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.tlsTrustStoreType</h5></td>
+            <td style="word-wrap: break-word;">"JKS"</td>
+            <td>String</td>
+            <td>The file format of the trust store file.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.useKeyStoreTls</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If TLS is enabled, whether to use the KeyStore type as the TLS 
configuration parameter. If it is set to <code 
class="highlighter-rouge">false</code>, the default PEM type 
configuration is used.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.client.useTcpNoDelay</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to use the TCP no-delay flag on the connection to 
disable the Nagle algorithm.<br />The no-delay feature ensures that packets are sent 
out on the network as soon as possible, and it is critical to achieve low 
latency publishes. On the other hand, sending out a huge number of small 
packets might limit the overall throughput. Therefore, if latency is not a 
concern, it is recommended to set the <code 
class="highlighter-rouge">useTcpNoDelay</code> flag to <code class="highli [...]
+        </tr>
+    </tbody>
+</table>
diff --git 
a/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html 
b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
new file mode 100644
index 0000000..bc8b6df
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/pulsar_consumer_configuration.html
@@ -0,0 +1,174 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>pulsar.consumer.ackReceiptEnabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Acknowledgement will return a receipt but this does not mean 
that the message will not be resent after getting the receipt.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.ackTimeoutMillis</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Long</td>
+            <td>The timeout (in ms) for unacknowledged messages, truncated to 
the nearest millisecond. The timeout needs to be greater than 1 second.<br />By 
default, the acknowledge timeout is disabled and that means that messages 
delivered to a consumer will not be re-delivered unless the consumer 
crashes.<br />When the acknowledgement timeout is enabled, if a message is not 
acknowledged within the specified timeout it will be re-delivered to the 
consumer (possibly to a different consum [...]
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.acknowledgementsGroupTimeMicros</h5></td>
+            <td style="word-wrap: break-word;">100000</td>
+            <td>Long</td>
+            <td>Group a consumer acknowledgment for a specified time (in μs). 
By default, a consumer uses <code class="highlighter-rouge">100ms</code> 
grouping time to send out acknowledgments to a broker. If the group time is set 
to <code class="highlighter-rouge">0</code>, acknowledgments are sent out 
immediately. A longer ack group time is more efficient at the expense of a 
slight increase in message re-deliveries after a failure.</td>
+        </tr>
+        <tr>
+            
<td><h5>pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Buffering a large number of outstanding uncompleted chunked 
messages can bring memory pressure and it can be guarded by providing this 
<code class="highlighter-rouge">pulsar.consumer.maxPendingChunkedMessage</code> 
threshold. Once a consumer reaches this threshold, it drops the outstanding 
unchunked-messages by silently acknowledging if <code 
class="highlighter-rouge">pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull</code>
 is true. Otherwise, it marks them for redel [...]
+        </tr>
+        <tr>
+            
<td><h5>pulsar.consumer.autoUpdatePartitionsIntervalSeconds</h5></td>
+            <td style="word-wrap: break-word;">60</td>
+            <td>Integer</td>
+            <td>The interval (in seconds) of updating partitions. This only 
works if autoUpdatePartitions is enabled.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.consumerName</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The consumer name is informative and it can be used to 
identify a particular consumer instance from the topic stats.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.cryptoFailureAction</h5></td>
+            <td style="word-wrap: break-word;">FAIL</td>
+            <td><p>Enum</p></td>
+            <td>The consumer should take action when it receives a message 
that can not be decrypted.<ul><li><code class="highlighter-rouge">FAIL</code>: 
this is the default option to fail messages until crypto 
succeeds.</li><li><code class="highlighter-rouge">DISCARD</code>: silently 
acknowledge but do not deliver messages to an application.</li><li><code 
class="highlighter-rouge">CONSUME</code>: deliver encrypted messages to 
applications. It is the application's responsibility to decry [...]
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.deadLetterPolicy.deadLetterTopic</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Name of the dead letter topic where the failed messages are sent.</td>
+        </tr>
+        <tr>
+            
<td><h5>pulsar.consumer.deadLetterPolicy.maxRedeliverCount</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Integer</td>
+            <td>The maximum number of times that a message is redelivered 
before being sent to the dead letter queue.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.deadLetterPolicy.retryLetterTopic</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Name of the retry topic where the failed messages are 
sent.</td>
+        </tr>
+        <tr>
+            
<td><h5>pulsar.consumer.expireTimeOfIncompleteChunkedMessageMillis</h5></td>
+            <td style="word-wrap: break-word;">60000</td>
+            <td>Long</td>
+            <td>If a producer fails to publish all the chunks of a message, 
the consumer can expire incomplete chunks if the consumer cannot receive all 
chunks within the expiration time (default 1 minute, in ms).</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.maxPendingChunkedMessage</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>The consumer buffers chunk messages into memory until it 
receives all the chunks of the original message. While consuming 
chunk-messages, chunks from the same message might not be contiguous in the 
stream and they might be mixed with other messages' chunks. So, consumer has to 
maintain multiple buffers to manage chunks coming from different messages. This 
mainly happens when multiple publishers are publishing messages on the topic 
concurrently or publishers failed to publ [...]
+        </tr>
+        <tr>
+            
<td><h5>pulsar.consumer.maxTotalReceiverQueueSizeAcrossPartitions</h5></td>
+            <td style="word-wrap: break-word;">50000</td>
+            <td>Integer</td>
+            <td>The maximum total receiver queue size across partitions.<br 
/>This setting reduces the receiver queue size for individual partitions if the 
total receiver queue size exceeds this value.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.negativeAckRedeliveryDelayMicros</h5></td>
+            <td style="word-wrap: break-word;">60000000</td>
+            <td>Long</td>
+            <td>Delay (in μs) to wait before redelivering messages that failed 
to be processed.<br />When an application uses <code 
class="highlighter-rouge">Consumer.negativeAcknowledge(Message)</code>, failed 
messages are redelivered after a fixed timeout.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.poolMessages</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enable pooling of messages and the underlying data 
buffers.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.priorityLevel</h5></td>
+            <td style="word-wrap: break-word;">0</td>
+            <td>Integer</td>
+            <td>Priority level for a consumer to which a broker gives more 
priority while dispatching messages in the shared subscription type.<br />The 
broker follows descending priorities. For example, 0=max-priority, 1, 2,...<br 
/>In shared subscription mode, the broker first dispatches messages to the 
consumers on the highest priority level if they have permits. Otherwise, the 
broker considers consumers on the next priority level.<br /><br />Example 1<br 
/>If a subscription has con [...]
+C1, 0, 2
+C2, 0, 1
+C3, 0, 1
+C4, 1, 2
+C5, 1, 1
+<br />The order in which a broker dispatches messages to consumers is: C1, C2, 
C3, C1, C4, C5, C4.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.properties</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>Map</td>
+            <td>A name or value property of this consumer. <code 
class="highlighter-rouge">properties</code> is application defined metadata 
attached to a consumer. When getting a topic stats, associate this metadata 
with the consumer stats for easier identification.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.readCompacted</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If enabling <code 
class="highlighter-rouge">readCompacted</code>, a consumer reads messages from 
a compacted topic rather than reading a full message backlog of a topic.<br />A 
consumer only sees the latest value for each key in the compacted topic, up 
until reaching the point in the topic message when compacting backlog. Beyond 
that point, send messages as normal.<br />Only enabling <code 
class="highlighter-rouge">readCompacted</code> on subscriptions to persistent 
topic [...]
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.receiverQueueSize</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Integer</td>
+            <td>Size of a consumer's receiver queue.<br />For example, the 
number of messages accumulated by a consumer before an application calls <code 
class="highlighter-rouge">Receive</code>.<br />A value higher than the default 
value increases consumer throughput, though at the expense of more memory 
utilization.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.replicateSubscriptionState</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If <code 
class="highlighter-rouge">replicateSubscriptionState</code> is enabled, a 
subscription state is replicated to geo-replicated clusters.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.retryEnable</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If enabled, the consumer will automatically retry 
messages.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.subscriptionInitialPosition</h5></td>
+            <td style="word-wrap: break-word;">Latest</td>
+            <td><p>Enum</p></td>
+            <td>Initial position at which to set cursor when subscribing to a 
topic at first time.<br /><br />Possible 
values:<ul><li>"Latest"</li><li>"Earliest"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.subscriptionMode</h5></td>
+            <td style="word-wrap: break-word;">Durable</td>
+            <td><p>Enum</p></td>
+            <td>Select the subscription mode to be used when subscribing to 
the topic.<ul><li><code class="highlighter-rouge">Durable</code>: Make the 
subscription to be backed by a durable cursor that will retain messages and 
persist the current position.</li><li><code 
class="highlighter-rouge">NonDurable</code>: Lightweight subscription mode that 
doesn't have a durable cursor associated</li></ul><br /><br />Possible 
values:<ul><li>"Durable"</li><li>"NonDurable"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.subscriptionName</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specify the subscription name for this consumer. This argument 
is required when constructing the consumer.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.subscriptionType</h5></td>
+            <td style="word-wrap: break-word;">Shared</td>
+            <td><p>Enum</p></td>
+            <td>Subscription type.<br /><br />Four subscription types are 
available:<ul><li>Exclusive</li><li>Failover</li><li>Shared</li><li>Key_Shared</li></ul><br
 /><br />Possible 
values:<ul><li>"Exclusive"</li><li>"Shared"</li><li>"Failover"</li><li>"Key_Shared"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.consumer.tickDurationMillis</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Long</td>
+            <td>Granularity (in ms) of the ack-timeout redelivery.<br />A 
greater (for example, 1 hour) <code 
class="highlighter-rouge">tickDurationMillis</code> reduces the memory overhead 
to track messages.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/layouts/shortcodes/generated/pulsar_source_configuration.html 
b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
new file mode 100644
index 0000000..02e512b
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/pulsar_source_configuration.html
@@ -0,0 +1,54 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>pulsar.source.autoCommitCursorInterval</h5></td>
+            <td style="word-wrap: break-word;">5000</td>
+            <td>Long</td>
+            <td>This option is used only when the user disables the checkpoint 
and uses Exclusive or Failover subscription. We would automatically commit the 
cursor using the given period (in ms).</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.source.enableAutoAcknowledgeMessage</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Flink commits the consuming position with pulsar transactions 
on checkpoint. However, if you have disabled the Flink checkpoint or disabled 
transaction for your Pulsar cluster, ensure that you have set this option to 
<code class="highlighter-rouge">true</code>.<br />The source would use pulsar 
client's internal mechanism and commit cursor in two ways.<ul><li>For <code 
class="highlighter-rouge">Key_Shared</code> and <code 
class="highlighter-rouge">Shared</code> subscriptio [...]
+        </tr>
+        <tr>
+            <td><h5>pulsar.source.maxFetchRecords</h5></td>
+            <td style="word-wrap: break-word;">100</td>
+            <td>Integer</td>
+            <td>The maximum number of records to fetch to wait when polling. A 
longer time increases throughput but also latency. A fetch batch might be 
finished earlier because of <code 
class="highlighter-rouge">pulsar.source.maxFetchTime</code>.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.source.maxFetchTime</h5></td>
+            <td style="word-wrap: break-word;">10000</td>
+            <td>Long</td>
+            <td>The maximum time (in ms) to wait when fetching records. A 
longer time increases throughput but also latency. A fetch batch might be 
finished earlier because of <code 
class="highlighter-rouge">pulsar.source.maxFetchRecords</code>.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.source.partitionDiscoveryIntervalMs</h5></td>
+            <td style="word-wrap: break-word;">30000</td>
+            <td>Long</td>
+            <td>The interval (in ms) for the Pulsar source to discover the new 
partitions. A non-positive value disables the partition discovery.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.source.transactionTimeoutMillis</h5></td>
+            <td style="word-wrap: break-word;">10800000</td>
+            <td>Long</td>
+            <td>This option is used in <code 
class="highlighter-rouge">Shared</code> or <code 
class="highlighter-rouge">Key_Shared</code> subscription. You should configure 
this option when you do not enable the <code 
class="highlighter-rouge">pulsar.source.enableAutoAcknowledgeMessage</code> 
option.<br />The value (in ms) should be greater than the checkpoint 
interval.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.source.verifyInitialOffsets</h5></td>
+            <td style="word-wrap: break-word;">WARN_ON_MISMATCH</td>
+            <td><p>Enum</p></td>
+            <td>Upon (re)starting the source, check whether the expected 
message can be read. If failure is enabled, the application fails. Otherwise, 
it logs a warning. A possible solution is to adjust the retention settings in 
Pulsar or ignoring the check result.<br /><br />Possible 
values:<ul><li>"FAIL_ON_MISMATCH"</li><li>"WARN_ON_MISMATCH"</li></ul></td>
+        </tr>
+    </tbody>
+</table>
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
index 4c33a70..db90bf1 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/common/config/PulsarOptions.java
@@ -31,7 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import static java.util.Collections.emptyMap;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.flink.configuration.description.TextElement.code;
 import static org.apache.flink.configuration.description.TextElement.text;
@@ -48,6 +47,7 @@ import static 
org.apache.flink.connector.pulsar.common.config.PulsarOptions.CLIE
             @ConfigGroup(name = "PulsarClient", keyPrefix = 
CLIENT_CONFIG_PREFIX),
             @ConfigGroup(name = "PulsarAdmin", keyPrefix = ADMIN_CONFIG_PREFIX)
         })
+@SuppressWarnings("java:S1192")
 public final class PulsarOptions {
 
     // Pulsar client API config prefix.
@@ -112,8 +112,7 @@ public final class PulsarOptions {
                     .noDefaultValue()
                     .withDescription(
                             Description.builder()
-                                    .text(
-                                            "String represents parameters for 
the authentication plugin.")
+                                    .text("Parameters for the authentication 
plugin.")
                                     .linebreak()
                                     .linebreak()
                                     .text("Example:")
@@ -124,9 +123,8 @@ public final class PulsarOptions {
     public static final ConfigOption<Map<String, String>> 
PULSAR_AUTH_PARAM_MAP =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "authParamMap")
                     .mapType()
-                    .defaultValue(emptyMap())
-                    .withDescription(
-                            "Map which represents parameters for the 
authentication plugin.");
+                    .noDefaultValue()
+                    .withDescription("Parameters for the authentication 
plugin.");
 
     public static final ConfigOption<Integer> PULSAR_OPERATION_TIMEOUT_MS =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "operationTimeoutMs")
@@ -134,10 +132,11 @@ public final class PulsarOptions {
                     .defaultValue(30000)
                     .withDescription(
                             Description.builder()
-                                    .text("Operation timeout (in millis).")
+                                    .text("Operation timeout (in ms).")
                                     .text(
-                                            "Producer-create, subscribe and 
unsubscribe operations will be retried until this interval,"
-                                                    + " after which the 
operation will be marked as failed.")
+                                            " Operations such as creating 
producers, subscribing or unsubscribing topics are retried during this 
interval.")
+                                    .text(
+                                            " If the operation is not 
completed during this interval, the operation will be marked as failed.")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_STATS_INTERVAL_SECONDS =
@@ -153,7 +152,7 @@ public final class PulsarOptions {
                                                     "Stats is activated with 
positive %s",
                                                     code("statsInterval")),
                                             text(
-                                                    "Set %s to 1 second at 
least",
+                                                    "Set %s to 1 second at 
least.",
                                                     
code("statsIntervalSeconds")))
                                     .build());
 
@@ -173,10 +172,10 @@ public final class PulsarOptions {
                                     .text(
                                             "The number of threads used for 
handling message listeners.")
                                     .text(
-                                            "The listener thread pool is 
shared across all the consumers and readers that are using a %s model to get 
messages.",
+                                            " The listener thread pool is 
shared across all the consumers and readers that are using a %s model to get 
messages.",
                                             code("listener"))
                                     .text(
-                                            "For a given consumer, the 
listener will be always invoked from the same thread, to ensure ordering.")
+                                            " For a given consumer, the 
listener is always invoked from the same thread to ensure ordering.")
                                     .build());
 
     public static final ConfigOption<Integer> PULSAR_CONNECTIONS_PER_BROKER =
@@ -186,12 +185,12 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Sets the max number of connection 
that the client library will open to a single broker.")
+                                            "The maximum number of connections 
that the client library will open to a single broker.")
                                     .linebreak()
                                     .text(
-                                            "By default, the connection pool 
will use a single connection for all the producers and consumers.")
+                                            " By default, the connection pool 
will use a single connection for all the producers and consumers.")
                                     .text(
-                                            "Increasing this parameter may 
improve throughput when using many producers over a high latency connection.")
+                                            " Increasing this parameter may 
improve throughput when using many producers over a high latency connection.")
                                     .build());
 
     public static final ConfigOption<Boolean> PULSAR_USE_TCP_NO_DELAY =
@@ -201,18 +200,18 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Whether to use TCP no-delay flag 
on the connection to disable Nagle algorithm.")
+                                            "Whether to use the TCP no-delay 
flag on the connection to disable Nagle algorithm.")
                                     .linebreak()
                                     .text(
-                                            "No-delay features make sure 
packets are sent out on the network as soon as possible,")
-                                    .text("and it's critical to achieve low 
latency publishes.")
+                                            "No-delay features ensures that 
packets are sent out on the network as soon as possible,")
+                                    .text(" and it is critical to achieve low 
latency publishes.")
                                     .text(
-                                            "On the other hand, sending out a 
huge number of small packets might limit the overall throughput,")
+                                            " On the other hand, sending out a 
huge number of small packets might limit the overall throughput.")
                                     .text(
-                                            "so if latency is not a concern, 
it's advisable to set the %s flag to false.",
-                                            code("useTcpNoDelay"))
+                                            " Therefore, if latency is not a 
concern, it is recommended to set the %s flag to %s.",
+                                            code("useTcpNoDelay"), 
code("false"))
                                     .linebreak()
-                                    .text("Default value is true.")
+                                    .text("By default, it is set to %s.", 
code("true"))
                                     .build());
 
     public static final ConfigOption<String> PULSAR_TLS_TRUST_CERTS_FILE_PATH =
@@ -226,7 +225,7 @@ public final class PulsarOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Whether the Pulsar client accepts untrusted TLS 
certificate from broker.");
+                            "Whether the Pulsar client accepts untrusted TLS 
certificate from the broker.");
 
     public static final ConfigOption<Boolean> 
PULSAR_TLS_HOSTNAME_VERIFICATION_ENABLE =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + 
"tlsHostnameVerificationEnable")
@@ -236,11 +235,11 @@ public final class PulsarOptions {
                             Description.builder()
                                     .text("Whether to enable TLS hostname 
verification.")
                                     .text(
-                                            "It allows to validate hostname 
verification when client connects to broker over tls.")
+                                            " It allows to validate hostname 
verification when a client connects to the broker over TLS.")
                                     .text(
-                                            "It validates incoming x509 
certificate and matches provided hostname(CN/SAN) with expected broker's host 
name.")
+                                            " It validates incoming x509 
certificate and matches provided hostname (CN/SAN) with the expected broker's 
host name.")
                                     .text(
-                                            "It follows RFC 2818, 3.1. Server 
Identity hostname verification.")
+                                            " It follows RFC 2818, 3.1. Server 
Identity hostname verification.")
                                     .build());
 
     public static final ConfigOption<Integer> PULSAR_CONCURRENT_LOOKUP_REQUEST 
=
@@ -250,10 +249,9 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The number of concurrent lookup 
requests allowed to send on each broker connection to prevent overload on 
broker.")
+                                            "The number of concurrent lookup 
requests allowed to send on each broker connection to prevent overload on the 
broker.")
                                     .text(
-                                            "It should be configured with 
higher value only in case of it requires"
-                                                    + " to produce/subscribe 
on thousands of topic using created %s",
+                                            " It should be configured with a 
higher value only in case of it requires to produce or subscribe on thousands 
of topic using a created %s",
                                             code("PulsarClient"))
                                     .build());
 
@@ -264,12 +262,17 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The maximum number of lookup 
requests allowed on each broker connection to prevent overload on broker.")
-                                    .text("It should be bigger than 
maxConcurrentLookupRequests.")
+                                            "The maximum number of lookup 
requests allowed on each broker connection to prevent overload on the broker.")
+                                    .text(
+                                            " It should be greater than %s.",
+                                            
code("maxConcurrentLookupRequests"))
                                     .text(
-                                            "Requests that inside 
maxConcurrentLookupRequests already send to broker,")
+                                            " Requests that inside %s are 
already sent to broker,",
+                                            
code("maxConcurrentLookupRequests"))
                                     .text(
-                                            "and requests beyond 
maxConcurrentLookupRequests and under maxLookupRequests will wait in each 
client cnx.")
+                                            " and requests beyond %s and under 
%s will wait in each client cnx.",
+                                            
code("maxConcurrentLookupRequests"),
+                                            code("maxLookupRequests"))
                                     .build());
 
     public static final ConfigOption<Integer> PULSAR_MAX_LOOKUP_REDIRECTS =
@@ -277,23 +280,26 @@ public final class PulsarOptions {
                     .intType()
                     .defaultValue(20)
                     .withDescription(
-                            "Set the maximum number of times a lookup-request 
to a broker will be redirected.");
+                            "The maximum number of times a lookup-request 
redirections to a broker.");
 
     public static final ConfigOption<Integer> 
PULSAR_MAX_NUMBER_OF_REJECTED_REQUEST_PER_CONNECTION =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + 
"maxNumberOfRejectedRequestPerConnection")
                     .intType()
                     .defaultValue(50)
                     .withDescription(
-                            "The maximum number of rejected requests of a 
broker in a certain time"
-                                    + " frame (30 seconds) after the current 
connection is closed and"
-                                    + " the client creates a new connection to 
connect to a different broker.");
+                            Description.builder()
+                                    .text(
+                                            "The maximum number of rejected 
requests of a broker in a certain period (30s) after the current connection is 
closed")
+                                    .text(
+                                            " and the client creates a new 
connection to connect to a different broker.")
+                                    .build());
 
     public static final ConfigOption<Integer> 
PULSAR_KEEP_ALIVE_INTERVAL_SECONDS =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + 
"keepAliveIntervalSeconds")
                     .intType()
                     .defaultValue(30)
                     .withDescription(
-                            "Seconds of keeping alive interval for each client 
broker connection.");
+                            "Interval (in seconds) for keeping connection 
between the Pulsar client and broker alive.");
 
     public static final ConfigOption<Integer> PULSAR_CONNECTION_TIMEOUT_MS =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "connectionTimeoutMs")
@@ -302,18 +308,18 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Duration (in millis) of waiting 
for a connection to a broker to be established.")
+                                            "Duration (in ms) of waiting for a 
connection to a broker to be established.")
                                     .linebreak()
                                     .text(
                                             "If the duration passes without a 
response from a broker, the connection attempt is dropped.")
                                     .build());
 
-    // TODO This option would be exposed by Pulsar's ClientBuilder in next 
release.
+    // TODO This option would be exposed by Pulsar's ClientBuilder in the next 
Pulsar release.
     public static final ConfigOption<Integer> PULSAR_REQUEST_TIMEOUT_MS =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "requestTimeoutMs")
                     .intType()
                     .defaultValue(60000)
-                    .withDescription("Maximum duration (in millis) for 
completing a request.");
+                    .withDescription("Maximum duration (in ms) for completing 
a request.");
 
     public static final ConfigOption<Long> 
PULSAR_INITIAL_BACKOFF_INTERVAL_NANOS =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + 
"initialBackoffIntervalNanos")
@@ -325,7 +331,8 @@ public final class PulsarOptions {
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "maxBackoffIntervalNanos")
                     .longType()
                     .defaultValue(SECONDS.toNanos(60))
-                    .withDescription("Maximum duration (in nanoseconds) for a 
backoff interval.");
+                    .withDescription(
+                            "The maximum duration (in nanoseconds) for a 
backoff interval.");
 
     public static final ConfigOption<Boolean> PULSAR_ENABLE_BUSY_WAIT =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "enableBusyWait")
@@ -338,9 +345,9 @@ public final class PulsarOptions {
                                     .text(
                                             "This option will enable 
spin-waiting on executors and IO threads in order to reduce latency during 
context switches.")
                                     .text(
-                                            "The spinning will consume 100% 
CPU even when the broker is not doing any work.")
+                                            " The spinning will consume 100% 
CPU even when the broker is not doing any work.")
                                     .text(
-                                            "It is recommended to reduce the 
number of IO threads and BK client threads to only have few CPU cores busy.")
+                                            " It is recommended to reduce the 
number of IO threads and BookKeeper client threads to only have fewer CPU cores 
busy.")
                                     .build());
 
     public static final ConfigOption<String> PULSAR_LISTENER_NAME =
@@ -350,8 +357,8 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Configure the listenerName that 
the broker will return the corresponding %s.",
-                                            code("advertisedListener"))
+                                            "Configure the %s that the broker 
will return the corresponding %s.",
+                                            code("listenerName"), 
code("advertisedListener"))
                                     .build());
 
     public static final ConfigOption<Boolean> PULSAR_USE_KEY_STORE_TLS =
@@ -361,8 +368,10 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "If Tls is enabled, whether use 
KeyStore type as tls configuration parameter.")
-                                    .text("False means use default pem type 
configuration.")
+                                            "If TLS is enabled, whether use 
the KeyStore type as the TLS configuration parameter.")
+                                    .text(
+                                            " If it is set to %s, it means to 
use the default pem type configuration.",
+                                            code("false"))
                                     .build());
 
     public static final ConfigOption<String> PULSAR_SSL_PROVIDER =
@@ -374,7 +383,7 @@ public final class PulsarOptions {
                                     .text(
                                             "The name of the security provider 
used for SSL connections.")
                                     .text(
-                                            "Default value is the default 
security provider of the JVM.")
+                                            " The default value is the default 
security provider of the JVM.")
                                     .build());
 
     public static final ConfigOption<String> PULSAR_TLS_TRUST_STORE_TYPE =
@@ -405,11 +414,11 @@ public final class PulsarOptions {
                             Description.builder()
                                     .text("A list of cipher suites.")
                                     .text(
-                                            "This is a named combination of 
authentication, encryption,"
-                                                    + " MAC and key exchange 
algorithm used to negotiate the security"
-                                                    + " settings for a network 
connection using TLS or SSL network protocol.")
+                                            " This is a named combination of 
authentication, encryption,")
                                     .text(
-                                            "By default all the available 
cipher suites are supported.")
+                                            " MAC and the key exchange 
algorithm used to negotiate the security settings for a network connection 
using the TLS or SSL network protocol.")
+                                    .text(
+                                            " By default all the available 
cipher suites are supported.")
                                     .build());
 
     public static final ConfigOption<List<String>> PULSAR_TLS_PROTOCOLS =
@@ -420,9 +429,10 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text("The SSL protocol used to generate 
the SSLContext.")
-                                    .text("Default setting is TLS, which is 
fine for most cases.")
                                     .text(
-                                            "Allowed values in recent JVMs are 
TLS, TLSv1.3, TLSv1.2 and TLSv1.1.")
+                                            " By default, it is set TLS, which 
is fine for most cases.")
+                                    .text(
+                                            " Allowed values in recent JVMs 
are TLS, TLSv1.3, TLSv1.2 and TLSv1.1.")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_MEMORY_LIMIT_BYTES =
@@ -432,11 +442,11 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Configure a limit on the amount 
of direct memory that will be allocated by this client instance. Its unit is 
byte.")
+                                            "The limit (in bytes) on the 
amount of direct memory that will be allocated by this client instance.")
                                     .linebreak()
                                     .text(
                                             "Note: at this moment this is only 
limiting the memory for producers.")
-                                    .text("Setting this to 0 will disable the 
limit.")
+                                    .text(" Setting this to %s will disable 
the limit.", code("0"))
                                     .build());
 
     public static final ConfigOption<String> PULSAR_PROXY_SERVICE_URL =
@@ -444,8 +454,11 @@ public final class PulsarOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Proxy-service url when client would like to 
connect to broker via proxy."
-                                    + " Client can choose type of 
proxy-routing.");
+                            Description.builder()
+                                    .text(
+                                            "Proxy-service URL when a client 
connects to the broker via the proxy.")
+                                    .text(" The client can choose the type of 
proxy-routing.")
+                                    .build());
 
     public static final ConfigOption<ProxyProtocol> PULSAR_PROXY_PROTOCOL =
             ConfigOptions.key(CLIENT_CONFIG_PREFIX + "proxyProtocol")
@@ -454,8 +467,8 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Protocol type to determine type 
of proxy routing when client connects to proxy using %s.",
-                                            code(CLIENT_CONFIG_PREFIX + 
"proxyServiceUrl"))
+                                            "Protocol type to determine the 
type of proxy routing when a client connects to the proxy using %s.",
+                                            
code("pulsar.client.proxyServiceUrl"))
                                     .build());
 
     public static final ConfigOption<Boolean> PULSAR_ENABLE_TRANSACTION =
@@ -463,7 +476,12 @@ public final class PulsarOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "If enable transaction, start the 
transactionCoordinatorClient with pulsar client.");
+                            Description.builder()
+                                    .text(
+                                            "If transaction is enabled, start 
the %s with %s.",
+                                            
code("transactionCoordinatorClient"),
+                                            code("PulsarClient"))
+                                    .build());
 
     
///////////////////////////////////////////////////////////////////////////////
     //
@@ -479,7 +497,7 @@ public final class PulsarOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Set the Pulsar service HTTP URL 
for the admin endpoint. eg. %s, or %s for TLS.",
+                                            "The Pulsar service HTTP URL for 
the admin endpoint. For example, %s, or %s for TLS.",
                                             
code("http://my-broker.example.com:8080"),
                                             
code("https://my-broker.example.com:8443"))
                                     .build());
@@ -488,27 +506,26 @@ public final class PulsarOptions {
             ConfigOptions.key(ADMIN_CONFIG_PREFIX + "connectTimeout")
                     .intType()
                     .defaultValue(60000)
-                    .withDescription(
-                            "This sets the connection time out (in millis) for 
the pulsar admin client.");
+                    .withDescription("The connection time out (in ms) for the 
PulsarAdmin client.");
 
     public static final ConfigOption<Integer> PULSAR_READ_TIMEOUT =
             ConfigOptions.key(ADMIN_CONFIG_PREFIX + "readTimeout")
                     .intType()
                     .defaultValue(60000)
                     .withDescription(
-                            "This sets the server response read time out (in 
millis) for the pulsar admin client for any request.");
+                            "The server response read timeout (in ms) for the 
PulsarAdmin client for any request.");
 
     public static final ConfigOption<Integer> PULSAR_REQUEST_TIMEOUT =
             ConfigOptions.key(ADMIN_CONFIG_PREFIX + "requestTimeout")
                     .intType()
                     .defaultValue(300000)
                     .withDescription(
-                            "This sets the server request time out (in millis) 
for the pulsar admin client for any request.");
+                            "The server request timeout (in ms) for the 
PulsarAdmin client for any request.");
 
     public static final ConfigOption<Integer> PULSAR_AUTO_CERT_REFRESH_TIME =
             ConfigOptions.key(ADMIN_CONFIG_PREFIX + "autoCertRefreshTime")
                     .intType()
                     .defaultValue(300000)
                     .withDescription(
-                            "This sets auto cert refresh time (in millis) if 
Pulsar admin uses tls authentication.");
+                            "The auto cert refresh time (in ms) if Pulsar 
admin supports TLS authentication.");
 }
diff --git 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
index 2884f5e..c319915 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java
@@ -55,6 +55,7 @@ import static 
org.apache.flink.connector.pulsar.source.PulsarSourceOptions.SOURC
             @ConfigGroup(name = "PulsarSource", keyPrefix = 
SOURCE_CONFIG_PREFIX),
             @ConfigGroup(name = "PulsarConsumer", keyPrefix = 
CONSUMER_CONFIG_PREFIX)
         })
+@SuppressWarnings("java:S1192")
 public final class PulsarSourceOptions {
 
     // Pulsar source connector config prefix.
@@ -78,8 +79,11 @@ public final class PulsarSourceOptions {
                     .longType()
                     .defaultValue(Duration.ofSeconds(30).toMillis())
                     .withDescription(
-                            "The interval in milliseconds for the Pulsar 
source to discover "
-                                    + "the new partitions. A non-positive 
value disables the partition discovery.");
+                            Description.builder()
+                                    .text(
+                                            "The interval (in ms) for the 
Pulsar source to discover the new partitions.")
+                                    .text(" A non-positive value disables the 
partition discovery.")
+                                    .build());
 
     public static final ConfigOption<Boolean> 
PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE =
             ConfigOptions.key(SOURCE_CONFIG_PREFIX + 
"enableAutoAcknowledgeMessage")
@@ -89,18 +93,21 @@ public final class PulsarSourceOptions {
                             Description.builder()
                                     .text(
                                             "Flink commits the consuming 
position with pulsar transactions on checkpoint.")
-                                    .linebreak()
                                     .text(
-                                            "However, if you have disabled the 
flink checkpoint or your pulsar cluster disabled the transaction,"
-                                                    + " make sure you have set 
this option to %s.",
+                                            " However, if you have disabled 
the Flink checkpoint or disabled transaction for your Pulsar cluster,")
+                                    .text(
+                                            " ensure that you have set this 
option to %s.",
                                             code("true"))
+                                    .linebreak()
                                     .text(
                                             "The source would use pulsar 
client's internal mechanism and commit cursor in two ways.")
                                     .list(
                                             text(
-                                                    "For Key_Shared and Shared 
subscription: the cursor would be committed once the message is consumed."),
+                                                    "For %s and %s 
subscription, the cursor would be committed once the message is consumed.",
+                                                    code("Key_Shared"), 
code("Shared")),
                                             text(
-                                                    "For Exclusive and 
Failover subscription: the cursor would be committed in a fixed interval."))
+                                                    "For %s and %s 
subscription, the cursor would be committed in a given interval.",
+                                                    code("Exclusive"), 
code("Failover")))
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_AUTO_COMMIT_CURSOR_INTERVAL =
@@ -110,9 +117,9 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "This option is used only when 
user disabled checkpoint and using Exclusive or Failover subscription.")
+                                            "This option is used only when the 
user disables the checkpoint and uses Exclusive or Failover subscription.")
                                     .text(
-                                            "We would automatically commit the 
cursor using the given period (in millis).")
+                                            " We would automatically commit 
the cursor using the given period (in ms).")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_TRANSACTION_TIMEOUT_MILLIS =
@@ -122,13 +129,14 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "This option is used for when 
using Shared or Key_Shared subscription."
-                                                    + " You should set this 
option when you didn't enable the %s option.",
+                                            "This option is used in %s or %s 
subscription.",
+                                            code("Shared"), code("Key_Shared"))
+                                    .text(
+                                            " You should configure this option 
when you do not enable the %s option.",
                                             
code("pulsar.source.enableAutoAcknowledgeMessage"))
                                     .linebreak()
                                     .text(
-                                            "This value should be greater than 
the checkpoint interval.")
-                                    .text("It uses milliseconds as the unit of 
time.")
+                                            "The value (in ms) should be 
greater than the checkpoint interval.")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME =
@@ -137,10 +145,10 @@ public final class PulsarSourceOptions {
                     .defaultValue(Duration.ofSeconds(10).toMillis())
                     .withDescription(
                             Description.builder()
+                                    .text("The maximum time (in ms) to wait 
when fetching records.")
+                                    .text(" A longer time increases throughput 
but also latency.")
                                     .text(
-                                            "The max time (in millis) to wait 
when fetching records. "
-                                                    + "A longer time increases 
throughput but also latency. "
-                                                    + "A fetch batch might be 
finished earlier because of %s.",
+                                            " A fetch batch might be finished 
earlier because of %s.",
                                             
code("pulsar.source.maxFetchRecords"))
                                     .build());
 
@@ -151,9 +159,10 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "The max number of records to 
fetch to wait when polling. "
-                                                    + "A longer time increases 
throughput but also latency."
-                                                    + "A fetch batch might be 
finished earlier because of %s.",
+                                            "The maximum number of records to 
fetch to wait when polling.")
+                                    .text(" A longer time increases throughput 
but also latency.")
+                                    .text(
+                                            " A fetch batch might be finished 
earlier because of %s.",
                                             code("pulsar.source.maxFetchTime"))
                                     .build());
 
@@ -162,9 +171,14 @@ public final class PulsarSourceOptions {
                     .enumType(CursorVerification.class)
                     .defaultValue(CursorVerification.WARN_ON_MISMATCH)
                     .withDescription(
-                            "Upon (re)starting the source checks whether the 
expected message can be read. "
-                                    + "If failure is enabled the application 
fails, else it logs a warning. "
-                                    + "A possible solution is to adjust the 
retention settings in pulsar or ignoring the check result.");
+                            Description.builder()
+                                    .text(
+                                            "Upon (re)starting the source, 
check whether the expected message can be read.")
+                                    .text(
+                                            " If failure is enabled, the 
application fails. Otherwise, it logs a warning.")
+                                    .text(
+                                            " A possible solution is to adjust 
the retention settings in Pulsar or ignoring the check result.")
+                                    .build());
 
     
///////////////////////////////////////////////////////////////////////////////
     //
@@ -178,8 +192,11 @@ public final class PulsarSourceOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Specify the subscription name for this consumer."
-                                    + " This argument is required when 
constructing the consumer.");
+                            Description.builder()
+                                    .text("Specify the subscription name for 
this consumer.")
+                                    .text(
+                                            " This argument is required when 
constructing the consumer.")
+                                    .build());
 
     public static final ConfigOption<SubscriptionType> 
PULSAR_SUBSCRIPTION_TYPE =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "subscriptionType")
@@ -238,16 +255,15 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Group a consumer acknowledgment 
for a specified time (in microseconds).")
-                                    .linebreak()
+                                            "Group a consumer acknowledgment 
for a specified time (in μs).")
                                     .text(
-                                            "By default, a consumer uses 100ms 
grouping time to send out acknowledgments to a broker.")
-                                    .linebreak()
+                                            " By default, a consumer uses %s 
grouping time to send out acknowledgments to a broker.",
+                                            code("100μs"))
                                     .text(
-                                            "Setting a group time of 0 sends 
out acknowledgments immediately.")
-                                    .linebreak()
+                                            " If the group time is set to %s, 
acknowledgments are sent out immediately.",
+                                            code("0"))
                                     .text(
-                                            "A longer ack group time is more 
efficient at the expense of a slight increase in message re-deliveries after a 
failure.")
+                                            " A longer ack group time is more 
efficient at the expense of a slight increase in message re-deliveries after a 
failure.")
                                     .build());
 
     public static final ConfigOption<Long> 
PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS =
@@ -257,11 +273,11 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Delay (in microseconds) to wait 
before redelivering messages that failed to be processed.")
+                                            "Delay (in μs) to wait before 
redelivering messages that failed to be processed.")
                                     .linebreak()
                                     .text(
                                             "When an application uses %s, 
failed messages are redelivered after a fixed timeout.",
-                                            
code("Consumer#negativeAcknowledge(Message)"))
+                                            
code("Consumer.negativeAcknowledge(Message)"))
                                     .build());
 
     public static final ConfigOption<Integer>
@@ -274,7 +290,7 @@ public final class PulsarSourceOptions {
                             .withDescription(
                                     Description.builder()
                                             .text(
-                                                    "The max total receiver 
queue size across partitions.")
+                                                    "The maximum total 
receiver queue size across partitions.")
                                             .linebreak()
                                             .text(
                                                     "This setting reduces the 
receiver queue size for individual partitions if the total receiver queue size 
exceeds this value.")
@@ -285,7 +301,7 @@ public final class PulsarSourceOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "Consumer name is informative and it can be used 
to identify a particular consumer instance from the topic stats.");
+                            "The consumer name is informative and it can be 
used to identify a particular consumer instance from the topic stats.");
 
     public static final ConfigOption<Long> PULSAR_ACK_TIMEOUT_MILLIS =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "ackTimeoutMillis")
@@ -294,17 +310,13 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Set the timeout (in millis) for 
unacknowledged messages, truncated to the nearest millisecond."
-                                                    + " The timeout needs to 
be greater than 1 second.")
+                                            "The timeout (in ms) for 
unacknowledged messages, truncated to the nearest millisecond. The timeout 
needs to be greater than 1 second.")
                                     .linebreak()
                                     .text(
-                                            "By default, the acknowledge 
timeout is disabled and that means that messages delivered to a"
-                                                    + " consumer will not be 
re-delivered unless the consumer crashes.")
+                                            "By default, the acknowledge 
timeout is disabled and that means that messages delivered to a consumer will 
not be re-delivered unless the consumer crashes.")
                                     .linebreak()
                                     .text(
-                                            "When enabling ack timeout, if a 
message is not acknowledged within the specified timeout"
-                                                    + " it will be 
re-delivered to the consumer"
-                                                    + " (possibly to a 
different consumer in case of a shared subscription).")
+                                            "When acknowledgement timeout 
being enabled, if a message is not acknowledged within the specified timeout it 
will be re-delivered to the consumer (possibly to a different consumer in case 
of a shared subscription).")
                                     .build());
 
     public static final ConfigOption<Long> PULSAR_TICK_DURATION_MILLIS =
@@ -313,10 +325,10 @@ public final class PulsarSourceOptions {
                     .defaultValue(1000L)
                     .withDescription(
                             Description.builder()
-                                    .text("Granularity (in millis) of the 
ack-timeout redelivery.")
+                                    .text("Granularity (in ms) of the 
ack-timeout redelivery.")
                                     .linebreak()
                                     .text(
-                                            "Using an higher %s reduces the 
memory overhead to track messages when setting ack-timeout to a bigger value 
(for example, 1 hour).",
+                                            "A greater (for example, 1 hour) 
%s reduces the memory overhead to track messages.",
                                             code("tickDurationMillis"))
                                     .build());
 
@@ -327,19 +339,21 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Priority level for a consumer to 
which a broker gives more priority while dispatching messages in the shared 
subscription mode.")
+                                            "Priority level for a consumer to 
which a broker gives more priorities while dispatching messages in the shared 
subscription type.")
                                     .linebreak()
                                     .text(
                                             "The broker follows descending 
priorities. For example, 0=max-priority, 1, 2,...")
                                     .linebreak()
                                     .text(
-                                            "In shared subscription mode, the 
broker first dispatches messages to the max priority level consumers if they 
have permits. Otherwise, the broker considers next priority level consumers.")
+                                            "In shared subscription mode, the 
broker first dispatches messages to the consumers on the highest priority level 
if they have permits.")
+                                    .text(
+                                            " Otherwise, the broker considers 
consumers on the next priority level.")
                                     .linebreak()
                                     .linebreak()
                                     .text("Example 1")
                                     .linebreak()
                                     .text(
-                                            "If a subscription has consumerA 
with %s 0 and consumerB with %s 1, then the broker only dispatches messages to 
consumerA until it runs out permits and then starts dispatching messages to 
consumerB.",
+                                            "If a subscription has consumer A 
with %s 0 and consumer B with %s 1, then the broker only dispatches messages to 
consumer A until it runs out permits and then starts dispatching messages to 
consumer B.",
                                             code("priorityLevel"), 
code("priorityLevel"))
                                     .linebreak()
                                     .text("Example 2")
@@ -353,7 +367,7 @@ public final class PulsarSourceOptions {
                                                     + "C5, 1, 1\n")
                                     .linebreak()
                                     .text(
-                                            "Order in which a broker 
dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.")
+                                            "The order in which a broker 
dispatches messages to consumers is: C1, C2, C3, C1, C4, C5, C4.")
                                     .build());
 
     public static final ConfigOption<Integer> 
PULSAR_MAX_PENDING_CHUNKED_MESSAGE =
@@ -363,32 +377,28 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Consumer buffers chunk messages 
into memory until it receives all the chunks of the original message.")
+                                            "The consumer buffers chunk 
messages into memory until it receives all the chunks of the original message.")
                                     .text(
-                                            "While consuming chunk-messages, 
chunks from same message might not be contiguous"
-                                                    + " in the stream and they 
might be mixed with other messages' chunks.")
+                                            " While consuming chunk-messages, 
chunks from the same message might not be contiguous in the stream and they 
might be mixed with other messages' chunks.")
                                     .text(
-                                            "So, consumer has to maintain 
multiple buffers to manage chunks coming from different messages.")
+                                            " So, consumer has to maintain 
multiple buffers to manage chunks coming from different messages.")
                                     .text(
-                                            "This mainly happens when multiple 
publishers are publishing messages on the topic"
-                                                    + " concurrently or 
publisher failed to publish all chunks of the messages.")
+                                            " This mainly happens when 
multiple publishers are publishing messages on the topic concurrently or 
publishers failed to publish all chunks of the messages.")
                                     .linebreak()
-                                    .text("eg: M1-C1, M2-C1, M1-C2, M2-C2")
                                     .text(
-                                            "Messages M1-C1 and M1-C2 belong 
to original message M1, M2-C1 and M2-C2 messages belong to M2 message.")
+                                            "For example, there are M1-C1, 
M2-C1, M1-C2, M2-C2 messages.")
+                                    .text(
+                                            "Messages M1-C1 and M1-C2 belong 
to the M1 original message while M2-C1 and M2-C2 belong to the M2 message.")
                                     .linebreak()
                                     .text(
-                                            "Buffering large number of 
outstanding uncompleted chunked messages can create memory"
-                                                    + " pressure and it can be 
guarded by providing this %s threshold."
-                                                    + " Once, consumer reaches 
this threshold, it drops the outstanding unchunked-messages"
-                                                    + " by silently acking or 
asking broker to redeliver later by marking it unacked."
-                                                    + " This behavior can be 
controlled by configuration %s",
-                                            code(
-                                                    CONSUMER_CONFIG_PREFIX
-                                                            + 
"maxPendingChunkedMessage"),
+                                            "Buffering a large number of 
outstanding uncompleted chunked messages can bring memory pressure and it can 
be guarded by providing this %s threshold.",
+                                            
code("pulsar.consumer.maxPendingChunkedMessage"))
+                                    .text(
+                                            " Once, a consumer reaches this 
threshold, it drops the outstanding unchunked messages by silently 
acknowledging or asking the broker to redeliver messages later by marking it 
unacknowledged.")
+                                    .text(
+                                            " This behavior can be controlled 
by the %s option.",
                                             code(
-                                                    CONSUMER_CONFIG_PREFIX
-                                                            + 
"autoAckOldestChunkedMessageOnQueueFull"))
+                                                    
"pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull"))
                                     .build());
 
     public static final ConfigOption<Boolean> 
PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL =
@@ -398,16 +408,12 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Buffering large number of 
outstanding uncompleted chunked messages can create memory pressure"
-                                                    + " and it can be guarded 
by providing this %s threshold."
-                                                    + " Once, consumer reaches 
this threshold, it drops the outstanding unchunked-messages"
-                                                    + " by silently acking if 
%s is true else it marks them for redelivery.",
-                                            code(
-                                                    CONSUMER_CONFIG_PREFIX
-                                                            + 
"maxPendingChunkedMessage"),
+                                            "Buffering a large number of 
outstanding uncompleted chunked messages can bring memory pressure and it can 
be guarded by providing this %s threshold.",
+                                            
code("pulsar.consumer.maxPendingChunkedMessage"))
+                                    .text(
+                                            " Once a consumer reaches this 
threshold, it drops the outstanding unchunked-messages by silently 
acknowledging if %s is true. Otherwise, it marks them for redelivery.",
                                             code(
-                                                    CONSUMER_CONFIG_PREFIX
-                                                            + 
"autoAckOldestChunkedMessageOnQueueFull"))
+                                                    
"pulsar.consumer.autoAckOldestChunkedMessageOnQueueFull"))
                                     .build());
 
     public static final ConfigOption<Long> 
PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS =
@@ -415,10 +421,12 @@ public final class PulsarSourceOptions {
                     .longType()
                     .defaultValue(60 * 1000L)
                     .withDescription(
-                            "If producer fails to publish all the chunks of a 
message then consumer"
-                                    + " can expire incomplete chunks if 
consumer won't be able to"
-                                    + " receive all chunks in expire times 
(default 1 hour)."
-                                    + " It uses milliseconds as the unit of 
time.");
+                            Description.builder()
+                                    .text(
+                                            "If a producer fails to publish 
all the chunks of a message,")
+                                    .text(
+                                            " the consumer can expire 
incomplete chunks if the consumer cannot receive all chunks in expire times 
(default 1 hour, in ms).")
+                                    .build());
 
     public static final ConfigOption<ConsumerCryptoFailureAction> 
PULSAR_CRYPTO_FAILURE_ACTION =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "cryptoFailureAction")
@@ -427,23 +435,28 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text(
-                                            "Consumer should take action when 
it receives a message that can not be decrypted.")
+                                            "The consumer should take action 
when it receives a message that can not be decrypted.")
                                     .list(
                                             text(
-                                                    "FAIL: this is the default 
option to fail messages until crypto succeeds."),
+                                                    "%s: this is the default 
option to fail messages until crypto succeeds.",
+                                                    code("FAIL")),
                                             text(
-                                                    "DISCARD: silently 
acknowledge and not deliver message to an application."),
+                                                    "%s: silently acknowledge 
but do not deliver messages to an application.",
+                                                    code("DISCARD")),
                                             text(
-                                                    "CONSUME: deliver 
encrypted messages to applications. It is the application's responsibility to 
decrypt the message."))
+                                                    "%s: deliver encrypted 
messages to applications. It is the application's responsibility to decrypt the 
message.",
+                                                    code("CONSUME")))
                                     .linebreak()
-                                    .text("The decompression of message 
fails.")
+                                    .text("Fail to decompress the messages.")
                                     .linebreak()
                                     .text(
                                             "If messages contain batch 
messages, a client is not be able to retrieve individual messages in batch.")
                                     .linebreak()
                                     .text(
-                                            "Delivered encrypted message 
contains %s which contains encryption and compression information in it using 
which application can decrypt consumed message payload.",
+                                            "The delivered encrypted message 
contains %s which contains encryption and compression information.",
                                             code("EncryptionContext"))
+                                    .text(
+                                            " You can use an application to 
decrypt the consumed message payload.")
                                     .build());
 
     public static final ConfigOption<Map<String, String>> 
PULSAR_CONSUMER_PROPERTIES =
@@ -453,17 +466,15 @@ public final class PulsarSourceOptions {
                     .withDescription(
                             Description.builder()
                                     .text("A name or value property of this 
consumer.")
-                                    .linebreak()
                                     .text(
-                                            "%s is application defined 
metadata attached to a consumer.",
+                                            " %s is application defined 
metadata attached to a consumer.",
                                             code("properties"))
-                                    .linebreak()
                                     .text(
-                                            "When getting a topic stats, 
associate this metadata with the consumer stats for easier identification.")
+                                            " When getting a topic stats, 
associate this metadata with the consumer stats for easier identification.")
                                     .build());
 
     public static final ConfigOption<Boolean> PULSAR_READ_COMPACTED =
-            ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "readCompacted") // 
NOSONAR
+            ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "readCompacted")
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
@@ -524,47 +535,31 @@ public final class PulsarSourceOptions {
                     .intType()
                     .defaultValue(0)
                     .withDescription(
-                            "Maximum number of times that a message will be 
redelivered before being sent to the dead letter queue.");
+                            "The maximum number of times that a message is 
redelivered before being sent to the dead letter queue.");
 
     public static final ConfigOption<String> PULSAR_RETRY_LETTER_TOPIC =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + 
"deadLetterPolicy.retryLetterTopic")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription(
-                            "Name of the retry topic where the failing 
messages will be sent.");
+                    .withDescription("Name of the retry topic where the failed 
messages are sent.");
     public static final ConfigOption<String> PULSAR_DEAD_LETTER_TOPIC =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + 
"deadLetterPolicy.deadLetterTopic")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription(
-                            "Name of the dead topic where the failing messages 
will be sent.");
+                    .withDescription("Name of the dead topic where the failed 
messages are sent.");
 
     public static final ConfigOption<Boolean> PULSAR_RETRY_ENABLE =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "retryEnable")
                     .booleanType()
                     .defaultValue(false)
-                    .withDescription("If enabled, the consumer will auto retry 
messages.");
-
-    public static final ConfigOption<Boolean> PULSAR_AUTO_UPDATE_PARTITIONS =
-            ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "autoUpdatePartitions")
-                    .booleanType()
-                    .defaultValue(true)
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "If %s is enabled, a consumer 
subscribes to partition increase automatically.",
-                                            code("autoUpdatePartitions"))
-                                    .linebreak()
-                                    .text("Note: this is only for partitioned 
consumers.\t")
-                                    .build());
+                    .withDescription("If enabled, the consumer will 
automatically retry messages.");
 
     public static final ConfigOption<Integer> 
PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + 
"autoUpdatePartitionsIntervalSeconds")
                     .intType()
                     .defaultValue(60)
                     .withDescription(
-                            "Set the interval (in seconds) of updating 
partitions."
-                                    + " This only works if 
autoUpdatePartitions is enabled.");
+                            "The interval (in seconds) of updating partitions. 
This only works if autoUpdatePartitions is enabled.");
 
     public static final ConfigOption<Boolean> 
PULSAR_REPLICATE_SUBSCRIPTION_STATE =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + 
"replicateSubscriptionState")
@@ -582,23 +577,11 @@ public final class PulsarSourceOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Ack will return receipt but does not mean that 
the message will not be resent after get receipt.");
+                            "Acknowledgement will return a receipt but this 
does not mean that the message will not be resent after getting the receipt.");
 
     public static final ConfigOption<Boolean> PULSAR_POOL_MESSAGES =
             ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "poolMessages")
                     .booleanType()
                     .defaultValue(false)
-                    .withDescription(
-                            Description.builder()
-                                    .text(
-                                            "Enable pooling of messages and 
the underlying data buffers.")
-                                    .linebreak()
-                                    .text(
-                                            "When pooling is enabled, the 
application is responsible for calling"
-                                                    + " %s after the handling 
of every received message. If %s"
-                                                    + " is not called on a 
received message, there will be a memory leak."
-                                                    + " If an application 
attempts to use and already \"released\" message,"
-                                                    + " it might experience 
undefined behavior (eg: memory corruption, deserialization error, etc.).",
-                                            code("Message.release()"), 
code("release()"))
-                                    .build());
+                    .withDescription("Enable pooling of messages and the 
underlying data buffers.");
 }
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index 68364e0..3791708 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -121,6 +121,12 @@ under the License.
                </dependency>
 
                <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       
<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
                        <groupId>org.apache.logging.log4j</groupId>
                        <artifactId>log4j-slf4j-impl</artifactId>
                        <scope>compile</scope>
diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
 
b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 8f04aea..9272789 100644
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -95,7 +95,13 @@ public class ConfigOptionsDocGenerator {
                         "flink-kubernetes", 
"org.apache.flink.kubernetes.configuration"),
                 new OptionsClassLocation("flink-clients", 
"org.apache.flink.client.cli"),
                 new OptionsClassLocation(
-                        "flink-table/flink-sql-client", 
"org.apache.flink.table.client.config")
+                        "flink-table/flink-sql-client", 
"org.apache.flink.table.client.config"),
+                new OptionsClassLocation(
+                        "flink-connectors/flink-connector-pulsar",
+                        "org.apache.flink.connector.pulsar.common.config"),
+                new OptionsClassLocation(
+                        "flink-connectors/flink-connector-pulsar",
+                        "org.apache.flink.connector.pulsar.source")
             };
 
     static final Set<String> EXCLUSIONS =

Reply via email to