merlimat closed pull request #2980: CPP: add triggerFlush to immediately flush 
batch messages in producer::send
URL: https://github.com/apache/pulsar/pull/2980
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, the full diff is reproduced
below; GitHub would not otherwise display it once the change is merged:

diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 6808aad2d7..4e75e0e562 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -234,4 +234,11 @@ Future<Result, ProducerImplBaseWeakPtr> 
PartitionedProducerImpl::getProducerCrea
 
 // override
 bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
+
+void PartitionedProducerImpl::triggerFlush() {
+    for (ProducerList::const_iterator prod = producers_.begin(); prod != 
producers_.end(); prod++) {
+        (*prod)->triggerFlush();
+    }
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index be73ddc426..cf6ddd9a11 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -69,6 +69,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture();
 
+    virtual void triggerFlush();
+
     void handleSinglePartitionProducerCreated(Result result, 
ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                               const unsigned int 
partitionIndex);
 
diff --git a/pulsar-client-cpp/lib/Producer.cc 
b/pulsar-client-cpp/lib/Producer.cc
index 822f41e20d..49adbfc166 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -24,6 +24,7 @@
 #include "ProducerImpl.h"
 
 namespace pulsar {
+DECLARE_LOG_OBJECT()
 
 static const std::string EMPTY_STRING;
 
@@ -37,6 +38,10 @@ Result Producer::send(const Message& msg) {
     Promise<Result, Message> promise;
     sendAsync(msg, WaitForCallbackValue<Message>(promise));
 
+    if (!promise.isComplete()) {
+        impl_->triggerFlush();
+    }
+
     Message m;
     Result result = promise.getFuture().get(m);
     return result;
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc 
b/pulsar-client-cpp/lib/ProducerImpl.cc
index 881b1899c8..ddcf671baa 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -285,6 +285,13 @@ void ProducerImpl::statsCallBackHandler(Result res, const 
Message& msg, SendCall
     }
 }
 
+void ProducerImpl::triggerFlush() {
+    if (batchMessageContainer) {
+        Lock lock(mutex_);
+        batchMessageContainer->sendMessage();
+    }
+}
+
 void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     producerStatsBasePtr_->messageSent(msg);
     SendCallback cb = boost::bind(&ProducerImpl::statsCallBackHandler, this, 
_1, _2, callback,
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h 
b/pulsar-client-cpp/lib/ProducerImpl.h
index 2907d405d7..615eafb7af 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -86,7 +86,9 @@ class ProducerImpl : public HandlerBase,
 
     virtual void shutdown();
 
-    bool isClosed();
+    virtual bool isClosed();
+
+    virtual void triggerFlush();
 
    protected:
     ProducerStatsBasePtr producerStatsBasePtr_;
diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h 
b/pulsar-client-cpp/lib/ProducerImplBase.h
index ec19163765..92f886f563 100644
--- a/pulsar-client-cpp/lib/ProducerImplBase.h
+++ b/pulsar-client-cpp/lib/ProducerImplBase.h
@@ -42,6 +42,7 @@ class ProducerImplBase {
     virtual bool isClosed() = 0;
     virtual const std::string& getTopic() const = 0;
     virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() 
= 0;
+    virtual void triggerFlush() = 0;
 };
 }  // namespace pulsar
 #endif  // PULSAR_PRODUCER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 73430fc291..122a643b85 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1955,5 +1955,207 @@ TEST(BasicEndToEndTest, 
testPatternMultiTopicsConsumerAutoDiscovery) {
 
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
 
+    client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = 
"persistent://property/cluster/namespace/test-flush-batch-messages";
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    int numOfMessages = 10;
+
+    ProducerConfiguration conf;
+
+    conf.setBatchingEnabled(true);
+    // set batch message number numOfMessages, and max delay 60s
+    conf.setBatchingMaxMessages(numOfMessages);
+    conf.setBatchingMaxPublishDelayMs(60000);
+
+    conf.setBlockIfQueueFull(true);
+    conf.setProperty("producer-name", "test-producer-name");
+    conf.setProperty("producer-id", "test-producer-id");
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, conf, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // Send Asynchronously of half the messages
+    std::string prefix = "msg-batch-async";
+    for (int i = 0; i < numOfMessages / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO("sending first half messages in async, should timeout to 
receive");
+
+    // message not reached max batch number, should not receive any data.
+    Message receivedMsg;
+    ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 5000));
+
+    // Send Asynchronously of the other half the messages
+    for (int i = numOfMessages / 2; i < numOfMessages; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO("sending the other half messages in async, should able to 
receive");
+    // message not reached max batch number, should received the messages
+    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000));
+
+    // receive all the messages.
+    int i = 1;
+    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
+        std::string expectedMessageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        LOG_DEBUG("Received Message with [ content - " << 
receivedMsg.getDataAsString() << "] [ messageID = "
+                                                       << 
receivedMsg.getMessageId() << "]");
+        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), 
boost::lexical_cast<std::string>(i++));
+        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+    }
+
+    // Send sync of half the messages, this will triggerFlush, and could get 
the messages.
+    prefix = "msg-batch-sync";
+    for (int i = 0; i < numOfMessages / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.send(msg);
+        LOG_DEBUG("sync sending message " << messageContent);
+    }
+    // message not reached max batch number, should received the messages, and 
not timeout
+    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000));
+
+    producer.close();
+    client.shutdown();
+}
+
+// for partitioned reason, it may hard to verify message id.
+static void simpleCallback(Result code, const Message& msg) {
+    LOG_INFO("Received code: " << code << " -- Msg: " << msg);
+}
+
+TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
+    Client client(lookupUrl);
+    std::string topicName = 
"persistent://prop/unit/ns/partition-testSyncFlushBatchMessages";
+    // call admin api to make it partitioned
+    std::string url =
+        adminUrl + 
"admin/persistent/prop/unit/ns/partition-testSyncFlushBatchMessages/partitions";
+    int res = makePutRequest(url, "5");
+    int numberOfPartitions = 5;
+
+    LOG_INFO("res = " << res);
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    Producer producer;
+    int numOfMessages = 20;
+    ProducerConfiguration tempProducerConfiguration;
+    
tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    ProducerConfiguration producerConfiguration = tempProducerConfiguration;
+    producerConfiguration.setBatchingEnabled(true);
+    // set batch message number numOfMessages, and max delay 60s
+    producerConfiguration.setBatchingMaxMessages(numOfMessages / 
numberOfPartitions);
+    producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+
+    Result result = client.createProducer(topicName, producerConfiguration, 
producer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(producer.getTopic(), topicName);
+
+    // Topic is partitioned into 5 partitions so each partition will receive 
two messages
+    LOG_INFO("Creating Subscriber");
+    std::string consumerId = "CONSUMER";
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerExclusive);
+    consConfig.setReceiverQueueSize(2);
+    ASSERT_FALSE(consConfig.hasMessageListener());
+    Consumer consumer[numberOfPartitions];
+    Result subscribeResult;
+    for (int i = 0; i < numberOfPartitions; i++) {
+        std::stringstream partitionedTopicName;
+        partitionedTopicName << topicName << "-partition-" << i;
+
+        std::stringstream partitionedConsumerId;
+        partitionedConsumerId << consumerId << i;
+        subscribeResult = client.subscribe(partitionedTopicName.str(), 
partitionedConsumerId.str(),
+                                           consConfig, consumer[i]);
+
+        ASSERT_EQ(ResultOk, subscribeResult);
+        ASSERT_EQ(consumer[i].getTopic(), partitionedTopicName.str());
+    }
+
+    // Send asynchronously of first part the messages
+    std::string prefix = "msg-batch-async";
+    for (int i = 0; i < numOfMessages / numberOfPartitions / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, simpleCallback);
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO("sending first part messages in async, should timeout to 
receive");
+
+    Message m;
+    ASSERT_EQ(ResultTimeout, consumer[0].receive(m, 5000));
+
+    for (int i = numOfMessages / numberOfPartitions / 2; i < numOfMessages; 
i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, simpleCallback);
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO("sending second part messages in async, should be able to 
receive");
+
+    for (int i = 0; i < numOfMessages / numberOfPartitions; i++) {
+        for (int partitionIndex = 0; partitionIndex < numberOfPartitions; 
partitionIndex++) {
+            ASSERT_EQ(ResultOk, consumer[partitionIndex].receive(m));
+            ASSERT_EQ(ResultOk, consumer[partitionIndex].acknowledge(m));
+        }
+    }
+
+    // Sync send of first part of the messages, this will triggerFlush, and 
could get the messages.
+    prefix = "msg-batch-sync";
+    for (int i = 0; i < numOfMessages / numberOfPartitions / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.send(msg);
+        LOG_DEBUG("sync sending message " << messageContent);
+    }
+    LOG_INFO("sending first part messages in sync, should not timeout to 
receive");
+    ASSERT_EQ(ResultOk, consumer[0].receive(m, 5000));
+
+    producer.close();
     client.shutdown();
 }
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc 
b/pulsar-client-cpp/tests/BatchMessageTest.cc
index 34c38db846..894a582903 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -139,7 +139,10 @@ TEST(BatchMessageTest, testProducerTimeout) {
         /* Start the timer */
         start = time(NULL);
         LOG_DEBUG("start = " << start);
-        producer.send(msg);
+        Promise<Result, Message> promise;
+        producer.sendAsync(msg, WaitForCallbackValue<Message>(promise));
+        Message m;
+        promise.getFuture().get(m);
         /* End the timer */
         end = time(NULL);
         LOG_DEBUG("end = " << end);
diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc 
b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
index 09c4b756a4..684302733a 100644
--- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
@@ -266,6 +266,7 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {
     int numOfMessages = 7 * 5;  // 5 message per partition
     Promise<Result, Producer> producerPromise;
     ProducerConfiguration config;
+    config.setBatchingEnabled(false);
     
config.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
     client.createProducerAsync(topicName, config, 
WaitForCallbackValue<Producer>(producerPromise));
     Future<Result, Producer> producerFuture = producerPromise.getFuture();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to