This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 74c694f [C++] Fix segfault when get topic name from received message
id (#10006)
74c694f is described below
commit 74c694f20ba27c17c5fd225971be279b0eb748b4
Author: Yunze Xu <[email protected]>
AuthorDate: Tue Mar 23 03:56:30 2021 +0800
[C++] Fix segfault when get topic name from received message id (#10006)
* Fix segfault when get topic name from received message id
* Add tests for non-batched message
---
pulsar-client-cpp/lib/ConsumerImpl.cc | 1 +
pulsar-client-cpp/tests/ConsumerTest.cc | 53 +++++++++++++++++++++++++++++++++
2 files changed, 54 insertions(+)
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 5bd54ed..47250c8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -427,6 +427,7 @@ uint32_t
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
// This is a cheap copy since message contains only one shared pointer
(impl_)
Message msg =
Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
msg.impl_->setRedeliveryCount(redeliveryCount);
+ msg.impl_->setTopicName(batchedMessage.getTopicName());
if (startMessageId_.is_present()) {
const MessageId& msgId = msg.getMessageId();
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc
b/pulsar-client-cpp/tests/ConsumerTest.cc
index e0d4091..05147f8 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -413,4 +413,57 @@ TEST(ConsumerTest, testBatchUnAckedMessageTracker) {
client.close();
}
+TEST(ConsumerTest, testGetTopicNameFromReceivedMessage) {
+ // topic1 and topic2 are non-partitioned topics, topic3 is a partitioned
topic
+ const std::string topic1 = "testGetTopicNameFromReceivedMessage1-" +
std::to_string(time(nullptr));
+ const std::string topic2 = "testGetTopicNameFromReceivedMessage2-" +
std::to_string(time(nullptr));
+ const std::string topic3 = "testGetTopicNameFromReceivedMessage3-" +
std::to_string(time(nullptr));
+ int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/"
+ topic3 + "/partitions", "3");
+ ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+ Client client(lookupUrl);
+
+ auto sendMessage = [&client](const std::string& topic, bool
enabledBatching) {
+ const auto producerConf =
ProducerConfiguration().setBatchingEnabled(enabledBatching);
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf,
producer));
+ ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent("hello").build()));
+ LOG_INFO("Send 'hello' to " << topic);
+ };
+ auto validateTopicName = [](Consumer& consumer, const std::string& topic) {
+ Message msg;
+ ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
+
+ const auto fullTopic = "persistent://public/default/" + topic;
+ ASSERT_EQ(msg.getTopicName(), fullTopic);
+ ASSERT_EQ(msg.getMessageId().getTopicName(), fullTopic);
+ };
+
+ // 1. ConsumerImpl
+ Consumer consumer1;
+ ASSERT_EQ(ResultOk, client.subscribe(topic1, "sub-1", consumer1));
+
+ // 2. MultiTopicsConsumerImpl
+ Consumer consumer2;
+ ASSERT_EQ(ResultOk, client.subscribe({topic1, topic2}, "sub-2",
consumer2));
+
+ sendMessage(topic1, true);
+ validateTopicName(consumer1, topic1);
+ validateTopicName(consumer2, topic1);
+ sendMessage(topic1, false);
+ validateTopicName(consumer1, topic1);
+ validateTopicName(consumer2, topic1);
+
+ // 3. PartitionedConsumerImpl
+ Consumer consumer3;
+ ASSERT_EQ(ResultOk, client.subscribe(topic3, "sub-3", consumer3));
+ const auto partition = topic3 + "-partition-0";
+ sendMessage(partition, true);
+ validateTopicName(consumer3, partition);
+ sendMessage(partition, false);
+ validateTopicName(consumer3, partition);
+
+ client.close();
+}
+
} // namespace pulsar