This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new cac5e1d  Avoid copying OpSendMsg when sending messages (#308)
cac5e1d is described below

commit cac5e1dc8a6afc22503db5d314efac7e6af5dca0
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Aug 31 21:55:13 2023 +0800

    Avoid copying OpSendMsg when sending messages (#308)
    
    Fixes https://github.com/apache/pulsar-client-cpp/issues/306
    
    ### Motivation
    
    `OpSendMsg` is a struct whose size is 400 bytes, so we should avoid
    copying it.
    
    ### Modifications
    
    Pass the `unique_ptr<OpSendMsg>` everywhere instead of `OpSendMsg`.
    - Use `unique_ptr<OpSendMsg>` as the element of the pending message
      queue in `ProducerImpl` and disable the copy constructor and
      assignment for `OpSendMsg`.
    - Add `SendArguments`, which includes the necessary fields to construct a
      `CommandSend` request. Use `shared_ptr` rather than `unique_ptr` to
      store `SendArguments` in `OpSendMsg` because the producer might need to
      resend the message, so the `SendArguments` object can be shared by
      `ProducerImpl` and `ClientConnection`.
    
    This patch is mostly a refactor, because compiler optimizations might
    already eliminate some of the unnecessary copying.
---
 lib/BatchMessageContainer.cc         |  13 +--
 lib/BatchMessageContainer.h          |  13 +--
 lib/BatchMessageContainerBase.cc     |  57 ++---------
 lib/BatchMessageContainerBase.h      |  51 +++-------
 lib/BatchMessageKeyBasedContainer.cc |  29 ++----
 lib/BatchMessageKeyBasedContainer.h  |  12 +--
 lib/ClientConnection.cc              |  46 ++++-----
 lib/ClientConnection.h               |   6 +-
 lib/Commands.cc                      |  16 +--
 lib/Commands.h                       |   6 +-
 lib/MessageAndCallbackBatch.cc       |  13 ++-
 lib/MessageAndCallbackBatch.h        |  10 +-
 lib/OpSendMsg.h                      |  85 ++++++++++------
 lib/ProducerImpl.cc                  | 183 +++++++++++++++++++----------------
 lib/ProducerImpl.h                   |  12 +--
 15 files changed, 250 insertions(+), 302 deletions(-)

diff --git a/lib/BatchMessageContainer.cc b/lib/BatchMessageContainer.cc
index ae0425e..777b2c3 100644
--- a/lib/BatchMessageContainer.cc
+++ b/lib/BatchMessageContainer.cc
@@ -21,6 +21,7 @@
 #include <stdexcept>
 
 #include "LogUtils.h"
+#include "OpSendMsg.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -52,14 +53,10 @@ void BatchMessageContainer::clear() {
     LOG_DEBUG(*this << " clear() called");
 }
 
-Result BatchMessageContainer::createOpSendMsg(OpSendMsg& opSendMsg,
-                                              const FlushCallback& 
flushCallback) const {
-    return createOpSendMsgHelper(opSendMsg, flushCallback, batch_);
-}
-
-std::vector<Result> 
BatchMessageContainer::createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
-                                                            const 
FlushCallback& flushCallback) const {
-    throw std::runtime_error("createOpSendMsgs is not supported for 
BatchMessageContainer");
+std::unique_ptr<OpSendMsg> BatchMessageContainer::createOpSendMsg(const 
FlushCallback& flushCallback) {
+    auto op = createOpSendMsgHelper(flushCallback, batch_);
+    clear();
+    return op;
 }
 
 void BatchMessageContainer::serialize(std::ostream& os) const {
diff --git a/lib/BatchMessageContainer.h b/lib/BatchMessageContainer.h
index cd8a62c..5b1213c 100644
--- a/lib/BatchMessageContainer.h
+++ b/lib/BatchMessageContainer.h
@@ -39,25 +39,22 @@ class BatchMessageContainer : public 
BatchMessageContainerBase {
 
     ~BatchMessageContainer();
 
-    size_t getNumBatches() const override { return 1; }
+    bool hasMultiOpSendMsgs() const override { return false; }
 
     bool isFirstMessageToAdd(const Message& msg) const override { return 
batch_.empty(); }
 
     bool add(const Message& msg, const SendCallback& callback) override;
 
-    void clear() override;
-
-    Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& 
flushCallback) const override;
-
-    std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
-                                         const FlushCallback& flushCallback) 
const override;
-
     void serialize(std::ostream& os) const override;
 
+    std::unique_ptr<OpSendMsg> createOpSendMsg(const FlushCallback& 
flushCallback) override;
+
    private:
     MessageAndCallbackBatch batch_;
     size_t numberOfBatchesSent_ = 0;
     double averageBatchSize_ = 0;
+
+    void clear() override;
 };
 
 }  // namespace pulsar
diff --git a/lib/BatchMessageContainerBase.cc b/lib/BatchMessageContainerBase.cc
index 807a261..96ea94b 100644
--- a/lib/BatchMessageContainerBase.cc
+++ b/lib/BatchMessageContainerBase.cc
@@ -37,23 +37,13 @@ BatchMessageContainerBase::BatchMessageContainerBase(const 
ProducerImpl& produce
       producerId_(producer.producerId_),
       msgCryptoWeakPtr_(producer.msgCrypto_) {}
 
-Result BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
-                                                        const FlushCallback& 
flushCallback,
-                                                        const 
MessageAndCallbackBatch& batch) const {
-    opSendMsg.sendCallback_ = batch.createSendCallback();
-    opSendMsg.messagesCount_ = batch.messagesCount();
-    opSendMsg.messagesSize_ = batch.messagesSize();
-
-    if (flushCallback) {
-        auto sendCallback = opSendMsg.sendCallback_;
-        opSendMsg.sendCallback_ = [sendCallback, flushCallback](Result result, 
const MessageId& id) {
-            sendCallback(result, id);
-            flushCallback(result);
-        };
-    }
+BatchMessageContainerBase::~BatchMessageContainerBase() {}
 
+std::unique_ptr<OpSendMsg> BatchMessageContainerBase::createOpSendMsgHelper(
+    const FlushCallback& flushCallback, const MessageAndCallbackBatch& batch) 
const {
+    auto sendCallback = batch.createSendCallback(flushCallback);
     if (batch.empty()) {
-        return ResultOperationNotSupported;
+        return OpSendMsg::create(ResultOperationNotSupported, 
std::move(sendCallback));
     }
 
     MessageImplPtr impl = batch.msgImpl();
@@ -70,45 +60,18 @@ Result 
BatchMessageContainerBase::createOpSendMsgHelper(OpSendMsg& opSendMsg,
         SharedBuffer encryptedPayload;
         if (!msgCrypto->encrypt(producerConfig_.getEncryptionKeys(), 
producerConfig_.getCryptoKeyReader(),
                                 impl->metadata, impl->payload, 
encryptedPayload)) {
-            return ResultCryptoError;
+            return OpSendMsg::create(ResultCryptoError, 
std::move(sendCallback));
         }
         impl->payload = encryptedPayload;
     }
 
     if (impl->payload.readableBytes() > ClientConnection::getMaxMessageSize()) 
{
-        return ResultMessageTooBig;
+        return OpSendMsg::create(ResultMessageTooBig, std::move(sendCallback));
     }
 
-    opSendMsg.metadata_ = impl->metadata;
-    opSendMsg.payload_ = impl->payload;
-    opSendMsg.sequenceId_ = impl->metadata.sequence_id();
-    opSendMsg.producerId_ = producerId_;
-    opSendMsg.timeout_ = TimeUtils::now() + 
milliseconds(producerConfig_.getSendTimeout());
-
-    return ResultOk;
-}
-
-void BatchMessageContainerBase::processAndClear(
-    std::function<void(Result, const OpSendMsg&)> opSendMsgCallback, 
FlushCallback flushCallback) {
-    if (isEmpty()) {
-        if (flushCallback) {
-            // do nothing, flushCallback complete until the lastOpSend complete
-        }
-    } else {
-        const auto numBatches = getNumBatches();
-        if (numBatches == 1) {
-            OpSendMsg opSendMsg;
-            Result result = createOpSendMsg(opSendMsg, flushCallback);
-            opSendMsgCallback(result, opSendMsg);
-        } else if (numBatches > 1) {
-            std::vector<OpSendMsg> opSendMsgs;
-            std::vector<Result> results = createOpSendMsgs(opSendMsgs, 
flushCallback);
-            for (size_t i = 0; i < results.size(); i++) {
-                opSendMsgCallback(results[i], opSendMsgs[i]);
-            }
-        }  // else numBatches is 0, do nothing
-    }
-    clear();
+    return OpSendMsg::create(impl->metadata, batch.messagesCount(), 
batch.messagesSize(),
+                             producerConfig_.getSendTimeout(), 
batch.createSendCallback(flushCallback),
+                             nullptr, producerId_, impl->payload);
 }
 
 }  // namespace pulsar
diff --git a/lib/BatchMessageContainerBase.h b/lib/BatchMessageContainerBase.h
index fe4e5df..fb46019 100644
--- a/lib/BatchMessageContainerBase.h
+++ b/lib/BatchMessageContainerBase.h
@@ -26,6 +26,7 @@
 
 #include <boost/noncopyable.hpp>
 #include <memory>
+#include <stdexcept>
 #include <vector>
 
 namespace pulsar {
@@ -44,14 +45,9 @@ class BatchMessageContainerBase : public boost::noncopyable {
    public:
     BatchMessageContainerBase(const ProducerImpl& producer);
 
-    virtual ~BatchMessageContainerBase() {}
+    virtual ~BatchMessageContainerBase();
 
-    /**
-     * Get number of batches in the batch message container
-     *
-     * @return number of batches
-     */
-    virtual size_t getNumBatches() const = 0;
+    virtual bool hasMultiOpSendMsgs() const = 0;
 
     /**
      * Check the message will be the 1st message to be added to the batch
@@ -73,32 +69,14 @@ class BatchMessageContainerBase : public boost::noncopyable 
{
      */
     virtual bool add(const Message& msg, const SendCallback& callback) = 0;
 
-    /**
-     * Clear the batch message container
-     */
-    virtual void clear() = 0;
-
-    /**
-     * Create a OpSendMsg object to send
-     *
-     * @param opSendMsg the OpSendMsg object to create
-     * @param flushCallback the callback to trigger after the OpSendMsg was 
completed
-     * @return ResultOk if create successfully
-     * @note OpSendMsg's sendCallback_ must be set even if it failed
-     */
-    virtual Result createOpSendMsg(OpSendMsg& opSendMsg,
-                                   const FlushCallback& flushCallback = 
nullptr) const = 0;
+    virtual std::unique_ptr<OpSendMsg> createOpSendMsg(const FlushCallback& 
flushCallback = nullptr) {
+        throw std::runtime_error("createOpSendMsg is not supported");
+    }
 
-    /**
-     * Create a OpSendMsg list to send
-     *
-     * @param opSendMsgList the OpSendMsg list to create
-     * @param flushCallback the callback to trigger after the OpSendMsg was 
completed
-     * @return all create results of `opSendMsgs`, ResultOk means create 
successfully
-     * @note OpSendMsg's sendCallback_ must be set even if it failed
-     */
-    virtual std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& 
opSendMsgs,
-                                                 const FlushCallback& 
flushCallback = nullptr) const = 0;
+    virtual std::vector<std::unique_ptr<OpSendMsg>> createOpSendMsgs(
+        const FlushCallback& flushCallback = nullptr) {
+        throw std::runtime_error("createOpSendMsgs is not supported");
+    }
 
     /**
      * Serialize into a std::ostream for logging
@@ -110,9 +88,6 @@ class BatchMessageContainerBase : public boost::noncopyable {
     bool hasEnoughSpace(const Message& msg) const noexcept;
     bool isEmpty() const noexcept;
 
-    void processAndClear(std::function<void(Result, const OpSendMsg&)> 
opSendMsgCallback,
-                         FlushCallback flushCallback);
-
    protected:
     // references to ProducerImpl's fields
     const std::shared_ptr<std::string> topicName_;
@@ -134,8 +109,10 @@ class BatchMessageContainerBase : public 
boost::noncopyable {
     void updateStats(const Message& msg);
     void resetStats();
 
-    Result createOpSendMsgHelper(OpSendMsg& opSendMsg, const FlushCallback& 
flushCallback,
-                                 const MessageAndCallbackBatch& batch) const;
+    std::unique_ptr<OpSendMsg> createOpSendMsgHelper(const FlushCallback& 
flushCallback,
+                                                     const 
MessageAndCallbackBatch& batch) const;
+
+    virtual void clear() = 0;
 };
 
 inline bool BatchMessageContainerBase::hasEnoughSpace(const Message& msg) 
const noexcept {
diff --git a/lib/BatchMessageKeyBasedContainer.cc 
b/lib/BatchMessageKeyBasedContainer.cc
index 05baf34..e88674b 100644
--- a/lib/BatchMessageKeyBasedContainer.cc
+++ b/lib/BatchMessageKeyBasedContainer.cc
@@ -72,16 +72,8 @@ void BatchMessageKeyBasedContainer::clear() {
     LOG_DEBUG(*this << " clear() called");
 }
 
-Result BatchMessageKeyBasedContainer::createOpSendMsg(OpSendMsg& opSendMsg,
-                                                      const FlushCallback& 
flushCallback) const {
-    if (batches_.size() < 1) {
-        return ResultOperationNotSupported;
-    }
-    return createOpSendMsgHelper(opSendMsg, flushCallback, 
batches_.begin()->second);
-}
-
-std::vector<Result> BatchMessageKeyBasedContainer::createOpSendMsgs(
-    std::vector<OpSendMsg>& opSendMsgs, const FlushCallback& flushCallback) 
const {
+std::vector<std::unique_ptr<OpSendMsg>> 
BatchMessageKeyBasedContainer::createOpSendMsgs(
+    const FlushCallback& flushCallback) {
     // Sorted the batches by sequence id
     std::vector<const MessageAndCallbackBatch*> sortedBatches;
     for (const auto& kv : batches_) {
@@ -92,18 +84,15 @@ std::vector<Result> 
BatchMessageKeyBasedContainer::createOpSendMsgs(
                   return lhs->sequenceId() < rhs->sequenceId();
               });
 
-    size_t numBatches = sortedBatches.size();
-    opSendMsgs.resize(numBatches);
-
-    std::vector<Result> results(numBatches);
-    for (size_t i = 0; i + 1 < numBatches; i++) {
-        results[i] = createOpSendMsgHelper(opSendMsgs[i], nullptr, 
*sortedBatches[i]);
+    std::vector<std::unique_ptr<OpSendMsg>> opSendMsgs{sortedBatches.size()};
+    for (size_t i = 0; i + 1 < opSendMsgs.size(); i++) {
+        opSendMsgs[i].reset(createOpSendMsgHelper(nullptr, 
*sortedBatches[i]).release());
     }
-    if (numBatches > 0) {
-        // Add flush callback to the last batch
-        results.back() = createOpSendMsgHelper(opSendMsgs.back(), 
flushCallback, *sortedBatches.back());
+    if (!opSendMsgs.empty()) {
+        opSendMsgs.back().reset(createOpSendMsgHelper(flushCallback, 
*sortedBatches.back()).release());
     }
-    return results;
+    clear();
+    return opSendMsgs;
 }
 
 void BatchMessageKeyBasedContainer::serialize(std::ostream& os) const {
diff --git a/lib/BatchMessageKeyBasedContainer.h 
b/lib/BatchMessageKeyBasedContainer.h
index f580a05..e534fba 100644
--- a/lib/BatchMessageKeyBasedContainer.h
+++ b/lib/BatchMessageKeyBasedContainer.h
@@ -32,18 +32,13 @@ class BatchMessageKeyBasedContainer : public 
BatchMessageContainerBase {
 
     ~BatchMessageKeyBasedContainer();
 
-    size_t getNumBatches() const override { return batches_.size(); }
+    bool hasMultiOpSendMsgs() const override { return true; }
 
     bool isFirstMessageToAdd(const Message& msg) const override;
 
     bool add(const Message& msg, const SendCallback& callback) override;
 
-    void clear() override;
-
-    Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& 
flushCallback) const override;
-
-    std::vector<Result> createOpSendMsgs(std::vector<OpSendMsg>& opSendMsgs,
-                                         const FlushCallback& flushCallback) 
const override;
+    std::vector<std::unique_ptr<OpSendMsg>> createOpSendMsgs(const 
FlushCallback& flushCallback) override;
 
     void serialize(std::ostream& os) const override;
 
@@ -53,8 +48,7 @@ class BatchMessageKeyBasedContainer : public 
BatchMessageContainerBase {
     size_t numberOfBatchesSent_ = 0;
     double averageBatchSize_ = 0;
 
-    Result createOpSendMsg(OpSendMsg& opSendMsg, const FlushCallback& 
flushCallback,
-                           MessageAndCallbackBatch& batch) const;
+    void clear() override;
 };
 
 }  // namespace pulsar
diff --git a/lib/ClientConnection.cc b/lib/ClientConnection.cc
index 2a63c7a..4e1a667 100644
--- a/lib/ClientConnection.cc
+++ b/lib/ClientConnection.cc
@@ -1025,37 +1025,30 @@ void ClientConnection::sendCommandInternal(const 
SharedBuffer& cmd) {
                    std::bind(&ClientConnection::handleSend, 
shared_from_this(), std::placeholders::_1, cmd)));
 }
 
-void ClientConnection::sendMessage(const OpSendMsg& opSend) {
+void ClientConnection::sendMessage(const std::shared_ptr<SendArguments>& args) 
{
     Lock lock(mutex_);
-
-    if (pendingWriteOperations_++ == 0) {
-        // Write immediately to socket
-        if (tlsSocket_) {
+    if (pendingWriteOperations_++ > 0) {
+        pendingWriteBuffers_.emplace_back(args);
+        return;
+    }
+    auto self = shared_from_this();
+    auto sendMessageInternal = [this, self, args] {
+        BaseCommand outgoingCmd;
+        auto buffer = Commands::newSend(outgoingBuffer_, outgoingCmd, 
getChecksumType(), *args);
+        asyncWrite(buffer, 
customAllocReadHandler(std::bind(&ClientConnection::handleSendPair,
+                                                            
shared_from_this(), std::placeholders::_1)));
+    };
+    if (tlsSocket_) {
 #if BOOST_VERSION >= 106600
-            boost::asio::post(strand_,
-                              
std::bind(&ClientConnection::sendMessageInternal, shared_from_this(), opSend));
+        boost::asio::post(strand_, sendMessageInternal);
 #else
-            strand_.post(std::bind(&ClientConnection::sendMessageInternal, 
shared_from_this(), opSend));
+        strand_.post(sendMessageInternal);
 #endif
-        } else {
-            sendMessageInternal(opSend);
-        }
     } else {
-        // Queue to send later
-        pendingWriteBuffers_.push_back(opSend);
+        sendMessageInternal();
     }
 }
 
-void ClientConnection::sendMessageInternal(const OpSendMsg& opSend) {
-    BaseCommand outgoingCmd;
-    PairSharedBuffer buffer =
-        Commands::newSend(outgoingBuffer_, outgoingCmd, opSend.producerId_, 
opSend.sequenceId_,
-                          getChecksumType(), opSend.metadata_, 
opSend.payload_);
-
-    asyncWrite(buffer, 
customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
-                                                         shared_from_this(), 
std::placeholders::_1)));
-}
-
 void ClientConnection::handleSend(const boost::system::error_code& err, const 
SharedBuffer&) {
     if (err) {
         LOG_WARN(cnxString_ << "Could not send message on connection: " << err 
<< " " << err.message());
@@ -1088,13 +1081,12 @@ void ClientConnection::sendPendingCommands() {
                        
customAllocWriteHandler(std::bind(&ClientConnection::handleSend, 
shared_from_this(),
                                                          
std::placeholders::_1, buffer)));
         } else {
-            assert(any.type() == typeid(OpSendMsg));
+            assert(any.type() == typeid(std::shared_ptr<SendArguments>));
 
-            const OpSendMsg& op = boost::any_cast<const OpSendMsg&>(any);
+            auto args = boost::any_cast<std::shared_ptr<SendArguments>>(any);
             BaseCommand outgoingCmd;
             PairSharedBuffer buffer =
-                Commands::newSend(outgoingBuffer_, outgoingCmd, 
op.producerId_, op.sequenceId_,
-                                  getChecksumType(), op.metadata_, 
op.payload_);
+                Commands::newSend(outgoingBuffer_, outgoingCmd, 
getChecksumType(), *args);
 
             asyncWrite(buffer, 
customAllocWriteHandler(std::bind(&ClientConnection::handleSendPair,
                                                                  
shared_from_this(), std::placeholders::_1)));
diff --git a/lib/ClientConnection.h b/lib/ClientConnection.h
index 38b814c..9abff9d 100644
--- a/lib/ClientConnection.h
+++ b/lib/ClientConnection.h
@@ -69,8 +69,7 @@ typedef std::weak_ptr<ConsumerImpl> ConsumerImplWeakPtr;
 class LookupDataResult;
 class BrokerConsumerStatsImpl;
 class PeriodicTask;
-
-struct OpSendMsg;
+struct SendArguments;
 
 namespace proto {
 class BaseCommand;
@@ -153,8 +152,7 @@ class PULSAR_PUBLIC ClientConnection : public 
std::enable_shared_from_this<Clien
 
     void sendCommand(const SharedBuffer& cmd);
     void sendCommandInternal(const SharedBuffer& cmd);
-    void sendMessage(const OpSendMsg& opSend);
-    void sendMessageInternal(const OpSendMsg& opSend);
+    void sendMessage(const std::shared_ptr<SendArguments>& args);
 
     void registerProducer(int producerId, ProducerImplPtr producer);
     void registerConsumer(int consumerId, ConsumerImplPtr consumer);
diff --git a/lib/Commands.cc b/lib/Commands.cc
index d9251a0..5b1734c 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -32,6 +32,7 @@
 #include "ChunkMessageIdImpl.h"
 #include "LogUtils.h"
 #include "MessageImpl.h"
+#include "OpSendMsg.h"
 #include "PulsarApi.pb.h"
 #include "Url.h"
 #include "checksum/ChecksumProvider.h"
@@ -193,13 +194,13 @@ SharedBuffer Commands::newConsumerStats(uint64_t 
consumerId, uint64_t requestId)
     return buffer;
 }
 
-PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, 
uint64_t producerId,
-                                   uint64_t sequenceId, ChecksumType 
checksumType,
-                                   const proto::MessageMetadata& metadata, 
const SharedBuffer& payload) {
+PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, 
ChecksumType checksumType,
+                                   const SendArguments& args) {
     cmd.set_type(BaseCommand::SEND);
     CommandSend* send = cmd.mutable_send();
-    send->set_producer_id(producerId);
-    send->set_sequence_id(sequenceId);
+    send->set_producer_id(args.producerId);
+    send->set_sequence_id(args.sequenceId);
+    const auto& metadata = args.metadata;
     if (metadata.has_num_messages_in_batch()) {
         send->set_num_messages(metadata.num_messages_in_batch());
     }
@@ -210,8 +211,9 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, 
BaseCommand& cmd, uint
     // / Wire format
     // [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] 
[METADATA_SIZE][METADATA] [PAYLOAD]
 
-    int cmdSize = cmd.ByteSize();
-    int msgMetadataSize = metadata.ByteSize();
+    int cmdSize = cmd.ByteSizeLong();
+    int msgMetadataSize = metadata.ByteSizeLong();
+    const auto& payload = args.payload;
     int payloadSize = payload.readableBytes();
 
     int magicAndChecksumLength = (Crc32c == (checksumType)) ? (2 + 4 /* magic 
+ checksumLength*/) : 0;
diff --git a/lib/Commands.h b/lib/Commands.h
index c21adb4..22a4b7b 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -40,6 +40,7 @@ using BatchMessageAckerPtr = 
std::shared_ptr<BatchMessageAcker>;
 class MessageIdImpl;
 using MessageIdImplPtr = std::shared_ptr<MessageIdImpl>;
 class BitSet;
+struct SendArguments;
 
 namespace proto {
 class BaseCommand;
@@ -98,9 +99,8 @@ class Commands {
     static SharedBuffer newGetSchema(const std::string& topic, const 
std::string& version,
                                      uint64_t requestId);
 
-    static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& 
cmd, uint64_t producerId,
-                                    uint64_t sequenceId, ChecksumType 
checksumType,
-                                    const proto::MessageMetadata& metadata, 
const SharedBuffer& payload);
+    static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& 
cmd, ChecksumType checksumType,
+                                    const SendArguments& args);
 
     static SharedBuffer newSubscribe(
         const std::string& topic, const std::string& subscription, uint64_t 
consumerId, uint64_t requestId,
diff --git a/lib/MessageAndCallbackBatch.cc b/lib/MessageAndCallbackBatch.cc
index 5672538..e17330c 100644
--- a/lib/MessageAndCallbackBatch.cc
+++ b/lib/MessageAndCallbackBatch.cc
@@ -64,10 +64,17 @@ void MessageAndCallbackBatch::complete(Result result, const 
MessageId& id) const
     completeSendCallbacks(callbacks_, result, id);
 }
 
-SendCallback MessageAndCallbackBatch::createSendCallback() const {
+SendCallback MessageAndCallbackBatch::createSendCallback(const FlushCallback& 
flushCallback) const {
     const auto& callbacks = callbacks_;
-    return [callbacks]  // save a copy of `callbacks_`
-        (Result result, const MessageId& id) { 
completeSendCallbacks(callbacks, result, id); };
+    if (flushCallback) {
+        return [callbacks, flushCallback](Result result, const MessageId& id) {
+            completeSendCallbacks(callbacks, result, id);
+            flushCallback(result);
+        };
+    } else {
+        return [callbacks]  // save a copy of `callbacks_`
+            (Result result, const MessageId& id) { 
completeSendCallbacks(callbacks, result, id); };
+    }
 }
 
 }  // namespace pulsar
diff --git a/lib/MessageAndCallbackBatch.h b/lib/MessageAndCallbackBatch.h
index 3d107c6..e8717b3 100644
--- a/lib/MessageAndCallbackBatch.h
+++ b/lib/MessageAndCallbackBatch.h
@@ -30,6 +30,7 @@ namespace pulsar {
 
 class MessageImpl;
 using MessageImplPtr = std::shared_ptr<MessageImpl>;
+using FlushCallback = std::function<void(Result)>;
 
 class MessageAndCallbackBatch : public boost::noncopyable {
    public:
@@ -58,14 +59,7 @@ class MessageAndCallbackBatch : public boost::noncopyable {
      */
     void complete(Result result, const MessageId& id) const;
 
-    /**
-     * Create a single callback to trigger all the internal callbacks in order
-     * It's used when you want to clear and add new messages and callbacks but 
current callbacks need to be
-     * triggered later.
-     *
-     * @return the merged send callback
-     */
-    SendCallback createSendCallback() const;
+    SendCallback createSendCallback(const FlushCallback& flushCallback) const;
 
     const MessageImplPtr& msgImpl() const { return msgImpl_; }
     uint64_t sequenceId() const noexcept { return sequenceId_; }
diff --git a/lib/OpSendMsg.h b/lib/OpSendMsg.h
index d365b90..0b06285 100644
--- a/lib/OpSendMsg.h
+++ b/lib/OpSendMsg.h
@@ -21,6 +21,7 @@
 
 #include <pulsar/Message.h>
 #include <pulsar/Producer.h>
+#include <pulsar/Result.h>
 
 #include <boost/date_time/posix_time/ptime.hpp>
 
@@ -31,46 +32,72 @@
 
 namespace pulsar {
 
-struct OpSendMsg {
-    proto::MessageMetadata metadata_;
-    SharedBuffer payload_;
-    SendCallback sendCallback_;
-    uint64_t producerId_;
-    uint64_t sequenceId_;
-    boost::posix_time::ptime timeout_;
-    uint32_t messagesCount_;
-    uint64_t messagesSize_;
-    std::vector<std::function<void(Result)>> trackerCallbacks_;
-    ChunkMessageIdImplPtr chunkedMessageId_;
+struct SendArguments {
+    const uint64_t producerId;
+    const uint64_t sequenceId;
+    const proto::MessageMetadata metadata;
+    const SharedBuffer payload;
 
-    OpSendMsg() = default;
+    SendArguments(uint64_t producerId, uint64_t sequenceId, const 
proto::MessageMetadata& metadata,
+                  const SharedBuffer& payload)
+        : producerId(producerId), sequenceId(sequenceId), metadata(metadata), 
payload(payload) {}
+    SendArguments(const SendArguments&) = delete;
+    SendArguments& operator=(const SendArguments&) = delete;
+};
 
-    OpSendMsg(const proto::MessageMetadata& metadata, const SharedBuffer& 
payload,
-              const SendCallback& sendCallback, uint64_t producerId, uint64_t 
sequenceId, int sendTimeoutMs,
-              uint32_t messagesCount, uint64_t messagesSize, 
ChunkMessageIdImplPtr chunkedMessageId = nullptr)
-        : metadata_(metadata),  // the copy happens here because OpSendMsg of 
chunks are constructed with
-                                // a shared metadata object
-          payload_(payload),
-          sendCallback_(sendCallback),
-          producerId_(producerId),
-          sequenceId_(sequenceId),
-          timeout_(TimeUtils::now() + milliseconds(sendTimeoutMs)),
-          messagesCount_(messagesCount),
-          messagesSize_(messagesSize),
-          chunkedMessageId_(chunkedMessageId) {}
+struct OpSendMsg {
+    const Result result;
+    const int32_t chunkId;
+    const int32_t numChunks;
+    const uint32_t messagesCount;
+    const uint64_t messagesSize;
+    const boost::posix_time::ptime timeout;
+    const SendCallback sendCallback;
+    std::vector<std::function<void(Result)>> trackerCallbacks;
+    ChunkMessageIdImplPtr chunkedMessageId;
+    // Use shared_ptr here because producer might resend the message with the 
same arguments
+    const std::shared_ptr<SendArguments> sendArgs;
+
+    template <typename... Args>
+    static std::unique_ptr<OpSendMsg> create(Args&&... args) {
+        return std::unique_ptr<OpSendMsg>(new 
OpSendMsg(std::forward<Args>(args)...));
+    }
 
     void complete(Result result, const MessageId& messageId) const {
-        if (sendCallback_) {
-            sendCallback_(result, messageId);
+        if (sendCallback) {
+            sendCallback(result, messageId);
         }
-        for (const auto& trackerCallback : trackerCallbacks_) {
+        for (const auto& trackerCallback : trackerCallbacks) {
             trackerCallback(result);
         }
     }
 
     void addTrackerCallback(std::function<void(Result)> trackerCallback) {
-        trackerCallbacks_.emplace_back(trackerCallback);
+        trackerCallbacks.emplace_back(trackerCallback);
     }
+
+   private:
+    OpSendMsg(Result result, SendCallback&& callback)
+        : result(result),
+          chunkId(-1),
+          numChunks(-1),
+          messagesCount(0),
+          messagesSize(0),
+          sendCallback(std::move(callback)),
+          sendArgs(nullptr) {}
+
+    OpSendMsg(const proto::MessageMetadata& metadata, uint32_t messagesCount, 
uint64_t messagesSize,
+              int sendTimeoutMs, SendCallback&& callback, 
ChunkMessageIdImplPtr chunkedMessageId,
+              uint64_t producerId, SharedBuffer payload)
+        : result(ResultOk),
+          chunkId(metadata.chunk_id()),
+          numChunks(metadata.num_chunks_from_msg()),
+          messagesCount(messagesCount),
+          messagesSize(messagesSize),
+          timeout(TimeUtils::now() + 
boost::posix_time::milliseconds(sendTimeoutMs)),
+          sendCallback(std::move(callback)),
+          chunkedMessageId(chunkedMessageId),
+          sendArgs(new SendArguments(producerId, metadata.sequence_id(), 
metadata, payload)) {}
 };
 
 }  // namespace pulsar
diff --git a/lib/ProducerImpl.cc b/lib/ProducerImpl.cc
index 71559ff..d8f02be 100644
--- a/lib/ProducerImpl.cc
+++ b/lib/ProducerImpl.cc
@@ -45,16 +45,6 @@
 namespace pulsar {
 DECLARE_LOG_OBJECT()
 
-struct ProducerImpl::PendingCallbacks {
-    std::vector<OpSendMsg> opSendMsgs;
-
-    void complete(Result result) {
-        for (const auto& opSendMsg : opSendMsgs) {
-            opSendMsg.complete(result, {});
-        }
-    }
-};
-
 ProducerImpl::ProducerImpl(ClientImplPtr client, const TopicName& topicName,
                            const ProducerConfiguration& conf, const 
ProducerInterceptorsPtr& interceptors,
                            int32_t partition)
@@ -64,7 +54,6 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const 
TopicName& topicName,
                           milliseconds(std::max(100, conf.getSendTimeout() - 
100)))),
       conf_(conf),
       semaphore_(),
-      pendingMessagesQueue_(),
       partition_(partition),
       producerName_(conf_.getProducerName()),
       userProvidedProducerName_(false),
@@ -297,43 +286,46 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
     }
 }
 
-std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallbacksWhenFailed() {
-    auto callbacks = std::make_shared<PendingCallbacks>();
-    callbacks->opSendMsgs.reserve(pendingMessagesQueue_.size());
+auto ProducerImpl::getPendingCallbacksWhenFailed() -> decltype(pendingMessagesQueue_) {
+    decltype(pendingMessagesQueue_) pendingMessages;
     LOG_DEBUG(getName() << "# messages in pending queue : " << pendingMessagesQueue_.size());
 
-    // Iterate over a copy of the pending messages queue, to trigger the future completion
-    // without holding producer mutex.
-    for (auto& op : pendingMessagesQueue_) {
-        callbacks->opSendMsgs.push_back(op);
-        releaseSemaphoreForSendOp(op);
+    pendingMessages.swap(pendingMessagesQueue_);
+    for (const auto& op : pendingMessages) {
+        releaseSemaphoreForSendOp(*op);
     }
 
-    if (batchMessageContainer_) {
-        batchMessageContainer_->processAndClear(
-            [this, &callbacks](Result result, const OpSendMsg& opSendMsg) {
-                if (result == ResultOk) {
-                    callbacks->opSendMsgs.emplace_back(opSendMsg);
-                }
-                releaseSemaphoreForSendOp(opSendMsg);
-            },
-            nullptr);
+    if (!batchMessageContainer_ || batchMessageContainer_->isEmpty()) {
+        return pendingMessages;
     }
-    pendingMessagesQueue_.clear();
 
-    return callbacks;
+    auto handleOp = [this, &pendingMessages](std::unique_ptr<OpSendMsg>&& op) {
+        releaseSemaphoreForSendOp(*op);
+        if (op->result == ResultOk) {
+            pendingMessages.emplace_back(std::move(op));
+        }
+    };
+
+    if (batchMessageContainer_->hasMultiOpSendMsgs()) {
+        auto opSendMsgs = batchMessageContainer_->createOpSendMsgs();
+        for (auto&& op : opSendMsgs) {
+            handleOp(std::move(op));
+        }
+    } else {
+        handleOp(batchMessageContainer_->createOpSendMsg());
+    }
+    return pendingMessages;
 }
 
-std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallbacksWhenFailedWithLock() {
+auto ProducerImpl::getPendingCallbacksWhenFailedWithLock() -> decltype(pendingMessagesQueue_) {
     Lock lock(mutex_);
     return getPendingCallbacksWhenFailed();
 }
 
 void ProducerImpl::failPendingMessages(Result result, bool withLock) {
-    if (withLock) {
-        getPendingCallbacksWhenFailedWithLock()->complete(result);
-    } else {
-        getPendingCallbacksWhenFailed()->complete(result);
+    auto opSendMsgs = withLock ? getPendingCallbacksWhenFailedWithLock() : getPendingCallbacksWhenFailed();
+    for (const auto& op : opSendMsgs) {
+        op->complete(result, {});
     }
 }
 
@@ -345,8 +337,8 @@ void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
     LOG_DEBUG(getName() << "Re-Sending " << pendingMessagesQueue_.size() << " messages to server");
 
     for (const auto& op : pendingMessagesQueue_) {
-        LOG_DEBUG(getName() << "Re-Sending " << op.sequenceId_);
-        cnx->sendMessage(op);
+        LOG_DEBUG(getName() << "Re-Sending " << op->sendArgs->sequenceId);
+        cnx->sendMessage(op->sendArgs);
     }
 }
 
@@ -378,7 +370,7 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
             auto& opSendMsg = pendingMessagesQueue_.back();
             lock.unlock();
             failures.complete();
-            opSendMsg.addTrackerCallback(callback);
+            opSendMsg->addTrackerCallback(callback);
         } else {
             lock.unlock();
             failures.complete();
@@ -389,7 +381,7 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
         if (!pendingMessagesQueue_.empty()) {
             auto& opSendMsg = pendingMessagesQueue_.back();
             lock.unlock();
-            opSendMsg.addTrackerCallback(callback);
+            opSendMsg->addTrackerCallback(callback);
         } else {
             lock.unlock();
             callback(ResultOk);
@@ -462,7 +454,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     });
 }
 
-void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback) {
+void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& callback) {
     if (!isValidProducerState(callback)) {
         return;
     }
@@ -601,17 +593,18 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
                 handleFailedResult(ResultCryptoError);
                 return;
             }
-            OpSendMsg op{msgMetadata, encryptedPayload, (chunkId == totalChunks - 1) ? callback : nullptr,
-                         producerId_, sequenceId,       conf_.getSendTimeout(),
-                         1,           uncompressedSize, chunkMessageId};
+
+            auto op = OpSendMsg::create(msgMetadata, 1, uncompressedSize, conf_.getSendTimeout(),
+                                        (chunkId == totalChunks - 1) ? callback : nullptr, chunkMessageId,
+                                        producerId_, encryptedPayload);
 
             if (!chunkingEnabled_) {
-                const uint32_t msgMetadataSize = op.metadata_.ByteSize();
-                const uint32_t payloadSize = op.payload_.readableBytes();
+                const uint32_t msgMetadataSize = op->sendArgs->metadata.ByteSizeLong();
+                const uint32_t payloadSize = op->sendArgs->payload.readableBytes();
                 const uint32_t msgHeadersAndPayloadSize = msgMetadataSize + payloadSize;
                 if (msgHeadersAndPayloadSize > maxMessageSize) {
                     lock.unlock();
-                    releaseSemaphoreForSendOp(op);
+                    releaseSemaphoreForSendOp(*op);
                     LOG_WARN(getName()
                              << " - compressed Message size " << msgHeadersAndPayloadSize << " cannot exceed "
                              << maxMessageSize << " bytes unless chunking is enabled");
@@ -620,7 +613,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, const SendCallba
                 }
             }
 
-            sendMessage(op);
+            sendMessage(std::move(op));
         }
     }
 }
@@ -667,10 +660,10 @@ void ProducerImpl::releaseSemaphore(uint32_t payloadSize) {
 
 void ProducerImpl::releaseSemaphoreForSendOp(const OpSendMsg& op) {
     if (semaphore_) {
-        semaphore_->release(op.messagesCount_);
+        semaphore_->release(op.messagesCount);
     }
 
-    memoryLimitController_.releaseMemory(op.messagesSize_);
+    memoryLimitController_.releaseMemory(op.messagesSize);
 }
 
 // It must be called while `mutex_` is acquired
@@ -678,37 +671,50 @@ PendingFailures ProducerImpl::batchMessageAndSend(const FlushCallback& flushCall
     PendingFailures failures;
     LOG_DEBUG("batchMessageAndSend " << *batchMessageContainer_);
     batchTimer_->cancel();
+    if (batchMessageContainer_->isEmpty()) {
+        return failures;
+    }
 
-    batchMessageContainer_->processAndClear(
-        [this, &failures](Result result, const OpSendMsg& opSendMsg) {
-            if (result == ResultOk) {
-                sendMessage(opSendMsg);
-            } else {
-                // A spot has been reserved for this batch, but the batch failed to be pushed to the queue, so
-                // we need to release the spot manually
-                LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: " << result);
-                releaseSemaphoreForSendOp(opSendMsg);
-                failures.add([opSendMsg, result] { opSendMsg.complete(result, {}); });
-            }
-        },
-        flushCallback);
+    auto handleOp = [this, &failures](std::unique_ptr<OpSendMsg>&& op) {
+        if (op->result == ResultOk) {
+            sendMessage(std::move(op));
+        } else {
+            LOG_ERROR("batchMessageAndSend | Failed to createOpSendMsg: " << op->result);
+            releaseSemaphoreForSendOp(*op);
+            auto rawOpPtr = op.release();
+            failures.add([rawOpPtr] {
+                std::unique_ptr<OpSendMsg> op{rawOpPtr};
+                op->complete(op->result, {});
+            });
+        }
+    };
+
+    if (batchMessageContainer_->hasMultiOpSendMsgs()) {
+        auto opSendMsgs = batchMessageContainer_->createOpSendMsgs(flushCallback);
+        for (auto&& op : opSendMsgs) {
+            handleOp(std::move(op));
+        }
+    } else {
+        handleOp(batchMessageContainer_->createOpSendMsg(flushCallback));
+    }
     return failures;
 }
 
 // Precondition -
 // a. we have a reserved spot on the queue
 // b. call this function after acquiring the ProducerImpl mutex_
-void ProducerImpl::sendMessage(const OpSendMsg& op) {
-    const auto sequenceId = op.metadata_.sequence_id();
+void ProducerImpl::sendMessage(std::unique_ptr<OpSendMsg> opSendMsg) {
+    const auto sequenceId = opSendMsg->sendArgs->sequenceId;
     LOG_DEBUG("Inserting data to pendingMessagesQueue_");
-    pendingMessagesQueue_.push_back(op);
+    auto args = opSendMsg->sendArgs;
+    pendingMessagesQueue_.emplace_back(std::move(opSendMsg));
 
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         // If we do have a connection, the message is sent immediately, otherwise
         // we'll try again once a new connection is established
         LOG_DEBUG(getName() << "Sending msg immediately - seq: " << sequenceId);
-        cnx->sendMessage(op);
+        cnx->sendMessage(args);
     } else {
         LOG_DEBUG(getName() << "Connection is not ready - seq: " << sequenceId);
     }
@@ -808,7 +814,7 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
         return;
     }
 
-    std::shared_ptr<PendingCallbacks> pendingCallbacks;
+    decltype(pendingMessagesQueue_) pendingMessages;
     if (pendingMessagesQueue_.empty()) {
         // If there are no pending messages, reset the timeout to the configured value.
         LOG_DEBUG(getName() << "Producer timeout triggered on empty pending message queue");
@@ -816,11 +822,11 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
     } else {
         // If there is at least one message, calculate the diff between the message timeout and
         // the current time.
-        time_duration diff = pendingMessagesQueue_.front().timeout_ - TimeUtils::now();
+        time_duration diff = pendingMessagesQueue_.front()->timeout - TimeUtils::now();
         if (diff.total_milliseconds() <= 0) {
             // The diff is less than or equal to zero, meaning that the message has been expired.
             LOG_DEBUG(getName() << "Timer expired. Calling timeout callbacks.");
-            pendingCallbacks = getPendingCallbacksWhenFailed();
+            pendingMessages = getPendingCallbacksWhenFailed();
             // Since the pending queue is cleared now, set timer to expire after configured value.
             asyncWaitSendTimeout(milliseconds(conf_.getSendTimeout()));
         } else {
@@ -831,8 +837,8 @@ void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
     }
 
     lock.unlock();
-    if (pendingCallbacks) {
-        pendingCallbacks->complete(ResultTimeout);
+    for (const auto& op : pendingMessages) {
+        op->complete(ResultTimeout, {});
     }
 }
 
@@ -844,8 +850,8 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
         return true;
     }
 
-    OpSendMsg op = pendingMessagesQueue_.front();
-    uint64_t expectedSequenceId = op.sequenceId_;
+    std::unique_ptr<OpSendMsg> op{std::move(pendingMessagesQueue_.front().release())};
+    uint64_t expectedSequenceId = op->sendArgs->sequenceId;
     if (sequenceId > expectedSequenceId) {
         LOG_WARN(getName() << "Got ack failure for msg " << sequenceId                //
                            << " expecting: " << expectedSequenceId << " queue size="  //
@@ -860,11 +866,11 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) {
         lock.unlock();
         try {
             // to protect from client callback exception
-            op.complete(ResultChecksumError, {});
+            op->complete(ResultChecksumError, {});
         } catch (const std::exception& e) {
             LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
         }
-        releaseSemaphoreForSendOp(op);
+        releaseSemaphoreForSendOp(*op);
         return true;
     }
 }
@@ -880,8 +886,14 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
         return true;
     }
 
-    OpSendMsg op = pendingMessagesQueue_.front();
-    uint64_t expectedSequenceId = op.sequenceId_;
+    const auto& op = *pendingMessagesQueue_.front();
+    if (op.result != ResultOk) {
+        LOG_ERROR("Unexpected OpSendMsg whose result is " << op.result << " for " << sequenceId << " and "
+                                                          << rawMessageId);
+        return false;
+    }
+
+    uint64_t expectedSequenceId = op.sendArgs->sequenceId;
     if (sequenceId > expectedSequenceId) {
         LOG_WARN(getName() << "Got ack for msg " << sequenceId                        //
                            << " expecting: " << expectedSequenceId << " queue size="  //
@@ -898,24 +910,25 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) {
     // Message was persisted correctly
     LOG_DEBUG(getName() << "Received ack for msg " << sequenceId);
 
-    if (op.chunkedMessageId_) {
+    if (op.chunkedMessageId) {
         // Handling the chunk message id.
-        if (op.metadata_.chunk_id() == 0) {
-            op.chunkedMessageId_->setFirstChunkMessageId(messageId);
-        } else if (op.metadata_.chunk_id() == op.metadata_.num_chunks_from_msg() - 1) {
-            op.chunkedMessageId_->setLastChunkMessageId(messageId);
-            messageId = op.chunkedMessageId_->build();
+        if (op.chunkId == 0) {
+            op.chunkedMessageId->setFirstChunkMessageId(messageId);
+        } else if (op.chunkId == op.numChunks - 1) {
+            op.chunkedMessageId->setLastChunkMessageId(messageId);
+            messageId = op.chunkedMessageId->build();
         }
     }
 
     releaseSemaphoreForSendOp(op);
-    lastSequenceIdPublished_ = sequenceId + op.messagesCount_ - 1;
+    lastSequenceIdPublished_ = sequenceId + op.messagesCount - 1;
 
+    std::unique_ptr<OpSendMsg> opSendMsg{pendingMessagesQueue_.front().release()};
     pendingMessagesQueue_.pop_front();
 
     lock.unlock();
     try {
-        op.complete(ResultOk, messageId);
+        opSendMsg->complete(ResultOk, messageId);
     } catch (const std::exception& e) {
         LOG_ERROR(getName() << "Exception thrown from callback " << e.what());
     }
diff --git a/lib/ProducerImpl.h b/lib/ProducerImpl.h
index afc6346..8611bfe 100644
--- a/lib/ProducerImpl.h
+++ b/lib/ProducerImpl.h
@@ -104,11 +104,9 @@ class ProducerImpl : public HandlerBase,
    protected:
     ProducerStatsBasePtr producerStatsBasePtr_;
 
-    typedef std::deque<OpSendMsg> MessageQueue;
-
     void setMessageMetadata(const Message& msg, const uint64_t& sequenceId, const uint32_t& uncompressedSize);
 
-    void sendMessage(const OpSendMsg& opSendMsg);
+    void sendMessage(std::unique_ptr<OpSendMsg> opSendMsg);
 
     void startSendTimeoutTimer();
 
@@ -138,7 +136,7 @@ class ProducerImpl : public HandlerBase,
     bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
                         SharedBuffer& encryptedPayload);
 
-    void sendAsyncWithStatsUpdate(const Message& msg, const SendCallback& callback);
+    void sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& callback);
 
     /**
      * Reserve a spot in the messages queue before acquiring the ProducerImpl mutex. When the queue is full,
@@ -163,7 +161,7 @@ class ProducerImpl : public HandlerBase,
     ProducerConfiguration conf_;
 
     std::unique_ptr<Semaphore> semaphore_;
-    MessageQueue pendingMessagesQueue_;
+    std::list<std::unique_ptr<OpSendMsg>> pendingMessagesQueue_;
 
     const int32_t partition_;  // -1 if topic is non-partitioned
     std::string producerName_;
@@ -187,8 +185,8 @@ class ProducerImpl : public HandlerBase,
     Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
 
     struct PendingCallbacks;
-    std::shared_ptr<PendingCallbacks> getPendingCallbacksWhenFailed();
-    std::shared_ptr<PendingCallbacks> getPendingCallbacksWhenFailedWithLock();
+    decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailed();
+    decltype(pendingMessagesQueue_) getPendingCallbacksWhenFailedWithLock();
 
     void failPendingMessages(Result result, bool withLock);
 


Reply via email to