This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a2a3969701 [FLINK-27690][python][connector/pulsar][docs] Add pulsar example and documentation
8a2a3969701 is described below

commit 8a2a396970156f1ddecf88a6498162506eb25069
Author: Ada Wang <[email protected]>
AuthorDate: Thu May 19 19:51:52 2022 +0800

    [FLINK-27690][python][connector/pulsar][docs] Add pulsar example and documentation
    
    This closes #19775.
---
 .../docs/connectors/datastream/pulsar.md           | 187 ++++++++++++++++++++-
 docs/content/docs/connectors/datastream/pulsar.md  | 183 +++++++++++++++++++-
 .../datastream/{formats => connectors}/__init__.py |   0
 .../kafka_avro_format.py}                          |   0
 .../kafka_csv_format.py}                           |   0
 .../kafka_json_format.py}                          |   0
 .../examples/datastream/connectors/pulsar.py       |  72 ++++++++
 7 files changed, 435 insertions(+), 7 deletions(-)

diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index d716b025ab8..0f84f6b492e 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -46,8 +46,11 @@ The Pulsar source is built on Flink's latest [unified batch and streaming API]({{< ref "docs/dev/datastr
 
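 The Pulsar source provides a builder class for constructing a `PulsarSource` instance. The code below builds a source that consumes "persistent://public/default/my-topic" from the earliest available message. The source uses the **Exclusive** subscription type with the subscription name `my-subscription` and decodes the raw message payload as UTF-8 strings.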
 
+{{< tabs "pulsar-source-usage" >}}
+{{< tab "Java" >}}
+
 ```java
-PulsarSource<String> pulsarSource = PulsarSource.builder()
+PulsarSource<String> source = PulsarSource.builder()
     .setServiceUrl(serviceUrl)
     .setAdminUrl(adminUrl)
     .setStartCursor(StartCursor.earliest())
@@ -60,6 +63,29 @@ PulsarSource<String> pulsarSource = PulsarSource.builder()
 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
 ```
 
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+pulsar_source = PulsarSource.builder() \
+    .set_service_url('pulsar://localhost:6650') \
+    .set_admin_url('http://localhost:8080') \
+    .set_start_cursor(StartCursor.earliest()) \
+    .set_topics("my-topic") \
+    .set_deserialization_schema(
+        PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+    .set_subscription_name('my-subscription') \
+    .set_subscription_type(SubscriptionType.Exclusive) \
+    .build()
+
+env.from_source(source=pulsar_source,
+                watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+                source_name="pulsar source")
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
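 The following properties are required when building a `PulsarSource` with the builder:
 
 - The Pulsar service URL, provided through `setServiceUrl(String)`.
@@ -73,18 +99,47 @@ env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
 The Pulsar source provides two ways to subscribe to topics or topic partitions.
 
 - Topic list: consumes messages from all partitions of the listed topics. For example: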
+  {{< tabs "pulsar-source-topics" >}}
+  {{< tab "Java" >}}
+
   ```java
   PulsarSource.builder().setTopics("some-topic1", "some-topic2");
 
-  // Partition 0 and 1 of topic "topic-a"
+  // Partition 0 and 2 of topic "topic-a"
   PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
   ```
 
+  {{< /tab >}}
+  {{< tab "Python" >}}
+
+  ```python
+  PulsarSource.builder().set_topics(["some-topic1", "some-topic2"])
+
+  # Partition 0 and 2 of topic "topic-a"
+  PulsarSource.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])
+  ```
+
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Topic pattern: the Pulsar source subscribes to every topic whose name matches the given regular expression. For example:
+  {{< tabs "pulsar-source-topic-pattern" >}}
+  {{< tab "Java" >}}
+
   ```java
   PulsarSource.builder().setTopicPattern("topic-*");
   ```
 
+  {{< /tab >}}
+  {{< tab "Python" >}}
+
+  ```python
+  PulsarSource.builder().set_topic_pattern("topic-*")
+  ```
+
+  {{< /tab >}}
+  {{< /tabs >}}
+
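 #### Flexible Topic Naming
 
 Since Pulsar 2.0, the full topic name has the form `{persistent|non-persistent}://tenant/namespace/topic`. The Pulsar source does not require the full name, however, because the topic type, tenant, and namespace all have default values.
@@ -147,13 +202,32 @@ The Pulsar source provides two ways to subscribe to topics or topic partitions.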
   PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class);
   ```
 - Decode the message by using Flink's `DeserializationSchema`.
+  {{< tabs "pulsar-deserializer-deserialization-schema" >}}
+  {{< tab "Java" >}}
   ```java
   PulsarDeserializationSchema.flinkSchema(DeserializationSchema);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  PulsarDeserializationSchema.flink_schema(DeserializationSchema)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Decode the message by using Flink's `TypeInformation`.
+  {{< tabs "pulsar-deserializer-type-information" >}}
+  {{< tab "Java" >}}
   ```java
   PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  PulsarDeserializationSchema.flink_type_info(TypeInformation)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 
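 A Pulsar `Message<byte[]>` carries many [extra properties](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#%E6%B6%88%E6%81%AF), such as the message key, the message publish time, the message time, and application-defined key/value properties. You can read them through the `Message<byte[]>` interface.
 
@@ -172,6 +246,9 @@ A Pulsar `Message<byte[]>` carries many [extra properties](https://pulsar.ap
 
 By default, if no subscription type is specified, the Pulsar source uses the shared subscription type (`SubscriptionType.Shared`).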
 
+{{< tabs "pulsar-subscriptions" >}}
+{{< tab "Java" >}}
+
 ```java
 // Shared subscription with name "my-shared"
 PulsarSource.builder().setSubscriptionName("my-shared");
@@ -180,6 +257,20 @@ PulsarSource.builder().setSubscriptionName("my-shared");
 PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);
 ```
 
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+# Shared subscription with name "my-shared"
+PulsarSource.builder().set_subscription_name("my-shared")
+
+# Exclusive subscription with name "my-exclusive"
+PulsarSource.builder().set_subscription_name("my-exclusive").set_subscription_type(SubscriptionType.Exclusive)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
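 To use the `Key_Shared` subscription type on the Pulsar source, you must provide a `RangeGenerator` instance. The `RangeGenerator` generates a set of message key hash ranges, and the Pulsar source consumes data based on the given ranges.
 
 The Pulsar source also provides a default implementation named `UniformRangeGenerator`, which divides the hash range evenly according to the parallelism of the Flink source.
@@ -189,13 +280,33 @@ The Pulsar source also provides a default implementation named `UniformRangeGenerator`
 The Pulsar source uses `setStartCursor(StartCursor)` to specify where consumption starts. Built-in start cursors include:
 
 - Start from the earliest message in the topic.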
+  {{< tabs "pulsar-starting-position-earliest" >}}
+  {{< tab "Java" >}}
   ```java
   StartCursor.earliest();
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StartCursor.earliest()
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Start from the latest message in the topic.
+  {{< tabs "pulsar-starting-position-latest" >}}
+  {{< tab "Java" >}}
   ```java
   StartCursor.latest();
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StartCursor.latest()
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Start from a specified message.
   ```java
   StartCursor.fromMessageId(MessageId);
@@ -205,9 +316,18 @@ The Pulsar source uses `setStartCursor(StartCursor)` to specify where consumption
   StartCursor.fromMessageId(MessageId, boolean);
   ```
 - Start from the specified message time.
+  {{< tabs "pulsar-starting-position-message-time" >}}
+  {{< tab "Java" >}}
   ```java
   StartCursor.fromMessageTime(long);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StartCursor.from_message_time(int)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 
 {{< hint info >}}
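 Each message has a fixed sequence ID that is ordered in Pulsar. The ID contains the original ledger, entry, and partition information and is used to locate the message in Pulsar's underlying storage.
@@ -224,13 +344,33 @@ By default, the Pulsar source consumes data in streaming mode. Unless the job fails
 Built-in stop cursors include:
 
 - Never stop.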
+  {{< tabs "pulsar-boundedness-never" >}}
+  {{< tab "Java" >}}
   ```java
   StopCursor.never();
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StopCursor.never()
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Stop at the latest message in the topic at the time the Pulsar source starts.
+  {{< tabs "pulsar-boundedness-latest" >}}
+  {{< tab "Java" >}}
   ```java
   StopCursor.latest();
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StopCursor.latest()
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
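 - Stop at a given message, which is not included in the result.
   ```java
   StopCursor.atMessageId(MessageId);
@@ -240,13 +380,22 @@ By default, the Pulsar source consumes data in streaming mode. Unless the job fails
   StopCursor.afterMessageId(MessageId);
   ```
 - Stop at a given message publish timestamp, for example `Message<byte[]>.getPublishTime()`.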
+  {{< tabs "pulsar-boundedness-publish-time" >}}
+  {{< tab "Java" >}}
   ```java
   StopCursor.atPublishTime(long);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StopCursor.at_publish_time(int)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 
-{{< hint warning >}}
-StopCursor.atEventTime(long) is now deprecated.
-{{< /hint >}}
+  {{< hint warning >}}
+  StopCursor.atEventTime(long) is now deprecated.
+  {{< /hint >}}
 
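 ### Source Configurable Options
 
@@ -280,12 +429,27 @@ Pulsar provides two APIs for consuming data, the consumer API and the reader API,
 
 To discover partitions added to Pulsar or newly created topics after the Flink job has started, the Pulsar source provides a dynamic partition discovery mechanism that does not require restarting the Flink job. Set the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` to a positive integer to enable it.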
 
+{{< tabs "pulsar-dynamic-partition-discovery" >}}
+{{< tab "Java" >}}
+
 ```java
 // Query partition information every 10 seconds
 PulsarSource.builder()
         .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
 ```
 
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+# Query partition information every 10 seconds
+PulsarSource.builder() \
+    .set_config("pulsar.source.partitionDiscoveryIntervalMs", 10000)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
 {{< hint warning >}}
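 By default, the Pulsar source enables dynamic partition discovery with a query interval of 30 seconds. You can set a negative value to disable it. The feature cannot be enabled when consuming data in batch mode.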
 {{< /hint >}}
@@ -294,10 +458,23 @@ PulsarSource.builder()
 
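 By default, the Pulsar source uses the timestamp in the Pulsar `Message<byte[]>` as the timestamp of the deserialized record. You can define a `WatermarkStrategy` to extract the desired event time yourself and emit the corresponding watermarks downstream.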
 
+{{< tabs "pulsar-watermarks" >}}
+{{< tab "Java" >}}
+
 ```java
 env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");
 ```
 
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+env.from_source(pulsar_source, CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
 [This documentation]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes in detail how to define a `WatermarkStrategy`.
 
 ### Message Acknowledgement
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index c1916b24608..66d050e540c 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -50,6 +50,9 @@ The Pulsar source provides a builder class for constructing a PulsarSource insta
 "persistent://public/default/my-topic" in **Exclusive** subscription type (`my-subscription`)
 and deserializes the raw payload of the messages as strings.
 
+{{< tabs "pulsar-source-usage" >}}
+{{< tab "Java" >}}
+
 ```java
 PulsarSource<String> source = PulsarSource.builder()
     .setServiceUrl(serviceUrl)
@@ -64,6 +67,29 @@ PulsarSource<String> source = PulsarSource.builder()
 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
 ```
 
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+pulsar_source = PulsarSource.builder() \
+    .set_service_url('pulsar://localhost:6650') \
+    .set_admin_url('http://localhost:8080') \
+    .set_start_cursor(StartCursor.earliest()) \
+    .set_topics("my-topic") \
+    .set_deserialization_schema(
+        PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+    .set_subscription_name('my-subscription') \
+    .set_subscription_type(SubscriptionType.Exclusive) \
+    .build()
+
+env.from_source(source=pulsar_source,
+                watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+                source_name="pulsar source")
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
 The following properties are **required** for building a PulsarSource:
 
 - Pulsar service URL, configured by `setServiceUrl(String)`
@@ -83,6 +109,9 @@ You can use it to monitor the performance of your Flink connector and applicatio
 Pulsar source provides two ways of topic-partition subscription:
 
 - Topic list, subscribing messages from all partitions in a list of topics. For example:
+  {{< tabs "pulsar-source-topics" >}}
+  {{< tab "Java" >}}
+
   ```java
   PulsarSource.builder().setTopics("some-topic1", "some-topic2");
 
@@ -90,11 +119,37 @@ Pulsar source provides two ways of topic-partition subscription:
   PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
   ```
 
+  {{< /tab >}}
+  {{< tab "Python" >}}
+
+  ```python
+  PulsarSource.builder().set_topics(["some-topic1", "some-topic2"])
+
+  # Partition 0 and 2 of topic "topic-a"
+  PulsarSource.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])
+  ```
+
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Topic pattern, subscribing messages from all topics whose name matches the provided regular expression. For example:
+  {{< tabs "pulsar-source-topic-pattern" >}}
+  {{< tab "Java" >}}
+
   ```java
   PulsarSource.builder().setTopicPattern("topic-*");
   ```
 
+  {{< /tab >}}
+  {{< tab "Python" >}}
+
+  ```python
+  PulsarSource.builder().set_topic_pattern("topic-*")
+  ```
+
+  {{< /tab >}}
+  {{< /tabs >}}
+
 #### Flexible Topic Naming
 
 Since Pulsar 2.0, all topic names internally are in the form of `{persistent|non-persistent}://tenant/namespace/topic`.
@@ -169,13 +224,32 @@ you can use the predefined `PulsarDeserializationSchema`. Pulsar connector provi
   PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class);
   ```
 - Decode the message by using Flink's `DeserializationSchema`
+  {{< tabs "pulsar-deserializer-deserialization-schema" >}}
+  {{< tab "Java" >}}
   ```java
   PulsarDeserializationSchema.flinkSchema(DeserializationSchema);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  PulsarDeserializationSchema.flink_schema(DeserializationSchema)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Decode the message by using Flink's `TypeInformation`
+  {{< tabs "pulsar-deserializer-type-information" >}}
+  {{< tab "Java" >}}
   ```java
   PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  PulsarDeserializationSchema.flink_type_info(TypeInformation)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 
 Pulsar `Message<byte[]>` contains some [extra properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
 such as message key, message publish time, message time, and application-defined key/value pairs etc.
@@ -200,6 +274,9 @@ When a Flink reader crashes, all (non-acknowledged and subsequent) messages are
 
 By default, if no subscription type is defined, Pulsar source uses the `Shared` subscription type.
 
+{{< tabs "pulsar-subscriptions" >}}
+{{< tab "Java" >}}
+
 ```java
 // Shared subscription with name "my-shared"
 PulsarSource.builder().setSubscriptionName("my-shared");
@@ -208,6 +285,20 @@ PulsarSource.builder().setSubscriptionName("my-shared");
 PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);
 ```
 
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+# Shared subscription with name "my-shared"
+PulsarSource.builder().set_subscription_name("my-shared")
+
+# Exclusive subscription with name "my-exclusive"
+PulsarSource.builder().set_subscription_name("my-exclusive").set_subscription_type(SubscriptionType.Exclusive)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
 Ensure that you provide a `RangeGenerator` implementation if you want to use the `Key_Shared` subscription type on the Pulsar connector.
 The `RangeGenerator` generates a set of key hash ranges so that a respective reader subtask only dispatches messages where the hash of the message key is contained in the specified range.
 
@@ -220,13 +311,33 @@ The Pulsar source is able to consume messages starting from different positions
 Built-in start cursors include:
 
 - Start from the earliest available message in the topic.
+  {{< tabs "pulsar-starting-position-earliest" >}}
+  {{< tab "Java" >}}
   ```java
   StartCursor.earliest();
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StartCursor.earliest()
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Start from the latest available message in the topic.
+  {{< tabs "pulsar-starting-position-latest" >}}
+  {{< tab "Java" >}}
   ```java
   StartCursor.latest();
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StartCursor.latest()
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Start from a specified message between the earliest and the latest.
 The Pulsar connector consumes from the latest available message if the message ID does not exist.
 
@@ -241,10 +352,20 @@ The Pulsar connector consumes from the latest available message if the message I
   ```java
   StartCursor.fromMessageId(MessageId, boolean);
   ```
+
 - Start from the specified message time by `Message<byte[]>.getPublishTime()`.
+  {{< tabs "pulsar-starting-position-message-time" >}}
+  {{< tab "Java" >}}
   ```java
   StartCursor.fromMessageTime(long);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StartCursor.from_message_time(int)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 
 {{< hint info >}}
 Each Pulsar message belongs to an ordered sequence on its topic.
@@ -266,13 +387,33 @@ You can use `setBoundedStopCursor(StopCursor)` to specify a stop position for bo
 Built-in stop cursors include:
 
 - The Pulsar source never stops consuming messages.
+  {{< tabs "pulsar-boundedness-never" >}}
+  {{< tab "Java" >}}
   ```java
   StopCursor.never();
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StopCursor.never()
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Stop at the latest available message when the Pulsar source starts consuming messages.
+  {{< tabs "pulsar-boundedness-latest" >}}
+  {{< tab "Java" >}}
   ```java
   StopCursor.latest();
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StopCursor.latest()
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
+
 - Stop when the connector meets a given message, or stop at a message which is produced after this given message.
   ```java
   StopCursor.atMessageId(MessageId);
@@ -281,13 +422,23 @@ Built-in stop cursors include:
   ```java
   StopCursor.afterMessageId(MessageId);
   ```
+
 - Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
+  {{< tabs "pulsar-boundedness-publish-time" >}}
+  {{< tab "Java" >}}
   ```java
   StopCursor.atPublishTime(long);
   ```
+  {{< /tab >}}
+  {{< tab "Python" >}}
+  ```python
+  StopCursor.at_publish_time(int)
+  ```
+  {{< /tab >}}
+  {{< /tabs >}}
 
-{{< hint warning >}}
-StopCursor.atEventTime(long) is now deprecated.
+  {{< hint warning >}}
+  StopCursor.atEventTime(long) is now deprecated.
   {{< /hint >}}
 
 ### Source Configurable Options
@@ -335,12 +486,27 @@ job, the Pulsar source periodically discover new partitions under a provided
 topic-partition subscription pattern. To enable partition discovery, you can set a non-negative value for
 the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
 
+{{< tabs "pulsar-dynamic-partition-discovery" >}}
+{{< tab "Java" >}}
+
 ```java
 // discover new partitions per 10 seconds
 PulsarSource.builder()
     .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
 ```
 
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+# discover new partitions per 10 seconds
+PulsarSource.builder() \
+    .set_config("pulsar.source.partitionDiscoveryIntervalMs", 10000)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
 {{< hint warning >}}
 - Partition discovery is **enabled** by default. The Pulsar connector queries the topic metadata every 30 seconds.
 - To disable partition discovery, you need to set a negative partition discovery interval.
@@ -353,10 +519,23 @@ By default, the message uses the timestamp embedded in Pulsar `Message<byte[]>`
 You can define your own `WatermarkStrategy` to extract the event time from the message,
 and emit the watermark downstream:
 
+{{< tabs "pulsar-watermarks" >}}
+{{< tab "Java" >}}
+
 ```java
 env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");
 ```
 
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+env.from_source(pulsar_source, CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
 [This documentation]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes
 details about how to define a `WatermarkStrategy`.
 
diff --git a/flink-python/pyflink/examples/datastream/formats/__init__.py b/flink-python/pyflink/examples/datastream/connectors/__init__.py
similarity index 100%
rename from flink-python/pyflink/examples/datastream/formats/__init__.py
rename to flink-python/pyflink/examples/datastream/connectors/__init__.py
diff --git a/flink-python/pyflink/examples/datastream/formats/avro_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
similarity index 100%
rename from flink-python/pyflink/examples/datastream/formats/avro_format.py
rename to flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
diff --git a/flink-python/pyflink/examples/datastream/formats/csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
similarity index 100%
rename from flink-python/pyflink/examples/datastream/formats/csv_format.py
rename to flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
diff --git a/flink-python/pyflink/examples/datastream/formats/json_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
similarity index 100%
rename from flink-python/pyflink/examples/datastream/formats/json_format.py
rename to flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
diff --git a/flink-python/pyflink/examples/datastream/connectors/pulsar.py b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
new file mode 100644
index 00000000000..2278d522dbf
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
@@ -0,0 +1,72 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import logging
+import sys
+
+from pyflink.common import SimpleStringSchema, WatermarkStrategy
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import PulsarSource, PulsarSink, PulsarSerializationSchema, \
+    StartCursor, StopCursor, SubscriptionType, PulsarDeserializationSchema, DeliveryGuarantee, \
+    TopicRoutingMode
+
+if __name__ == '__main__':
+    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+
+    PULSAR_SQL_CONNECTOR_PATH = 'file:///path/to/flink-sql-connector-pulsar-1.16.0.jar'
+    SERVICE_URL = 'pulsar://localhost:6650'
+    ADMIN_URL = 'http://localhost:8080'
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)
+    env.add_jars(PULSAR_SQL_CONNECTOR_PATH)
+
+    pulsar_source = PulsarSource.builder() \
+        .set_service_url(SERVICE_URL) \
+        .set_admin_url(ADMIN_URL) \
+        .set_topics('ada') \
+        .set_start_cursor(StartCursor.latest()) \
+        .set_unbounded_stop_cursor(StopCursor.never()) \
+        .set_subscription_name('pyflink_subscription') \
+        .set_subscription_type(SubscriptionType.Exclusive) \
+        .set_deserialization_schema(
+            PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+        .set_config('pulsar.source.enableAutoAcknowledgeMessage', True) \
+        .set_properties({'pulsar.source.autoCommitCursorInterval': '1000'}) \
+        .build()
+
+    ds = env.from_source(source=pulsar_source,
+                         watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+                         source_name="pulsar source")
+
+    pulsar_sink = PulsarSink.builder() \
+        .set_service_url(SERVICE_URL) \
+        .set_admin_url(ADMIN_URL) \
+        .set_producer_name('pyflink_producer') \
+        .set_topics('beta') \
+        .set_serialization_schema(
+            PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
+        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+        .set_topic_routing_mode(TopicRoutingMode.ROUND_ROBIN) \
+        .set_config('pulsar.producer.maxPendingMessages', 1000) \
+        .set_properties({'pulsar.producer.batchingMaxMessages': '100'}) \
+        .build()
+
+    ds.sink_to(pulsar_sink).name('pulsar sink')
+
+    env.execute()
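
The topic naming conventions used by the documentation in this diff can be sketched in plain Python. This is an illustration only, not connector code: `full_topic_name` and `partition_topic` are hypothetical helpers, and the defaults (`persistent`, `public`, `default`) are assumed from the "persistent://public/default/my-topic" example in the docs above. Other short forms of a topic name are deliberately not handled.

```python
def full_topic_name(topic, domain="persistent", tenant="public", namespace="default"):
    """Expand a bare topic name into {domain}://tenant/namespace/topic.

    Hypothetical helper: the default values are assumptions taken from the
    documentation example, not read from any Pulsar or Flink API.
    """
    if "://" in topic:
        # Already a full name such as persistent://public/default/my-topic.
        return topic
    if "/" in topic:
        # Partially qualified names are outside the scope of this sketch.
        raise ValueError("only bare or fully qualified topic names are supported")
    return f"{domain}://{tenant}/{namespace}/{topic}"


def partition_topic(topic, partition):
    """Name of one partition of a partitioned topic, e.g. topic-a-partition-0."""
    return f"{topic}-partition-{partition}"


if __name__ == "__main__":
    print(full_topic_name("my-topic"))
    print([partition_topic("topic-a", p) for p in (0, 2)])
```

The list built in the last line has the same shape as the argument passed to `set_topics` in the Python documentation example for consuming partitions 0 and 2 of "topic-a".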
