MartijnVisser commented on a change in pull request #19056:
URL: https://github.com/apache/flink/pull/19056#discussion_r827071130
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -44,17 +43,19 @@ See how to link with them for cluster execution [here]({{<
ref "docs/dev/configu
This part describes the Pulsar source based on the new
[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
-If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower
releases, just use the StreamNative's
[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower
releases,
+just use the StreamNative's [Pulsar Flink
connector](https://github.com/streamnative/pulsar-flink).
{{< /hint >}}
### Usage
-Pulsar source provides a builder class for constructing an instance of
PulsarSource. The code snippet below shows
-how to build a PulsarSource to consume messages from the earliest cursor of
topic "persistent://public/default/my-topic",
-with **Exclusive** subscription `my-subscription` and deserialize the raw
payload of the messages as strings.
+Pulsar source provides a builder class for constructing a PulsarSource
instance. The code snippet below shows
+how to build a PulsarSource instance to consume messages from the earliest
cursor of the topic
+"persistent://public/default/my-topic" in **Exclusive** subscription type
(`my-subscription`)
+and deserialize the raw payload of the messages as strings.
Review comment:
```suggestion
The Pulsar source provides a builder class for constructing a PulsarSource
instance. The code snippet below builds a PulsarSource instance. It consumes
messages from the earliest cursor of the topic
"persistent://public/default/my-topic" in **Exclusive** subscription type
(`my-subscription`)
and deserializes the raw payload of the messages as strings.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -24,14 +24,13 @@ under the License.
# Apache Pulsar Connector
-Flink provides an [Apache Pulsar](https://pulsar.apache.org) source connector
for reading data from Pulsar topics with exactly-once guarantees.
+Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for
reading data from Pulsar topics with exactly-once guarantees.
## Dependency
-You can use the connector with Pulsar 2.7.0 or higher. However, the Pulsar
source connector supports
-Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/),
-it is recommended to use Pulsar 2.8.0 or higher releases.
-For details on Pulsar compatibility, please refer to the
[PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
+You can use the connector with the Pulsar 2.8.1 or higher version. However,
the Pulsar connector supports
+Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/), it is
recommended to use the Pulsar 2.9.2 or higher version.
+For details on Pulsar compatibility, refer to the
[PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
Review comment:
```suggestion
You can use the connector with Pulsar 2.8.1 or higher. Because the
Pulsar connector supports
Pulsar [transactions](https://pulsar.apache.org/docs/en/txn-what/), it is
recommended to use Pulsar 2.9.2 or higher.
Details on Pulsar compatibility can be found in
[PIP-72](https://github.com/apache/pulsar/wiki/PIP-72%3A-Introduce-Pulsar-Interface-Taxonomy%3A-Audience-and-Stability-Classification).
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition
subscription:
#### Flexible Topic Naming
-Since Pulsar 2.0, all topic names internally have the form
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in a form of
`{persistent|non-persistent}://tenant/namespace/topic`.
Now, for partitioned topics, you can use short names in many cases (for the
sake of simplicity).
The flexible naming system stems from the fact that there is now a default
topic type, tenant, and namespace in a Pulsar cluster.
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default |
+|:---------------|:-------------|
+| topic type | `persistent` |
+| tenant | `public` |
+| namespace | `default` |
-This table lists a mapping relationship between your input topic name and
translated topic name:
+This table lists a mapping relationship between your input topic name and the
translated topic name:
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` |
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name | Translated topic name
|
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic` | `persistent://public/default/my-topic`
|
+| `my-tenant/my-namespace/my-topic` |
`persistent://my-tenant/my-namespace/my-topic` |
{{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic
name,
+For non-persistent topics, you need to specify the entire topic name,
as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would
use `non-persistent://public/default/my-topic` instead.
Review comment:
```suggestion
Thus, you cannot use a short name like `non-persistent://my-topic` and need
to use `non-persistent://public/default/my-topic` instead.
```
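The default-expansion rule from the table in this hunk can be sketched in plain Java. This is an illustrative helper only, not the connector's internal implementation:

```java
// Hypothetical sketch of the default-expansion rule from the table above.
// Illustrative logic only, not the connector's internal implementation.
public class TopicNameSketch {

    // Expands a short topic name using the defaults: "persistent" topic type,
    // "public" tenant and "default" namespace.
    static String translate(String topic) {
        if (topic.contains("://")) {
            return topic; // already a fully qualified topic name
        }
        if (topic.contains("/")) {
            // tenant/namespace/topic form: only the topic type is defaulted
            return "persistent://" + topic;
        }
        // bare topic name: tenant and namespace are defaulted too
        return "persistent://public/default/" + topic;
    }
}
```

Note that, per the warning in the hunk, this expansion only applies to persistent topics; non-persistent topic names must be spelled out in full.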
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition
subscription:
#### Flexible Topic Naming
-Since Pulsar 2.0, all topic names internally have the form
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in a form of
`{persistent|non-persistent}://tenant/namespace/topic`.
Now, for partitioned topics, you can use short names in many cases (for the
sake of simplicity).
The flexible naming system stems from the fact that there is now a default
topic type, tenant, and namespace in a Pulsar cluster.
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default |
+|:---------------|:-------------|
+| topic type | `persistent` |
+| tenant | `public` |
+| namespace | `default` |
-This table lists a mapping relationship between your input topic name and
translated topic name:
+This table lists a mapping relationship between your input topic name and the
translated topic name:
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` |
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name | Translated topic name
|
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic` | `persistent://public/default/my-topic`
|
+| `my-tenant/my-namespace/my-topic` |
`persistent://my-tenant/my-namespace/my-topic` |
{{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic
name,
+For non-persistent topics, you need to specify the entire topic name,
as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would
use `non-persistent://public/default/my-topic` instead.
{{< /hint >}}
#### Subscribing Pulsar Topic Partition
Internally, Pulsar divides a partitioned topic as a set of non-partitioned
topics according to the partition size.
-For example, if a `simple-string` topic with 3 partitions is created under the
`sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the
`sample` tenant with the `flink` namespace.
The topics on Pulsar would be:
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string` | Y |
+| `persistent://sample/flink/simple-string-partition-0` | N |
+| `persistent://sample/flink/simple-string-partition-1` | N |
+| `persistent://sample/flink/simple-string-partition-2` | N |
You can directly consume messages from the topic partitions by using the
non-partitioned topic names above.
-For example, use
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1",
"sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2
of the `sample/flink/simple-string` topic.
+For example, use
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1",
"sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern
Review comment:
```suggestion
#### Setting Topic Patterns
```
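The partition-to-topic mapping shown in the table of this hunk (a partitioned topic becoming a set of `-partition-N` non-partitioned topics) can be sketched as follows. The helper name is hypothetical, for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how Pulsar maps a partitioned topic to its internal
// non-partitioned topics, as shown in the table above. Illustrative only.
public class PartitionNamesSketch {

    // Lists the internal "-partition-N" topic names for a partitioned topic.
    static List<String> partitionTopics(String fullTopicName, int partitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            names.add(fullTopicName + "-partition-" + i);
        }
        return names;
    }
}
```

Any of these generated names can be passed to `setTopics(...)` to consume individual partitions directly, as the hunk describes.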
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -44,17 +43,19 @@ See how to link with them for cluster execution [here]({{<
ref "docs/dev/configu
This part describes the Pulsar source based on the new
[data source]({{< ref "docs/dev/datastream/sources.md" >}}) API.
-If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower
releases, just use the StreamNative's
[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+If you want to use the legacy `SourceFunction` or on Flink 1.13 or lower
releases,
+just use the StreamNative's [Pulsar Flink
connector](https://github.com/streamnative/pulsar-flink).
{{< /hint >}}
Review comment:
I would completely remove this part from the documentation, since it
will end up in documentation for Flink 1.15 and future versions.
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -69,13 +70,17 @@ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"Pulsar Source");
The following properties are **required** for building a PulsarSource:
-- Pulsar service url, configured by `setServiceUrl(String)`
-- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as admin URL), configured by
`setAdminUrl(String)`
- Pulsar subscription name, configured by `setSubscriptionName(String)`
- Topics / partitions to subscribe, see the following
- [Topic-partition subscription](#topic-partition-subscription) for more
details.
+ [topic-partition subscription](#topic-partition-subscription) for more
details.
- Deserializer to parse Pulsar messages, see the following
- [Deserializer](#deserializer) for more details.
+ [deserializer](#deserializer) for more details.
+
+It is **recommended** to set the consumer name in Pulsar Source by
`setConsumerName(String)`.
+This would give a unique name to the flink connector in the Pulsar statistic
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
Review comment:
```suggestion
It is recommended to set the consumer name in Pulsar Source with
`setConsumerName(String)`.
This sets a unique name for the Flink connector in the Pulsar statistics
dashboard.
You can use it to monitor the performance of your Flink connector and
applications.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition
subscription:
#### Flexible Topic Naming
-Since Pulsar 2.0, all topic names internally have the form
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in a form of
`{persistent|non-persistent}://tenant/namespace/topic`.
Now, for partitioned topics, you can use short names in many cases (for the
sake of simplicity).
The flexible naming system stems from the fact that there is now a default
topic type, tenant, and namespace in a Pulsar cluster.
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default |
+|:---------------|:-------------|
+| topic type | `persistent` |
+| tenant | `public` |
+| namespace | `default` |
-This table lists a mapping relationship between your input topic name and
translated topic name:
+This table lists a mapping relationship between your input topic name and the
translated topic name:
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` |
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name | Translated topic name
|
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic` | `persistent://public/default/my-topic`
|
+| `my-tenant/my-namespace/my-topic` |
`persistent://my-tenant/my-namespace/my-topic` |
{{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic
name,
+For non-persistent topics, you need to specify the entire topic name,
as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would
use `non-persistent://public/default/my-topic` instead.
{{< /hint >}}
#### Subscribing Pulsar Topic Partition
Internally, Pulsar divides a partitioned topic as a set of non-partitioned
topics according to the partition size.
-For example, if a `simple-string` topic with 3 partitions is created under the
`sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the
`sample` tenant with the `flink` namespace.
The topics on Pulsar would be:
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string` | Y |
+| `persistent://sample/flink/simple-string-partition-0` | N |
+| `persistent://sample/flink/simple-string-partition-1` | N |
+| `persistent://sample/flink/simple-string-partition-2` | N |
You can directly consume messages from the topic partitions by using the
non-partitioned topic names above.
-For example, use
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1",
"sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2
of the `sample/flink/simple-string` topic.
+For example, use
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1",
"sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern
-Pulsar connector extracts the topic type (`persistent` or `non-persistent`)
from the given topic pattern.
-For example,
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be
`non-persistent`.
-The topic type would be `persistent` if you do not provide the topic type in
the regular expression.
+Pulsar source extracts the topic type (`persistent` or `non-persistent`) from
the given topic pattern.
+For example, you can use the
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to
specify a `non-persistent` topic.
+By default, a `persistent` topic is created if you do not specify the topic
type in the regular expression.
Review comment:
```suggestion
The Pulsar source extracts the topic type (`persistent` or `non-persistent`)
from the provided topic pattern.
For example, you can use the
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to
specify a `non-persistent` topic.
By default, a `persistent` topic is created if you do not specify the topic
type in the regular expression.
```
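The type-extraction rule described in this suggestion (take the type from the pattern prefix, default to `persistent`) can be sketched like this; the class and method are hypothetical, not the connector's implementation:

```java
// Hypothetical sketch of extracting the topic type from a topic pattern,
// defaulting to "persistent" when the regular expression omits the type.
// Illustrative only, not the connector's implementation.
public class TopicTypeSketch {

    static String topicType(String topicPattern) {
        if (topicPattern.startsWith("non-persistent://")) {
            return "non-persistent";
        }
        // "persistent://..." patterns and bare patterns both resolve to persistent
        return "persistent";
    }
}
```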
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition
subscription:
#### Flexible Topic Naming
-Since Pulsar 2.0, all topic names internally have the form
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in a form of
`{persistent|non-persistent}://tenant/namespace/topic`.
Now, for partitioned topics, you can use short names in many cases (for the
sake of simplicity).
The flexible naming system stems from the fact that there is now a default
topic type, tenant, and namespace in a Pulsar cluster.
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default |
+|:---------------|:-------------|
+| topic type | `persistent` |
+| tenant | `public` |
+| namespace | `default` |
-This table lists a mapping relationship between your input topic name and
translated topic name:
+This table lists a mapping relationship between your input topic name and the
translated topic name:
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` |
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name | Translated topic name
|
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic` | `persistent://public/default/my-topic`
|
+| `my-tenant/my-namespace/my-topic` |
`persistent://my-tenant/my-namespace/my-topic` |
{{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic
name,
+For non-persistent topics, you need to specify the entire topic name,
as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would
use `non-persistent://public/default/my-topic` instead.
{{< /hint >}}
#### Subscribing Pulsar Topic Partition
Internally, Pulsar divides a partitioned topic as a set of non-partitioned
topics according to the partition size.
-For example, if a `simple-string` topic with 3 partitions is created under the
`sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the
`sample` tenant with the `flink` namespace.
The topics on Pulsar would be:
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string` | Y |
+| `persistent://sample/flink/simple-string-partition-0` | N |
+| `persistent://sample/flink/simple-string-partition-1` | N |
+| `persistent://sample/flink/simple-string-partition-2` | N |
You can directly consume messages from the topic partitions by using the
non-partitioned topic names above.
-For example, use
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1",
"sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2
of the `sample/flink/simple-string` topic.
+For example, use
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1",
"sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern
-Pulsar connector extracts the topic type (`persistent` or `non-persistent`)
from the given topic pattern.
-For example,
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be
`non-persistent`.
-The topic type would be `persistent` if you do not provide the topic type in
the regular expression.
+Pulsar source extracts the topic type (`persistent` or `non-persistent`) from
the given topic pattern.
+For example, you can use the
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to
specify a `non-persistent` topic.
+By default, a `persistent` topic is created if you do not specify the topic
type in the regular expression.
-To consume both `persistent` and `non-persistent` topics based on the topic
pattern,
-you can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)`.
-Pulsar connector would filter the available topics by the
`RegexSubscriptionMode`.
+You can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)` to
consume
+both `persistent` and `non-persistent` topics based on the topic pattern.
+Pulsar source would filter the available topics by the `RegexSubscriptionMode`.
### Deserializer
-A deserializer (Deserialization schema) is required for parsing Pulsar
messages. The deserializer is
-configured by `setDeserializationSchema(PulsarDeserializationSchema)`.
+A deserializer (`PulsarDeserializationSchema`) is for parsing Pulsar messages
from bytes.
+You can configure the deserializer using
`setDeserializationSchema(PulsarDeserializationSchema)`.
Review comment:
```suggestion
A deserializer (`PulsarDeserializationSchema`) is for decoding Pulsar
messages from bytes.
You can configure the deserializer using
`setDeserializationSchema(PulsarDeserializationSchema)`.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -206,16 +212,17 @@ PulsarSource.builder().setSubscriptionName("my-shared")
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
```
-If you want to use `Key_Shared` subscription type on the Pulsar connector.
Ensure that you provide a `RangeGenerator` implementation.
-The `RangeGenerator` generates a set of key hash ranges so that
-a respective reader subtask will only dispatch messages where the hash of the
message key is contained in the specified range.
+If you want to use the `Key_Shared` subscription type on the Pulsar connector.
+Ensure that you provide a `RangeGenerator` implementation. The
`RangeGenerator` generates a set of
+key hash ranges so that a respective reader subtask only dispatches messages
where the hash of the
+message key is contained in the specified range.
-Pulsar connector would use a `UniformRangeGenerator` which would divides the
range by the Flink source parallelism
-if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
+Pulsar connector uses a `UniformRangeGenerator` which divides the range by the
Flink source
+parallelism if no `RangeGenerator` is provided in the `Key_Shared`
subscription type.
### Starting Position
-Pulsar source is able to consume messages starting from different positions by
`setStartCursor(StartCursor)`.
+Pulsar source is able to consume messages starting from different positions by
setting the `setStartCursor(StartCursor)` option.
Review comment:
```suggestion
The Pulsar source is able to consume messages starting from different
positions by setting the `setStartCursor(StartCursor)` option.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -96,64 +101,65 @@ Pulsar source provide two ways of topic-partition
subscription:
#### Flexible Topic Naming
-Since Pulsar 2.0, all topic names internally have the form
`{persistent|non-persistent}://tenant/namespace/topic`.
+Since Pulsar 2.0, all topic names internally are in a form of
`{persistent|non-persistent}://tenant/namespace/topic`.
Now, for partitioned topics, you can use short names in many cases (for the
sake of simplicity).
The flexible naming system stems from the fact that there is now a default
topic type, tenant, and namespace in a Pulsar cluster.
-Topic property | Default
-:------------|:-------
-topic type | `persistent`
-tenant | `public`
-namespace | `default`
+| Topic property | Default |
+|:---------------|:-------------|
+| topic type | `persistent` |
+| tenant | `public` |
+| namespace | `default` |
-This table lists a mapping relationship between your input topic name and
translated topic name:
+This table lists a mapping relationship between your input topic name and the
translated topic name:
-Input topic name | Translated topic name
-:----------------|:---------------------
-`my-topic` | `persistent://public/default/my-topic`
-`my-tenant/my-namespace/my-topic` |
`persistent://my-tenant/my-namespace/my-topic`
+| Input topic name | Translated topic name
|
+|:----------------------------------|:-----------------------------------------------|
+| `my-topic` | `persistent://public/default/my-topic`
|
+| `my-tenant/my-namespace/my-topic` |
`persistent://my-tenant/my-namespace/my-topic` |
{{< hint warning >}}
-For non-persistent topics, you need to continue to specify the entire topic
name,
+For non-persistent topics, you need to specify the entire topic name,
as the default-based rules do not apply for non-partitioned topics.
-Thus, you cannot use a short name like `non-persistent://my-topic` and would
need to use `non-persistent://public/default/my-topic` instead.
+Thus, you cannot use a short name like `non-persistent://my-topic` and would
use `non-persistent://public/default/my-topic` instead.
{{< /hint >}}
#### Subscribing Pulsar Topic Partition
Internally, Pulsar divides a partitioned topic as a set of non-partitioned
topics according to the partition size.
-For example, if a `simple-string` topic with 3 partitions is created under the
`sample` tenant with `flink` namespace.
+For example, if a `simple-string` topic with 3 partitions is created under the
`sample` tenant with the `flink` namespace.
The topics on Pulsar would be:
-Topic name | Partitioned
-:--------- | :----------
-`persistent://sample/flink/simple-string` | Y
-`persistent://sample/flink/simple-string-partition-0` | N
-`persistent://sample/flink/simple-string-partition-1` | N
-`persistent://sample/flink/simple-string-partition-2` | N
+| Topic name | Partitioned |
+|:------------------------------------------------------|:------------|
+| `persistent://sample/flink/simple-string` | Y |
+| `persistent://sample/flink/simple-string-partition-0` | N |
+| `persistent://sample/flink/simple-string-partition-1` | N |
+| `persistent://sample/flink/simple-string-partition-2` | N |
You can directly consume messages from the topic partitions by using the
non-partitioned topic names above.
-For example, use
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1",
"sample/flink/simple-string-partition-2")` would consume the partitions 1 and 2
of the `sample/flink/simple-string` topic.
+For example, use
`PulsarSource.builder().setTopics("sample/flink/simple-string-partition-1",
"sample/flink/simple-string-partition-2")`
+would consume the partitions 1 and 2 of the `sample/flink/simple-string` topic.
-#### RegexSubscriptionMode for Topic Pattern
+#### Setting Topic Pattern
-Pulsar connector extracts the topic type (`persistent` or `non-persistent`)
from the given topic pattern.
-For example,
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` would be
`non-persistent`.
-The topic type would be `persistent` if you do not provide the topic type in
the regular expression.
+Pulsar source extracts the topic type (`persistent` or `non-persistent`) from
the given topic pattern.
+For example, you can use the
`PulsarSource.builder().setTopicPattern("non-persistent://my-topic*")` to
specify a `non-persistent` topic.
+By default, a `persistent` topic is created if you do not specify the topic
type in the regular expression.
-To consume both `persistent` and `non-persistent` topics based on the topic
pattern,
-you can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)`.
-Pulsar connector would filter the available topics by the
`RegexSubscriptionMode`.
+You can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)` to
consume
+both `persistent` and `non-persistent` topics based on the topic pattern.
+Pulsar source would filter the available topics by the `RegexSubscriptionMode`.
Review comment:
```suggestion
You can use `setTopicPattern("topic-*", RegexSubscriptionMode.AllTopics)` to
consume
both `persistent` and `non-persistent` topics based on the topic pattern.
The Pulsar source filters the available topics by the
`RegexSubscriptionMode`.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -176,12 +182,12 @@ you can use the predefined `PulsarDeserializationSchema`.
Pulsar connector provi
```
Pulsar `Message<byte[]>` contains some [extra
properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
-such as message key, message publish time, message time, application defined
key/value pairs that will be attached to the message, etc.
+such as message key, message publish time, message time, and
application-defined key/value pairs etc.
These properties could be acquired by the `Message<byte[]>` interface.
If you want to deserialize the Pulsar message by these properties, you need to
implement `PulsarDeserializationSchema`.
-And ensure that the `TypeInformation` from the
`PulsarDeserializationSchema.getProducedType()` must be correct.
-Flink would use this `TypeInformation` for passing the messages to downstream
operators.
+And ensure that the `TypeInformation` from the
`PulsarDeserializationSchema.getProducedType()` is correct.
+Flink uses this `TypeInformation` to pass the messages to downstream operators.
Review comment:
```suggestion
Ensure that the `TypeInformation` from the
`PulsarDeserializationSchema.getProducedType()` is correct.
Flink uses this `TypeInformation` to pass the messages to downstream
operators.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -227,14 +234,14 @@ Built-in start cursors include:
StartCursor.latest()
```
- Start from a specified message between the earliest and the latest.
- Pulsar connector would consume from the latest available message if the
message id doesn't exist.
+ Pulsar connector consumes from the latest available message if the message
ID does not exist.
Review comment:
```suggestion
The Pulsar connector consumes from the latest available message if the
message ID does not exist.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -206,16 +212,17 @@ PulsarSource.builder().setSubscriptionName("my-shared")
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
```
-If you want to use `Key_Shared` subscription type on the Pulsar connector.
Ensure that you provide a `RangeGenerator` implementation.
-The `RangeGenerator` generates a set of key hash ranges so that
-a respective reader subtask will only dispatch messages where the hash of the
message key is contained in the specified range.
+If you want to use the `Key_Shared` subscription type on the Pulsar connector.
+Ensure that you provide a `RangeGenerator` implementation. The
`RangeGenerator` generates a set of
+key hash ranges so that a respective reader subtask only dispatches messages
where the hash of the
+message key is contained in the specified range.
-Pulsar connector would use a `UniformRangeGenerator` which would divides the
range by the Flink source parallelism
-if no `RangeGenerator` is provided in the `Key_Shared` subscription type.
+Pulsar connector uses a `UniformRangeGenerator` which divides the range by the
Flink source
+parallelism if no `RangeGenerator` is provided in the `Key_Shared`
subscription type.
Review comment:
```suggestion
The Pulsar connector uses a `UniformRangeGenerator` that divides the range by
the Flink source
parallelism if no `RangeGenerator` is provided in the `Key_Shared`
subscription type.
```
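To make the uniform split concrete: the sketch below divides a key hash space of 0–65535 (the size Pulsar uses for `Key_Shared` sticky ranges) evenly across reader subtasks. This is only a rough illustration of the idea behind `UniformRangeGenerator`, not the actual connector code; the class and method names are made up for this example.

```java
// Rough illustration (not the actual UniformRangeGenerator implementation):
// split the Key_Shared hash space (0..65535) evenly across reader subtasks.
public class UniformSplitSketch {

    static int[][] split(int parallelism) {
        int size = 65536 / parallelism;
        int[][] ranges = new int[parallelism][2];
        for (int i = 0; i < parallelism; i++) {
            int start = i * size;
            // The last subtask absorbs any remainder up to 65535.
            int end = (i == parallelism - 1) ? 65535 : (i + 1) * size - 1;
            ranges[i] = new int[] {start, end};
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : split(4)) {
            System.out.println(r[0] + " - " + r[1]);
        }
    }
}
```

Each reader subtask would then only dispatch messages whose key hash falls inside its assigned range.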
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -227,14 +234,14 @@ Built-in start cursors include:
StartCursor.latest()
```
- Start from a specified message between the earliest and the latest.
- Pulsar connector would consume from the latest available message if the
message id doesn't exist.
+ Pulsar connector consumes from the latest available message if the message
ID does not exist.
The start message is included in consuming result.
```java
StartCursor.fromMessageId(MessageId)
```
- Start from a specified message between the earliest and the latest.
- Pulsar connector would consume from the latest available message if the
message id doesn't exist.
+ Pulsar connector consumes from the latest available message if the message
id doesn't exist.
Review comment:
```suggestion
The Pulsar connector consumes from the latest available message if the
message ID doesn't exist.
```
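A hedged sketch of starting from a concrete message ID, assuming the builder exposes `setStartCursor(StartCursor)` as described above; the ledger/entry/partition values are placeholders:

```java
// Sketch (placeholder values): build a MessageId and start consuming from it.
MessageId startId = DefaultImplementation.newMessageId(42L, 0L, -1);

PulsarSource.builder()
    .setStartCursor(StartCursor.fromMessageId(startId));
```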
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -176,12 +182,12 @@ you can use the predefined `PulsarDeserializationSchema`.
Pulsar connector provi
```
Pulsar `Message<byte[]>` contains some [extra
properties](https://pulsar.apache.org/docs/en/concepts-messaging/#messages),
-such as message key, message publish time, message time, application defined
key/value pairs that will be attached to the message, etc.
+such as message key, message publish time, message time, and
application-defined key/value pairs etc.
These properties could be acquired by the `Message<byte[]>` interface.
Review comment:
```suggestion
These properties could be defined in the `Message<byte[]>` interface.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval indicates a better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
Review comment:
```suggestion
The default timeout for Pulsar transactions is 3 hours.
Make sure that the timeout is greater than the checkpoint interval plus the
maximum recovery time.
A shorter checkpoint interval leads to better consuming performance.
You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS`
option to change the transaction timeout.
```
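For example, the timeout could be raised via the option named above; the 6-hour value here is only illustrative:

```java
// Sketch: raise the Pulsar transaction timeout to 6 hours (example value).
PulsarSource.builder()
    .setConfig(PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS, 6 * 60 * 60 * 1000L);
```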
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
{{< hint info >}}
Each Pulsar message belongs to an ordered sequence on its topic.
The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on
how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition)
about how the message is stored,
you can create a `MessageId` by using `DefaultImplementation.newMessageId(long
ledgerId, long entryId, int partitionIndex)`.
{{< /hint >}}
### Boundedness
Pulsar source supports streaming and batch running modes.
-By default, the `PulsarSource` is set to run in the streaming mode.
+By default, the `PulsarSource` runs in the streaming mode.
-In streaming mode, Pulsar source never stops until a Flink job fails or is
cancelled. However,
-you can set Pulsar source stopping at a stop position by using
```setUnboundedStopCursor(StopCursor)```.
-The Pulsar source will finish when all partitions reach their specified stop
positions.
+In the streaming mode, Pulsar source never stops until a Flink job fails or is
cancelled.
+However, you can use the `setUnboundedStopCursor(StopCursor)` to set the
Pulsar source to stop at a specific stop position.
-You can use ```setBoundedStopCursor(StopCursor)``` to specify a stop position
so that the Pulsar source can run in the batch mode.
-When all partitions have reached their stop positions, the source will finish.
+You can use `setBoundedStopCursor(StopCursor)` to specify a stop position to
run in batch mode.
Review comment:
```suggestion
You can use `setBoundedStopCursor(StopCursor)` to specify a stop position
for bounded data.
```
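As a minimal sketch, bounded (batch) reading can be requested with a stop cursor, for instance stopping at the latest message available when the job starts:

```java
// Sketch: run the Pulsar source on bounded data, stopping at the latest
// available message.
PulsarSource.builder()
    .setBoundedStopCursor(StopCursor.latest());
```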
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval indicates a better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic
dashboard.
+This gives you a mechanism to monitor the performance of your Flink applications.
+
+### Writing Topics
Review comment:
```suggestion
### Producing to topics
```
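Following the recommendation above, a minimal sketch of setting the producer name (the name itself is a placeholder):

```java
// Sketch: give the sink a recognizable producer name for the Pulsar
// statistics dashboard (name is a placeholder).
PulsarSink.builder()
    .setProducerName("my-flink-job-producer");
```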
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
StopCursor.atEventTime(long)
```
-### Configurable Options
+### Source Configurable Options
In addition to configuration options described above, you can set arbitrary
options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using
`setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using
`setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
#### PulsarClient Options
-Pulsar connector use the [client
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of
Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of
Pulsar's `ClientConfigurationData`,
which is required for creating a `PulsarClient`, as Flink configuration
options in `PulsarOptions`.
{{< generated/pulsar_client_configuration >}}
#### PulsarAdmin Options
The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used
for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic
pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed
here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses
topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
They are also defined in `PulsarOptions`.
{{< generated/pulsar_admin_configuration >}}
#### Pulsar Consumer Options
In general, Pulsar provides the Reader API and Consumer API for consuming
messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of
Pulsar's `ConsumerConfigurationData` as Flink configuration options in
`PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's
`ConsumerConfigurationData` as Flink configuration options in
`PulsarSourceOptions`.
{{< generated/pulsar_consumer_configuration >}}
#### PulsarSource Options
The configuration options below are mainly used for customizing the
performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.
Review comment:
```suggestion
You can ignore them if you do not have any performance issues.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
{{< hint info >}}
Each Pulsar message belongs to an ordered sequence on its topic.
The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on
how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition)
about how the message is stored,
you can create a `MessageId` by using `DefaultImplementation.newMessageId(long
ledgerId, long entryId, int partitionIndex)`.
{{< /hint >}}
### Boundedness
Pulsar source supports streaming and batch running modes.
-By default, the `PulsarSource` is set to run in the streaming mode.
+By default, the `PulsarSource` runs in the streaming mode.
-In streaming mode, Pulsar source never stops until a Flink job fails or is
cancelled. However,
-you can set Pulsar source stopping at a stop position by using
```setUnboundedStopCursor(StopCursor)```.
-The Pulsar source will finish when all partitions reach their specified stop
positions.
+In the streaming mode, Pulsar source never stops until a Flink job fails or is
cancelled.
+However, you can use the `setUnboundedStopCursor(StopCursor)` to set the
Pulsar source to stop at a specific stop position.
Review comment:
```suggestion
For unbounded data, the Pulsar source never stops until the Flink job is
stopped or fails.
You can use the `setUnboundedStopCursor(StopCursor)` to set the Pulsar
source to stop at a specific stop position.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
StopCursor.atEventTime(long)
```
-### Configurable Options
+### Source Configurable Options
In addition to configuration options described above, you can set arbitrary
options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using
`setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using
`setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
#### PulsarClient Options
-Pulsar connector use the [client
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of
Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of
Pulsar's `ClientConfigurationData`,
which is required for creating a `PulsarClient`, as Flink configuration
options in `PulsarOptions`.
{{< generated/pulsar_client_configuration >}}
#### PulsarAdmin Options
The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used
for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic
pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed
here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses
topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
They are also defined in `PulsarOptions`.
{{< generated/pulsar_admin_configuration >}}
#### Pulsar Consumer Options
In general, Pulsar provides the Reader API and Consumer API for consuming
messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of
Pulsar's `ConsumerConfigurationData` as Flink configuration options in
`PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's
`ConsumerConfigurationData` as Flink configuration options in
`PulsarSourceOptions`.
{{< generated/pulsar_consumer_configuration >}}
#### PulsarSource Options
The configuration options below are mainly used for customizing the
performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.
{{< generated/pulsar_source_configuration >}}
### Dynamic Partition Discovery
To handle scenarios like topic scaling-out or topic creation without
restarting the Flink
-job, Pulsar source can be configured to periodically discover new partitions
under provided
-topic-partition subscribing pattern. To enable partition discovery, set a
non-negative value for
-the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+job, Pulsar source periodically discover new partitions under a provided
+topic-partition subscription pattern. To enable partition discovery, you can
set a non-negative value for
+the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
```java
// discover new partitions per 10 seconds
PulsarSource.builder()
- .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,
10000);
+ .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,
10000);
```
{{< hint warning >}}
-- Partition discovery is **enabled** by default. Pulsar connector would query
the topic metadata every 30 seconds.
-- You need to set the partition discovery interval to a negative value to
disable this feature.
-- The partition discovery would be disabled in batch mode even if you set this
option with a non-negative value.
+- Partition discovery is **enabled** by default. The Pulsar connector queries
the topic metadata every 30 seconds.
+- To disable partition discovery, you need to set a negative partition
discovery interval.
+- The partition discovery is disabled in the batch mode even if you set this
option with a non-negative value.
{{< /hint >}}
### Event Time and Watermarks
By default, the message uses the timestamp embedded in Pulsar
`Message<byte[]>` as the event time.
-You can define your own `WatermarkStrategy` to extract the event time from the
message,
+You can define a `WatermarkStrategy` to extract the event time from the
message,
Review comment:
The previous sentence is actually better.
```suggestion
You can define your own `WatermarkStrategy` to extract the event time from
the message,
```
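A hedged sketch of attaching a custom `WatermarkStrategy` when adding the source to the job; the variable names and the 5-second out-of-orderness bound are placeholders:

```java
// Sketch (placeholder names/values): supply a WatermarkStrategy when
// registering the Pulsar source with the execution environment.
env.fromSource(
    pulsarSource,
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)),
    "pulsar-source");
```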
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -206,16 +212,17 @@ PulsarSource.builder().setSubscriptionName("my-shared")
PulsarSource.builder().setSubscriptionName("my-exclusive").setSubscriptionType(SubscriptionType.Exclusive)
```
-If you want to use `Key_Shared` subscription type on the Pulsar connector.
Ensure that you provide a `RangeGenerator` implementation.
-The `RangeGenerator` generates a set of key hash ranges so that
-a respective reader subtask will only dispatch messages where the hash of the
message key is contained in the specified range.
+If you want to use the `Key_Shared` subscription type on the Pulsar connector.
+Ensure that you provide a `RangeGenerator` implementation. The
`RangeGenerator` generates a set of
+key hash ranges so that a respective reader subtask only dispatches messages
where the hash of the
+message key is contained in the specified range.
Review comment:
```suggestion
Ensure that you provide a `RangeGenerator` implementation if you want to use
the `Key_Shared` subscription type on the Pulsar connector.
The `RangeGenerator` generates a set of key hash ranges so that a respective
reader subtask only dispatches messages where the hash of the message key is
contained in the specified range.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
StopCursor.atEventTime(long)
```
-### Configurable Options
+### Source Configurable Options
In addition to configuration options described above, you can set arbitrary
options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using
`setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using
`setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
#### PulsarClient Options
-Pulsar connector use the [client
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of
Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of
Pulsar's `ClientConfigurationData`,
which is required for creating a `PulsarClient`, as Flink configuration
options in `PulsarOptions`.
{{< generated/pulsar_client_configuration >}}
#### PulsarAdmin Options
The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used
for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic
pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed
here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses
topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
They are also defined in `PulsarOptions`.
{{< generated/pulsar_admin_configuration >}}
#### Pulsar Consumer Options
In general, Pulsar provides the Reader API and Consumer API for consuming
messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of
Pulsar's `ConsumerConfigurationData` as Flink configuration options in
`PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's
`ConsumerConfigurationData` as Flink configuration options in
`PulsarSourceOptions`.
{{< generated/pulsar_consumer_configuration >}}
#### PulsarSource Options
The configuration options below are mainly used for customizing the
performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.
{{< generated/pulsar_source_configuration >}}
### Dynamic Partition Discovery
To handle scenarios like topic scaling-out or topic creation without
restarting the Flink
-job, Pulsar source can be configured to periodically discover new partitions
under provided
-topic-partition subscribing pattern. To enable partition discovery, set a
non-negative value for
-the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+job, Pulsar source periodically discover new partitions under a provided
+topic-partition subscription pattern. To enable partition discovery, you can
set a non-negative value for
+the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
Review comment:
```suggestion
job, the Pulsar source periodically discovers new partitions under a provided
topic-partition subscription pattern. To enable partition discovery, you can
set a non-negative value for
the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
{{< hint info >}}
Each Pulsar message belongs to an ordered sequence on its topic.
The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on
how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition)
about how the message is stored,
you can create a `MessageId` by using `DefaultImplementation.newMessageId(long
ledgerId, long entryId, int partitionIndex)`.
{{< /hint >}}
### Boundedness
Pulsar source supports streaming and batch running modes.
-By default, the `PulsarSource` is set to run in the streaming mode.
+By default, the `PulsarSource` runs in the streaming mode.
Review comment:
```suggestion
By default, the `PulsarSource` is configured for unbounded data.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -287,65 +292,67 @@ Built-in stop cursors include:
StopCursor.atEventTime(long)
```
-### Configurable Options
+### Source Configurable Options
In addition to configuration options described above, you can set arbitrary
options for `PulsarClient`,
-`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using
`setConfig(ConfigOption<T>, T)` and `setConfig(Configuration)`.
+`PulsarAdmin`, Pulsar `Consumer` and `PulsarSource` by using
`setConfig(ConfigOption<T>, T)`,
+`setConfig(Configuration)` and `setConfig(Properties)`.
#### PulsarClient Options
-Pulsar connector use the [client
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
-to create the `Consumer` instance. Pulsar connector extracts most parts of
Pulsar's `ClientConfigurationData`,
+The Pulsar connector uses the [client
API](https://pulsar.apache.org/docs/en/client-libraries-java/)
+to create the `Consumer` instance. The Pulsar connector extracts most parts of
Pulsar's `ClientConfigurationData`,
which is required for creating a `PulsarClient`, as Flink configuration
options in `PulsarOptions`.
{{< generated/pulsar_client_configuration >}}
#### PulsarAdmin Options
The [admin API](https://pulsar.apache.org/docs/en/admin-api-overview/) is used
for querying topic metadata
-and for discovering the desired topics when Pulsar connector uses topic
pattern subscription. It would share most part of the
-configuration options with the client API. The configuration options listed
here are only used in the admin API.
+and for discovering the desired topics when the Pulsar connector uses
topic-pattern subscription.
+It shares most part of the configuration options with the client API.
+The configuration options listed here are only used in the admin API.
They are also defined in `PulsarOptions`.
{{< generated/pulsar_admin_configuration >}}
#### Pulsar Consumer Options
In general, Pulsar provides the Reader API and Consumer API for consuming
messages in different scenarios.
-Flink's Pulsar connector uses the Consumer API. It extracts most parts of
Pulsar's `ConsumerConfigurationData` as Flink configuration options in
`PulsarSourceOptions`.
+The Pulsar connector uses the Consumer API. It extracts most parts of Pulsar's
`ConsumerConfigurationData` as Flink configuration options in
`PulsarSourceOptions`.
{{< generated/pulsar_consumer_configuration >}}
#### PulsarSource Options
The configuration options below are mainly used for customizing the
performance and message acknowledgement behavior.
-You can just leave them alone if you do not meet any performance issues.
+You can just leave them alone if you do not have any performance issues.
{{< generated/pulsar_source_configuration >}}
### Dynamic Partition Discovery
To handle scenarios like topic scaling-out or topic creation without
restarting the Flink
-job, Pulsar source can be configured to periodically discover new partitions
under provided
-topic-partition subscribing pattern. To enable partition discovery, set a
non-negative value for
-the option `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS`:
+job, Pulsar source periodically discover new partitions under a provided
+topic-partition subscription pattern. To enable partition discovery, you can
set a non-negative value for
+the `PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS` option:
```java
// discover new partitions per 10 seconds
PulsarSource.builder()
- .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,
10000);
+ .setConfig(PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS,
10000);
```
{{< hint warning >}}
-- Partition discovery is **enabled** by default. Pulsar connector would query
the topic metadata every 30 seconds.
-- You need to set the partition discovery interval to a negative value to
disable this feature.
-- The partition discovery would be disabled in batch mode even if you set this
option with a non-negative value.
+- Partition discovery is **enabled** by default. The Pulsar connector queries
the topic metadata every 30 seconds.
+- To disable partition discovery, you need to set a negative partition
discovery interval.
+- The partition discovery is disabled in the batch mode even if you set this
option with a non-negative value.
Review comment:
```suggestion
- Partition discovery is disabled for bounded data even if you set this
option with a non-negative value.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval indicates a better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic
dashboard.
+This gives you a mechanism to monitor the performance of your Flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you
can provide a list of topics,
+partitions, or both of them.
Review comment:
```suggestion
Defining the topics for producing is similar to the [topic-partition
subscription](#topic-partition-subscription)
in the Pulsar source. We support a mix-in style of topic setting. You can
provide a list of topics,
partitions, or both of them.
```
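The producer-name recommendation above can be sketched as follows. This is only an illustrative sketch: `setProducerName` and the other builder methods are the ones quoted in the docs, while the URLs and the producer name itself are placeholder values.

```java
// Sketch: giving the sink's producer an explicit name so it is
// identifiable in the Pulsar statistics dashboard.
// URLs and the producer name are placeholders, not defaults.
PulsarSink<String> sink = PulsarSink.builder()
    .setServiceUrl("pulsar://localhost:6650")
    .setAdminUrl("http://localhost:8080")
    .setTopics("topic1")
    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
    .setProducerName("my-flink-job-producer") // shows up per producer in Pulsar stats
    .build();
```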
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or run on Flink 1.14 or
previous releases, use StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building a PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the Flink connector in the Pulsar statistics
dashboard and provides a way to monitor the performance of your Flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2",
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic
metadata from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`
option to change the discovery interval.
+
+Configuring the writing targets can be replaced by using a custom [`TopicRouter`]
+in [message routing](#message-routing). See [flexible topic naming](#flexible-topic-naming)
+to understand how to configure partitions in the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both the topic and its corresponding
partitions, Pulsar sink merges them and only use the topic.
Review comment:
```suggestion
If you build the Pulsar sink based on both the topic and its corresponding
partitions, Pulsar sink merges them and only uses the topic.
```
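The partition-discovery paragraph above can be sketched in code. This assumes the sink builder exposes a generic `setConfig(ConfigOption, value)` method for passing `PulsarSinkOptions` entries, which is an assumption about the API rather than something stated in the quoted docs; the interval value is a placeholder.

```java
// Sketch: shortening the topic metadata (partition discovery) refresh
// interval. The value is assumed to be milliseconds as a Long.
PulsarSink<String> sink = PulsarSink.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setTopics("topic1")
    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
    // refresh topic metadata every 5 minutes instead of the default
    .setConfig(PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL, 5 * 60 * 1000L)
    .build();
```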
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
Review comment:
```suggestion
No consistency guarantees can be made in this scenario.
```
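The timeout discussion above can be sketched as a source configuration. This is a hedged sketch: it assumes `PulsarSourceBuilder` exposes a generic `setConfig(ConfigOption, value)` method and that `PULSAR_TRANSACTION_TIMEOUT_MILLIS` takes a `Long` of milliseconds; the 4-hour value is only an example chosen to exceed checkpoint interval plus recovery time.

```java
// Sketch: raising the transaction timeout so that
// timeout > checkpoint interval + maximum recovery time.
PulsarSource<String> source = PulsarSource.builder()
    .setServiceUrl(serviceUrl)
    .setAdminUrl(adminUrl)
    .setTopics("my-topic")
    .setSubscriptionName("my-subscription")
    .setDeserializationSchema(PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
    // 4 hours, expressed in milliseconds (example value, not a default)
    .setConfig(PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS, 4 * 60 * 60 * 1000L)
    .build();
```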
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or run on Flink 1.14 or
previous releases, use StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building a PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the Flink connector in the Pulsar statistics
dashboard and provides a way to monitor the performance of your Flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2",
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic
metadata from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`
option to change the discovery interval.
+
+Configuring the writing targets can be replaced by using a custom [`TopicRouter`]
+in [message routing](#message-routing). See [flexible topic naming](#flexible-topic-naming)
+to understand how to configure partitions in the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both the topic and its corresponding
partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1",
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the
record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's
`SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported
in Pulsar Sink.
Review comment:
```suggestion
A serializer (`PulsarSerializationSchema`) is required for serializing the
record instance into bytes.
Similar to `PulsarSource`, Pulsar sink supports both Flink's
`SerializationSchema` and
Pulsar's `Schema`. Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported
in the Pulsar sink.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or run on Flink 1.14 or
previous releases, use StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building a PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the Flink connector in the Pulsar statistics
dashboard and provides a way to monitor the performance of your Flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2",
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic
metadata from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`
option to change the discovery interval.
+
+Configuring the writing targets can be replaced by using a custom [`TopicRouter`]
+in [message routing](#message-routing). See [flexible topic naming](#flexible-topic-naming)
+to understand how to configure partitions in the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both the topic and its corresponding
partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1",
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the
record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's
`SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported
in Pulsar Sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html)
interface,
+you can use the predefined `PulsarSerializationSchema`. Pulsar sink provides
two implementation methods.
+
+- Encode the message by using Pulsar's
[Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+ ```java
+ // Primitive types
+ PulsarSerializationSchema.pulsarSchema(Schema)
+
+ // Struct types (JSON, Protobuf, Avro, etc.)
+ PulsarSerializationSchema.pulsarSchema(Schema, Class)
+
+ // KeyValue type
+ PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
+ ```
+- Encode the message by using Flink's `SerializationSchema`
+ ```java
+ PulsarSerializationSchema.flinkSchema(SerializationSchema)
+ ```
+
+[Schema
evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
+can be enabled by using `PulsarSerializationSchema.pulsarSchema()` and
+`PulsarSinkBuilder.enableSchemaEvolution()`, meaning that broker-side schema
validation will be in place.
+For example:
+
+```java
+Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
+PulsarSerializationSchema<SomePojo> pulsarSchema =
PulsarSerializationSchema.pulsarSchema(schema, SomePojo.class);
+
+PulsarSink<String> sink = PulsarSink.builder()
+ ...
+ .setSerializationSchema(pulsarSchema)
+ .enableSchemaEvolution()
+ .build();
+```
+
+{{< hint warning >}}
+If you use Pulsar schema without enabling schema evolution, the target topic
will have a `Schema.BYTES` schema.
+And consumers need to handle the deserialization (if needed) themselves.
Review comment:
```suggestion
Consumers will need to handle the deserialization (if needed) themselves.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or run on Flink 1.14 or
previous releases, use StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building a PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the Flink connector in the Pulsar statistics
dashboard and provides a way to monitor the performance of your Flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2",
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic
metadata from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`
option to change the discovery interval.
+
+Configuring the writing targets can be replaced by using a custom [`TopicRouter`]
+in [message routing](#message-routing). See [flexible topic naming](#flexible-topic-naming)
+to understand how to configure partitions in the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both the topic and its corresponding
partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1",
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the
record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's
`SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported
in Pulsar Sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html)
interface,
Review comment:
Should this really point to a snapshot version?
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or run on Flink 1.14 or
previous releases, use StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building a PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the Flink connector in the Pulsar statistics
dashboard and provides a way to monitor the performance of your Flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2",
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic
metadata from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`
option to change the discovery interval.
+
+Configuring the writing targets can be replaced by using a custom [`TopicRouter`]
+in [message routing](#message-routing). See [flexible topic naming](#flexible-topic-naming)
+to understand how to configure partitions in the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both the topic and its corresponding
partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1",
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the
record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's
`SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported
in Pulsar Sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html)
interface,
+you can use the predefined `PulsarSerializationSchema`. Pulsar sink provides
two implementation methods.
Review comment:
```suggestion
you can use the predefined `PulsarSerializationSchema`. The Pulsar sink
provides two implementation methods.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+A Pulsar transaction is created with a timeout of 3 hours by default.
+Ensure that the timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval leads to better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or run on Flink 1.14 or
previous releases, use StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building a PulsarSink:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2",
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic
metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`
option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both the topic and its corresponding
partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1",
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
Review comment:
```suggestion
For example, when using the `PulsarSink.builder().setTopics("some-topic1",
"some-topic1-partition-0")` option to build the Pulsar sink,
this is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
```
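The merge rule this hint describes can be sketched in plain Java. This is not the connector's actual code, only an illustration of the documented behavior under the assumption that partitions follow the `<topic>-partition-<n>` naming convention shown in the examples.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TopicMerge {
    static final String PARTITION_MARKER = "-partition-";

    // Illustrative version of the merge rule: when both a topic and one of
    // its partitions are configured, keep only the topic itself.
    static List<String> simplify(List<String> topicsAndPartitions) {
        Set<String> plainTopics = new HashSet<>();
        for (String t : topicsAndPartitions) {
            if (!t.contains(PARTITION_MARKER)) {
                plainTopics.add(t);
            }
        }
        List<String> result = new ArrayList<>();
        for (String t : topicsAndPartitions) {
            int idx = t.lastIndexOf(PARTITION_MARKER);
            // Drop a partition whose parent topic is already configured.
            if (idx >= 0 && plainTopics.contains(t.substring(0, idx))) {
                continue;
            }
            if (!result.contains(t)) {
                result.add(t);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "some-topic1-partition-0" is covered by "some-topic1" and is dropped.
        System.out.println(simplify(List.of("some-topic1", "some-topic1-partition-0")));
    }
}
```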
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval indicates a better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2",
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic
metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`
option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both the topic and its corresponding
partitions, Pulsar sink merges them and only use the topic.
+
+For example, it uses the `PulsarSink.builder().setTopics("some-topic1",
"some-topic1-partition-0")` option to build the Pulsar sink,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required to serializing the
record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's
`SerializationSchema` and
+Pulsar's `Schema`. But Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported
in Pulsar Sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html)
interface,
+you can use the predefined `PulsarSerializationSchema`. Pulsar sink provides
two implementation methods.
+
+- Encode the message by using Pulsar's
[Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+ ```java
+ // Primitive types
+ PulsarSerializationSchema.pulsarSchema(Schema)
+
+ // Struct types (JSON, Protobuf, Avro, etc.)
+ PulsarSerializationSchema.pulsarSchema(Schema, Class)
+
+ // KeyValue type
+ PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
+ ```
+- Encode the message by using Flink's `SerializationSchema`
+ ```java
+ PulsarSerializationSchema.flinkSchema(SerializationSchema)
+ ```
+
+[Schema
evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
+can be enabled for users by using `PulsarSerializationSchema.pulsarSchema()`
and
+`PulsarSinkBuilder.enableSchemaEvolution()`, meaning that any broker schema
validation will be in place.
+Here is a code sample below.
Review comment:
```suggestion
[Schema
evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
can be enabled by using `PulsarSerializationSchema.pulsarSchema()` and
`PulsarSinkBuilder.enableSchemaEvolution()`. This means that any broker
schema validation is in place.
```
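For intuition, a record serializer in the `flinkSchema(...)` style boils down to mapping a record to a byte array. The stand-in interface below mimics that shape without depending on Flink; the names `RecordSerializer` and `UTF8` are illustrative, not part of either API.

```java
import java.nio.charset.StandardCharsets;

public class StringEncoderDemo {
    // Minimal stand-in for a record-to-bytes serializer, mirroring what a
    // String schema does: encode the record as UTF-8 bytes.
    interface RecordSerializer<T> {
        byte[] serialize(T record);
    }

    static final RecordSerializer<String> UTF8 =
            record -> record.getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
        byte[] payload = UTF8.serialize("hello");
        // "hello" is 5 single-byte UTF-8 characters.
        System.out.println(payload.length);
    }
}
```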
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval indicates a better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
+
+### Writing Topics
+
+Setting the topics to write is a bit like the [topic-partition
subscription](#topic-partition-subscription)
+in Pulsar source. We support a mixin style of topic setting. Therefore, you
can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2",
"some-topic2")
+```
+
+The topics you provide support auto partition discovery. We query the topic
metadata from the Pulsar in a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`
option to change the discovery interval option.
+
+Configure writing targets can be replaced by using a custom [`TopicRouter`]
+[message routing](#message-routing). And read [flexible topic
naming](#flexible-topic-naming)
+for understanding how to configure partitions on the Pulsar connector.
Review comment:
```suggestion
Configuring writing targets can be replaced by using a custom [`TopicRouter`]
[message routing](#message-routing). Configuring partitions on the Pulsar
connector is explained in the [flexible topic naming](#flexible-topic-naming)
section.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -24,14 +24,13 @@ under the License.
# Apache Pulsar Connector
-Flink provides an [Apache Pulsar](https://pulsar.apache.org) source connector
for reading data from Pulsar topics with exactly-once guarantees.
+Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for
reading data from Pulsar topics with exactly-once guarantees.
Review comment:
```suggestion
Flink provides an [Apache Pulsar](https://pulsar.apache.org) connector for
reading and writing data from and to Pulsar topics with exactly-once guarantees.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval indicates a better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
Review comment:
```suggestion
The Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval indicates a better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
Review comment:
Like for the Source, just remove the parts that refer to older
versions
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval indicates a better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data
sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
API.
+
+If you still want to use the legacy `SinkFunction` or on Flink 1.14 or
previous releases, just use the StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with at-least-once
delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+ .setServiceUrl(serviceUrl)
+ .setAdminUrl(adminUrl)
+ .setTopics("topic1")
+ .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new
SimpleStringSchema()))
+ .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+ .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building PulsarSink:
+
+- Pulsar service url, configured by `setServiceUrl(String)`
+- Pulsar service http url (aka. admin url), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-targets) for
more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for
more details.
+
+It is **recommended** to set the producer name in Pulsar Sink by
`setProducerName(String)`.
+This gives a unique name to the flink connector in the Pulsar statistic
dashboard.
+Give you a mechanism to monitor the performance of your flink applications.
Review comment:
```suggestion
It is recommended to set the producer name in Pulsar Sink by
`setProducerName(String)`.
This sets a unique name for the Flink connector in the Pulsar statistics
dashboard.
You can use it to monitor the performance of your Flink connector and
applications.
```
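The four required properties listed above lend themselves to fail-fast validation at `build()` time. The builder below is a hypothetical mimic of that validation style, not the connector's actual `PulsarSinkBuilder`; all names in it are illustrative.

```java
public class SinkConfigDemo {
    // Illustrative builder enforcing the required properties: service URL,
    // admin URL, topics, and a serializer. Missing any of them fails fast.
    static class Builder {
        private String serviceUrl;
        private String adminUrl;
        private String[] topics;
        private Object serializer;

        Builder setServiceUrl(String url) { this.serviceUrl = url; return this; }
        Builder setAdminUrl(String url) { this.adminUrl = url; return this; }
        Builder setTopics(String... topics) { this.topics = topics; return this; }
        Builder setSerializationSchema(Object schema) { this.serializer = schema; return this; }

        String build() {
            if (serviceUrl == null || adminUrl == null || topics == null || serializer == null) {
                throw new IllegalStateException(
                        "service url, admin url, topics and serializer are required");
            }
            return "sink(" + topics.length + " topics)";
        }
    }

    public static void main(String[] args) {
        String sink = new Builder()
                .setServiceUrl("pulsar://localhost:6650")
                .setAdminUrl("http://localhost:8080")
                .setTopics("topic1")
                .setSerializationSchema(new Object())
                .build();
        System.out.println(sink);
    }
}
```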
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -248,37 +255,35 @@ Built-in start cursors include:
{{< hint info >}}
Each Pulsar message belongs to an ordered sequence on its topic.
The sequence ID (`MessageId`) of the message is ordered in that sequence.
-`MessageId` contains some extra information (the ledger, entry, partition) on
how the message is stored,
+The `MessageId` contains some extra information (the ledger, entry, partition)
about how the message is stored,
you can create a `MessageId` by using `DefaultImplementation.newMessageId(long
ledgerId, long entryId, int partitionIndex)`.
{{< /hint >}}
### Boundedness
Pulsar source supports streaming and batch running modes.
Review comment:
```suggestion
The Pulsar source supports streaming and batch execution mode.
```
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar
`borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+Pulsar transaction is created within 3 hours as the timeout by default.
+Ensure that that timeout is greater than checkpoint interval + maximum
recovery time.
+A shorter checkpoint interval indicates a better consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option
to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar
broker, you should set
`PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, we can not promise the consistency in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
Review comment:
```suggestion
The Pulsar Sink supports writing records into one or more Pulsar topics or a
specified list of Pulsar partitions.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]