imaffe commented on a change in pull request #19056:
URL: https://github.com/apache/flink/pull/19056#discussion_r836437285



##########
File path: docs/content/docs/connectors/datastream/pulsar.md
##########
@@ -387,27 +391,402 @@ You should enable transaction in the Pulsar `borker.conf` file when using these
 transactionCoordinatorEnabled=true
 ```
 
-Pulsar transaction would be created with 3 hours as the timeout by default. Make sure that timeout > checkpoint interval + maximum recovery time.
-A shorter checkpoint interval would increase the consuming performance.
-You can change the transaction timeout by using the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option.
+The default timeout for Pulsar transactions is 3 hours.
+Make sure that this timeout is greater than the checkpoint interval plus the maximum recovery time.
+A shorter checkpoint interval improves consuming performance.
+You can use the `PulsarSourceOptions.PULSAR_TRANSACTION_TIMEOUT_MILLIS` option to change the transaction timeout.
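
The sizing rule above can be checked with the following plain-Java sketch. It compares a transaction timeout against the checkpoint interval plus the maximum recovery time. `TransactionTimeoutCheck` is a hypothetical helper and is not part of the connector. The 5-minute checkpoint interval and 30-minute recovery time are example assumptions.

```java
import java.time.Duration;

// Hypothetical helper (not connector code) encoding the rule:
// transaction timeout > checkpoint interval + maximum recovery time.
class TransactionTimeoutCheck {

    // Default Pulsar transaction timeout mentioned in the text.
    static final Duration DEFAULT_TXN_TIMEOUT = Duration.ofHours(3);

    static boolean isSafe(Duration txnTimeout, Duration checkpointInterval, Duration maxRecoveryTime) {
        // compareTo(...) > 0 means txnTimeout is strictly longer than the sum.
        return txnTimeout.compareTo(checkpointInterval.plus(maxRecoveryTime)) > 0;
    }

    public static void main(String[] args) {
        // Example values (assumptions): 5-minute checkpoints, up to 30 minutes to recover.
        Duration checkpointInterval = Duration.ofMinutes(5);
        Duration maxRecoveryTime = Duration.ofMinutes(30);
        System.out.println(isSafe(DEFAULT_TXN_TIMEOUT, checkpointInterval, maxRecoveryTime)); // true
        // Millisecond value to pass to PULSAR_TRANSACTION_TIMEOUT_MILLIS for 3 hours.
        System.out.println(DEFAULT_TXN_TIMEOUT.toMillis()); // 10800000
    }
}
```

An equal sum is treated as unsafe because the rule requires the timeout to be strictly greater.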
 
 If checkpointing is disabled or you can not enable the transaction on Pulsar broker, you should set
 `PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE` to `true`.
-The message would be immediately acknowledged after consuming.
-We can not promise consistency in this scenario.
+The message is immediately acknowledged after consuming.
+No consistency guarantees can be made in this scenario.
 
 {{< hint info >}}
 All acknowledgements in a transaction are recorded in the Pulsar broker side.
 {{< /hint >}}
 
+## Pulsar Sink
+
+The Pulsar Sink supports writing records into one or more Pulsar topics or a specified list of Pulsar partitions.
+
+{{< hint info >}}
+This part describes the Pulsar sink based on the new
+[data sink](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) API.
+
+If you still want to use the legacy `SinkFunction`, or you are on Flink 1.14 or an earlier release, use StreamNative's
+[pulsar-flink](https://github.com/streamnative/pulsar-flink).
+{{< /hint >}}
+
+### Usage
+
+The Pulsar Sink uses a builder class to construct the `PulsarSink` instance.
+This example writes a String record to a Pulsar topic with an at-least-once delivery guarantee.
+
+```java
+DataStream<String> stream = ...;
+
+PulsarSink<String> sink = PulsarSink.builder()
+    .setServiceUrl(serviceUrl)
+    .setAdminUrl(adminUrl)
+    .setTopics("topic1")
+    .setSerializationSchema(PulsarSerializationSchema.flinkSchema(new SimpleStringSchema()))
+    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
+    .build();
+
+stream.sinkTo(sink);
+```
+
+The following properties are **required** for building a `PulsarSink`:
+
+- Pulsar service URL, configured by `setServiceUrl(String)`
+- Pulsar service HTTP URL (also known as the admin URL), configured by `setAdminUrl(String)`
+- Topics / partitions to write, see [producing to topics](#producing-to-topics) for more details.
+- Serializer to generate Pulsar messages, see [serializer](#serializer) for more details.
+
+It is recommended to set the producer name in the Pulsar sink by `setProducerName(String)`.
+This sets a unique name for the Flink connector in the Pulsar statistics dashboard.
+You can use it to monitor the performance of your Flink connector and applications.
+
+### Producing to topics
+
+Defining the topics for producing is similar to the [topic-partition subscription](#topic-partition-subscription)
+in the Pulsar source. We support a mix-in style of topic setting. You can provide a list of topics,
+partitions, or both of them.
+
+```java
+// Topics "some-topic1" and "some-topic2"
+PulsarSink.builder().setTopics("some-topic1", "some-topic2")
+
+// Partitions 0 and 2 of topic "topic-a"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2")
+
+// Partitions 0 and 2 of topic "topic-a" and topic "some-topic2"
+PulsarSink.builder().setTopics("topic-a-partition-0", "topic-a-partition-2", "some-topic2")
+```
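
The partition names in these examples follow Pulsar's `<topic>-partition-<index>` naming convention. The sketch below uses hypothetical helper names. It expands a topic into its partition names and recovers the parent topic from a partition name. It only illustrates the naming scheme and is not how the connector itself parses topics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helpers (not connector code) for the naming convention used above:
// partition i of topic "t" is called "t-partition-i".
class PartitionNames {
    static final String SUFFIX = "-partition-";

    // Names of all partitions of a partitioned topic.
    static List<String> partitionsOf(String topic, int numPartitions) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            names.add(topic + SUFFIX + i);
        }
        return names;
    }

    // Parent topic if `name` refers to a single partition, otherwise empty.
    static Optional<String> parentTopic(String name) {
        int idx = name.lastIndexOf(SUFFIX);
        if (idx <= 0) {
            return Optional.empty();
        }
        String index = name.substring(idx + SUFFIX.length());
        if (index.isEmpty() || !index.chars().allMatch(Character::isDigit)) {
            return Optional.empty();
        }
        return Optional.of(name.substring(0, idx));
    }

    public static void main(String[] args) {
        // [topic-a-partition-0, topic-a-partition-1, topic-a-partition-2]
        System.out.println(partitionsOf("topic-a", 3));
        System.out.println(parentTopic("topic-a-partition-2")); // Optional[topic-a]
        System.out.println(parentTopic("some-topic2"));         // Optional.empty
    }
}
```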
+
+The topics you provide support auto partition discovery. We query the topic metadata from Pulsar at a fixed interval.
+You can use the `PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL` option to change the discovery interval.
+
+Configuring writing targets can be replaced by using a custom [`TopicRouter`](#message-routing).
+Configuring partitions on the Pulsar connector is explained in the [flexible topic naming](#flexible-topic-naming) section.
+
+{{< hint warning >}}
+If you build the Pulsar sink based on both the topic and its corresponding partitions, the Pulsar sink merges them and only uses the topic.
+
+For example, when using the `PulsarSink.builder().setTopics("some-topic1", "some-topic1-partition-0")` option to build the Pulsar sink,
+this is simplified to `PulsarSink.builder().setTopics("some-topic1")`.
+{{< /hint >}}
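
The merge rule can be sketched as follows. A partition is dropped whenever its parent topic is also configured. The sketch assumes the `-partition-<index>` suffix convention. `TopicMerger` is hypothetical and only mirrors the behavior described in the warning. It is not the connector's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the merge rule: keep a topic, drop its partitions.
class TopicMerger {
    private static final String SUFFIX = "-partition-";

    static List<String> merge(List<String> targets) {
        Set<String> all = new LinkedHashSet<>(targets); // dedupe, keep order
        List<String> merged = new ArrayList<>();
        for (String name : all) {
            String parent = parentOf(name);
            // Skip a partition whose parent topic is configured as well.
            if (parent != null && all.contains(parent)) {
                continue;
            }
            merged.add(name);
        }
        return merged;
    }

    // Parent topic of "t-partition-N", or null for a plain topic name.
    private static String parentOf(String name) {
        int idx = name.lastIndexOf(SUFFIX);
        if (idx <= 0) {
            return null;
        }
        String index = name.substring(idx + SUFFIX.length());
        if (index.isEmpty() || !index.chars().allMatch(Character::isDigit)) {
            return null;
        }
        return name.substring(0, idx);
    }

    public static void main(String[] args) {
        System.out.println(merge(List.of("some-topic1", "some-topic1-partition-0"))); // [some-topic1]
    }
}
```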
+
+### Serializer
+
+A serializer (`PulsarSerializationSchema`) is required for serializing the record instance into bytes.
+Similar to `PulsarSource`, Pulsar sink supports both Flink's `SerializationSchema` and
+Pulsar's `Schema`. Pulsar's `Schema.AUTO_PRODUCE_BYTES()` is not supported in the Pulsar sink.
+
+If you do not need the message key and other message properties in Pulsar's
+[Message](https://pulsar.apache.org/api/client/2.9.0-SNAPSHOT/org/apache/pulsar/client/api/Message.html) interface,
+you can use the predefined `PulsarSerializationSchema`. The Pulsar sink provides two ways to create it.
+
+- Encode the message by using Pulsar's [Schema](https://pulsar.apache.org/docs/en/schema-understand/).
+  ```java
+  // Primitive types
+  PulsarSerializationSchema.pulsarSchema(Schema)
+
+  // Struct types (JSON, Protobuf, Avro, etc.)
+  PulsarSerializationSchema.pulsarSchema(Schema, Class)
+
+  // KeyValue type
+  PulsarSerializationSchema.pulsarSchema(Schema, Class, Class)
+  ```
+- Encode the message by using Flink's `SerializationSchema`
+  ```java
+  PulsarSerializationSchema.flinkSchema(SerializationSchema)
+  ```
+
+[Schema evolution](https://pulsar.apache.org/docs/en/schema-evolution-compatibility/#schema-evolution)
+can be enabled by using `PulsarSerializationSchema.pulsarSchema()` together with
+`PulsarSinkBuilder.enableSchemaEvolution()`. This means that the broker's schema validation is in place.
+
+```java
+Schema<SomePojo> schema = Schema.AVRO(SomePojo.class);
+PulsarSerializationSchema<SomePojo> pulsarSchema = PulsarSerializationSchema.pulsarSchema(schema, SomePojo.class);
+
+PulsarSink<SomePojo> sink = PulsarSink.builder()
+    ...
+    .setSerializationSchema(pulsarSchema)
+    .enableSchemaEvolution()
+    .build();
+```
+
+{{< hint warning >}}
+If you use Pulsar schema without enabling schema evolution, the target topic will have a `Schema.BYTES` schema.
+Consumers will need to handle the deserialization (if needed) themselves.
+
+For example, if you set `PulsarSerializationSchema.pulsarSchema(Schema.STRING)` without enabling schema evolution,
+the schema stored in Pulsar topics is `Schema.BYTES`.
+{{< /hint >}}
+
+### Message Routing
+
+Routing in the Pulsar sink operates at the partition level. For a list of partitioned topics,
+the routing algorithm first collects all partitions from different topics, and then calculates routing within all the partitions.
+By default, the Pulsar sink supports two router implementations.
+
+- `KeyHashTopicRouter`: uses the hashcode of the message's key to decide the topic partition that messages are sent to.
+
+  The message key is provided by `PulsarSerializationSchema.key(IN, PulsarSinkContext)`.
+  You need to implement this interface and extract the message key when you want to send messages with the same key to the same topic partition.
+
+  If you do not provide the message key, a topic partition is randomly chosen from the topic list.
+
+  The message key can be hashed in two ways: `MessageKeyHash.JAVA_HASH` and `MessageKeyHash.MURMUR3_32_HASH`.
+  You can use the `PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH` option to choose the hash method.
+
+- `RoundRobinRouter`: round-robin among all the partitions.
+
+  Messages are sent to the first partition until a fixed number of messages has been sent, and then the router
+  switches to the next partition. The batch size can be customized by the `PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES` option.
+
+Let’s assume there are ten messages and two topics. Topic A has two partitions while topic B has three partitions.
+The batch size is set to five messages. In this case, topic A receives 5 messages per partition while topic B does not receive any messages.
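
The example above can be reproduced with the sketch below. It collects the partitions of topic A and topic B into one list and switches to the next partition every `batchSize` messages. `RoundRobinSketch` is hypothetical and may differ from the connector's router in details, for example where the message counter starts.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of round-robin routing with a batch size.
class RoundRobinSketch {
    private final int batchSize;
    private long sent = 0;

    RoundRobinSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Stay on one partition for batchSize messages, then move to the next one.
    String route(List<String> partitions) {
        int index = (int) ((sent / batchSize) % partitions.size());
        sent++;
        return partitions.get(index);
    }

    // Count how many of `messages` records land on each partition.
    static Map<String, Integer> distribute(int messages, int batchSize, List<String> partitions) {
        RoundRobinSketch router = new RoundRobinSketch(batchSize);
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < messages; i++) {
            counts.merge(router.route(partitions), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Partitions of topic A (2) and topic B (3), collected into one list.
        List<String> partitions = List.of(
                "A-partition-0", "A-partition-1",
                "B-partition-0", "B-partition-1", "B-partition-2");
        System.out.println(distribute(10, 5, partitions)); // {A-partition-0=5, A-partition-1=5}
    }
}
```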
+
+You can configure custom routers by using the `TopicRouter` interface.
+If you implement a `TopicRouter`, ensure that it is serializable.
+A custom router can also return partitions that are not in the pre-discovered partition list.
+
+Thus, you do not need to specify topics using the `PulsarSinkBuilder.setTopics` option when you implement a custom topic router.
+
+```java
+@PublicEvolving
+public interface TopicRouter<IN> extends Serializable {
+
+    String route(IN in, List<String> partitions, PulsarSinkContext context);
+
+    default void open(SinkConfiguration sinkConfiguration) {
+        // Nothing to do by default.
+    }
+}
+```
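
The sketch below shows what a custom router might look like without depending on the connector. It uses a hypothetical `SimpleTopicRouter` interface that mirrors `TopicRouter.route` but drops the `PulsarSinkContext` parameter. The router derives the target topic from a `type:payload` record prefix, so the returned topic never has to appear in the pre-discovered list. The record format and the `persistent://public/default/events-<type>` naming are assumptions.

```java
import java.io.Serializable;
import java.util.List;

// Hypothetical stand-in for TopicRouter, without the PulsarSinkContext parameter.
interface SimpleTopicRouter<IN> extends Serializable {
    String route(IN in, List<String> partitions);
}

// Routes "type:payload" records to a per-type topic computed on the fly.
class TypePrefixRouter implements SimpleTopicRouter<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public String route(String record, List<String> partitions) {
        int sep = record.indexOf(':');
        String type = sep > 0 ? record.substring(0, sep) : "unknown";
        // The result is not looked up in `partitions`; it is built from the record.
        return "persistent://public/default/events-" + type;
    }

    public static void main(String[] args) {
        // persistent://public/default/events-orders
        System.out.println(new TypePrefixRouter().route("orders:42", List.of()));
    }
}
```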
+
+{{< hint info >}}
+Internally, a Pulsar partition is implemented as a topic. The Pulsar client provides APIs to hide this
+implementation detail and handles routing under the hood automatically. The Pulsar sink uses a lower-level client
+API to implement its own routing layer, which supports routing across multiple topics.
+
+For details, see [partitioned topics](https://pulsar.apache.org/docs/en/cookbooks-partitioned/).
+{{< /hint >}}
+
+### Delivery Guarantee
+
+`PulsarSink` supports three delivery guarantee semantics.
+
+- `NONE`: Data loss can happen even when the pipeline is running.
+  In this mode, the sink uses a fire-and-forget strategy to send records to Pulsar topics,
+  which gives this mode the highest throughput.
+- `AT_LEAST_ONCE`: No data loss happens, but data duplication can happen after a restart from checkpoint.
+- `EXACTLY_ONCE`: No data loss happens. Each record is sent to the Pulsar broker only once.
+  Pulsar Sink uses [Pulsar transaction](https://pulsar.apache.org/docs/en/transactions/)
+  and two-phase commit (2PC) to ensure records are sent only once even after pipeline restarts.
+
+### Delayed message delivery
+
+[Delayed message delivery](https://pulsar.apache.org/docs/en/next/concepts-messaging/#delayed-message-delivery)
+enables you to consume a message later. With delayed message delivery enabled, the Pulsar sink sends a message to the Pulsar topic
+**immediately**, but the message is delivered to a consumer once the specified delay is over.
+
+Delayed message delivery only works in the `Shared` subscription type. In `Exclusive` and `Failover`
+subscription types, the delayed message is dispatched immediately.
+
+You can configure a `MessageDelayer` to decide when a message is dispatched to the consumer.
+The default delayer never delays message dispatching. You can use `MessageDelayer.fixed(Duration)` to
+delay all messages by a fixed duration. You can also implement the `MessageDelayer`
+interface to dispatch messages at different times.
+
+{{< hint warning >}}
+The dispatch time should be calculated from `PulsarSinkContext.processTime()`.
+{{< /hint >}}
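
The delay logic can be sketched in plain Java. `DelaySketch` is a hypothetical interface. The connector's `MessageDelayer` would read the processing time from `PulsarSinkContext.processTime()`; this sketch takes the processing time as a parameter instead. It returns the timestamp at which a message should be delivered, with a fixed-delay variant and a per-message variant.

```java
import java.time.Duration;

// Hypothetical sketch: compute the delivery time from the processing time.
interface DelaySketch<IN> {
    long deliverAtMillis(IN message, long processTimeMillis);

    // No delay: deliver at the processing time.
    static <IN> DelaySketch<IN> immediate() {
        return (message, processTime) -> processTime;
    }

    // Delay every message by the same duration.
    static <IN> DelaySketch<IN> fixed(Duration delay) {
        long delayMillis = delay.toMillis();
        return (message, processTime) -> processTime + delayMillis;
    }
}

// Per-message delay (illustrative): "urgent" records go out immediately,
// all other records are held back for one minute.
class PriorityDelay implements DelaySketch<String> {
    @Override
    public long deliverAtMillis(String message, long processTimeMillis) {
        return message.startsWith("urgent") ? processTimeMillis : processTimeMillis + 60_000L;
    }

    public static void main(String[] args) {
        DelaySketch<String> thirtySeconds = DelaySketch.fixed(Duration.ofSeconds(30));
        System.out.println(thirtySeconds.deliverAtMillis("report", 1_000L)); // 31000
        System.out.println(new PriorityDelay().deliverAtMillis("urgent: disk full", 1_000L)); // 1000
    }
}
```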
+
+### Sink Configurable Options
+
+You can set options for `PulsarClient`, `PulsarAdmin`, Pulsar `Producer`, and `PulsarSink`
+by using `setConfig(ConfigOption<T>, T)`, `setConfig(Configuration)`, and `setConfig(Properties)`.
+
+#### PulsarClient and PulsarAdmin Options
+
+For details, refer to [PulsarAdmin options](#pulsaradmin-options).
+
+#### Pulsar Producer Options
+
+The Pulsar connector uses the Producer API to send messages. It exposes most of
+Pulsar's `ProducerConfigurationData` as Flink configuration options in `PulsarSinkOptions`.
+
+{{< generated/pulsar_producer_configuration >}}
+
+#### PulsarSink Options
+
+The configuration options below are mainly used for customizing the performance and message
+sending behavior. You can leave them at their default values if you do not have any performance issues.
+
+{{< generated/pulsar_sink_configuration >}}
+
+### Sink Metrics
+
+This table lists supported metrics.
+The first 6 metrics are standard sink metrics as described in
+[FLIP-33: Standardize Connector Metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics).
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 15%">Scope</th>
+      <th class="text-left" style="width: 18%">Metrics</th>
+      <th class="text-left" style="width: 18%">User Variables</th>
+      <th class="text-left" style="width: 39%">Description</th>
+      <th class="text-left" style="width: 10%">Type</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <th rowspan="13">Operator</th>
+        <td>numBytesOut</td>
+        <td>n/a</td>
+        <td>The total number of output bytes since the sink starts. Counts towards the numBytesOut in TaskIOMetricsGroup.</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>numBytesOutPerSecond</td>
+        <td>n/a</td>
+        <td>The output bytes per second</td>
+        <td>Meter</td>
+    </tr>
+    <tr>
+        <td>numRecordsOut</td>
+        <td>n/a</td>
+        <td>The total number of output records since the sink starts.</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>numRecordsOutPerSecond</td>
+        <td>n/a</td>
+        <td>The output records per second</td>
+        <td>Meter</td>
+    </tr>
+    <tr>
+        <td>numRecordsOutErrors</td>
+        <td>n/a</td>
+        <td>The total number of records that failed to send</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>currentSendTime</td>
+        <td>n/a</td>
+        <td>The time it takes to send the last record, from enqueuing the message in the client buffer to its ack.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.numAcksReceived</td>
+        <td>n/a</td>
+        <td>The number of acks received for sent messages.</td>
+        <td>Counter</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.sendLatencyMax</td>
+        <td>n/a</td>
+        <td>The maximum send latency in the last refresh interval across all producers.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency50Pct</td>
+        <td>ProducerName</td>
+        <td>The 50th percentile of send latency in the last refresh interval for a specific producer.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency75Pct</td>
+        <td>ProducerName</td>
+        <td>The 75th percentile of send latency in the last refresh interval for a specific producer.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency95Pct</td>
+        <td>ProducerName</td>
+        <td>The 95th percentile of send latency in the last refresh interval for a specific producer.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency99Pct</td>
+        <td>ProducerName</td>
+        <td>The 99th percentile of send latency in the last refresh interval for a specific producer.</td>
+        <td>Gauge</td>
+    </tr>
+    <tr>
+        <td>PulsarSink.producer."ProducerName".sendLatency999Pct</td>
+        <td>ProducerName</td>
+        <td>The 99.9th percentile of send latency in the last refresh interval for a specific producer.</td>
+        <td>Gauge</td>
+    </tr>
+  </tbody>
+</table>
+
+{{< hint info >}}
+- `numBytesOut`, `numRecordsOut`, `numRecordsOutErrors` are retrieved from Pulsar client metrics.
+
+- `currentSendTime` tracks the time from when the producer calls `sendAsync()` to
+  the time when the message is acknowledged by the broker. This metric is not available in `NONE` delivery guarantee.
+{{< /hint >}}
+
+The Pulsar producer refreshes its stats every 60 seconds by default, and PulsarSink retrieves the Pulsar producer
+stats every 500 ms. Thus `numRecordsOut`, `numBytesOut`, `numAcksReceived`, and `numRecordsOutErrors`
+are updated every 60 seconds. To increase the metrics refresh frequency, you can change
+the Pulsar producer stats refresh interval to a smaller value (minimum 1 second), as shown below.
+
+```java
+builder.setConfig(PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS, 1L);
+```
+
+`numBytesOutPerSecond` and `numRecordsOutPerSecond` are calculated based on the `numBytesOut` and `numRecordsOut`
+counters, respectively. Flink internally uses a fixed 60-second window to calculate the rates.
+
+### Brief Design Rationale
+
+Pulsar sink follows the Sink API defined in
+[FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction).

Review comment:
       Well, the PulsarSink is based on SinkV2, which is the API mentioned in FLIP-191.



