This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5efafc5  Use private impl for MessageId in c++ client (#1322)
5efafc5 is described below

commit 5efafc57fd81eaf622fe65548facfec00af5b49b
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Sun Mar 25 16:20:58 2018 -0700

    Use private impl for MessageId in c++ client (#1322)
---
 pulsar-client-cpp/include/pulsar/BatchMessageId.h  | 67 ------------------
 pulsar-client-cpp/include/pulsar/Message.h         | 10 +--
 pulsar-client-cpp/include/pulsar/MessageId.h       | 33 +++++----
 .../lib/BatchAcknowledgementTracker.cc             | 40 ++++++-----
 .../lib/BatchAcknowledgementTracker.h              | 27 ++++----
 pulsar-client-cpp/lib/BatchMessageId.cc            | 76 --------------------
 pulsar-client-cpp/lib/ClientImpl.cc                |  4 +-
 pulsar-client-cpp/lib/ClientImpl.h                 |  2 +-
 pulsar-client-cpp/lib/Commands.cc                  | 16 +++--
 pulsar-client-cpp/lib/Commands.h                   |  5 +-
 pulsar-client-cpp/lib/ConsumerImpl.cc              | 65 ++++++++----------
 pulsar-client-cpp/lib/ConsumerImpl.h               | 14 ++--
 pulsar-client-cpp/lib/Message.cc                   | 11 +--
 pulsar-client-cpp/lib/MessageId.cc                 | 80 ++++++++++++++--------
 .../MessageIdTest.cc => lib/MessageIdImpl.h}       | 28 ++++----
 pulsar-client-cpp/lib/MessageImpl.h                |  3 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   |  4 +-
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |  1 -
 pulsar-client-cpp/lib/ReaderImpl.cc                | 10 ++-
 pulsar-client-cpp/lib/ReaderImpl.h                 |  2 +-
 .../lib/UnAckedMessageTrackerEnabled.cc            |  4 +-
 pulsar-client-cpp/python/src/client.cc             |  2 +-
 pulsar-client-cpp/python/src/message.cc            | 14 ++--
 pulsar-client-cpp/tests/BatchMessageTest.cc        |  2 +-
 pulsar-client-cpp/tests/MessageIdTest.cc           |  9 +--
 pulsar-client-cpp/tests/PulsarFriend.h             |  8 ++-
 pulsar-client-cpp/tests/ReaderTest.cc              |  2 +-
 27 files changed, 213 insertions(+), 326 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/BatchMessageId.h b/pulsar-client-cpp/include/pulsar/BatchMessageId.h
deleted file mode 100644
index b647886..0000000
--- a/pulsar-client-cpp/include/pulsar/BatchMessageId.h
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#ifndef LIB_BATCHMESSAGEID_H_
-#define LIB_BATCHMESSAGEID_H_
-
-#include <pulsar/MessageId.h>
-#include <iosfwd>
-
-#pragma GCC visibility push(default)
-
-namespace pulsar {
-
-class PulsarWrapper;
-
-class BatchMessageId : public MessageId {
-   public:
-    BatchMessageId(int64_t ledgerId, int64_t entryId, int batchIndex = -1)
-        : MessageId(ledgerId, entryId), batchIndex_(batchIndex) {}
-
-    BatchMessageId(const MessageId& msgId);
-
-    BatchMessageId() : batchIndex_(-1) {}
-
-    virtual void serialize(std::string& result) const;
-
-    // These functions compare the message order as stored in bookkeeper
-    bool operator<(const BatchMessageId& other) const;
-    bool operator<=(const BatchMessageId& other) const;
-    bool operator==(const BatchMessageId& other) const;
-
-   protected:
-    virtual int64_t getBatchIndex() const;
-
-    friend class Commands;
-    friend class ConsumerImpl;
-    friend class ReaderImpl;
-    friend class Message;
-    friend class MessageImpl;
-    friend class PartitionedProducerImpl;
-    friend class PartitionedConsumerImpl;
-    friend class BatchAcknowledgementTracker;
-    friend class PulsarWrapper;
-    friend class PulsarFriend;
-    int64_t batchIndex_;
-
-    friend std::ostream& operator<<(std::ostream& s, const BatchMessageId& 
messageId);
-};
-}  // namespace pulsar
-#pragma GCC visibility pop
-
-#endif /* LIB_BATCHMESSAGEID_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index b98b48b..aff0d94 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -23,7 +23,8 @@
 #include <string>
 
 #include <boost/shared_ptr.hpp>
-#include "BatchMessageId.h"
+
+#include "MessageId.h"
 
 #pragma GCC visibility push(default)
 
@@ -39,8 +40,6 @@ class MessageBuilder;
 class MessageImpl;
 class PulsarWrapper;
 
-// TODO: When releasing 2.0.0, make all methods virtual and create the virtual 
destructor for Google Mock
-// tests
 class Message {
    public:
     typedef std::map<std::string, std::string> StringMap;
@@ -128,9 +127,10 @@ class Message {
     MessageImplPtr impl_;
 
     Message(MessageImplPtr& impl);
-    Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, 
SharedBuffer& payload);
+    Message(const proto::CommandMessage& msg, proto::MessageMetadata& data, 
SharedBuffer& payload,
+            int32_t partition);
     /// Used for Batch Messages
-    Message(const BatchMessageId& messageID, proto::MessageMetadata& metadata, 
SharedBuffer& payload,
+    Message(const MessageId& messageID, proto::MessageMetadata& metadata, 
SharedBuffer& payload,
             proto::SingleMessageMetadata& singleMetadata);
     friend class PartitionedProducerImpl;
     friend class PartitionedConsumerImpl;
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h
index 40fe5d0..149d177 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -27,15 +27,12 @@
 
 namespace pulsar {
 
-class ConsumerImpl;
-class UnAckedMessageTrackerEnabled;
-class PulsarWrapper;
+class MessageIdImpl;
 
 class MessageId {
    public:
     MessageId& operator=(const MessageId&);
     MessageId();
-    virtual ~MessageId() {}
 
     /**
      * MessageId representing the "earliest" or "oldest available" message 
stored in the topic
@@ -50,34 +47,44 @@ class MessageId {
     /**
      * Serialize the message id into a binary string for storing
      */
-    virtual void serialize(std::string& result) const;
+    void serialize(std::string& result) const;
 
     /**
      * Deserialize a message id from a binary string
      */
-    static boost::shared_ptr<MessageId> deserialize(const std::string& 
serializedMessageId);
+    static MessageId deserialize(const std::string& serializedMessageId);
 
     // These functions compare the message order as stored in bookkeeper
     bool operator<(const MessageId& other) const;
+    bool operator<=(const MessageId& other) const;
+    bool operator>(const MessageId& other) const;
+    bool operator>=(const MessageId& other) const;
     bool operator==(const MessageId& other) const;
+    bool operator!=(const MessageId& other) const;
 
-   protected:
-    virtual int64_t getBatchIndex() const;
+   private:
     friend class ConsumerImpl;
+    friend class ReaderImpl;
     friend class Message;
     friend class MessageImpl;
     friend class Commands;
-    friend class BatchMessageId;
     friend class PartitionedProducerImpl;
     friend class PartitionedConsumerImpl;
     friend class UnAckedMessageTrackerEnabled;
     friend class BatchAcknowledgementTracker;
     friend class PulsarWrapper;
-    MessageId(int64_t, int64_t);
+    friend class PulsarFriend;
+
+    explicit MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, 
int32_t batchIndex);
     friend std::ostream& operator<<(std::ostream& s, const MessageId& 
messageId);
-    int64_t ledgerId_;
-    int64_t entryId_ : 48;
-    short partition_ : 16;
+
+    int64_t ledgerId() const;
+    int64_t entryId() const;
+    int32_t batchIndex() const;
+    int32_t partition() const;
+
+    typedef boost::shared_ptr<MessageIdImpl> MessageIdImplPtr;
+    MessageIdImplPtr impl_;
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
index 868c050..0dcf3be 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.cc
@@ -24,7 +24,7 @@ DECLARE_LOG_OBJECT()
 BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string 
topic,
                                                          const std::string 
subscription,
                                                          const long consumerId)
-    : greatestCumulativeAckSent_(BatchMessageId()) {
+    : greatestCumulativeAckSent_() {
     std::stringstream consumerStrStream;
     consumerStrStream << "BatchAcknowledgementTracker for [" << topic << ", " 
<< subscription << ", "
                       << consumerId << "] ";
@@ -43,7 +43,7 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
         return;
     }
     Lock lock(mutex_);
-    BatchMessageId msgID = message.impl_->messageId;
+    MessageId msgID = message.impl_->messageId;
 
     // ignore message if it is less than the last cumulative ack sent or 
messageID is already being tracked
     TrackerMap::iterator pos = trackerMap_.find(msgID);
@@ -60,10 +60,10 @@ void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
         TrackerPair(msgID, 
boost::dynamic_bitset<>(message.impl_->metadata.num_messages_in_batch()).set()));
 }
 
-void BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& 
messageId,
+void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId& 
messageId,
                                                      proto::CommandAck_AckType 
ackType) {
     // Not a batch message and a individual ack
-    if (messageId.batchIndex_ == -1 && ackType == 
proto::CommandAck_AckType_Individual) {
+    if (messageId.batchIndex() == -1 && ackType == 
proto::CommandAck_AckType_Individual) {
         return;
     }
 
@@ -104,18 +104,22 @@ void 
BatchAcknowledgementTracker::deleteAckedMessage(const BatchMessageId& messa
     }
 }
 
-bool BatchAcknowledgementTracker::isBatchReady(const BatchMessageId& msgID,
+bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID,
                                                const proto::CommandAck_AckType 
ackType) {
     Lock lock(mutex_);
-    TrackerMap::iterator pos = trackerMap_.find(msgID);
-    if (pos == trackerMap_.end() || std::find(sendList_.begin(), 
sendList_.end(), msgID) != sendList_.end()) {
+    // Remove batch index
+    MessageId batchMessageId = MessageId(msgID.partition(), msgID.ledgerId(), 
msgID.entryId(),
+                                         -1 /* Batch index */);
+
+    TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
+    if (pos == trackerMap_.end() || std::find(sendList_.begin(), 
sendList_.end(), batchMessageId) != sendList_.end()) {
         LOG_DEBUG(
             "Batch is ready since message present in sendList_ or not present 
in trackerMap_ [message ID = "
-            << msgID << "]");
+            << batchMessageId << "]");
         return true;
     }
 
-    int batchIndex = msgID.batchIndex_;
+    int batchIndex = msgID.batchIndex();
     assert(batchIndex < pos->second.size());
     pos->second.set(batchIndex, false);
 
@@ -128,7 +132,7 @@ bool BatchAcknowledgementTracker::isBatchReady(const 
BatchMessageId& msgID,
     if (pos->second.any()) {
         return false;
     }
-    sendList_.push_back(msgID);
+    sendList_.push_back(batchMessageId);
     trackerMap_.erase(pos);
     LOG_DEBUG("Batch is ready since message all bits are reset in trackerMap_ 
[message ID = " << msgID
                                                                                
               << "]");
@@ -138,22 +142,24 @@ bool BatchAcknowledgementTracker::isBatchReady(const 
BatchMessageId& msgID,
 // returns
 // - a batch message id < messageId
 // - same messageId if it is the last message in the batch
-const BatchMessageId 
BatchAcknowledgementTracker::getGreatestCumulativeAckReady(
-    const BatchMessageId& messageId) {
+const MessageId 
BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const MessageId& 
messageId) {
     Lock lock(mutex_);
-    BatchMessageId messageReadyForCumulativeAck = BatchMessageId();
-    TrackerMap::iterator pos = trackerMap_.find(messageId);
+
+    // Remove batch index
+    MessageId batchMessageId = MessageId(messageId.partition(), 
messageId.ledgerId(),
+                                         messageId.entryId(), -1 /* Batch 
index */);
+    TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
 
     // element not found
     if (pos == trackerMap_.end()) {
-        return BatchMessageId();
+        return MessageId();
     }
 
-    if (pos->second.size() - 1 != messageId.batchIndex_) {
+    if (pos->second.size() - 1 != messageId.batchIndex()) {
         // Can't cumulatively ack this batch message
         if (pos == trackerMap_.begin()) {
             // This was the first message hence we can't decrement the iterator
-            return BatchMessageId();
+            return MessageId();
         }
         pos--;
     }
diff --git a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
index b6f8ba8..6b20c00 100644
--- a/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
+++ b/pulsar-client-cpp/lib/BatchAcknowledgementTracker.h
@@ -19,7 +19,6 @@
 #ifndef LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
 #define LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
 
-#include "pulsar/BatchMessageId.h"
 #include "MessageImpl.h"
 #include <map>
 #include <boost/thread/mutex.hpp>
@@ -36,8 +35,8 @@ class ConsumerImpl;
 class BatchAcknowledgementTracker {
    private:
     typedef boost::unique_lock<boost::mutex> Lock;
-    typedef std::pair<BatchMessageId, boost::dynamic_bitset<> > TrackerPair;
-    typedef std::map<BatchMessageId, boost::dynamic_bitset<> > TrackerMap;
+    typedef std::pair<MessageId, boost::dynamic_bitset<> > TrackerPair;
+    typedef std::map<MessageId, boost::dynamic_bitset<> > TrackerMap;
     boost::mutex mutex_;
 
     TrackerMap trackerMap_;
@@ -48,20 +47,20 @@ class BatchAcknowledgementTracker {
     // batch index
     // is acked again, we just check the sendList to verify that the batch is 
acked w/o iterating over the
     // dynamic_bitset.
-    std::vector<BatchMessageId> sendList_;
+    std::vector<MessageId> sendList_;
 
     // we don't need to track MessageId < greatestCumulativeAckReceived
-    BatchMessageId greatestCumulativeAckSent_;
+    MessageId greatestCumulativeAckSent_;
     std::string name_;
 
    public:
     BatchAcknowledgementTracker(const std::string topic, const std::string 
subscription,
                                 const long consumerId);
 
-    bool isBatchReady(const BatchMessageId& msgID, const 
proto::CommandAck_AckType ackType);
-    const BatchMessageId getGreatestCumulativeAckReady(const BatchMessageId& 
messageId);
+    bool isBatchReady(const MessageId& msgID, const proto::CommandAck_AckType 
ackType);
+    const MessageId getGreatestCumulativeAckReady(const MessageId& messageId);
 
-    void deleteAckedMessage(const BatchMessageId& messageId, 
proto::CommandAck_AckType ackType);
+    void deleteAckedMessage(const MessageId& messageId, 
proto::CommandAck_AckType ackType);
     void receivedMessage(const Message& message);
 
     void clear();
@@ -72,23 +71,23 @@ class BatchAcknowledgementTracker {
     // Used for Cumulative acks only
     struct SendRemoveCriteria {
        private:
-        const BatchMessageId& messageId_;
+        const MessageId& messageId_;
 
        public:
-        SendRemoveCriteria(const BatchMessageId& messageId) : 
messageId_(messageId) {}
+        SendRemoveCriteria(const MessageId& messageId) : messageId_(messageId) 
{}
 
-        bool operator()(const BatchMessageId& element) const { return (element 
<= messageId_); }
+        bool operator()(const MessageId& element) const { return (element <= 
messageId_); }
     };
 
     // Used for Cumulative acks only
     struct TrackerMapRemoveCriteria {
        private:
-        const BatchMessageId& messageId_;
+        const MessageId& messageId_;
 
        public:
-        TrackerMapRemoveCriteria(const BatchMessageId& messageId) : 
messageId_(messageId) {}
+        TrackerMapRemoveCriteria(const MessageId& messageId) : 
messageId_(messageId) {}
 
-        bool operator()(std::pair<const pulsar::BatchMessageId, 
boost::dynamic_bitset<> >& element) const {
+        bool operator()(std::pair<const MessageId, boost::dynamic_bitset<> >& 
element) const {
             return (element.first <= messageId_);
         }
     };
diff --git a/pulsar-client-cpp/lib/BatchMessageId.cc b/pulsar-client-cpp/lib/BatchMessageId.cc
deleted file mode 100644
index 27d7bbd..0000000
--- a/pulsar-client-cpp/lib/BatchMessageId.cc
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <pulsar/BatchMessageId.h>
-
-#include "PulsarApi.pb.h"
-
-#include <tuple>
-#include <iostream>
-
-namespace pulsar {
-
-BatchMessageId::BatchMessageId(const MessageId& msgId)
-    : MessageId(msgId.ledgerId_, msgId.entryId_), 
batchIndex_(msgId.getBatchIndex()) {}
-
-void BatchMessageId::serialize(std::string& result) const {
-    proto::MessageIdData idData;
-    idData.set_ledgerid(ledgerId_);
-    idData.set_entryid(entryId_);
-    idData.set_batch_index(batchIndex_);
-
-    if (partition_ != -1) {
-        idData.set_partition(partition_);
-    }
-
-    idData.SerializeToString(&result);
-}
-
-int64_t BatchMessageId::getBatchIndex() const { return batchIndex_; }
-
-#pragma GCC visibility push(default)
-
-bool BatchMessageId::operator<(const BatchMessageId& other) const {
-    if (ledgerId_ < other.ledgerId_) {
-        return true;
-    } else if (ledgerId_ > other.ledgerId_) {
-        return false;
-    }
-
-    if (entryId_ < other.entryId_) {
-        return true;
-    } else {
-        return false;
-    }
-}
-
-bool BatchMessageId::operator<=(const BatchMessageId& other) const { return 
*this < other || *this == other; }
-
-bool BatchMessageId::operator==(const BatchMessageId& other) const {
-    return ledgerId_ == other.ledgerId_ && entryId_ == other.entryId_ && 
batchIndex_ == other.batchIndex_;
-}
-
-std::ostream& operator<<(std::ostream& s, const BatchMessageId& messageId) {
-    s << '(' << messageId.ledgerId_ << ':' << messageId.entryId_ << ':' << 
messageId.batchIndex_ << ':'
-      << messageId.partition_ << ')';
-    return s;
-}
-
-#pragma GCC visibility pop
-}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 68d366a..1390336 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -161,14 +161,14 @@ void ClientImpl::createReaderAsync(const std::string& topic, const MessageId& st
         }
     }
 
-    BatchMessageId msgId(startMessageId);
+    MessageId msgId(startMessageId);
     lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
         boost::bind(&ClientImpl::handleReaderMetadataLookup, 
shared_from_this(), _1, _2, topicName, msgId,
                     conf, callback));
 }
 
 void ClientImpl::handleReaderMetadataLookup(const Result result, const 
LookupDataResultPtr partitionMetadata,
-                                            TopicNamePtr topicName, 
BatchMessageId startMessageId,
+                                            TopicNamePtr topicName, MessageId 
startMessageId,
                                             ReaderConfiguration conf, 
ReaderCallback callback) {
     if (result != ResultOk) {
         LOG_ERROR("Error Checking/Getting Partition Metadata while creating 
reader: " << result);
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 105df8a..5283b58 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -91,7 +91,7 @@ class ClientImpl : public boost::enable_shared_from_this<ClientImpl> {
                          SubscribeCallback callback);
 
     void handleReaderMetadataLookup(const Result result, const 
LookupDataResultPtr partitionMetadata,
-                                    TopicNamePtr topicName, BatchMessageId 
startMessageId,
+                                    TopicNamePtr topicName, MessageId 
startMessageId,
                                     ReaderConfiguration conf, ReaderCallback 
callback);
 
     void handleProducerCreated(Result result, ProducerImplBaseWeakPtr 
producerWeakPtr,
diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc
index e0245c5..82e7abb 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -185,7 +185,7 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& 
authentication, const
 SharedBuffer Commands::newSubscribe(const std::string& topic, const 
std::string& subscription,
                                     uint64_t consumerId, uint64_t requestId, 
CommandSubscribe_SubType subType,
                                     const std::string& consumerName, 
SubscriptionMode subscriptionMode,
-                                    Optional<BatchMessageId> startMessageId) {
+                                    Optional<MessageId> startMessageId) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::SUBSCRIBE);
     CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -198,11 +198,11 @@ SharedBuffer Commands::newSubscribe(const std::string& 
topic, const std::string&
     subscribe->set_durable(subscriptionMode == SubscriptionModeDurable);
     if (startMessageId.is_present()) {
         MessageIdData& messageIdData = *subscribe->mutable_start_message_id();
-        messageIdData.set_ledgerid(startMessageId.value().ledgerId_);
-        messageIdData.set_entryid(startMessageId.value().entryId_);
+        messageIdData.set_ledgerid(startMessageId.value().ledgerId());
+        messageIdData.set_entryid(startMessageId.value().entryId());
 
-        if (startMessageId.value().batchIndex_ != -1) {
-            messageIdData.set_batch_index(startMessageId.value().batchIndex_);
+        if (startMessageId.value().batchIndex() != -1) {
+            messageIdData.set_batch_index(startMessageId.value().batchIndex());
         }
     }
 
@@ -442,7 +442,7 @@ void 
Commands::serializeSingleMessageInBatchWithPayload(const Message& msg, Shar
     batchPayLoad.write(msg.impl_->payload.data(), payloadSize);
 }
 
-Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage) {
+Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, 
int32_t batchIndex) {
     SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload;
 
     // Format of batch message
@@ -459,7 +459,9 @@ Message Commands::deSerializeSingleMessageInBatch(Message& 
batchedMessage) {
     SharedBuffer payload = uncompressedPayload.slice(0, payloadSize);
     uncompressedPayload.consume(payloadSize);
 
-    Message singleMessage(batchedMessage.impl_->messageId, 
batchedMessage.impl_->metadata, payload, metadata);
+    const MessageId& m = batchedMessage.impl_->messageId;
+    MessageId singleMessageId(m.partition(), m.ledgerId(), m.entryId(), 
batchIndex);
+    Message singleMessage(singleMessageId, batchedMessage.impl_->metadata, 
payload, metadata);
     singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
 
     return singleMessage;
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index bda48d2..3746dc5 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -77,8 +77,7 @@ class Commands {
     static SharedBuffer newSubscribe(const std::string& topic, const 
std::string& subscription,
                                      uint64_t consumerId, uint64_t requestId,
                                      proto::CommandSubscribe_SubType subType, 
const std::string& consumerName,
-                                     SubscriptionMode subscriptionMode,
-                                     Optional<BatchMessageId> startMessageId);
+                                     SubscriptionMode subscriptionMode, 
Optional<MessageId> startMessageId);
 
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t 
requestId);
 
@@ -106,7 +105,7 @@ class Commands {
     static void serializeSingleMessageInBatchWithPayload(const Message& msg, 
SharedBuffer& batchPayLoad,
                                                          const unsigned long& 
maxMessageSizeInBytes);
 
-    static Message deSerializeSingleMessageInBatch(Message& batchedMessage);
+    static Message deSerializeSingleMessageInBatch(Message& batchedMessage, 
int32_t batchIndex);
 
     static SharedBuffer newConsumerStats(uint64_t consumerId, uint64_t 
requestId);
 
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 55e4e6c..2abbdd0 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -36,8 +36,7 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
                            const std::string& subscription, const 
ConsumerConfiguration& conf,
                            const ExecutorServicePtr listenerExecutor /* = NULL 
by default */,
                            const ConsumerTopicType consumerTopicType /* = 
NonPartitioned by default */,
-                           Commands::SubscriptionMode subscriptionMode,
-                           Optional<BatchMessageId> startMessageId)
+                           Commands::SubscriptionMode subscriptionMode, 
Optional<MessageId> startMessageId)
     : HandlerBase(client, topic, Backoff(milliseconds(100), seconds(60), 
milliseconds(0))),
       waitingForZeroQueueSizeMessage(false),
       config_(conf),
@@ -122,7 +121,7 @@ void ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
         return;
     }
 
-    Optional<BatchMessageId> firstMessageInQueue = clearReceiveQueue();
+    Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
     unAckedMessageTrackerPtr_->clear();
     batchAcknowledgementTracker_.clear();
 
@@ -271,8 +270,7 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
         return;
     }
 
-    Message m(msg, metadata, payload);
-    m.impl_->messageId.partition_ = partitionIndex_;
+    Message m(msg, metadata, payload, partitionIndex_);
     m.impl_->cnx_ = cnx.get();
 
     LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << 
metadata.num_messages_in_batch());
@@ -320,18 +318,17 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
     int skippedMessages = 0;
 
     for (int i = 0; i < batchSize; i++) {
-        batchedMessage.impl_->messageId.batchIndex_ = i;
         // This is a cheap copy since message contains only one shared pointer 
(impl_)
-        Message msg = 
Commands::deSerializeSingleMessageInBatch(batchedMessage);
+        Message msg = 
Commands::deSerializeSingleMessageInBatch(batchedMessage, i);
 
         if (startMessageId_.is_present()) {
-            const BatchMessageId& msgId = static_cast<const 
BatchMessageId&>(msg.getMessageId());
+            const MessageId& msgId = msg.getMessageId();
 
             // If we are receiving a batch message, we need to discard 
messages that were prior
             // to the startMessageId
-            if (msgId.ledgerId_ == startMessageId_.value().ledgerId_ &&
-                msgId.entryId_ == startMessageId_.value().entryId_ &&
-                msgId.batchIndex_ <= startMessageId_.value().batchIndex_) {
+            if (msgId.ledgerId() == startMessageId_.value().ledgerId() &&
+                msgId.entryId() == startMessageId_.value().entryId() &&
+                msgId.batchIndex() <= startMessageId_.value().batchIndex()) {
                 LOG_DEBUG(getName() << "Ignoring message from before the 
startMessageId"
                                     << msg.getMessageId());
                 ++skippedMessages;
@@ -559,7 +556,7 @@ Result ConsumerImpl::receiveHelper(Message& msg, int 
timeout) {
 
 void ConsumerImpl::messageProcessed(Message& msg) {
     Lock lock(mutex_);
-    lastDequedMessage_ = Optional<BatchMessageId>::of(static_cast<const 
BatchMessageId&>(msg.getMessageId()));
+    lastDequedMessage_ = Optional<MessageId>::of(msg.getMessageId());
 
     ClientConnectionPtr currentCnx = getCnx().lock();
     if (currentCnx && msg.impl_->cnx_ != currentCnx.get()) {
@@ -575,23 +572,19 @@ void ConsumerImpl::messageProcessed(Message& msg) {
  * was
  * not seen by the application
  */
-Optional<BatchMessageId> ConsumerImpl::clearReceiveQueue() {
+Optional<MessageId> ConsumerImpl::clearReceiveQueue() {
     Message nextMessageInQueue;
     if (incomingMessages_.peekAndClear(nextMessageInQueue)) {
         // There was at least one message pending in the queue
-        // We can safely cast to 'BatchMessageId' since all the messages 
queued will have that type of message
-        // id,
-        // irrespective of whether they were part of a batch or not.
-        const BatchMessageId& nextMessageId =
-            static_cast<const 
BatchMessageId&>(nextMessageInQueue.getMessageId());
-        BatchMessageId previousMessageId;
-        if (nextMessageId.batchIndex_ >= 0) {
-            previousMessageId = BatchMessageId(nextMessageId.ledgerId_, 
nextMessageId.entryId_,
-                                               nextMessageId.batchIndex_ - 1);
+        const MessageId& nextMessageId = nextMessageInQueue.getMessageId();
+        MessageId previousMessageId;
+        if (nextMessageId.batchIndex() >= 0) {
+            previousMessageId = MessageId(-1, nextMessageId.ledgerId(), 
nextMessageId.entryId(),
+                                          nextMessageId.batchIndex() - 1);
         } else {
-            previousMessageId = BatchMessageId(nextMessageId.ledgerId_, 
nextMessageId.entryId_ - 1, -1);
+            previousMessageId = MessageId(-1, nextMessageId.ledgerId(), 
nextMessageId.entryId() - 1, -1);
         }
-        return Optional<BatchMessageId>::of(previousMessageId);
+        return Optional<MessageId>::of(previousMessageId);
     } else if (lastDequedMessage_.is_present()) {
         // If the queue was empty we need to restart from the message just 
after the last one that has been
         // dequeued
@@ -646,23 +639,21 @@ void ConsumerImpl::statsCallback(Result res, 
ResultCallback callback, proto::Com
 void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback 
callback) {
     ResultCallback cb =
         boost::bind(&ConsumerImpl::statsCallback, this, _1, callback, 
proto::CommandAck_AckType_Individual);
-    const BatchMessageId& batchMsgId = (const BatchMessageId&)msgId;
-    if (batchMsgId.batchIndex_ != -1 &&
-        !batchAcknowledgementTracker_.isBatchReady(batchMsgId, 
proto::CommandAck_AckType_Individual)) {
+    if (msgId.batchIndex() != -1 &&
+        !batchAcknowledgementTracker_.isBatchReady(msgId, 
proto::CommandAck_AckType_Individual)) {
         cb(ResultOk);
         return;
     }
-    doAcknowledge(batchMsgId, proto::CommandAck_AckType_Individual, cb);
+    doAcknowledge(msgId, proto::CommandAck_AckType_Individual, cb);
 }
 
-void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& mId, 
ResultCallback callback) {
+void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, 
ResultCallback callback) {
     ResultCallback cb =
         boost::bind(&ConsumerImpl::statsCallback, this, _1, callback, 
proto::CommandAck_AckType_Cumulative);
-    const BatchMessageId& msgId = (const BatchMessageId&)mId;
-    if (msgId.batchIndex_ != -1 &&
+    if (msgId.batchIndex() != -1 &&
         !batchAcknowledgementTracker_.isBatchReady(msgId, 
proto::CommandAck_AckType_Cumulative)) {
-        BatchMessageId messageId = 
batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId);
-        if (messageId == BatchMessageId()) {
+        MessageId messageId = 
batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId);
+        if (messageId == MessageId()) {
             // nothing to ack
             cb(ResultOk);
         } else {
@@ -673,11 +664,11 @@ void ConsumerImpl::acknowledgeCumulativeAsync(const 
MessageId& mId, ResultCallba
     }
 }
 
-void ConsumerImpl::doAcknowledge(const BatchMessageId& messageId, 
proto::CommandAck_AckType ackType,
+void ConsumerImpl::doAcknowledge(const MessageId& messageId, 
proto::CommandAck_AckType ackType,
                                  ResultCallback callback) {
     proto::MessageIdData messageIdData;
-    messageIdData.set_ledgerid(messageId.ledgerId_);
-    messageIdData.set_entryid(messageId.entryId_);
+    messageIdData.set_ledgerid(messageId.ledgerId());
+    messageIdData.set_entryid(messageId.entryId());
     ClientConnectionPtr cnx = getCnx().lock();
     if (cnx) {
         SharedBuffer cmd = Commands::newAck(consumerId_, messageIdData, 
ackType, -1);
@@ -687,7 +678,7 @@ void ConsumerImpl::doAcknowledge(const BatchMessageId& 
messageId, proto::Command
         } else {
             unAckedMessageTrackerPtr_->removeMessagesTill(messageId);
         }
-        
batchAcknowledgementTracker_.deleteAckedMessage((BatchMessageId&)messageId, 
ackType);
+        batchAcknowledgementTracker_.deleteAckedMessage(messageId, ackType);
         callback(ResultOk);
         LOG_DEBUG(getName() << "ack request sent for message - [" << 
messageIdData.ledgerid() << ","
                             << messageIdData.entryid() << "]");
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h 
b/pulsar-client-cpp/lib/ConsumerImpl.h
index d115ed8..45a12ef 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -20,7 +20,7 @@
 #define LIB_CONSUMERIMPL_H_
 
 #include <string>
-#include "pulsar/BatchMessageId.h"
+
 #include "pulsar/Result.h"
 #include "UnboundedBlockingQueue.h"
 #include "HandlerBase.h"
@@ -69,7 +69,7 @@ class ConsumerImpl : public ConsumerImplBase,
                  const ExecutorServicePtr listenerExecutor = 
ExecutorServicePtr(),
                  const ConsumerTopicType consumerTopicType = NonPartitioned,
                  Commands::SubscriptionMode = 
Commands::SubscriptionModeDurable,
-                 Optional<BatchMessageId> startMessageId = 
Optional<BatchMessageId>::empty());
+                 Optional<MessageId> startMessageId = 
Optional<MessageId>::empty());
     ~ConsumerImpl();
     void setPartitionIndex(int partitionIndex);
     int getPartitionIndex();
@@ -82,7 +82,7 @@ class ConsumerImpl : public ConsumerImplBase,
     inline proto::CommandSubscribe_SubType getSubType();
     void unsubscribeAsync(ResultCallback callback);
     void handleUnsubscribe(Result result, ResultCallback callback);
-    void doAcknowledge(const BatchMessageId& messageId, 
proto::CommandAck_AckType ackType,
+    void doAcknowledge(const MessageId& messageId, proto::CommandAck_AckType 
ackType,
                        ResultCallback callback);
     virtual void disconnectConsumer();
     virtual Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture();
@@ -134,7 +134,7 @@ class ConsumerImpl : public ConsumerImplBase,
     Result receiveHelper(Message& msg, int timeout);
     void statsCallback(Result, ResultCallback, proto::CommandAck_AckType);
 
-    Optional<BatchMessageId> clearReceiveQueue();
+    Optional<MessageId> clearReceiveQueue();
 
     boost::mutex mutexForReceiveWithZeroQueueSize;
     const ConsumerConfiguration config_;
@@ -145,15 +145,15 @@ class ConsumerImpl : public ConsumerImplBase,
     ConsumerTopicType consumerTopicType_;
 
     Commands::SubscriptionMode subscriptionMode_;
-    Optional<BatchMessageId> startMessageId_;
+    Optional<MessageId> startMessageId_;
 
-    Optional<BatchMessageId> lastDequedMessage_;
+    Optional<MessageId> lastDequedMessage_;
     UnboundedBlockingQueue<Message> incomingMessages_;
     int availablePermits_;
     uint64_t consumerId_;
     std::string consumerName_;
     std::string consumerStr_;
-    short partitionIndex_;
+    int32_t partitionIndex_;
     Promise<Result, ConsumerImplBaseWeakPtr> consumerCreatedPromise_;
     bool messageListenerRunning_;
     boost::mutex messageListenerMutex_;
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 79d7ca4..6b46fd0 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -34,7 +34,7 @@ using namespace pulsar;
 namespace pulsar {
 
 const static std::string emptyString;
-const static BatchMessageId invalidMessageId;
+const static MessageId invalidMessageId;
 
 const Message::StringMap& Message::getProperties() const { return 
impl_->properties(); }
 
@@ -62,14 +62,17 @@ Message::Message() : impl_() {}
 
 Message::Message(MessageImplPtr& impl) : impl_(impl) {}
 
-Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& 
metadata, SharedBuffer& payload)
+Message::Message(const proto::CommandMessage& msg, proto::MessageMetadata& 
metadata, SharedBuffer& payload,
+                 int32_t partition)
     : impl_(boost::make_shared<MessageImpl>()) {
-    impl_->messageId = BatchMessageId(msg.message_id().ledgerid(), 
msg.message_id().entryid());
+    impl_->messageId =
+        MessageId(partition, msg.message_id().ledgerid(), 
msg.message_id().entryid(), /* batchId */
+                  -1);
     impl_->metadata = metadata;
     impl_->payload = payload;
 }
 
-Message::Message(const BatchMessageId& messageID, proto::MessageMetadata& 
metadata, SharedBuffer& payload,
+Message::Message(const MessageId& messageID, proto::MessageMetadata& metadata, 
SharedBuffer& payload,
                  proto::SingleMessageMetadata& singleMetadata)
     : impl_(boost::make_shared<MessageImpl>()) {
     impl_->messageId = messageID;
diff --git a/pulsar-client-cpp/lib/MessageId.cc 
b/pulsar-client-cpp/lib/MessageId.cc
index dc1ff38..c5314d8 100644
--- a/pulsar-client-cpp/lib/MessageId.cc
+++ b/pulsar-client-cpp/lib/MessageId.cc
@@ -18,9 +18,9 @@
  */
 
 #include <pulsar/MessageId.h>
-#include <pulsar/BatchMessageId.h>
 
 #include "PulsarApi.pb.h"
+#include "MessageIdImpl.h"
 
 #include <iostream>
 #include <limits>
@@ -30,43 +30,40 @@
 
 namespace pulsar {
 
-MessageId::MessageId() : ledgerId_(-1), entryId_(-1), partition_(-1) {}
+MessageId::MessageId() {
+    static const MessageIdImplPtr emptyMessageId = 
boost::make_shared<MessageIdImpl>();
+    impl_ = emptyMessageId;
+}
 
 MessageId& MessageId::operator=(const MessageId& m) {
-    entryId_ = m.entryId_;
-    ledgerId_ = m.ledgerId_;
-    partition_ = m.partition_;
+    impl_ = m.impl_;
     return *this;
 }
 
-MessageId::MessageId(int64_t ledgerId, int64_t entryId)
-    : ledgerId_(ledgerId), entryId_(entryId), partition_(-1) {
-    // partition is set explicitly in consumerImpl when message is received
-    // consumer's partition is assigned to this partition
-}
-
-int64_t MessageId::getBatchIndex() const {
-    // It's only relevant for batch message ids
-    return -1;
-}
+MessageId::MessageId(int32_t partition, int64_t ledgerId, int64_t entryId, 
int32_t batchIndex)
+    : impl_(boost::make_shared<MessageIdImpl>(partition, ledgerId, entryId, 
batchIndex)) {}
 
 const MessageId& MessageId::earliest() {
-    static const BatchMessageId _earliest(-1, -1);
+    static const MessageId _earliest(-1, -1, -1, -1);
     return _earliest;
 }
 
 const MessageId& MessageId::latest() {
-    // For entry-id we only have 48bits
-    static const BatchMessageId _latest(std::numeric_limits<int64_t>::max(), 
(int64_t)(pow(2, 47) - 1));
+    static const int64_t long_max = std::numeric_limits<int64_t>::max();
+    static const MessageId _latest(-1, long_max, long_max, -1);
     return _latest;
 }
 
 void MessageId::serialize(std::string& result) const {
     proto::MessageIdData idData;
-    idData.set_ledgerid(ledgerId_);
-    idData.set_entryid(entryId_);
-    if (partition_ != -1) {
-        idData.set_partition(partition_);
+    idData.set_ledgerid(impl_->ledgerId_);
+    idData.set_entryid(impl_->entryId_);
+    if (impl_->partition_ != -1) {
+        idData.set_partition(impl_->partition_);
+    }
+
+    if (impl_->batchIndex_ != -1) {
+        idData.set_batch_index(impl_->batchIndex_);
     }
 
     idData.SerializeToString(&result);
@@ -75,38 +72,63 @@ void MessageId::serialize(std::string& result) const {
 /**
  * Deserialize a message id from a binary string
  */
-boost::shared_ptr<MessageId> MessageId::deserialize(const std::string& 
serializedMessageId) {
+MessageId MessageId::deserialize(const std::string& serializedMessageId) {
     proto::MessageIdData idData;
     if (!idData.ParseFromString(serializedMessageId)) {
         throw "Failed to parse serialized message id";
     }
 
-    return boost::make_shared<BatchMessageId>(idData.ledgerid(), 
idData.entryid(), idData.batch_index());
+    return MessageId(idData.partition(), idData.ledgerid(), idData.entryid(), 
idData.batch_index());
 }
 
+int64_t MessageId::ledgerId() const { return impl_->ledgerId_; }
+
+int64_t MessageId::entryId() const { return impl_->entryId_; }
+
+int32_t MessageId::batchIndex() const { return impl_->batchIndex_; }
+
+int32_t MessageId::partition() const { return impl_->partition_; }
+
 #pragma GCC visibility push(default)
+
 std::ostream& operator<<(std::ostream& s, const pulsar::MessageId& messageId) {
-    s << '(' << messageId.ledgerId_ << ',' << messageId.entryId_ << ',' << 
messageId.partition_ << ')';
+    s << '(' << messageId.impl_->ledgerId_ << ',' << messageId.impl_->entryId_ 
<< ','
+      << messageId.impl_->batchIndex_ << ',' << messageId.impl_->partition_ << 
')';
     return s;
 }
 
 bool MessageId::operator<(const MessageId& other) const {
-    if (ledgerId_ < other.ledgerId_) {
+    if (impl_->ledgerId_ < other.impl_->ledgerId_) {
+        return true;
+    } else if (impl_->ledgerId_ > other.impl_->ledgerId_) {
+        return false;
+    }
+
+    if (impl_->entryId_ < other.impl_->entryId_) {
         return true;
-    } else if (ledgerId_ > other.ledgerId_) {
+    } else if (impl_->entryId_ > other.impl_->entryId_) {
         return false;
     }
 
-    if (entryId_ < other.entryId_) {
+    if (impl_->batchIndex_ < other.impl_->batchIndex_) {
         return true;
     } else {
         return false;
     }
 }
 
+bool MessageId::operator<=(const MessageId& other) const { return *this < 
other || *this == other; }
+
+bool MessageId::operator>(const MessageId& other) const { return !(*this <= 
other); }
+
+bool MessageId::operator>=(const MessageId& other) const { return !(*this < 
other); }
+
 bool MessageId::operator==(const MessageId& other) const {
-    return ledgerId_ == other.ledgerId_ && entryId_ == other.entryId_;
+    return impl_->ledgerId_ == other.impl_->ledgerId_ && impl_->entryId_ == 
other.impl_->entryId_ &&
+           impl_->batchIndex_ == other.impl_->batchIndex_ && impl_->partition_ 
== other.impl_->partition_;
 }
 
+bool MessageId::operator!=(const MessageId& other) const { return !(*this == 
other); }
+
 #pragma GCC visibility pop
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc 
b/pulsar-client-cpp/lib/MessageIdImpl.h
similarity index 61%
copy from pulsar-client-cpp/tests/MessageIdTest.cc
copy to pulsar-client-cpp/lib/MessageIdImpl.h
index 7402143..a3fc171 100644
--- a/pulsar-client-cpp/tests/MessageIdTest.cc
+++ b/pulsar-client-cpp/lib/MessageIdImpl.h
@@ -16,21 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/BatchMessageId.h>
 
-#include <gtest/gtest.h>
+#pragma once
 
-#include <string>
+#include <cstdint>
 
-using namespace pulsar;
+namespace pulsar {
 
-TEST(MessageIdTest, testSerialization) {
-    BatchMessageId msgId(1, 2, 3);
-
-    std::string serialized;
-    msgId.serialize(serialized);
-
-    boost::shared_ptr<MessageId> deserialized = 
MessageId::deserialize(serialized);
-
-    ASSERT_EQ(msgId, static_cast<const BatchMessageId&>(*deserialized));
-}
+class MessageIdImpl {
+   public:
+    MessageIdImpl() : ledgerId_(-1), entryId_(-1), partition_(-1), 
batchIndex_(-1) {}
+    MessageIdImpl(int32_t partition, int64_t ledgerId, int64_t entryId, 
int32_t batchIndex)
+        : ledgerId_(ledgerId), entryId_(entryId), partition_(partition), 
batchIndex_(batchIndex) {}
+    const int64_t ledgerId_;
+    const int64_t entryId_;
+    const int32_t partition_;
+    const int32_t batchIndex_;
+};
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.h 
b/pulsar-client-cpp/lib/MessageImpl.h
index 23a8fcf..f3753b1 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -21,7 +21,6 @@
 
 #include <pulsar/Message.h>
 #include <pulsar/MessageId.h>
-#include "pulsar/BatchMessageId.h"
 #include "SharedBuffer.h"
 #include "PulsarApi.pb.h"
 
@@ -42,7 +41,7 @@ class MessageImpl {
 
     proto::MessageMetadata metadata;
     SharedBuffer payload;
-    BatchMessageId messageId;
+    MessageId messageId;
     ClientConnection* cnx_;
 
     const std::string& getPartitionKey() const;
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index f9c02b3..dc218ef 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -150,7 +150,7 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result 
result, unsigned int
 }
 
 void PartitionedConsumerImpl::acknowledgeAsync(const MessageId& msgId, 
ResultCallback callback) {
-    int partition = msgId.partition_;
+    int32_t partition = msgId.partition();
     assert(partition < numPartitions_ && partition >= 0 && consumers_.size() > 
partition);
     unAckedMessageTrackerPtr_->remove(msgId);
     consumers_[partition]->acknowledgeAsync(msgId, callback);
@@ -313,7 +313,7 @@ bool PartitionedConsumerImpl::isOpen() {
 }
 
 void PartitionedConsumerImpl::messageReceived(Consumer consumer, const 
Message& msg) {
-    LOG_DEBUG("Received Message from one of the partition - " << 
msg.impl_->messageId.partition_);
+    LOG_DEBUG("Received Message from one of the partition - " << 
msg.impl_->messageId.partition());
     messages_.push(msg);
     if (messageListener_) {
         listenerExecutor_->postWork(
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 728573a..6808aad 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -130,7 +130,6 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, 
SendCallback callbac
     }
     // find a producer for that partition, index should start from 0
     ProducerImplPtr& producer = producers_[partition];
-    msg.impl_->messageId.partition_ = partition;
     // send message on that partition
     producer->sendAsync(msg, callback);
 }
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc 
b/pulsar-client-cpp/lib/ReaderImpl.cc
index 308908d..5e69a72 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -28,7 +28,7 @@ ReaderImpl::ReaderImpl(const ClientImplPtr client, const 
std::string& topic, con
                        const ExecutorServicePtr listenerExecutor, 
ReaderCallback readerCreatedCallback)
     : topic_(topic), client_(client), readerConf_(conf), 
readerCreatedCallback_(readerCreatedCallback) {}
 
-void ReaderImpl::start(const BatchMessageId& startMessageId) {
+void ReaderImpl::start(const MessageId& startMessageId) {
     ConsumerConfiguration consumerConf;
     consumerConf.setConsumerType(ConsumerExclusive);
     consumerConf.setReceiverQueueSize(readerConf_.getReceiverQueueSize());
@@ -51,7 +51,7 @@ void ReaderImpl::start(const BatchMessageId& startMessageId) {
 
     consumer_ = boost::make_shared<ConsumerImpl>(
         client_.lock(), topic_, subscription, consumerConf, 
ExecutorServicePtr(), NonPartitioned,
-        Commands::SubscriptionModeNonDurable, 
Optional<BatchMessageId>::of(startMessageId));
+        Commands::SubscriptionModeNonDurable, 
Optional<MessageId>::of(startMessageId));
     consumer_->getConsumerCreatedFuture().addListener(
         boost::bind(&ReaderImpl::handleConsumerCreated, shared_from_this(), 
_1, _2));
     consumer_->start();
@@ -87,13 +87,11 @@ void ReaderImpl::acknowledgeIfNecessary(Result result, 
const Message& msg) {
         return;
     }
 
-    const BatchMessageId& msgId = static_cast<const 
BatchMessageId&>(msg.getMessageId());
-
     // Only acknowledge on the first message in the batch
-    if (msgId.batchIndex_ <= 0) {
+    if (msg.getMessageId().batchIndex() <= 0) {
         // Acknowledge message immediately because the reader is based on 
non-durable
         // subscription. When it reconnects, it will specify the subscription 
position anyway
-        consumer_->acknowledgeCumulativeAsync(msgId, emptyCallback);
+        consumer_->acknowledgeCumulativeAsync(msg.getMessageId(), 
emptyCallback);
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ReaderImpl.h 
b/pulsar-client-cpp/lib/ReaderImpl.h
index 97c7f6b..61216b8 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.h
+++ b/pulsar-client-cpp/lib/ReaderImpl.h
@@ -36,7 +36,7 @@ class ReaderImpl : public 
boost::enable_shared_from_this<ReaderImpl> {
     ReaderImpl(const ClientImplPtr client, const std::string& topic, const 
ReaderConfiguration& conf,
                const ExecutorServicePtr listenerExecutor, ReaderCallback 
readerCreatedCallback);
 
-    void start(const BatchMessageId& startMessageId);
+    void start(const MessageId& startMessageId);
 
     const std::string& getTopic() const;
 
diff --git a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc 
b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
index 5930606..90006b6 100644
--- a/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
+++ b/pulsar-client-cpp/lib/UnAckedMessageTrackerEnabled.cc
@@ -85,14 +85,14 @@ long UnAckedMessageTrackerEnabled::size() {
 void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
     boost::unique_lock<boost::mutex> acquire(lock_);
     for (std::set<MessageId>::iterator it = oldSet_.begin(); it != 
oldSet_.end();) {
-        if (*it < msgId && it->partition_ == msgId.partition_) {
+        if (*it < msgId && it->partition() == msgId.partition()) {
             oldSet_.erase(it++);
         } else {
             it++;
         }
     }
     for (std::set<MessageId>::iterator it = currentSet_.begin(); it != 
currentSet_.end();) {
-        if (*it < msgId && it->partition_ == msgId.partition_) {
+        if (*it < msgId && it->partition() == msgId.partition()) {
             currentSet_.erase(it++);
         } else {
             it++;
diff --git a/pulsar-client-cpp/python/src/client.cc 
b/pulsar-client-cpp/python/src/client.cc
index ad78ed2..b5295ac 100644
--- a/pulsar-client-cpp/python/src/client.cc
+++ b/pulsar-client-cpp/python/src/client.cc
@@ -44,7 +44,7 @@ Consumer Client_subscribe(Client& client, const std::string& 
topic, const std::s
 }
 
 Reader Client_createReader(Client& client, const std::string& topic,
-                           const BatchMessageId& startMessageId,
+                           const MessageId& startMessageId,
                            const ReaderConfiguration& conf) {
     Reader reader;
     Result res;
diff --git a/pulsar-client-cpp/python/src/message.cc 
b/pulsar-client-cpp/python/src/message.cc
index b7513d3..1d1ee23 100644
--- a/pulsar-client-cpp/python/src/message.cc
+++ b/pulsar-client-cpp/python/src/message.cc
@@ -20,13 +20,13 @@
 
 #include <boost/python/suite/indexing/map_indexing_suite.hpp>
 
-std::string MessageId_str(const BatchMessageId& msgId) {
+std::string MessageId_str(const MessageId& msgId) {
     std::stringstream ss;
     ss << msgId;
     return ss.str();
 }
 
-std::string MessageId_serialize(const BatchMessageId& msgId) {
+std::string MessageId_serialize(const MessageId& msgId) {
     std::string serialized;
     msgId.serialize(serialized);
     return serialized;
@@ -42,8 +42,8 @@ boost::python::object Message_data(const Message& msg) {
     return 
boost::python::object(boost::python::handle<>(PyBytes_FromStringAndSize((const 
char*)msg.getData(), msg.getLength())));
 }
 
-const BatchMessageId& Message_getMessageId(const Message& msg) {
-    return static_cast<const BatchMessageId&>(msg.getMessageId());
+const MessageId& Message_getMessageId(const Message& msg) {
+    return msg.getMessageId();
 }
 
 void export_message() {
@@ -67,10 +67,10 @@ void export_message() {
             .def(map_indexing_suite<Message::StringMap>())
             ;
 
-    static const BatchMessageId& _MessageId_earliest = static_cast<const 
BatchMessageId&>(MessageId::earliest());
-    static const BatchMessageId& _MessageId_latest = static_cast<const 
BatchMessageId&>(MessageId::latest());
+    static const MessageId& _MessageId_earliest = MessageId::earliest();
+    static const MessageId& _MessageId_latest = MessageId::latest();
 
-    class_<BatchMessageId, boost::shared_ptr<BatchMessageId> >("MessageId")
+    class_<MessageId>("MessageId")
             .def("__str__", &MessageId_str)
             .add_static_property("earliest", make_getter(&_MessageId_earliest))
             .add_static_property("latest", make_getter(&_MessageId_latest))
diff --git a/pulsar-client-cpp/tests/BatchMessageTest.cc 
b/pulsar-client-cpp/tests/BatchMessageTest.cc
index dda8f79..34c38db 100644
--- a/pulsar-client-cpp/tests/BatchMessageTest.cc
+++ b/pulsar-client-cpp/tests/BatchMessageTest.cc
@@ -228,7 +228,7 @@ TEST(BatchMessageTest, testBatchSizeInBytes) {
         std::string expectedMessageContent = prefix + 
boost::lexical_cast<std::string>(i);
         LOG_DEBUG("Received Message with [ content - " << 
receivedMsg.getDataAsString() << "] [ messageID = "
                                                        << 
receivedMsg.getMessageId() << "]");
-        
ASSERT_LT(pulsar::PulsarFriend::getBatchIndex((BatchMessageId&)receivedMsg.getMessageId()),
 2);
+        
ASSERT_LT(pulsar::PulsarFriend::getBatchIndex(receivedMsg.getMessageId()), 2);
         ASSERT_EQ(receivedMsg.getProperty("msgIndex"), 
boost::lexical_cast<std::string>(i++));
         ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
         ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
diff --git a/pulsar-client-cpp/tests/MessageIdTest.cc 
b/pulsar-client-cpp/tests/MessageIdTest.cc
index 7402143..06c2528 100644
--- a/pulsar-client-cpp/tests/MessageIdTest.cc
+++ b/pulsar-client-cpp/tests/MessageIdTest.cc
@@ -16,7 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/BatchMessageId.h>
+#include <pulsar/MessageId.h>
+#include "PulsarFriend.h"
 
 #include <gtest/gtest.h>
 
@@ -25,12 +26,12 @@
 using namespace pulsar;
 
 TEST(MessageIdTest, testSerialization) {
-    BatchMessageId msgId(1, 2, 3);
+    MessageId msgId = PulsarFriend::getMessageId(-1, 1, 2, 3);
 
     std::string serialized;
     msgId.serialize(serialized);
 
-    boost::shared_ptr<MessageId> deserialized = 
MessageId::deserialize(serialized);
+    MessageId deserialized = MessageId::deserialize(serialized);
 
-    ASSERT_EQ(msgId, static_cast<const BatchMessageId&>(*deserialized));
+    ASSERT_EQ(msgId, deserialized);
 }
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h 
b/pulsar-client-cpp/tests/PulsarFriend.h
index c64bab2..f2d1a69 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <pulsar/BatchMessageId.h>
+
 #include <lib/ProducerImpl.h>
 #include <lib/ConsumerImpl.h>
 #include <string>
@@ -26,7 +26,11 @@ using std::string;
 namespace pulsar {
 class PulsarFriend {
    public:
-    static int getBatchIndex(const BatchMessageId& mId) { return 
mId.batchIndex_; }
+    static MessageId getMessageId(int32_t partition, int64_t ledgerId, int64_t 
entryId, int32_t batchIndex) {
+        return MessageId(partition, ledgerId, entryId, batchIndex);
+    }
+
+    static int getBatchIndex(const MessageId& mId) { return mId.batchIndex(); }
 
     static ProducerStatsImplPtr getProducerStatsPtr(Producer producer) {
         ProducerImpl* producerImpl = 
static_cast<ProducerImpl*>(producer.impl_.get());
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc 
b/pulsar-client-cpp/tests/ReaderTest.cc
index 0e31f6a..be9678d 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -257,7 +257,7 @@ TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) {
     // Create another reader starting on msgid4
     auto msgId4 = MessageId::deserialize(lastMessageId);
     Reader reader2;
-    ASSERT_EQ(ResultOk, client.createReader(topicName, *msgId4, readerConf, 
reader2));
+    ASSERT_EQ(ResultOk, client.createReader(topicName, msgId4, readerConf, 
reader2));
 
     for (int i = 5; i < 11; i++) {
         Message msg;

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to