This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fc69a62  [C++] Fix message id is always the default value in send callback (#6812)
fc69a62 is described below

commit fc69a628abb92e3b5ecd8e98b8b00cc3738f4603
Author: BewareMyPower <[email protected]>
AuthorDate: Sat Apr 25 14:57:26 2020 +0800

    [C++] Fix message id is always the default value in send callback (#6812)
    
    * Fix bug: sendCallback's 2nd argument was always the default MessageId
    
    * Set the batch index in the message id passed to each message's callback in a batch
    
    * Add test for message id in send callback
    
    * Ensure all send callbacks have completed before ASSERT_EQ
---
 pulsar-client-cpp/lib/BatchMessageContainer.cc | 12 ++++---
 pulsar-client-cpp/lib/BatchMessageContainer.h  | 10 +++---
 pulsar-client-cpp/lib/ProducerImpl.cc          |  2 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc   | 37 +++++++++++++++++++
 pulsar-client-cpp/tests/BatchMessageTest.cc    | 50 +++++++++++++++++++++++++-
 5 files changed, 99 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc 
b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index 9f904a2..7413d57 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -67,7 +67,7 @@ void BatchMessageContainer::add(const Message& msg, 
SendCallback sendCallback, b
                                                        
maxAllowedMessageBatchSizeInBytes_);
     LOG_DEBUG(*this << " After serialization payload size in bytes = " << 
impl_->payload.readableBytes());
 
-    messagesContainerListPtr_->push_back(MessageContainer(msg, sendCallback, 
msg.getMessageId()));
+    messagesContainerListPtr_->emplace_back(msg, sendCallback);
 
     LOG_DEBUG(*this << " Number of messages in Batch = " << 
messagesContainerListPtr_->size());
     LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_);
@@ -105,7 +105,7 @@ void BatchMessageContainer::sendMessage(FlushCallback 
flushCallback) {
     if (impl_->payload.readableBytes() > producer_.keepMaxMessageSize_) {
         // At this point the compressed batch is above the overall 
MaxMessageSize. There
         // can only 1 single message in the batch at this point.
-        batchMessageCallBack(ResultMessageTooBig, messagesContainerListPtr_, 
nullptr);
+        batchMessageCallBack(ResultMessageTooBig, MessageId{}, 
messagesContainerListPtr_, nullptr);
         clear();
         return;
     }
@@ -115,7 +115,7 @@ void BatchMessageContainer::sendMessage(FlushCallback 
flushCallback) {
 
     // bind keeps a copy of the parameters
     SendCallback callback = 
std::bind(&BatchMessageContainer::batchMessageCallBack, std::placeholders::_1,
-                                      messagesContainerListPtr_, 
flushCallback);
+                                      std::placeholders::_2, 
messagesContainerListPtr_, flushCallback);
 
     producer_.sendMessage(msg, callback);
     clear();
@@ -144,7 +144,8 @@ void BatchMessageContainer::clear() {
     batchSizeInBytes_ = 0;
 }
 
-void BatchMessageContainer::batchMessageCallBack(Result r, 
MessageContainerListPtr messagesContainerListPtr,
+void BatchMessageContainer::batchMessageCallBack(Result r, const MessageId& 
messageId,
+                                                 MessageContainerListPtr 
messagesContainerListPtr,
                                                  FlushCallback flushCallback) {
     if (!messagesContainerListPtr) {
         if (flushCallback) {
@@ -156,7 +157,8 @@ void BatchMessageContainer::batchMessageCallBack(Result r, 
MessageContainerListP
               << r << "] [numOfMessages = " << 
messagesContainerListPtr->size() << "]");
     size_t batch_size = messagesContainerListPtr->size();
     for (size_t i = 0; i < batch_size; i++) {
-        messagesContainerListPtr->operator[](i).callBack(r);
+        MessageId messageIdInBatch(messageId.partition(), 
messageId.ledgerId(), messageId.entryId(), i);
+        messagesContainerListPtr->operator[](i).callBack(r, messageIdInBatch);
     }
     if (flushCallback) {
         flushCallback(ResultOk);
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.h 
b/pulsar-client-cpp/lib/BatchMessageContainer.h
index 424c7f4..93b4d81 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.h
@@ -45,12 +45,11 @@ namespace pulsar {
 class BatchMessageContainer {
    public:
     struct MessageContainer {
-        MessageContainer(Message message, SendCallback sendCallback, MessageId 
messageId)
-            : message_(message), sendCallback_(sendCallback), 
messageId_(messageId) {}
+        MessageContainer(Message message, SendCallback sendCallback)
+            : message_(message), sendCallback_(sendCallback) {}
         Message message_;
         SendCallback sendCallback_;
-        MessageId messageId_;
-        void callBack(const pulsar::Result& r) { sendCallback_(r, messageId_); 
}
+        void callBack(Result r, const MessageId& messageId) { sendCallback_(r, 
messageId); }
     };
     typedef std::vector<MessageContainer> MessageContainerList;
     typedef std::shared_ptr<MessageContainerList> MessageContainerListPtr;
@@ -65,7 +64,8 @@ class BatchMessageContainer {
 
     void clear();
 
-    static void batchMessageCallBack(Result r, MessageContainerListPtr 
messages, FlushCallback callback);
+    static void batchMessageCallBack(Result r, const MessageId& messageId, 
MessageContainerListPtr messages,
+                                     FlushCallback callback);
 
     friend inline std::ostream& operator<<(std::ostream& os,
                                            const BatchMessageContainer& 
batchMessageContainer);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc 
b/pulsar-client-cpp/lib/ProducerImpl.cc
index 4a79848..8dad6b7 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -259,7 +259,7 @@ void ProducerImpl::failPendingMessages(Result result) {
     }
 
     // this function can handle null pointer
-    BatchMessageContainer::batchMessageCallBack(result, 
messageContainerListPtr, NULL);
+    BatchMessageContainer::batchMessageCallBack(result, MessageId{}, 
messageContainerListPtr, NULL);
 }
 
 void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 0010c96..ccff387 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -3186,3 +3186,40 @@ TEST(BasicEndToEndTest, 
testCumulativeAcknowledgeNotAllowed) {
     }
     client.shutdown();
 }
+
+TEST(BasicEndToEndTest, testSendCallback) {
+    const std::string topicName = 
"persistent://public/default/BasicEndToEndTest-testSendCallback";
+
+    Client client(lookupUrl);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", 
consumer));
+
+    Latch latch(100);
+    std::set<MessageId> sentIdSet;
+    for (int i = 0; i < 100; i++) {
+        const auto msg = MessageBuilder().setContent("a").build();
+        producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const 
MessageId &id) {
+            ASSERT_EQ(ResultOk, result);
+            sentIdSet.emplace(id);
+            latch.countdown();
+        });
+    }
+
+    std::set<MessageId> receivedIdSet;
+    for (int i = 0; i < 100; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        receivedIdSet.emplace(msg.getMessageId());
+    }
+
+    latch.wait();
+    ASSERT_EQ(sentIdSet, receivedIdSet);
+
+    consumer.close();
+    producer.close();
+    client.close();
+}
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc 
b/pulsar-client-cpp/tests/BatchMessageTest.cc
index 3fe46ed..f9638f8 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -25,6 +25,7 @@
 
 #include <lib/Commands.h>
 #include <lib/Future.h>
+#include <lib/Latch.h>
 #include <lib/LogUtils.h>
 #include <lib/TopicName.h>
 #include <lib/Utils.h>
@@ -982,4 +983,51 @@ TEST(BatchMessageTest, testPraseMessageBatchEntry) {
         ASSERT_EQ(message.getDataAsString(), expected.content);
         ASSERT_EQ(message.getProperty(expected.propKey), expected.propValue);
     }
-}
\ No newline at end of file
+}
+
+TEST(BatchMessageTest, testSendCallback) {
+    const std::string topicName = 
"persistent://public/default/BasicMessageTest-testSendCallback";
+
+    Client client(lookupUrl);
+
+    constexpr int numMessagesOfBatch = 3;
+
+    ProducerConfiguration producerConfig;
+    producerConfig.setBatchingEnabled(5);
+    producerConfig.setBatchingMaxMessages(numMessagesOfBatch);
+    producerConfig.setBatchingMaxPublishDelayMs(1000);  // 1 s, it's long 
enough for 3 messages batched
+    producerConfig.setMaxPendingMessages(numMessagesOfBatch);
+
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, 
producer));
+
+    Consumer consumer;
+    ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", 
consumer));
+
+    Latch latch(numMessagesOfBatch);
+    std::set<MessageId> sentIdSet;
+    for (int i = 0; i < numMessagesOfBatch; i++) {
+        const auto msg = MessageBuilder().setContent("a").build();
+        producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const 
MessageId& id) {
+            ASSERT_EQ(ResultOk, result);
+            ASSERT_EQ(i, id.batchIndex());
+            sentIdSet.emplace(id);
+            LOG_INFO("id of batch " << i << ": " << id);
+            latch.countdown();
+        });
+    }
+
+    std::set<MessageId> receivedIdSet;
+    for (int i = 0; i < numMessagesOfBatch; i++) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        receivedIdSet.emplace(msg.getMessageId());
+    }
+
+    latch.wait();
+    ASSERT_EQ(sentIdSet, receivedIdSet);
+
+    consumer.close();
+    producer.close();
+    client.close();
+}

Reply via email to