This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:

     new 7134705ef42 [C++] Fix send callback might not be invoked in key based batching (#14898)

7134705ef42 is described below

commit 7134705ef42d8e5d099ebd1810e57d44bbad4a0d
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Tue Mar 29 01:45:14 2022 +0800

    [C++] Fix send callback might not be invoked in key based batching (#14898)

    * [C++] Fix send callback might not be invoked in key based batching

    ### Motivation

    When the C++ client enables key based batching, there is a chance that the send callback is not invoked. See
    https://github.com/apache/pulsar/blob/32df93f693bfdf42953bd728a12ecdea1796dcc8/pulsar-client-cpp/lib/ProducerImpl.cc#L272-L275

    If a batch container has multiple batches, only one batch could be processed during `closeAsync`. Even worse, the semaphores of other batches won't be released.

    ### Modifications

    - Add a `clearPendingBatches` method to clear all pending batches and process them. Then call this method in `closeAsync` and `getPendingCallbacksWhenFailed`.
    - Add a test `testCloseBeforeSend` to verify when a producer has multiple pending batches, all callbacks can be invoked in `closeAsync`.
* Add processAndClear() to batch message container (cherry picked from commit f3295ff0b14526de27791493d4c45cf814ef3654) --- pulsar-client-cpp/lib/BatchMessageContainerBase.h | 26 +++++++++++++ pulsar-client-cpp/lib/ProducerImpl.cc | 47 ++++++----------------- pulsar-client-cpp/tests/KeyBasedBatchingTest.cc | 32 ++++++++++++++- 3 files changed, 69 insertions(+), 36 deletions(-) diff --git a/pulsar-client-cpp/lib/BatchMessageContainerBase.h b/pulsar-client-cpp/lib/BatchMessageContainerBase.h index 8a32d8e9dca..71eef5fab62 100644 --- a/pulsar-client-cpp/lib/BatchMessageContainerBase.h +++ b/pulsar-client-cpp/lib/BatchMessageContainerBase.h @@ -112,6 +112,9 @@ class BatchMessageContainerBase : public boost::noncopyable { bool hasEnoughSpace(const Message& msg) const noexcept; bool isEmpty() const noexcept; + void processAndClear(std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, + FlushCallback flushCallback); + protected: // references to ProducerImpl's fields const std::string& topicName_; @@ -157,6 +160,29 @@ inline void BatchMessageContainerBase::resetStats() { sizeInBytes_ = 0; } +inline void BatchMessageContainerBase::processAndClear( + std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) { + if (isEmpty()) { + if (flushCallback) { + flushCallback(ResultOk); + } + } else { + const auto numBatches = getNumBatches(); + if (numBatches == 1) { + OpSendMsg opSendMsg; + Result result = createOpSendMsg(opSendMsg, flushCallback); + opSendMsgCallback(result, opSendMsg); + } else if (numBatches > 1) { + std::vector<OpSendMsg> opSendMsgs; + std::vector<Result> results = createOpSendMsgs(opSendMsgs, flushCallback); + for (size_t i = 0; i < results.size(); i++) { + opSendMsgCallback(results[i], opSendMsgs[i]); + } + } // else numBatches is 0, do nothing + } + clear(); +} + inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainerBase& container) { container.serialize(os); return os; diff --git 
a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index e9812d46054..e15d388ef64 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -268,13 +268,14 @@ std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallback } if (batchMessageContainer_) { - OpSendMsg opSendMsg; - if (batchMessageContainer_->createOpSendMsg(opSendMsg) == ResultOk) { - callbacks->opSendMsgs.emplace_back(opSendMsg); - } - - releaseSemaphoreForSendOp(opSendMsg); - batchMessageContainer_->clear(); + batchMessageContainer_->processAndClear( + [this, &callbacks](Result result, const OpSendMsg& opSendMsg) { + if (result == ResultOk) { + callbacks->opSendMsgs.emplace_back(opSendMsg); + } + releaseSemaphoreForSendOp(opSendMsg); + }, + nullptr); } pendingMessagesQueue_.clear(); @@ -507,15 +508,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_); batchTimer_->cancel(); - if (PULSAR_UNLIKELY(batchMessageContainer_->isEmpty())) { - if (flushCallback) { - flushCallback(ResultOk); - } - } else { - const size_t numBatches = batchMessageContainer_->getNumBatches(); - if (numBatches == 1) { - OpSendMsg opSendMsg; - Result result = batchMessageContainer_->createOpSendMsg(opSendMsg, flushCallback); + batchMessageContainer_->processAndClear( + [this, &failures](Result result, const OpSendMsg& opSendMsg) { if (result == ResultOk) { sendMessage(opSendMsg); } else { @@ -525,25 +519,8 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall releaseSemaphoreForSendOp(opSendMsg); failures.add(std::bind(opSendMsg.sendCallback_, result, MessageId{})); } - } else if (numBatches > 1) { - std::vector<OpSendMsg> opSendMsgs; - std::vector<Result> results = batchMessageContainer_->createOpSendMsgs(opSendMsgs, flushCallback); - for (size_t i = 0; i < results.size(); i++) { - if (results[i] == ResultOk) { - 
sendMessage(opSendMsgs[i]); - } else { - // A spot has been reserved for this batch, but the batch failed to be pushed to the - // queue, so we need to release the spot manually - LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsgs[" << i - << "]: " << results[i]); - releaseSemaphoreForSendOp(opSendMsgs[i]); - failures.add(std::bind(opSendMsgs[i].sendCallback_, results[i], MessageId{})); - } - } - } // else numBatches is 0, do nothing - } - - batchMessageContainer_->clear(); + }, + flushCallback); return failures; } diff --git a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc index 3bec21ac3d7..fcb558a7dad 100644 --- a/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc +++ b/pulsar-client-cpp/tests/KeyBasedBatchingTest.cc @@ -41,7 +41,6 @@ class KeyBasedBatchingTest : public ::testing::Test { void TearDown() override { client_.close(); } - void setTopicName(const std::string& topicName) { topicName_ = topicName; } void initTopicName(const std::string& testName) { topicName_ = "KeyBasedBatchingTest-" + testName + "-" + std::to_string(time(nullptr)); } @@ -179,3 +178,34 @@ TEST_F(KeyBasedBatchingTest, testSingleBatch) { ASSERT_EQ(ResultTimeout, consumer_.receive(msg, 3000)); ASSERT_EQ(numMessageSent.load(), numMessages); } + +TEST_F(KeyBasedBatchingTest, testCloseBeforeSend) { + initTopicName("CloseBeforeSend"); + // Any asynchronous send won't be completed unless `close()` or `flush()` is triggered + initProducer(createDefaultProducerConfig().setBatchingMaxMessages(static_cast<unsigned>(-1))); + + std::mutex mtx; + std::vector<Result> results; + auto saveResult = [&mtx, &results](Result result) { + std::lock_guard<std::mutex> lock(mtx); + results.emplace_back(result); + }; + auto sendAsync = [saveResult, this](const std::string& key, const std::string& value) { + producer_.sendAsync(MessageBuilder().setOrderingKey(key).setContent(value).build(), + [saveResult](Result result, const MessageId& id) { 
saveResult(result); }); + }; + + constexpr int numKeys = 10; + for (int i = 0; i < numKeys; i++) { + sendAsync("key-" + std::to_string(i), "value"); + } + + ASSERT_EQ(ResultOk, producer_.close()); + + // After close() completed, all callbacks should have failed with ResultAlreadyClosed + std::lock_guard<std::mutex> lock(mtx); + ASSERT_EQ(results.size(), numKeys); + for (int i = 0; i < numKeys; i++) { + ASSERT_EQ(results[i], ResultAlreadyClosed) << " results[" << i << "] is " << results[i]; + } +}