This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 69e5680  [fix] Flush no batch message when call producer.flush (#98)
69e5680 is described below

commit 69e568096ddc8760d9538fdb770976d1ebbcab0e
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Nov 3 17:07:13 2022 +0800

    [fix] Flush no batch message when call producer.flush (#98)
---
 lib/OpSendMsg.h       |  8 ++++++++
 lib/ProducerImpl.cc   | 22 +++++++++++++++-------
 tests/ProducerTest.cc | 45 ++++++++++++++++++++++++++++++++++++++++++---
 3 files changed, 65 insertions(+), 10 deletions(-)

diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index c94bcbe..d805dd3 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -39,6 +39,7 @@ struct OpSendMsg {
     boost::posix_time::ptime timeout_;
     uint32_t messagesCount_;
     uint64_t messagesSize_;
+    std::vector<std::function<void(Result)>> trackerCallbacks_;
 
     OpSendMsg() = default;
 
@@ -59,6 +60,13 @@ struct OpSendMsg {
         if (sendCallback_) {
             sendCallback_(result, messageId);
         }
+        for (const auto& trackerCallback : trackerCallbacks_) {
+            trackerCallback(result);
+        }
+    }
+
+    void addTrackerCallback(std::function<void(Result)> trackerCallback) {
+        trackerCallbacks_.emplace_back(trackerCallback);
     }
 };
 
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 05ab13d..f3e6120 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -332,17 +332,25 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen
 }
 
 void ProducerImpl::flushAsync(FlushCallback callback) {
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
     if (batchMessageContainer_) {
-        if (state_ == Ready) {
-            Lock lock(mutex_);
-            auto failures = batchMessageAndSend(callback);
+        Lock lock(mutex_);
+        auto failures = batchMessageAndSend(callback);
+        lock.unlock();
+        failures.complete();
+    } else {
+        Lock lock(mutex_);
+        if (!pendingMessagesQueue_.empty()) {
+            auto& opSendMsg = pendingMessagesQueue_.back();
             lock.unlock();
-            failures.complete();
+            opSendMsg.addTrackerCallback(callback);
         } else {
-            callback(ResultAlreadyClosed);
+            lock.unlock();
+            callback(ResultOk);
         }
-    } else {
-        callback(ResultOk);
     }
 }
 
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index ee07cbb..77a79e1 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -245,11 +245,10 @@ TEST_P(ProducerTest, testMaxMessageSize) {
     client.close();
 }
 
-TEST_P(ProducerTest, testChunkingMaxMessageSize) {
+TEST(ProducerTest, testChunkingMaxMessageSize) {
     Client client(serviceUrl);
 
-    const auto topic = std::string("ProducerTest-ChunkingMaxMessageSize-") +
-                       (GetParam() ? "batch-" : "no-batch-") + std::to_string(time(nullptr));
+    const auto topic = std::string("ProducerTest-ChunkingMaxMessageSize-") + std::to_string(time(nullptr));
 
     Consumer consumer;
     ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
@@ -297,4 +296,44 @@ TEST(ProducerTest, testExclusiveProducer) {
     ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3));
 }
 
+TEST_P(ProducerTest, testFlushNoBatch) {
+    Client client(serviceUrl);
+
+    auto partitioned = GetParam();
+    const auto topicName = std::string("testFlushNoBatch") +
+                           (partitioned ? "partitioned-" : "-no-partitioned-") +
+                           std::to_string(time(nullptr));
+
+    if (partitioned) {
+        // call admin api to make it partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setBatchingEnabled(false);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
+
+    std::atomic_int needCallBack(100);
+    auto cb = [&needCallBack](Result code, const MessageId& msgId) {
+        ASSERT_EQ(code, ResultOk);
+        needCallBack.fetch_sub(1);
+    };
+
+    for (int i = 0; i < 100; ++i) {
+        Message msg = MessageBuilder().setContent("content").build();
+        producer.sendAsync(msg, cb);
+    }
+
+    producer.flush();
+    ASSERT_EQ(needCallBack.load(), 0);
+    producer.close();
+
+    client.close();
+}
+
 INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

Reply via email to