This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3421e75e0035063a8eeb2c927f3f23da64c05dff
Author: Yunze Xu <[email protected]>
AuthorDate: Tue May 12 12:12:43 2020 +0800

    Fix message id error if messages were sent to a partitioned topic (#6938)

    ### Motivation

    If messages were sent to a partitioned topic, the message id's `partition` field was always -1 because SendReceipt command only contains ledger id and entry id.

    ### Modifications

    - Add a `partition` field to `ProducerImpl` and set the `MessageId`'s `partition` field with it in `ackReceived` method later.
    - Add a test to check message id in send callback if messages were sent to a partitioned topic.

    (cherry picked from commit 15cb920b394874d37039df5e7665092651c28fae)
---
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 2 +-
 pulsar-client-cpp/lib/ProducerImpl.cc | 8 +++-
 pulsar-client-cpp/lib/ProducerImpl.h | 3 +-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc | 54 ++++++++++++++++++++++++
 4 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index 0461ee3..628afbc 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -90,7 +90,7 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const { ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const { using namespace std::placeholders; std::string topicPartitionName = topicName_->getTopicPartitionName(partition); - auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_); + auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition); producer->getProducerCreatedFuture().addListener( std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated, const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition)); diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc 
index 0095dc8..a2547cd 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -39,13 +39,15 @@ OpSendMsg::OpSendMsg(uint64_t producerId, uint64_t sequenceId, const Message& ms sequenceId_(sequenceId), timeout_(TimeUtils::now() + milliseconds(conf.getSendTimeout())) {} -ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf) +ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const ProducerConfiguration& conf, + int32_t partition) : HandlerBase( client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(std::max(100, conf.getSendTimeout() - 100)))), conf_(conf), executor_(client->getIOExecutorProvider()->get()), pendingMessagesQueue_(conf_.getMaxPendingMessages()), + partition_(partition), producerName_(conf_.getProducerName()), producerStr_("[" + topic_ + ", " + producerName_ + "] "), producerId_(client->newProducerId()), @@ -627,7 +629,9 @@ bool ProducerImpl::removeCorruptMessage(uint64_t sequenceId) { } } -bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& messageId) { +bool ProducerImpl::ackReceived(uint64_t sequenceId, MessageId& rawMessageId) { + MessageId messageId(partition_, rawMessageId.ledgerId(), rawMessageId.entryId(), + rawMessageId.batchIndex()); OpSendMsg op; Lock lock(mutex_); bool havePendingAck = pendingMessagesQueue_.peek(op); diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index 80927b1..e4f35d4 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -63,7 +63,7 @@ class ProducerImpl : public HandlerBase, public ProducerImplBase { public: ProducerImpl(ClientImplPtr client, const std::string& topic, - const ProducerConfiguration& producerConfiguration); + const ProducerConfiguration& producerConfiguration, int32_t partition = -1); ~ProducerImpl(); int keepMaxMessageSize_; @@ -150,6 +150,7 @@ class ProducerImpl : public 
HandlerBase, MessageQueue pendingMessagesQueue_; + int32_t partition_; // -1 if topic is non-partitioned std::string producerName_; std::string producerStr_; uint64_t producerId_; diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 99b7f87..5277ca4 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -3207,11 +3207,65 @@ TEST(BasicEndToEndTest, testSendCallback) { Message msg; ASSERT_EQ(ResultOk, consumer.receive(msg)); receivedIdSet.emplace(msg.getMessageId()); + consumer.acknowledge(msg); + } + + latch.wait(); + ASSERT_EQ(sentIdSet, receivedIdSet); + + consumer.close(); + producer.close(); + + const std::string partitionedTopicName = topicName + "-" + std::to_string(time(nullptr)); + const std::string url = adminUrl + "admin/v2/persistent/" + + partitionedTopicName.substr(partitionedTopicName.find("://") + 3) + "/partitions"; + const int numPartitions = 3; + + int res = makePutRequest(url, std::to_string(numPartitions)); + ASSERT_TRUE(res == 204 || res == 409) << "res: " << res; + + std::this_thread::sleep_for(std::chrono::seconds(2)); + + ProducerConfiguration producerConfig; + producerConfig.setBatchingEnabled(false); + producerConfig.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + ASSERT_EQ(ResultOk, client.createProducer(partitionedTopicName, producerConfig, producer)); + ASSERT_EQ(ResultOk, client.subscribe(partitionedTopicName, "SubscriptionName", consumer)); + + sentIdSet.clear(); + receivedIdSet.clear(); + + const int numMessages = numPartitions * 2; + latch = Latch(numMessages); + for (int i = 0; i < numMessages; i++) { + const auto msg = MessageBuilder().setContent("a").build(); + producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId &id) { + ASSERT_EQ(ResultOk, result); + sentIdSet.emplace(id); + latch.countdown(); + }); + } + + for (int i = 0; i < numMessages; i++) { + Message msg; + 
ASSERT_EQ(ResultOk, consumer.receive(msg)); + receivedIdSet.emplace(msg.getMessageId()); + consumer.acknowledge(msg); } latch.wait(); ASSERT_EQ(sentIdSet, receivedIdSet); + std::set<int> partitionIndexSet; + for (const auto &id : sentIdSet) { + partitionIndexSet.emplace(id.partition()); + } + std::set<int> expectedPartitionIndexSet; + for (int i = 0; i < numPartitions; i++) { + expectedPartitionIndexSet.emplace(i); + } + ASSERT_EQ(sentIdSet, receivedIdSet); + consumer.close(); producer.close(); client.close();
