This is an automated email from the ASF dual-hosted git repository.

liuyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-site.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d5d46d6b29 Restructure the producer doc (#589)
0d5d46d6b29 is described below

commit 0d5d46d6b299380eab172757f74ea563e9295523
Author: Zike Yang <[email protected]>
AuthorDate: Sat Jun 10 04:42:56 2023 +0800

    Restructure the producer doc (#589)
---
 docs/client-libraries-producers.md | 463 ++++++++++++-------------------------
 1 file changed, 151 insertions(+), 312 deletions(-)

diff --git a/docs/client-libraries-producers.md 
b/docs/client-libraries-producers.md
index 4a42e1ecbb7..afaae7fa29e 100644
--- a/docs/client-libraries-producers.md
+++ b/docs/client-libraries-producers.md
@@ -22,12 +22,23 @@ This example shows how to create a producer.
 
   <TabItem value="Java">
 
+  Create a producer synchronously:
   ```java
-Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+  Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                 .topic("my-topic")
                 .create();
   ```
 
+  Create a producer asynchronously:
+  ```java
+  pulsarClient.newProducer(Schema.STRING)
+                .topic("my-topic")
+                .createAsync()
+                .thenAccept(p -> {
+                    log.info("Producer created: {}", p.getProducerName());
+                });
+  ```
+
   </TabItem>
 
   <TabItem value="C++">
@@ -41,9 +52,13 @@ Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING)
 </Tabs>
 ````
 
-## Send messages
+## Publish messages
 
-This example shows how to send messages using producers.
+Pulsar supports both synchronous and asynchronous publishing of messages in 
most clients. In some language-specific clients, such as Node.js and C#, you 
can publish messages synchronously by waiting on the asynchronous method with 
language-specific mechanisms (such as `await`).
+
+With asynchronous publishing, the producer puts the message in a blocking queue and 
returns immediately. Then the client library sends the message to the broker 
in the background. If the queue is full (the maximum size is configurable), the producer 
is blocked or fails immediately when calling the API, depending on the arguments 
passed to the producer.
+
+This example shows how to publish messages using producers. The publish 
operation is not complete until the broker confirms that the message has been successfully 
published. The broker returns the message ID after the message is published 
successfully.
 
 ````mdx-code-block
 <Tabs groupId="lang-choice"
@@ -51,14 +66,22 @@ This example shows how to send messages using producers.
   
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Go","value":"Go"},{"label":"Node.js","value":"Node.js"},{"label":"C#","value":"C#"}]}>
 <TabItem value="Java">
 
-   ```java
+  Publish messages synchronously:
+  ```java
+  MessageId messageId = producer.newMessage()
+                    .value("my-sync-message")
+                    .send();
+  ```
+
+  Publish messages asynchronously:
+  ```java
   producer.newMessage()
-          .key("my-message-key")
           .value("my-sync-message")
-          .send();
-   ```
-
-   You can terminate the builder chain with `sendAsync()` and get a future 
return.
+          .sendAsync()
+          .thenAccept(messageId -> {
+              log.info("Message ID: {}", messageId);
+          });
+  ```
 
   </TabItem>
 
@@ -66,8 +89,7 @@ This example shows how to send messages using producers.
 
   ```cpp
   Message msg = MessageBuilder()
-                      .setContent("content")
-                      .setPartitionKey("my-message-key")
+                      .setContent("my-sync-message")
                       .build();
   Result res = producer.send(msg);
   ```
@@ -78,13 +100,7 @@ This example shows how to send messages using producers.
 
    ```go
     msg := pulsar.ProducerMessage{
-        Payload: []byte("Here is some message data"),
-        Key: "message-key",
-        Properties: map[string]string{
-            "foo": "bar",
-        },
-        EventTime: time.Now(),
-        ReplicationClusters: []string{"cluster1", "cluster3"},
+        Payload: []byte("my-sync-message"),
     }
 
     if _, err := producer.send(msg); err != nil {
@@ -92,23 +108,14 @@ This example shows how to send messages using producers.
     }
    ```
 
-    For all methods of the `ProducerMessage` object, see 
[here](https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#ProducerMessage).
+  For all methods of the `ProducerMessage` object, see [Go API 
doc](https://pkg.go.dev/github.com/apache/pulsar-client-go/pulsar#ProducerMessage).
 
   </TabItem>
   <TabItem value="Node.js">
 
    ```javascript
    const msg = {
-   data: Buffer.from('Hello, Pulsar'),
-   partitionKey: 'key1',
-   properties: {
-       'foo': 'bar',
-   },
-   eventTimestamp: Date.now(),
-   replicationClusters: [
-       'cluster1',
-       'cluster2',
-   ],
+   data: Buffer.from('my-sync-message'),
    }
 
    await producer.send(msg);
@@ -178,90 +185,84 @@ await producer.Send(data);
 </Tabs>
 ````
 
-## Send messages with customized metadata
-
-- Send messages with customized metadata by using the builder.
-
-  ````mdx-code-block
-  <Tabs groupId="lang-choice"
-    defaultValue="C#"
-    
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"C#","value":"C#"}]}>
-
-  <TabItem value="Java">
-
-    ```java
-    producer.newMessage()
-            .value("my-sync-message")
-            .property("my-key", "my-value")
-            .property("my-other-key", "my-other-value")
-            .send();
-    ```
+## Configure messages
 
-  </TabItem>
-
-  <TabItem value="C++">
+You can set various properties of Pulsar's messages. The values of these 
properties are stored in the metadata of a message.
 
-    ```cpp
-    Message msg = MessageBuilder()
-                      .setContent("content")
-                      .setProperty("my-key", "my-value")
-                      .setProperty("my-other-key", "my-other-value")
-                      .build();
-    Result res = producer.send(msg);
-    ```
+````mdx-code-block
+<Tabs groupId="lang-choice"
+  defaultValue="Java"
+  
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"C#","value":"C#"}]}>
 
-  </TabItem>
+<TabItem value="Java">
 
-  <TabItem value="C#">
+  ```java
+  producer.newMessage()
+                .key("my-key") // Set the message key
+                .eventTime(System.currentTimeMillis()) // Set the event time
+                .sequenceId(1203) // Set the sequenceId for deduplication 
purposes
+                .deliverAfter(1, TimeUnit.HOURS) // Delay message delivery for 
1 hour
+                .property("my-key", "my-value") // Set the customized metadata
+                .property("my-other-key", "my-other-value")
+                .replicationClusters(
+                        Lists.newArrayList("r1", "r2")) // Set the 
geo-replication clusters for this message.
+                .value("content")
+                .send();
+  ```
 
-    ```csharp
-    var messageId = await producer.NewMessage()
-                                .Property("SomeKey", "SomeValue")
-                                .Send(data);
-    ```
+  For the Java client, you can also use `loadConf` to configure the message 
metadata. Here is an example:
+  ```java
+  Map<String, Object> conf = new HashMap<>();
+  conf.put("key", "my-key");
+  conf.put("eventTime", System.currentTimeMillis());
+  producer.newMessage()
+          .value("my-message")
+          .loadConf(conf)
+          .send();
+  ```
 
-    </TabItem>
-  </Tabs>
-  ````
+</TabItem>
 
-## Async send messages
+<TabItem value="C++">
 
-You can publish messages [asynchronously](concepts-clients.md#send-modes) 
using the Java client. With async send, the producer puts the message in a 
blocking queue and returns it immediately. Then the client library sends the 
message to the broker in the background. If the queue is full (max size 
configurable), the producer is blocked or fails immediately when calling the 
API, depending on arguments passed to the producer.
+  ```cpp
+  Message msg = MessageBuilder()
+                    .setContent("content")
+                    .setProperty("my-key", "my-value")
+                    .setProperty("my-other-key", "my-other-value")
+                    .setDeliverAfter(std::chrono::minutes(3)) // Delay message 
delivery for 3 minutes
+                    .build();
+  Result res = producer.send(msg);
+  ```
 
-The following is an example.
+</TabItem>
 
-````mdx-code-block
-<Tabs groupId="lang-choice"
-  defaultValue="Java"
-  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"}]}>
-<TabItem value="Java">
+<TabItem value="Go">
 
-```java
-producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
-    System.out.println("Message with ID " + msgId + " successfully sent");
-});
-```
+  ```go
+  ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
+       Payload:      []byte(fmt.Sprintf("content")),
+       DeliverAfter: 3 * time.Second, // Delay message delivery for 3 seconds
+   })
+   if err != nil {
+       log.Fatal(err)
+   }
+  ```
 
 </TabItem>
 
-<TabItem value="C++">
+<TabItem value="C#">
 
-```cpp
-Message msg = MessageBuilder()
-                  .setContent("content")
-                  .build();
-producer.sendAsync(msg, [](Result result, MessageId messageId) {
-    std::cout << "Result: " << result << "; Message ID:" << messageId;
-});
-```
+  ```csharp
+  var messageId = await producer.NewMessage()
+                              .Property("SomeKey", "SomeValue")
+                              .Send(data);
+  ```
 
 </TabItem>
-
 </Tabs>
 ````
 
-As you can see from the example above, async send operations return a 
[MessageId](/api/client/org/apache/pulsar/client/api/MessageId) wrapped in a 
[`CompletableFuture`](http://www.baeldung.com/java-completablefuture).
-
 ## Publish messages to partitioned topics
 
 By default, Pulsar topics are served by a single broker, which limits the 
maximum throughput of a topic. *Partitioned topics* can span multiple brokers 
and thus allow for higher throughput.
@@ -270,7 +271,7 @@ You can publish messages to partitioned topics using Pulsar 
client libraries. Wh
 
 ### Use built-in message router
 
-You can specify the [routing mode](concepts-messaging.md#routing-modes) in the 
`ProducerConfiguration` object to configure your producer. The routing mode 
determines which partition (internal topic) each message should be published to.
+The routing mode determines which partition (internal topic) each message 
should be published to.
 
 The following is an example:
 
@@ -281,15 +282,10 @@ The following is an example:
   <TabItem value="Java">
 
    ```java
-   String pulsarBrokerRootUrl = "pulsar://localhost:6650";
-   String topic = "persistent://my-tenant/my-namespace/my-topic";
-
-   PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
    Producer<byte[]> producer = pulsarClient.newProducer()
-      .topic(topic)
+      .topic("my-topic")
       .messageRoutingMode(MessageRoutingMode.SinglePartition)
       .create();
-   producer.send("Partitioned topic message".getBytes());
    ```
 
   </TabItem>
@@ -299,12 +295,12 @@ The following is an example:
    ```cpp
    #include "lib/RoundRobinMessageRouter.h" // Make sure include this header 
file
 
-    Producer producer;
-    Result result = client.createProducer(
-        "persistent://public/default/my-topic",
-        
ProducerConfiguration().setMessageRouter(std::make_shared<RoundRobinMessageRouter>(
-            ProducerConfiguration::BoostHash, true, 1000, 100000, 
boost::posix_time::seconds(1))),
-        producer);
+  Producer producer;
+  Result result = client.createProducer(
+      "my-topic",
+      
ProducerConfiguration().setMessageRouter(std::make_shared<RoundRobinMessageRouter>(
+          ProducerConfiguration::BoostHash, true, 1000, 100000, 
boost::posix_time::seconds(1))),
+      producer);
    ```
 
   </TabItem>
@@ -312,60 +308,14 @@ The following is an example:
   <TabItem value="Go">
 
    ```go
-    client, err := pulsar.NewClient(pulsar.ClientOptions{
-        URL: "pulsar://localhost:6650",
-    })
-
-    if err != nil {
-        log.Fatal(err)
-    }
-    defer client.Close()
-
     producer, err := client.CreateProducer(pulsar.ProducerOptions{
-        Topic: "my-partitioned-topic",
+        Topic: "my-topic",
         MessageRouter: func(msg *pulsar.ProducerMessage, tm 
pulsar.TopicMetadata) int {
             fmt.Println("Topic has", tm.NumPartitions(), "partitions. Routing 
message ", msg, " to partition 2.")
             // always push msg to partition 2
             return 2
         },
     })
-
-    if err != nil {
-        log.Fatal(err)
-    }
-    defer producer.Close()
-
-    for i := 0; i < 10; i++ {
-        if msgId, err := producer.Send(context.Background(), 
&pulsar.ProducerMessage{
-            Payload: []byte(fmt.Sprintf("message-%d", i)),
-        }); err != nil {
-            log.Fatal(err)
-        } else {
-            log.Println("Published message: ", msgId)
-        }
-    }
-
-    // subscribe a specific partition of a topic
-    // for demos only, not recommend to subscribe a specific partition
-    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
-        // pulsar partition is a special topic has the suffix '-partition-xx'
-        Topic:            "my-partitioned-topic-partition-2",
-        SubscriptionName: "my-sub",
-        Type:             pulsar.Shared,
-    })
-    if err != nil {
-        log.Fatal(err)
-    }
-    defer consumer.Close()
-
-    for i := 0; i < 10; i++ {
-        msg, err := consumer.Receive(context.Background())
-        if err != nil {
-            log.Fatal(err)
-        }
-        fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), 
string(msg.Payload()))
-        consumer.Ack(msg)
-    }
    ```
 
   </TabItem>
@@ -377,7 +327,7 @@ The following is an example:
 ````mdx-code-block
 <Tabs groupId="lang-choice"
   defaultValue="Java"
-  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"}]}>
+  
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Go","value":"Go"}]}>
 <TabItem value="Java">
 
 To use a custom message router, you need to provide an implementation of the 
[MessageRouter](/api/client/org/apache/pulsar/client/api/MessageRouter) 
interface, which has just one `choosePartition` method:
@@ -386,13 +336,33 @@ To use a custom message router, you need to provide an 
implementation of the [Me
 public interface MessageRouter extends Serializable {
     int choosePartition(Message msg);
 }
+```
+
+The following router routes every message to partition 10:
+
+```java
+public class AlwaysTenRouter implements MessageRouter {
+    public int choosePartition(Message msg) {
+        return 10;
+    }
+}
+```
+
+With that implementation, you can send messages to partitioned topics as below.
+
+```java
+Producer<byte[]> producer = pulsarClient.newProducer()
+        .topic("my-topic")
+        .messageRouter(new AlwaysTenRouter())
+        .create();
+producer.send("Partitioned topic message".getBytes());
 ```
 
   </TabItem>
 
   <TabItem value="C++">
 
-To use a custom message router, you need to provide an implementation of the 
``MessageRoutingPolicy interface, which has just one `getPartition` method:
+To use a custom message router, you need to provide an implementation of the 
`MessageRoutingPolicy` interface, which has one `getPartition` method:
 
 ```cpp
 class MessageRouter : public MessageRoutingPolicy {
@@ -406,29 +376,7 @@ class MessageRouter : public MessageRoutingPolicy {
 };
 ```
 
-  </TabItem>
-</Tabs>
-````
-
 The following router routes every message to partition 10:
-
-````mdx-code-block
-<Tabs groupId="lang-choice"
-  defaultValue="Java"
-  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"}]}>
-<TabItem value="Java">
-
-```java
-public class AlwaysTenRouter implements MessageRouter {
-    public int choosePartition(Message msg) {
-        return 10;
-    }
-}
-```
-
-  </TabItem>
-  <TabItem value="C++">
-
 ```cpp
 class MessageRouter : public MessageRoutingPolicy {
    public:
@@ -440,44 +388,31 @@ class MessageRouter : public MessageRoutingPolicy {
 };
 ```
 
-  </TabItem>
-</Tabs>
-````
-
 With that implementation, you can send messages to partitioned topics as below.
-
-````mdx-code-block
-<Tabs groupId="lang-choice"
-  defaultValue="Java"
-  values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"}]}>
-<TabItem value="Java">
-
-```java
-String pulsarBrokerRootUrl = "pulsar://localhost:6650";
-String topic = "persistent://my-tenant/my-cluster-my-namespace/my-topic";
-
-PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsarBrokerRootUrl).build();
-Producer<byte[]> producer = pulsarClient.newProducer()
-        .topic(topic)
-        .messageRouter(new AlwaysTenRouter())
-        .create();
-producer.send("Partitioned topic message".getBytes());
-```
-
-  </TabItem>
-
-
-<TabItem value="C++">
-
 ```cpp
 Producer producer;
 Result result = client.createProducer(
-    "persistent://public/default/my-topic",
+    "my-topic",
     
ProducerConfiguration().setMessageRouter(std::make_shared<MessageRouter>()),
     producer);
 Message msg = MessageBuilder().setContent("content").build();
 result = producer.send(msg);
 ```
+  </TabItem>
+
+  <TabItem value="Go">
+
+  In the Go client, you can configure a customized message router by passing a 
function.
+   ```go
+    producer, err := client.CreateProducer(pulsar.ProducerOptions{
+        Topic: "my-topic",
+        MessageRouter: func(msg *pulsar.ProducerMessage, tm 
pulsar.TopicMetadata) int {
+            fmt.Println("Topic has", tm.NumPartitions(), "partitions. Routing 
message ", msg, " to partition 10.")
+            // always push msg to partition 10
+            return 10
+        },
+    })
+   ```
 
   </TabItem>
 </Tabs>
@@ -489,16 +424,16 @@ If a message has a key, it supersedes the round robin 
routing policy. The follow
 
 ```java
 // If the message has a key, it supersedes the round robin routing policy
-        if (msg.hasKey()) {
-            return signSafeMod(hash.makeHash(msg.getKey()), 
topicMetadata.numPartitions());
-        }
+if (msg.hasKey()) {
+    return signSafeMod(hash.makeHash(msg.getKey()), 
topicMetadata.numPartitions());
+}
 
-        if (isBatchingEnabled) { // if batching is enabled, choose partition 
on `partitionSwitchMs` boundary.
-            long currentMs = clock.millis();
-            return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, 
topicMetadata.numPartitions());
-        } else {
-            return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), 
topicMetadata.numPartitions());
-        }
+if (isBatchingEnabled) { // if batching is enabled, choose partition on 
`partitionSwitchMs` boundary.
+    long currentMs = clock.millis();
+    return signSafeMod(currentMs / partitionSwitchMs + startPtnIdx, 
topicMetadata.numPartitions());
+} else {
+    return signSafeMod(PARTITION_INDEX_UPDATER.getAndIncrement(this), 
topicMetadata.numPartitions());
+}
 ```
 
 ## Enable chunking
@@ -536,15 +471,6 @@ The message chunking feature is OFF by default. The 
following is an example of h
   <TabItem value="Go">
 
    ```go
-   client, err := pulsar.NewClient(pulsar.ClientOptions{
-       URL: serviceURL,
-   })
-
-   if err != nil {
-          log.Fatal(err)
-   }
-   defer client.Close()
-
    // The message chunking feature is OFF by default.
    // By default, a producer chunks the large message based on the max message 
size (`maxMessageSize`) configured at the broker side (for example, 5MB).
    // Client can also configure the max chunked size using the producer 
configuration `ChunkMaxMessageSize`.
@@ -583,93 +509,6 @@ To enable chunking, you need to disable batching 
(`enableBatching`=`false`) conc
 
 :::
 
-## Configure delayed message delivery
-
-The following is an example of how to configure delayed message delivery for a 
producer.
-
-````mdx-code-block
-<Tabs groupId="lang-choice"
-  defaultValue="Java"
-  
values={[{"label":"Java","value":"Java"},{"label":"C++","value":"C++"},{"label":"Go","value":"Go"}]}>
-<TabItem value="Java">
-
-   ```java
-   // message to be delivered at the configured delay interval
-   producer.newMessage().deliverAfter(3L, TimeUnit.Minute).value("Hello 
Pulsar!").send();
-   ```
-
-  </TabItem>
-  <TabItem value="C++">
-
-   ```cpp
-   Message msg = MessageBuilder().setContent("content")
-                      .setDeliverAfter(std::chrono::minutes(3))
-                      .build();
-   producer.send(msg);
-   ```
-
-  </TabItem>
-  <TabItem value="Go">
-
-   ```go
-   client, err := pulsar.NewClient(pulsar.ClientOptions{
-       URL: "pulsar://localhost:6650",
-   })
-   if err != nil {
-       log.Fatal(err)
-   }
-   defer client.Close()
-
-   topicName := "topic-1"
-   producer, err := client.CreateProducer(pulsar.ProducerOptions{
-       Topic:           topicName,
-       DisableBatching: true,
-   })
-   if err != nil {
-       log.Fatal(err)
-   }
-   defer producer.Close()
-
-   consumer, err := client.Subscribe(pulsar.ConsumerOptions{
-       Topic:            topicName,
-       SubscriptionName: "subName",
-       Type:             pulsar.Shared,
-   })
-   if err != nil {
-       log.Fatal(err)
-   }
-   defer consumer.Close()
-
-   ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
-       Payload:      []byte(fmt.Sprintf("test")),
-       DeliverAfter: 3 * time.Second,
-   })
-   if err != nil {
-       log.Fatal(err)
-   }
-   fmt.Println(ID)
-
-   ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
-   msg, err := consumer.Receive(ctx)
-   if err != nil {
-       log.Fatal(err)
-   }
-   fmt.Println(msg.Payload())
-   cancel()
-
-   ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
-   msg, err = consumer.Receive(ctx)
-   if err != nil {
-       log.Fatal(err)
-   }
-   fmt.Println(msg.Payload())
-   cancel()
-   ```
-
-  </TabItem>
-</Tabs>
-````
-
 ## Intercept messages
 
 `ProducerInterceptor` intercepts and possibly mutates messages received by the 
producer before they are published to the brokers.
@@ -729,7 +568,7 @@ To intercept messages, you can add a `ProducerInterceptor` 
or multiple ones when
           // Your implementation code
         }
 
-        void close() override { 
+        void close() override {
           // Your implementation code
         }
     };

Reply via email to