This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new d630eaa  [FLINK-26028][Connector/pulsar] add sink documentation; 
change some pulsar source documentation.
d630eaa is described below

commit d630eaae12e743a3666f42c2e7f6c6ce9caa0d66
Author: Yufei Zhang <[email protected]>
AuthorDate: Wed Feb 23 11:16:47 2022 +0800

    [FLINK-26028][Connector/pulsar] add sink documentation; change some pulsar 
source documentation.
---
 .../docs/connectors/datastream/pulsar.md           | 474 ++++++++++++++---
 docs/content/docs/connectors/datastream/pulsar.md  | 586 +++++++++++++++++----
 .../generated/pulsar_producer_configuration.html   |  90 ++++
 .../generated/pulsar_sink_configuration.html       |  48 ++
 .../configuration/ConfigOptionsDocGenerator.java   |   3 +
 5 files changed, 1007 insertions(+), 194 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md 
b/docs/content.zh/docs/connectors/datastream/pulsar.md
index c14958a..3ba5989 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -24,12 +24,11 @@ under the License.
 
 # Apache Pulsar 连接器
 
-Flink 当前只提供 [Apache Pulsar](https://pulsar.apache.org) 数据源,用户可以使用它从 Pulsar 
读取数据,并保证每条数据只被处理一次。
+Flink 当前提供 [Apache Pulsar](https://pulsar.apache.org) Source 和 Sink 
连接器,用户可以使用它从 Pulsar 读取数据,并保证每条数据只被处理一次。
 
 ## 添加依赖
 
-连接器当前支持 Pulsar 2.7.0 之后的版本,但是连接器使用到了 Pulsar 
的[事务机制](https://pulsar.apache.org/docs/en/txn-what/),建议在 Pulsar 2.8.0
-及其之后的版本上使用连接器进行数据读取。
+Pulsar Source 当前支持 Pulsar 2.8.1 之后的版本,但是 Pulsar Source 使用到了 Pulsar 
的[事务机制](https://pulsar.apache.org/docs/zh-CN/txn-what/),建议在 Pulsar 2.9.2 
及其之后的版本上使用 Pulsar Source 进行数据读取。
 
 如果想要了解更多关于 Pulsar API 兼容性设计,可以阅读文档 
[PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification)。
 
@@ -37,18 +36,15 @@ Flink 当前只提供 [Apache Pulsar](https://pulsar.apache.org) 数据源,用
 
 Flink 的流连接器并不会放到发行文件里面一同发布,阅读[此文档]({{< ref "docs/dev/configuration/overview" 
>}}),了解如何将连接器添加到集群实例内。
 
-## Pulsar 数据源
+## Pulsar Source
 
 {{< hint info >}}
-Pulsar 数据源基于 Flink 最新的[批流一体 API]({{< ref "docs/dev/datastream/sources.md" >}}) 
进行开发。
-
-如果要想使用基于旧版的 `SourceFunction` 实现的 Pulsar 数据源,或者是项目的 Flink 版本低于 1.14,可以使用 
StreamNative 单独维护的 [pulsar-flink](https://github.com/streamnative/pulsar-flink)。
+Pulsar Source 基于 Flink 最新的[批流一体 API]({{< ref "docs/dev/datastream/sources.md" 
>}}) 进行开发。
 {{< /hint >}}
 
 ### 使用示例
 
-Pulsar 数据源提供了 builder 类来构造数据源实例。下面的代码实例使用 builder 类创建的数据源会从 topic 
"persistent://public/default/my-topic" 的数据开始端进行消费。
-连接器使用了 **Exclusive**(独占)的订阅方式消费消息,订阅名称为 `my-subscription`,并把消息体的二进制字节流以 UTF-8 
的方式编码为字符串。
+Pulsar Source 提供了 builder 类来构造 `PulsarSource` 实例。下面的代码实例使用 builder 类创建的实例会从 
“persistent://public/default/my-topic” 的数据开始端进行消费。对应的 Pulsar Source 使用了 
**Exclusive**(独占)的订阅方式消费消息,订阅名称为 `my-subscription`,并把消息体的二进制字节流以 UTF-8 
的方式编码为字符串。
 
 ```java
 PulsarSource<String> pulsarSource = PulsarSource.builder()
@@ -64,17 +60,17 @@ PulsarSource<String> pulsarSource = PulsarSource.builder()
 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
 ```
 
-如果使用构造类构造 Pulsar 数据源,一定要提供下面几个属性:
+如果使用构造类构造 `PulsarSource`,一定要提供下面几个属性:
 
-- Pulsar 数据消费的地址,使用 `setServiceUrl(String)` 方法提供
-- Pulsar HTTP 管理地址,使用 `setAdminUrl(String)` 方法提供
-- Pulsar 订阅名称,使用 `setSubscriptionName(String)` 方法提供
-- 需要消费的 topic 或者是 topic 下面的分区,详见[指定消费的 Topic 或者 Topic 
分区](#指定消费的-topic-或者-topic-分区)
-- 解码 Pulsar 消息的反序列化器,详见[反序列化器](#反序列化器)
+- Pulsar 数据消费的地址,使用 `setServiceUrl(String)` 方法提供。
+- Pulsar HTTP 管理地址,使用 `setAdminUrl(String)` 方法提供。
+- Pulsar 订阅名称,使用 `setSubscriptionName(String)` 方法提供。
+- 需要消费的 Topic 或者是 Topic 下面的分区,详见[指定消费的 Topic 或者 Topic 
分区](#指定消费的-topic-或者-topic-分区)。
+- 解码 Pulsar 消息的反序列化器,详见[反序列化器](#反序列化器)。
 
 ### 指定消费的 Topic 或者 Topic 分区
 
-Pulsar 数据源提供了两种订阅 topic 或 topic 分区的方式。
+Pulsar Source 提供了两种订阅 Topic 或 Topic 分区的方式。
 
 - Topic 列表,从这个 Topic 的所有分区上消费消息,例如:
   ```java
@@ -84,66 +80,62 @@ Pulsar 数据源提供了两种订阅 topic 或 topic 分区的方式。
   PulsarSource.builder().setTopics("topic-a-partition-0", 
"topic-a-partition-2");
   ```
 
-- Topic 正则,连接器使用给定的正则表达式匹配出所有合规的 topic,例如:
+- Topic 正则,Pulsar Source 使用给定的正则表达式匹配出所有合规的 Topic,例如:
   ```java
   PulsarSource.builder().setTopicPattern("topic-*");
   ```
 
 #### Topic 名称简写
 
-从 Pulsar 2.0 之后,完整的 topic 名称格式为 `{persistent|non-persistent}://租户/命名空间/topic`。
-但是连接器不需要提供 topic 名称的完整定义,因为 topic 类型、租户、命名空间都设置了默认值。
+从 Pulsar 2.0 之后,完整的 Topic 名称格式为 
`{persistent|non-persistent}://租户/命名空间/topic`。但是 Pulsar Source 不需要提供 Topic 
名称的完整定义,因为 Topic 类型、租户、命名空间都设置了默认值。
 
-Topic 属性 | 默认值
-:------------|:-------
-topic 类型 | `persistent`
-租户 | `public`
-命名空间 | `default`
+| Topic 属性 | 默认值          |
+|:---------|:-------------|
+| Topic 类型 | `persistent` |
+| 租户       | `public`     |
+| 命名空间     | `default`    |
 
-下面的表格提供了当前 Pulsar 支持的简写方式:
+下面的表格提供了当前 Pulsar Topic 支持的简写方式:
 
-topic 名称简写 | 翻译后的 topic 名称
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic`
+| Topic 名称简写                        | 翻译后的 Topic 名称                            
      |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`   
      |
+| `my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-对于 non-persistent(非持久化) topic,连接器不支持简写名称。所以无法将 
`non-persistent://public/default/my-topic` 简写成 `non-persistent://my-topic`。
+对于 Non-persistent(非持久化)Topic,Pulsar Source 不支持简写名称。所以无法将 
`non-persistent://public/default/my-topic` 简写成 `non-persistent://my-topic`。
 {{< /hint >}}
 
 #### Pulsar Topic 层次结构
 
 对于 Pulsar 而言,Topic 分区也是一种 Topic。Pulsar 会将一个有分区的 Topic 在内部按照分区的大小拆分成等量的无分区 
Topic。
 
-例如,在 Pulsar 的 `sample` 租户下面的 `flink` 命名空间里面创建了一个有 3 个分区的 topic,给它起名为 
`simple-string`。
-可以在 Pulsar 上看到如下的 topic 列表:
+由于 Pulsar 内部的分区实际实现为一个 Topic,我们将用“分区”来指代“仅有一个分区的 Topic(Non-partitioned 
Topic)”和“具有多个分区的 Topic 下属的分区”。
+
+例如,在 Pulsar 的 `sample` 租户下面的 `flink` 命名空间里面创建了一个有 3 个分区的 Topic,给它起名为 
`simple-string`。可以在 Pulsar 上看到如下的 Topic 列表:
 
-Topic 名称 | 是否分区
-:--------- | :----------
-`persistent://sample/flink/simple-string` | 是
-`persistent://sample/flink/simple-string-partition-0` | 否
-`persistent://sample/flink/simple-string-partition-1` | 否
-`persistent://sample/flink/simple-string-partition-2` | 否
+| Topic 名称                                              | 是否分区 |
+|:------------------------------------------------------|:-----|
+| `persistent://sample/flink/simple-string`             | 是    |
+| `persistent://sample/flink/simple-string-partition-0` | 否    |
+| `persistent://sample/flink/simple-string-partition-1` | 否    |
+| `persistent://sample/flink/simple-string-partition-2` | 否    |
 
-这意味着,用户可以用上面的子 topic 去直接消费分区里面的数据,不需要再去基于上层的父 topic 去消费全部分区的数据。
-例如:使用 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")` 将会只消费 topic 
`sample/flink/simple-string` 上面的分区 1 和 2 里面的消息。
+这意味着,用户可以用上面的子 Topic 去直接消费分区里面的数据,不需要再去基于上层的父 Topic 去消费全部分区的数据。例如:使用 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")` 将会只消费 Topic 
`sample/flink/simple-string` 分区 1 和 2 里面的消息。
 
 #### 配置 Topic 正则表达式
 
-前面提到了 Pulsar topic 有 `persistent`、`non-persistent` 
两种类型,使用正则表达式消费数据的时候,连接器会尝试从正则表达式里面解析出消息的类型。
-例如:`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` 会解析出 
`non-persistent` 这个 topic 类型。
-如果用户使用 topic 名称简写的方式,连接器会使用默认的消息类型 `persistent`。
+前面提到了 Pulsar Topic 有 `persistent`、`non-persistent` 两种类型,使用正则表达式消费数据的时候,Pulsar 
Source 
会尝试从正则表达式里面解析出消息的类型。例如:`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")`
 会解析出 `non-persistent` 这个 Topic 类型。如果用户使用 Topic 名称简写的方式,Pulsar Source 
会使用默认的消息类型 `persistent`。
 
-如果想用正则去消费 `persistent` 和 `non-persistent` 类型的 topic,需要使用 
`RegexSubscriptionMode` 定义 topic 类型,例如:`setTopicPattern("topic-*", 
RegexSubscriptionMode.AllTopics)`。
+如果想用正则去消费 `persistent` 和 `non-persistent` 类型的 Topic,需要使用 
`RegexSubscriptionMode` 定义 Topic 类型,例如:`setTopicPattern("topic-*", 
RegexSubscriptionMode.AllTopics)`。
 
 ### 反序列化器
 
-反序列化器用于解析 Pulsar 消息,连接器使用 `PulsarDeserializationSchema` 来定义反序列化器。
-用户可以在 builder 类中使用 `setDeserializationSchema(PulsarDeserializationSchema)` 
方法配置反序列化器,它会解析 Pulsar 的 `Message<byte[]>` 实例。
+反序列化器用于解析 Pulsar 消息,Pulsar Source 使用 `PulsarDeserializationSchema` 
来定义反序列化器。用户可以在 builder 类中使用 
`setDeserializationSchema(PulsarDeserializationSchema)` 方法配置反序列化器。
 
-如果用户只关心消息体的二进制字节流,并不需要其他属性来解析数据。可以直接使用预定义的 
`PulsarDeserializationSchema`。Pulsar 连接器里面提供了 3 种预定义好的反序列化器。
+如果用户只关心消息体的二进制字节流,并不需要其他属性来解析数据。可以直接使用预定义的 
`PulsarDeserializationSchema`。Pulsar Source里面提供了 3 种预定义的反序列化器。
 
-- 使用 Pulsar 的 [Schema](https://pulsar.apache.org/docs/en/schema-understand/) 
解析消息。
+- 使用 Pulsar 的 
[Schema](https://pulsar.apache.org/docs/zh-CN/schema-understand/) 解析消息。
   ```java
   // 基础数据类型
   PulsarDeserializationSchema.pulsarSchema(Schema);
@@ -163,25 +155,22 @@ Topic 名称 | 是否分区
   PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);
   ```
 
-Pulsar 的 `Message<byte[]>` 包含了很多 
[额外的属性](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#%E6%B6%88%E6%81%AF)。
-例如,消息的 key,消息发送时间,消息生产时间,用户在消息上自定义的键值对属性等。可以使用 `Message<byte[]>` 接口来获取这些属性。
+Pulsar 的 `Message<byte[]>` 包含了很多 
[额外的属性](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#%E6%B6%88%E6%81%AF)。例如,消息的
 key、消息发送时间、消息生产时间、用户在消息上自定义的键值对属性等。可以使用 `Message<byte[]>` 接口来获取这些属性。
 
-如果用户需要基于这些额外的属性来解析一条消息,可以实现 `PulsarDeserializationSchema` 接口。
-并一定要确保 `PulsarDeserializationSchema.getProducedType()` 方法返回的 `TypeInformation` 
是正确的结果。
-Flink 使用 `TypeInformation` 将解析出来的结果序列化传递到下游算子。
+如果用户需要基于这些额外的属性来解析一条消息,可以实现 `PulsarDeserializationSchema` 接口。并一定要确保 
`PulsarDeserializationSchema.getProducedType()` 方法返回的 `TypeInformation` 
是正确的结果。Flink 使用 `TypeInformation` 将解析出来的结果序列化传递到下游算子。
 
 ### Pulsar 订阅
 
-订阅是命名好的配置规则,指导消息如何投递给消费者。连接器需要提供一个独立的订阅名称,支持 Pulsar 的四种订阅模式:
+订阅是命名好的配置规则,指导消息如何投递给消费者。Pulsar Source 需要提供一个独立的订阅名称,支持 Pulsar 的四种订阅模式:
 
 - 
[exclusive(独占)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#exclusive)
 - 
[shared(共享)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#shared%E5%85%B1%E4%BA%AB)
 - 
[failover(灾备)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#failover%E7%81%BE%E5%A4%87)
 - [key_shared(key 
共享)](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#key_shared)
 
-当前 Pulsar 连接器里面,`独占` 和 `灾备` 的实现没有区别,如果 Flink 的一个 reader 挂了,连接器会把所有未消费的数据交给其他的 
reader 来消费数据。
+当前 Pulsar Source 里,`独占` 和 `灾备` 的实现没有区别,如果 Flink 的一个 reader 挂了,Pulsar Source 
会把所有未消费的数据交给其他的 reader 来消费数据。
 
-默认情况下,如果没有指定订阅类型,连接器使用共享订阅类型(`SubscriptionType.Shared`)。
+默认情况下,如果没有指定订阅类型,Pulsar Source 使用共享订阅类型(`SubscriptionType.Shared`)。
 
 ```java
 // 名为 "my-shared" 的共享订阅
@@ -191,19 +180,19 @@ PulsarSource.builder().setSubscriptionName("my-shared");
 
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);
 ```
 
-如果想在 Pulsar 连接器里面使用 `key 共享` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator` 
会生成一组消息 key 的 hash 范围,连接器会基于给定的范围来消费数据。
+如果想在 Pulsar Source 里面使用 `key 共享` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator` 
会生成一组消息 key 的 hash 范围,Pulsar Source 会基于给定的范围来消费数据。
 
-Pulsar 连接器也提供了一个名为 `UniformRangeGenerator` 的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。
+Pulsar Source 也提供了一个名为 `UniformRangeGenerator` 的默认实现,它会基于 flink 数据源的并行度将 hash 
范围均分。
 
 ### 起始消费位置
 
-连接器使用 `setStartCursor(StartCursor)` 方法给定开始消费的位置。内置的消费位置有:
+Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的位置。内置的开始消费位置有:
 
-- 从 topic 里面最早的一条消息开始消费。
+- 从 Topic 里面最早的一条消息开始消费。
   ```java
   StartCursor.earliest();
   ```
-- 从 topic 里面最新的一条消息开始消费。
+- 从 Topic 里面最新的一条消息开始消费。
   ```java
   StartCursor.latest();
   ```
@@ -222,23 +211,23 @@ Pulsar 连接器也提供了一个名为 `UniformRangeGenerator` 的默认实现
 
 {{< hint info >}}
 每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在 
Pulsar 底层存储上查找到具体的消息。
+
 Pulsar 称这个序列号为 `MessageId`,用户可以使用 `DefaultImplementation.newMessageId(long 
ledgerId, long entryId, int partitionIndex)` 创建它。
 {{< /hint >}}
 
 ### 边界
 
-Pulsar 连接器同时支持流式和批的消费方式,默认情况下,连接器使用流的方式消费数据。除非任务失败或者被取消,否则连接器将持续消费数据。
-用户可以使用 `setBoundedStopCursor(StopCursor)` 给定停止消费的位置,这种情况下连接器会使用批的方式进行消费。当所有 
topic 分区都消费到了停止位置,Flink 任务就会结束。
+Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败或者被取消,否则将持续消费数据。用户可以使用 
`setBoundedStopCursor(StopCursor)` 
给定停止消费的位置,这种情况下会使用批的方式进行消费。使用流的方式一样可以给定停止位置,使用 
`setUnboundedStopCursor(StopCursor)` 方法即可。
 
-使用流的方式一样可以给定停止位置,使用 `setUnboundedStopCursor(StopCursor)` 方法即可。
+在批模式下,使用 `setBoundedStopCursor(StopCursor)` 来指定一个消费停止位置。
 
-内置的停止位置如下:
+内置的停止消费位置如下:
 
 - 永不停止。
   ```java
   StopCursor.never();
   ```
-- 停止于 Pulsar 启动时 topic 里面最新的那条数据。
+- 停止于 Pulsar 启动时 Topic 里面最新的那条数据。
   ```java
   StopCursor.latest();
   ```
@@ -255,31 +244,29 @@ Pulsar 连接器同时支持流式和批的消费方式,默认情况下,连
   StopCursor.atEventTime(long);
   ```
 
-### 其他配置项
+### Source 配置项
 
-除了前面提到的配置选项,连接器还提供了丰富的选项供 Pulsar 专家使用,在 builder 类里通过 
`setConfig(ConfigOption<T>, T)` 和 `setConfig(Configuration)` 方法给定下述的全部配置。
+除了前面提到的配置选项,Pulsar Source 还提供了丰富的选项供 Pulsar 专家使用,在 builder 类里通过 
`setConfig(ConfigOption<T>, T)` 和 `setConfig(Configuration)` 方法给定下述的全部配置。
 
 #### Pulsar Java 客户端配置项
 
-Pulsar 连接器使用[Java 
客户端](https://pulsar.apache.org/docs/en/client-libraries-java/)来创建消费实例,相关的配置定义于 
Pulsar 的 `ClientConfigurationData` 内。连接器在 `PulsarOptions` 选项中,定义大部分的可供用户定义的配置。
+Pulsar Source 使用 [Java 
客户端](https://pulsar.apache.org/docs/zh-CN/client-libraries-java/)来创建消费实例,相关的配置定义于
 Pulsar 的 `ClientConfigurationData` 内。在 `PulsarOptions` 选项中,定义大部分的可供用户定义的配置。
 
 {{< generated/pulsar_client_configuration >}}
 
 #### Pulsar 管理 API 配置项
 
-[管理 API](https://pulsar.apache.org/docs/en/admin-api-overview/) 用于查询 topic 
的元数据和用正则订阅的时候的 topic 查找,它与
-Java 客户端共享大部分配置。下面列举的配置只供管理 API 使用,连接器也在 `PulsarOptions` 里予以定义。
+[管理 API](https://pulsar.apache.org/docs/zh-CN/admin-api-overview/) 用于查询 Topic 
的元数据和用正则订阅的时候的 Topic 查找,它与 Java 客户端共享大部分配置。下面列举的配置只供管理 API 使用,`PulsarOptions` 
包含了这些配置 。
 
 {{< generated/pulsar_admin_configuration >}}
 
 #### Pulsar 消费者 API 配置项
 
-Pulsar 提供了消费者 API 和读者 API 两套 API 来进行数据消费,它们可用于不同的业务场景。
-Flink 上的 Pulsar 连接器使用消费者 API 进行消费,它的配置定义于 Pulsar 的 `ConsumerConfigurationData` 
内。连接器将其中大部分的可供用户定义的配置定义于 `PulsarSourceOptions` 内。
+Pulsar 提供了消费者 API 和读者 API 两套 API 来进行数据消费,它们可用于不同的业务场景。Flink 上的 Pulsar Source 
使用消费者 API 进行消费,它的配置定义于 Pulsar 的 `ConsumerConfigurationData` 内。Pulsar Source 
将其中大部分的可供用户定义的配置定义于 `PulsarSourceOptions` 内。
 
 {{< generated/pulsar_consumer_configuration >}}
 
-#### Pulsar 数据源配置项
+#### Pulsar Source配置项
 
 下述配置主要用于性能调优或者是控制消息确认的行为。如非必要,可以不用强制配置。
 
@@ -287,8 +274,7 @@ Flink 上的 Pulsar 连接器使用消费者 API 进行消费,它的配置定
 
 ### 动态分区发现
 
-为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 topic,连接器提供了动态分区发现机制。该机制不需要重启 
Flink 任务。
-对选项 `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` 设置一个正整数即可启用。
+为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 Topic,Pulsar Source 
提供了动态分区发现机制。该机制不需要重启 Flink 任务。对选项 
`PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` 设置一个正整数即可启用。
 
 ```java
 // 10 秒查询一次分区信息
@@ -302,29 +288,29 @@ PulsarSource.builder()
 
 ### 事件时间和水位线
 
-默认情况下,连接器使用 Pulsar 的 `Message<byte[]>` 里面的时间作为解析结果的时间戳。用户可以使用 
`WatermarkStrategy` 来自行解析出想要的消息时间,并向下游传递对应的水位线。
+默认情况下,Pulsar Source 使用 Pulsar 的 `Message<byte[]>` 里面的时间作为解析结果的时间戳。用户可以使用 
`WatermarkStrategy` 来自行解析出想要的消息时间,并向下游传递对应的水位线。
 
 ```java
 env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source 
With Custom Watermark Strategy");
 ```
 
-[这篇文档]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) 
详细讲解了如何定义 `WatermarkStrategy`。
+[这篇文档]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" 
>}})详细讲解了如何定义 `WatermarkStrategy`。
 
 ### 消息确认
 
-一旦在 topic 
上创建了订阅,消息便会[存储](https://pulsar.apache.org/docs/zh-CN/concepts-architecture-overview/#%E6%8C%81%E4%B9%85%E5%8C%96%E5%AD%98%E5%82%A8)在
 Pulsar 里。即使没有消费者,消息也不会被丢弃。只有当连接器同 Pulsar 
确认此条消息已经被消费,该消息才以某种机制会被移除。连接器支持四种订阅方式,它们的消息确认方式也大不相同。
+一旦在 Topic 
上创建了订阅,消息便会[存储](https://pulsar.apache.org/docs/zh-CN/concepts-architecture-overview/#%E6%8C%81%E4%B9%85%E5%8C%96%E5%AD%98%E5%82%A8)在
 Pulsar 里。即使没有消费者,消息也不会被丢弃。只有当 Pulsar Source 同 Pulsar 
确认此条消息已经被消费,该消息才以某种机制会被移除。Pulsar Source 支持四种订阅方式,它们的消息确认方式也大不相同。
 
 #### 独占和灾备订阅下的消息确认
 
-`独占` 和 `灾备` 订阅下,连接器使用累进式确认方式。确认某条消息已经被处理时,其前面被消费的消息会自动被置为已读。Pulsar 连接器会在 Flink 
完成检查点时将对应时刻消费的消息置为已读,以此来保证 Pulsar 状态与 Flink 状态一致。
+`独占` 和 `灾备` 订阅下,Pulsar Source 使用累进式确认方式。确认某条消息已经被处理时,其前面消息会自动被置为已读。Pulsar 
Source 会在 Flink 完成检查点时将对应时刻消费的消息置为已读,以此来保证 Pulsar 状态与 Flink 状态一致。
 
-如果用户没有在 Flink 上启用检查点,连接器可以使用周期性提交来将消费状态提交给 Pulsar,使用配置 
`PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL` 来进行定义。
+如果用户没有在 Flink 上启用检查点,Pulsar Source 可以使用周期性提交来将消费状态提交给 Pulsar,使用配置 
`PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL` 来进行定义。
 
-需要注意的是,此种场景下,Pulsar 连接器并不依赖于提交到 Pulsar 的状态来做容错。消息确认只是为了能在 Pulsar 端看到对应的消费处理情况。
+需要注意的是,此种场景下,Pulsar Source 并不依赖于提交到 Pulsar 的状态来做容错。消息确认只是为了能在 Pulsar 
端看到对应的消费处理情况。
 
 #### 共享和 key 共享订阅下的消息确认
 
-`共享` 和 `key 共享` 需要依次确认每一条消息,所以连接器在 Pulsar 事务里面进行消息确认,然后将事务提交到 Pulsar。
+`共享` 和 `key 共享` 需要依次确认每一条消息,所以 Pulsar Source 在 Pulsar 事务里面进行消息确认,然后将事务提交到 
Pulsar。
 
 首先需要在 Pulsar 的 `borker.conf` 文件里面启用事务:
 
@@ -332,11 +318,320 @@ env.fromSource(pulsarSource, new 
CustomWatermarkStrategy(), "Pulsar Source With
 transactionCoordinatorEnabled=true
 ```
 
-连接器创建的事务的默认超时时间为 3 小时,请确保这个时间大于 Flink 检查点的间隔。用户可以使用 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` 来设置事务的超时时间。
+Pulsar Source 创建的事务的默认超时时间为 3 小时,请确保这个时间大于 Flink 检查点的间隔。用户可以使用 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` 来设置事务的超时时间。
+
+如果用户无法启用 Pulsar 的事务,或者是因为项目禁用了检查点,需要将 
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` 选项设置为 `true`,消息从 
Pulsar 消费后会被立刻置为已读。Pulsar Source 无法保证此种场景下的消息一致性。
+
+Pulsar Source 在 Pulsar 上使用日志的形式记录某个事务下的消息确认,为了更好的性能,请缩短 Flink 做检查点的间隔。
+
+## Pulsar Sink
+
+Pulsar Sink 连接器可以将经过 Flink 处理后的数据写入一个或多个 Pulsar Topic 或者 Topic 下的某些分区。
+
+{{< hint info >}}
+Pulsar Sink 基于 Flink 最新的 [Sink 
API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 实现。
+
+如果想要使用旧版的使用 `SinkFuntion` 接口实现的 Sink 连接器,可以使用 StreamNative 维护的 
[pulsar-flink](https://github.com/streamnative/pulsar-flink)。
+{{< /hint >}}
+
+### 使用示例
+
+Pulsar Sink 使用 builder 类来创建 `PulsarSink` 实例。
+
+下面示例展示了如何通过 Pulsar Sink 以“至少一次”的语义将字符串类型的数据发送给 topic1。
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new 
SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+
+stream.sinkTo(sink);
+```
+
+下列为创建一个 `PulsarSink` 实例必需的属性:
+
+- Pulsar 数据消费的地址,使用 `setServiceUrl(String)` 方法提供。
+- Pulsar HTTP 管理地址,使用 `setAdminUrl(String)` 方法提供。
+- 需要发送到的 Topic 或者是 Topic 下面的分区,详见[指定写入的topic或者topic分区](#指定写入的topic或者topic分区)。
+- 编码 Pulsar 消息的序列化器,详见[序列化器](#序列化器)。
+
+在创建 `PulsarSink` 时,建议使用 `setProducerName(String)` 来指定 `PulsarSink` 内部使用的 
Pulsar 生产者名称。这样方便在数据监控页面找到对应的生产者监控指标。
+
+### 指定写入的 Topic 或者 Topic 分区
+
+`PulsarSink` 指定写入 Topic 的方式和 Pulsar Source [指定消费的 Topic 或者 Topic 
分区](#指定消费的-topic-或者-topic-分区)的方式类似。`PulsarSink` 支持以 mixin 风格指定写入的 Topic 
或分区。因此,可以指定一组 Topic 或者分区或者是两者都有。
+
+```java
+// Topic "some-topic1" 和 "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Topic "topic-a" 的分区 0 和 2 
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Topic "topic-a" 以及 Topic "some-topic2" 分区 0 和 2 
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", 
"some-topic2")
+```
+
+动态分区发现默认处于开启状态,这意味着 `PulsarSink` 将会周期性地从 Pulsar 集群中查询 Topic 
的元数据来获取可能有的分区数量变更信息。使用 
`PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` 配置项来指定查询的间隔时间。
+
+可以选择实现 `TopicRouter` 接口来自定义[消息路由策略](#消息路由策略)。此外,阅读 [Topic 
名称简写](#topic-名称简写)将有助于理解 Pulsar 的分区在 Pulsar 连接器中的配置方式。
+
+{{< hint warning >}}
+如果在 `PulsarSink` 中同时指定了某个 Topic 和其下属的分区,那么 `PulsarSink` 将会自动将两者合并,仅使用外层的 Topic。
+
+举个例子,如果通过 `PulsarSink.builder().setTopics("some-topic1", 
"some-topic1-partition-0")` 来指定写入的 Topic,那么其结果等价于 
`PulsarSink.builder().setTopics("some-topic1")`。
+{{< /hint >}}
+
+### 序列化器
+
+序列化器(`PulsarSerializationSchema`)负责将 Flink 中的每条记录序列化成 byte 数组,并通过网络发送至指定的写入 
Topic。和 Pulsar Source 类似的是,序列化器同时支持使用基于 Flink 的 `SerializationSchema` 
接口实现序列化器和使用 Pulsar 原生的 `Schema` 类型实现的序列化器。不过序列化器并不支持 Pulsar 的 
`Schema.AUTO_PRODUCE_BYTES()`。
 
-如果用户无法启用 Pulsar 的事务,或者是因为项目禁用了检查点,需要将 
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` 选项设置为 `true`,消息从 
Pulsar 消费后会被立刻置为已读。连接器无法保证此种场景下的消息一致性。
+如果不需要指定 
[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html)
 接口中提供的 key 或者其他的消息属性,可以从上述 2 种预定义的 `PulsarSerializationSchema` 实现中选择适合需求的一种使用。
+
+- 使用 Pulsar 的 
[Schema](https://pulsar.apache.org/docs/zh-CN/schema-understand/) 来序列化 Flink 
中的数据。
+  ```java
+  // 原始数据类型
+  PulsarSerializationSchema.pulsarSchema(Schema)
 
-连接器在 Pulsar 上使用日志的形式记录某个事务下的消息确认,为了更好的性能,请缩短 Flink 做检查点的间隔。
+  // 有结构数据类型(JSON、Protobuf、Avro 等)
+  PulsarSerializationSchema.pulsarSchema(Schema, Class)
+
+  // 键值对类型
+  PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
+  ```
+- 使用 Flink 的 `SerializationSchema` 来序列化数据。
+  ```java
+  PulsarSerializationSchema.flinkSchema(SerializationSchema)
+  ```
+
+同时使用 `PulsarSerializationSchema.pulsarSchema()` 以及在 builder 中指定 
`PulsarSinkBuilder.enableSchemaEvolution()` 可以启用 [Schema 
evolution](https://pulsar.apache.org/docs/zh-CN/schema-evolution-compatibility/#schema-evolution)
 特性。该特性会使用 Pulsar Broker 端提供的 Schema 版本兼容性检测以及 Schema 版本演进。下列示例展示了如何启用 Schema 
Evolution。
+
+```java
+Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
+PulsarSerializationSchema<SomePojo> pulsarSchema = 
PulsarSerializationSchema.pulsarSchema(schema, SomePojo.class);
+
+PulsarSink<String> sink = PulsarSink.builder()
+    ...
+    .setSerializationSchema(pulsarSchema)
+    .enableSchemaEvolution()
+    .build();
+```
+
+{{< hint warning >}}
+如果想要使用 Pulsar 原生的 Schema 序列化消息而不需要 Schema Evolution 特性,那么写入的 Topic 会使用 
`Schema.BYTES` 作为消息的 Schema,对应 Topic 的消费者需要自己负责反序列化的工作。
+
+例如,如果使用 `PulsarSerializationSchema.pulsarSchema(Schema.STRING)` 而不使用 
`PulsarSinkBuilder.enableSchemaEvolution()`。那么在写入 Topic 中所记录的消息 Schema 将会是 
`Schema.BYTES`。
+{{< /hint >}}
+
+### 消息路由策略
+
+在 Pulsar Sink 中,消息路由发生在于分区之间,而非上层 Topic。对于给定 Topic 的情况,路由算法会首先会查询出 Topic 
之上所有的分区信息,并在这些分区上实现消息的路由。Pulsar Sink 默认提供 2 种路由策略的实现。
+
+- `KeyHashTopicRouter`:使用消息的 key 对应的哈希值来取模计算出消息对应的 Topic 分区。
+
+  使用此路由可以将具有相同 key 的消息发送至同一个 Topic 分区。消息的 key 可以在自定义 
`PulsarSerializationSchema` 时,在 `serialize()` 方法内使用 
`PulsarMessageBuilder.key(String key)` 来予以指定。
+
+  如果消息没有包含 key,此路由策略将从 Topic 分区中随机选择一个发送。
+
+  可以使用 `MessageKeyHash.JAVA_HASH` 或者 `MessageKeyHash.MURMUR3_32_HASH` 
两种不同的哈希算法来计算消息 key 的哈希值。使用 `PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH` 
配置项来指定想要的哈希算法。
+
+- `RoundRobinRouter`:轮换使用用户给定的 Topic 分区。
+  
+  消息将会轮替地选取 Topic 分区,当往某个 Topic 分区里写入指定数量的消息后,将会轮换至下一个 Topic 分区。使用 
`PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES` 指定向一个 Topic 分区中写入的消息数量。
+
+还可以通过实现 `TopicRouter` 接口来自定义消息路由策略,请注意 TopicRouter 的实现需要能被序列化。
+
+在 `TopicRouter` 内可以指定任意的 Topic 分区(即使这个 Topic 分区不在 `setTopics()` 
指定的列表中)。因此,当使用自定义的 `TopicRouter` 时,`PulsarSinkBuilder.setTopics` 选项是可选的。
+
+```java
+@PublicEvolving
+public interface TopicRouter<IN> extends Serializable {
+
+    String route(IN in, List<String> partitions, PulsarSinkContext context);
+
+    default void open(SinkConfiguration sinkConfiguration) {
+        // 默认无操作
+    }
+}
+```
+
+{{< hint info >}}
+如前文所述,Pulsar 分区的内部被实现为一个无分区的 Topic,一般情况下 Pulsar 
客户端会隐藏这个实现,并且提供内置的消息路由策略。Pulsar Sink 并没有使用 Pulsar 客户端提供的路由策略和封装,而是使用了 Pulsar 
客户端更底层的 API 自行实现了消息路由逻辑。这样做的主要目的是能够在属于不同 Topic 的分区之间定义更灵活的消息路由策略。
+
+详情请参考 Pulsar 的 [partitioned 
topics](https://pulsar.apache.org/docs/zh-CN/cookbooks-partitioned/)。
+{{< /hint >}}
+
+### 发送一致性
+
+`PulsarSink` 支持三种发送一致性。
+
+- `NONE`:Flink 应用运行时可能出现数据丢失的情况。在这种模式下,Pulsar Sink 
发送消息后并不会检查消息是否发送成功。此模式具有最高的吞吐量,可用于一致性没有要求的场景。
+- `AT_LEAST_ONCE`:每条消息**至少有**一条对应消息发送至 Pulsar,发送至 Pulsar 的消息可能会因为 Flink 
应用重启而出现重复。
+- `EXACTLY_ONCE`:每条消息**有且仅有**一条对应消息发送至 Pulsar。发送至 Pulsar 的消息不会有重复也不会丢失。Pulsar 
Sink 内部依赖 [Pulsar 
事务](https://pulsar.apache.org/docs/zh-CN/transactions/)和两阶段提交协议来保证每条记录都能正确发往 
Pulsar。
+
+### 消息延时发送
+
+[消息延时发送](https://pulsar.apache.org/docs/zh-CN/next/concepts-messaging/#%E6%B6%88%E6%81%AF%E5%BB%B6%E8%BF%9F%E4%BC%A0%E9%80%92)特性可以让指定发送的每一条消息需要延时一段时间后才能被下游的消费者所消费。当延时消息发送特性启用时,Pulsar
 Sink 会**立刻**将消息发送至 Pulsar Broker。但该消息在指定的延迟时间到达前将会保持对下游消费者不可见。
+
+消息延时发送仅在 `Shared` 订阅模式下有效,在 `Exclusive` 和 `Failover` 模式下该特性无效。
+
+可以使用 `MessageDelayer.fixed(Duration)` 创建一个 `MessageDelayer` 
来为所有消息指定恒定的接收时延,或者实现 `MessageDelayer` 接口来为不同的消息指定不同的接收时延。
+
+{{< hint warning >}}
+消息对下游消费者的可见时间应当基于 `PulsarSinkContext.processTime() `计算得到。
+{{< /hint >}}
+
+### Sink 配置项
+
+可以在 builder 类里通过 `setConfig(ConfigOption<T>, T)` 和 `setConfig(Configuration)` 
方法给定下述的全部配置。
+
+#### PulsarClient 和 PulsarAdmin 配置项
+
+Pulsar Sink 和 Pulsar Source 公用的配置选项可参考
+
+- [Pulsar Java 客户端配置项](#pulsar-java-客户端配置项)
+- [Pulsar 管理 API 配置项](#pulsar-管理-API-配置项)
+
+#### Pulsar 生产者 API 配置项
+
+Pulsar Sink 使用生产者 API 来发送消息。Pulsar 的 `ProducerConfigurationData` 中大部分的配置项被映射为 
`PulsarSinkOptions` 里的选项。
+
+{{< generated/pulsar_producer_configuration >}}
+
+#### Pulsar Sink 配置项
+
+下述配置主要用于性能调优或者是控制消息确认的行为。如非必要,可以不用考虑配置。
+
+{{< generated/pulsar_sink_configuration >}}
+
+### Sink 监控指标
+
+下列表格列出了当前 Sink 支持的监控指标,前 6 个指标是 [FLIP-33: Standardize Connector 
Metrics]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics))
 中规定的 Sink 连接器应当支持的标准指标。
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 15%">Scope</th>
+      <th class="text-left" style="width: 18%">Metrics</th>
+      <th class="text-left" style="width: 18%">User Variables</th>
+      <th class="text-left" style="width: 39%">Description</th>
+      <th class="text-left" style="width: 10%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <th rowspan="13">Operator</th>
+        <td>numBytesOut</td>
+        <td>n/a</td>
+        <td>Pulsar Sink 启动后总共发出的字节数</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>numBytesOutPerSecond</td>
+        <td>n/a</td>
+        <td>每秒发送的字节数</td>
+        <td>Meter</td>
+    </tr>
+    <tr>
+        <td>numRecordsOut</td>
+        <td>n/a</td>
+        <td>Pulsar Sink 启动后总共发出的消息数</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>numRecordsOutPerSecond</td>
+        <td>n/a</td>
+        <td>每秒发送的消息数</td>
+        <td>Meter</td>
+    </tr>
+    <tr>
+        <td>numRecordsOutErrors</td>
+        <td>n/a</td>
+        <td>总共发送消息失败的次数</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>currentSendTime</td>
+        <td>n/a</td>
+        <td>最近一条消息从被放入客户端缓冲队列到收到消息确认的时间</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.numAcksReceived</td>
+        <td>n/a</td>
+        <td>总共收到的确认数</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.sendLatencyMax</td>
+        <td>n/a</td>
+        <td>所有生产者的最大发送延迟</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency50Pct</td>
+        <td>ProducerName</td>
+        <td>某个生产者在过去的一个窗口内的发送延迟的中位数</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency75Pct</td>
+        <td>ProducerName</td>
+        <td>某个生产者在过去的一个窗口内的发送延迟的 75 百分位数</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency95Pct</td>
+        <td>ProducerName</td>
+        <td>某个生产者在过去的一个窗口内的发送延迟的 95 百分位数</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency99Pct</td>
+        <td>ProducerName</td>
+        <td>某个生产者在过去的一个窗口内的发送延迟的 99 百分位数</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency999Pct</td>
+        <td>ProducerName</td>
+        <td>某个生产者在过去的一个窗口内的发送延迟的 99.9 百分位数</td>
+        <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< hint info >}}
+指标 `numBytesOut`、`numRecordsOut` 和 `numRecordsOutErrors` 从 Pulsar Producer 
实例的监控指标中获得。
+
+`currentSendTime` 记录了最近一条消息从放入生产者的缓冲队列到消息被消费确认所耗费的时间。这项指标在 `NONE` 发送一致性下不可用。
+{{< /hint >}}
+
+默认情况下,Pulsar 生产者每隔 60 秒才会刷新一次监控数据,然而 Pulsar Sink 每 500 毫秒就会从 Pulsar 
生产者中获得最新的监控数据。因此 `numRecordsOut`、`numBytesOut`、`numAcksReceived` 以及 
`numRecordsOutErrors` 4 个指标实际上每 60 秒才会刷新一次。
+
+如果想要更高地刷新评率,可以通过 
`builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS. 1L)` 来将 Pulsar 
生产者的监控数据刷新频率调整至相应值(最低为1s)。
+
+`numBytesOutRate` 和 `numRecordsOutRate` 指标是 Flink 内部通过 `numBytesOut` 和 
`numRecordsOut` 计数器,在一个 60 秒的窗口内计算得到的。
+
+### 设计思想简述
+
+Pulsar Sink 遵循 
[FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 中定义的 Sink API 设计。
+
+#### 无状态的 SinkWriter
+
+在 `EXACTLY_ONCE` 一致性下,Pulsar Sink 不会将事务相关的信息存放于检查点快照中。这意味着当 Flink 应用重启时,Pulsar 
Sink 会创建新的事务实例。上一次运行过程中任何未提交事务中的消息会因为超时中止而无法被下游的消费者所消费。这样的设计保证了 SinkWriter 
是无状态的。
+
+#### Pulsar Schema Evolution
+
+[Pulsar Schema 
Evolution](https://pulsar.apache.org/docs/zh-CN/schema-evolution-compatibility/)
 允许用户在一个 Flink 应用程序中使用的数据模型发生特定改变后(比如向基于 ARVO 的 POJO 类中增加或删除一个字段),仍能使用同一个 Flink 
应用程序的代码。
+
+可以在 Pulsar 集群内指定哪些类型的数据模型的改变是被允许的,详情请参阅 [Pulsar Schema 
Evolution](https://pulsar.apache.org/docs/zh-CN/schema-evolution-compatibility/)。
 
 ## 升级至最新的连接器
 
@@ -347,6 +642,11 @@ transactionCoordinatorEnabled=true
 
 ## 问题诊断
 
-使用 Flink 和 Pulsar 交互时如果遇到问题,一定要牢记 Flink 只使用了 Pulsar 的[Java 
客户端](https://pulsar.apache.org/docs/zh-CN/client-libraries-java/) 和[管理 
API](https://pulsar.apache.org/docs/zh-CN/admin-api-overview/)。用户遇到的问题很有可能与 
Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本、或者修改 Pulsar 的配置,Pulsar 连接器的配置来尝试解决问题。
+使用 Flink 和 Pulsar 交互时如果遇到问题,由于 Flink 内部实现只是基于 Pulsar 的 [Java 
客户端](https://pulsar.apache.org/docs/zh-CN/client-libraries-java/)和[管理 
API](https://pulsar.apache.org/docs/zh-CN/admin-api-overview/) 而开发的。
+
+用户遇到的问题可能与 Flink 无关,请先升级 Pulsar 的版本、Pulsar 客户端的版本,或者修改 Pulsar 的配置、Pulsar 
连接器的配置来尝试解决问题。
+
+### 在 Java 11 上使用不稳定
 
+Pulsar connector 在 Java 11 中有一些尚未修复的问题。我们当前推荐在 Java 8 环境中运行Pulsar connector.
 {{< top >}}
diff --git a/docs/content/docs/connectors/datastream/pulsar.md 
b/docs/content/docs/connectors/datastream/pulsar.md
index d544241..11e62cc 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -24,14 +24,13 @@ under the License.
 
 # Apache Pulsar Connector
 
-Flink provides an [Apache Pulsar](https://pulsar.apache.org) source connector 
for reading data from Pulsar topics with exactly-once guarantees.
+Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for 
reading and writing data from and to Pulsar topics with exactly-once guarantees.
 
 ## Dependency
 
-You can use the connector with Pulsar 2.7.0 or higher. However, the Pulsar 
source connector supports
-Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/),
-it is recommended to use Pulsar 2.8.0 or higher releases.
-For details on Pulsar compatibility, please refer to the 
[PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
+You can use the connector with the Pulsar 2.8.1 or higher. Because the Pulsar 
connector supports
+Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/), it is 
recommended to use the Pulsar 2.9.2 or higher.
+Details on Pulsar compatibility can be found in 
[PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
 
 {{< artifact flink-connector-pulsar >}}
 
@@ -43,18 +42,16 @@ See how to link with them for cluster execution [here]({{< 
ref "docs/dev/configu
 {{< hint info >}}
 This part describes the Pulsar source based on the new
 [data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
-
-If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower 
releases, just use the StreamNative's 
[pulsar-flink](https://github.com/streamnative/pulsar-flink).
 {{< /hint >}}
 
 ### Usage
 
-Pulsar source provides a builder class for constructing an instance of 
PulsarSource. The code snippet below shows
-how to build a PulsarSource to consume messages from the earliest cursor of 
topic "persistent://public/default/my-topic",
-with **Exclusive** subscription `my-subscription` and deserialize the raw 
payload of the messages as strings.
+The Pulsar source provides a builder class for constructing a PulsarSource 
instance. The code snippet below builds a PulsarSource instance. It consumes 
messages from the earliest cursor of the topic
+"persistent://public/default/my-topic" in **Exclusive** subscription type 
(`my-subscription`)
+and deserializes the raw payload of the messages as strings.
 
 ```java
-PulsarSource<String> pulsarSource = PulsarSource.builder()
+PulsarSource<String> source = PulsarSource.builder()
     .setServiceUrl(serviceUrl)
     .setAdminUrl(adminUrl)
     .setStartCursor(StartCursor.earliest())
@@ -69,13 +66,17 @@ env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"Pulsar Source");
 
 The following properties are **required** for building a PulsarSource:
 
-- Pulsar service url, configured by `setServiceUrl(String)`
-- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as admin URL), configured by 
`setAdminUrl(String)`
 - Pulsar subscription name, configured by `setSubscriptionName(String)`
 - Topics / partitions to subscribe, see the following
-  [Topic-partition subscription](#topic-partition-subscription) for more 
details.
+  [topic-partition subscription](#topic-partition-subscription) for more 
details.
 - Deserializer to parse Pulsar messages, see the following
-  [Deserializer](#deserializer) for more details.
+  [deserializer](#deserializer) for more details.
+
+It is recommended to set the consumer name in Pulsar Source by 
`setConsumerName(String)`.
+This sets a unique name for the Flink connector in the Pulsar statistic 
dashboard.
+You can use it to monitor the performance of your Flink connector and 
applications.
 
 ### Topic-partition Subscription
 
@@ -96,64 +97,65 @@ Pulsar source provide two ways of topic-partition 
subscription:
 
 #### Flexible Topic Naming
 
-Since Pulsar 2.0, all topic names internally have the form 
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in  a form of  
`{persistent|non-persistent}://tenant/namespace/topic`.
 Now, for partitioned topics, you can use short names in many cases (for the 
sake of simplicity).
 The flexible naming system stems from the fact that there is now a default 
topic type, tenant, and namespace in a Pulsar cluster.
 
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default      |
+|:---------------|:-------------|
+| topic type     | `persistent` |
+| tenant         | `public`     |
+| namespace      | `default`    |
 
-This table lists a mapping relationship between your input topic name and 
translated topic name:
+This table lists a mapping relationship between your input topic name and the 
translated topic name:
 
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name                  | Translated topic name                    
      |
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic`                        | `persistent://public/default/my-topic`   
      |
+| `my-tenant/my-namespace/my-topic` | 
`persistent://my-tenant/my-namespace/my-topic` |
 
 {{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic 
name,
+For non-persistent topics, you need to specify the entire topic name,
 as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would 
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and need to 
use `non-persistent://public/default/my-topic` instead.
 {{< /hint >}}
 
 #### Subscribing Pulsar Topic Partition
 
 Internally, Pulsar divides a partitioned topic as a set of non-partitioned 
topics according to the partition size.
 
-For example, if a `simple-string` topic with 3 partitions is created under the 
`sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the 
`sample` tenant with the `flink` namespace.
 The topics on Pulsar would be:
 
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name                                            | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string`             | Y           |
+| `persistent://sample/flink/simple-string-partition-0` | N           |
+| `persistent://sample/flink/simple-string-partition-1` | N           |
+| `persistent://sample/flink/simple-string-partition-2` | N           |
 
 You can directly consume messages from the topic partitions by using the 
non-partitioned topic names above.
-For example, use 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2 
of the `sample/flink/simple-string` topic.
+For example, use 
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1", 
"sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
 
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Patterns
 
-Pulsar connector extracts the topic type (`persistent` or `non-persistent`) 
from the given topic pattern.
-For example, 
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be 
`non-persistent`.
-The topic type would be `persistent` if you do not provide the topic type in 
the regular expression.
+The Pulsar source extracts the topic type (`persistent` or `non-persistent`) 
from the provided topic pattern.
+For example, you can use the 
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to 
specify a `non-persistent` topic.
+By default, a `persistent` topic is created if you do not specify the topic 
type in the regular expression.
 
-To consume both `persistent` and `non-persistent` topics based on the topic 
pattern,
-you can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)`.
-Pulsar connector would filter the available topics by the 
`RegexSubscriptionMode`.
+You can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)` to 
consume
+both `persistent` and `non-persistent` topics based on the topic pattern.
+The Pulsar source would filter the available topics by the 
`RegexSubscriptionMode`.
 
 ### Deserializer
 
-A deserializer (Deserialization schema) is required for parsing Pulsar 
messages. The deserializer is
-configured by `setDeserializationSchema(PulsarDeserializationSchema)`.
+A deserializer (`PulsarDeserializationSchema`) is for decoding Pulsar messages 
from bytes.
+You can configure the deserializer using 
`setDeserializationSchema(PulsarDeserializationSchema)`.
 The `PulsarDeserializationSchema` defines how to deserialize a Pulsar 
`Message<byte[]>`.
 
 If only the raw payload of a message (message data in bytes) is needed,
-you can use the predefined `PulsarDeserializationSchema`. Pulsar connector 
provides three types of implementation.
+you can use the predefined `PulsarDeserializationSchema`. Pulsar connector 
provides three implementation methods.
 
 - Decode the message by using Pulsar's 
[Schema](https://pulsar.apache.org/docs/en/schema-understand/).
   ```java
@@ -176,12 +178,12 @@ you can use the predefined `PulsarDeserializationSchema`. 
Pulsar connector provi
   ```
 
 Pulsar `Message<byte[]>` contains some [extra 
properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
-such as message key, message publish time, message time, application defined 
key/value pairs that will be attached to the message, etc.
-These properties could be acquired by the `Message<byte[]>` interface.
+such as message key, message publish time, message time, and 
application-defined key/value pairs etc.
+These properties could be defined in the `Message<byte[]>` interface.
 
 If you want to deserialize the Pulsar message by these properties, you need to 
implement `PulsarDeserializationSchema`.
-And ensure that the `TypeInformation` from the 
`PulsarDeserializationSchema.getProducedType()` must be correct.
-Flink would use this `TypeInformation` for passing the messages to downstream 
operators.
+Ensure that the `TypeInformation` from the 
`PulsarDeserializationSchema.getProducedType()` is correct.
+Flink uses this `TypeInformation` to pass the messages to downstream operators.
 
 ### Pulsar Subscriptions
 
@@ -196,7 +198,7 @@ The subscription name is required for consuming messages. 
Pulsar connector suppo
 There is no difference between `Exclusive` and `Failover` in the Pulsar 
connector.
 When a Flink reader crashes, all (non-acknowledged and subsequent) messages 
are redelivered to the available Flink readers.
 
-By default, if no subscription type is defined, Pulsar source uses `Shared` 
subscription.
+By default, if no subscription type is defined, Pulsar source uses the 
`Shared` subscription type.
 
 ```java
 // Shared subscription with name "my-shared"
@@ -206,16 +208,15 @@ PulsarSource.builder().setSubscriptionName("my-shared");
 
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);
 ```
 
-If you want to use `Key_Shared` subscription type on the Pulsar connector. 
Ensure that you provide a `RangeGenerator` implementation.
-The `RangeGenerator` generates a set of key hash ranges so that
-a respective reader subtask will only dispatch messages where the hash of the 
message key is contained in the specified range.
+Ensure that you provide a `RangeGenerator` implementation if you want to use 
the `Key_Shared` subscription type on the Pulsar connector.
+The `RangeGenerator` generates a set of key hash ranges so that a respective 
reader subtask only dispatches messages where the hash of the message key is 
contained in the specified range.
 
-Pulsar connector would use a `UniformRangeGenerator` which would divides the 
range by the Flink source parallelism
-if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
+The Pulsar connector uses `UniformRangeGenerator` that divides the range by 
the Flink source
+parallelism if no `RangeGenerator` is provided in the `Key_Shared` 
subscription type.
 
 ### Starting Position
 
-Pulsar source is able to consume messages starting from different positions by 
`setStartCursor(StartCursor)`.
+The Pulsar source is able to consume messages starting from different 
positions by setting the `setStartCursor(StartCursor)` option.
 Built-in start cursors include:
 
 - Start from the earliest available message in the topic.
@@ -227,14 +228,14 @@ Built-in start cursors include:
   StartCursor.latest();
   ```
 - Start from a specified message between the earliest and the latest.
-  Pulsar connector would consume from the latest available message if the 
message id doesn't exist.
+The Pulsar connector consumes from the latest available message if the message 
ID does not exist.
 
   The start message is included in consuming result.
   ```java
   StartCursor.fromMessageId(MessageId);
   ```
 - Start from a specified message between the earliest and the latest.
-  Pulsar connector would consume from the latest available message if the 
message id doesn't exist.
+The Pulsar connector consumes from the latest available message if the message 
ID doesn't exist.
 
   Include or exclude the start message by using the second boolean parameter.
   ```java
@@ -248,37 +249,35 @@ Built-in start cursors include:
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
 The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on 
how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition) 
about how the message is stored,
 you can create a `MessageId` by using `DefaultImplementation.newMessageId(long 
ledgerId, long entryId, int partitionIndex)`.
 {{< /hint >}}
 
 ### Boundedness
 
-Pulsar source supports streaming and batch running modes.
-By default, the `PulsarSource` is set to run in the streaming mode.
+The Pulsar source supports streaming and batch execution mode.
+By default, the `PulsarSource` is configured for unbounded data.
 
-In streaming mode, Pulsar source never stops until a Flink job fails or is 
cancelled. However,
-you can set Pulsar source stopping at a stop position by using 
```setUnboundedStopCursor(StopCursor)```.
-The Pulsar source will finish when all partitions reach their specified stop 
positions.
+For unbounded data the Pulsar source never stops until a Flink job is stopped 
or failed. 
+You can use the `setUnboundedStopCursor(StopCursor)` to set the Pulsar source 
to stop at a specific stop position.
 
-You can use ```setBoundedStopCursor(StopCursor)``` to specify a stop position 
so that the Pulsar source can run in the batch mode.
-When all partitions have reached their stop positions, the source will finish.
+You can use `setBoundedStopCursor(StopCursor)` to specify a stop position for 
bounded data. 
 
 Built-in stop cursors include:
 
-- Connector will never stop consuming.
+- The Pulsar source never stops consuming messages.
   ```java
   StopCursor.never();
   ```
-- Stop at the latest available message in Pulsar when the connector starts 
consuming.
+- Stop at the latest available message when the  Pulsar source starts 
consuming messages.
   ```java
   StopCursor.latest();
   ```
-- Stop when connector meet a given message, or stop at a message which is 
produced after this given message.
+- Stop when the connector meets a given message, or stop at a message which is 
produced after this given message.
   ```java
   StopCursor.atMessageId(MessageId);
   ```
-- Stop but include the given message in consuming result.
+- Stop but include the given message in the consuming result.
   ```java
   StopCursor.afterMessageId(MessageId);
   ```
@@ -287,15 +286,16 @@ Built-in stop cursors include:
   StopCursor.atEventTime(long);
   ```
 
-### Configurable Options
+### Source Configurable Options
 
 In addition to configuration options described above, you can set arbitrary 
options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using 
`setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using 
`setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
 
 #### PulsarClient Options
 
-Pulsar connector use the [client 
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of 
Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client 
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of 
Pulsar's `ClientConfigurationData`,
 which is required for creating a `PulsarClient`, as Flink configuration 
options in `PulsarOptions`.
 
 {{< generated/pulsar_client_configuration >}}
@@ -303,8 +303,9 @@ which is required for creating a `PulsarClient`, as Flink 
configuration options
 #### PulsarAdmin Options
 
 The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used 
for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic 
pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed 
here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses 
topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
 They are also defined in `PulsarOptions`.
 
 {{< generated/pulsar_admin_configuration >}}
@@ -312,34 +313,34 @@ They are also defined in `PulsarOptions`.
 #### Pulsar Consumer Options
 
 In general, Pulsar provides the Reader API and Consumer API for consuming 
messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of 
Pulsar's `ConsumerConfigurationData` as Flink configuration options in 
`PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's 
`ConsumerConfigurationData` as Flink configuration options in 
`PulsarSourceOptions`.
 
 {{< generated/pulsar_consumer_configuration >}}
 
 #### PulsarSource Options
 
 The configuration options below are mainly used for customizing the 
performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can ignore them if you do not have any performance issues.
 
 {{< generated/pulsar_source_configuration >}}
 
 ### Dynamic Partition Discovery
 
 To handle scenarios like topic scaling-out or topic creation without 
restarting the Flink
-job, Pulsar source can be configured to periodically discover new partitions 
under provided
-topic-partition subscribing pattern. To enable partition discovery, set a 
non-negative value for
-the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+job, the Pulsar source periodically discover new partitions under a provided
+topic-partition subscription pattern. To enable partition discovery, you can 
set a non-negative value for
+the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
 
 ```java
 // discover new partitions per 10 seconds
 PulsarSource.builder()
-        .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 
10000);
+    .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 
10000);
 ```
 
 {{< hint warning >}}
-- Partition discovery is **enabled** by default. Pulsar connector would query 
the topic metadata every 30 seconds.
-- You need to set the partition discovery interval to a negative value to 
disable this feature.
-- The partition discovery would be disabled in batch mode even if you set this 
option with a non-negative value.
+- Partition discovery is **enabled** by default. The Pulsar connector queries 
the topic metadata every 30 seconds.
+- To disable partition discovery, you need to set a negative partition 
discovery interval.
+- Partition discovery is disabled for bounded data even if you set this option 
with a non-negative value.
 {{< /hint >}}
 
 ### Event Time and Watermarks
@@ -359,7 +360,7 @@ details about how to define a `WatermarkStrategy`.
 
 When a subscription is created, Pulsar 
[retains](https://pulsar.apache.org/docs/en/concepts-architecture-overview/#persistent-storage)
 all messages, even if the consumer is disconnected.
 The retained messages are discarded only when the connector acknowledges that 
all these messages are processed successfully.
-Pulsar connector supports four subscription types, which makes the 
acknowledgement behaviors variety among different subscriptions.
+The Pulsar connector supports four subscription types, which makes the 
acknowledgement behaviors vary among different subscriptions.
 
 #### Acknowledgement on Exclusive and Failover Subscription Types
 
@@ -367,18 +368,18 @@ Pulsar connector supports four subscription types, which 
makes the acknowledgeme
 the latest successfully consumed message. All the message before the given 
message are marked
 with a consumed status.
 
-Pulsar source acknowledges the current consuming message when checkpoints are 
**completed**,
-to ensure the consistency between Flink's checkpoint state and committed 
position on Pulsar brokers.
+The Pulsar source acknowledges the current consuming message when checkpoints 
are **completed**,
+to ensure the consistency between Flink's checkpoint state and committed 
position on the Pulsar brokers.
 
 If checkpointing is disabled, Pulsar source periodically acknowledges messages.
-You can set the acknowledgement period by using the 
`PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL` option.
+You can use the `PulsarSourceOptions.PULSAR_AUTO_COMMIT_CURSOR_INTERVAL` 
option to set the acknowledgement period.
 
 Pulsar source does **NOT** rely on committed positions for fault tolerance.
-Acknowledging messages is only for exposing the progress of consumer and 
monitoring on these two subscription types.
+Acknowledging messages is only for exposing the progress of consumers and 
monitoring on these two subscription types.
 
 #### Acknowledgement on Shared and Key_Shared Subscription Types
 
-`Shared` and `Key_Shared` subscription types need to acknowledge messages one 
by one. You can acknowledge
+In `Shared` and `Key_Shared` subscription types, messages are acknowledged one 
by one. You can acknowledge
 a message in a transaction and commit it to Pulsar.
 
 You should enable transaction in the Pulsar `borker.conf` file when using 
these two subscription types in connector:
@@ -387,19 +388,399 @@ You should enable transaction in the Pulsar 
`borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. 
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the 
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+The default timeout for Pulsar transactions is 3 hours.
+Make sure that that timeout is greater than checkpoint interval + maximum 
recovery time.
+A shorter checkpoint interval indicates a better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option 
to change the transaction timeout.
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar 
broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+No consistency guarantees can be made in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+The Pulsar Sink supports writing records into one or more Pulsar topics or a 
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data 
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
 API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or 
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+The Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once 
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new 
SimpleStringSchema()))
+    .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+        
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for 
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for 
more details.
+
+It is recommended to set the producer name in Pulsar Source by 
`setProducerName(String)`.
+This sets a unique name for the Flink connector in the Pulsar statistic 
dashboard.
+You can use it to monitor the performance of your Flink connector and 
applications.
+
+### Producing to topics
+
+Defining the topics for producing is similar to the [topic-partition 
subscription](#topic-partition-subscription)
+in the Pulsar source. We support a mix-in style of topic setting. You can 
provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", 
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic 
metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` 
option to change the discovery interval option.
+
+Configuring writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). Configuring partitions on the Pulsar 
connector is explained in the [flexible topic naming](#flexible-topic-naming) 
section.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both the topic and its corresponding 
partitions, Pulsar sink merges them and only uses the topic.
+
+For example, when using the `PulsarSink.builder().setTopics("some-topic1", 
"some-topic1-partition-0")` option to build the Pulsar sink,
+this is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required for serializing the 
record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's 
`SerializationSchema` and
+Pulsar's `Schema`. Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported in 
the Pulsar sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html)
 interface,
+you can use the predefined `PulsarSerializationSchema`. The Pulsar sink 
provides two implementation methods.
+
+- Encode the message by using Pulsar's 
[Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+  ```java
+  // Primitive types
+  PulsarSerializationSchema.pulsarSchema(Schema)
+
+  // Struct types (JSON, Protobuf, Avro, etc.)
+  PulsarSerializationSchema.pulsarSchema(Schema, Class)
+
+  // KeyValue type
+  PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
+  ```
+- Encode the message by using Flink's `SerializationSchema`
+  ```java
+  PulsarSerializationSchema.flinkSchema(SerializationSchema)
+  ```
+
+[Schema 
evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
+can be enabled by users using `PulsarSerializationSchema.pulsarSchema()` and
+`PulsarSinkBuilder.enableSchemaEvolution()`. This means that any broker schema 
validation is in place.
+
+```java
+Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
+PulsarSerializationSchema<SomePojo> pulsarSchema = 
PulsarSerializationSchema.pulsarSchema(schema, SomePojo.class);
+
+PulsarSink<String> sink = PulsarSink.builder()
+    ...
+    .setSerializationSchema(pulsarSchema)
+    .enableSchemaEvolution()
+    .build();
+```
+
+{{< hint warning >}}
+If you use Pulsar schema without enabling schema evolution, the target topic 
will have a `Schema.BYTES` schema.
+Consumers will need to handle the deserialization (if needed) themselves.
+
+For example, if you set  
`PulsarSerializationSchema.pulsarSchema(Schema.STRING)` without enabling schema 
evolution,
+the schema stored in Pulsar topics is `Schema.BYTES`.
+{{< /hint >}}
+
+### Message Routing
+
+Routing in Pulsar Sink is operated on the partition level. For a list of 
partitioned topics,
+the routing algorithm first collects all partitions from different topics, and 
then calculates routing within all the partitions.
+By default Pulsar Sink supports two router implementation.
+
+- `KeyHashTopicRouter`: use the hashcode of the message's key to decide the 
topic partition that messages are sent to.
+
+  The message key is provided by `PulsarSerializationSchema.key(IN, 
PulsarSinkContext)`
+  You need to implement this interface and extract the message key when you 
want to send the message with the same key to the same topic partition.
+
+  If you do not provide the message key. A topic  partition is randomly chosen 
from the topic list.
+
+  The message key can be hashed in two ways: `MessageKeyHash.JAVA_HASH` and 
`MessageKeyHash.MURMUR3_32_HASH`.
+  You can use the `PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH` option to choose 
the hash method.
+
+- `RoundRobinRouter`: Round-robin among all the partitions.
+
+  All messages are sent to the first partition, and switch to the next 
partition after sending
+  a fixed number of messages. The batch size can be customized by the 
`PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES` option.
+
+Let’s assume there are ten messages and two topics. Topic A has two partitions 
while topic B has three partitions.
+The batch size is set to five messages. In this case, topic A has 5 messages 
per partition which topic B does not receive any messages.
+
+You can configure custom routers by using the `TopicRouter` interface.
+If you implement a `TopicRouter`, ensure that it is serializable.
+And you can return partitions which are not available in the pre-discovered 
partition list.
+
+Thus, you do not need to specify topics using the 
`PulsarSinkBuilder.setTopics` option when you implement the custom topic router.
+
+```java
+@PublicEvolving
+public interface TopicRouter<IN> extends Serializable {
+
+    String route(IN in, List<String> partitions, PulsarSinkContext context);
+
+    default void open(SinkConfiguration sinkConfiguration) {
+        // Nothing to do by default.
+    }
+}
+```
+
+{{< hint info >}}
+Internally, a Pulsar partition is implemented as a topic. The Pulsar client 
provides APIs to hide this
+implementation detail and handles routing under the hood automatically. Pulsar 
Sink uses a lower client
+API to implement its own routing layer to support multiple topics routing.
+
+For details, see  [partitioned 
topics](https://pulsar.apache.org/docs/en/cookbooks-partitioned/).
+{{< /hint >}}
+
+### Delivery Guarantee
+
+`PulsarSink` supports three delivery guarantee semantics.
+
+- `NONE`: Data loss can happen even when the pipeline is running.
+  Basically, we use a fire-and-forget strategy to send records to Pulsar 
topics in this mode.
+  It means that this mode has the highest throughput.
+- `AT_LEAST_ONCE`: No data loss happens, but data duplication can happen after 
a restart from checkpoint.
+- `EXACTLY_ONCE`: No data loss happens. Each record is sent to the Pulsar 
broker only once.
+  Pulsar Sink uses [Pulsar 
transaction](https://pulsar.apache.org/docs/en/transactions/)
+  and two-phase commit (2PC) to ensure records are sent only once even after 
pipeline restarts.
+
+### Delayed message delivery
+
+[Delayed message 
delivery](https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery)
+enables you to delay the possibility to consume a message. With delayed 
message enabled, the Pulsar sink sends a message to the Pulsar topic
+**immediately**, but the message is delivered to a consumer once the specified 
delay is over.
+
+Delayed message delivery only works in the `Shared` subscription type. In 
`Exclusive` and `Failover`
+subscription types, the delayed message is dispatched immediately.
+
+You can configure the `MessageDelayer` to define when to send the message to 
the consumer.
+The default option is to never delay the message dispatching. You can use the 
`MessageDelayer.fixed(Duration)` option to
+Configure delaying all messages in a fixed duration. You can also implement 
the `MessageDelayer`
+interface to dispatch messages at different time.
+
+{{< hint warning >}}
+The dispatch time should be calculated by the 
`PulsarSinkContext.processTime()`.
+{{< /hint >}}
+
+### Sink Configurable Options
+
+You can set options for `PulsarClient`, `PulsarAdmin`, Pulsar `Producer` and 
`PulsarSink`
+by using `setConfig(ConfigOption<T>, T)`, `setConfig(Configuration)` and 
`setConfig(Properties)`.
+
+#### PulsarClient and PulsarAdmin Options
+
+For details, refer to [PulsarAdmin options](#pulsaradmin-options).
+
+#### Pulsar Producer Options
+
+The Pulsar connector uses the Producer API to send messages. It extracts most 
parts of
+Pulsar's `ProducerConfigurationData` as Flink configuration options in 
`PulsarSinkOptions`.
+
+{{< generated/pulsar_producer_configuration >}}
+
+#### PulsarSink Options
+
+The configuration options below are mainly used for customizing the 
performance and message
+sending behavior. You can just leave them alone if you do not have any 
performance issues.
+
+{{< generated/pulsar_sink_configuration >}}
+
+### Sink Metrics
+
+This table lists supported metrics.
+The first 6 metrics are standard Pulsar Sink metrics as described in
+[FLIP-33: Standardize Connector 
Metrics]([https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics))
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 15%">Scope</th>
+      <th class="text-left" style="width: 18%">Metrics</th>
+      <th class="text-left" style="width: 18%">User Variables</th>
+      <th class="text-left" style="width: 39%">Description</th>
+      <th class="text-left" style="width: 10%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <th rowspan="13">Operator</th>
+        <td>numBytesOut</td>
+        <td>n/a</td>
+        <td>The total number of output bytes since the sink starts. Count 
towards the numBytesOut in TaskIOMetricsGroup.</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>numBytesOutPerSecond</td>
+        <td>n/a</td>
+        <td>The output bytes per second</td>
+        <td>Meter</td>
+    </tr>
+    <tr>
+        <td>numRecordsOut</td>
+        <td>n/a</td>
+        <td>The total number of output records since the sink starts.</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>numRecordsOutPerSecond</td>
+        <td>n/a</td>
+        <td>The output records per second</td>
+        <td>Meter</td>
+    </tr>
+    <tr>
+        <td>numRecordsOutErrors</td>
+        <td>n/a</td>
+        <td>The total number of records failed to send</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>currentSendTime</td>
+        <td>n/a</td>
+        <td>The time it takes to send the last record, from enqueue the 
message in client buffer to its ack.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.numAcksReceived</td>
+        <td>n/a</td>
+        <td>The number of acks received for sent messages.</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.sendLatencyMax</td>
+        <td>n/a</td>
+        <td>The maximum send latency in the last refresh interval across all 
producers.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency50Pct</td>
+        <td>ProducerName</td>
+        <td>The 50th percentile of send latency in the last refresh interval 
for a specific producer.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency75Pct</td>
+        <td>ProducerName</td>
+        <td>The 75th percentile of send latency in the last refresh interval 
for a specific producer.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency95Pct</td>
+        <td>ProducerName</td>
+        <td>The 95th percentile of send latency in the last refresh interval 
for a specific producer.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency99Pct</td>
+        <td>ProducerName</td>
+        <td>The 99th percentile of send latency in the last refresh interval 
for a specific producer.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency999Pct</td>
+        <td>ProducerName</td>
+        <td>The 99.9th percentile of send latency in the last refresh interval 
for a specific producer.</td>
+        <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< hint info >}}
+- `numBytesOut`, `numRecordsOut`, `numRecordsOutErrors` are retrieved from 
Pulsar client metrics.
+
+- `currentSendTime` tracks the time from when the producer calls `sendAync()` 
to
+  the time when the message is acknowledged by the broker. This metric is not 
available in `NONE` delivery guarantee.
+{{< /hint >}}
+
+The Pulsar producer refreshes its stats every 60 seconds by default. The 
PulsarSink retrieves the Pulsar producer
+stats every 500ms. That means that `numRecordsOut`, `numBytesOut`, 
`numAcksReceived`, and `numRecordsOutErrors` 
+are updated every 60 seconds. To increase the metrics refresh frequency, you 
can change
+the Pulsar producer stats refresh interval to a smaller value (minimum 1 
second), as shown below.
+
+```java
+builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS. 1L)
+```
+
+`numBytesOutRate` and `numRecordsOutRate` are calculated based on the 
`numBytesOut` and `numRecordsOUt`
+counter respectively. Flink internally uses a fixed 60 seconds window to 
calculate the rates.
+
+### Brief Design Rationale
+
+Pulsar sink follow the Sink API defined in 
+[FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction).
+
+#### Stateless SinkWriter
+
+In `EXACTLY_ONCE` mode, the Pulsar sink does not store transaction information 
in a checkpoint.
+That means that new transactions will be created after a restart.
+Therefore, any message in previous pending transactions is either aborted or 
timed out
+(They are never visible to the downstream Pulsar consumer).
+The Pulsar team is working to optimize the needed resources by unfinished 
pending transactions.
+
+#### Pulsar Schema Evolution
+
+[Pulsar Schema 
Evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/) 
allows
+you to reuse the same Flink job after certain "allowed" data model changes, 
like adding or deleting
+a field in a AVRO-based Pojo class. Please note that you can specify Pulsar 
schema validation rules
+and define an auto schema update. For details, refer to [Pulsar Schema 
Evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/).
+
+## Known Issues
+
+This section describes some known issues about the Pulsar connectors.
+
+### Unstable on Java 11
+
+Pulsar connector has some known issues on Java 11. It is recommended to run 
Pulsar connector
+on Java 8.
+
+### No TransactionCoordinatorNotFound, but automatic reconnect
+
+Pulsar transactions are still in active development and are not stable. Pulsar 
2.9.2
+introduces [a break change](https://github.com/apache/pulsar/pull/13135) in 
transactions.
+If you use Pulsar 2.9.2 or higher with an older Pulsar client, you might get a 
`TransactionCoordinatorNotFound` exception.
+
+You can use the latest `pulsar-client-all` release to resolve this issue.
+
 ## Upgrading to the Latest Connector Version
 
 The generic upgrade steps are outlined in [upgrading jobs and Flink versions 
guide]({{< ref "docs/ops/upgrading" >}}).
@@ -407,7 +788,7 @@ The Pulsar connector does not store any state on the Flink 
side. The Pulsar conn
 For Pulsar, you additionally need to know these limitations:
 
 * Do not upgrade the Pulsar connector and Pulsar broker version at the same 
time.
-* Always use a newer Pulsar client with Pulsar connector for consuming message 
from Pulsar.
+* Always use a newer Pulsar client with Pulsar connector to consume messages 
from Pulsar.
 
 ## Troubleshooting
 
@@ -417,13 +798,4 @@ If you have a problem with Pulsar when using Flink, keep 
in mind that Flink only
 and your problem might be independent of Flink and sometimes can be solved by 
upgrading Pulsar brokers,
 reconfiguring Pulsar brokers or reconfiguring Pulsar connector in Flink.
 
-### Messages can be delayed on low volume topics
-
-When the Pulsar source connector reads from a low volume topic, users might 
observe a 10 seconds delay between messages. Pulsar buffers messages from 
topics by default. Before emitting to downstream
-operators, the number of buffered records must be equal or larger than 
`PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS`. If the data volume is low, it 
could be that filling up the number of buffered records takes longer than 
`PULSAR_MAX_FETCH_TIME` (default to 10 seconds). If that's the case, it means 
that only after this time has passed the messages will be emitted. 
-
-To avoid this behaviour, you need to change either the buffered records or the 
waiting time. 
-
-
-
 {{< top >}}
diff --git 
a/docs/layouts/shortcodes/generated/pulsar_producer_configuration.html 
b/docs/layouts/shortcodes/generated/pulsar_producer_configuration.html
new file mode 100644
index 0000000..044c4b2
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/pulsar_producer_configuration.html
@@ -0,0 +1,90 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>pulsar.producer.batchingEnabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enable batch send ability, it was enabled by default.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.producer.batchingMaxBytes</h5></td>
+            <td style="word-wrap: break-word;">131072</td>
+            <td>Integer</td>
+            <td>The maximum size of messages permitted in a batch. Keep the 
maximum consistent as previous versions.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.producer.batchingMaxMessages</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Integer</td>
+            <td>The maximum number of messages permitted in a batch.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.producer.batchingMaxPublishDelayMicros</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Long</td>
+            <td>Batching time period of sending messages.</td>
+        </tr>
+        <tr>
+            
<td><h5>pulsar.producer.batchingPartitionSwitchFrequencyByPublishDelay</h5></td>
+            <td style="word-wrap: break-word;">10</td>
+            <td>Integer</td>
+            <td>The maximum wait time for switching topic partitions.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.producer.chunkingEnabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td></td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.producer.compressionType</h5></td>
+            <td style="word-wrap: break-word;">NONE</td>
+            <td><p>Enum</p></td>
+            <td>Message data compression type used by a producer.Available 
options:<ul><li><a href="LZ4">https://github.com/lz4/lz4</a></li><li><a 
href="ZLIB">https://zlib.net/</a></li><li><a 
href="ZSTD">https://facebook.github.io/zstd/</a></li><li><a 
href="SNAPPY">https://google.github.io/snappy/</a></li></ul><br /><br 
/>Possible 
values:<ul><li>"NONE"</li><li>"LZ4"</li><li>"ZLIB"</li><li>"ZSTD"</li><li>"SNAPPY"</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.producer.initialSequenceId</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Long</td>
+            <td>The sequence id for avoiding the duplication, it's used when 
Pulsar doesn't have transaction.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.producer.maxPendingMessages</h5></td>
+            <td style="word-wrap: break-word;">1000</td>
+            <td>Integer</td>
+            <td>The maximum size of a queue holding pending messages.<br />For 
example, a message waiting to receive an acknowledgment from a <a 
href="broker">https://pulsar.apache.org/docs/en/reference-terminology#broker</a>.<br
 />By default, when the queue is full, all calls to the <code 
class="highlighter-rouge">Send</code> and <code 
class="highlighter-rouge">SendAsync</code> methods fail unless you set <code 
class="highlighter-rouge">BlockIfQueueFull</code> to true.</td>
+        </tr>
+        <tr>
+            
<td><h5>pulsar.producer.maxPendingMessagesAcrossPartitions</h5></td>
+            <td style="word-wrap: break-word;">50000</td>
+            <td>Integer</td>
+            <td>The maximum number of pending messages across partitions.<br 
/>Use the setting to lower the max pending messages for each partition (<code 
class="highlighter-rouge">setMaxPendingMessages</code>) if the total number 
exceeds the configured value.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.producer.producerName</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>A producer name which would be displayed in the Pulsar's 
dashboard. If no producer name was provided, we would use a Pulsar generated 
name instead.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.producer.properties</h5></td>
+            <td style="word-wrap: break-word;"></td>
+            <td>Map</td>
+            <td>A name or value property of this consumer. <code 
class="highlighter-rouge">properties</code> is application defined metadata 
attached to a consumer. When getting a topic stats, associate this metadata 
with the consumer stats for easier identification.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.producer.sendTimeoutMs</h5></td>
+            <td style="word-wrap: break-word;">30000</td>
+            <td>Long</td>
+            <td>Message send timeout in ms.If a message is not acknowledged by 
a server before the <code class="highlighter-rouge">sendTimeout</code> expires, 
an error occurs.</td>
+        </tr>
+    </tbody>
+</table>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html 
b/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html
new file mode 100644
index 0000000..cd7f803
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html
@@ -0,0 +1,48 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>pulsar.sink.deliveryGuarantee</h5></td>
+            <td style="word-wrap: break-word;">none</td>
+            <td><p>Enum</p></td>
+            <td>Optional delivery guarantee when committing.<br /><br 
/>Possible values:<ul><li>"exactly-once": Records are only delivered 
exactly-once also under failover scenarios. To build a complete exactly-once 
pipeline is required that the source and sink support exactly-once and are 
properly configured.</li><li>"at-least-once": Records are ensured to be 
delivered but it may happen that the same record is delivered multiple times. 
Usually, this guarantee is faster than the exactly- [...]
+        </tr>
+        <tr>
+            <td><h5>pulsar.sink.enableSchemaEvolution</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If you enable this option and use 
PulsarSerializationSchema.pulsarSchema(), we would consume and deserialize the 
message by using Pulsar's <code class="highlighter-rouge">Schema</code>.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.sink.maxRecommitTimes</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>The allowed transaction recommit times if we meet some 
retryable exception. This is used in Pulsar Transaction.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.sink.messageKeyHash</h5></td>
+            <td style="word-wrap: break-word;">murmur-3-32-hash</td>
+            <td><p>Enum</p></td>
+            <td>The hash policy for routing message by calculating the hash 
code of message key.<br /><br />Possible values:<ul><li>"java-hash": This hash 
would use <code class="highlighter-rouge">String.hashCode()</code> to calculate 
the message key string's hash code.</li><li>"murmur-3-32-hash": This hash would 
calculate message key's hash code by using <a 
href="https://en.wikipedia.org/wiki/MurmurHash";>Murmur3</a> 
algorithm.</li></ul></td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.sink.topicMetadataRefreshInterval</h5></td>
+            <td style="word-wrap: break-word;">1800000</td>
+            <td>Long</td>
+            <td>Auto update the topic metadata in a fixed interval (in ms). 
The default value is 30 minutes.</td>
+        </tr>
+        <tr>
+            <td><h5>pulsar.sink.transactionTimeoutMillis</h5></td>
+            <td style="word-wrap: break-word;">10800000</td>
+            <td>Long</td>
+            <td>This option is used when the user require the <code 
class="highlighter-rouge">DeliveryGuarantee.EXACTLY_ONCE</code> semantic.We 
would use transaction for making sure the message could be write only once.</td>
+        </tr>
+    </tbody>
+</table>
diff --git 
a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
 
b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 4602cb6..38b5121 100644
--- 
a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ 
b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -105,6 +105,9 @@ public class ConfigOptionsDocGenerator {
                         "flink-connectors/flink-connector-pulsar",
                         "org.apache.flink.connector.pulsar.source"),
                 new OptionsClassLocation(
+                        "flink-connectors/flink-connector-pulsar",
+                        "org.apache.flink.connector.pulsar.sink"),
+                new OptionsClassLocation(
                         "flink-libraries/flink-cep", 
"org.apache.flink.cep.configuration"),
                 new OptionsClassLocation(
                         "flink-dstl/flink-dstl-dfs", 
"org.apache.flink.changelog.fs"),

Reply via email to