This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new cac5e1d Avoid copying OpSendMsg when sending messages (#308)
cac5e1d is described below
commit cac5e1dc8a6afc22503db5d314efac7e6af5dca0
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Aug 31 21:55:13 2023 +0800
Avoid copying OpSendMsg when sending messages (#308)
Fixes https://github.com/apache/pulsar-client-cpp/issues/306
### Motivation
`OpSendMsg` is a 400-byte struct, so we should avoid copying it
when sending messages.
### Modifications
Pass the `unique_ptr<OpSendMsg>` everywhere instead of `OpSendMsg`.
- Use `unique_ptr<OpSendMsg>` as the element of the pending message
queue in `ProducerImpl` and disable the copy constructor and
assignment for `OpSendMsg`.
- Add `SendArguments`, which includes the necessary fields to construct a
`CommandSend` request. Use `shared_ptr` rather than `unique_ptr` to
store `SendArguments` in `OpSendMsg` because the producer might need to
resend the message, so the `SendArguments` object could be shared by
`ProducerImpl` and `ClientConnection`.
This patch is mostly a refactoring, because compiler optimizations
might already eliminate some of the unnecessary copying.
---
lib/BatchMessageContainer.cc | 13 +--
lib/BatchMessageContainer.h | 13 +--
lib/BatchMessageContainerBase.cc | 57 ++---------
lib/BatchMessageContainerBase.h | 51 +++-------
lib/BatchMessageKeyBasedContainer.cc | 29 ++----
lib/BatchMessageKeyBasedContainer.h | 12 +--
lib/ClientConnection.cc | 46 ++++-----
lib/ClientConnection.h | 6 +-
lib/Commands.cc | 16 +--
lib/Commands.h | 6 +-
lib/MessageAndCallbackBatch.cc | 13 ++-
lib/MessageAndCallbackBatch.h | 10 +-
lib/OpSendMsg.h | 85 ++++++++++------
lib/ProducerImpl.cc | 183 +++++++++++++++++++----------------
lib/ProducerImpl.h | 12 +--
15 files changed, 250 insertions(+), 302 deletions(-)
diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc
index ae0425e..777b2c3 100644
--- a/lib/BatchMessageContainer.cc
+++ b/lib/BatchMessageContainer.cc
@@ -21,6 +21,7 @@
#include <stdexcept>
#include "LogUtils.h"
+#include "OpSendMsg.h"
DECLARE_LOG_OBJECT()
@@ -52,14 +53,10 @@ void BatchMessageContainer::clear() {
LOG_DEBUG(*this << " clear() called");
}
-Result BatchMessageContainer::createOpSendMsg(OpSendMsg& opSendMsg,
- const FlushCallback&
flushCallback) const {
- return createOpSendMsgHelper(opSendMsg, flushCallback, batch_);
-}
-
-std::vector<Result>
BatchMessageContainer::createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
- const
FlushCallback& flushCallback) const {
- throw std::runtime_error("createOpSendMsgs is not supported for
BatchMessageContainer");
+std::unique_ptr<OpSendMsg> BatchMessageContainer::createOpSendMsg(const
FlushCallback& flushCallback) {
+ auto op = createOpSendMsgHelper(flushCallback, batch_);
+ clear();
+ return op;
}
void BatchMessageContainer::serialize(std::ostream& os) const {
diff --git a/lib/BatchMessageContainer.h b/lib/BatchMessageContainer.h
index cd8a62c..5b1213c 100644
--- a/lib/BatchMessageContainer.h
+++ b/lib/BatchMessageContainer.h
@@ -39,25 +39,22 @@ class BatchMessageContainer : public
BatchMessageContainerBase {
~BatchMessageContainer();
- size_t getNumBatches() const override { return 1; }
+ bool hasMultiOpSendMsgs() const override { return false; }
bool isFirstMessageToAdd(const Message& msg) const override { return
batch_.empty(); }
bool add(const Message& msg, const SendCallback& callback) override;
- void clear() override;
-
- Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback&
flushCallback) const override;
-
- std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
- const FlushCallback& flushCallback)
const override;
-
void serialize(std::ostream& os) const override;
+ std::unique_ptr<OpSendMsg> createOpSendMsg(const FlushCallback&
flushCallback) override;
+
private:
MessageAndCallbackBatch batch_;
size_t numberOfBatchesSent_ = 0;
double averageBatchSize_ = 0;
+
+ void clear() override;
};
} // namespace pulsar
diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc
index 807a261..96ea94b 100644
--- a/lib/BatchMessageContainerBase.cc
+++ b/lib/BatchMessageContainerBase.cc
@@ -37,23 +37,13 @@ BatchMessageContainerBase::BatchMessageContainerBase(const
ProducerImpl& produce
producerId_(producer.producerId_),
msgCryptoWeakPtr_(producer.msgCrypto_) {}
-Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
- const FlushCallback&
flushCallback,
- const
MessageAndCallbackBatch& batch) const {
- opSendMsg.sendCallback_ = batch.createSendCallback();
- opSendMsg.messagesCount_ = batch.messagesCount();
- opSendMsg.messagesSize_ = batch.messagesSize();
-
- if (flushCallback) {
- auto sendCallback = opSendMsg.sendCallback_;
- opSendMsg.sendCallback_ = [sendCallback, flushCallback](Result result,
const MessageId& id) {
- sendCallback(result, id);
- flushCallback(result);
- };
- }
+BatchMessageContainerBase::~BatchMessageContainerBase() {}
+std::unique_ptr<OpSendMsg> BatchMessageContainerBase::createOpSendMsgHelper(
+ const FlushCallback& flushCallback, const MessageAndCallbackBatch& batch)
const {
+ auto sendCallback = batch.createSendCallback(flushCallback);
if (batch.empty()) {
- return ResultOperationNotSupported;
+ return OpSendMsg::create(ResultOperationNotSupported,
std::move(sendCallback));
}
MessageImplPtr impl = batch.msgImpl();
@@ -70,45 +60,18 @@ Result
BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
SharedBuffer encryptedPayload;
if (!msgCrypto->encrypt(producerConfig_.getEncryptionKeys(),
producerConfig_.getCryptoKeyReader(),
impl->metadata, impl->payload,
encryptedPayload)) {
- return ResultCryptoError;
+ return OpSendMsg::create(ResultCryptoError,
std::move(sendCallback));
}
impl->payload = encryptedPayload;
}
if (impl->payload.readableBytes() > ClientConnection::getMaxMessageSize())
{
- return ResultMessageTooBig;
+ return OpSendMsg::create(ResultMessageTooBig, std::move(sendCallback));
}
- opSendMsg.metadata_ = impl->metadata;
- opSendMsg.payload_ = impl->payload;
- opSendMsg.sequenceId_ = impl->metadata.sequence_id();
- opSendMsg.producerId_ = producerId_;
- opSendMsg.timeout_ = TimeUtils::now() +
milliseconds(producerConfig_.getSendTimeout());
-
- return ResultOk;
-}
-
-void BatchMessageContainerBase::processAndClear(
- std::function<void(Result, const OpSendMsg&)> opSendMsgCallback,
FlushCallback flushCallback) {
- if (isEmpty()) {
- if (flushCallback) {
- // do nothing, flushCallback complete until the lastOpSend complete
- }
- } else {
- const auto numBatches = getNumBatches();
- if (numBatches == 1) {
- OpSendMsg opSendMsg;
- Result result = createOpSendMsg(opSendMsg, flushCallback);
- opSendMsgCallback(result, opSendMsg);
- } else if (numBatches > 1) {
- std::vector<OpSendMsg> opSendMsgs;
- std::vector<Result> results = createOpSendMsgs(opSendMsgs,
flushCallback);
- for (size_t i = 0; i < results.size(); i++) {
- opSendMsgCallback(results[i], opSendMsgs[i]);
- }
- } // else numBatches is 0, do nothing
- }
- clear();
+ return OpSendMsg::create(impl->metadata, batch.messagesCount(),
batch.messagesSize(),
+ producerConfig_.getSendTimeout(),
batch.createSendCallback(flushCallback),
+ nullptr, producerId_, impl->payload);
}
} // namespace pulsar
diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h
index fe4e5df..fb46019 100644
--- a/lib/BatchMessageContainerBase.h
+++ b/lib/BatchMessageContainerBase.h
@@ -26,6 +26,7 @@
#include <boost/noncopyable.hpp>
#include <memory>
+#include <stdexcept>
#include <vector>
namespace pulsar {
@@ -44,14 +45,9 @@ class BatchMessageContainerBase : public boost::noncopyable {
public:
BatchMessageContainerBase(const ProducerImpl& producer);
- virtual ~BatchMessageContainerBase() {}
+ virtual ~BatchMessageContainerBase();
- /**
- * Get number of batches in the batch message container
- *
- * @return number of batches
- */
- virtual size_t getNumBatches() const = 0;
+ virtual bool hasMultiOpSendMsgs() const = 0;
/**
* Check the message will be the 1st message to be added to the batch
@@ -73,32 +69,14 @@ class BatchMessageContainerBase : public boost::noncopyable
{
*/
virtual bool add(const Message& msg, const SendCallback& callback) = 0;
- /**
- * Clear the batch message container
- */
- virtual void clear() = 0;
-
- /**
- * Create a OpSendMsg object to send
- *
- * @param opSendMsg the OpSendMsg object to create
- * @param flushCallback the callback to trigger after the OpSendMsg was
completed
- * @return ResultOk if create successfully
- * @note OpSendMsg's sendCallback_ must be set even if it failed
- */
- virtual Result createOpSendMsg(OpSendMsg& opSendMsg,
- const FlushCallback& flushCallback =
nullptr) const = 0;
+ virtual std::unique_ptr<OpSendMsg> createOpSendMsg(const FlushCallback&
flushCallback = nullptr) {
+ throw std::runtime_error("createOpSendMsg is not supported");
+ }
- /**
- * Create a OpSendMsg list to send
- *
- * @param opSendMsgList the OpSendMsg list to create
- * @param flushCallback the callback to trigger after the OpSendMsg was
completed
- * @return all create results of `opSendMsgs`, ResultOk means create
successfully
- * @note OpSendMsg's sendCallback_ must be set even if it failed
- */
- virtual std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>&
opSendMsgs,
- const FlushCallback&
flushCallback = nullptr) const = 0;
+ virtual std::vector<std::unique_ptr<OpSendMsg>> createOpSendMsgs(
+ const FlushCallback& flushCallback = nullptr) {
+ throw std::runtime_error("createOpSendMsgs is not supported");
+ }
/**
* Serialize into a std::ostream for logging
@@ -110,9 +88,6 @@ class BatchMessageContainerBase : public boost::noncopyable {
bool hasEnoughSpace(const Message& msg) const noexcept;
bool isEmpty() const noexcept;
- void processAndClear(std::function<void(Result, const OpSendMsg&)>
opSendMsgCallback,
- FlushCallback flushCallback);
-
protected:
// references to ProducerImpl's fields
const std::shared_ptr<std::string> topicName_;
@@ -134,8 +109,10 @@ class BatchMessageContainerBase : public
boost::noncopyable {
void updateStats(const Message& msg);
void resetStats();
- Result createOpSendMsgHelper(OpSendMsg& opSendMsg, const FlushCallback&
flushCallback,
- const MessageAndCallbackBatch& batch) const;
+ std::unique_ptr<OpSendMsg> createOpSendMsgHelper(const FlushCallback&
flushCallback,
+ const
MessageAndCallbackBatch& batch) const;
+
+ virtual void clear() = 0;
};
inline bool BatchMessageContainerBase::hasEnoughSpace(const Message& msg)
const noexcept {
diff --git a/lib/BatchMessageKeyBasedContainer.cc
b/lib/BatchMessageKeyBasedContainer.cc
index 05baf34..e88674b 100644
--- a/lib/BatchMessageKeyBasedContainer.cc
+++ b/lib/BatchMessageKeyBasedContainer.cc
@@ -72,16 +72,8 @@ void BatchMessageKeyBasedContainer::clear() {
LOG_DEBUG(*this << " clear() called");
}
-Result BatchMessageKeyBasedContainer::createOpSendMsg(OpSendMsg& opSendMsg,
- const FlushCallback&
flushCallback) const {
- if (batches_.size() < 1) {
- return ResultOperationNotSupported;
- }
- return createOpSendMsgHelper(opSendMsg, flushCallback,
batches_.begin()->second);
-}
-
-std::vector<Result> BatchMessageKeyBasedContainer::createOpSendMsgs(
- std::vector<OpSendMsg>& opSendMsgs, const FlushCallback& flushCallback)
const {
+std::vector<std::unique_ptr<OpSendMsg>>
BatchMessageKeyBasedContainer::createOpSendMsgs(
+ const FlushCallback& flushCallback) {
// Sorted the batches by sequence id
std::vector<const MessageAndCallbackBatch*> sortedBatches;
for (const auto& kv : batches_) {
@@ -92,18 +84,15 @@ std::vector<Result>
BatchMessageKeyBasedContainer::createOpSendMsgs(
return lhs->sequenceId() < rhs->sequenceId();
});
- size_t numBatches = sortedBatches.size();
- opSendMsgs.resize(numBatches);
-
- std::vector<Result> results(numBatches);
- for (size_t i = 0; i + 1 < numBatches; i++) {
- results[i] = createOpSendMsgHelper(opSendMsgs[i], nullptr,
*sortedBatches[i]);
+ std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{sortedBatches.size()};
+ for (size_t i = 0; i + 1 < opSendMsgs.size(); i++) {
+ opSendMsgs[i].reset(createOpSendMsgHelper(nullptr,
*sortedBatches[i]).release());
}
- if (numBatches > 0) {
- // Add flush callback to the last batch
- results.back() = createOpSendMsgHelper(opSendMsgs.back(),
flushCallback, *sortedBatches.back());
+ if (!opSendMsgs.empty()) {
+ opSendMsgs.back().reset(createOpSendMsgHelper(flushCallback,
*sortedBatches.back()).release());
}
- return results;
+ clear();
+ return opSendMsgs;
}
void BatchMessageKeyBasedContainer::serialize(std::ostream& os) const {
diff --git a/lib/BatchMessageKeyBasedContainer.h
b/lib/BatchMessageKeyBasedContainer.h
index f580a05..e534fba 100644
--- a/lib/BatchMessageKeyBasedContainer.h
+++ b/lib/BatchMessageKeyBasedContainer.h
@@ -32,18 +32,13 @@ class BatchMessageKeyBasedContainer : public
BatchMessageContainerBase {
~BatchMessageKeyBasedContainer();
- size_t getNumBatches() const override { return batches_.size(); }
+ bool hasMultiOpSendMsgs() const override { return true; }
bool isFirstMessageToAdd(const Message& msg) const override;
bool add(const Message& msg, const SendCallback& callback) override;
- void clear() override;
-
- Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback&
flushCallback) const override;
-
- std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
- const FlushCallback& flushCallback)
const override;
+ std::vector<std::unique_ptr<OpSendMsg>> createOpSendMsgs(const
FlushCallback& flushCallback) override;
void serialize(std::ostream& os) const override;
@@ -53,8 +48,7 @@ class BatchMessageKeyBasedContainer : public
BatchMessageContainerBase {
size_t numberOfBatchesSent_ = 0;
double averageBatchSize_ = 0;
- Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback&
flushCallback,
- MessageAndCallbackBatch& batch) const;
+ void clear() override;
};
} // namespace pulsar
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 2a63c7a..4e1a667 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1025,37 +1025,30 @@ void ClientConnection::sendCommandInternal(const
SharedBuffer& cmd) {
std::bind(&ClientConnection::handleSend,
shared_from_this(), std::placeholders::_1, cmd)));
}
-void ClientConnection::sendMessage(const OpSendMsg& opSend) {
+void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args)
{
Lock lock(mutex_);
-
- if (pendingWriteOperations_++ == 0) {
- // Write immediately to socket
- if (tlsSocket_) {
+ if (pendingWriteOperations_++ > 0) {
+ pendingWriteBuffers_.emplace_back(args);
+ return;
+ }
+ auto self = shared_from_this();
+ auto sendMessageInternal = [this, self, args] {
+ BaseCommand outgoingCmd;
+ auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd,
getChecksumType(), *args);
+ asyncWrite(buffer,
customAllocReadHandler(std::bind(&ClientConnection::handleSendPair,
+
shared_from_this(), std::placeholders::_1)));
+ };
+ if (tlsSocket_) {
#if BOOST_VERSION >= 106600
- boost::asio::post(strand_,
-
std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend));
+ boost::asio::post(strand_, sendMessageInternal);
#else
- strand_.post(std::bind(&ClientConnection::sendMessageInternal,
shared_from_this(), opSend));
+ strand_.post(sendMessageInternal);
#endif
- } else {
- sendMessageInternal(opSend);
- }
} else {
- // Queue to send later
- pendingWriteBuffers_.push_back(opSend);
+ sendMessageInternal();
}
}
-void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) {
- BaseCommand outgoingCmd;
- PairSharedBuffer buffer =
- Commands::newSend(outgoingBuffer_, outgoingCmd, opSend.producerId_,
opSend.sequenceId_,
- getChecksumType(), opSend.metadata_,
opSend.payload_);
-
- asyncWrite(buffer,
customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
- shared_from_this(),
std::placeholders::_1)));
-}
-
void ClientConnection::handleSend(const boost::system::error_code& err, const
SharedBuffer&) {
if (err) {
LOG_WARN(cnxString_ << "Could not send message on connection: " << err
<< " " << err.message());
@@ -1088,13 +1081,12 @@ void ClientConnection::sendPendingCommands() {
customAllocWriteHandler(std::bind(&ClientConnection::handleSend,
shared_from_this(),
std::placeholders::_1, buffer)));
} else {
- assert(any.type() == typeid(OpSendMsg));
+ assert(any.type() == typeid(std::shared_ptr<SendArguments>));
- const OpSendMsg& op = boost::any_cast<const OpSendMsg&>(any);
+ auto args = boost::any_cast<std::shared_ptr<SendArguments>>(any);
BaseCommand outgoingCmd;
PairSharedBuffer buffer =
- Commands::newSend(outgoingBuffer_, outgoingCmd,
op.producerId_, op.sequenceId_,
- getChecksumType(), op.metadata_,
op.payload_);
+ Commands::newSend(outgoingBuffer_, outgoingCmd,
getChecksumType(), *args);
asyncWrite(buffer,
customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
shared_from_this(), std::placeholders::_1)));
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 38b814c..9abff9d 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -69,8 +69,7 @@ typedef std::weak_ptr<ConsumerImpl> ConsumerImplWeakPtr;
class LookupDataResult;
class BrokerConsumerStatsImpl;
class PeriodicTask;
-
-struct OpSendMsg;
+struct SendArguments;
namespace proto {
class BaseCommand;
@@ -153,8 +152,7 @@ class PULSAR_PUBLIC ClientConnection : public
std::enable_shared_from_this<Clien
void sendCommand(const SharedBuffer& cmd);
void sendCommandInternal(const SharedBuffer& cmd);
- void sendMessage(const OpSendMsg& opSend);
- void sendMessageInternal(const OpSendMsg& opSend);
+ void sendMessage(const std::shared_ptr<SendArguments>& args);
void registerProducer(int producerId, ProducerImplPtr producer);
void registerConsumer(int consumerId, ConsumerImplPtr consumer);
diff --git a/lib/Commands.cc b/lib/Commands.cc
index d9251a0..5b1734c 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -32,6 +32,7 @@
#include "ChunkMessageIdImpl.h"
#include "LogUtils.h"
#include "MessageImpl.h"
+#include "OpSendMsg.h"
#include "PulsarApi.pb.h"
#include "Url.h"
#include "checksum/ChecksumProvider.h"
@@ -193,13 +194,13 @@ SharedBuffer Commands::newConsumerStats(uint64_t
consumerId, uint64_t requestId)
return buffer;
}
-PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd,
uint64_t producerId,
- uint64_t sequenceId, ChecksumType
checksumType,
- const proto::MessageMetadata& metadata,
const SharedBuffer& payload) {
+PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd,
ChecksumType checksumType,
+ const SendArguments& args) {
cmd.set_type(BaseCommand::SEND);
CommandSend* send = cmd.mutable_send();
- send->set_producer_id(producerId);
- send->set_sequence_id(sequenceId);
+ send->set_producer_id(args.producerId);
+ send->set_sequence_id(args.sequenceId);
+ const auto& metadata = args.metadata;
if (metadata.has_num_messages_in_batch()) {
send->set_num_messages(metadata.num_messages_in_batch());
}
@@ -210,8 +211,9 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers,
BaseCommand& cmd, uint
// / Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM]
[METADATA_SIZE][METADATA] [PAYLOAD]
- int cmdSize = cmd.ByteSize();
- int msgMetadataSize = metadata.ByteSize();
+ int cmdSize = cmd.ByteSizeLong();
+ int msgMetadataSize = metadata.ByteSizeLong();
+ const auto& payload = args.payload;
int payloadSize = payload.readableBytes();
int magicAndChecksumLength = (Crc32c == (checksumType)) ? (2 + 4 /* magic
+ checksumLength*/) : 0;
diff --git a/lib/Commands.h b/lib/Commands.h
index c21adb4..22a4b7b 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -40,6 +40,7 @@ using BatchMessageAckerPtr =
std::shared_ptr<BatchMessageAcker>;
class MessageIdImpl;
using MessageIdImplPtr = std::shared_ptr<MessageIdImpl>;
class BitSet;
+struct SendArguments;
namespace proto {
class BaseCommand;
@@ -98,9 +99,8 @@ class Commands {
static SharedBuffer newGetSchema(const std::string& topic, const
std::string& version,
uint64_t requestId);
- static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand&
cmd, uint64_t producerId,
- uint64_t sequenceId, ChecksumType
checksumType,
- const proto::MessageMetadata& metadata,
const SharedBuffer& payload);
+ static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand&
cmd, ChecksumType checksumType,
+ const SendArguments& args);
static SharedBuffer newSubscribe(
const std::string& topic, const std::string& subscription, uint64_t
consumerId, uint64_t requestId,
diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc
index 5672538..e17330c 100644
--- a/lib/MessageAndCallbackBatch.cc
+++ b/lib/MessageAndCallbackBatch.cc
@@ -64,10 +64,17 @@ void MessageAndCallbackBatch::complete(Result result, const
MessageId& id) const
completeSendCallbacks(callbacks_, result, id);
}
-SendCallback MessageAndCallbackBatch::createSendCallback() const {
+SendCallback MessageAndCallbackBatch::createSendCallback(const FlushCallback&
flushCallback) const {
const auto& callbacks = callbacks_;
- return [callbacks] // save a copy of `callbacks_`
- (Result result, const MessageId& id) {
completeSendCallbacks(callbacks, result, id); };
+ if (flushCallback) {
+ return [callbacks, flushCallback](Result result, const MessageId& id) {
+ completeSendCallbacks(callbacks, result, id);
+ flushCallback(result);
+ };
+ } else {
+ return [callbacks] // save a copy of `callbacks_`
+ (Result result, const MessageId& id) {
completeSendCallbacks(callbacks, result, id); };
+ }
}
} // namespace pulsar
diff --git a/lib/MessageAndCallbackBatch.h b/lib/MessageAndCallbackBatch.h
index 3d107c6..e8717b3 100644
--- a/lib/MessageAndCallbackBatch.h
+++ b/lib/MessageAndCallbackBatch.h
@@ -30,6 +30,7 @@ namespace pulsar {
class MessageImpl;
using MessageImplPtr = std::shared_ptr<MessageImpl>;
+using FlushCallback = std::function<void(Result)>;
class MessageAndCallbackBatch : public boost::noncopyable {
public:
@@ -58,14 +59,7 @@ class MessageAndCallbackBatch : public boost::noncopyable {
*/
void complete(Result result, const MessageId& id) const;
- /**
- * Create a single callback to trigger all the internal callbacks in order
- * It's used when you want to clear and add new messages and callbacks but
current callbacks need to be
- * triggered later.
- *
- * @return the merged send callback
- */
- SendCallback createSendCallback() const;
+ SendCallback createSendCallback(const FlushCallback& flushCallback) const;
const MessageImplPtr& msgImpl() const { return msgImpl_; }
uint64_t sequenceId() const noexcept { return sequenceId_; }
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index d365b90..0b06285 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -21,6 +21,7 @@
#include <pulsar/Message.h>
#include <pulsar/Producer.h>
+#include <pulsar/Result.h>
#include <boost/date_time/posix_time/ptime.hpp>
@@ -31,46 +32,72 @@
namespace pulsar {
-struct OpSendMsg {
- proto::MessageMetadata metadata_;
- SharedBuffer payload_;
- SendCallback sendCallback_;
- uint64_t producerId_;
- uint64_t sequenceId_;
- boost::posix_time::ptime timeout_;
- uint32_t messagesCount_;
- uint64_t messagesSize_;
- std::vector<std::function<void(Result)>> trackerCallbacks_;
- ChunkMessageIdImplPtr chunkedMessageId_;
+struct SendArguments {
+ const uint64_t producerId;
+ const uint64_t sequenceId;
+ const proto::MessageMetadata metadata;
+ const SharedBuffer payload;
- OpSendMsg() = default;
+ SendArguments(uint64_t producerId, uint64_t sequenceId, const
proto::MessageMetadata& metadata,
+ const SharedBuffer& payload)
+ : producerId(producerId), sequenceId(sequenceId), metadata(metadata),
payload(payload) {}
+ SendArguments(const SendArguments&) = delete;
+ SendArguments& operator=(const SendArguments&) = delete;
+};
- OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer&
payload,
- const SendCallback& sendCallback, uint64_t producerId, uint64_t
sequenceId, int sendTimeoutMs,
- uint32_t messagesCount, uint64_t messagesSize,
ChunkMessageIdImplPtr chunkedMessageId = nullptr)
- : metadata_(metadata), // the copy happens here because OpSendMsg of
chunks are constructed with
- // a shared metadata object
- payload_(payload),
- sendCallback_(sendCallback),
- producerId_(producerId),
- sequenceId_(sequenceId),
- timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)),
- messagesCount_(messagesCount),
- messagesSize_(messagesSize),
- chunkedMessageId_(chunkedMessageId) {}
+struct OpSendMsg {
+ const Result result;
+ const int32_t chunkId;
+ const int32_t numChunks;
+ const uint32_t messagesCount;
+ const uint64_t messagesSize;
+ const boost::posix_time::ptime timeout;
+ const SendCallback sendCallback;
+ std::vector<std::function<void(Result)>> trackerCallbacks;
+ ChunkMessageIdImplPtr chunkedMessageId;
+ // Use shared_ptr here because producer might resend the message with the
same arguments
+ const std::shared_ptr<SendArguments> sendArgs;
+
+ template <typename... Args>
+ static std::unique_ptr<OpSendMsg> create(Args&&... args) {
+ return std::unique_ptr<OpSendMsg>(new
OpSendMsg(std::forward<Args>(args)...));
+ }
void complete(Result result, const MessageId& messageId) const {
- if (sendCallback_) {
- sendCallback_(result, messageId);
+ if (sendCallback) {
+ sendCallback(result, messageId);
}
- for (const auto& trackerCallback : trackerCallbacks_) {
+ for (const auto& trackerCallback : trackerCallbacks) {
trackerCallback(result);
}
}
void addTrackerCallback(std::function<void(Result)> trackerCallback) {
- trackerCallbacks_.emplace_back(trackerCallback);
+ trackerCallbacks.emplace_back(trackerCallback);
}
+
+ private:
+ OpSendMsg(Result result, SendCallback&& callback)
+ : result(result),
+ chunkId(-1),
+ numChunks(-1),
+ messagesCount(0),
+ messagesSize(0),
+ sendCallback(std::move(callback)),
+ sendArgs(nullptr) {}
+
+ OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount,
uint64_t messagesSize,
+ int sendTimeoutMs, SendCallback&& callback,
ChunkMessageIdImplPtr chunkedMessageId,
+ uint64_t producerId, SharedBuffer payload)
+ : result(ResultOk),
+ chunkId(metadata.chunk_id()),
+ numChunks(metadata.num_chunks_from_msg()),
+ messagesCount(messagesCount),
+ messagesSize(messagesSize),
+ timeout(TimeUtils::now() +
boost::posix_time::milliseconds(sendTimeoutMs)),
+ sendCallback(std::move(callback)),
+ chunkedMessageId(chunkedMessageId),
+ sendArgs(new SendArguments(producerId, metadata.sequence_id(),
metadata, payload)) {}
};
} // namespace pulsar
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 71559ff..d8f02be 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -45,16 +45,6 @@
namespace pulsar {
DECLARE_LOG_OBJECT()
-struct ProducerImpl::PendingCallbacks {
- std::vector<OpSendMsg> opSendMsgs;
-
- void complete(Result result) {
- for (const auto& opSendMsg : opSendMsgs) {
- opSendMsg.complete(result, {});
- }
- }
-};
-
ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
const ProducerConfiguration& conf, const
ProducerInterceptorsPtr& interceptors,
int32_t partition)
@@ -64,7 +54,6 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const
TopicName& topicName,
milliseconds(std::max(100, conf.getSendTimeout() -
100)))),
conf_(conf),
semaphore_(),
- pendingMessagesQueue_(),
partition_(partition),
producerName_(conf_.getProducerName()),
userProvidedProducerName_(false),
@@ -297,43 +286,46 @@ void ProducerImpl::handleCreateProducer(const
ClientConnectionPtr& cnx, Result r
}
}
-std::shared_ptr<ProducerImpl::PendingCallbacks>
ProducerImpl::getPendingCallbacksWhenFailed() {
- auto callbacks = std::make_shared<PendingCallbacks>();
- callbacks->opSendMsgs.reserve(pendingMessagesQueue_.size());
+auto ProducerImpl::getPendingCallbacksWhenFailed() ->
decltype(pendingMessagesQueue_) {
+ decltype(pendingMessagesQueue_) pendingMessages;
LOG_DEBUG(getName() << "# messages in pending queue : " <<
pendingMessagesQueue_.size());
- // Iterate over a copy of the pending messages queue, to trigger the
future completion
- // without holding producer mutex.
- for (auto& op : pendingMessagesQueue_) {
- callbacks->opSendMsgs.push_back(op);
- releaseSemaphoreForSendOp(op);
+ pendingMessages.swap(pendingMessagesQueue_);
+ for (const auto& op : pendingMessages) {
+ releaseSemaphoreForSendOp(*op);
}
- if (batchMessageContainer_) {
- batchMessageContainer_->processAndClear(
- [this, &callbacks](Result result, const OpSendMsg& opSendMsg) {
- if (result == ResultOk) {
- callbacks->opSendMsgs.emplace_back(opSendMsg);
- }
- releaseSemaphoreForSendOp(opSendMsg);
- },
- nullptr);
+ if (!batchMessageContainer_ || batchMessageContainer_->isEmpty()) {
+ return pendingMessages;
}
- pendingMessagesQueue_.clear();
- return callbacks;
+ auto handleOp = [this, &pendingMessages](std::unique_ptr<OpSendMsg>&& op) {
+ releaseSemaphoreForSendOp(*op);
+ if (op->result == ResultOk) {
+ pendingMessages.emplace_back(std::move(op));
+ }
+ };
+
+ if (batchMessageContainer_->hasMultiOpSendMsgs()) {
+ auto opSendMsgs = batchMessageContainer_->createOpSendMsgs();
+ for (auto&& op : opSendMsgs) {
+ handleOp(std::move(op));
+ }
+ } else {
+ handleOp(batchMessageContainer_->createOpSendMsg());
+ }
+ return pendingMessages;
}
-std::shared_ptr<ProducerImpl::PendingCallbacks>
ProducerImpl::getPendingCallbacksWhenFailedWithLock() {
+auto ProducerImpl::getPendingCallbacksWhenFailedWithLock() ->
decltype(pendingMessagesQueue_) {
Lock lock(mutex_);
return getPendingCallbacksWhenFailed();
}
void ProducerImpl::failPendingMessages(Result result, bool withLock) {
- if (withLock) {
- getPendingCallbacksWhenFailedWithLock()->complete(result);
- } else {
- getPendingCallbacksWhenFailed()->complete(result);
+ auto opSendMsgs = withLock ? getPendingCallbacksWhenFailedWithLock() :
getPendingCallbacksWhenFailed();
+ for (const auto& op : opSendMsgs) {
+ op->complete(result, {});
}
}
@@ -345,8 +337,8 @@ void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
LOG_DEBUG(getName() << "Re-Sending " << pendingMessagesQueue_.size() << "
messages to server");
for (const auto& op : pendingMessagesQueue_) {
- LOG_DEBUG(getName() << "Re-Sending " << op.sequenceId_);
- cnx->sendMessage(op);
+ LOG_DEBUG(getName() << "Re-Sending " << op->sendArgs->sequenceId);
+ cnx->sendMessage(op->sendArgs);
}
}
@@ -378,7 +370,7 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
auto& opSendMsg = pendingMessagesQueue_.back();
lock.unlock();
failures.complete();
- opSendMsg.addTrackerCallback(callback);
+ opSendMsg->addTrackerCallback(callback);
} else {
lock.unlock();
failures.complete();
@@ -389,7 +381,7 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
if (!pendingMessagesQueue_.empty()) {
auto& opSendMsg = pendingMessagesQueue_.back();
lock.unlock();
- opSendMsg.addTrackerCallback(callback);
+ opSendMsg->addTrackerCallback(callback);
} else {
lock.unlock();
callback(ResultOk);
@@ -462,7 +454,7 @@ void ProducerImpl::sendAsync(const Message& msg,
SendCallback callback) {
});
}
-void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const
SendCallback& callback) {
+void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&&
callback) {
if (!isValidProducerState(callback)) {
return;
}
@@ -601,17 +593,18 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const
Message& msg, const SendCallba
handleFailedResult(ResultCryptoError);
return;
}
- OpSendMsg op{msgMetadata, encryptedPayload, (chunkId ==
totalChunks - 1) ? callback : nullptr,
- producerId_, sequenceId, conf_.getSendTimeout(),
- 1, uncompressedSize, chunkMessageId};
+
+ auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize,
conf_.getSendTimeout(),
+ (chunkId == totalChunks - 1) ?
callback : nullptr, chunkMessageId,
+ producerId_, encryptedPayload);
if (!chunkingEnabled_) {
- const uint32_t msgMetadataSize = op.metadata_.ByteSize();
- const uint32_t payloadSize = op.payload_.readableBytes();
+ const uint32_t msgMetadataSize =
op->sendArgs->metadata.ByteSizeLong();
+ const uint32_t payloadSize =
op->sendArgs->payload.readableBytes();
const uint32_t msgHeadersAndPayloadSize = msgMetadataSize +
payloadSize;
if (msgHeadersAndPayloadSize > maxMessageSize) {
lock.unlock();
- releaseSemaphoreForSendOp(op);
+ releaseSemaphoreForSendOp(*op);
LOG_WARN(getName()
<< " - compressed Message size " <<
msgHeadersAndPayloadSize << " cannot exceed "
<< maxMessageSize << " bytes unless chunking is enabled");
@@ -620,7 +613,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message&
msg, const SendCallba
}
}
- sendMessage(op);
+ sendMessage(std::move(op));
}
}
}
@@ -667,10 +660,10 @@ void ProducerImpl::releaseSemaphore(uint32_t payloadSize)
{
void ProducerImpl::releaseSemaphoreForSendOp(const OpSendMsg& op) {
if (semaphore_) {
- semaphore_->release(op.messagesCount_);
+ semaphore_->release(op.messagesCount);
}
- memoryLimitController_.releaseMemory(op.messagesSize_);
+ memoryLimitController_.releaseMemory(op.messagesSize);
}
// It must be called while `mutex_` is acquired
@@ -678,37 +671,50 @@ PendingFailures ProducerImpl::batchMessageAndSend(const
FlushCallback& flushCall
PendingFailures failures;
LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
batchTimer_->cancel();
+ if (batchMessageContainer_->isEmpty()) {
+ return failures;
+ }
- batchMessageContainer_->processAndClear(
- [this, &failures](Result result, const OpSendMsg& opSendMsg) {
- if (result == ResultOk) {
- sendMessage(opSendMsg);
- } else {
- // A spot has been reserved for this batch, but the batch failed to be pushed to the queue, so
- // we need to release the spot manually
- LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: "
<< result);
- releaseSemaphoreForSendOp(opSendMsg);
- failures.add([opSendMsg, result] { opSendMsg.complete(result,
{}); });
- }
- },
- flushCallback);
+ auto handleOp = [this, &failures](std::unique_ptr<OpSendMsg>&& op) {
+ if (op->result == ResultOk) {
+ sendMessage(std::move(op));
+ } else {
+ LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: " <<
op->result);
+ releaseSemaphoreForSendOp(*op);
+ auto rawOpPtr = op.release();
+ failures.add([rawOpPtr] {
+ std::unique_ptr<OpSendMsg> op{rawOpPtr};
+ op->complete(op->result, {});
+ });
+ }
+ };
+
+ if (batchMessageContainer_->hasMultiOpSendMsgs()) {
+ auto opSendMsgs =
batchMessageContainer_->createOpSendMsgs(flushCallback);
+ for (auto&& op : opSendMsgs) {
+ handleOp(std::move(op));
+ }
+ } else {
+ handleOp(batchMessageContainer_->createOpSendMsg(flushCallback));
+ }
return failures;
}
// Precondition -
// a. we have a reserved spot on the queue
// b. call this function after acquiring the ProducerImpl mutex_
-void ProducerImpl::sendMessage(const OpSendMsg& op) {
- const auto sequenceId = op.metadata_.sequence_id();
+void ProducerImpl::sendMessage(std::unique_ptr<OpSendMsg> opSendMsg) {
+ const auto sequenceId = opSendMsg->sendArgs->sequenceId;
LOG_DEBUG("Inserting data to pendingMessagesQueue_");
- pendingMessagesQueue_.push_back(op);
+ auto args = opSendMsg->sendArgs;
+ pendingMessagesQueue_.emplace_back(std::move(opSendMsg));
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
// If we do have a connection, the message is sent immediately, otherwise
// we'll try again once a new connection is established
LOG_DEBUG(getName() << "Sending msg immediately - seq: " <<
sequenceId);
- cnx->sendMessage(op);
+ cnx->sendMessage(args);
} else {
LOG_DEBUG(getName() << "Connection is not ready - seq: " <<
sequenceId);
}
@@ -808,7 +814,7 @@ void ProducerImpl::handleSendTimeout(const
boost::system::error_code& err) {
return;
}
- std::shared_ptr<PendingCallbacks> pendingCallbacks;
+ decltype(pendingMessagesQueue_) pendingMessages;
if (pendingMessagesQueue_.empty()) {
// If there are no pending messages, reset the timeout to the configured value.
LOG_DEBUG(getName() << "Producer timeout triggered on empty pending message queue");
@@ -816,11 +822,11 @@ void ProducerImpl::handleSendTimeout(const
boost::system::error_code& err) {
} else {
// If there is at least one message, calculate the diff between the message timeout and
// the current time.
- time_duration diff = pendingMessagesQueue_.front().timeout_ -
TimeUtils::now();
+ time_duration diff = pendingMessagesQueue_.front()->timeout -
TimeUtils::now();
if (diff.total_milliseconds() <= 0) {
// The diff is less than or equal to zero, meaning that the message has been expired.
LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks.");
- pendingCallbacks = getPendingCallbacksWhenFailed();
+ pendingMessages = getPendingCallbacksWhenFailed();
// Since the pending queue is cleared now, set timer to expire after configured value.
asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
} else {
@@ -831,8 +837,8 @@ void ProducerImpl::handleSendTimeout(const
boost::system::error_code& err) {
}
lock.unlock();
- if (pendingCallbacks) {
- pendingCallbacks->complete(ResultTimeout);
+ for (const auto& op : pendingMessages) {
+ op->complete(ResultTimeout, {});
}
}
@@ -844,8 +850,8 @@ bool ProducerImpl::removeCorruptMessage(uint64_t
sequenceId) {
return true;
}
- OpSendMsg op = pendingMessagesQueue_.front();
- uint64_t expectedSequenceId = op.sequenceId_;
+ std::unique_ptr<OpSendMsg>
op{std::move(pendingMessagesQueue_.front().release())};
+ uint64_t expectedSequenceId = op->sendArgs->sequenceId;
if (sequenceId > expectedSequenceId) {
LOG_WARN(getName() << "Got ack failure for msg " << sequenceId  //
<< " expecting: " << expectedSequenceId << " queue size="  //
@@ -860,11 +866,11 @@ bool ProducerImpl::removeCorruptMessage(uint64_t
sequenceId) {
lock.unlock();
try {
// to protect from client callback exception
- op.complete(ResultChecksumError, {});
+ op->complete(ResultChecksumError, {});
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " <<
e.what());
}
- releaseSemaphoreForSendOp(op);
+ releaseSemaphoreForSendOp(*op);
return true;
}
}
@@ -880,8 +886,14 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId,
MessageId& rawMessageId) {
return true;
}
- OpSendMsg op = pendingMessagesQueue_.front();
- uint64_t expectedSequenceId = op.sequenceId_;
+ const auto& op = *pendingMessagesQueue_.front();
+ if (op.result != ResultOk) {
+ LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and "
+ << rawMessageId);
+ return false;
+ }
+
+ uint64_t expectedSequenceId = op.sendArgs->sequenceId;
if (sequenceId > expectedSequenceId) {
LOG_WARN(getName() << "Got ack for msg " << sequenceId  //
<< " expecting: " << expectedSequenceId << " queue size="  //
@@ -898,24 +910,25 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId,
MessageId& rawMessageId) {
// Message was persisted correctly
LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
- if (op.chunkedMessageId_) {
+ if (op.chunkedMessageId) {
// Handling the chunk message id.
- if (op.metadata_.chunk_id() == 0) {
- op.chunkedMessageId_->setFirstChunkMessageId(messageId);
- } else if (op.metadata_.chunk_id() ==
op.metadata_.num_chunks_from_msg() - 1) {
- op.chunkedMessageId_->setLastChunkMessageId(messageId);
- messageId = op.chunkedMessageId_->build();
+ if (op.chunkId == 0) {
+ op.chunkedMessageId->setFirstChunkMessageId(messageId);
+ } else if (op.chunkId == op.numChunks - 1) {
+ op.chunkedMessageId->setLastChunkMessageId(messageId);
+ messageId = op.chunkedMessageId->build();
}
}
releaseSemaphoreForSendOp(op);
- lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
+ lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1;
+ std::unique_ptr<OpSendMsg>
opSendMsg{pendingMessagesQueue_.front().release()};
pendingMessagesQueue_.pop_front();
lock.unlock();
try {
- op.complete(ResultOk, messageId);
+ opSendMsg->complete(ResultOk, messageId);
} catch (const std::exception& e) {
LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
}
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index afc6346..8611bfe 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -104,11 +104,9 @@ class ProducerImpl : public HandlerBase,
protected:
ProducerStatsBasePtr producerStatsBasePtr_;
- typedef std::deque<OpSendMsg> MessageQueue;
-
void setMessageMetadata(const Message& msg, const uint64_t& sequenceId,
const uint32_t& uncompressedSize);
- void sendMessage(const OpSendMsg& opSendMsg);
+ void sendMessage(std::unique_ptr<OpSendMsg> opSendMsg);
void startSendTimeoutTimer();
@@ -138,7 +136,7 @@ class ProducerImpl : public HandlerBase,
bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer&
payload,
SharedBuffer& encryptedPayload);
- void sendAsyncWithStatsUpdate(const Message& msg, const SendCallback&
callback);
+ void sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& callback);
/**
* Reserve a spot in the messages queue before acquiring the ProducerImpl mutex. When the queue is full,
@@ -163,7 +161,7 @@ class ProducerImpl : public HandlerBase,
ProducerConfiguration conf_;
std::unique_ptr<Semaphore> semaphore_;
- MessageQueue pendingMessagesQueue_;
+ std::list<std::unique_ptr<OpSendMsg>> pendingMessagesQueue_;
const int32_t partition_; // -1 if topic is non-partitioned
std::string producerName_;
@@ -187,8 +185,8 @@ class ProducerImpl : public HandlerBase,
Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
struct PendingCallbacks;
- std::shared_ptr<PendingCallbacks> getPendingCallbacksWhenFailed();
- std::shared_ptr<PendingCallbacks> getPendingCallbacksWhenFailedWithLock();
+ decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailed();
+ decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailedWithLock();
void failPendingMessages(Result result, bool withLock);