This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8a2a3969701 [FLINK-27690][python][connector/pulsar][docs] Add pulsar example and documentation
8a2a3969701 is described below
commit 8a2a396970156f1ddecf88a6498162506eb25069
Author: Ada Wang <[email protected]>
AuthorDate: Thu May 19 19:51:52 2022 +0800
[FLINK-27690][python][connector/pulsar][docs] Add pulsar example and documentation
This closes #19775.
---
.../docs/connectors/datastream/pulsar.md | 187 ++++++++++++++++++++-
docs/content/docs/connectors/datastream/pulsar.md | 183 +++++++++++++++++++-
.../datastream/{formats => connectors}/__init__.py | 0
.../kafka_avro_format.py} | 0
.../kafka_csv_format.py} | 0
.../kafka_json_format.py} | 0
.../examples/datastream/connectors/pulsar.py | 72 ++++++++
7 files changed, 435 insertions(+), 7 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/pulsar.md b/docs/content.zh/docs/connectors/datastream/pulsar.md
index d716b025ab8..0f84f6b492e 100644
--- a/docs/content.zh/docs/connectors/datastream/pulsar.md
+++ b/docs/content.zh/docs/connectors/datastream/pulsar.md
@@ -46,8 +46,11 @@ Pulsar Source 基于 Flink 最新的[批流一体 API]({{< ref
"docs/dev/datastr
Pulsar Source 提供了 builder 类来构造 `PulsarSource` 实例。下面的代码实例使用 builder 类创建的实例会从
“persistent://public/default/my-topic” 的数据开始端进行消费。对应的 Pulsar Source 使用了
**Exclusive**(独占)的订阅方式消费消息,订阅名称为 `my-subscription`,并把消息体的二进制字节流以 UTF-8
的方式编码为字符串。
+{{< tabs "pulsar-source-usage" >}}
+{{< tab "Java" >}}
+
```java
-PulsarSource<String> pulsarSource = PulsarSource.builder()
+PulsarSource<String> source = PulsarSource.builder()
.setServiceUrl(serviceUrl)
.setAdminUrl(adminUrl)
.setStartCursor(StartCursor.earliest())
@@ -60,6 +63,29 @@ PulsarSource<String> pulsarSource = PulsarSource.builder()
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
```
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+pulsar_source = PulsarSource.builder() \
+ .set_service_url('pulsar://localhost:6650') \
+ .set_admin_url('http://localhost:8080') \
+ .set_start_cursor(StartCursor.earliest()) \
+ .set_topics("my-topic") \
+ .set_deserialization_schema(
+ PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_subscription_name('my-subscription') \
+ .set_subscription_type(SubscriptionType.Exclusive) \
+ .build()
+
+env.from_source(source=pulsar_source,
+                watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+                source_name="pulsar source")
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
如果使用构造类构造 `PulsarSource`,一定要提供下面几个属性:
- Pulsar 数据消费的地址,使用 `setServiceUrl(String)` 方法提供。
@@ -73,18 +99,47 @@ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"Pulsar Source");
Pulsar Source 提供了两种订阅 Topic 或 Topic 分区的方式。
- Topic 列表,从这个 Topic 的所有分区上消费消息,例如:
+ {{< tabs "pulsar-source-topics" >}}
+ {{< tab "Java" >}}
+
```java
PulsarSource.builder().setTopics("some-topic1", "some-topic2");
- // 从 topic "topic-a" 的 0 和 1 分区上消费
+ // 从 topic "topic-a" 的 0 和 2 分区上消费
PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+
+ ```python
+ PulsarSource.builder().set_topics(["some-topic1", "some-topic2"])
+
+ # 从 topic "topic-a" 的 0 和 2 分区上消费
+ PulsarSource.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])
+ ```
+
+ {{< /tab >}}
+ {{< /tabs >}}
+
- Topic 正则,Pulsar Source 使用给定的正则表达式匹配出所有合规的 Topic,例如:
+ {{< tabs "pulsar-source-topic-pattern" >}}
+ {{< tab "Java" >}}
+
```java
PulsarSource.builder().setTopicPattern("topic-*");
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+
+ ```python
+ PulsarSource.builder().set_topic_pattern("topic-*")
+ ```
+
+ {{< /tab >}}
+ {{< /tabs >}}
+
#### Topic 名称简写
从 Pulsar 2.0 之后,完整的 Topic 名称格式为
`{persistent|non-persistent}://租户/命名空间/topic`。但是 Pulsar Source 不需要提供 Topic
名称的完整定义,因为 Topic 类型、租户、命名空间都设置了默认值。
@@ -147,13 +202,32 @@ Pulsar Source 提供了两种订阅 Topic 或 Topic 分区的方式。
PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class);
```
- 使用 Flink 的 `DeserializationSchema` 解析消息。
+ {{< tabs "pulsar-deserializer-deserialization-schema" >}}
+ {{< tab "Java" >}}
```java
PulsarDeserializationSchema.flinkSchema(DeserializationSchema);
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ PulsarDeserializationSchema.flink_schema(DeserializationSchema)
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
- 使用 Flink 的 `TypeInformation` 解析消息。
+ {{< tabs "pulsar-deserializer-type-information" >}}
+ {{< tab "Java" >}}
```java
PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ PulsarDeserializationSchema.flink_type_info(TypeInformation)
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
Pulsar 的 `Message<byte[]>` 包含了很多
[额外的属性](https://pulsar.apache.org/docs/zh-CN/concepts-messaging/#%E6%B6%88%E6%81%AF)。例如,消息的
key、消息发送时间、消息生产时间、用户在消息上自定义的键值对属性等。可以使用 `Message<byte[]>` 接口来获取这些属性。
@@ -172,6 +246,9 @@ Pulsar 的 `Message<byte[]>` 包含了很多 [额外的属性](https://pulsar.ap
默认情况下,如果没有指定订阅类型,Pulsar Source 使用共享订阅类型(`SubscriptionType.Shared`)。
+{{< tabs "pulsar-subscriptions" >}}
+{{< tab "Java" >}}
+
```java
// 名为 "my-shared" 的共享订阅
PulsarSource.builder().setSubscriptionName("my-shared");
@@ -180,6 +257,20 @@ PulsarSource.builder().setSubscriptionName("my-shared");
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);
```
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+# 名为 "my-shared" 的共享订阅
+PulsarSource.builder().set_subscription_name("my-shared")
+
+# 名为 "my-exclusive" 的独占订阅
+PulsarSource.builder().set_subscription_name("my-exclusive").set_subscription_type(SubscriptionType.Exclusive)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
如果想在 Pulsar Source 里面使用 `key 共享` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator`
会生成一组消息 key 的 hash 范围,Pulsar Source 会基于给定的范围来消费数据。
Pulsar Source 也提供了一个名为 `UniformRangeGenerator` 的默认实现,它会基于 flink 数据源的并行度将 hash
范围均分。
@@ -189,13 +280,33 @@ Pulsar Source 也提供了一个名为 `UniformRangeGenerator` 的默认实现
Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的位置。内置的开始消费位置有:
- 从 Topic 里面最早的一条消息开始消费。
+ {{< tabs "pulsar-starting-position-earliest" >}}
+ {{< tab "Java" >}}
```java
StartCursor.earliest();
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StartCursor.earliest()
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
- 从 Topic 里面最新的一条消息开始消费。
+ {{< tabs "pulsar-starting-position-latest" >}}
+ {{< tab "Java" >}}
```java
StartCursor.latest();
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StartCursor.latest()
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
- 从给定的消息开始消费。
```java
StartCursor.fromMessageId(MessageId);
@@ -205,9 +316,18 @@ Pulsar Source 使用 `setStartCursor(StartCursor)` 方法给定开始消费的
StartCursor.fromMessageId(MessageId, boolean);
```
- 从给定的消息时间开始消费。
+ {{< tabs "pulsar-starting-position-message-time" >}}
+ {{< tab "Java" >}}
```java
StartCursor.fromMessageTime(long);
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StartCursor.from_message_time(int)
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
{{< hint info >}}
每条消息都有一个固定的序列号,这个序列号在 Pulsar 上有序排列,其包含了 ledger、entry、partition 等原始信息,用于在
Pulsar 底层存储上查找到具体的消息。
@@ -224,13 +344,33 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
内置的停止消费位置如下:
- 永不停止。
+ {{< tabs "pulsar-boundedness-never" >}}
+ {{< tab "Java" >}}
```java
StopCursor.never();
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StopCursor.never()
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
- 停止于 Pulsar 启动时 Topic 里面最新的那条数据。
+ {{< tabs "pulsar-boundedness-latest" >}}
+ {{< tab "Java" >}}
```java
StopCursor.latest();
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StopCursor.latest()
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
- 停止于某条消息,结果里不包含此消息。
```java
StopCursor.atMessageId(MessageId);
@@ -240,13 +380,22 @@ Pulsar Source 默认情况下使用流的方式消费数据。除非任务失败
StopCursor.afterMessageId(MessageId);
```
- 停止于某个给定的消息发布时间戳,比如 `Message<byte[]>.getPublishTime()`。
+ {{< tabs "pulsar-boundedness-publish-time" >}}
+ {{< tab "Java" >}}
```java
StopCursor.atPublishTime(long);
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StopCursor.at_publish_time(int)
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
-{{< hint warning >}}
-StopCursor.atEventTime(long) 目前已经处于弃用状态。
-{{< /hint >}}
+ {{< hint warning >}}
+ StopCursor.atEventTime(long) 目前已经处于弃用状态。
+ {{< /hint >}}
### Source 配置项
@@ -280,12 +429,27 @@ Pulsar 提供了消费者 API 和读者 API 两套 API 来进行数据消费,
为了能在启动 Flink 任务之后还能发现在 Pulsar 上扩容的分区或者是新创建的 Topic,Pulsar Source
提供了动态分区发现机制。该机制不需要重启 Flink 任务。对选项
`PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` 设置一个正整数即可启用。
+{{< tabs "pulsar-dynamic-partition-discovery" >}}
+{{< tab "Java" >}}
+
```java
// 10 秒查询一次分区信息
PulsarSource.builder()
    .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
```
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+# 10 秒查询一次分区信息
+PulsarSource.builder() \
+    .set_config("pulsar.source.partitionDiscoveryIntervalMs", 10000)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
{{< hint warning >}}
默认情况下,Pulsar 启用动态分区发现,查询间隔为 30 秒。用户可以给定一个负数,将该功能禁用。如果使用批的方式消费数据,将无法启用该功能。
{{< /hint >}}
@@ -294,10 +458,23 @@ PulsarSource.builder()
默认情况下,Pulsar Source 使用 Pulsar 的 `Message<byte[]>` 里面的时间作为解析结果的时间戳。用户可以使用
`WatermarkStrategy` 来自行解析出想要的消息时间,并向下游传递对应的水位线。
+{{< tabs "pulsar-watermarks" >}}
+{{< tab "Java" >}}
+
```java
env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");
```
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+env.from_source(pulsar_source, CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
[这篇文档]({{< ref "docs/dev/datastream/event-time/generating_watermarks.md"
>}})详细讲解了如何定义 `WatermarkStrategy`。
### 消息确认
diff --git a/docs/content/docs/connectors/datastream/pulsar.md b/docs/content/docs/connectors/datastream/pulsar.md
index c1916b24608..66d050e540c 100644
--- a/docs/content/docs/connectors/datastream/pulsar.md
+++ b/docs/content/docs/connectors/datastream/pulsar.md
@@ -50,6 +50,9 @@ The Pulsar source provides a builder class for constructing a
PulsarSource insta
"persistent://public/default/my-topic" in **Exclusive** subscription type
(`my-subscription`)
and deserializes the raw payload of the messages as strings.
+{{< tabs "pulsar-source-usage" >}}
+{{< tab "Java" >}}
+
```java
PulsarSource<String> source = PulsarSource.builder()
.setServiceUrl(serviceUrl)
@@ -64,6 +67,29 @@ PulsarSource<String> source = PulsarSource.builder()
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");
```
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+pulsar_source = PulsarSource.builder() \
+ .set_service_url('pulsar://localhost:6650') \
+ .set_admin_url('http://localhost:8080') \
+ .set_start_cursor(StartCursor.earliest()) \
+ .set_topics("my-topic") \
+ .set_deserialization_schema(
+ PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+ .set_subscription_name('my-subscription') \
+ .set_subscription_type(SubscriptionType.Exclusive) \
+ .build()
+
+env.from_source(source=pulsar_source,
+                watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+                source_name="pulsar source")
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
The following properties are **required** for building a PulsarSource:
- Pulsar service URL, configured by `setServiceUrl(String)`
@@ -83,6 +109,9 @@ You can use it to monitor the performance of your Flink
connector and applicatio
Pulsar source provides two ways of topic-partition subscription:
- Topic list, subscribing messages from all partitions in a list of topics.
For example:
+ {{< tabs "pulsar-source-topics" >}}
+ {{< tab "Java" >}}
+
```java
PulsarSource.builder().setTopics("some-topic1", "some-topic2");
@@ -90,11 +119,37 @@ Pulsar source provide two ways of topic-partition
subscription:
PulsarSource.builder().setTopics("topic-a-partition-0", "topic-a-partition-2");
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+
+ ```python
+ PulsarSource.builder().set_topics(["some-topic1", "some-topic2"])
+
+ # Partition 0 and 2 of topic "topic-a"
+ PulsarSource.builder().set_topics(["topic-a-partition-0", "topic-a-partition-2"])
+ ```
+
+ {{< /tab >}}
+ {{< /tabs >}}
+
- Topic pattern, subscribing messages from all topics whose name matches the
provided regular expression. For example:
+ {{< tabs "pulsar-source-topic-pattern" >}}
+ {{< tab "Java" >}}
+
```java
PulsarSource.builder().setTopicPattern("topic-*");
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+
+ ```python
+ PulsarSource.builder().set_topic_pattern("topic-*")
+ ```
+
+ {{< /tab >}}
+ {{< /tabs >}}
+
#### Flexible Topic Naming
Since Pulsar 2.0, all topic names internally are in a form of
`{persistent|non-persistent}://tenant/namespace/topic`.
@@ -169,13 +224,32 @@ you can use the predefined `PulsarDeserializationSchema`.
Pulsar connector provi
PulsarDeserializationSchema.pulsarSchema(Schema, Class, Class);
```
- Decode the message by using Flink's `DeserializationSchema`
+ {{< tabs "pulsar-deserializer-deserialization-schema" >}}
+ {{< tab "Java" >}}
```java
PulsarDeserializationSchema.flinkSchema(DeserializationSchema);
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ PulsarDeserializationSchema.flink_schema(DeserializationSchema)
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
- Decode the message by using Flink's `TypeInformation`
+ {{< tabs "pulsar-deserializer-type-information" >}}
+ {{< tab "Java" >}}
```java
PulsarDeserializationSchema.flinkTypeInfo(TypeInformation, ExecutionConfig);
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ PulsarDeserializationSchema.flink_type_info(TypeInformation)
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
Pulsar `Message<byte[]>` contains some [extra
properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
such as message key, message publish time, message time, and
application-defined key/value pairs etc.
@@ -200,6 +274,9 @@ When a Flink reader crashes, all (non-acknowledged and
subsequent) messages are
By default, if no subscription type is defined, Pulsar source uses the
`Shared` subscription type.
+{{< tabs "pulsar-subscriptions" >}}
+{{< tab "Java" >}}
+
```java
// Shared subscription with name "my-shared"
PulsarSource.builder().setSubscriptionName("my-shared");
@@ -208,6 +285,20 @@ PulsarSource.builder().setSubscriptionName("my-shared");
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive);
```
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+# Shared subscription with name "my-shared"
+PulsarSource.builder().set_subscription_name("my-shared")
+
+# Exclusive subscription with name "my-exclusive"
+PulsarSource.builder().set_subscription_name("my-exclusive").set_subscription_type(SubscriptionType.Exclusive)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
Ensure that you provide a `RangeGenerator` implementation if you want to use
the `Key_Shared` subscription type on the Pulsar connector.
The `RangeGenerator` generates a set of key hash ranges so that a respective
reader subtask only dispatches messages where the hash of the message key is
contained in the specified range.
@@ -220,13 +311,33 @@ The Pulsar source is able to consume messages starting
from different positions
Built-in start cursors include:
- Start from the earliest available message in the topic.
+ {{< tabs "pulsar-starting-position-earliest" >}}
+ {{< tab "Java" >}}
```java
StartCursor.earliest();
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StartCursor.earliest()
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
- Start from the latest available message in the topic.
+ {{< tabs "pulsar-starting-position-latest" >}}
+ {{< tab "Java" >}}
```java
StartCursor.latest();
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StartCursor.latest()
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
- Start from a specified message between the earliest and the latest.
The Pulsar connector consumes from the latest available message if the message
ID does not exist.
@@ -241,10 +352,20 @@ The Pulsar connector consumes from the latest available
message if the message I
```java
StartCursor.fromMessageId(MessageId, boolean);
```
+
- Start from the specified message time by `Message<byte[]>.getPublishTime()`.
+ {{< tabs "pulsar-starting-position-message-time" >}}
+ {{< tab "Java" >}}
```java
StartCursor.fromMessageTime(long);
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StartCursor.from_message_time(int)
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
{{< hint info >}}
Each Pulsar message belongs to an ordered sequence on its topic.
@@ -266,13 +387,33 @@ You can use `setBoundedStopCursor(StopCursor)` to specify
a stop position for bo
Built-in stop cursors include:
- The Pulsar source never stops consuming messages.
+ {{< tabs "pulsar-boundedness-never" >}}
+ {{< tab "Java" >}}
```java
StopCursor.never();
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StopCursor.never()
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
- Stop at the latest available message when the Pulsar source starts
consuming messages.
+ {{< tabs "pulsar-boundedness-latest" >}}
+ {{< tab "Java" >}}
```java
StopCursor.latest();
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StopCursor.latest()
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
+
- Stop when the connector meets a given message, or stop at a message which is
produced after this given message.
```java
StopCursor.atMessageId(MessageId);
@@ -281,13 +422,23 @@ Built-in stop cursors include:
```java
StopCursor.afterMessageId(MessageId);
```
+
- Stop at the specified message time by `Message<byte[]>.getPublishTime()`.
+ {{< tabs "pulsar-boundedness-publish-time" >}}
+ {{< tab "Java" >}}
```java
StopCursor.atPublishTime(long);
```
+ {{< /tab >}}
+ {{< tab "Python" >}}
+ ```python
+ StopCursor.at_publish_time(int)
+ ```
+ {{< /tab >}}
+ {{< /tabs >}}
-{{< hint warning >}}
-StopCursor.atEventTime(long) is now deprecated.
+ {{< hint warning >}}
+ StopCursor.atEventTime(long) is now deprecated.
{{< /hint >}}
### Source Configurable Options
@@ -335,12 +486,27 @@ job, the Pulsar source periodically discover new
partitions under a provided
topic-partition subscription pattern. To enable partition discovery, you can
set a non-negative value for
the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
+{{< tabs "pulsar-dynamic-partition-discovery" >}}
+{{< tab "Java" >}}
+
```java
// discover new partitions every 10 seconds
PulsarSource.builder()
    .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, 10000);
```
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+# discover new partitions every 10 seconds
+PulsarSource.builder() \
+    .set_config("pulsar.source.partitionDiscoveryIntervalMs", 10000)
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
{{< hint warning >}}
- Partition discovery is **enabled** by default. The Pulsar connector queries
the topic metadata every 30 seconds.
- To disable partition discovery, you need to set a negative partition
discovery interval.
@@ -353,10 +519,23 @@ By default, the message uses the timestamp embedded in
Pulsar `Message<byte[]>`
You can define your own `WatermarkStrategy` to extract the event time from the
message,
and emit the watermark downstream:
+{{< tabs "pulsar-watermarks" >}}
+{{< tab "Java" >}}
+
```java
env.fromSource(pulsarSource, new CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy");
```
+{{< /tab >}}
+{{< tab "Python" >}}
+
+```python
+env.from_source(pulsar_source, CustomWatermarkStrategy(), "Pulsar Source With Custom Watermark Strategy")
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
[This documentation]({{< ref
"docs/dev/datastream/event-time/generating_watermarks.md" >}}) describes
details about how to define a `WatermarkStrategy`.
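
Note on the Python tabs above: `StartCursor.from_message_time(int)` and `StopCursor.at_publish_time(int)` are listed without a unit. The following is a minimal sketch, assuming the argument is epoch milliseconds, matching what `Message<byte[]>.getPublishTime()` returns on the Java side. `to_epoch_millis` is a hypothetical helper introduced here for illustration and is not part of PyFlink.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer epoch milliseconds.

    Hypothetical helper for illustration; not part of PyFlink.
    """
    if dt.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    # Integer timedelta division avoids float rounding on sub-second values.
    return (dt - _EPOCH) // timedelta(milliseconds=1)


if __name__ == "__main__":
    start_ms = to_epoch_millis(datetime(2022, 5, 19, 11, 51, 52, tzinfo=timezone.utc))
    print(start_ms)
    # Assumed usage with the cursors documented above
    # (requires PyFlink and the Pulsar connector jar):
    #   StartCursor.from_message_time(start_ms)
    #   StopCursor.at_publish_time(start_ms)
```

Rejecting naive datetimes is a deliberate choice in this sketch: a naive value would be interpreted in the local timezone of whichever machine builds the job, which can shift the cursor by hours.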
diff --git a/flink-python/pyflink/examples/datastream/formats/__init__.py b/flink-python/pyflink/examples/datastream/connectors/__init__.py
similarity index 100%
rename from flink-python/pyflink/examples/datastream/formats/__init__.py
rename to flink-python/pyflink/examples/datastream/connectors/__init__.py
diff --git a/flink-python/pyflink/examples/datastream/formats/avro_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
similarity index 100%
rename from flink-python/pyflink/examples/datastream/formats/avro_format.py
rename to flink-python/pyflink/examples/datastream/connectors/kafka_avro_format.py
diff --git a/flink-python/pyflink/examples/datastream/formats/csv_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
similarity index 100%
rename from flink-python/pyflink/examples/datastream/formats/csv_format.py
rename to flink-python/pyflink/examples/datastream/connectors/kafka_csv_format.py
diff --git a/flink-python/pyflink/examples/datastream/formats/json_format.py b/flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
similarity index 100%
rename from flink-python/pyflink/examples/datastream/formats/json_format.py
rename to flink-python/pyflink/examples/datastream/connectors/kafka_json_format.py
diff --git a/flink-python/pyflink/examples/datastream/connectors/pulsar.py b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
new file mode 100644
index 00000000000..2278d522dbf
--- /dev/null
+++ b/flink-python/pyflink/examples/datastream/connectors/pulsar.py
@@ -0,0 +1,72 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import logging
+import sys
+
+from pyflink.common import SimpleStringSchema, WatermarkStrategy
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.datastream.connectors import PulsarSource, PulsarSink, PulsarSerializationSchema, \
+    StartCursor, StopCursor, SubscriptionType, PulsarDeserializationSchema, DeliveryGuarantee, \
+    TopicRoutingMode
+
+if __name__ == '__main__':
+    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
+
+    PULSAR_SQL_CONNECTOR_PATH = 'file:///path/to/flink-sql-connector-pulsar-1.16.0.jar'
+    SERVICE_URL = 'pulsar://localhost:6650'
+    ADMIN_URL = 'http://localhost:8080'
+
+    env = StreamExecutionEnvironment.get_execution_environment()
+    env.set_parallelism(1)
+    env.add_jars(PULSAR_SQL_CONNECTOR_PATH)
+
+    pulsar_source = PulsarSource.builder() \
+        .set_service_url(SERVICE_URL) \
+        .set_admin_url(ADMIN_URL) \
+        .set_topics('ada') \
+        .set_start_cursor(StartCursor.latest()) \
+        .set_unbounded_stop_cursor(StopCursor.never()) \
+        .set_subscription_name('pyflink_subscription') \
+        .set_subscription_type(SubscriptionType.Exclusive) \
+        .set_deserialization_schema(
+            PulsarDeserializationSchema.flink_schema(SimpleStringSchema())) \
+        .set_config('pulsar.source.enableAutoAcknowledgeMessage', True) \
+        .set_properties({'pulsar.source.autoCommitCursorInterval': '1000'}) \
+        .build()
+
+    ds = env.from_source(source=pulsar_source,
+                         watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
+                         source_name="pulsar source")
+
+    pulsar_sink = PulsarSink.builder() \
+        .set_service_url(SERVICE_URL) \
+        .set_admin_url(ADMIN_URL) \
+        .set_producer_name('pyflink_producer') \
+        .set_topics('beta') \
+        .set_serialization_schema(
+            PulsarSerializationSchema.flink_schema(SimpleStringSchema())) \
+        .set_delivery_guarantee(DeliveryGuarantee.AT_LEAST_ONCE) \
+        .set_topic_routing_mode(TopicRoutingMode.ROUND_ROBIN) \
+        .set_config('pulsar.producer.maxPendingMessages', 1000) \
+        .set_properties({'pulsar.producer.batchingMaxMessages': '100'}) \
+        .build()
+
+    ds.sink_to(pulsar_sink).name('pulsar sink')
+
+    env.execute()
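
Note on the example above: it reads from `'ada'` and writes to `'beta'` using short topic names. The following is a small sketch of how such short names might map to full names, assuming the defaults the documentation names for topic type, tenant, and namespace (as in "persistent://public/default/my-topic"). `expand_topic_name` is a hypothetical helper, not a PyFlink or Pulsar API, and the three-part `tenant/namespace/topic` form is an assumption based on Pulsar's naming convention rather than something shown in this commit.

```python
def expand_topic_name(topic: str,
                      domain: str = "persistent",
                      tenant: str = "public",
                      namespace: str = "default") -> str:
    """Expand a short Pulsar topic name to its fully qualified form.

    Hypothetical helper for illustration only.
    """
    if "://" in topic:
        # Already of the form {persistent|non-persistent}://tenant/namespace/topic.
        return topic
    parts = topic.split("/")
    if len(parts) == 1:
        # Bare topic name: fill in the default tenant and namespace.
        return f"{domain}://{tenant}/{namespace}/{parts[0]}"
    if len(parts) == 3:
        # Assumed short form "tenant/namespace/topic": only the domain is defaulted.
        return f"{domain}://{parts[0]}/{parts[1]}/{parts[2]}"
    raise ValueError(f"unsupported short topic name: {topic!r}")


if __name__ == "__main__":
    for name in ("ada", "beta", "topic-a-partition-0"):
        print(name, "->", expand_topic_name(name))
```

Under these assumptions the example's source topic `'ada'` would resolve to `persistent://public/default/ada`, and a partition name such as `topic-a-partition-0` is treated like any other bare topic name.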