Anonymitaet commented on a change in pull request #11570:
URL: https://github.com/apache/pulsar/pull/11570#discussion_r686474425



##########
File path: site2/docs/client-libraries-cpp.md
##########
@@ -253,52 +253,227 @@ pulsar+ssl://pulsar.us-west.example.com:6651
 
 ## Create a consumer
 
-To use Pulsar as a consumer, you need to create a consumer on the C++ client. 
The following is an example. 
+To use Pulsar as a consumer, you need to create a consumer on the C++ client. 
There are two main ways of using the consumer:
+- blocking style: synchronously calling `receive(msg)`
+- asynchronous (event based) style: using a message listener
+
+### Blocking example
+
+The benefit of this approach is that it is the simplest code: keep calling 
`receive(msg)`, which blocks until a message is received.
+
+This example starts a subscription at the earliest offset and consumes 100 
messages.
 
 ```c++
-Client client("pulsar://localhost:6650");
+#include <pulsar/Client.h>
+
+using namespace pulsar;
+
+int main() {
+    Client client("pulsar://localhost:6650");
 
-Consumer consumer;
-Result result = client.subscribe("my-topic", "my-subscription-name", consumer);
-if (result != ResultOk) {
-    LOG_ERROR("Failed to subscribe: " << result);
-    return -1;
+    Consumer consumer;
+    ConsumerConfiguration config;
+    config.setSubscriptionInitialPosition(InitialPositionEarliest);
+    Result result = client.subscribe("persistent://public/default/my-topic", 
"consumer-1", config, consumer);
+    if (result != ResultOk) {
+        std::cout << "Failed to subscribe: " << result << std::endl;
+        return -1;
+    }
+
+    Message msg;
+    int ctr = 0;
+    // consume 100 messages
+    while (ctr < 100) {
+        consumer.receive(msg);
+        std::cout << "Received: " << msg
+            << "  with payload '" << msg.getDataAsString() << "'" << std::endl;
+
+        consumer.acknowledge(msg);
+        ctr++;
+    }
+
+    std::cout << "Finished consuming synchronously!" << std::endl;
+
+    client.close();
+    return 0;
 }
+```
+
+### Consumer with a message listener
+
+You can avoid running a loop with blocking calls by using the event-based 
style: a message listener is invoked for each message that is received.
+
+This example starts a subscription at the earliest offset and consumes 100 
messages.
 
-Message msg;
+```c++
+#include <pulsar/Client.h>
+#include <atomic>
+#include <thread>
+
+using namespace pulsar;
 
-while (true) {
-    consumer.receive(msg);
-    LOG_INFO("Received: " << msg
-            << "  with payload '" << msg.getDataAsString() << "'");
+std::atomic<uint32_t> messagesReceived;
 
-    consumer.acknowledge(msg);
+void handleAckComplete(Result res) {
+    std::cout << "Ack res: " << res << std::endl;
 }
 
-client.close();
+void listener(Consumer consumer, const Message& msg) {
+    std::cout << "Got message " << msg << " with content '" << 
msg.getDataAsString() << "'" << std::endl;
+    messagesReceived++;
+    consumer.acknowledgeAsync(msg.getMessageId(), handleAckComplete);
+}
+
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    Consumer consumer;
+    ConsumerConfiguration config;
+    config.setMessageListener(listener);
+    config.setSubscriptionInitialPosition(InitialPositionEarliest);
+    Result result = client.subscribe("persistent://public/default/my-topic", 
"consumer-1", config, consumer);
+    if (result != ResultOk) {
+        std::cout << "Failed to subscribe: " << result << std::endl;
+        return -1;
+    }
+
+    // wait for 100 messages to be consumed
+    while (messagesReceived < 100) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    std::cout << "Finished consuming asynchronously!" << std::endl;
+
+    client.close();
+    return 0;
+}
 ```
 
 ## Create a producer
 
-To use Pulsar as a producer, you need to create a producer on the C++ client. 
The following is an example. 
+To use Pulsar as a producer, you need to create a producer on the C++ client. 
There are two main ways of using a producer:
+- blocking style where each call to `send` waits for an ack from the broker.
+- non-blocking asynchronous style where `sendAsync` is called instead of 
`send` and a callback is supplied for when the ack is received from the broker.
+
+### Simple blocking example
+
+This example sends 100 messages using the blocking style. While simple, it 
does not produce high throughput as it waits for each ack to come back before 
sending the next message.
+
+```c++
+#include <pulsar/Client.h>
+#include <thread>
+
+using namespace pulsar;
+
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    Producer producer;
+    Result result = 
client.createProducer("persistent://public/default/my-topic", producer);
+    if (result != ResultOk) {
+        std::cout << "Error creating producer: " << result << std::endl;
+        return -1;
+    }
+
+    // Send 100 messages synchronously
+    int ctr = 0;
+    while (ctr < 100) {
+        std::string content = "msg" + std::to_string(ctr);
+        Message msg = MessageBuilder().setContent(content).setProperty("x", 
"1").build();
+        Result result = producer.send(msg);
+        if (result != ResultOk) {
+            std::cout << "The message " << content << " could not be sent, 
received code: " << result << std::endl;
+        } else {
+            std::cout << "The message " << content << " sent successfully" << 
std::endl;
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        ctr++;
+    }
+
+    std::cout << "Finished producing synchronously!" << std::endl;
+
+    client.close();
+    return 0;
+}
+```
+
+### Non-blocking example
+
+This example sends 100 messages using the non-blocking style, calling 
`sendAsync` instead of `send`. This allows the producer to have multiple 
messages in flight at a time, which increases throughput.
+
+The producer configuration `blockIfQueueFull` is useful here to avoid 
`ResultProducerQueueIsFull` errors when the internal queue for outgoing send 
requests becomes full. Once the internal queue is full, `sendAsync` becomes 
blocking which can make your code simpler.
+
+Without this configuration the `Result` in the callback is the error 
`ResultProducerQueueIsFull` and you must decide how to deal with that.
 
 ```c++
-Client client("pulsar://localhost:6650");
+#include <pulsar/Client.h>
+#include <atomic>
+#include <functional>
+#include <thread>
+
+using namespace pulsar;
 
-Producer producer;
-Result result = client.createProducer("my-topic", producer);
-if (result != ResultOk) {
-    LOG_ERROR("Error creating producer: " << result);
-    return -1;
+std::atomic<uint32_t> acksReceived;
+
+void callback(Result code, const MessageId& msgId, std::string msgContent) {
+    // message processing logic here
+    std::cout << "Received ack for msg: " << msgContent << " with code: "
+        << code << " -- MsgID: " << msgId << std::endl;
+    acksReceived++;
 }
 
-// Publish 10 messages to the topic
-for (int i = 0; i < 10; i++){
-    Message msg = MessageBuilder().setContent("my-message").build();
-    Result res = producer.send(msg);
-    LOG_INFO("Message sent: " << res);
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    ProducerConfiguration producerConf;
+    producerConf.setBlockIfQueueFull(true);
+    Producer producer;
+    Result result = 
client.createProducer("persistent://public/default/my-topic",
+                                          producerConf, producer);
+    if (result != ResultOk) {
+        std::cout << "Error creating producer: " << result << std::endl;
+        return -1;
+    }
+
+    // Send 100 messages asynchronously
+    int ctr = 0;
+    while (ctr < 100) {
+        std::string content = "msg" + std::to_string(ctr);
+        Message msg = MessageBuilder().setContent(content).setProperty("x", 
"1").build();
+        producer.sendAsync(msg, std::bind(callback,
+                                          std::placeholders::_1, 
std::placeholders::_2, content));
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        ctr++;
+    }
+
+    // wait for 100 messages to be acked
+    while (acksReceived < 100) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    std::cout << "Finished producing asynchronously!" << std::endl;
+
+    client.close();
+    return 0;
 }
-client.close();
+```
+
+### Partitioned Topics and Lazy Producers
+
+When scaling out a Pulsar topic, you may configure a topic to have hundreds of 
partitions. Likewise, you may have also scaled out your producers so there are 
hundreds or even thousands of producers. This can put some strain on the 
Pulsar brokers because, when you create a producer on a partitioned topic, it 
internally creates one internal producer per partition, which involves 
communication with the brokers for each one. So for a topic with 1000 
partitions and 1000 producers, it ends up creating 1,000,000 internal producers 
across the producer applications, each of which has to communicate with a 
broker to find out which broker it should connect to and then perform the 
connection handshake.
+
+You can reduce the load caused by this combination of large number of 
partitions and many producers by doing the following:
+- use SinglePartition partition routing mode (this ensures that all messages 
are only sent to a single, randomly selected partition)
+- use non-keyed messages (when messages are keyed, routing is based on the 
hash of the key and so messages will end up being sent to multiple partitions)
+- use lazy producers (this ensures that an internal producer is only created 
on demand when a message needs to be routed to a partition)

Review comment:
       ```suggestion
   You can reduce the load caused by this combination of a large number of 
partitions and many producers by doing the following:
   - use SinglePartition partition routing mode (this ensures that all messages 
are only sent to a single, randomly selected partition)
   - use non-keyed messages (when messages are keyed, routing is based on the 
hash of the key and so messages will end up being sent to multiple partitions)
   - use lazy producers (this ensures that an internal producer is only created 
on demand when a message needs to be routed to a partition)
   ```
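
For a sense of scale, the arithmetic in the quoted paragraph can be sketched as follows. This is only an illustrative sketch: `eagerInternalProducers` and `lazyInternalProducers` are hypothetical helper names, not part of the Pulsar C++ client, and the lazy figure assumes each producer only ever routes to the partitions it is given (for example one partition under SinglePartition routing with non-keyed messages).

```cpp
#include <cstdint>

// Hypothetical helper: without lazy producers, every producer opens one
// internal producer per partition as soon as it is created.
constexpr std::uint64_t eagerInternalProducers(std::uint64_t producers,
                                               std::uint64_t partitions) {
    return producers * partitions;
}

// Hypothetical helper: with lazy producers, an internal producer is only
// created for a partition once a message is routed to it. The second
// argument is an assumption about how many partitions each producer uses.
constexpr std::uint64_t lazyInternalProducers(std::uint64_t producers,
                                              std::uint64_t partitionsUsedPerProducer) {
    return producers * partitionsUsedPerProducer;
}
```

With the numbers from the docs, `eagerInternalProducers(1000, 1000)` gives 1,000,000 internal producers, while `lazyInternalProducers(1000, 1)` gives 1,000.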




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

