MartijnVisser commented on a change in pull request #19056:
URL: https://github.com/apache/flink/pull/19056#discussion_r827071130



##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -44,17 +43,19 @@ See how to link with them for cluster execution [here]({{< 
ref "docs/dev/configu
 This part describes the Pulsar source based on the new
 [data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
 
-If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower 
releases, just use the StreamNative's 
[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower 
releases,
+just use the StreamNative's [Pulsar Flink 
connector](https://github.com/streamnative/pulsar-flink).
 {{< /hint >}}
 
 ### Usage
 
-Pulsar source provides a builder class for constructing an instance of 
PulsarSource. The code snippet below shows
-how to build a PulsarSource to consume messages from the earliest cursor of 
topic "persistent://public/default/my-topic",
-with **Exclusive** subscription `my-subscription` and deserialize the raw 
payload of the messages as strings.
+Pulsar source provides a builder class for constructing a PulsarSource 
instance. The code snippet below shows
+how to build a PulsarSource instance to consume messages from the earliest 
cursor of the topic
+"persistent://public/default/my-topic" in **Exclusive** subscription type 
(`my-subscription`)
+and deserialize the raw payload of the messages as strings.

Review comment:
       ```suggestion
  The Pulsar source provides a builder class for constructing a PulsarSource instance.
  The code snippet below builds a PulsarSource instance that consumes messages from the earliest cursor of the topic
  "persistent://public/default/my-topic" in the **Exclusive** subscription type (`my-subscription`)
  and deserializes the raw payload of the messages as strings.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -24,14 +24,13 @@ under the License.
 
 # Apache Pulsar Connector
 
-Flink provides an [Apache Pulsar](https://pulsar.apache.org) source connector 
for reading data from Pulsar topics with exactly-once guarantees.
+Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for 
reading data from Pulsar topics with exactly-once guarantees.
 
 ## Dependency
 
-You can use the connector with Pulsar 2.7.0 or higher. However, the Pulsar 
source connector supports
-Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/),
-it is recommended to use Pulsar 2.8.0 or higher releases.
-For details on Pulsar compatibility, please refer to the 
[PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
+You can use the connector with the Pulsar 2.8.1 or higher version. However, 
the Pulsar connector supports
+Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/), it is 
recommended to use the Pulsar 2.9.2 or higher version.
+For details on Pulsar compatibility, refer to the 
[PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).

Review comment:
       ```suggestion
  You can use the connector with Pulsar 2.8.1 or higher. Because the Pulsar connector supports
  Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/), it is recommended to use Pulsar 2.9.2 or higher.
  Details on Pulsar compatibility can be found in [PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition 
subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form 
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  
`{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the 
sake of simplicity).
 The flexible naming system stems from the fact that there is now a default 
topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and 
translated topic name:
+This table lists a mapping relationship between your input topic name and the 
translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                    
      |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`   
      |
+| `my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic 
name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would 
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would 
use `non-persistent://public/default/my-topic` instead.

Review comment:
       ```suggestion
   Thus, you cannot use a short name like `non-persistent://my-topic` and need 
to use `non-persistent://public/default/my-topic` instead.
   ```
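The default-filling rule described in the tables of the hunk above could be sketched as follows. This is a hypothetical helper for illustration only, not part of the connector's API:

```java
// Sketch of the topic-name translation described in the documentation above:
// short names are expanded using the default topic type, tenant, and namespace.
// Hypothetical helper, not part of the Pulsar connector.
class TopicNameTranslator {
    static String translate(String topic) {
        if (topic.contains("://")) {
            // already a fully qualified topic name; returned unchanged
            return topic;
        }
        String[] parts = topic.split("/");
        if (parts.length == 1) {
            // bare topic name: default tenant `public` and namespace `default`
            return "persistent://public/default/" + topic;
        }
        // tenant/namespace/topic short form: only the topic type is defaulted
        return "persistent://" + topic;
    }
}
```

Note that, per the warning above, this shortcut only exists for `persistent` topics; `non-persistent` topics must always be written out in full.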

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition 
subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form 
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  
`{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the 
sake of simplicity).
 The flexible naming system stems from the fact that there is now a default 
topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and 
translated topic name:
+This table lists a mapping relationship between your input topic name and the 
translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                    
      |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`   
      |
+| `my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic 
name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would 
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would 
use `non-persistent://public/default/my-topic` instead.
 {{< /hint >}}
 
 #### Subscribing Pulsar Topic Partition
 
 Internally, Pulsar divides a partitioned topic as a set of non-partitioned 
topics according to the partition size.
 
-For example, if a `simple-string` topic with 3 partitions is created under the 
`sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the 
`sample` tenant with the `flink` namespace.
 The topics on Pulsar would be:
 
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name                                            | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string`             | Y           |
+| `persistent://sample/flink/simple-string-partition-0` | N           |
+| `persistent://sample/flink/simple-string-partition-1` | N           |
+| `persistent://sample/flink/simple-string-partition-2` | N           |
 
 You can directly consume messages from the topic partitions by using the 
non-partitioned topic names above.
-For example, use 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2 
of the `sample/flink/simple-string` topic.
+For example, use 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
 
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern

Review comment:
       ```suggestion
   #### Setting Topic Patterns
   ```
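The partition table in the hunk above shows that Pulsar derives one non-partitioned topic per partition by appending a `-partition-<n>` suffix. A minimal sketch of that naming scheme (hypothetical helper, not connector code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of how a partitioned topic maps to its internal non-partitioned
// topics, matching the table in the documentation above.
// Hypothetical helper for illustration only.
class PartitionTopics {
    static List<String> partitionTopics(String topic, int partitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }
}
```

These derived names are what you would pass to `setTopics(...)` to consume individual partitions directly.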

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -44,17 +43,19 @@ See how to link with them for cluster execution [here]({{< 
ref "docs/dev/configu
 This part describes the Pulsar source based on the new
 [data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
 
-If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower 
releases, just use the StreamNative's 
[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower 
releases,
+just use the StreamNative's [Pulsar Flink 
connector](https://github.com/streamnative/pulsar-flink).
 {{< /hint >}}

Review comment:
       I would completely remove this part from the documentation, since it 
will end up in documentation for Flink 1.15 and future versions. 

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -69,13 +70,17 @@ env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"Pulsar Source");
 
 The following properties are **required** for building a PulsarSource:
 
-- Pulsar service url, configured by `setServiceUrl(String)`
-- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as admin URL), configured by 
`setAdminUrl(String)`
 - Pulsar subscription name, configured by `setSubscriptionName(String)`
 - Topics / partitions to subscribe, see the following
-  [Topic-partition subscription](#topic-partition-subscription) for more 
details.
+  [topic-partition subscription](#topic-partition-subscription) for more 
details.
 - Deserializer to parse Pulsar messages, see the following
-  [Deserializer](#deserializer) for more details.
+  [deserializer](#deserializer) for more details.
+
+It is **recommended** to set the consumer name in Pulsar Source by 
`setConsumerName(String)`.
+This would give a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.

Review comment:
       ```suggestion
   It is recommended to set the consumer name in Pulsar Source by 
`setConsumerName(String)`.
   This sets a unique name for the Flink connector in the Pulsar statistic 
dashboard.
   You can use it to monitor the performance of your Flink connector and 
applications.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition 
subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form 
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  
`{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the 
sake of simplicity).
 The flexible naming system stems from the fact that there is now a default 
topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and 
translated topic name:
+This table lists a mapping relationship between your input topic name and the 
translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                    
      |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`   
      |
+| `my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic 
name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would 
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would 
use `non-persistent://public/default/my-topic` instead.
 {{< /hint >}}
 
 #### Subscribing Pulsar Topic Partition
 
 Internally, Pulsar divides a partitioned topic as a set of non-partitioned 
topics according to the partition size.
 
-For example, if a `simple-string` topic with 3 partitions is created under the 
`sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the 
`sample` tenant with the `flink` namespace.
 The topics on Pulsar would be:
 
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name                                            | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string`             | Y           |
+| `persistent://sample/flink/simple-string-partition-0` | N           |
+| `persistent://sample/flink/simple-string-partition-1` | N           |
+| `persistent://sample/flink/simple-string-partition-2` | N           |
 
 You can directly consume messages from the topic partitions by using the 
non-partitioned topic names above.
-For example, use 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2 
of the `sample/flink/simple-string` topic.
+For example, use 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
 
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern
 
-Pulsar connector extracts the topic type (`persistent` or `non-persistent`) 
from the given topic pattern.
-For example, 
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be 
`non-persistent`.
-The topic type would be `persistent` if you do not provide the topic type in 
the regular expression.
+Pulsar source extracts the topic type (`persistent` or `non-persistent`) from 
the given topic pattern.
+For example, you can use the 
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to 
specify a `non-persistent` topic.
+By default, a `persistent` topic is created if you do not specify the topic 
type in the regular expression.

Review comment:
       ```suggestion
   The Pulsar source extracts the topic type (`persistent` or `non-persistent`) 
from the provided topic pattern.
   For example, you can use the 
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to 
specify a `non-persistent` topic.
   By default, a `persistent` topic is created if you do not specify the topic 
type in the regular expression.
   ```
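The type-extraction rule the suggestion describes could be sketched like this. This is an illustrative helper under the assumption that only the `non-persistent://` prefix changes the result; it is not the connector's actual implementation:

```java
// Sketch of extracting the topic type from a topic pattern, defaulting to
// `persistent` when no type prefix is given, as described above.
// Hypothetical helper for illustration only.
class TopicTypeExtractor {
    static String topicType(String topicPattern) {
        if (topicPattern.startsWith("non-persistent://")) {
            return "non-persistent";
        }
        // `persistent://...` patterns and bare patterns both resolve
        // to the default `persistent` type
        return "persistent";
    }
}
```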

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition 
subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form 
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  
`{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the 
sake of simplicity).
 The flexible naming system stems from the fact that there is now a default 
topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and 
translated topic name:
+This table lists a mapping relationship between your input topic name and the 
translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                    
      |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`   
      |
+| `my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic 
name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would 
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would 
use `non-persistent://public/default/my-topic` instead.
 {{< /hint >}}
 
 #### Subscribing Pulsar Topic Partition
 
 Internally, Pulsar divides a partitioned topic as a set of non-partitioned 
topics according to the partition size.
 
-For example, if a `simple-string` topic with 3 partitions is created under the 
`sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the 
`sample` tenant with the `flink` namespace.
 The topics on Pulsar would be:
 
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name                                            | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string`             | Y           |
+| `persistent://sample/flink/simple-string-partition-0` | N           |
+| `persistent://sample/flink/simple-string-partition-1` | N           |
+| `persistent://sample/flink/simple-string-partition-2` | N           |
 
 You can directly consume messages from the topic partitions by using the 
non-partitioned topic names above.
-For example, use 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2 
of the `sample/flink/simple-string` topic.
+For example, use 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
 
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern
 
-Pulsar connector extracts the topic type (`persistent` or `non-persistent`) 
from the given topic pattern.
-For example, 
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be 
`non-persistent`.
-The topic type would be `persistent` if you do not provide the topic type in 
the regular expression.
+Pulsar source extracts the topic type (`persistent` or `non-persistent`) from 
the given topic pattern.
+For example, you can use the 
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to 
specify a `non-persistent` topic.
+By default, a `persistent` topic is created if you do not specify the topic 
type in the regular expression.
 
-To consume both `persistent` and `non-persistent` topics based on the topic 
pattern,
-you can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)`.
-Pulsar connector would filter the available topics by the 
`RegexSubscriptionMode`.
+You can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)` to 
consume
+both `persistent` and `non-persistent` topics based on the topic pattern.
+Pulsar source would filter the available topics by the `RegexSubscriptionMode`.
 
 ### Deserializer
 
-A deserializer (Deserialization schema) is required for parsing Pulsar 
messages. The deserializer is
-configured by `setDeserializationSchema(PulsarDeserializationSchema)`.
+A deserializer (`PulsarDeserializationSchema`) is for parsing Pulsar messages 
from bytes.
+You can configure the deserializer using 
`setDeserializationSchema(PulsarDeserializationSchema)`.

Review comment:
       ```suggestion
  A deserializer (`PulsarDeserializationSchema`) is used to decode Pulsar messages from bytes.
   You can configure the deserializer using 
`setDeserializationSchema(PulsarDeserializationSchema)`.
   ```
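At its core, "deserialize the raw payload of the messages as strings" boils down to a byte-to-string conversion; the connector wraps this in a `PulsarDeserializationSchema`. A minimal sketch of that conversion, assuming UTF-8 payloads (this is not the actual connector code):

```java
import java.nio.charset.StandardCharsets;

// Minimal sketch of decoding a raw Pulsar payload as a string, which is what
// the string deserialization schema described above does internally.
// Assumes UTF-8 encoded payloads.
class PayloadDecoder {
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```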

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -206,16 +212,17 @@ PulsarSource.builder().setSubscriptionName("my-shared")
 
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
 ```
 
-If you want to use `Key_Shared` subscription type on the Pulsar connector. 
Ensure that you provide a `RangeGenerator` implementation.
-The `RangeGenerator` generates a set of key hash ranges so that
-a respective reader subtask will only dispatch messages where the hash of the 
message key is contained in the specified range.
+If you want to use the `Key_Shared` subscription type on the Pulsar connector.
+Ensure that you provide a `RangeGenerator` implementation. The 
`RangeGenerator` generates a set of
+key hash ranges so that a respective reader subtask only dispatches messages 
where the hash of the
+message key is contained in the specified range.
 
-Pulsar connector would use a `UniformRangeGenerator` which would divides the 
range by the Flink source parallelism
-if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
+Pulsar connector uses a `UniformRangeGenerator` which divides the range by the 
Flink source
+parallelism if no `RangeGenerator` is provided in the `Key_Shared` 
subscription type.
 
 ### Starting Position
 
-Pulsar source is able to consume messages starting from different positions by 
`setStartCursor(StartCursor)`.
+Pulsar source is able to consume messages starting from different positions by 
setting the `setStartCursor(StartCursor)` option.

Review comment:
       ```suggestion
   The Pulsar source is able to consume messages starting from different 
positions by setting the `setStartCursor(StartCursor)` option.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition 
subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form 
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  
`{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the 
sake of simplicity).
 The flexible naming system stems from the fact that there is now a default 
topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and 
translated topic name:
+This table lists a mapping relationship between your input topic name and the 
translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                    
      |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`   
      |
+| `my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic 
name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would 
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would 
use `non-persistent://public/default/my-topic` instead.
 {{< /hint >}}
 
 #### Subscribing Pulsar Topic Partition
 
 Internally, Pulsar divides a partitioned topic as a set of non-partitioned 
topics according to the partition size.
 
-For example, if a `simple-string` topic with 3 partitions is created under the 
`sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the 
`sample` tenant with the `flink` namespace.
 The topics on Pulsar would be:
 
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name                                            | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string`             | Y           |
+| `persistent://sample/flink/simple-string-partition-0` | N           |
+| `persistent://sample/flink/simple-string-partition-1` | N           |
+| `persistent://sample/flink/simple-string-partition-2` | N           |
 
 You can directly consume messages from the topic partitions by using the 
non-partitioned topic names above.
-For example, use 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2 
of the `sample/flink/simple-string` topic.
+For example, use 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
 
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern
 
-Pulsar connector extracts the topic type (`persistent` or `non-persistent`) 
from the given topic pattern.
-For example, 
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be 
`non-persistent`.
-The topic type would be `persistent` if you do not provide the topic type in 
the regular expression.
+Pulsar source extracts the topic type (`persistent` or `non-persistent`) from 
the given topic pattern.
+For example, you can use the 
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to 
specify a `non-persistent` topic.
+By default, a `persistent` topic is created if you do not specify the topic 
type in the regular expression.
 
-To consume both `persistent` and `non-persistent` topics based on the topic 
pattern,
-you can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)`.
-Pulsar connector would filter the available topics by the 
`RegexSubscriptionMode`.
+You can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)` to 
consume
+both `persistent` and `non-persistent` topics based on the topic pattern.
+Pulsar source would filter the available topics by the `RegexSubscriptionMode`.

Review comment:
       ```suggestion
   You can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)` to 
consume
   both `persistent` and `non-persistent` topics based on the topic pattern.
   The Pulsar source would filter the available topics by the 
`RegexSubscriptionMode`.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -176,12 +182,12 @@ you can use the predefined `PulsarDeserializationSchema`. 
Pulsar connector provi
   ```
 
 Pulsar `Message<byte[]>` contains some [extra 
properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
-such as message key, message publish time, message time, application defined 
key/value pairs that will be attached to the message, etc.
+such as message key, message publish time, message time, and 
application-defined key/value pairs etc.
 These properties could be acquired by the `Message<byte[]>` interface.
 
 If you want to deserialize the Pulsar message by these properties, you need to 
implement `PulsarDeserializationSchema`.
-And ensure that the `TypeInformation` from the 
`PulsarDeserializationSchema.getProducedType()` must be correct.
-Flink would use this `TypeInformation` for passing the messages to downstream 
operators.
+And ensure that the `TypeInformation` from the 
`PulsarDeserializationSchema.getProducedType()` is correct.
+Flink uses this `TypeInformation` to pass the messages to downstream operators.

Review comment:
       ```suggestion
   Ensure that the `TypeInformation` from the 
`PulsarDeserializationSchema.getProducedType()` is correct.
   Flink uses this `TypeInformation` to pass the messages to downstream 
operators.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -227,14 +234,14 @@ Built-in start cursors include:
   StartCursor.latest()
   ```
 - Start from a specified message between the earliest and the latest.
-  Pulsar connector would consume from the latest available message if the 
message id doesn't exist.
+  Pulsar connector consumes from the latest available message if the message 
ID does not exist.

Review comment:
       ```suggestion
   The Pulsar connector consumes from the latest available message if the 
message ID does not exist.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -206,16 +212,17 @@ PulsarSource.builder().setSubscriptionName("my-shared")
 
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
 ```
 
-If you want to use `Key_Shared` subscription type on the Pulsar connector. 
Ensure that you provide a `RangeGenerator` implementation.
-The `RangeGenerator` generates a set of key hash ranges so that
-a respective reader subtask will only dispatch messages where the hash of the 
message key is contained in the specified range.
+If you want to use the `Key_Shared` subscription type on the Pulsar connector.
+Ensure that you provide a `RangeGenerator` implementation. The 
`RangeGenerator` generates a set of
+key hash ranges so that a respective reader subtask only dispatches messages 
where the hash of the
+message key is contained in the specified range.
 
-Pulsar connector would use a `UniformRangeGenerator` which would divides the 
range by the Flink source parallelism
-if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
+Pulsar connector uses a `UniformRangeGenerator` which divides the range by the 
Flink source
+parallelism if no `RangeGenerator` is provided in the `Key_Shared` 
subscription type.

Review comment:
       ```suggestion
   The Pulsar connector uses a `UniformRangeGenerator` that divides the range by 
the Flink source
   parallelism if no `RangeGenerator` is provided in the `Key_Shared` 
subscription type.
   ```
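To make the range arithmetic concrete, here is a minimal, self-contained sketch of how a uniform generator could split Pulsar's key hash space (65536 slots) across subtasks. `UniformRangesSketch` and `rangeFor` are hypothetical names chosen for illustration; the connector's real `UniformRangeGenerator` has a different API.

```java
// Hypothetical sketch of the idea behind UniformRangeGenerator:
// divide Pulsar's key hash space (0..65535) evenly across the
// Flink source parallelism, giving the remainder to the last subtask.
public class UniformRangesSketch {
    static final int RANGE_SIZE = 65536; // Pulsar's Key_Shared hash space

    /** Returns {start, end} (both inclusive) owned by one reader subtask. */
    static int[] rangeFor(int parallelism, int subtaskIndex) {
        int size = RANGE_SIZE / parallelism;
        int start = subtaskIndex * size;
        int end = (subtaskIndex == parallelism - 1)
                ? RANGE_SIZE - 1            // last subtask absorbs the remainder
                : start + size - 1;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int[] r = rangeFor(4, 0);
        System.out.println(r[0] + ".." + r[1]); // prints 0..16383
    }
}
```

A message whose key hashes into a subtask's range is dispatched only to that subtask, which is what makes per-key ordering under `Key_Shared` possible.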

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -227,14 +234,14 @@ Built-in start cursors include:
   StartCursor.latest()
   ```
 - Start from a specified message between the earliest and the latest.
-  Pulsar connector would consume from the latest available message if the 
message id doesn't exist.
+  Pulsar connector consumes from the latest available message if the message 
ID does not exist.
 
   The start message is included in consuming result.
   ```java
   StartCursor.fromMessageId(MessageId)
   ```
 - Start from a specified message between the earliest and the latest.
-  Pulsar connector would consume from the latest available message if the 
message id doesn't exist.
+  Pulsar connector consumes from the latest available message if the message 
id doesn't exist.

Review comment:
       ```suggestion
   The Pulsar connector consumes from the latest available message if the message ID does not exist.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -176,12 +182,12 @@ you can use the predefined `PulsarDeserializationSchema`. 
Pulsar connector provi
   ```
 
 Pulsar `Message<byte[]>` contains some [extra 
properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
-such as message key, message publish time, message time, application defined 
key/value pairs that will be attached to the message, etc.
+such as message key, message publish time, message time, and 
application-defined key/value pairs etc.
 These properties could be acquired by the `Message<byte[]>` interface.

Review comment:
       ```suggestion
   These properties could be defined in the `Message<byte[]>` interface.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum 
recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.

Review comment:
       ```suggestion
   The default timeout for Pulsar transactions is 3 hours.
   Make sure that the timeout is greater than the checkpoint interval + maximum recovery time.
   A shorter checkpoint interval indicates a better consuming performance.
   You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` 
option to change the transaction timeout.
   ```
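The timeout rule above (transaction timeout greater than checkpoint interval plus maximum recovery time) can be sketched as a simple pre-flight check. The class and method names are illustrative, not part of the connector; only the 3-hour default comes from the text above.

```java
import java.time.Duration;

// Illustrative sanity check for the rule discussed above:
// the Pulsar transaction timeout must be strictly greater than
// the checkpoint interval plus the maximum recovery time.
public class TransactionTimeoutCheck {
    static boolean timeoutIsSafe(Duration txnTimeout,
                                 Duration checkpointInterval,
                                 Duration maxRecoveryTime) {
        return txnTimeout.compareTo(checkpointInterval.plus(maxRecoveryTime)) > 0;
    }

    public static void main(String[] args) {
        Duration defaultTimeout = Duration.ofHours(3); // default per the docs
        // A 1-minute checkpoint interval with 10 minutes of recovery headroom
        // fits comfortably inside the 3-hour default.
        System.out.println(timeoutIsSafe(defaultTimeout,
                Duration.ofMinutes(1), Duration.ofMinutes(10))); // prints true
    }
}
```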

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on 
how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition) 
about how the message is stored,
 you can create a `MessageId` by using `DefaultImplementation.newMessageId(long 
ledgerId, long entryId, int partitionIndex)`.
 {{< /hint >}}
 
 ### Boundedness
 
 Pulsar source supports streaming and batch running modes.
-By default, the `PulsarSource` is set to run in the streaming mode.
+By default, the `PulsarSource` runs in the streaming mode.
 
-In streaming mode, Pulsar source never stops until a Flink job fails or is 
cancelled. However,
-you can set Pulsar source stopping at a stop position by using 
```setUnboundedStopCursor(StopCursor)```.
-The Pulsar source will finish when all partitions reach their specified stop 
positions.
+In the streaming mode, Pulsar source never stops until a Flink job fails or is 
cancelled.
+However, you can use the `setUnboundedStopCursor(StopCursor)` to set the 
Pulsar source to stop at a specific stop position.
 
-You can use ```setBoundedStopCursor(StopCursor)``` to specify a stop position 
so that the Pulsar source can run in the batch mode.
-When all partitions have reached their stop positions, the source will finish.
+You can use `setBoundedStopCursor(StopCursor)` to specify a stop position to 
run in batch mode.

Review comment:
       ```suggestion
   You can use `setBoundedStopCursor(StopCursor)` to specify a stop position 
for bounded data. 
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum 
recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or 
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new 
SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics

Review comment:
       ```suggestion
   ### Producing to topics
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
   StopCursor.atEventTime(long)
   ```
 
-### Configurable Options
+### Source Configurable Options
 
 In addition to configuration options described above, you can set arbitrary 
options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using 
`setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using 
`setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
 
 #### PulsarClient Options
 
-Pulsar connector use the [client 
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of 
Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client 
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of 
Pulsar's `ClientConfigurationData`,
 which is required for creating a `PulsarClient`, as Flink configuration 
options in `PulsarOptions`.
 
 {{< generated/pulsar_client_configuration >}}
 
 #### PulsarAdmin Options
 
 The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used 
for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic 
pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed 
here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses 
topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
 They are also defined in `PulsarOptions`.
 
 {{< generated/pulsar_admin_configuration >}}
 
 #### Pulsar Consumer Options
 
 In general, Pulsar provides the Reader API and Consumer API for consuming 
messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of 
Pulsar's `ConsumerConfigurationData` as Flink configuration options in 
`PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's 
`ConsumerConfigurationData` as Flink configuration options in 
`PulsarSourceOptions`.
 
 {{< generated/pulsar_consumer_configuration >}}
 
 #### PulsarSource Options
 
 The configuration options below are mainly used for customizing the 
performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.

Review comment:
       ```suggestion
   You can ignore them if you do not have any performance issues.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on 
how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition) 
about how the message is stored,
 you can create a `MessageId` by using `DefaultImplementation.newMessageId(long 
ledgerId, long entryId, int partitionIndex)`.
 {{< /hint >}}
 
 ### Boundedness
 
 Pulsar source supports streaming and batch running modes.
-By default, the `PulsarSource` is set to run in the streaming mode.
+By default, the `PulsarSource` runs in the streaming mode.
 
-In streaming mode, Pulsar source never stops until a Flink job fails or is 
cancelled. However,
-you can set Pulsar source stopping at a stop position by using 
```setUnboundedStopCursor(StopCursor)```.
-The Pulsar source will finish when all partitions reach their specified stop 
positions.
+In the streaming mode, Pulsar source never stops until a Flink job fails or is 
cancelled.
+However, you can use the `setUnboundedStopCursor(StopCursor)` to set the 
Pulsar source to stop at a specific stop position.

Review comment:
       ```suggestion
   For unbounded data, the Pulsar source never stops until a Flink job is stopped or fails.
   You can use the `setUnboundedStopCursor(StopCursor)` to set the Pulsar 
source to stop at a specific stop position.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
   StopCursor.atEventTime(long)
   ```
 
-### Configurable Options
+### Source Configurable Options
 
 In addition to configuration options described above, you can set arbitrary 
options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using 
`setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using 
`setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
 
 #### PulsarClient Options
 
-Pulsar connector use the [client 
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of 
Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client 
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of 
Pulsar's `ClientConfigurationData`,
 which is required for creating a `PulsarClient`, as Flink configuration 
options in `PulsarOptions`.
 
 {{< generated/pulsar_client_configuration >}}
 
 #### PulsarAdmin Options
 
 The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used 
for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic 
pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed 
here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses 
topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
 They are also defined in `PulsarOptions`.
 
 {{< generated/pulsar_admin_configuration >}}
 
 #### Pulsar Consumer Options
 
 In general, Pulsar provides the Reader API and Consumer API for consuming 
messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of 
Pulsar's `ConsumerConfigurationData` as Flink configuration options in 
`PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's 
`ConsumerConfigurationData` as Flink configuration options in 
`PulsarSourceOptions`.
 
 {{< generated/pulsar_consumer_configuration >}}
 
 #### PulsarSource Options
 
 The configuration options below are mainly used for customizing the 
performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.
 
 {{< generated/pulsar_source_configuration >}}
 
 ### Dynamic Partition Discovery
 
 To handle scenarios like topic scaling-out or topic creation without 
restarting the Flink
-job, Pulsar source can be configured to periodically discover new partitions 
under provided
-topic-partition subscribing pattern. To enable partition discovery, set a 
non-negative value for
-the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+job, Pulsar source periodically discover new partitions under a provided
+topic-partition subscription pattern. To enable partition discovery, you can 
set a non-negative value for
+the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
 
 ```java
 // discover new partitions per 10 seconds
 PulsarSource.builder()
-        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 
10000);
+    .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 
10000);
 ```
 
 {{< hint warning >}}
-- Partition discovery is **enabled** by default. Pulsar connector would query 
the topic metadata every 30 seconds.
-- You need to set the partition discovery interval to a negative value to 
disable this feature.
-- The partition discovery would be disabled in batch mode even if you set this 
option with a non-negative value.
+- Partition discovery is **enabled** by default. The Pulsar connector queries 
the topic metadata every 30 seconds.
+- To disable partition discovery, you need to set a negative partition 
discovery interval.
+- The partition discovery is disabled in the batch mode even if you set this 
option with a non-negative value.
 {{< /hint >}}
 
 ### Event Time and Watermarks
 
 By default, the message uses the timestamp embedded in Pulsar 
`Message<byte[]>` as the event time.
-You can define your own `WatermarkStrategy` to extract the event time from the 
message,
+You can define a `WatermarkStrategy` to extract the event time from the 
message,

Review comment:
       The previous sentence is actually better. 
   ```suggestion
   You can define your own `WatermarkStrategy` to extract the event time from 
the message,
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -206,16 +212,17 @@ PulsarSource.builder().setSubscriptionName("my-shared")
 
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
 ```
 
-If you want to use `Key_Shared` subscription type on the Pulsar connector. 
Ensure that you provide a `RangeGenerator` implementation.
-The `RangeGenerator` generates a set of key hash ranges so that
-a respective reader subtask will only dispatch messages where the hash of the 
message key is contained in the specified range.
+If you want to use the `Key_Shared` subscription type on the Pulsar connector.
+Ensure that you provide a `RangeGenerator` implementation. The 
`RangeGenerator` generates a set of
+key hash ranges so that a respective reader subtask only dispatches messages 
where the hash of the
+message key is contained in the specified range.

Review comment:
       ```suggestion
   Ensure that you provide a `RangeGenerator` implementation if you want to use 
the `Key_Shared` subscription type on the Pulsar connector.
   The `RangeGenerator` generates a set of key hash ranges so that a respective 
reader subtask only dispatches messages where the hash of the message key is 
contained in the specified range.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
   StopCursor.atEventTime(long)
   ```
 
-### Configurable Options
+### Source Configurable Options
 
 In addition to configuration options described above, you can set arbitrary 
options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using 
`setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using 
`setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
 
 #### PulsarClient Options
 
-Pulsar connector use the [client 
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of 
Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client 
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of 
Pulsar's `ClientConfigurationData`,
 which is required for creating a `PulsarClient`, as Flink configuration 
options in `PulsarOptions`.
 
 {{< generated/pulsar_client_configuration >}}
 
 #### PulsarAdmin Options
 
 The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used 
for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic 
pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed 
here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses 
topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
 They are also defined in `PulsarOptions`.
 
 {{< generated/pulsar_admin_configuration >}}
 
 #### Pulsar Consumer Options
 
 In general, Pulsar provides the Reader API and Consumer API for consuming 
messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of 
Pulsar's `ConsumerConfigurationData` as Flink configuration options in 
`PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's 
`ConsumerConfigurationData` as Flink configuration options in 
`PulsarSourceOptions`.
 
 {{< generated/pulsar_consumer_configuration >}}
 
 #### PulsarSource Options
 
 The configuration options below are mainly used for customizing the 
performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.
 
 {{< generated/pulsar_source_configuration >}}
 
 ### Dynamic Partition Discovery
 
 To handle scenarios like topic scaling-out or topic creation without 
restarting the Flink
-job, Pulsar source can be configured to periodically discover new partitions 
under provided
-topic-partition subscribing pattern. To enable partition discovery, set a 
non-negative value for
-the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+job, Pulsar source periodically discover new partitions under a provided
+topic-partition subscription pattern. To enable partition discovery, you can 
set a non-negative value for
+the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:

Review comment:
       ```suggestion
   job, the Pulsar source periodically discovers new partitions under a provided
   topic-partition subscription pattern. To enable partition discovery, you can 
set a non-negative value for
   the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
   ```
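The interval semantics can be summed up in one predicate: a non-negative interval enables periodic discovery and a negative interval disables it. `PartitionDiscoveryToggle` is a hypothetical helper for illustration; the real switch is the `PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option.

```java
// Hypothetical helper capturing the semantics described above:
// a non-negative interval (milliseconds) enables periodic partition
// discovery, a negative interval disables it. 30_000 ms mirrors the
// documented 30-second default.
public class PartitionDiscoveryToggle {
    static final long DEFAULT_INTERVAL_MS = 30_000L;

    static boolean discoveryEnabled(long intervalMs) {
        return intervalMs >= 0;
    }

    public static void main(String[] args) {
        System.out.println(discoveryEnabled(DEFAULT_INTERVAL_MS)); // prints true
        System.out.println(discoveryEnabled(-1L));                 // prints false
    }
}
```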

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on 
how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition) 
about how the message is stored,
 you can create a `MessageId` by using `DefaultImplementation.newMessageId(long 
ledgerId, long entryId, int partitionIndex)`.
 {{< /hint >}}
 
 ### Boundedness
 
 Pulsar source supports streaming and batch running modes.
-By default, the `PulsarSource` is set to run in the streaming mode.
+By default, the `PulsarSource` runs in the streaming mode.

Review comment:
       ```suggestion
   By default, the `PulsarSource` is configured for unbounded data.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
   StopCursor.atEventTime(long)
   ```
 
-### Configurable Options
+### Source Configurable Options
 
 In addition to configuration options described above, you can set arbitrary 
options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using 
`setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using 
`setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
 
 #### PulsarClient Options
 
-Pulsar connector use the [client 
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of 
Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client 
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of 
Pulsar's `ClientConfigurationData`,
 which is required for creating a `PulsarClient`, as Flink configuration 
options in `PulsarOptions`.
 
 {{< generated/pulsar_client_configuration >}}
 
 #### PulsarAdmin Options
 
 The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used 
for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic 
pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed 
here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses 
topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
 They are also defined in `PulsarOptions`.
 
 {{< generated/pulsar_admin_configuration >}}
 
 #### Pulsar Consumer Options
 
 In general, Pulsar provides the Reader API and Consumer API for consuming 
messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of 
Pulsar's `ConsumerConfigurationData` as Flink configuration options in 
`PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's 
`ConsumerConfigurationData` as Flink configuration options in 
`PulsarSourceOptions`.
 
 {{< generated/pulsar_consumer_configuration >}}
 
 #### PulsarSource Options
 
 The configuration options below are mainly used for customizing the 
performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.
 
 {{< generated/pulsar_source_configuration >}}
 
 ### Dynamic Partition Discovery
 
 To handle scenarios like topic scaling-out or topic creation without 
restarting the Flink
-job, Pulsar source can be configured to periodically discover new partitions 
under provided
-topic-partition subscribing pattern. To enable partition discovery, set a 
non-negative value for
-the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+job, Pulsar source periodically discover new partitions under a provided
+topic-partition subscription pattern. To enable partition discovery, you can 
set a non-negative value for
+the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
 
 ```java
 // discover new partitions per 10 seconds
 PulsarSource.builder()
-        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 
10000);
+    .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 
10000);
 ```
 
 {{< hint warning >}}
-- Partition discovery is **enabled** by default. Pulsar connector would query 
the topic metadata every 30 seconds.
-- You need to set the partition discovery interval to a negative value to 
disable this feature.
-- The partition discovery would be disabled in batch mode even if you set this 
option with a non-negative value.
+- Partition discovery is **enabled** by default. The Pulsar connector queries 
the topic metadata every 30 seconds.
+- To disable partition discovery, you need to set a negative partition 
discovery interval.
+- The partition discovery is disabled in the batch mode even if you set this 
option with a non-negative value.

Review comment:
       ```suggestion
   - Partition discovery is disabled for bounded data even if you set this option to a non-negative value.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum 
recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or 
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new 
SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition 
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you 
can provide a list of topics,
+partitions, or both of them.

Review comment:
       ```suggestion
   Defining the topics for producing is similar to the [topic-partition 
subscription](#topic-partition-subscription)
   in the Pulsar source. We support a mix-in style of topic setting. You can 
provide a list of topics,
   partitions, or both of them.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum 
recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or 
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new 
SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition 
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you 
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", 
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic 
metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` 
option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic 
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build  the Pulsar sink based on both the topic and its corresponding 
partitions, Pulsar sink merges them and only use the topic.

Review comment:
       ```suggestion
   If you build the Pulsar sink based on both the topic and its corresponding 
partitions, Pulsar sink merges them and only uses the topic.
   ```
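
   The merge behavior can be illustrated with a small standalone sketch. This is a hypothetical re-implementation for illustration only — the connector's internal logic may differ; the `-partition-` suffix convention is taken from the examples in this doc:

   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   public class TopicMerge {

       /** Drops a partition entry when its parent topic is also configured. */
       static List<String> merge(List<String> topicsAndPartitions) {
           Set<String> fullTopics = new HashSet<>();
           for (String t : topicsAndPartitions) {
               if (!t.contains("-partition-")) {
                   fullTopics.add(t);
               }
           }
           List<String> result = new ArrayList<>();
           for (String t : topicsAndPartitions) {
               int idx = t.indexOf("-partition-");
               // Skip a partition whose parent topic is already in the list.
               if (idx >= 0 && fullTopics.contains(t.substring(0, idx))) {
                   continue;
               }
               if (!result.contains(t)) {
                   result.add(t);
               }
           }
           return result;
       }

       public static void main(String[] args) {
           // setTopics("some-topic1", "some-topic1-partition-0") is simplified
           // to just "some-topic1", as the hint in the diff describes.
           System.out.println(merge(List.of("some-topic1", "some-topic1-partition-0")));
           // prints "[some-topic1]"
       }
   }
   ```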

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum 
recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.

Review comment:
       ```suggestion
   No consistency guarantees can be made in this scenario.
   ```
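
   As a companion sketch for the no-transaction path (the option name is taken from the diff above; treat the snippet as illustrative, not verified):

   ```java
   PulsarSource.builder()
       // Acknowledge each message right after it is consumed. This trades
       // consistency guarantees for the ability to run without Pulsar
       // transactions or checkpointing.
       .setConfig(PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true)
   ```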

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum 
recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or 
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new 
SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition 
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you 
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", 
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic 
metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` 
option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic 
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build  the Pulsar sink based on both the topic and its corresponding 
partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1", 
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the 
record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's 
`SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported 
in Pulsar Sink.

Review comment:
       ```suggestion
   A serializer (`PulsarSerializationSchema`) is required for serializing the 
record instance into bytes.
   Similar to `PulsarSource`, Pulsar sink supports both Flink's 
`SerializationSchema` and
   Pulsar's `Schema`. Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported 
in the Pulsar sink.
   ```
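
   Both serializer flavors named in the suggestion appear later in the diff; a quick side-by-side sketch (using `Schema.STRING` as an assumed primitive Pulsar schema):

   ```java
   // Flink-native serialization
   PulsarSink.builder()
       .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))

   // Pulsar-native schema (primitive type)
   PulsarSink.builder()
       .setSerializationSchema(PulsarSerializationSchema.pulsarSchema(Schema.STRING))
   ```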

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum 
recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or 
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new 
SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition 
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you 
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", 
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic 
metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` 
option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic 
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build  the Pulsar sink based on both the topic and its corresponding 
partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1", 
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the 
record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's 
`SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported 
in Pulsar Sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html)
 interface,
+you can use the predefined `PulsarSerializationSchema`. Pulsar sink provides 
two implementation methods.
+
+- Encode the message by using Pulsar's 
[Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+  ```java
+  // Primitive types
+  PulsarSerializationSchema.pulsarSchema(Schema)
+
+  // Struct types (JSON, Protobuf, Avro, etc.)
+  PulsarSerializationSchema.pulsarSchema(Schema, Class)
+
+  // KeyValue type
+  PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
+  ```
+- Encode the message by using Flink's `SerializationSchema`
+  ```java
+  PulsarSerializationSchema.flinkSchema(SerializationSchema)
+  ```
+
+[Schema 
evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
+can be enabled for users by using `PulsarSerializationSchema.pulsarSchema()` 
and
+`PulsarSinkBuilder.enableSchemaEvolution()`, meaning that any broker schema 
validation will be in place.
+Here is a code sample below.
+
+```java
+Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
+PulsarSerializationSchema<SomePojo> pulsarSchema = 
PulsarSerializationSchema.pulsarSchema(schema, SomePojo.class);
+
+PulsarSink<String> sink = PulsarSink.builder()
+    ...
+    .setSerializationSchema(pulsarSchema)
+    .enableSchemaEvolution()
+    .build();
+```
+
+{{< hint warning >}}
+If you use Pulsar schema without enabling schema evolution, the target topic 
will have a `Schema.BYTES` schema.
+And consumers need to handle the deserialization (if needed) themselves.

Review comment:
       ```suggestion
   Consumers will need to handle the deserialization (if needed) themselves.
   ```
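
   To make the consumer-side burden concrete: if the producer used `SimpleStringSchema` (UTF-8) but the topic ended up with a `Schema.BYTES` schema, the consumer decodes the raw payload itself. A minimal standalone sketch (UTF-8 text payloads are an assumption about the producer):

   ```java
   import java.nio.charset.StandardCharsets;

   public class ManualDecode {

       /** Decodes a raw Schema.BYTES payload that is known to be UTF-8 text. */
       static String decode(byte[] payload) {
           return new String(payload, StandardCharsets.UTF_8);
       }

       public static void main(String[] args) {
           byte[] raw = "hello pulsar".getBytes(StandardCharsets.UTF_8);
           System.out.println(decode(raw)); // prints "hello pulsar"
       }
   }
   ```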

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum 
recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or 
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new 
SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition 
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you 
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", 
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic 
metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` 
option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic 
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build  the Pulsar sink based on both the topic and its corresponding 
partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1", 
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the 
record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's 
`SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported 
in Pulsar Sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html)
 interface,

Review comment:
       Should this really point to a snapshot version? 

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is  created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum 
recovery time.
+A shorter checkpoint interval indicates a better  consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or 
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new 
SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition 
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you 
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", 
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic 
metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` 
option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic 
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build  the Pulsar sink based on both the topic and its corresponding 
partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1", 
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the 
record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's 
`SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported 
in Pulsar Sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html)
 interface,
+you can use the predefined `PulsarSerializationSchema`. Pulsar sink provides 
two implementation methods.

Review comment:
       ```suggestion
   you can use the predefined `PulsarSerializationSchema`. The Pulsar sink 
provides two implementation methods.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than the checkpoint interval plus the maximum recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we cannot guarantee consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction`, or you are on Flink 1.14 or earlier releases, use StreamNative's
+[Pulsar Flink connector](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is similar to the [topic-partition subscription](#topic-partition-subscription)
+in the Pulsar source. A mixed style of topic settings is supported, so you can provide a list of topics,
+partitions, or both.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
+```
+
+The topics you provide support auto partition discovery. The topic metadata is queried from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic 
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both a topic and its corresponding partitions, the Pulsar sink merges them and only uses the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1", 
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.

Review comment:
       ```suggestion
   For example, when using the `PulsarSink.builder().setTopics("some-topic1", 
"some-topic1-partition-0")` option to build the Pulsar sink,
   this is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
   ```
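The merge rule discussed in this hunk can also be illustrated with a small, self-contained sketch. The helper name and the `-partition-` parsing below are illustrative only, not part of the connector's actual API:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TopicMergeSketch {
    // Illustrative merge rule: when a full topic is present in the list,
    // its individual partitions (named "<topic>-partition-<n>") are dropped,
    // since writing to the topic already covers all of its partitions.
    static Set<String> mergeTopics(List<String> topics) {
        Set<String> result = new LinkedHashSet<>();
        for (String t : topics) {
            int idx = t.lastIndexOf("-partition-");
            String parent = idx >= 0 ? t.substring(0, idx) : null;
            if (parent != null && topics.contains(parent)) {
                continue; // the parent topic already covers this partition
            }
            result.add(t);
        }
        return result;
    }

    public static void main(String[] args) {
        // "some-topic1-partition-0" collapses into "some-topic1".
        System.out.println(mergeTopics(List.of("some-topic1", "some-topic1-partition-0")));
    }
}
```

A partition whose parent topic is not in the list (for example `topic-a-partition-0` alone) is kept as-is, matching the behavior described above.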

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than the checkpoint interval plus the maximum recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we cannot guarantee consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction`, or you are on Flink 1.14 or earlier releases, use StreamNative's
+[Pulsar Flink connector](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is similar to the [topic-partition subscription](#topic-partition-subscription)
+in the Pulsar source. A mixed style of topic settings is supported, so you can provide a list of topics,
+partitions, or both.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
+```
+
+The topics you provide support auto partition discovery. The topic metadata is queried from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic 
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both a topic and its corresponding partitions, the Pulsar sink merges them and only uses the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1", 
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required for serializing the record instance into bytes.
+Similar to `PulsarSource`, the Pulsar sink supports both Flink's `SerializationSchema` and
+Pulsar's `Schema`. However, Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported in the Pulsar sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html)
 interface,
+you can use the predefined `PulsarSerializationSchema`. Pulsar sink provides 
two implementation methods.
+
+- Encode the message by using Pulsar's 
[Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+  ```java
+  // Primitive types
+  PulsarSerializationSchema.pulsarSchema(Schema)
+
+  // Struct types (JSON, Protobuf, Avro, etc.)
+  PulsarSerializationSchema.pulsarSchema(Schema, Class)
+
+  // KeyValue type
+  PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
+  ```
+- Encode the message by using Flink's `SerializationSchema`
+  ```java
+  PulsarSerializationSchema.flinkSchema(SerializationSchema)
+  ```
+
+[Schema 
evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
+can be enabled for users by using `PulsarSerializationSchema.pulsarSchema()` 
and
+`PulsarSinkBuilder.enableSchemaEvolution()`, meaning that any broker schema 
validation will be in place.
+Here is a code sample below.

Review comment:
       ```suggestion
   [Schema 
evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
   can be enabled by users using `PulsarSerializationSchema.pulsarSchema()` and
   `PulsarSinkBuilder.enableSchemaEvolution()`. This means that any broker 
schema validation is in place.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than the checkpoint interval plus the maximum recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we cannot guarantee consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction`, or you are on Flink 1.14 or earlier releases, use StreamNative's
+[Pulsar Flink connector](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is similar to the [topic-partition subscription](#topic-partition-subscription)
+in the Pulsar source. A mixed style of topic settings is supported, so you can provide a list of topics,
+partitions, or both.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
+```
+
+The topics you provide support auto partition discovery. The topic metadata is queried from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic 
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.

Review comment:
       ```suggestion
   Configuring writing targets can be replaced by using a custom [`TopicRouter`]
   [message routing](#message-routing). Configuring partitions on the Pulsar 
connector is explained in the [flexible topic naming](#flexible-topic-naming) 
section.
   ```
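The partition-style writing targets shown in this hunk follow Pulsar's `<topic>-partition-<n>` naming convention. A tiny, hypothetical helper (not part of the connector API) makes that convention explicit:

```java
public class PartitionNameSketch {
    // Builds the per-partition topic name used by Pulsar,
    // e.g. partitionTopic("topic-a", 0) yields "topic-a-partition-0".
    static String partitionTopic(String topic, int partitionIndex) {
        return topic + "-partition-" + partitionIndex;
    }

    public static void main(String[] args) {
        System.out.println(partitionTopic("topic-a", 0)); // topic-a-partition-0
    }
}
```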

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -24,14 +24,13 @@ under the License.
 
 # Apache Pulsar Connector
 
-Flink provides an [Apache Pulsar](https://pulsar.apache.org) source connector 
for reading data from Pulsar topics with exactly-once guarantees.
+Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for 
reading data from Pulsar topics with exactly-once guarantees.

Review comment:
       ```suggestion
   Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for 
reading and writing data from and to Pulsar topics with exactly-once guarantees.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than the checkpoint interval plus the maximum recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we cannot guarantee consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction`, or you are on Flink 1.14 or earlier releases, use StreamNative's
+[Pulsar Flink connector](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.

Review comment:
       ```suggestion
   The Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than the checkpoint interval plus the maximum recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we cannot guarantee consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction`, or you are on Flink 1.14 or earlier releases, use StreamNative's
+[Pulsar Flink connector](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}

Review comment:
       Like for the Source, just remove parts that are referring to older 
versions
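On the transaction-timeout rule quoted earlier in this hunk (timeout > checkpoint interval + maximum recovery time): the constraint can be sanity-checked with plain time arithmetic. The method name and the sample durations below are illustrative:

```java
import java.time.Duration;

public class TimeoutCheckSketch {
    // Illustrative check of the rule from the docs: the Pulsar transaction
    // timeout must exceed checkpoint interval + maximum recovery time.
    static boolean timeoutIsSafe(Duration txnTimeout, Duration checkpointInterval, Duration maxRecoveryTime) {
        return txnTimeout.compareTo(checkpointInterval.plus(maxRecoveryTime)) > 0;
    }

    public static void main(String[] args) {
        Duration txnTimeout = Duration.ofHours(3); // Pulsar's default transaction timeout
        System.out.println(timeoutIsSafe(txnTimeout, Duration.ofMinutes(1), Duration.ofMinutes(10)));
    }
}
```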

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than the checkpoint interval plus the maximum recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we cannot guarantee consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction`, or you are on Flink 1.14 or earlier releases, use StreamNative's
+[Pulsar Flink connector](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by 
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic 
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.

Review comment:
       ```suggestion
   It is recommended to set the producer name in Pulsar Sink by `setProducerName(String)`.
   This sets a unique name for the Flink connector in the Pulsar statistic 
dashboard.
   You can use it to monitor the performance of your Flink connector and 
applications.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on 
how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition) 
about how the message is stored,
 you can create a `MessageId` by using `DefaultImplementation.newMessageId(long 
ledgerId, long entryId, int partitionIndex)`.
 {{< /hint >}}
 
 ### Boundedness
 
 Pulsar source supports streaming and batch running modes.

Review comment:
       ```suggestion
   The Pulsar source supports streaming and batch execution mode.
   ```

##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than the checkpoint interval plus the maximum recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we cannot guarantee consistency in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.

Review comment:
       ```suggestion
   The Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

