This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 4d087b5 Fix the flush callback might be called repeatedly (#353)
4d087b5 is described below
commit 4d087b5ba273094f651da9adcd2ccfd309aa0fe4
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Nov 21 10:42:01 2023 +0800
Fix the flush callback might be called repeatedly (#353)
Fixes https://github.com/apache/pulsar-client-cpp/issues/352
### Motivation
https://github.com/apache/pulsar-client-cpp/pull/303 adds the flush
callback to the last `OpSendMsg` instead of adding to the batch message
container. However, `batchMessageAndSend` will create an `OpSendMsg` and
add it to the `pendingMessagesQueue_`.
https://github.com/apache/pulsar-client-cpp/blob/7bb94f45b917ed30b5302ac93ffa1f1942fc6313/lib/ProducerImpl.cc#L384-L389
In the code above, `pendingMessagesQueue_` can never be empty at that point,
so the callback is added a second time by `opSendMsg->addTrackerCallback`.
It was already added the first time in `createOpSendMsg` or
`createOpSendMsgs`, called by `batchMessageAndSend`.
### Modifications
Add the callback to the last `OpSendMsg` only when the batch message
container is empty.
In `testFlushBatch`, replace the `flush` call with the `flushAsync` call
and verify the callback is only called once after it's completed.
(cherry picked from commit 37ea76977929d8fd2e74005a4fb7a238b83e3f1c)
---
lib/BatchMessageKeyBasedContainer.cc | 7 ++++++-
lib/ProducerImpl.cc | 40 +++++++++++++++++++++---------------
tests/ProducerTest.cc | 20 ++++++++++++++++--
3 files changed, 48 insertions(+), 19 deletions(-)
diff --git a/lib/BatchMessageKeyBasedContainer.cc
b/lib/BatchMessageKeyBasedContainer.cc
index 2006736..5b18184 100644
--- a/lib/BatchMessageKeyBasedContainer.cc
+++ b/lib/BatchMessageKeyBasedContainer.cc
@@ -77,11 +77,16 @@ std::vector<std::unique_ptr<OpSendMsg>>
BatchMessageKeyBasedContainer::createOpS
// Store raw pointers to use std::sort
std::vector<OpSendMsg*> rawOpSendMsgs;
for (auto& kv : batches_) {
- rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release());
+ if (!kv.second.empty()) {
+
rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release());
+ }
}
std::sort(rawOpSendMsgs.begin(), rawOpSendMsgs.end(), [](const OpSendMsg*
lhs, const OpSendMsg* rhs) {
return lhs->sendArgs->sequenceId < rhs->sendArgs->sequenceId;
});
+ if (rawOpSendMsgs.empty()) {
+ return {};
+ }
rawOpSendMsgs.back()->addTrackerCallback(flushCallback);
std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{rawOpSendMsgs.size()};
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 76a999a..61b95bf 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -376,29 +376,37 @@ void ProducerImpl::setMessageMetadata(const Message& msg,
const uint64_t& sequen
void ProducerImpl::flushAsync(FlushCallback callback) {
if (state_ != Ready) {
- callback(ResultAlreadyClosed);
+ if (callback) {
+ callback(ResultAlreadyClosed);
+ }
return;
}
+
+ auto addCallbackToLastOp = [this, &callback] {
+ if (pendingMessagesQueue_.empty()) {
+ return false;
+ }
+ pendingMessagesQueue_.back()->addTrackerCallback(callback);
+ return true;
+ };
+
if (batchMessageContainer_) {
Lock lock(mutex_);
- auto failures = batchMessageAndSend(callback);
- if (!pendingMessagesQueue_.empty()) {
- auto& opSendMsg = pendingMessagesQueue_.back();
- lock.unlock();
- failures.complete();
- opSendMsg->addTrackerCallback(callback);
- } else {
- lock.unlock();
- failures.complete();
- callback(ResultOk);
+
+ if (batchMessageContainer_->isEmpty()) {
+ if (!addCallbackToLastOp() && callback) {
+ lock.unlock();
+ callback(ResultOk);
+ }
+ return;
}
+
+ auto failures = batchMessageAndSend(callback);
+ lock.unlock();
+ failures.complete();
} else {
Lock lock(mutex_);
- if (!pendingMessagesQueue_.empty()) {
- auto& opSendMsg = pendingMessagesQueue_.back();
- lock.unlock();
- opSendMsg->addTrackerCallback(callback);
- } else {
+ if (!addCallbackToLastOp() && callback) {
lock.unlock();
callback(ResultOk);
}
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 9685cc8..45bb3aa 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -476,7 +476,23 @@ TEST_P(ProducerTest, testFlushBatch) {
producer.sendAsync(msg, cb);
}
- producer.flush();
+ auto assertFlushCallbackOnce = [&producer] {
+ Latch latch{1};
+ std::mutex mutex;
+ std::vector<Result> results;
+ producer.flushAsync([&](Result result) {
+ {
+ std::lock_guard<std::mutex> lock{mutex};
+ results.emplace_back(result);
+ }
+ latch.countdown();
+ });
+ latch.wait();
+ std::lock_guard<std::mutex> lock{mutex};
+ ASSERT_EQ(results, (std::vector<Result>{ResultOk}));
+ };
+
+ assertFlushCallbackOnce();
ASSERT_EQ(needCallBack.load(), 0);
producer.close();
@@ -494,7 +510,7 @@ TEST_P(ProducerTest, testFlushBatch) {
producer.sendAsync(msg, cb2);
}
- producer.flush();
+ assertFlushCallbackOnce();
ASSERT_EQ(needCallBack2.load(), 0);
producer.close();