This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 4d087b5  Fix the flush callback might be called repeatedly (#353)
4d087b5 is described below

commit 4d087b5ba273094f651da9adcd2ccfd309aa0fe4
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Nov 21 10:42:01 2023 +0800

    Fix the flush callback might be called repeatedly (#353)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/352
    
    ### Motivation
    
    https://github.com/apache/pulsar-client-cpp/pull/303 adds the flush
    callback to the last `OpSendMsg` instead of adding to the batch message
    container. However, `batchMessageAndSend` will create an `OpSendMsg` and
    add it to the `pendingMessagesQueue_`.
    
    
https://github.com/apache/pulsar-client-cpp/blob/7bb94f45b917ed30b5302ac93ffa1f1942fc6313/lib/ProducerImpl.cc#L384-L389
    
    In the code above, `pendingMessagesQueue_` can never be empty at that
    point, so the callback is added a second time by
    `opSendMsg->addTrackerCallback`. The first time, it is added in
    `createOpSendMsg` or `createOpSendMsgs`, which are called by
    `batchMessageAndSend`.
    
    ### Modifications
    
    Add the callback to the last `OpSendMsg` only when the batch message
    container is empty.
    
    In `testFlushBatch`, replace the `flush` call with the `flushAsync` call
    and verify the callback is only called once after it's completed.
    
    (cherry picked from commit 37ea76977929d8fd2e74005a4fb7a238b83e3f1c)
---
 lib/BatchMessageKeyBasedContainer.cc |  7 ++++++-
 lib/ProducerImpl.cc                  | 40 +++++++++++++++++++++---------------
 tests/ProducerTest.cc                | 20 ++++++++++++++++--
 3 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/lib/BatchMessageKeyBasedContainer.cc 
b/lib/BatchMessageKeyBasedContainer.cc
index 2006736..5b18184 100644
--- a/lib/BatchMessageKeyBasedContainer.cc
+++ b/lib/BatchMessageKeyBasedContainer.cc
@@ -77,11 +77,16 @@ std::vector<std::unique_ptr<OpSendMsg>> 
BatchMessageKeyBasedContainer::createOpS
     // Store raw pointers to use std::sort
     std::vector<OpSendMsg*> rawOpSendMsgs;
     for (auto& kv : batches_) {
-        rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release());
+        if (!kv.second.empty()) {
+            
rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release());
+        }
     }
     std::sort(rawOpSendMsgs.begin(), rawOpSendMsgs.end(), [](const OpSendMsg* 
lhs, const OpSendMsg* rhs) {
         return lhs->sendArgs->sequenceId < rhs->sendArgs->sequenceId;
     });
+    if (rawOpSendMsgs.empty()) {
+        return {};
+    }
     rawOpSendMsgs.back()->addTrackerCallback(flushCallback);
 
     std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{rawOpSendMsgs.size()};
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 76a999a..61b95bf 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -376,29 +376,37 @@ void ProducerImpl::setMessageMetadata(const Message& msg, 
const uint64_t& sequen
 
 void ProducerImpl::flushAsync(FlushCallback callback) {
     if (state_ != Ready) {
-        callback(ResultAlreadyClosed);
+        if (callback) {
+            callback(ResultAlreadyClosed);
+        }
         return;
     }
+
+    auto addCallbackToLastOp = [this, &callback] {
+        if (pendingMessagesQueue_.empty()) {
+            return false;
+        }
+        pendingMessagesQueue_.back()->addTrackerCallback(callback);
+        return true;
+    };
+
     if (batchMessageContainer_) {
         Lock lock(mutex_);
-        auto failures = batchMessageAndSend(callback);
-        if (!pendingMessagesQueue_.empty()) {
-            auto& opSendMsg = pendingMessagesQueue_.back();
-            lock.unlock();
-            failures.complete();
-            opSendMsg->addTrackerCallback(callback);
-        } else {
-            lock.unlock();
-            failures.complete();
-            callback(ResultOk);
+
+        if (batchMessageContainer_->isEmpty()) {
+            if (!addCallbackToLastOp() && callback) {
+                lock.unlock();
+                callback(ResultOk);
+            }
+            return;
         }
+
+        auto failures = batchMessageAndSend(callback);
+        lock.unlock();
+        failures.complete();
     } else {
         Lock lock(mutex_);
-        if (!pendingMessagesQueue_.empty()) {
-            auto& opSendMsg = pendingMessagesQueue_.back();
-            lock.unlock();
-            opSendMsg->addTrackerCallback(callback);
-        } else {
+        if (!addCallbackToLastOp() && callback) {
             lock.unlock();
             callback(ResultOk);
         }
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 9685cc8..45bb3aa 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -476,7 +476,23 @@ TEST_P(ProducerTest, testFlushBatch) {
         producer.sendAsync(msg, cb);
     }
 
-    producer.flush();
+    auto assertFlushCallbackOnce = [&producer] {
+        Latch latch{1};
+        std::mutex mutex;
+        std::vector<Result> results;
+        producer.flushAsync([&](Result result) {
+            {
+                std::lock_guard<std::mutex> lock{mutex};
+                results.emplace_back(result);
+            }
+            latch.countdown();
+        });
+        latch.wait();
+        std::lock_guard<std::mutex> lock{mutex};
+        ASSERT_EQ(results, (std::vector<Result>{ResultOk}));
+    };
+
+    assertFlushCallbackOnce();
     ASSERT_EQ(needCallBack.load(), 0);
     producer.close();
 
@@ -494,7 +510,7 @@ TEST_P(ProducerTest, testFlushBatch) {
         producer.sendAsync(msg, cb2);
     }
 
-    producer.flush();
+    assertFlushCallbackOnce();
     ASSERT_EQ(needCallBack2.load(), 0);
     producer.close();
 

Reply via email to