This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 6ebea5c Avoid calling serializeSingleMessageInBatchWithPayload each
time a message is added (#309)
6ebea5c is described below
commit 6ebea5c554f821fe5aae840ffbfed6548caee2ef
Author: Yunze Xu <[email protected]>
AuthorDate: Sun Sep 3 17:37:11 2023 +0800
Avoid calling serializeSingleMessageInBatchWithPayload each time a message
is added (#309)
* Avoid calling serializeSingleMessageInBatchWithPayload each time a
message is added
### Motivation
Currently, each time a message is added to the batch message container,
`serializeSingleMessageInBatchWithPayload` is called. In this method, if
the payload buffer is not large enough, its size is doubled: a new buffer
is allocated and all bytes written so far are copied into it. After the
batch is cleared, the payload buffer is reset, so this repeated
reallocation and copying starts over for every batch. For example, here
is a typical sequence of buffer size increments during the lifetime of
one batch:
```
increase buffer size from 0 to 1033
increase buffer size from 1033 to 2066
increase buffer size from 2066 to 4132
increase buffer size from 3099 to 6198
increase buffer size from 5165 to 10330
increase buffer size from 9297 to 18594
increase buffer size from 17561 to 35122
increase buffer size from 34089 to 68178
increase buffer size from 67145 to 134290
```
### Modifications
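Refactor the `MessageAndCallbackBatch` design so that the `add` method
only stores the message and its callback. A new `createOpSendMsg` method
serializes the stored messages and callbacks into an `OpSendMsg`. It first
computes the total batch payload size, then allocates the payload buffer
once, so the buffer never needs to grow while the batch is being built.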
---
lib/BatchMessageContainer.cc | 5 ++-
lib/BatchMessageContainerBase.cc | 39 ++---------------
lib/BatchMessageContainerBase.h | 3 +-
lib/BatchMessageKeyBasedContainer.cc | 25 +++++------
lib/Commands.cc | 69 ++++++++++++++++--------------
lib/Commands.h | 4 +-
lib/MessageAndCallbackBatch.cc | 83 +++++++++++++++++++++++-------------
lib/MessageAndCallbackBatch.h | 44 +++++++++----------
lib/OpSendMsg.h | 6 ++-
tests/BatchMessageTest.cc | 13 +++---
10 files changed, 141 insertions(+), 150 deletions(-)
diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc
index 777b2c3..cd7ddc8 100644
--- a/lib/BatchMessageContainer.cc
+++ b/lib/BatchMessageContainer.cc
@@ -54,7 +54,10 @@ void BatchMessageContainer::clear() {
}
std::unique_ptr<OpSendMsg> BatchMessageContainer::createOpSendMsg(const
FlushCallback& flushCallback) {
- auto op = createOpSendMsgHelper(flushCallback, batch_);
+ auto op = createOpSendMsgHelper(batch_);
+ if (flushCallback) {
+ op->addTrackerCallback(flushCallback);
+ }
clear();
return op;
}
diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc
index 96ea94b..c1b1727 100644
--- a/lib/BatchMessageContainerBase.cc
+++ b/lib/BatchMessageContainerBase.cc
@@ -18,14 +18,10 @@
*/
#include "BatchMessageContainerBase.h"
-#include "ClientConnection.h"
-#include "CompressionCodec.h"
#include "MessageAndCallbackBatch.h"
#include "MessageCrypto.h"
-#include "MessageImpl.h"
#include "OpSendMsg.h"
#include "ProducerImpl.h"
-#include "PulsarApi.pb.h"
#include "SharedBuffer.h"
namespace pulsar {
@@ -40,38 +36,9 @@ BatchMessageContainerBase::BatchMessageContainerBase(const
ProducerImpl& produce
BatchMessageContainerBase::~BatchMessageContainerBase() {}
std::unique_ptr<OpSendMsg> BatchMessageContainerBase::createOpSendMsgHelper(
- const FlushCallback& flushCallback, const MessageAndCallbackBatch& batch)
const {
- auto sendCallback = batch.createSendCallback(flushCallback);
- if (batch.empty()) {
- return OpSendMsg::create(ResultOperationNotSupported,
std::move(sendCallback));
- }
-
- MessageImplPtr impl = batch.msgImpl();
- impl->metadata.set_num_messages_in_batch(batch.size());
- auto compressionType = producerConfig_.getCompressionType();
- if (compressionType != CompressionNone) {
-
impl->metadata.set_compression(static_cast<proto::CompressionType>(compressionType));
- impl->metadata.set_uncompressed_size(impl->payload.readableBytes());
- }
- impl->payload =
CompressionCodecProvider::getCodec(compressionType).encode(impl->payload);
-
- auto msgCrypto = msgCryptoWeakPtr_.lock();
- if (msgCrypto && producerConfig_.isEncryptionEnabled()) {
- SharedBuffer encryptedPayload;
- if (!msgCrypto->encrypt(producerConfig_.getEncryptionKeys(),
producerConfig_.getCryptoKeyReader(),
- impl->metadata, impl->payload,
encryptedPayload)) {
- return OpSendMsg::create(ResultCryptoError,
std::move(sendCallback));
- }
- impl->payload = encryptedPayload;
- }
-
- if (impl->payload.readableBytes() > ClientConnection::getMaxMessageSize())
{
- return OpSendMsg::create(ResultMessageTooBig, std::move(sendCallback));
- }
-
- return OpSendMsg::create(impl->metadata, batch.messagesCount(),
batch.messagesSize(),
- producerConfig_.getSendTimeout(),
batch.createSendCallback(flushCallback),
- nullptr, producerId_, impl->payload);
+ MessageAndCallbackBatch& batch) const {
+ auto crypto = msgCryptoWeakPtr_.lock();
+ return batch.createOpSendMsg(producerId_, producerConfig_, crypto.get());
}
} // namespace pulsar
diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h
index fb46019..cd17d6d 100644
--- a/lib/BatchMessageContainerBase.h
+++ b/lib/BatchMessageContainerBase.h
@@ -109,8 +109,7 @@ class BatchMessageContainerBase : public boost::noncopyable
{
void updateStats(const Message& msg);
void resetStats();
- std::unique_ptr<OpSendMsg> createOpSendMsgHelper(const FlushCallback&
flushCallback,
- const
MessageAndCallbackBatch& batch) const;
+ std::unique_ptr<OpSendMsg> createOpSendMsgHelper(MessageAndCallbackBatch&
flushCallback) const;
virtual void clear() = 0;
};
diff --git a/lib/BatchMessageKeyBasedContainer.cc
b/lib/BatchMessageKeyBasedContainer.cc
index e88674b..2006736 100644
--- a/lib/BatchMessageKeyBasedContainer.cc
+++ b/lib/BatchMessageKeyBasedContainer.cc
@@ -74,22 +74,19 @@ void BatchMessageKeyBasedContainer::clear() {
std::vector<std::unique_ptr<OpSendMsg>>
BatchMessageKeyBasedContainer::createOpSendMsgs(
const FlushCallback& flushCallback) {
- // Sorted the batches by sequence id
- std::vector<const MessageAndCallbackBatch*> sortedBatches;
- for (const auto& kv : batches_) {
- sortedBatches.emplace_back(&kv.second);
+ // Store raw pointers to use std::sort
+ std::vector<OpSendMsg*> rawOpSendMsgs;
+ for (auto& kv : batches_) {
+ rawOpSendMsgs.emplace_back(createOpSendMsgHelper(kv.second).release());
}
- std::sort(sortedBatches.begin(), sortedBatches.end(),
- [](const MessageAndCallbackBatch* lhs, const
MessageAndCallbackBatch* rhs) {
- return lhs->sequenceId() < rhs->sequenceId();
- });
+ std::sort(rawOpSendMsgs.begin(), rawOpSendMsgs.end(), [](const OpSendMsg*
lhs, const OpSendMsg* rhs) {
+ return lhs->sendArgs->sequenceId < rhs->sendArgs->sequenceId;
+ });
+ rawOpSendMsgs.back()->addTrackerCallback(flushCallback);
- std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{sortedBatches.size()};
- for (size_t i = 0; i + 1 < opSendMsgs.size(); i++) {
- opSendMsgs[i].reset(createOpSendMsgHelper(nullptr,
*sortedBatches[i]).release());
- }
- if (!opSendMsgs.empty()) {
- opSendMsgs.back().reset(createOpSendMsgHelper(flushCallback,
*sortedBatches.back()).release());
+ std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{rawOpSendMsgs.size()};
+ for (size_t i = 0; i < opSendMsgs.size(); i++) {
+ opSendMsgs[i].reset(rawOpSendMsgs[i]);
}
clear();
return opSendMsgs;
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 5b1734c..f2e6c6d 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -30,7 +30,6 @@
#include "BatchedMessageIdImpl.h"
#include "BitSet.h"
#include "ChunkMessageIdImpl.h"
-#include "LogUtils.h"
#include "MessageImpl.h"
#include "OpSendMsg.h"
#include "PulsarApi.pb.h"
@@ -40,8 +39,6 @@
using namespace pulsar;
namespace pulsar {
-DECLARE_LOG_OBJECT();
-
using proto::AuthData;
using proto::BaseCommand;
using proto::CommandAck;
@@ -836,10 +833,14 @@ void Commands::initBatchMessageMetadata(const Message&
msg, pulsar::proto::Messa
}
}
-uint64_t Commands::serializeSingleMessageInBatchWithPayload(const Message&
msg, SharedBuffer& batchPayLoad,
- unsigned long
maxMessageSizeInBytes) {
- const auto& msgMetadata = msg.impl_->metadata;
- SingleMessageMetadata metadata;
+// The overhead of constructing and destructing a SingleMessageMetadata is
higher than allocating and
+// deallocating memory for a byte array, so use a thread local
SingleMessageMetadata and serialize it to a
+// byte array.
+static std::pair<std::unique_ptr<char[]>, size_t>
serializeSingleMessageMetadata(
+ const proto::MessageMetadata& msgMetadata, size_t payloadSize) {
+ thread_local SingleMessageMetadata metadata;
+ metadata.Clear();
+ metadata.set_payload_size(payloadSize);
if (msgMetadata.has_partition_key()) {
metadata.set_partition_key(msgMetadata.partition_key());
}
@@ -862,34 +863,38 @@ uint64_t
Commands::serializeSingleMessageInBatchWithPayload(const Message& msg,
metadata.set_sequence_id(msgMetadata.sequence_id());
}
+ size_t size = metadata.ByteSizeLong();
+ std::unique_ptr<char[]> data{new char[size]};
+ metadata.SerializeToArray(data.get(), size);
+ return std::make_pair(std::move(data), size);
+}
+
+uint64_t Commands::serializeSingleMessagesToBatchPayload(SharedBuffer&
batchPayload,
+ const
std::vector<Message>& messages) {
+ assert(!messages.empty());
+ size_t size = sizeof(uint32_t) * messages.size();
+
+ std::vector<std::pair<std::unique_ptr<char[]>, size_t>>
singleMetadataBuffers(messages.size());
+ for (size_t i = 0; i < messages.size(); i++) {
+ const auto& impl = messages[i].impl_;
+ singleMetadataBuffers[i] =
+ serializeSingleMessageMetadata(impl->metadata,
impl->payload.readableBytes());
+ size += singleMetadataBuffers[i].second;
+ size += messages[i].getLength();
+ }
+
// Format of batch message
// Each Message = [METADATA_SIZE][METADATA] [PAYLOAD]
+ batchPayload = SharedBuffer::allocate(size);
+ for (size_t i = 0; i < messages.size(); i++) {
+ auto msgMetadataSize = singleMetadataBuffers[i].second;
+ batchPayload.writeUnsignedInt(msgMetadataSize);
+ batchPayload.write(singleMetadataBuffers[i].first.get(),
msgMetadataSize);
+ const auto& payload = messages[i].impl_->payload;
+ batchPayload.write(payload.data(), payload.readableBytes());
+ }
- int payloadSize = msg.impl_->payload.readableBytes();
- metadata.set_payload_size(payloadSize);
-
- int msgMetadataSize = metadata.ByteSize();
-
- unsigned long requiredSpace = sizeof(uint32_t) + msgMetadataSize +
payloadSize;
- if (batchPayLoad.writableBytes() <= sizeof(uint32_t) + msgMetadataSize +
payloadSize) {
- LOG_DEBUG("remaining size of batchPayLoad buffer ["
- << batchPayLoad.writableBytes() << "] can't accomodate new
payload [" << requiredSpace
- << "] - expanding the batchPayload buffer");
- uint32_t new_size =
- std::min(batchPayLoad.readableBytes() * 2,
static_cast<uint32_t>(maxMessageSizeInBytes));
- new_size = std::max(new_size, batchPayLoad.readableBytes() +
static_cast<uint32_t>(requiredSpace));
- SharedBuffer buffer = SharedBuffer::allocate(new_size);
- // Adding batch created so far
- buffer.write(batchPayLoad.data(), batchPayLoad.readableBytes());
- batchPayLoad = buffer;
- }
- // Adding the new message
- batchPayLoad.writeUnsignedInt(msgMetadataSize);
- metadata.SerializeToArray(batchPayLoad.mutableData(), msgMetadataSize);
- batchPayLoad.bytesWritten(msgMetadataSize);
- batchPayLoad.write(msg.impl_->payload.data(), payloadSize);
-
- return msgMetadata.sequence_id();
+ return messages.back().impl_->metadata.sequence_id();
}
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage,
int32_t batchIndex,
diff --git a/lib/Commands.h b/lib/Commands.h
index 22a4b7b..de5445d 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -148,8 +148,8 @@ class Commands {
static void initBatchMessageMetadata(const Message& msg,
pulsar::proto::MessageMetadata& batchMetadata);
- static PULSAR_PUBLIC uint64_t serializeSingleMessageInBatchWithPayload(
- const Message& msg, SharedBuffer& batchPayLoad, unsigned long
maxMessageSizeInBytes);
+ static uint64_t serializeSingleMessagesToBatchPayload(SharedBuffer&
batchPayload,
+ const
std::vector<Message>& messages);
static Message deSerializeSingleMessageInBatch(Message& batchedMessage,
int32_t batchIndex,
int32_t batchSize, const
BatchMessageAckerPtr& acker);
diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc
index e17330c..b518b03 100644
--- a/lib/MessageAndCallbackBatch.cc
+++ b/lib/MessageAndCallbackBatch.cc
@@ -22,59 +22,82 @@
#include "ClientConnection.h"
#include "Commands.h"
-#include "LogUtils.h"
-#include "MessageImpl.h"
-
-DECLARE_LOG_OBJECT()
+#include "CompressionCodec.h"
+#include "MessageCrypto.h"
+#include "OpSendMsg.h"
+#include "PulsarApi.pb.h"
namespace pulsar {
+MessageAndCallbackBatch::MessageAndCallbackBatch() {}
+
+MessageAndCallbackBatch::~MessageAndCallbackBatch() {}
+
void MessageAndCallbackBatch::add(const Message& msg, const SendCallback&
callback) {
- if (empty()) {
- msgImpl_.reset(new MessageImpl);
- Commands::initBatchMessageMetadata(msg, msgImpl_->metadata);
+ if (callbacks_.empty()) {
+ metadata_.reset(new proto::MessageMetadata);
+ Commands::initBatchMessageMetadata(msg, *metadata_);
}
- LOG_DEBUG(" Before serialization payload size in bytes = " <<
msgImpl_->payload.readableBytes());
- sequenceId_ = Commands::serializeSingleMessageInBatchWithPayload(msg,
msgImpl_->payload,
-
ClientConnection::getMaxMessageSize());
- LOG_DEBUG(" After serialization payload size in bytes = " <<
msgImpl_->payload.readableBytes());
+ messages_.emplace_back(msg);
callbacks_.emplace_back(callback);
-
- ++messagesCount_;
messagesSize_ += msg.getLength();
}
+std::unique_ptr<OpSendMsg> MessageAndCallbackBatch::createOpSendMsg(
+ uint64_t producerId, const ProducerConfiguration& producerConfig,
MessageCrypto* crypto) {
+ auto callback = createSendCallback();
+ if (empty()) {
+ return OpSendMsg::create(ResultOperationNotSupported,
std::move(callback));
+ }
+
+ // TODO: Store payload as a field and support shrinking
+ SharedBuffer payload;
+ auto sequenceId = Commands::serializeSingleMessagesToBatchPayload(payload,
messages_);
+ metadata_->set_sequence_id(sequenceId);
+ metadata_->set_num_messages_in_batch(messages_.size());
+ auto compressionType = producerConfig.getCompressionType();
+ if (compressionType != CompressionNone) {
+
metadata_->set_compression(static_cast<proto::CompressionType>(compressionType));
+ metadata_->set_uncompressed_size(payload.readableBytes());
+ }
+ payload =
CompressionCodecProvider::getCodec(compressionType).encode(payload);
+
+ if (producerConfig.isEncryptionEnabled() && crypto) {
+ SharedBuffer encryptedPayload;
+ if (!crypto->encrypt(producerConfig.getEncryptionKeys(),
producerConfig.getCryptoKeyReader(),
+ *metadata_, payload, encryptedPayload)) {
+ return OpSendMsg::create(ResultCryptoError, std::move(callback));
+ }
+ payload = encryptedPayload;
+ }
+
+ if (payload.readableBytes() > ClientConnection::getMaxMessageSize()) {
+ return OpSendMsg::create(ResultMessageTooBig, std::move(callback));
+ }
+
+ auto op = OpSendMsg::create(*metadata_, callbacks_.size(), messagesSize_,
producerConfig.getSendTimeout(),
+ std::move(callback), nullptr, producerId,
payload);
+ clear();
+ return op;
+}
+
void MessageAndCallbackBatch::clear() {
- msgImpl_.reset();
+ messages_.clear();
callbacks_.clear();
- messagesCount_ = 0;
messagesSize_ = 0;
}
static void completeSendCallbacks(const std::vector<SendCallback>& callbacks,
Result result,
const MessageId& id) {
int32_t numOfMessages = static_cast<int32_t>(callbacks.size());
- LOG_DEBUG("Batch complete [Result = " << result << "] [numOfMessages = "
<< numOfMessages << "]");
for (int32_t i = 0; i < numOfMessages; i++) {
callbacks[i](result,
MessageIdBuilder::from(id).batchIndex(i).batchSize(numOfMessages).build());
}
}
-void MessageAndCallbackBatch::complete(Result result, const MessageId& id)
const {
- completeSendCallbacks(callbacks_, result, id);
-}
-
-SendCallback MessageAndCallbackBatch::createSendCallback(const FlushCallback&
flushCallback) const {
+SendCallback MessageAndCallbackBatch::createSendCallback() const {
const auto& callbacks = callbacks_;
- if (flushCallback) {
- return [callbacks, flushCallback](Result result, const MessageId& id) {
- completeSendCallbacks(callbacks, result, id);
- flushCallback(result);
- };
- } else {
- return [callbacks] // save a copy of `callbacks_`
- (Result result, const MessageId& id) {
completeSendCallbacks(callbacks, result, id); };
- }
+ return [callbacks](Result result, const MessageId& id) {
completeSendCallbacks(callbacks, result, id); };
}
} // namespace pulsar
diff --git a/lib/MessageAndCallbackBatch.h b/lib/MessageAndCallbackBatch.h
index e8717b3..bc32e46 100644
--- a/lib/MessageAndCallbackBatch.h
+++ b/lib/MessageAndCallbackBatch.h
@@ -24,16 +24,24 @@
#include <atomic>
#include <boost/noncopyable.hpp>
+#include <memory>
#include <vector>
namespace pulsar {
-class MessageImpl;
-using MessageImplPtr = std::shared_ptr<MessageImpl>;
+struct OpSendMsg;
+class MessageCrypto;
using FlushCallback = std::function<void(Result)>;
-class MessageAndCallbackBatch : public boost::noncopyable {
+namespace proto {
+class MessageMetadata;
+}
+
+class MessageAndCallbackBatch final : public boost::noncopyable {
public:
+ MessageAndCallbackBatch();
+ ~MessageAndCallbackBatch();
+
// Wrapper methods of STL container
bool empty() const noexcept { return callbacks_.empty(); }
size_t size() const noexcept { return callbacks_.size(); }
@@ -46,34 +54,20 @@ class MessageAndCallbackBatch : public boost::noncopyable {
*/
void add(const Message& msg, const SendCallback& callback);
- /**
- * Clear the internal stats
- */
- void clear();
-
- /**
- * Complete all the callbacks with given parameters
- *
- * @param result this batch's send result
- * @param id this batch's message id
- */
- void complete(Result result, const MessageId& id) const;
-
- SendCallback createSendCallback(const FlushCallback& flushCallback) const;
-
- const MessageImplPtr& msgImpl() const { return msgImpl_; }
- uint64_t sequenceId() const noexcept { return sequenceId_; }
+ std::unique_ptr<OpSendMsg> createOpSendMsg(uint64_t producerId,
+ const ProducerConfiguration&
producerConfig,
+ MessageCrypto* crypto);
- uint32_t messagesCount() const { return messagesCount_; }
- uint64_t messagesSize() const { return messagesSize_; }
+ void clear();
private:
- MessageImplPtr msgImpl_;
+ std::unique_ptr<proto::MessageMetadata> metadata_;
+ std::vector<Message> messages_;
std::vector<SendCallback> callbacks_;
std::atomic<uint64_t> sequenceId_{static_cast<uint64_t>(-1L)};
-
- uint32_t messagesCount_{0};
uint64_t messagesSize_{0ull};
+
+ SendCallback createSendCallback() const;
};
} // namespace pulsar
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index 0b06285..06fa77f 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -36,7 +36,7 @@ struct SendArguments {
const uint64_t producerId;
const uint64_t sequenceId;
const proto::MessageMetadata metadata;
- const SharedBuffer payload;
+ SharedBuffer payload;
SendArguments(uint64_t producerId, uint64_t sequenceId, const
proto::MessageMetadata& metadata,
const SharedBuffer& payload)
@@ -73,7 +73,9 @@ struct OpSendMsg {
}
void addTrackerCallback(std::function<void(Result)> trackerCallback) {
- trackerCallbacks.emplace_back(trackerCallback);
+ if (trackerCallback) {
+ trackerCallbacks.emplace_back(trackerCallback);
+ }
}
private:
diff --git a/tests/BatchMessageTest.cc b/tests/BatchMessageTest.cc
index e46cb45..99e61f8 100644
--- a/tests/BatchMessageTest.cc
+++ b/tests/BatchMessageTest.cc
@@ -953,7 +953,7 @@ TEST(BatchMessageTest, producerFailureResult) {
PulsarFriend::producerFailMessages(producer, res);
}
-TEST(BatchMessageTest, testPraseMessageBatchEntry) {
+TEST(BatchMessageTest, testParseMessageBatchEntry) {
struct Case {
std::string content;
std::string propKey;
@@ -963,13 +963,14 @@ TEST(BatchMessageTest, testPraseMessageBatchEntry) {
cases.push_back(Case{"example1", "prop1", "value1"});
cases.push_back(Case{"example2", "prop2", "value2"});
- SharedBuffer payload = SharedBuffer::allocate(128);
- for (auto it = cases.begin(); it != cases.end(); ++it) {
+ std::vector<Message> msgs;
+ for (const auto& x : cases) {
MessageBuilder msgBuilder;
- const Message& message =
- msgBuilder.setContent(it->content).setProperty(it->propKey,
it->propValue).build();
- Commands::serializeSingleMessageInBatchWithPayload(message, payload,
1024);
+
msgs.emplace_back(msgBuilder.setContent(x.content).setProperty(x.propKey,
x.propValue).build());
}
+ SharedBuffer payload;
+ Commands::serializeSingleMessagesToBatchPayload(payload, msgs);
+ ASSERT_EQ(payload.writableBytes(), 0);
MessageBatch messageBatch;
auto fakeId =
MessageIdBuilder().ledgerId(5000L).entryId(10L).partition(0).build();