This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new d03ff20  [fix] Fix MessageId serialization when it's a batched message (#153)
d03ff20 is described below

commit d03ff20a96de695a6c2837d62862569faf11876b
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Dec 28 17:42:46 2022 +0800

    [fix] Fix MessageId serialization when it's a batched message (#153)
    
    * [fix] Fix MessageId serialization when it's a batched message
    
    ### Motivation
    
    The serialization and deserialization of `MessageId` became wrong after
    https://github.com/apache/pulsar-client-cpp/pull/132.
    1. The batch size is not serialized.
    2. `BatchedMessageIdImpl` could never be deserialized.
    
    Because of these bugs, no MessageId object created from
    deserialization has a batch size, which might make
    `ReaderTest.testReaderOnSpecificMessageWithBatches` fail when the
    CMake build type is `Debug`. What's worse, a MessageId created
    from deserialization is always treated as a `MessageIdImpl`, so
    acknowledging it behaves incorrectly.
    
    ### Modifications
    
    Serialize the batch size when it is set (non-zero). In
    deserialization, create a `BatchedMessageIdImpl` when the MessageId
    refers to a batched message, i.e. when the batch index is
    non-negative and the batch size is positive.
    
    One limitation remains: a `MessageId` created from deserialization
    cannot share a `BatchMessageAcker` with other `MessageId` objects.
    In this case, create a fake `BatchMessageAcker` whose `ackIndividual`
    and `ackCumulative` methods both return false. The acker therefore
    never reports the batch as fully acknowledged, and acknowledgment
    falls back to batch index ACK if batch index ACK is enabled.
    
    Add `-DCMAKE_BUILD_TYPE=Debug` to the CI CMake step so that
    assertions are enabled when the tests run.
    
    * Add virtual destructor
---
 .github/workflows/ci-pr-validation.yaml |  2 +-
 lib/BatchMessageAcker.h                 | 41 ++++++++++++++++++++++-----------
 lib/ConsumerImpl.cc                     |  2 +-
 lib/MessageBatch.cc                     |  2 +-
 lib/MessageId.cc                        |  4 ++++
 lib/MessageIdBuilder.cc                 |  9 +++++---
 tests/MessageIdTest.cc                  | 28 ++++++++++++++++++++--
 7 files changed, 67 insertions(+), 21 deletions(-)

diff --git a/.github/workflows/ci-pr-validation.yaml 
b/.github/workflows/ci-pr-validation.yaml
index 8abdee5..492ef95 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -57,7 +57,7 @@ jobs:
           sudo curl -o /gtest-parallel 
https://raw.githubusercontent.com/google/gtest-parallel/master/gtest_parallel.py
 
 
       - name: CMake
-        run: cmake . -DBUILD_PERF_TOOLS=ON
+        run: cmake . -DCMAKE_BUILD_TYPE=Debug -DBUILD_PERF_TOOLS=ON
 
       - name: Check formatting
         run: make check-format
diff --git a/lib/BatchMessageAcker.h b/lib/BatchMessageAcker.h
index 489cf9e..ce733b8 100644
--- a/lib/BatchMessageAcker.h
+++ b/lib/BatchMessageAcker.h
@@ -31,22 +31,46 @@ class BatchMessageAcker;
 using BatchMessageAckerPtr = std::shared_ptr<BatchMessageAcker>;
 
 class BatchMessageAcker {
+   public:
+    virtual ~BatchMessageAcker() {}
+    // Return false for these methods so that batch index ACK will be falled 
back to if the acker is created
+    // by deserializing from raw bytes.
+    virtual bool ackIndividual(int32_t) { return false; }
+    virtual bool ackCumulative(int32_t) { return false; }
+
+    bool shouldAckPreviousMessageId() noexcept {
+        bool expectedValue = false;
+        return 
prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
+    }
+
+   private:
+    // When a batched message is acknowledged cumulatively, the previous 
message id will be acknowledged
+    // without batch index ACK enabled. However, it should be acknowledged 
only once. Use this flag to
+    // determine whether to acknowledge the previous message id.
+    std::atomic_bool prevBatchCumulativelyAcked_{false};
+};
+
+class BatchMessageAckerImpl : public BatchMessageAcker {
    public:
     using Lock = std::lock_guard<std::mutex>;
 
     static BatchMessageAckerPtr create(int32_t batchSize) {
-        return std::make_shared<BatchMessageAcker>(batchSize);
+        if (batchSize > 0) {
+            return std::make_shared<BatchMessageAckerImpl>(batchSize);
+        } else {
+            return std::make_shared<BatchMessageAcker>();
+        }
     }
 
-    BatchMessageAcker(int32_t batchSize) : bitSet_(batchSize) { bitSet_.set(0, 
batchSize); }
+    BatchMessageAckerImpl(int32_t batchSize) : bitSet_(batchSize) { 
bitSet_.set(0, batchSize); }
 
-    bool ackIndividual(int32_t batchIndex) {
+    bool ackIndividual(int32_t batchIndex) override {
         Lock lock{mutex_};
         bitSet_.clear(batchIndex);
         return bitSet_.isEmpty();
     }
 
-    bool ackCumulative(int32_t batchIndex) {
+    bool ackCumulative(int32_t batchIndex) override {
         Lock lock{mutex_};
         // The range of cumulative acknowledgment is closed while 
BitSet::clear accepts a left-closed
         // right-open range.
@@ -54,17 +78,8 @@ class BatchMessageAcker {
         return bitSet_.isEmpty();
     }
 
-    bool shouldAckPreviousMessageId() noexcept {
-        bool expectedValue = false;
-        return 
prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
-    }
-
    private:
     BitSet bitSet_;
-    // When a batched message is acknowledged cumulatively, the previous 
message id will be acknowledged
-    // without batch index ACK enabled. However, it should be acknowledged 
only once. Use this flag to
-    // determine whether to acknowledge the previous message id.
-    std::atomic_bool prevBatchCumulativelyAcked_{false};
     mutable std::mutex mutex_;
 };
 
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 8589d25..6f2c211 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -636,7 +636,7 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
 
     int skippedMessages = 0;
 
-    auto acker = BatchMessageAcker::create(batchSize);
+    auto acker = BatchMessageAckerImpl::create(batchSize);
     for (int i = 0; i < batchSize; i++) {
         // This is a cheap copy since message contains only one shared pointer 
(impl_)
         Message msg = 
Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
diff --git a/lib/MessageBatch.cc b/lib/MessageBatch.cc
index 3ddd7d6..d0a3d58 100644
--- a/lib/MessageBatch.cc
+++ b/lib/MessageBatch.cc
@@ -47,7 +47,7 @@ MessageBatch& MessageBatch::parseFrom(const SharedBuffer& 
payload, uint32_t batc
     impl_->metadata.set_num_messages_in_batch(batchSize);
     batch_.clear();
 
-    auto acker = BatchMessageAcker::create(batchSize);
+    auto acker = BatchMessageAckerImpl::create(batchSize);
     for (int i = 0; i < batchSize; ++i) {
         
batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i, 
batchSize, acker));
     }
diff --git a/lib/MessageId.cc b/lib/MessageId.cc
index 9b5205b..ebe52c1 100644
--- a/lib/MessageId.cc
+++ b/lib/MessageId.cc
@@ -69,6 +69,10 @@ void MessageId::serialize(std::string& result) const {
         idData.set_batch_index(impl_->batchIndex_);
     }
 
+    if (impl_->batchSize_ != 0) {
+        idData.set_batch_size(impl_->batchSize_);
+    }
+
     auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
     if (chunkMsgId) {
         proto::MessageIdData& firstChunkIdData = 
*idData.mutable_first_chunk_message_id();
diff --git a/lib/MessageIdBuilder.cc b/lib/MessageIdBuilder.cc
index 8857daf..c2db5ef 100644
--- a/lib/MessageIdBuilder.cc
+++ b/lib/MessageIdBuilder.cc
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#include <assert.h>
 #include <pulsar/MessageIdBuilder.h>
 
+#include "BatchedMessageIdImpl.h"
 #include "MessageIdImpl.h"
 #include "PulsarApi.pb.h"
 
@@ -42,8 +42,11 @@ MessageIdBuilder MessageIdBuilder::from(const 
proto::MessageIdData& messageIdDat
 }
 
 MessageId MessageIdBuilder::build() const {
-    assert(impl_->batchIndex_ < 0 || (impl_->batchSize_ > impl_->batchIndex_));
-    return MessageId{impl_};
+    if (impl_->batchIndex_ >= 0 && impl_->batchSize_ > 0) {
+        return MessageId{std::make_shared<BatchedMessageIdImpl>(*impl_, 
BatchMessageAckerImpl::create(0))};
+    } else {
+        return MessageId{impl_};
+    }
 }
 
 MessageIdBuilder& MessageIdBuilder::ledgerId(int64_t ledgerId) {
diff --git a/tests/MessageIdTest.cc b/tests/MessageIdTest.cc
index e653aa1..56dc7e8 100644
--- a/tests/MessageIdTest.cc
+++ b/tests/MessageIdTest.cc
@@ -22,19 +22,43 @@
 #include <string>
 
 #include "PulsarFriend.h"
+#include "lib/BatchedMessageIdImpl.h"
+#include "lib/Commands.h"
 #include "lib/MessageIdUtil.h"
 
 using namespace pulsar;
 
 TEST(MessageIdTest, testSerialization) {
-    auto msgId = 
MessageIdBuilder().ledgerId(1L).entryId(2L).batchIndex(3L).build();
+    auto msgId = 
MessageIdBuilder().ledgerId(1L).entryId(2L).partition(10).batchIndex(3).build();
 
     std::string serialized;
     msgId.serialize(serialized);
 
     MessageId deserialized = MessageId::deserialize(serialized);
+    
ASSERT_FALSE(std::dynamic_pointer_cast<BatchedMessageIdImpl>(Commands::getMessageIdImpl(deserialized)));
+    ASSERT_EQ(deserialized.ledgerId(), 1L);
+    ASSERT_EQ(deserialized.entryId(), 2L);
+    ASSERT_EQ(deserialized.partition(), 10);
+    ASSERT_EQ(deserialized.batchIndex(), 3);
+    ASSERT_EQ(deserialized.batchSize(), 0);
 
-    ASSERT_EQ(msgId, deserialized);
+    // Only a MessageId whose batch index and batch size are both valid can be 
deserialized as a batched
+    // message id.
+    msgId = 
MessageIdBuilder().ledgerId(3L).entryId(1L).batchIndex(0).batchSize(1).build();
+    msgId.serialize(serialized);
+    deserialized = MessageId::deserialize(serialized);
+    auto batchedMessageId =
+        
std::dynamic_pointer_cast<BatchedMessageIdImpl>(Commands::getMessageIdImpl(deserialized));
+    ASSERT_TRUE(batchedMessageId);
+    // The BatchMessageAcker object created from deserialization is a fake 
implementation that all acknowledge
+    // methods return false.
+    ASSERT_FALSE(batchedMessageId->ackIndividual(0));
+    ASSERT_FALSE(batchedMessageId->ackCumulative(0));
+    ASSERT_EQ(deserialized.ledgerId(), 3L);
+    ASSERT_EQ(deserialized.entryId(), 1L);
+    ASSERT_EQ(deserialized.partition(), -1);
+    ASSERT_EQ(deserialized.batchIndex(), 0);
+    ASSERT_EQ(deserialized.batchSize(), 1);
 }
 
 TEST(MessageIdTest, testCompareLedgerAndEntryId) {

Reply via email to