This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new ffee4a0  [fix] Send callback of batched messages returns an error when flushing (#303)
ffee4a0 is described below
commit ffee4a0194ea04cbd7e845197dcaf181eb15ffd3
Author: ken <[email protected]>
AuthorDate: Mon Jul 24 21:31:40 2023 +0800
[fix] Send callback of batched messages returns an error when flushing (#303)
### Motivation
When sending batched messages via sendAsync() and then calling flush() and
close(), the per-message callbacks complete with ResultAlreadyClosed instead
of ResultOk.

The root cause is that when no batched messages remain in the
BatchMessageContainer at flush time, the flush callback returns ResultOk
immediately. Instead, it should complete with ResultOk only after the
lastSendOpFuture completes.
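For reference, a minimal sketch of the scenario above; the broker URL and
topic name are illustrative placeholders, not taken from the original report:

```cpp
#include <pulsar/Client.h>
#include <iostream>

using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");  // assumed local broker

    ProducerConfiguration conf;
    conf.setBatchingEnabled(true);
    conf.setBatchingMaxPublishDelayMs(1000);  // keep the batch open long enough to flush manually

    Producer producer;
    if (client.createProducer("flush-demo-topic", conf, producer) != ResultOk) {
        return 1;
    }

    producer.sendAsync(MessageBuilder().setContent("content").build(),
                       [](Result code, const MessageId&) {
                           // Before this fix, flush() followed by close() could complete
                           // this callback with ResultAlreadyClosed instead of ResultOk.
                           std::cout << "send result: " << strResult(code) << std::endl;
                       });

    producer.flush();  // should return only after the pending batch is acknowledged
    producer.close();
    client.close();
    return 0;
}
```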
### Modifications
1. Fix the incorrect result code passed to the callback (the resulting
   flushAsync() guarantee is sketched below).
2. Add tests covering two cases of flushing batched messages.
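From an application's point of view, the fix tightens when the flush callback
may fire: it now runs only after the last pending send operation completes. A
minimal usage sketch, under the same placeholder broker URL and topic name:

```cpp
#include <pulsar/Client.h>
#include <future>

using namespace pulsar;

int main() {
    Client client("pulsar://localhost:6650");  // assumed local broker

    ProducerConfiguration conf;
    conf.setBatchingEnabled(true);

    Producer producer;
    if (client.createProducer("flush-demo-topic", conf, producer) != ResultOk) {
        return 1;
    }

    producer.sendAsync(MessageBuilder().setContent("content").build(),
                       [](Result, const MessageId&) {
                           // With this fix, this callback has already completed
                           // (with ResultOk) by the time the flush callback runs.
                       });

    std::promise<Result> flushed;
    producer.flushAsync([&flushed](Result result) { flushed.set_value(result); });
    flushed.get_future().wait();  // block until the flush callback is invoked

    producer.close();
    client.close();
    return 0;
}
```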
---
 lib/BatchMessageContainerBase.cc |  2 +-
 lib/ProducerImpl.cc              | 12 ++++++--
 tests/ProducerTest.cc            | 62 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 73 insertions(+), 3 deletions(-)
diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc
index 0cf338f..807a261 100644
--- a/lib/BatchMessageContainerBase.cc
+++ b/lib/BatchMessageContainerBase.cc
@@ -92,7 +92,7 @@ void BatchMessageContainerBase::processAndClear(
     std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, FlushCallback flushCallback) {
     if (isEmpty()) {
         if (flushCallback) {
-            flushCallback(ResultOk);
+            // do nothing; the flushCallback completes only after the last OpSend completes
         }
     } else {
         const auto numBatches = getNumBatches();
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 3b83166..71559ff 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -374,8 +374,16 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
     if (batchMessageContainer_) {
         Lock lock(mutex_);
         auto failures = batchMessageAndSend(callback);
-        lock.unlock();
-        failures.complete();
+        if (!pendingMessagesQueue_.empty()) {
+            auto& opSendMsg = pendingMessagesQueue_.back();
+            lock.unlock();
+            failures.complete();
+            opSendMsg.addTrackerCallback(callback);
+        } else {
+            lock.unlock();
+            failures.complete();
+            callback(ResultOk);
+        }
     } else {
         Lock lock(mutex_);
         if (!pendingMessagesQueue_.empty()) {
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index 77cb619..eeda2b4 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -439,6 +439,68 @@ TEST_P(ProducerTest, testFlushNoBatch) {
     client.close();
 }
 
+TEST_P(ProducerTest, testFlushBatch) {
+    Client client(serviceUrl);
+
+    auto partitioned = GetParam();
+    const auto topicName = std::string("testFlushBatch") +
+                           (partitioned ? "partitioned-" : "-no-partitioned-") +
+                           std::to_string(time(nullptr));
+
+    if (partitioned) {
+        // call the admin API to make the topic partitioned
+        std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+        int res = makePutRequest(url, "5");
+        LOG_INFO("res = " << res);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setBatchingEnabled(true);
+    producerConfiguration.setBatchingMaxMessages(10);
+    producerConfiguration.setBatchingMaxPublishDelayMs(1000);
+    producerConfiguration.setBatchingMaxAllowedSizeInBytes(4 * 1024 * 1024);
+
+    // test that all messages in complete batches have been sent
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
+
+    std::atomic_int needCallBack(100);
+    auto cb = [&needCallBack](Result code, const MessageId& msgId) {
+        ASSERT_EQ(code, ResultOk);
+        needCallBack.fetch_sub(1);
+    };
+
+    for (int i = 0; i < 100; ++i) {
+        Message msg = MessageBuilder().setContent("content").build();
+        producer.sendAsync(msg, cb);
+    }
+
+    producer.flush();
+    ASSERT_EQ(needCallBack.load(), 0);
+    producer.close();
+
+    // test that messages remaining in an incomplete batch are also sent
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
+
+    std::atomic_int needCallBack2(105);
+    auto cb2 = [&needCallBack2](Result code, const MessageId& msgId) {
+        ASSERT_EQ(code, ResultOk);
+        needCallBack2.fetch_sub(1);
+    };
+
+    for (int i = 0; i < 105; ++i) {
+        Message msg = MessageBuilder().setContent("content").build();
+        producer.sendAsync(msg, cb2);
+    }
+
+    producer.flush();
+    ASSERT_EQ(needCallBack2.load(), 0);
+    producer.close();
+
+    client.close();
+}
+
 TEST(ProducerTest, testCloseSubProducerWhenFail) {
     Client client(serviceUrl);