This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.7 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 53404620d039dd4ebbdc8bea9fe4aa6c70ce4ad9 Author: Yunze Xu <[email protected]> AuthorDate: Tue Mar 23 03:56:30 2021 +0800 [C++] Fix segfault when get topic name from received message id (#10006) * Fix segfault when get topic name from received message id * Add tests for non-batched message (cherry picked from commit 74c694f20ba27c17c5fd225971be279b0eb748b4) --- pulsar-client-cpp/lib/ConsumerImpl.cc | 1 + pulsar-client-cpp/tests/ConsumerTest.cc | 52 +++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index f6163d7..d75d018 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -427,6 +427,7 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection // This is a cheap copy since message contains only one shared pointer (impl_) Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i); msg.impl_->setRedeliveryCount(redeliveryCount); + msg.impl_->setTopicName(batchedMessage.getTopicName()); if (startMessageId_.is_present()) { const MessageId& msgId = msg.getMessageId(); diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc index 2278c05..c080cad 100644 --- a/pulsar-client-cpp/tests/ConsumerTest.cc +++ b/pulsar-client-cpp/tests/ConsumerTest.cc @@ -345,4 +345,56 @@ TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery) { client.close(); } +TEST(ConsumerTest, testGetTopicNameFromReceivedMessage) { + // topic1 and topic2 are non-partitioned topics, topic3 is a partitioned topic + const std::string topic1 = "testGetTopicNameFromReceivedMessage1-" + std::to_string(time(nullptr)); + const std::string topic2 = "testGetTopicNameFromReceivedMessage2-" + std::to_string(time(nullptr)); + const std::string topic3 = "testGetTopicNameFromReceivedMessage3-" + std::to_string(time(nullptr)); + int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic3 + "/partitions", "3"); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + Client client(lookupUrl); + + auto sendMessage = [&client](const std::string& topic, bool enabledBatching) { + const auto producerConf = ProducerConfiguration().setBatchingEnabled(enabledBatching); + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer)); + ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build())); + LOG_INFO("Send 'hello' to " << topic); + }; + auto validateTopicName = [](Consumer& consumer, const std::string& topic) { + Message msg; + ASSERT_EQ(ResultOk, consumer.receive(msg, 3000)); + + const auto fullTopic = "persistent://public/default/" + topic; + ASSERT_EQ(msg.getTopicName(), fullTopic); + ASSERT_EQ(msg.getMessageId().getTopicName(), fullTopic); + }; + + // 1. ConsumerImpl + Consumer consumer1; + ASSERT_EQ(ResultOk, client.subscribe(topic1, "sub-1", consumer1)); + + // 2. MultiTopicsConsumerImpl + Consumer consumer2; + ASSERT_EQ(ResultOk, client.subscribe({topic1, topic2}, "sub-2", consumer2)); + + sendMessage(topic1, true); + validateTopicName(consumer1, topic1); + validateTopicName(consumer2, topic1); + sendMessage(topic1, false); + validateTopicName(consumer1, topic1); + validateTopicName(consumer2, topic1); + + // 3. PartitionedConsumerImpl + Consumer consumer3; + ASSERT_EQ(ResultOk, client.subscribe(topic3, "sub-3", consumer3)); + const auto partition = topic3 + "-partition-0"; + sendMessage(partition, true); + validateTopicName(consumer3, partition); + sendMessage(partition, false); + validateTopicName(consumer3, partition); + + client.close(); +} } // namespace pulsar
