This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 74c694f  [C++] Fix segfault when get topic name from received message id (#10006)
74c694f is described below

commit 74c694f20ba27c17c5fd225971be279b0eb748b4
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Mar 23 03:56:30 2021 +0800

    [C++] Fix segfault when get topic name from received message id (#10006)
    
    * Fix segfault when get topic name from received message id
    
    * Add tests for non-batched message
---
 pulsar-client-cpp/lib/ConsumerImpl.cc   |  1 +
 pulsar-client-cpp/tests/ConsumerTest.cc | 53 +++++++++++++++++++++++++++++++++
 2 files changed, 54 insertions(+)

diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 5bd54ed..47250c8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -427,6 +427,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
         // This is a cheap copy since message contains only one shared pointer (impl_)
         Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
         msg.impl_->setRedeliveryCount(redeliveryCount);
+        msg.impl_->setTopicName(batchedMessage.getTopicName());
 
         if (startMessageId_.is_present()) {
             const MessageId& msgId = msg.getMessageId();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index e0d4091..05147f8 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -413,4 +413,57 @@ TEST(ConsumerTest, testBatchUnAckedMessageTracker) {
     client.close();
 }
 
+TEST(ConsumerTest, testGetTopicNameFromReceivedMessage) {
+    // topic1 and topic2 are non-partitioned topics, topic3 is a partitioned topic
+    const std::string topic1 = "testGetTopicNameFromReceivedMessage1-" + std::to_string(time(nullptr));
+    const std::string topic2 = "testGetTopicNameFromReceivedMessage2-" + std::to_string(time(nullptr));
+    const std::string topic3 = "testGetTopicNameFromReceivedMessage3-" + std::to_string(time(nullptr));
+    int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic3 + "/partitions", "3");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Client client(lookupUrl);
+
+    auto sendMessage = [&client](const std::string& topic, bool enabledBatching) {
+        const auto producerConf = ProducerConfiguration().setBatchingEnabled(enabledBatching);
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build()));
+        LOG_INFO("Send 'hello' to " << topic);
+    };
+    auto validateTopicName = [](Consumer& consumer, const std::string& topic) {
+        Message msg;
+        ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+
+        const auto fullTopic = "persistent://public/default/" + topic;
+        ASSERT_EQ(msg.getTopicName(), fullTopic);
+        ASSERT_EQ(msg.getMessageId().getTopicName(), fullTopic);
+    };
+
+    // 1. ConsumerImpl
+    Consumer consumer1;
+    ASSERT_EQ(ResultOk, client.subscribe(topic1, "sub-1", consumer1));
+
+    // 2. MultiTopicsConsumerImpl
+    Consumer consumer2;
+    ASSERT_EQ(ResultOk, client.subscribe({topic1, topic2}, "sub-2", consumer2));
+
+    sendMessage(topic1, true);
+    validateTopicName(consumer1, topic1);
+    validateTopicName(consumer2, topic1);
+    sendMessage(topic1, false);
+    validateTopicName(consumer1, topic1);
+    validateTopicName(consumer2, topic1);
+
+    // 3. PartitionedConsumerImpl
+    Consumer consumer3;
+    ASSERT_EQ(ResultOk, client.subscribe(topic3, "sub-3", consumer3));
+    const auto partition = topic3 + "-partition-0";
+    sendMessage(partition, true);
+    validateTopicName(consumer3, partition);
+    sendMessage(partition, false);
+    validateTopicName(consumer3, partition);
+
+    client.close();
+}
+
 }  // namespace pulsar

Reply via email to