syhily commented on a change in pull request #19056:
URL: https://github.com/apache/flink/pull/19056#discussion_r827861301
##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +394,404 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
transactionCoordinatorEnabled=true
```
-Pulsar transaction would be created with 3 hours as the timeout by default.
Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the
`PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+By default, Pulsar transactions are created with a 3-hour timeout.
+Ensure that the timeout is greater than the checkpoint interval plus the maximum recovery time.
+A shorter checkpoint interval improves the consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option to change the transaction timeout.
If checkpointing is disabled or you can not enable the transaction on Pulsar broker, you should set `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+However, consistency is not guaranteed in this scenario.
{{< hint info >}}
All acknowledgements in a transaction are recorded in the Pulsar broker side.
{{< /hint >}}
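To make the timeout rule above concrete, here is a minimal plain-Java sketch that checks it with `java.time.Duration`. The class and method names (`TransactionTimeoutCheck`, `isTimeoutSafe`) are hypothetical and not part of the Flink Pulsar connector, and the 1-minute checkpoint interval and 30-minute recovery time are example values only.

```java
import java.time.Duration;

/**
 * Hypothetical helper (not part of the Flink Pulsar connector) that checks the rule:
 * transaction timeout > checkpoint interval + maximum recovery time.
 */
public final class TransactionTimeoutCheck {

    // The 3-hour default transaction timeout described in the docs.
    static final Duration DEFAULT_TRANSACTION_TIMEOUT = Duration.ofHours(3);

    static boolean isTimeoutSafe(Duration transactionTimeout,
                                 Duration checkpointInterval,
                                 Duration maxRecoveryTime) {
        Duration required = checkpointInterval.plus(maxRecoveryTime);
        // Strictly greater: a timeout equal to the sum leaves no safety margin.
        return transactionTimeout.compareTo(required) > 0;
    }

    public static void main(String[] args) {
        Duration checkpointInterval = Duration.ofMinutes(1); // example value
        Duration maxRecovery = Duration.ofMinutes(30);       // example value
        System.out.println(isTimeoutSafe(DEFAULT_TRANSACTION_TIMEOUT, checkpointInterval, maxRecovery)); // prints true
        // Millisecond value, assuming PULSAR_TRANSACTION_TIMEOUT_MILLIS takes milliseconds as its name suggests.
        System.out.println(DEFAULT_TRANSACTION_TIMEOUT.toMillis()); // prints 10800000
    }
}
```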
+## Pulsar Sink
+
+Pulsar Sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) API.
+
+If you still want to use the legacy `SinkFunction`, or you are on Flink 1.14 or earlier, use StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with the at-least-once delivery guarantee.
+
+```java
+DataStream<String> stream = ...
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building a `PulsarSink`:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [writing targets](#writing-topics) for more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for more details.
+
+It is **recommended** to set the producer name in the Pulsar sink with `setProducerName(String)`.
+This gives the Flink connector a unique name in the Pulsar statistics dashboard,
+which lets you monitor the performance of your Flink applications.
+
+### Writing Topics
+
+Setting the topics to write to is similar to the [topic-partition subscription](#topic-partition-subscription)
+in the Pulsar source. Topics and partitions can be mixed, so you can provide a list of topics,
+partitions, or both.
+
+```java
+// Topic "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partition 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partition 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
+```
+
+The topics you provide support automatic partition discovery: the sink queries the topic metadata from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval.
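The fixed-interval discovery described above can be pictured with the following plain-Java sketch. It is an illustration under assumptions, not the connector's implementation: the Pulsar metadata query is replaced by a hypothetical `IntSupplier` that returns the partition count, and `PartitionDiscoverySketch` is an invented name. The `start(intervalMillis)` argument stands in for the value you would configure through `PULSAR_TOPIC_METADATA_REFRESH_INTERVAL`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;
import java.util.function.IntSupplier;

/**
 * Hypothetical sketch of fixed-interval partition discovery.
 * NOT the Flink Pulsar connector's code; the metadata lookup is a stand-in IntSupplier.
 */
public final class PartitionDiscoverySketch implements AutoCloseable {
    private final IntSupplier partitionCountLookup; // stand-in for a Pulsar topic metadata query
    private final IntConsumer onPartitionsChanged;  // receives the new partition count
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private int knownPartitions = -1; // -1 means "metadata not fetched yet"

    PartitionDiscoverySketch(IntSupplier lookup, IntConsumer onChanged) {
        this.partitionCountLookup = lookup;
        this.onPartitionsChanged = onChanged;
    }

    /** Queries the metadata once and reports the count only when it differs from the last one. */
    synchronized void refreshOnce() {
        int current = partitionCountLookup.getAsInt();
        if (current != knownPartitions) {
            knownPartitions = current;
            onPartitionsChanged.accept(current);
        }
    }

    /**
     * Runs refreshOnce() at a fixed interval. Note: if the lookup throws,
     * ScheduledExecutorService suppresses all later runs, so real code would catch errors.
     */
    void start(long intervalMillis) {
        scheduler.scheduleAtFixedRate(this::refreshOnce, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Calling `refreshOnce()` directly, as the check below does, keeps the example deterministic; `start(...)` is what a long-running writer would use.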
+
+You can also replace the configured writing targets with a custom `TopicRouter`; see [message routing](#message-routing).
+Read [flexible topic naming](#flexible-topic-naming) to understand how to configure partitions on the Pulsar connector.
+
+{{< hint warning >}}
+If you build the Pulsar sink with both a topic and its corresponding partitions, the Pulsar sink merges them and only uses the topic.
+
+For example, if you build the Pulsar sink with `PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0")`,
+it is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
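The merge rule in the warning above could look roughly like the following plain-Java sketch. It assumes the `<topic>-partition-<index>` naming shown in the examples and ignores fully qualified names such as `persistent://tenant/namespace/topic`; `TopicMergeSketch` and `merge` are hypothetical names, not connector APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of the rule: when a topic and one of its partitions are both
 * configured, keep only the topic. Not the Flink Pulsar connector's code.
 */
public final class TopicMergeSketch {
    // Assumed partition naming: "<topic>-partition-<index>".
    private static final Pattern PARTITION = Pattern.compile("^(.+)-partition-(\\d+)$");

    static List<String> merge(List<String> configured) {
        // First pass: collect the names that refer to whole topics.
        Set<String> wholeTopics = new HashSet<>();
        for (String name : configured) {
            if (!PARTITION.matcher(name).matches()) {
                wholeTopics.add(name);
            }
        }
        // Second pass: keep input order, drop duplicates and partitions of whole topics.
        Set<String> result = new LinkedHashSet<>();
        for (String name : configured) {
            Matcher m = PARTITION.matcher(name);
            if (m.matches() && wholeTopics.contains(m.group(1))) {
                continue; // the parent topic already covers this partition
            }
            result.add(name);
        }
        return new ArrayList<>(result);
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of("some-topic1", "some-topic1-partition-0")));
        // prints [some-topic1]
        System.out.println(merge(List.of("topic-a-partition-0", "topic-a-partition-2", "some-topic2")));
        // prints [topic-a-partition-0, topic-a-partition-2, some-topic2]
    }
}
```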
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required for serializing the record instance into bytes.
+Similar to `PulsarSource`, the Pulsar sink supports both Flink's `SerializationSchema` and
+Pulsar's `Schema`. However, Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported in the Pulsar sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html) interface,
Review comment:
Pulsar uses `SNAPSHOT` javadocs; there is no stable version. I'm afraid
we have to use the snapshot here.