This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5efafc5 Use private impl for MessageId in c++ client (#1322) 5efafc5 is described below commit 5efafc57fd81eaf622fe65548facfec00af5b49b Author: Matteo Merli <mme...@apache.org> AuthorDate: Sun Mar 25 16:20:58 2018 -0700 Use private impl for MessageId in c++ client (#1322) --- pulsar-client-cpp/include/pulsar/BatchMessageId.h | 67 ------------------ pulsar-client-cpp/include/pulsar/Message.h | 10 +-- pulsar-client-cpp/include/pulsar/MessageId.h | 33 +++++---- .../lib/BatchAcknowledgementTracker.cc | 40 ++++++----- .../lib/BatchAcknowledgementTracker.h | 27 ++++---- pulsar-client-cpp/lib/BatchMessageId.cc | 76 -------------------- pulsar-client-cpp/lib/ClientImpl.cc | 4 +- pulsar-client-cpp/lib/ClientImpl.h | 2 +- pulsar-client-cpp/lib/Commands.cc | 16 +++-- pulsar-client-cpp/lib/Commands.h | 5 +- pulsar-client-cpp/lib/ConsumerImpl.cc | 65 ++++++++---------- pulsar-client-cpp/lib/ConsumerImpl.h | 14 ++-- pulsar-client-cpp/lib/Message.cc | 11 +-- pulsar-client-cpp/lib/MessageId.cc | 80 ++++++++++++++-------- .../MessageIdTest.cc => lib/MessageIdImpl.h} | 28 ++++---- pulsar-client-cpp/lib/MessageImpl.h | 3 +- pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 4 +- pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 1 - pulsar-client-cpp/lib/ReaderImpl.cc | 10 ++- pulsar-client-cpp/lib/ReaderImpl.h | 2 +- .../lib/UnAckedMessageTrackerEnabled.cc | 4 +- pulsar-client-cpp/python/src/client.cc | 2 +- pulsar-client-cpp/python/src/message.cc | 14 ++-- pulsar-client-cpp/tests/BatchMessageTest.cc | 2 +- pulsar-client-cpp/tests/MessageIdTest.cc | 9 +-- pulsar-client-cpp/tests/PulsarFriend.h | 8 ++- pulsar-client-cpp/tests/ReaderTest.cc | 2 +- 27 files changed, 213 insertions(+), 326 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/BatchMessageId.h b/pulsar-client-cpp/include/pulsar/BatchMessageId.h deleted file mode 100644 index b647886..0000000 --- a/pulsar-client-cpp/include/pulsar/BatchMessageId.h +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#ifndef LIB_BATCHMESSAGEID_H_ -#define LIB_BATCHMESSAGEID_H_ - -#include <pulsar/MessageId.h> -#include <iosfwd> - -#pragma GCC visibility push(default) - -namespace pulsar { - -class PulsarWrapper; - -class BatchMessageId : public MessageId { - public: - BatchMessageId(int64_t ledgerId, int64_t entryId, int batchIndex = -1) - : MessageId(ledgerId, entryId), batchIndex_(batchIndex) {} - - BatchMessageId(const MessageId& msgId); - - BatchMessageId() : batchIndex_(-1) {} - - virtual void serialize(std::string& result) const; - - // These functions compare the message order as stored in bookkeeper - bool operator<(const BatchMessageId& other) const; - bool operator<=(const BatchMessageId& other) const; - bool operator==(const BatchMessageId& other) const; - - protected: - virtual int64_t getBatchIndex() const; - - friend class Commands; - friend class ConsumerImpl; - friend class ReaderImpl; - friend class Message; - friend class MessageImpl; - friend class PartitionedProducerImpl; - friend class PartitionedConsumerImpl; - friend class BatchAcknowledgementTracker; - friend class PulsarWrapper; - friend class PulsarFriend; - int64_t batchIndex_; - - friend std::ostream& operator<<(std::ostream& s, const BatchMessageId& messageId); -}; -} // namespace pulsar -#pragma GCC visibility pop - -#endif /* LIB_BATCHMESSAGEID_H_ */ diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h index b98b48b..aff0d94 100644 --- a/pulsar-client-cpp/include/pulsar/Message.h +++ b/pulsar-client-cpp/include/pulsar/Message.h @@ -23,7 +23,8 @@ #include <string> #include <boost/shared_ptr.hpp> -#include "BatchMessageId.h" + +#include "MessageId.h" #pragma GCC visibility push(default) @@ -39,8 +40,6 @@ class MessageBuilder; class MessageImpl; class PulsarWrapper; -// TODO: When releasing 2.0.0, make all methods virtual and create the virtual destructor for Google Mock -// tests class Message { public: typedef std::map<std::string, std::string> StringMap; @@ -128,9 +127,10 @@ class Message { MessageImplPtr impl_; Message(MessageImplPtr& impl); - Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload); + Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, SharedBuffer& payload, + int32_t partition); /// Used for Batch Messages - Message(const BatchMessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload, + Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload, proto::SingleMessageMetadata& singleMetadata); friend class PartitionedProducerImpl; friend class PartitionedConsumerImpl; diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h index 40fe5d0..149d177 100644 --- a/pulsar-client-cpp/include/pulsar/MessageId.h +++ b/pulsar-client-cpp/include/pulsar/MessageId.h @@ -27,15 +27,12 @@ namespace pulsar { -class ConsumerImpl; -class UnAckedMessageTrackerEnabled; -class PulsarWrapper; +class MessageIdImpl; class MessageId { public: MessageId& operator=(const MessageId&); MessageId(); - virtual ~MessageId() {} /** * MessageId representing the "earliest" or "oldest available" message stored in the topic @@ -50,34 +47,44 @@ class MessageId { /** * Serialize the message id into a binary string for storing */ - virtual void serialize(std::string& result) const; + void serialize(std::string& result) const; /** * Deserialize a message id from a binary string */ - static boost::shared_ptr<MessageId> deserialize(const std::string& serializedMessageId); + static MessageId deserialize(const std::string& serializedMessageId); // These functions compare the message order as stored in bookkeeper bool operator<(const MessageId& other) const; + bool operator<=(const MessageId& other) const; + bool operator>(const MessageId& other) const; + bool operator>=(const MessageId& other) const; bool operator==(const MessageId& other) const; + bool operator!=(const MessageId& other) const; - protected: - virtual int64_t getBatchIndex() const; + private: friend class ConsumerImpl; + friend class ReaderImpl; friend class Message; friend class MessageImpl; friend class Commands; - friend class BatchMessageId; friend class PartitionedProducerImpl; friend class PartitionedConsumerImpl; friend class UnAckedMessageTrackerEnabled; friend class BatchAcknowledgementTracker; friend class PulsarWrapper; - MessageId(int64_t, int64_t); + friend class PulsarFriend; + + explicit MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex); friend std::ostream& operator<<(std::ostream& s, const MessageId& messageId); - int64_t ledgerId_; - int64_t entryId_ : 48; - short partition_ : 16; + + int64_t ledgerId() const; + int64_t entryId() const; + int32_t batchIndex() const; + int32_t partition() const; + + typedef boost::shared_ptr<MessageIdImpl> MessageIdImplPtr; + MessageIdImplPtr impl_; }; } // namespace pulsar diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc index 868c050..0dcf3be 100644 --- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc +++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc @@ -24,7 +24,7 @@ DECLARE_LOG_OBJECT() BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic, const std::string subscription, const long consumerId) - : greatestCumulativeAckSent_(BatchMessageId()) { + : greatestCumulativeAckSent_() { std::stringstream consumerStrStream; consumerStrStream << "BatchAcknowledgementTracker for [" << topic << ", " << subscription << ", " << consumerId << "] "; @@ -43,7 +43,7 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) { return; } Lock lock(mutex_); - BatchMessageId msgID = message.impl_->messageId; + MessageId msgID = message.impl_->messageId; // ignore message if it is less than the last cumulative ack sent or messageID is already being tracked TrackerMap::iterator pos = trackerMap_.find(msgID); @@ -60,10 +60,10 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) { TrackerPair(msgID, boost::dynamic_bitset<>(message.impl_->metadata.num_messages_in_batch()).set())); } -void BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& messageId, +void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& messageId, proto::CommandAck_AckType ackType) { // Not a batch message and a individual ack - if (messageId.batchIndex_ == -1 && ackType == proto::CommandAck_AckType_Individual) { + if (messageId.batchIndex() == -1 && ackType == proto::CommandAck_AckType_Individual) { return; } @@ -104,18 +104,22 @@ void BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& messa } } -bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID, +bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID, const proto::CommandAck_AckType ackType) { Lock lock(mutex_); - TrackerMap::iterator pos = trackerMap_.find(msgID); - if (pos == trackerMap_.end() || std::find(sendList_.begin(), sendList_.end(), msgID) != sendList_.end()) { + // Remove batch index + MessageId batchMessageId = MessageId(msgID.partition(), msgID.ledgerId(), msgID.entryId(), + -1 /* Batch index */); + + TrackerMap::iterator pos = trackerMap_.find(batchMessageId); + if (pos == trackerMap_.end() || std::find(sendList_.begin(), sendList_.end(), batchMessageId) != sendList_.end()) { LOG_DEBUG( "Batch is ready since message present in sendList_ or not present in trackerMap_ [message ID = " - << msgID << "]"); + << batchMessageId << "]"); return true; } - int batchIndex = msgID.batchIndex_; + int batchIndex = msgID.batchIndex(); assert(batchIndex < pos->second.size()); pos->second.set(batchIndex, false); @@ -128,7 +132,7 @@ bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID, if (pos->second.any()) { return false; } - sendList_.push_back(msgID); + sendList_.push_back(batchMessageId); trackerMap_.erase(pos); LOG_DEBUG("Batch is ready since message all bits are reset in trackerMap_ [message ID = " << msgID << "]"); @@ -138,22 +142,24 @@ bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID, // returns // - a batch message id < messageId // - same messageId if it is the last message in the batch -const BatchMessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady( - const BatchMessageId& messageId) { +const MessageId BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const MessageId& messageId) { Lock lock(mutex_); - BatchMessageId messageReadyForCumulativeAck = BatchMessageId(); - TrackerMap::iterator pos = trackerMap_.find(messageId); + + // Remove batch index + MessageId batchMessageId = MessageId(messageId.partition(), messageId.ledgerId(), + messageId.entryId(), -1 /* Batch index */); + TrackerMap::iterator pos = trackerMap_.find(batchMessageId); // element not found if (pos == trackerMap_.end()) { - return BatchMessageId(); + return MessageId(); } - if (pos->second.size() - 1 != messageId.batchIndex_) { + if (pos->second.size() - 1 != messageId.batchIndex()) { // Can't cumulatively ack this batch message if (pos == trackerMap_.begin()) { // This was the first message hence we can't decrement the iterator - return BatchMessageId(); + return MessageId(); } pos--; } diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h index b6f8ba8..6b20c00 100644 --- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h +++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h @@ -19,7 +19,6 @@ #ifndef LIB_BATCHACKNOWLEDGEMENTTRACKER_H_ #define LIB_BATCHACKNOWLEDGEMENTTRACKER_H_ -#include "pulsar/BatchMessageId.h" #include "MessageImpl.h" #include <map> #include <boost/thread/mutex.hpp> @@ -36,8 +35,8 @@ class ConsumerImpl; class BatchAcknowledgementTracker { private: typedef boost::unique_lock<boost::mutex> Lock; - typedef std::pair<BatchMessageId, boost::dynamic_bitset<> > TrackerPair; - typedef std::map<BatchMessageId, boost::dynamic_bitset<> > TrackerMap; + typedef std::pair<MessageId, boost::dynamic_bitset<> > TrackerPair; + typedef std::map<MessageId, boost::dynamic_bitset<> > TrackerMap; boost::mutex mutex_; TrackerMap trackerMap_; @@ -48,20 +47,20 @@ class BatchAcknowledgementTracker { // batch index // is acked again, we just check the sendList to verify that the batch is acked w/o iterating over the // dynamic_bitset. - std::vector<BatchMessageId> sendList_; + std::vector<MessageId> sendList_; // we don't need to track MessageId < greatestCumulativeAckReceived - BatchMessageId greatestCumulativeAckSent_; + MessageId greatestCumulativeAckSent_; std::string name_; public: BatchAcknowledgementTracker(const std::string topic, const std::string subscription, const long consumerId); - bool isBatchReady(const BatchMessageId& msgID, const proto::CommandAck_AckType ackType); - const BatchMessageId getGreatestCumulativeAckReady(const BatchMessageId& messageId); + bool isBatchReady(const MessageId& msgID, const proto::CommandAck_AckType ackType); + const MessageId getGreatestCumulativeAckReady(const MessageId& messageId); - void deleteAckedMessage(const BatchMessageId& messageId, proto::CommandAck_AckType ackType); + void deleteAckedMessage(const MessageId& messageId, proto::CommandAck_AckType ackType); void receivedMessage(const Message& message); void clear(); @@ -72,23 +71,23 @@ class BatchAcknowledgementTracker { // Used for Cumulative acks only struct SendRemoveCriteria { private: - const BatchMessageId& messageId_; + const MessageId& messageId_; public: - SendRemoveCriteria(const BatchMessageId& messageId) : messageId_(messageId) {} + SendRemoveCriteria(const MessageId& messageId) : messageId_(messageId) {} - bool operator()(const BatchMessageId& element) const { return (element <= messageId_); } + bool operator()(const MessageId& element) const { return (element <= messageId_); } }; // Used for Cumulative acks only struct TrackerMapRemoveCriteria { private: - const BatchMessageId& messageId_; + const MessageId& messageId_; public: - TrackerMapRemoveCriteria(const BatchMessageId& messageId) : messageId_(messageId) {} + TrackerMapRemoveCriteria(const MessageId& messageId) : messageId_(messageId) {} - bool operator()(std::pair<const pulsar::BatchMessageId, boost::dynamic_bitset<> >& element) const { + bool operator()(std::pair<const MessageId, boost::dynamic_bitset<> >& element) const { return (element.first <= messageId_); } }; diff --git a/pulsar-client-cpp/lib/BatchMessageId.cc b/pulsar-client-cpp/lib/BatchMessageId.cc deleted file mode 100644 index 27d7bbd..0000000 --- a/pulsar-client-cpp/lib/BatchMessageId.cc +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <pulsar/BatchMessageId.h> - -#include "PulsarApi.pb.h" - -#include <tuple> -#include <iostream> - -namespace pulsar { - -BatchMessageId::BatchMessageId(const MessageId& msgId) - : MessageId(msgId.ledgerId_, msgId.entryId_), batchIndex_(msgId.getBatchIndex()) {} - -void BatchMessageId::serialize(std::string& result) const { - proto::MessageIdData idData; - idData.set_ledgerid(ledgerId_); - idData.set_entryid(entryId_); - idData.set_batch_index(batchIndex_); - - if (partition_ != -1) { - idData.set_partition(partition_); - } - - idData.SerializeToString(&result); -} - -int64_t BatchMessageId::getBatchIndex() const { return batchIndex_; } - -#pragma GCC visibility push(default) - -bool BatchMessageId::operator<(const BatchMessageId& other) const { - if (ledgerId_ < other.ledgerId_) { - return true; - } else if (ledgerId_ > other.ledgerId_) { - return false; - } - - if (entryId_ < other.entryId_) { - return true; - } else { - return false; - } -} - -bool BatchMessageId::operator<=(const BatchMessageId& other) const { return *this < other || *this == other; } - -bool BatchMessageId::operator==(const BatchMessageId& other) const { - return ledgerId_ == other.ledgerId_ && entryId_ == other.entryId_ && batchIndex_ == other.batchIndex_; -} - -std::ostream& operator<<(std::ostream& s, const BatchMessageId& messageId) { - s << '(' << messageId.ledgerId_ << ':' << messageId.entryId_ << ':' << messageId.batchIndex_ << ':' - << messageId.partition_ << ')'; - return s; -} - -#pragma GCC visibility pop -} // namespace pulsar diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc index 68d366a..1390336 100644 --- a/pulsar-client-cpp/lib/ClientImpl.cc +++ b/pulsar-client-cpp/lib/ClientImpl.cc @@ -161,14 +161,14 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st } } - BatchMessageId msgId(startMessageId); + MessageId msgId(startMessageId); lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener( boost::bind(&ClientImpl::handleReaderMetadataLookup, shared_from_this(), _1, _2, topicName, msgId, conf, callback)); } void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDataResultPtr partitionMetadata, - TopicNamePtr topicName, BatchMessageId startMessageId, + TopicNamePtr topicName, MessageId startMessageId, ReaderConfiguration conf, ReaderCallback callback) { if (result != ResultOk) { LOG_ERROR("Error Checking/Getting Partition Metadata while creating reader: " << result); diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h index 105df8a..5283b58 100644 --- a/pulsar-client-cpp/lib/ClientImpl.h +++ b/pulsar-client-cpp/lib/ClientImpl.h @@ -91,7 +91,7 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> { SubscribeCallback callback); void handleReaderMetadataLookup(const Result result, const LookupDataResultPtr partitionMetadata, - TopicNamePtr topicName, BatchMessageId startMessageId, + TopicNamePtr topicName, MessageId startMessageId, ReaderConfiguration conf, ReaderCallback callback); void handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerWeakPtr, diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index e0245c5..82e7abb 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -185,7 +185,7 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode, - Optional<BatchMessageId> startMessageId) { + Optional<MessageId> startMessageId) { BaseCommand cmd; cmd.set_type(BaseCommand::SUBSCRIBE); CommandSubscribe* subscribe = cmd.mutable_subscribe(); @@ -198,11 +198,11 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscribe->set_durable(subscriptionMode == SubscriptionModeDurable); if (startMessageId.is_present()) { MessageIdData& messageIdData = *subscribe->mutable_start_message_id(); - messageIdData.set_ledgerid(startMessageId.value().ledgerId_); - messageIdData.set_entryid(startMessageId.value().entryId_); + messageIdData.set_ledgerid(startMessageId.value().ledgerId()); + messageIdData.set_entryid(startMessageId.value().entryId()); - if (startMessageId.value().batchIndex_ != -1) { - messageIdData.set_batch_index(startMessageId.value().batchIndex_); + if (startMessageId.value().batchIndex() != -1) { + messageIdData.set_batch_index(startMessageId.value().batchIndex()); } } @@ -442,7 +442,7 @@ void Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, Shar batchPayLoad.write(msg.impl_->payload.data(), payloadSize); } -Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage) { +Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex) { SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload; // Format of batch message @@ -459,7 +459,9 @@ Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage) { SharedBuffer payload = uncompressedPayload.slice(0, payloadSize); uncompressedPayload.consume(payloadSize); - Message singleMessage(batchedMessage.impl_->messageId, batchedMessage.impl_->metadata, payload, metadata); + const MessageId& m = batchedMessage.impl_->messageId; + MessageId singleMessageId(m.partition(), m.ledgerId(), m.entryId(), batchIndex); + Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, payload, metadata); singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_; return singleMessage; diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h index bda48d2..3746dc5 100644 --- a/pulsar-client-cpp/lib/Commands.h +++ b/pulsar-client-cpp/lib/Commands.h @@ -77,8 +77,7 @@ class Commands { static SharedBuffer newSubscribe(const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId, proto::CommandSubscribe_SubType subType, const std::string& consumerName, - SubscriptionMode subscriptionMode, - Optional<BatchMessageId> startMessageId); + SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId); static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId); @@ -106,7 +105,7 @@ class Commands { static void serializeSingleMessageInBatchWithPayload(const Message& msg, SharedBuffer& batchPayLoad, const unsigned long& maxMessageSizeInBytes); - static Message deSerializeSingleMessageInBatch(Message& batchedMessage); + static Message deSerializeSingleMessageInBatch(Message& batchedMessage, int32_t batchIndex); static SharedBuffer newConsumerStats(uint64_t consumerId, uint64_t requestId); diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index 55e4e6c..2abbdd0 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -36,8 +36,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, const std::string& subscription, const ConsumerConfiguration& conf, const ExecutorServicePtr listenerExecutor /* = NULL by default */, const ConsumerTopicType consumerTopicType /* = NonPartitioned by default */, - Commands::SubscriptionMode subscriptionMode, - Optional<BatchMessageId> startMessageId) + Commands::SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId) : HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), milliseconds(0))), waitingForZeroQueueSizeMessage(false), config_(conf), @@ -122,7 +121,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { return; } - Optional<BatchMessageId> firstMessageInQueue = clearReceiveQueue(); + Optional<MessageId> firstMessageInQueue = clearReceiveQueue(); unAckedMessageTrackerPtr_->clear(); batchAcknowledgementTracker_.clear(); @@ -271,8 +270,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: return; } - Message m(msg, metadata, payload); - m.impl_->messageId.partition_ = partitionIndex_; + Message m(msg, metadata, payload, partitionIndex_); m.impl_->cnx_ = cnx.get(); LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch()); @@ -320,18 +318,17 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection int skippedMessages = 0; for (int i = 0; i < batchSize; i++) { - batchedMessage.impl_->messageId.batchIndex_ = i; // This is a cheap copy since message contains only one shared pointer (impl_) - Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage); + Message msg = Commands::deSerializeSingleMessageInBatch(batchedMessage, i); if (startMessageId_.is_present()) { - const BatchMessageId& msgId = static_cast<const BatchMessageId&>(msg.getMessageId()); + const MessageId& msgId = msg.getMessageId(); // If we are receiving a batch message, we need to discard messages that were prior // to the startMessageId - if (msgId.ledgerId_ == startMessageId_.value().ledgerId_ && - msgId.entryId_ == startMessageId_.value().entryId_ && - msgId.batchIndex_ <= startMessageId_.value().batchIndex_) { + if (msgId.ledgerId() == startMessageId_.value().ledgerId() && + msgId.entryId() == startMessageId_.value().entryId() && + msgId.batchIndex() <= startMessageId_.value().batchIndex()) { LOG_DEBUG(getName() << "Ignoring message from before the startMessageId" << msg.getMessageId()); ++skippedMessages; @@ -559,7 +556,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) { void ConsumerImpl::messageProcessed(Message& msg) { Lock lock(mutex_); - lastDequedMessage_ = Optional<BatchMessageId>::of(static_cast<const BatchMessageId&>(msg.getMessageId())); + lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId()); ClientConnectionPtr currentCnx = getCnx().lock(); if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) { @@ -575,23 +572,19 @@ void ConsumerImpl::messageProcessed(Message& msg) { * was * not seen by the application */ -Optional<BatchMessageId> ConsumerImpl::clearReceiveQueue() { +Optional<MessageId> ConsumerImpl::clearReceiveQueue() { Message nextMessageInQueue; if (incomingMessages_.peekAndClear(nextMessageInQueue)) { // There was at least one message pending in the queue - // We can safely cast to 'BatchMessageId' since all the messages queued will have that type of message - // id, - // irrespective of whether they were part of a batch or not. - const BatchMessageId& nextMessageId = - static_cast<const BatchMessageId&>(nextMessageInQueue.getMessageId()); - BatchMessageId previousMessageId; - if (nextMessageId.batchIndex_ >= 0) { - previousMessageId = BatchMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_, - nextMessageId.batchIndex_ - 1); + const MessageId& nextMessageId = nextMessageInQueue.getMessageId(); + MessageId previousMessageId; + if (nextMessageId.batchIndex() >= 0) { + previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId(), + nextMessageId.batchIndex() - 1); } else { - previousMessageId = BatchMessageId(nextMessageId.ledgerId_, nextMessageId.entryId_ - 1, -1); + previousMessageId = MessageId(-1, nextMessageId.ledgerId(), nextMessageId.entryId() - 1, -1); } - return Optional<BatchMessageId>::of(previousMessageId); + return Optional<MessageId>::of(previousMessageId); } else if (lastDequedMessage_.is_present()) { // If the queue was empty we need to restart from the message just after the last one that has been // dequeued @@ -646,23 +639,21 @@ void ConsumerImpl::statsCallback(Result res, ResultCallback callback, proto::Com void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) { ResultCallback cb = boost::bind(&ConsumerImpl::statsCallback, this, _1, callback, proto::CommandAck_AckType_Individual); - const BatchMessageId& batchMsgId = (const BatchMessageId&)msgId; - if (batchMsgId.batchIndex_ != -1 && - !batchAcknowledgementTracker_.isBatchReady(batchMsgId, proto::CommandAck_AckType_Individual)) { + if (msgId.batchIndex() != -1 && + !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Individual)) { cb(ResultOk); return; } - doAcknowledge(batchMsgId, proto::CommandAck_AckType_Individual, cb); + doAcknowledge(msgId, proto::CommandAck_AckType_Individual, cb); } -void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& mId, ResultCallback callback) { +void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) { ResultCallback cb = boost::bind(&ConsumerImpl::statsCallback, this, _1, callback, proto::CommandAck_AckType_Cumulative); - const BatchMessageId& msgId = (const BatchMessageId&)mId; - if (msgId.batchIndex_ != -1 && + if (msgId.batchIndex() != -1 && !batchAcknowledgementTracker_.isBatchReady(msgId, proto::CommandAck_AckType_Cumulative)) { - BatchMessageId messageId = batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId); - if (messageId == BatchMessageId()) { + MessageId messageId = batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId); + if (messageId == MessageId()) { // nothing to ack cb(ResultOk); } else { @@ -673,11 +664,11 @@ void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& mId, ResultCallba } } -void ConsumerImpl::doAcknowledge(const BatchMessageId& messageId, proto::CommandAck_AckType ackType, +void ConsumerImpl::doAcknowledge(const MessageId& messageId, proto::CommandAck_AckType ackType, ResultCallback callback) { proto::MessageIdData messageIdData; - messageIdData.set_ledgerid(messageId.ledgerId_); - messageIdData.set_entryid(messageId.entryId_); + messageIdData.set_ledgerid(messageId.ledgerId()); + messageIdData.set_entryid(messageId.entryId()); ClientConnectionPtr cnx = getCnx().lock(); if (cnx) { SharedBuffer cmd = Commands::newAck(consumerId_, messageIdData, ackType, -1); @@ -687,7 +678,7 @@ void ConsumerImpl::doAcknowledge(const BatchMessageId& messageId, proto::Command } else { unAckedMessageTrackerPtr_->removeMessagesTill(messageId); } - batchAcknowledgementTracker_.deleteAckedMessage((BatchMessageId&)messageId, ackType); + batchAcknowledgementTracker_.deleteAckedMessage(messageId, ackType); callback(ResultOk); LOG_DEBUG(getName() << "ack request sent for message - [" << messageIdData.ledgerid() << "," << messageIdData.entryid() << "]"); diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index d115ed8..45a12ef 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -20,7 +20,7 @@ #define LIB_CONSUMERIMPL_H_ #include <string> -#include "pulsar/BatchMessageId.h" + #include "pulsar/Result.h" #include "UnboundedBlockingQueue.h" #include "HandlerBase.h" @@ -69,7 +69,7 @@ class ConsumerImpl : public ConsumerImplBase, const ExecutorServicePtr listenerExecutor = ExecutorServicePtr(), const ConsumerTopicType consumerTopicType = NonPartitioned, Commands::SubscriptionMode = Commands::SubscriptionModeDurable, - Optional<BatchMessageId> startMessageId = Optional<BatchMessageId>::empty()); + Optional<MessageId> startMessageId = Optional<MessageId>::empty()); ~ConsumerImpl(); void setPartitionIndex(int partitionIndex); int getPartitionIndex(); @@ -82,7 +82,7 @@ class ConsumerImpl : public ConsumerImplBase, inline proto::CommandSubscribe_SubType getSubType(); void unsubscribeAsync(ResultCallback callback); void handleUnsubscribe(Result result, ResultCallback callback); - void doAcknowledge(const BatchMessageId& messageId, proto::CommandAck_AckType ackType, + void doAcknowledge(const MessageId& messageId, proto::CommandAck_AckType ackType, ResultCallback callback); virtual void disconnectConsumer(); virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture(); @@ -134,7 +134,7 @@ class ConsumerImpl : public ConsumerImplBase, Result receiveHelper(Message& msg, int timeout); void statsCallback(Result, ResultCallback, proto::CommandAck_AckType); - Optional<BatchMessageId> clearReceiveQueue(); + Optional<MessageId> clearReceiveQueue(); boost::mutex mutexForReceiveWithZeroQueueSize; const ConsumerConfiguration config_; @@ -145,15 +145,15 @@ class ConsumerImpl : public ConsumerImplBase, ConsumerTopicType consumerTopicType_; Commands::SubscriptionMode subscriptionMode_; - Optional<BatchMessageId> startMessageId_; + Optional<MessageId> startMessageId_; - Optional<BatchMessageId> lastDequedMessage_; + Optional<MessageId> lastDequedMessage_; UnboundedBlockingQueue<Message> incomingMessages_; int availablePermits_; uint64_t consumerId_; std::string consumerName_; std::string consumerStr_; - short partitionIndex_; + int32_t partitionIndex_; Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_; bool messageListenerRunning_; boost::mutex messageListenerMutex_; diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc index 79d7ca4..6b46fd0 100644 --- a/pulsar-client-cpp/lib/Message.cc +++ b/pulsar-client-cpp/lib/Message.cc @@ -34,7 +34,7 @@ using namespace pulsar; namespace pulsar { const static std::string emptyString; -const static BatchMessageId invalidMessageId; +const static MessageId invalidMessageId; const Message::StringMap& Message::getProperties() const { return impl_->properties(); } @@ -62,14 +62,17 @@ Message::Message() : impl_() {} Message::Message(MessageImplPtr& impl) : impl_(impl) {} -Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload) +Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& metadata, SharedBuffer& payload, + int32_t partition) : impl_(boost::make_shared<MessageImpl>()) { - impl_->messageId = BatchMessageId(msg.message_id().ledgerid(), msg.message_id().entryid()); + impl_->messageId = + MessageId(partition, msg.message_id().ledgerid(), msg.message_id().entryid(), /* batchId */ + -1); impl_->metadata = metadata; impl_->payload = payload; } -Message::Message(const BatchMessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload, +Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, SharedBuffer& payload, proto::SingleMessageMetadata& singleMetadata) : impl_(boost::make_shared<MessageImpl>()) { impl_->messageId = messageID; diff --git a/pulsar-client-cpp/lib/MessageId.cc b/pulsar-client-cpp/lib/MessageId.cc index dc1ff38..c5314d8 100644 --- a/pulsar-client-cpp/lib/MessageId.cc +++ b/pulsar-client-cpp/lib/MessageId.cc @@ -18,9 +18,9 @@ */ #include <pulsar/MessageId.h> -#include <pulsar/BatchMessageId.h> #include "PulsarApi.pb.h" +#include "MessageIdImpl.h" #include <iostream> #include <limits> @@ -30,43 +30,40 @@ namespace pulsar { -MessageId::MessageId() : ledgerId_(-1), entryId_(-1), partition_(-1) {} +MessageId::MessageId() { + static const MessageIdImplPtr emptyMessageId = boost::make_shared<MessageIdImpl>(); + impl_ = emptyMessageId; +} MessageId& MessageId::operator=(const MessageId& m) { - entryId_ = m.entryId_; - ledgerId_ = m.ledgerId_; - partition_ = m.partition_; + impl_ = m.impl_; return *this; } -MessageId::MessageId(int64_t ledgerId, int64_t entryId) - : ledgerId_(ledgerId), entryId_(entryId), partition_(-1) { - // partition is set explicitly in consumerImpl when message is received - // consumer's partition is assigned to this partition -} - -int64_t MessageId::getBatchIndex() const { - // It's only relevant for batch message ids - return -1; -} +MessageId::MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex) + : impl_(boost::make_shared<MessageIdImpl>(partition, ledgerId, entryId, batchIndex)) {} const MessageId& MessageId::earliest() { - static const BatchMessageId _earliest(-1, -1); + static const MessageId _earliest(-1, -1, -1, -1); return _earliest; } const MessageId& MessageId::latest() { - // For entry-id we only have 48bits - static const BatchMessageId _latest(std::numeric_limits<int64_t>::max(), (int64_t)(pow(2, 47) - 1)); + static const int64_t long_max = std::numeric_limits<int64_t>::max(); + static const MessageId _latest(-1, long_max, long_max, -1); return _latest; } void MessageId::serialize(std::string& result) const { proto::MessageIdData idData; - idData.set_ledgerid(ledgerId_); - idData.set_entryid(entryId_); - if (partition_ != -1) { - idData.set_partition(partition_); + idData.set_ledgerid(impl_->ledgerId_); + idData.set_entryid(impl_->entryId_); + if (impl_->partition_ != -1) { + idData.set_partition(impl_->partition_); + } + + if (impl_->batchIndex_ != -1) { + idData.set_batch_index(impl_->batchIndex_); } idData.SerializeToString(&result); @@ -75,38 +72,63 @@ void MessageId::serialize(std::string& result) const { /** * Deserialize a message id from a binary string */ -boost::shared_ptr<MessageId> MessageId::deserialize(const std::string& serializedMessageId) { +MessageId MessageId::deserialize(const std::string& serializedMessageId) { proto::MessageIdData idData; if (!idData.ParseFromString(serializedMessageId)) { throw "Failed to parse serialized message id"; } - return boost::make_shared<BatchMessageId>(idData.ledgerid(), idData.entryid(), idData.batch_index()); + return MessageId(idData.partition(), idData.ledgerid(), idData.entryid(), idData.batch_index()); } +int64_t MessageId::ledgerId() const { return impl_->ledgerId_; } + +int64_t MessageId::entryId() const { return impl_->entryId_; } + +int32_t MessageId::batchIndex() const { return impl_->batchIndex_; } + +int32_t MessageId::partition() const { return impl_->partition_; } + #pragma GCC visibility push(default) + std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) { - s << '(' << messageId.ledgerId_ << ',' << messageId.entryId_ << ',' << messageId.partition_ << ')'; + s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ << ',' + << messageId.impl_->batchIndex_ << ',' << messageId.impl_->partition_ << ')'; return s; } bool MessageId::operator<(const MessageId& other) const { - if (ledgerId_ < other.ledgerId_) { + if (impl_->ledgerId_ < other.impl_->ledgerId_) { + return true; + } else if (impl_->ledgerId_ > other.impl_->ledgerId_) { + return false; + } + + if (impl_->entryId_ < other.impl_->entryId_) { return true; - } else if (ledgerId_ > other.ledgerId_) { + } else if (impl_->entryId_ > other.impl_->entryId_) { return false; } - if (entryId_ < other.entryId_) { + if (impl_->batchIndex_ < other.impl_->batchIndex_) { return true; } else { return false; } } +bool MessageId::operator<=(const MessageId& other) const { return *this < other || *this == other; } + +bool MessageId::operator>(const MessageId& other) const { return !(*this <= other); } + +bool MessageId::operator>=(const MessageId& other) const { return !(*this < other); } + bool MessageId::operator==(const MessageId& other) const { - return ledgerId_ == other.ledgerId_ && entryId_ == other.entryId_; + return impl_->ledgerId_ == other.impl_->ledgerId_ && impl_->entryId_ == other.impl_->entryId_ && + impl_->batchIndex_ == other.impl_->batchIndex_ && impl_->partition_ == other.impl_->partition_; } +bool MessageId::operator!=(const MessageId& other) const { return !(*this == other); } + #pragma GCC visibility pop } // namespace pulsar diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc b/pulsar-client-cpp/lib/MessageIdImpl.h similarity index 61% copy from pulsar-client-cpp/tests/MessageIdTest.cc copy to pulsar-client-cpp/lib/MessageIdImpl.h index 7402143..a3fc171 100644 --- a/pulsar-client-cpp/tests/MessageIdTest.cc +++ b/pulsar-client-cpp/lib/MessageIdImpl.h @@ -16,21 +16,21 @@ * specific language governing permissions and limitations * under the License. */ -#include <pulsar/BatchMessageId.h> -#include <gtest/gtest.h> +#pragma once -#include <string> +#include <cstdint> -using namespace pulsar; +namespace pulsar { -TEST(MessageIdTest, testSerialization) { - BatchMessageId msgId(1, 2, 3); - - std::string serialized; - msgId.serialize(serialized); - - boost::shared_ptr<MessageId> deserialized = MessageId::deserialize(serialized); - - ASSERT_EQ(msgId, static_cast<const BatchMessageId&>(*deserialized)); -} +class MessageIdImpl { + public: + MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), batchIndex_(-1) {} + MessageIdImpl(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex) + : ledgerId_(ledgerId), entryId_(entryId), partition_(partition), batchIndex_(batchIndex) {} + const int64_t ledgerId_; + const int64_t entryId_; + const int32_t partition_; + const int32_t batchIndex_; +}; +} // namespace pulsar diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h index 23a8fcf..f3753b1 100644 --- a/pulsar-client-cpp/lib/MessageImpl.h +++ b/pulsar-client-cpp/lib/MessageImpl.h @@ -21,7 +21,6 @@ #include <pulsar/Message.h> #include <pulsar/MessageId.h> -#include "pulsar/BatchMessageId.h" #include "SharedBuffer.h" #include "PulsarApi.pb.h" @@ -42,7 +41,7 @@ class MessageImpl { proto::MessageMetadata metadata; SharedBuffer payload; - BatchMessageId messageId; + MessageId messageId; ClientConnection* cnx_; const std::string& getPartitionKey() const; diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc index f9c02b3..dc218ef 100644 --- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc @@ -150,7 +150,7 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int } void PartitionedConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) { - int partition = msgId.partition_; + int32_t partition = msgId.partition(); assert(partition < numPartitions_ && partition >= 0 && consumers_.size() > partition); unAckedMessageTrackerPtr_->remove(msgId); consumers_[partition]->acknowledgeAsync(msgId, callback); @@ -313,7 +313,7 @@ bool PartitionedConsumerImpl::isOpen() { } void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) { - LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition_); + LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition()); messages_.push(msg); if (messageListener_) { listenerExecutor_->postWork( diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index 728573a..6808aad 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -130,7 +130,6 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac } // find a producer for that partition, index should start from 0 ProducerImplPtr& producer = producers_[partition]; - msg.impl_->messageId.partition_ = partition; // send message on that partition producer->sendAsync(msg, callback); } diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc index 308908d..5e69a72 100644 --- a/pulsar-client-cpp/lib/ReaderImpl.cc +++ b/pulsar-client-cpp/lib/ReaderImpl.cc @@ -28,7 +28,7 @@ ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, con const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback) : topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {} -void ReaderImpl::start(const BatchMessageId& startMessageId) { +void ReaderImpl::start(const MessageId& startMessageId) { ConsumerConfiguration consumerConf; consumerConf.setConsumerType(ConsumerExclusive); consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize()); @@ -51,7 +51,7 @@ void ReaderImpl::start(const BatchMessageId& startMessageId) { consumer_ = boost::make_shared<ConsumerImpl>( client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), NonPartitioned, - Commands::SubscriptionModeNonDurable, Optional<BatchMessageId>::of(startMessageId)); + Commands::SubscriptionModeNonDurable, Optional<MessageId>::of(startMessageId)); consumer_->getConsumerCreatedFuture().addListener( boost::bind(&ReaderImpl::handleConsumerCreated, shared_from_this(), _1, _2)); consumer_->start(); @@ -87,13 +87,11 @@ void ReaderImpl::acknowledgeIfNecessary(Result result, const Message& msg) { return; } - const BatchMessageId& msgId = static_cast<const BatchMessageId&>(msg.getMessageId()); - // Only acknowledge on the first message in the batch - if (msgId.batchIndex_ <= 0) { + if (msg.getMessageId().batchIndex() <= 0) { // Acknowledge message immediately because the reader is based on non-durable // subscription. When it reconnects, it will specify the subscription position anyway - consumer_->acknowledgeCumulativeAsync(msgId, emptyCallback); + consumer_->acknowledgeCumulativeAsync(msg.getMessageId(), emptyCallback); } } diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h index 97c7f6b..61216b8 100644 --- a/pulsar-client-cpp/lib/ReaderImpl.h +++ b/pulsar-client-cpp/lib/ReaderImpl.h @@ -36,7 +36,7 @@ class ReaderImpl : public boost::enable_shared_from_this<ReaderImpl> { ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback); - void start(const BatchMessageId& startMessageId); + void start(const MessageId& startMessageId); const std::string& getTopic() const; diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc index 5930606..90006b6 100644 --- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc +++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc @@ -85,14 +85,14 @@ long UnAckedMessageTrackerEnabled::size() { void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) { boost::unique_lock<boost::mutex> acquire(lock_); for (std::set<MessageId>::iterator it = oldSet_.begin(); it != oldSet_.end();) { - if (*it < msgId && it->partition_ == msgId.partition_) { + if (*it < msgId && it->partition() == msgId.partition()) { oldSet_.erase(it++); } else { it++; } } for (std::set<MessageId>::iterator it = currentSet_.begin(); it != currentSet_.end();) { - if (*it < msgId && it->partition_ == msgId.partition_) { + if (*it < msgId && it->partition() == msgId.partition()) { currentSet_.erase(it++); } else { it++; diff --git a/pulsar-client-cpp/python/src/client.cc b/pulsar-client-cpp/python/src/client.cc index ad78ed2..b5295ac 100644 --- a/pulsar-client-cpp/python/src/client.cc +++ b/pulsar-client-cpp/python/src/client.cc @@ -44,7 +44,7 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s } Reader Client_createReader(Client& client, const std::string& topic, - const BatchMessageId& startMessageId, + const MessageId& startMessageId, const ReaderConfiguration& conf) { Reader reader; Result res; diff --git a/pulsar-client-cpp/python/src/message.cc b/pulsar-client-cpp/python/src/message.cc index b7513d3..1d1ee23 100644 --- a/pulsar-client-cpp/python/src/message.cc +++ b/pulsar-client-cpp/python/src/message.cc @@ -20,13 +20,13 @@ #include <boost/python/suite/indexing/map_indexing_suite.hpp> -std::string MessageId_str(const BatchMessageId& msgId) { +std::string MessageId_str(const MessageId& msgId) { std::stringstream ss; ss << msgId; return ss.str(); } -std::string MessageId_serialize(const BatchMessageId& msgId) { +std::string MessageId_serialize(const MessageId& msgId) { std::string serialized; msgId.serialize(serialized); return serialized; @@ -42,8 +42,8 @@ boost::python::object Message_data(const Message& msg) { return boost::python::object(boost::python::handle<>(PyBytes_FromStringAndSize((const char*)msg.getData(), msg.getLength()))); } -const BatchMessageId& Message_getMessageId(const Message& msg) { - return static_cast<const BatchMessageId&>(msg.getMessageId()); +const MessageId& Message_getMessageId(const Message& msg) { + return msg.getMessageId(); } void export_message() { @@ -67,10 +67,10 @@ void export_message() { .def(map_indexing_suite<Message::StringMap>()) ; - static const BatchMessageId& _MessageId_earliest = static_cast<const BatchMessageId&>(MessageId::earliest()); - static const BatchMessageId& _MessageId_latest = static_cast<const BatchMessageId&>(MessageId::latest()); + static const MessageId& _MessageId_earliest = MessageId::earliest(); + static const MessageId& _MessageId_latest = MessageId::latest(); - class_<BatchMessageId, boost::shared_ptr<BatchMessageId> >("MessageId") + class_<MessageId>("MessageId") .def("__str__", &MessageId_str) .add_static_property("earliest", make_getter(&_MessageId_earliest)) .add_static_property("latest", make_getter(&_MessageId_latest)) diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc b/pulsar-client-cpp/tests/BatchMessageTest.cc index dda8f79..34c38db 100644 --- a/pulsar-client-cpp/tests/BatchMessageTest.cc +++ b/pulsar-client-cpp/tests/BatchMessageTest.cc @@ -228,7 +228,7 @@ TEST(BatchMessageTest, testBatchSizeInBytes) { std::string expectedMessageContent = prefix + boost::lexical_cast<std::string>(i); LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = " << receivedMsg.getMessageId() << "]"); - ASSERT_LT(pulsar::PulsarFriend::getBatchIndex((BatchMessageId&)receivedMsg.getMessageId()), 2); + ASSERT_LT(pulsar::PulsarFriend::getBatchIndex(receivedMsg.getMessageId()), 2); ASSERT_EQ(receivedMsg.getProperty("msgIndex"), boost::lexical_cast<std::string>(i++)); ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString()); ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg)); diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc b/pulsar-client-cpp/tests/MessageIdTest.cc index 7402143..06c2528 100644 --- a/pulsar-client-cpp/tests/MessageIdTest.cc +++ b/pulsar-client-cpp/tests/MessageIdTest.cc @@ -16,7 +16,8 @@ * specific language governing permissions and limitations * under the License. */ -#include <pulsar/BatchMessageId.h> +#include <pulsar/MessageId.h> +#include "PulsarFriend.h" #include <gtest/gtest.h> @@ -25,12 +26,12 @@ using namespace pulsar; TEST(MessageIdTest, testSerialization) { - BatchMessageId msgId(1, 2, 3); + MessageId msgId = PulsarFriend::getMessageId(-1, 1, 2, 3); std::string serialized; msgId.serialize(serialized); - boost::shared_ptr<MessageId> deserialized = MessageId::deserialize(serialized); + MessageId deserialized = MessageId::deserialize(serialized); - ASSERT_EQ(msgId, static_cast<const BatchMessageId&>(*deserialized)); + ASSERT_EQ(msgId, deserialized); } diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h index c64bab2..f2d1a69 100644 --- a/pulsar-client-cpp/tests/PulsarFriend.h +++ b/pulsar-client-cpp/tests/PulsarFriend.h @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -#include <pulsar/BatchMessageId.h> + #include <lib/ProducerImpl.h> #include <lib/ConsumerImpl.h> #include <string> @@ -26,7 +26,11 @@ using std::string; namespace pulsar { class PulsarFriend { public: - static int getBatchIndex(const BatchMessageId& mId) { return mId.batchIndex_; } + static MessageId getMessageId(int32_t partition, int64_t ledgerId, int64_t entryId, int32_t batchIndex) { + return MessageId(partition, ledgerId, entryId, batchIndex); + } + + static int getBatchIndex(const MessageId& mId) { return mId.batchIndex(); } static ProducerStatsImplPtr getProducerStatsPtr(Producer producer) { ProducerImpl* producerImpl = static_cast<ProducerImpl*>(producer.impl_.get()); diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc index 0e31f6a..be9678d 100644 --- a/pulsar-client-cpp/tests/ReaderTest.cc +++ b/pulsar-client-cpp/tests/ReaderTest.cc @@ -257,7 +257,7 @@ TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) { // Create another reader starting on msgid4 auto msgId4 = MessageId::deserialize(lastMessageId); Reader reader2; - ASSERT_EQ(ResultOk, client.createReader(topicName, *msgId4, readerConf, reader2)); + ASSERT_EQ(ResultOk, client.createReader(topicName, msgId4, readerConf, reader2)); for (int i = 5; i < 11; i++) { Message msg; -- To stop receiving notification emails like this one, please contact mme...@apache.org.