This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 424034b  add triggerFlush to immediately flush batch messages in 
producer::send (#2980)
424034b is described below

commit 424034b5393bbf6440ac1c301febb98b64299fe6
Author: Jia Zhai <[email protected]>
AuthorDate: Wed Nov 14 13:24:22 2018 +0800

    add triggerFlush to immediately flush batch messages in producer::send 
(#2980)
---
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc |   7 +
 pulsar-client-cpp/lib/PartitionedProducerImpl.h  |   2 +
 pulsar-client-cpp/lib/Producer.cc                |   5 +
 pulsar-client-cpp/lib/ProducerImpl.cc            |   7 +
 pulsar-client-cpp/lib/ProducerImpl.h             |   4 +-
 pulsar-client-cpp/lib/ProducerImplBase.h         |   1 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 202 +++++++++++++++++++++++
 pulsar-client-cpp/tests/BatchMessageTest.cc      |   5 +-
 pulsar-client-cpp/tests/ConsumerStatsTest.cc     |   1 +
 9 files changed, 232 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 6808aad..4e75e0e 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -234,4 +234,11 @@ Future<Result, ProducerImplBaseWeakPtr> 
PartitionedProducerImpl::getProducerCrea
 
 // override
 bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
+
+void PartitionedProducerImpl::triggerFlush() {
+    for (ProducerList::const_iterator prod = producers_.begin(); prod != 
producers_.end(); prod++) {
+        (*prod)->triggerFlush();
+    }
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index be73ddc..cf6ddd9 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -69,6 +69,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture();
 
+    virtual void triggerFlush();
+
     void handleSinglePartitionProducerCreated(Result result, 
ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                               const unsigned int 
partitionIndex);
 
diff --git a/pulsar-client-cpp/lib/Producer.cc 
b/pulsar-client-cpp/lib/Producer.cc
index 822f41e..49adbfc 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -24,6 +24,7 @@
 #include "ProducerImpl.h"
 
 namespace pulsar {
+DECLARE_LOG_OBJECT()
 
 static const std::string EMPTY_STRING;
 
@@ -37,6 +38,10 @@ Result Producer::send(const Message& msg) {
     Promise<Result, Message> promise;
     sendAsync(msg, WaitForCallbackValue<Message>(promise));
 
+    if (!promise.isComplete()) {
+        impl_->triggerFlush();
+    }
+
     Message m;
     Result result = promise.getFuture().get(m);
     return result;
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc 
b/pulsar-client-cpp/lib/ProducerImpl.cc
index 881b189..ddcf671 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -285,6 +285,13 @@ void ProducerImpl::statsCallBackHandler(Result res, const 
Message& msg, SendCall
     }
 }
 
+void ProducerImpl::triggerFlush() {
+    if (batchMessageContainer) {
+        Lock lock(mutex_);
+        batchMessageContainer->sendMessage();
+    }
+}
+
 void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     producerStatsBasePtr_->messageSent(msg);
     SendCallback cb = boost::bind(&ProducerImpl::statsCallBackHandler, this, 
_1, _2, callback,
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h 
b/pulsar-client-cpp/lib/ProducerImpl.h
index 2907d40..615eafb 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -86,7 +86,9 @@ class ProducerImpl : public HandlerBase,
 
     virtual void shutdown();
 
-    bool isClosed();
+    virtual bool isClosed();
+
+    virtual void triggerFlush();
 
    protected:
     ProducerStatsBasePtr producerStatsBasePtr_;
diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h 
b/pulsar-client-cpp/lib/ProducerImplBase.h
index ec19163..92f886f 100644
--- a/pulsar-client-cpp/lib/ProducerImplBase.h
+++ b/pulsar-client-cpp/lib/ProducerImplBase.h
@@ -42,6 +42,7 @@ class ProducerImplBase {
     virtual bool isClosed() = 0;
     virtual const std::string& getTopic() const = 0;
     virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() 
= 0;
+    virtual void triggerFlush() = 0;
 };
 }  // namespace pulsar
 #endif  // PULSAR_PRODUCER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 73430fc..122a643 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1956,4 +1956,206 @@ TEST(BasicEndToEndTest, 
testPatternMultiTopicsConsumerAutoDiscovery) {
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
 
     client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testSyncFlushBatchMessages) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = 
"persistent://property/cluster/namespace/test-flush-batch-messages";
+    std::string subName = "subscription-name";
+    Producer producer;
+
+    int numOfMessages = 10;
+
+    ProducerConfiguration conf;
+
+    conf.setBatchingEnabled(true);
+    // Set the batch max message count to numOfMessages and the max publish delay to 60s.
+    conf.setBatchingMaxMessages(numOfMessages);
+    conf.setBatchingMaxPublishDelayMs(60000);
+
+    conf.setBlockIfQueueFull(true);
+    conf.setProperty("producer-name", "test-producer-name");
+    conf.setProperty("producer-id", "test-producer-id");
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, conf, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // Asynchronously send the first half of the messages.
+    std::string prefix = "msg-batch-async";
+    for (int i = 0; i < numOfMessages / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO("sending first half messages in async, should timeout to 
receive");
+
+    // The batch has not reached the max message count yet, so nothing should be received.
+    Message receivedMsg;
+    ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 5000));
+
+    // Asynchronously send the other half of the messages.
+    for (int i = numOfMessages / 2; i < numOfMessages; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO("sending the other half messages in async, should able to 
receive");
+    // The batch has now reached the max message count, so the messages should be received.
+    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000));
+
+    // Receive the remaining messages (the first one was already received above).
+    int i = 1;
+    while (consumer.receive(receivedMsg, 5000) == ResultOk) {
+        std::string expectedMessageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        LOG_DEBUG("Received Message with [ content - " << 
receivedMsg.getDataAsString() << "] [ messageID = "
+                                                       << 
receivedMsg.getMessageId() << "]");
+        ASSERT_EQ(receivedMsg.getProperty("msgIndex"), 
boost::lexical_cast<std::string>(i++));
+        ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+    }
+
+    // Synchronously send half of the messages; each send triggers a flush, so they can be received.
+    prefix = "msg-batch-sync";
+    for (int i = 0; i < numOfMessages / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.send(msg);
+        LOG_DEBUG("sync sending message " << messageContent);
+    }
+    // The batch has not reached the max message count, but the sync send flushed it, so receive should not time out.
+    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 5000));
+
+    producer.close();
+    client.shutdown();
+}
+
+// For partitioned topics it may be hard to verify the message id, so this callback only logs the result.
+static void simpleCallback(Result code, const Message& msg) {
+    LOG_INFO("Received code: " << code << " -- Msg: " << msg);
+}
+
+TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
+    Client client(lookupUrl);
+    std::string topicName = 
"persistent://prop/unit/ns/partition-testSyncFlushBatchMessages";
+    // call admin api to make it partitioned
+    std::string url =
+        adminUrl + 
"admin/persistent/prop/unit/ns/partition-testSyncFlushBatchMessages/partitions";
+    int res = makePutRequest(url, "5");
+    int numberOfPartitions = 5;
+
+    LOG_INFO("res = " << res);
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    Producer producer;
+    int numOfMessages = 20;
+    ProducerConfiguration tempProducerConfiguration;
+    
tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    ProducerConfiguration producerConfiguration = tempProducerConfiguration;
+    producerConfiguration.setBatchingEnabled(true);
+    // Set the batch max message count to numOfMessages / numberOfPartitions and the max publish delay to 60s.
+    producerConfiguration.setBatchingMaxMessages(numOfMessages / 
numberOfPartitions);
+    producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+
+    Result result = client.createProducer(topicName, producerConfiguration, 
producer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(producer.getTopic(), topicName);
+
+    // Topic has 5 partitions, so each partition will receive numOfMessages / numberOfPartitions (four) messages.
+    LOG_INFO("Creating Subscriber");
+    std::string consumerId = "CONSUMER";
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerExclusive);
+    consConfig.setReceiverQueueSize(2);
+    ASSERT_FALSE(consConfig.hasMessageListener());
+    Consumer consumer[numberOfPartitions];
+    Result subscribeResult;
+    for (int i = 0; i < numberOfPartitions; i++) {
+        std::stringstream partitionedTopicName;
+        partitionedTopicName << topicName << "-partition-" << i;
+
+        std::stringstream partitionedConsumerId;
+        partitionedConsumerId << consumerId << i;
+        subscribeResult = client.subscribe(partitionedTopicName.str(), 
partitionedConsumerId.str(),
+                                           consConfig, consumer[i]);
+
+        ASSERT_EQ(ResultOk, subscribeResult);
+        ASSERT_EQ(consumer[i].getTopic(), partitionedTopicName.str());
+    }
+
+    // Asynchronously send the first part of the messages.
+    std::string prefix = "msg-batch-async";
+    for (int i = 0; i < numOfMessages / numberOfPartitions / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, simpleCallback);
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO("sending first part messages in async, should timeout to 
receive");
+
+    Message m;
+    ASSERT_EQ(ResultTimeout, consumer[0].receive(m, 5000));
+
+    for (int i = numOfMessages / numberOfPartitions / 2; i < numOfMessages; 
i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, simpleCallback);
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO("sending second part messages in async, should be able to 
receive");
+
+    for (int i = 0; i < numOfMessages / numberOfPartitions; i++) {
+        for (int partitionIndex = 0; partitionIndex < numberOfPartitions; 
partitionIndex++) {
+            ASSERT_EQ(ResultOk, consumer[partitionIndex].receive(m));
+            ASSERT_EQ(ResultOk, consumer[partitionIndex].acknowledge(m));
+        }
+    }
+
+    // Synchronously send the first part of the messages; each send triggers a flush, so they can be received.
+    prefix = "msg-batch-sync";
+    for (int i = 0; i < numOfMessages / numberOfPartitions / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.send(msg);
+        LOG_DEBUG("sync sending message " << messageContent);
+    }
+    LOG_INFO("sending first part messages in sync, should not timeout to 
receive");
+    ASSERT_EQ(ResultOk, consumer[0].receive(m, 5000));
+
+    producer.close();
+    client.shutdown();
 }
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc 
b/pulsar-client-cpp/tests/BatchMessageTest.cc
index 34c38db..894a582 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -139,7 +139,10 @@ TEST(BatchMessageTest, testProducerTimeout) {
         /* Start the timer */
         start = time(NULL);
         LOG_DEBUG("start = " << start);
-        producer.send(msg);
+        Promise<Result, Message> promise;
+        producer.sendAsync(msg, WaitForCallbackValue<Message>(promise));
+        Message m;
+        promise.getFuture().get(m);
         /* End the timer */
         end = time(NULL);
         LOG_DEBUG("end = " << end);
diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc 
b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
index 09c4b75..6843027 100644
--- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
@@ -266,6 +266,7 @@ TEST(ConsumerStatsTest, testAsyncCallOnPartitionedTopic) {
     int numOfMessages = 7 * 5;  // 5 message per partition
     Promise<Result, Producer> producerPromise;
     ProducerConfiguration config;
+    config.setBatchingEnabled(false);
     
config.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
     client.createProducerAsync(topicName, config, 
WaitForCallbackValue<Producer>(producerPromise));
     Future<Result, Producer> producerFuture = producerPromise.getFuture();

Reply via email to