This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new d03ff20 [fix] Fix MessageId serialization when it's a batched message
(#153)
d03ff20 is described below
commit d03ff20a96de695a6c2837d62862569faf11876b
Author: Yunze Xu <[email protected]>
AuthorDate: Wed Dec 28 17:42:46 2022 +0800
[fix] Fix MessageId serialization when it's a batched message (#153)
* [fix] Fix MessageId serialization when it's a batched message
### Motivation
The serialization and deserialization of `MessageId` have been broken since
https://github.com/apache/pulsar-client-cpp/pull/132:
1. The batch size is not serialized.
2. `BatchedMessageIdImpl` could never be deserialized.
As a result, no MessageId object created by deserialization has a batch
size. This can make `ReaderTest.testReaderOnSpecificMessageWithBatches`
fail when the CMake build type is `Debug`. Worse, a MessageId created by
deserialization is always treated as a plain `MessageIdImpl`, so
acknowledging it behaves incorrectly.
### Modifications
Serialize the batch size when it is valid (non-zero). During
deserialization, create a `BatchedMessageIdImpl` when both the batch index
and the batch size are valid, i.e. when the id refers to a message inside
a batch.
One limitation remains: a `MessageId` created by deserialization cannot
share a `BatchMessageAcker` with other `MessageId` objects. In this case,
create a fake `BatchMessageAcker` whose `ackIndividual` and `ackCumulative`
methods both return false. Acknowledgment through this acker therefore
never completes the batch, but it falls back to batch index ACK when batch
index ACK is enabled.
Add `-DCMAKE_BUILD_TYPE=Debug` to the CMake step of the CI workflow so that
assertions are enabled in tests.
* Add a virtual destructor to `BatchMessageAcker`
---
.github/workflows/ci-pr-validation.yaml | 2 +-
lib/BatchMessageAcker.h | 41 ++++++++++++++++++++++-----------
lib/ConsumerImpl.cc | 2 +-
lib/MessageBatch.cc | 2 +-
lib/MessageId.cc | 4 ++++
lib/MessageIdBuilder.cc | 9 +++++---
tests/MessageIdTest.cc | 28 ++++++++++++++++++++--
7 files changed, 67 insertions(+), 21 deletions(-)
diff --git a/.github/workflows/ci-pr-validation.yaml
b/.github/workflows/ci-pr-validation.yaml
index 8abdee5..492ef95 100644
--- a/.github/workflows/ci-pr-validation.yaml
+++ b/.github/workflows/ci-pr-validation.yaml
@@ -57,7 +57,7 @@ jobs:
sudo curl -o /gtest-parallel
https://raw.githubusercontent.com/google/gtest-parallel/master/gtest_parallel.py
- name: CMake
- run: cmake . -DBUILD_PERF_TOOLS=ON
+ run: cmake . -DCMAKE_BUILD_TYPE=Debug -DBUILD_PERF_TOOLS=ON
- name: Check formatting
run: make check-format
diff --git a/lib/BatchMessageAcker.h b/lib/BatchMessageAcker.h
index 489cf9e..ce733b8 100644
--- a/lib/BatchMessageAcker.h
+++ b/lib/BatchMessageAcker.h
@@ -31,22 +31,46 @@ class BatchMessageAcker;
using BatchMessageAckerPtr = std::shared_ptr<BatchMessageAcker>;
class BatchMessageAcker {
+ public:
+ virtual ~BatchMessageAcker() {}
+ // Return false for these methods so that acknowledgment falls back to batch index ACK
+ // if the acker is created by deserializing from raw bytes.
+ virtual bool ackIndividual(int32_t) { return false; }
+ virtual bool ackCumulative(int32_t) { return false; }
+
+ bool shouldAckPreviousMessageId() noexcept {
+ bool expectedValue = false;
+ return
prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
+ }
+
+ private:
+ // When a batched message is acknowledged cumulatively, the previous
message id will be acknowledged
+ // without batch index ACK enabled. However, it should be acknowledged
only once. Use this flag to
+ // determine whether to acknowledge the previous message id.
+ std::atomic_bool prevBatchCumulativelyAcked_{false};
+};
+
+class BatchMessageAckerImpl : public BatchMessageAcker {
public:
using Lock = std::lock_guard<std::mutex>;
static BatchMessageAckerPtr create(int32_t batchSize) {
- return std::make_shared<BatchMessageAcker>(batchSize);
+ if (batchSize > 0) {
+ return std::make_shared<BatchMessageAckerImpl>(batchSize);
+ } else {
+ return std::make_shared<BatchMessageAcker>();
+ }
}
- BatchMessageAcker(int32_t batchSize) : bitSet_(batchSize) { bitSet_.set(0,
batchSize); }
+ BatchMessageAckerImpl(int32_t batchSize) : bitSet_(batchSize) {
bitSet_.set(0, batchSize); }
- bool ackIndividual(int32_t batchIndex) {
+ bool ackIndividual(int32_t batchIndex) override {
Lock lock{mutex_};
bitSet_.clear(batchIndex);
return bitSet_.isEmpty();
}
- bool ackCumulative(int32_t batchIndex) {
+ bool ackCumulative(int32_t batchIndex) override {
Lock lock{mutex_};
// The range of cumulative acknowledgment is closed while
BitSet::clear accepts a left-closed
// right-open range.
@@ -54,17 +78,8 @@ class BatchMessageAcker {
return bitSet_.isEmpty();
}
- bool shouldAckPreviousMessageId() noexcept {
- bool expectedValue = false;
- return
prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
- }
-
private:
BitSet bitSet_;
- // When a batched message is acknowledged cumulatively, the previous
message id will be acknowledged
- // without batch index ACK enabled. However, it should be acknowledged
only once. Use this flag to
- // determine whether to acknowledge the previous message id.
- std::atomic_bool prevBatchCumulativelyAcked_{false};
mutable std::mutex mutex_;
};
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 8589d25..6f2c211 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -636,7 +636,7 @@ uint32_t
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
int skippedMessages = 0;
- auto acker = BatchMessageAcker::create(batchSize);
+ auto acker = BatchMessageAckerImpl::create(batchSize);
for (int i = 0; i < batchSize; i++) {
// This is a cheap copy since message contains only one shared pointer
(impl_)
Message msg =
Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
diff --git a/lib/MessageBatch.cc b/lib/MessageBatch.cc
index 3ddd7d6..d0a3d58 100644
--- a/lib/MessageBatch.cc
+++ b/lib/MessageBatch.cc
@@ -47,7 +47,7 @@ MessageBatch& MessageBatch::parseFrom(const SharedBuffer&
payload, uint32_t batc
impl_->metadata.set_num_messages_in_batch(batchSize);
batch_.clear();
- auto acker = BatchMessageAcker::create(batchSize);
+ auto acker = BatchMessageAckerImpl::create(batchSize);
for (int i = 0; i < batchSize; ++i) {
batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i,
batchSize, acker));
}
diff --git a/lib/MessageId.cc b/lib/MessageId.cc
index 9b5205b..ebe52c1 100644
--- a/lib/MessageId.cc
+++ b/lib/MessageId.cc
@@ -69,6 +69,10 @@ void MessageId::serialize(std::string& result) const {
idData.set_batch_index(impl_->batchIndex_);
}
+ if (impl_->batchSize_ != 0) {
+ idData.set_batch_size(impl_->batchSize_);
+ }
+
auto chunkMsgId = std::dynamic_pointer_cast<ChunkMessageIdImpl>(impl_);
if (chunkMsgId) {
proto::MessageIdData& firstChunkIdData =
*idData.mutable_first_chunk_message_id();
diff --git a/lib/MessageIdBuilder.cc b/lib/MessageIdBuilder.cc
index 8857daf..c2db5ef 100644
--- a/lib/MessageIdBuilder.cc
+++ b/lib/MessageIdBuilder.cc
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-#include <assert.h>
#include <pulsar/MessageIdBuilder.h>
+#include "BatchedMessageIdImpl.h"
#include "MessageIdImpl.h"
#include "PulsarApi.pb.h"
@@ -42,8 +42,11 @@ MessageIdBuilder MessageIdBuilder::from(const
proto::MessageIdData& messageIdDat
}
MessageId MessageIdBuilder::build() const {
- assert(impl_->batchIndex_ < 0 || (impl_->batchSize_ > impl_->batchIndex_));
- return MessageId{impl_};
+ if (impl_->batchIndex_ >= 0 && impl_->batchSize_ > 0) {
+ return MessageId{std::make_shared<BatchedMessageIdImpl>(*impl_,
BatchMessageAckerImpl::create(0))};
+ } else {
+ return MessageId{impl_};
+ }
}
MessageIdBuilder& MessageIdBuilder::ledgerId(int64_t ledgerId) {
diff --git a/tests/MessageIdTest.cc b/tests/MessageIdTest.cc
index e653aa1..56dc7e8 100644
--- a/tests/MessageIdTest.cc
+++ b/tests/MessageIdTest.cc
@@ -22,19 +22,43 @@
#include <string>
#include "PulsarFriend.h"
+#include "lib/BatchedMessageIdImpl.h"
+#include "lib/Commands.h"
#include "lib/MessageIdUtil.h"
using namespace pulsar;
TEST(MessageIdTest, testSerialization) {
- auto msgId =
MessageIdBuilder().ledgerId(1L).entryId(2L).batchIndex(3L).build();
+ auto msgId =
MessageIdBuilder().ledgerId(1L).entryId(2L).partition(10).batchIndex(3).build();
std::string serialized;
msgId.serialize(serialized);
MessageId deserialized = MessageId::deserialize(serialized);
+
ASSERT_FALSE(std::dynamic_pointer_cast<BatchedMessageIdImpl>(Commands::getMessageIdImpl(deserialized)));
+ ASSERT_EQ(deserialized.ledgerId(), 1L);
+ ASSERT_EQ(deserialized.entryId(), 2L);
+ ASSERT_EQ(deserialized.partition(), 10);
+ ASSERT_EQ(deserialized.batchIndex(), 3);
+ ASSERT_EQ(deserialized.batchSize(), 0);
- ASSERT_EQ(msgId, deserialized);
+ // Only a MessageId whose batch index and batch size are both valid can be
deserialized as a batched
+ // message id.
+ msgId =
MessageIdBuilder().ledgerId(3L).entryId(1L).batchIndex(0).batchSize(1).build();
+ msgId.serialize(serialized);
+ deserialized = MessageId::deserialize(serialized);
+ auto batchedMessageId =
+
std::dynamic_pointer_cast<BatchedMessageIdImpl>(Commands::getMessageIdImpl(deserialized));
+ ASSERT_TRUE(batchedMessageId);
+ // The BatchMessageAcker object created from deserialization is a fake implementation
+ // whose acknowledgment methods all return false.
+ ASSERT_FALSE(batchedMessageId->ackIndividual(0));
+ ASSERT_FALSE(batchedMessageId->ackCumulative(0));
+ ASSERT_EQ(deserialized.ledgerId(), 3L);
+ ASSERT_EQ(deserialized.entryId(), 1L);
+ ASSERT_EQ(deserialized.partition(), -1);
+ ASSERT_EQ(deserialized.batchIndex(), 0);
+ ASSERT_EQ(deserialized.batchSize(), 1);
}
TEST(MessageIdTest, testCompareLedgerAndEntryId) {