This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push: new ba52218 Fix possible crash caused by MessageId::getTopicName (#225) ba52218 is described below commit ba5221883772201497a45711caaf748189624a6b Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Tue Mar 21 17:59:49 2023 +0800 Fix possible crash caused by MessageId::getTopicName (#225) ### Motivation Currently if a `MessageId` does not have a topic name, the `getTopicName` method will dereference a null pointer, which leads to a crash. This case usually happens when an invalid message is acknowledged, like https://github.com/apache/pulsar-client-cpp/issues/224. ### Modifications Return the const reference to an empty string in `getTopicName` when the inner topic name field is a null pointer. Then, return `ResultOperationNotSupported` when acknowledging such invalid messages. --- include/pulsar/MessageId.h | 2 ++ lib/MessageIdImpl.h | 5 ++++- lib/MultiTopicsConsumerImpl.cc | 10 ++++++++++ tests/AcknowledgeTest.cc | 13 +++++++++++++ tests/BasicEndToEndTest.cc | 2 +- 5 files changed, 30 insertions(+), 2 deletions(-) diff --git a/include/pulsar/MessageId.h b/include/pulsar/MessageId.h index e859e3c..a05a8fb 100644 --- a/include/pulsar/MessageId.h +++ b/include/pulsar/MessageId.h @@ -67,6 +67,8 @@ class PULSAR_PUBLIC MessageId { /** * Get the topic Name from which this message originated from + * + * @return the topic name or an empty string if there is no topic name */ const std::string& getTopicName() const; diff --git a/lib/MessageIdImpl.h b/lib/MessageIdImpl.h index dbab71a..f96b176 100644 --- a/lib/MessageIdImpl.h +++ b/lib/MessageIdImpl.h @@ -68,7 +68,10 @@ class MessageIdImpl { int32_t batchIndex_ = -1; int32_t batchSize_ = 0; - const std::string& getTopicName() { return *topicName_; } + const std::string& getTopicName() { + static const std::string EMPTY_TOPIC = ""; + return topicName_ ? *topicName_ : EMPTY_TOPIC; + } void setTopicName(const std::shared_ptr<std::string>& topicName) { topicName_ = topicName; } virtual const BitSet& getBitSet() const noexcept { diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index e443d9a..878955f 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -661,6 +661,11 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCal } const std::string& topicPartitionName = msgId.getTopicName(); + if (topicPartitionName.empty()) { + LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer"); + callback(ResultOperationNotSupported); + return; + } auto optConsumer = consumers_.find(topicPartitionName); if (optConsumer) { @@ -681,6 +686,11 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis std::unordered_map<std::string, MessageIdList> topicToMessageId; for (const MessageId& messageId : messageIdList) { auto topicName = messageId.getTopicName(); + if (topicName.empty()) { + LOG_ERROR("MessageId without a topic name cannot be acknowledged for a multi-topics consumer"); + callback(ResultOperationNotSupported); + return; + } topicToMessageId[topicName].emplace_back(messageId); } diff --git a/tests/AcknowledgeTest.cc b/tests/AcknowledgeTest.cc index e30c7b1..0818466 100644 --- a/tests/AcknowledgeTest.cc +++ b/tests/AcknowledgeTest.cc @@ -302,4 +302,17 @@ TEST_F(AcknowledgeTest, testMixedCumulativeAck) { ASSERT_EQ(ResultTimeout, consumer.getConsumer().receive(msg, 1000)); } +TEST_F(AcknowledgeTest, testInvalidMessageId) { + Client client(lookupUrl); + std::vector<std::string> topics{"test-invalid-message-id0" + unique_str(), + "test-invalid-message-id1" + unique_str()}; + Consumer consumer; + ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", consumer)); + + Message msg; + ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msg)); + msg = MessageBuilder().setContent("msg").build(); + ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msg)); +} + INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest, testing::Values(100, 0)); diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc index d193017..e668f6f 100644 --- a/tests/BasicEndToEndTest.cc +++ b/tests/BasicEndToEndTest.cc @@ -537,7 +537,7 @@ void testPartitionedProducerConsumer(bool lazyStartPartitionedProducers, std::st ASSERT_EQ(consumer.getSubscriptionName(), "subscription-A"); for (int i = 0; i < 10; i++) { Message m; - consumer.receive(m, 10000); + ASSERT_EQ(ResultOk, consumer.receive(m, 10000)); consumer.acknowledge(m); } client.shutdown();