This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 69e5680 [fix] Flush no batch message when call producer.flush (#98)
69e5680 is described below
commit 69e568096ddc8760d9538fdb770976d1ebbcab0e
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Nov 3 17:07:13 2022 +0800
[fix] Flush no batch message when call producer.flush (#98)
---
lib/OpSendMsg.h | 8 ++++++++
lib/ProducerImpl.cc | 22 +++++++++++++++-------
tests/ProducerTest.cc | 45 ++++++++++++++++++++++++++++++++++++++++++---
3 files changed, 65 insertions(+), 10 deletions(-)
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index c94bcbe..d805dd3 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -39,6 +39,7 @@ struct OpSendMsg {
boost::posix_time::ptime timeout_;
uint32_t messagesCount_;
uint64_t messagesSize_;
+ std::vector<std::function<void(Result)>> trackerCallbacks_;
OpSendMsg() = default;
@@ -59,6 +60,13 @@ struct OpSendMsg {
if (sendCallback_) {
sendCallback_(result, messageId);
}
+ for (const auto& trackerCallback : trackerCallbacks_) {
+ trackerCallback(result);
+ }
+ }
+
+ void addTrackerCallback(std::function<void(Result)> trackerCallback) {
+ trackerCallbacks_.emplace_back(trackerCallback);
}
};
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 05ab13d..f3e6120 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -332,17 +332,25 @@ void ProducerImpl::setMessageMetadata(const Message& msg, const uint64_t& sequen
}
void ProducerImpl::flushAsync(FlushCallback callback) {
+ if (state_ != Ready) {
+ callback(ResultAlreadyClosed);
+ return;
+ }
if (batchMessageContainer_) {
- if (state_ == Ready) {
- Lock lock(mutex_);
- auto failures = batchMessageAndSend(callback);
+ Lock lock(mutex_);
+ auto failures = batchMessageAndSend(callback);
+ lock.unlock();
+ failures.complete();
+ } else {
+ Lock lock(mutex_);
+ if (!pendingMessagesQueue_.empty()) {
+ auto& opSendMsg = pendingMessagesQueue_.back();
lock.unlock();
- failures.complete();
+ opSendMsg.addTrackerCallback(callback);
} else {
- callback(ResultAlreadyClosed);
+ lock.unlock();
+ callback(ResultOk);
}
- } else {
- callback(ResultOk);
}
}
diff --git a/tests/ProducerTest.cc b/tests/ProducerTest.cc
index ee07cbb..77a79e1 100644
--- a/tests/ProducerTest.cc
+++ b/tests/ProducerTest.cc
@@ -245,11 +245,10 @@ TEST_P(ProducerTest, testMaxMessageSize) {
client.close();
}
-TEST_P(ProducerTest, testChunkingMaxMessageSize) {
+TEST(ProducerTest, testChunkingMaxMessageSize) {
Client client(serviceUrl);
- const auto topic = std::string("ProducerTest-ChunkingMaxMessageSize-") +
- (GetParam() ? "batch-" : "no-batch-") + std::to_string(time(nullptr));
+ const auto topic = std::string("ProducerTest-ChunkingMaxMessageSize-") + std::to_string(time(nullptr));
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
@@ -297,4 +296,44 @@ TEST(ProducerTest, testExclusiveProducer) {
ASSERT_EQ(ResultProducerBusy, client.createProducer(topicName, producerConfiguration3, producer3));
}
+TEST_P(ProducerTest, testFlushNoBatch) {
+ Client client(serviceUrl);
+
+ auto partitioned = GetParam();
+ const auto topicName = std::string("testFlushNoBatch") +
+ (partitioned ? "partitioned-" : "-no-partitioned-") +
+ std::to_string(time(nullptr));
+
+ if (partitioned) {
+ // call admin api to make it partitioned
+ std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+ int res = makePutRequest(url, "5");
+ LOG_INFO("res = " << res);
+ ASSERT_FALSE(res != 204 && res != 409);
+ }
+
+ ProducerConfiguration producerConfiguration;
+ producerConfiguration.setBatchingEnabled(false);
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfiguration, producer));
+
+ std::atomic_int needCallBack(100);
+ auto cb = [&needCallBack](Result code, const MessageId& msgId) {
+ ASSERT_EQ(code, ResultOk);
+ needCallBack.fetch_sub(1);
+ };
+
+ for (int i = 0; i < 100; ++i) {
+ Message msg = MessageBuilder().setContent("content").build();
+ producer.sendAsync(msg, cb);
+ }
+
+ producer.flush();
+ ASSERT_EQ(needCallBack.load(), 0);
+ producer.close();
+
+ client.close();
+}
+
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));