This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 2018a06 Add BatchedMessageIdImpl to acknowledge batched messages
(#132)
2018a06 is described below
commit 2018a06e8afcde4de59971cfbc5f653a4f9d7897
Author: Yunze Xu <[email protected]>
AuthorDate: Thu Dec 8 15:47:39 2022 +0800
Add BatchedMessageIdImpl to acknowledge batched messages (#132)
* Add BatchedMessageIdImpl to acknowledge batched messages
Fixes https://github.com/apache/pulsar-client-cpp/issues/130
### Motivation
It's a catch-up for https://github.com/apache/pulsar/pull/1424
### Modifications
Migrate the `BitSet` implementation from the JDK. Although `vector<bool>`
and `boost::dynamic_bitset` are available, it's better to have the same
method for converting a bit set to the underlying long array, so that
batch index ACK can be supported in the future.
Add `BatchedMessageIdImpl` to maintain a `BatchMessageAcker`, which is
shared by messages in the same batch. The acker is responsible for
recording which messages are acknowledged. The message id is acknowledged
only after all messages in the batch have been acknowledged.
The stats for individual ACKs are only updated after the whole batch is
acknowledged. Before this PR, the stats increased by one each time a
single message was acknowledged.
---
lib/BatchAcknowledgementTracker.cc | 174 -----------------
lib/BatchAcknowledgementTracker.h | 106 ----------
lib/BatchMessageAcker.h | 71 +++++++
lib/{MessageIdImpl.h => BatchedMessageIdImpl.h} | 47 ++---
lib/BitSet.h | 250 ++++++++++++++++++++++++
lib/Commands.cc | 11 +-
lib/Commands.h | 9 +-
lib/ConsumerImpl.cc | 118 ++++++-----
lib/ConsumerImpl.h | 23 +--
lib/MessageBatch.cc | 4 +-
lib/MessageIdImpl.h | 2 +
tests/AcknowledgeTest.cc | 60 ++++++
tests/BasicEndToEndTest.cc | 12 +-
tests/BitSetTest.cc | 102 ++++++++++
tests/ConsumerWrapper.cc | 30 +++
tests/ConsumerWrapper.h | 70 +++++++
16 files changed, 707 insertions(+), 382 deletions(-)
diff --git a/lib/BatchAcknowledgementTracker.cc
b/lib/BatchAcknowledgementTracker.cc
deleted file mode 100644
index d1bb6a8..0000000
--- a/lib/BatchAcknowledgementTracker.cc
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "BatchAcknowledgementTracker.h"
-
-#include "LogUtils.h"
-#include "MessageIdUtil.h"
-#include "MessageImpl.h"
-
-namespace pulsar {
-DECLARE_LOG_OBJECT()
-
-BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string
topic,
- const std::string
subscription,
- const long consumerId)
- : greatestCumulativeAckSent_() {
- std::stringstream consumerStrStream;
- consumerStrStream << "BatchAcknowledgementTracker for [" << topic << ", "
<< subscription << ", "
- << consumerId << "] ";
- name_ = consumerStrStream.str();
- LOG_DEBUG(name_ << "Constructed BatchAcknowledgementTracker");
-}
-
-void BatchAcknowledgementTracker::clear() {
- Lock lock(mutex_);
- trackerMap_.clear();
- sendList_.clear();
-}
-
-void BatchAcknowledgementTracker::receivedMessage(const Message& message) {
- // ignore message if it is not a batch message
- if (!message.impl_->metadata.has_num_messages_in_batch()) {
- return;
- }
- Lock lock(mutex_);
- MessageId msgID = message.impl_->messageId;
-
- // ignore message if it is less than the last cumulative ack sent or
messageID is already being tracked
- TrackerMap::iterator pos = trackerMap_.find(msgID);
- if (msgID < greatestCumulativeAckSent_ || pos != trackerMap_.end() ||
- std::find(sendList_.begin(), sendList_.end(), msgID) !=
sendList_.end()) {
- return;
- }
- LOG_DEBUG("Initializing the trackerMap_ with Message ID = "
- << msgID << " -- Map size: " << trackerMap_.size() << " -- List
size: " << sendList_.size());
-
- // Since dynamic_set (this version) doesn't have all() function,
initializing all bits with 1 and then
- // reseting them to 0 and using any()
- trackerMap_.insert(
- pos,
- TrackerPair(msgID,
boost::dynamic_bitset<>(message.impl_->metadata.num_messages_in_batch()).set()));
-}
-
-void BatchAcknowledgementTracker::deleteAckedMessage(const MessageId&
messageId, CommandAck_AckType ackType) {
- // Not a batch message and a individual ack
- if (messageId.batchIndex() == -1 && ackType ==
CommandAck_AckType_Individual) {
- return;
- }
-
- auto batchMessageId = discardBatch(messageId);
-
- Lock lock(mutex_);
- if (ackType == CommandAck_AckType_Cumulative) {
- // delete from trackerMap and sendList all messageIDs less than or
equal to this one
- // equal to - since getGreatestCumulativeAckReady already gives us the
exact message id to be acked
-
- TrackerMap::iterator it = trackerMap_.begin();
- TrackerMapRemoveCriteria criteria(messageId);
- while (it != trackerMap_.end()) {
- if (criteria(*it)) {
- trackerMap_.erase(it++);
- } else {
- ++it;
- }
- }
-
- // std::remove shifts all to be deleted items to the end of the vector
and returns an iterator to the
- // first
- // instance of item, then we erase all elements from this iterator to
the end of the list
- sendList_.erase(
- std::remove_if(sendList_.begin(), sendList_.end(),
SendRemoveCriteria(batchMessageId)),
- sendList_.end());
-
- if (greatestCumulativeAckSent_ < messageId) {
- greatestCumulativeAckSent_ = messageId;
- LOG_DEBUG(*this << " The greatestCumulativeAckSent_ is now " <<
greatestCumulativeAckSent_);
- }
- } else {
- // Error - if it is a batch message and found in trackerMap_
- if (trackerMap_.find(messageId) != trackerMap_.end()) {
- LOG_ERROR(*this << " - This should not happened - Message should
have been removed from "
- "trakerMap_ and moved to sendList_ "
- << messageId);
- }
-
- sendList_.erase(std::remove(sendList_.begin(), sendList_.end(),
batchMessageId), sendList_.end());
- }
-}
-
-bool BatchAcknowledgementTracker::isBatchReady(const MessageId& msgID,
CommandAck_AckType ackType) {
- Lock lock(mutex_);
- auto batchMessageId = discardBatch(msgID);
-
- TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
- if (pos == trackerMap_.end() ||
- std::find(sendList_.begin(), sendList_.end(), batchMessageId) !=
sendList_.end()) {
- LOG_DEBUG(
- "Batch is ready since message present in sendList_ or not present
in trackerMap_ [message ID = "
- << batchMessageId << "]");
- return true;
- }
-
- int batchIndex = msgID.batchIndex();
- assert(batchIndex < pos->second.size());
- pos->second.set(batchIndex, false);
-
- if (ackType == CommandAck_AckType_Cumulative) {
- for (int i = 0; i < batchIndex; i++) {
- pos->second.set(i, false);
- }
- }
-
- if (pos->second.any()) {
- return false;
- }
- sendList_.push_back(batchMessageId);
- trackerMap_.erase(pos);
- LOG_DEBUG("Batch is ready since message all bits are reset in trackerMap_
[message ID = " << msgID
-
<< "]");
- return true;
-}
-
-// returns
-// - a batch message id < messageId
-// - same messageId if it is the last message in the batch
-const MessageId
BatchAcknowledgementTracker::getGreatestCumulativeAckReady(const MessageId&
messageId) {
- Lock lock(mutex_);
-
- // Remove batch index
- auto batchMessageId = discardBatch(messageId);
- TrackerMap::iterator pos = trackerMap_.find(batchMessageId);
-
- // element not found
- if (pos == trackerMap_.end()) {
- return MessageId();
- }
-
- if (pos->second.size() - 1 != messageId.batchIndex()) {
- // Can't cumulatively ack this batch message
- if (pos == trackerMap_.begin()) {
- // This was the first message hence we can't decrement the iterator
- return MessageId();
- }
- pos--;
- }
-
- return pos->first;
-}
-} // namespace pulsar
diff --git a/lib/BatchAcknowledgementTracker.h
b/lib/BatchAcknowledgementTracker.h
deleted file mode 100644
index 6cbe753..0000000
--- a/lib/BatchAcknowledgementTracker.h
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#ifndef LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
-#define LIB_BATCHACKNOWLEDGEMENTTRACKER_H_
-
-#include <pulsar/Message.h>
-#include <pulsar/MessageId.h>
-
-#include <boost/dynamic_bitset.hpp>
-#include <map>
-#include <mutex>
-#include <ostream>
-#include <string>
-
-#include "ProtoApiEnums.h"
-
-namespace pulsar {
-
-class ConsumerImpl;
-
-class BatchAcknowledgementTracker {
- private:
- typedef std::unique_lock<std::mutex> Lock;
- typedef std::pair<MessageId, boost::dynamic_bitset<> > TrackerPair;
- typedef std::map<MessageId, boost::dynamic_bitset<> > TrackerMap;
- std::mutex mutex_;
-
- TrackerMap trackerMap_;
-
- // SendList is used to reduce the time required to go over the
dynamic_bitset and check if the entire
- // batch is acked.
- // It is useful in cases where the entire batch is acked but cnx is
broken. In this case when any of the
- // batch index
- // is acked again, we just check the sendList to verify that the batch is
acked w/o iterating over the
- // dynamic_bitset.
- std::vector<MessageId> sendList_;
-
- // we don't need to track MessageId < greatestCumulativeAckReceived
- MessageId greatestCumulativeAckSent_;
- std::string name_;
-
- public:
- BatchAcknowledgementTracker(const std::string topic, const std::string
subscription,
- const long consumerId);
-
- bool isBatchReady(const MessageId& msgID, CommandAck_AckType ackType);
- const MessageId getGreatestCumulativeAckReady(const MessageId& messageId);
-
- void deleteAckedMessage(const MessageId& messageId, CommandAck_AckType
ackType);
- void receivedMessage(const Message& message);
-
- void clear();
-
- inline friend std::ostream& operator<<(std::ostream& os,
- const BatchAcknowledgementTracker&
batchAcknowledgementTracker);
-
- // Used for Cumulative acks only
- struct SendRemoveCriteria {
- private:
- const MessageId& messageId_;
-
- public:
- SendRemoveCriteria(const MessageId& messageId) : messageId_(messageId)
{}
-
- bool operator()(const MessageId& element) const { return (element <=
messageId_); }
- };
-
- // Used for Cumulative acks only
- struct TrackerMapRemoveCriteria {
- private:
- const MessageId& messageId_;
-
- public:
- TrackerMapRemoveCriteria(const MessageId& messageId) :
messageId_(messageId) {}
-
- bool operator()(std::pair<const MessageId, boost::dynamic_bitset<> >&
element) const {
- return (element.first <= messageId_);
- }
- };
-};
-
-std::ostream& operator<<(std::ostream& os, const BatchAcknowledgementTracker&
batchAcknowledgementTracker) {
- os << "{ " << batchAcknowledgementTracker.name_ << "
[greatestCumulativeAckReceived_-"
- << batchAcknowledgementTracker.greatestCumulativeAckSent_
- << "] [trackerMap size = " <<
batchAcknowledgementTracker.trackerMap_.size() << " ]}";
- return os;
-}
-} // namespace pulsar
-
-#endif /* LIB_BATCHACKNOWLEDGEMENTTRACKER_H_ */
diff --git a/lib/BatchMessageAcker.h b/lib/BatchMessageAcker.h
new file mode 100644
index 0000000..489cf9e
--- /dev/null
+++ b/lib/BatchMessageAcker.h
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+
+#include "BitSet.h"
+#include "ProtoApiEnums.h"
+
+namespace pulsar {
+
+class BatchMessageAcker;
+using BatchMessageAckerPtr = std::shared_ptr<BatchMessageAcker>;
+
+class BatchMessageAcker {
+ public:
+ using Lock = std::lock_guard<std::mutex>;
+
+ static BatchMessageAckerPtr create(int32_t batchSize) {
+ return std::make_shared<BatchMessageAcker>(batchSize);
+ }
+
+ BatchMessageAcker(int32_t batchSize) : bitSet_(batchSize) { bitSet_.set(0,
batchSize); }
+
+ bool ackIndividual(int32_t batchIndex) {
+ Lock lock{mutex_};
+ bitSet_.clear(batchIndex);
+ return bitSet_.isEmpty();
+ }
+
+ bool ackCumulative(int32_t batchIndex) {
+ Lock lock{mutex_};
+ // The range of cumulative acknowledgment is closed while
BitSet::clear accepts a left-closed
+ // right-open range.
+ bitSet_.clear(0, batchIndex + 1);
+ return bitSet_.isEmpty();
+ }
+
+ bool shouldAckPreviousMessageId() noexcept {
+ bool expectedValue = false;
+ return
prevBatchCumulativelyAcked_.compare_exchange_strong(expectedValue, true);
+ }
+
+ private:
+ BitSet bitSet_;
+ // When a batched message is acknowledged cumulatively, the previous
message id will be acknowledged
+ // without batch index ACK enabled. However, it should be acknowledged
only once. Use this flag to
+ // determine whether to acknowledge the previous message id.
+ std::atomic_bool prevBatchCumulativelyAcked_{false};
+ mutable std::mutex mutex_;
+};
+
+} // namespace pulsar
diff --git a/lib/MessageIdImpl.h b/lib/BatchedMessageIdImpl.h
similarity index 51%
copy from lib/MessageIdImpl.h
copy to lib/BatchedMessageIdImpl.h
index 57d1c4e..649c5cb 100644
--- a/lib/MessageIdImpl.h
+++ b/lib/BatchedMessageIdImpl.h
@@ -16,36 +16,37 @@
* specific language governing permissions and limitations
* under the License.
*/
-
#pragma once
-#include <cstdint>
-#include <string>
+#include <assert.h>
+#include <pulsar/MessageIdBuilder.h>
+
+#include <atomic>
+
+#include "BatchMessageAcker.h"
+#include "MessageIdImpl.h"
namespace pulsar {
-class MessageIdImpl {
+class BatchedMessageIdImpl : public MessageIdImpl {
public:
- MessageIdImpl() = default;
- MessageIdImpl(int32_t partition, int64_t ledgerId, int64_t entryId,
int32_t batchIndex)
- : ledgerId_(ledgerId),
- entryId_(entryId),
- partition_(partition),
- batchIndex_(batchIndex),
- topicName_() {}
- int64_t ledgerId_ = -1;
- int64_t entryId_ = -1;
- int32_t partition_ = -1;
- int32_t batchIndex_ = -1;
- int32_t batchSize_ = 0;
-
- const std::string& getTopicName() { return *topicName_; }
- void setTopicName(const std::string& topicName) { topicName_ = &topicName;
}
+ BatchedMessageIdImpl(const MessageIdImpl& messageIdImpl, const
BatchMessageAckerPtr& acker)
+ : MessageIdImpl(messageIdImpl), acker_(acker) {
+ assert(acker);
+ }
+
+ bool ackIndividual(int32_t batchIndex) const { return
acker_->ackIndividual(batchIndex); }
+
+ bool ackCumulative(int32_t batchIndex) const { return
acker_->ackCumulative(batchIndex); }
+
+ bool shouldAckPreviousMessageId() const { return
acker_->shouldAckPreviousMessageId(); }
+
+ MessageId getPreviousMessageId() {
+ return MessageIdBuilder().ledgerId(ledgerId_).entryId(entryId_ -
1).partition(partition_).build();
+ }
private:
- const std::string* topicName_ = nullptr;
- friend class MessageImpl;
- friend class MultiTopicsConsumerImpl;
- friend class UnAckedMessageTrackerEnabled;
+ BatchMessageAckerPtr acker_;
};
+
} // namespace pulsar
diff --git a/lib/BitSet.h b/lib/BitSet.h
new file mode 100644
index 0000000..713ff5e
--- /dev/null
+++ b/lib/BitSet.h
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <assert.h>
+#include <stdint.h>
+#include <string.h>
+
+#include <vector>
+
+// This class migrates the BitSet class from Java to have essential methods.
+// Here `boost::dynamic_bitset` is not used because we need to convert the
dynamic bit set
+// to a long array because the Pulsar API uses a long array to represent the
bit set.
+namespace pulsar {
+
+class BitSet {
+ public:
+ // We must store the unsigned integer to make operator >> equivalent to
Java's >>>
+ using Data = std::vector<uint64_t>;
+ using const_iterator = typename Data::const_iterator;
+
+ BitSet() {}
+
+ BitSet(int32_t numBits) : words_((numBits / 64) + ((numBits % 64 == 0) ? 0
: 1)) { assert(numBits > 0); }
+
+ // Support range loop like:
+ // ```c++
+ // BitSet bitSet(129);
+ // for (auto x : bitSet) { /* ... */ }
+ // ```
+ const_iterator begin() const noexcept { return words_.begin(); }
+ const_iterator end() const noexcept { return words_.begin() + wordsInUse_;
}
+
+ /**
+ * Returns true if this {@code BitSet} contains no bits that are set
+ * to {@code true}.
+ *
+ * @return boolean indicating whether this {@code BitSet} is empty
+ */
+ bool isEmpty() const noexcept { return wordsInUse_ == 0; }
+
+ /**
+ * Sets the bits from the specified {@code fromIndex} (inclusive) to the
+ * specified {@code toIndex} (exclusive) to {@code true}.
+ *
+ * @param fromIndex index of the first bit to be set
+ * @param toIndex index after the last bit to be set
+ */
+ void set(int32_t fromIndex, int32_t toIndex);
+
+ /**
+ * Sets the bits from the specified {@code fromIndex} (inclusive) to the
+ * specified {@code toIndex} (exclusive) to {@code false}.
+ *
+ * @param fromIndex index of the first bit to be cleared
+ * @param toIndex index after the last bit to be cleared
+ */
+ void clear(int32_t fromIndex, int32_t toIndex);
+
+ /**
+ * Sets the bit specified by the index to {@code false}.
+ *
+ * @param bitIndex the index of the bit to be cleared
+ */
+ void clear(int32_t bitIndex);
+
+ private:
+ Data words_;
+ int32_t wordsInUse_ = 0;
+
+ static constexpr int32_t ADDRESS_BITS_PER_WORD = 6;
+ static constexpr int32_t BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
+ static constexpr uint64_t WORD_MASK = 0xffffffffffffffffL;
+
+ static constexpr int32_t wordIndex(int32_t bitIndex) { return bitIndex >>
ADDRESS_BITS_PER_WORD; }
+
+ void expandTo(int32_t wordIndex) {
+ auto wordsRequired = wordIndex + 1;
+ if (wordsInUse_ < wordsRequired) {
+ words_.resize(wordsRequired);
+ wordsInUse_ = wordsRequired;
+ }
+ }
+
+ int32_t length() {
+ if (wordsInUse_ == 0) {
+ return 0;
+ }
+ return BITS_PER_WORD * (wordsInUse_ - 1) +
+ (BITS_PER_WORD - numberOfLeadingZeros(words_[wordsInUse_ - 1]));
+ }
+
+ void recalculateWordsInUse() {
+ // Traverse the bitset until a used word is found
+ int32_t i;
+ for (i = wordsInUse_ - 1; i >= 0; i--) {
+ if (words_[i] != 0) {
+ break;
+ }
+ }
+ wordsInUse_ = i + 1;
+ }
+
+ static int32_t numberOfLeadingZeros(uint64_t i) {
+ auto x = static_cast<uint32_t>(i >> 32);
+ return x == 0 ? 32 + numberOfLeadingZeros(static_cast<uint32_t>(i)) :
numberOfLeadingZeros(x);
+ }
+
+ static int32_t numberOfLeadingZeros(uint32_t i) {
+ if (i <= 0) {
+ return i == 0 ? 32 : 0;
+ }
+ int32_t n = 31;
+ if (i >= (1 << 16)) {
+ n -= 16;
+ i >>= 16;
+ }
+ if (i >= (1 << 8)) {
+ n -= 8;
+ i >>= 8;
+ }
+ if (i >= (1 << 4)) {
+ n -= 4;
+ i >>= 4;
+ }
+ if (i >= (1 << 2)) {
+ n -= 2;
+ i >>= 2;
+ }
+ return n - (i >> 1);
+ }
+
+ // In C++, when shift count is negative or >= width of type, the behavior
is undefined. We should
+ // convert n to the valid range [0, sizeof(T)) first.
+ static constexpr int32_t safeShiftCount(int32_t width, int32_t n) {
+ return (n < 0) ? safeShiftCount(width, n + width)
+ : ((n >= width) ? safeShiftCount(width, n - width) : n);
+ }
+
+ template <typename T>
+ static T safeLeftShift(T x, int32_t n) {
+ return (x << safeShiftCount(sizeof(x) * 8, n));
+ }
+
+ template <typename T>
+ static T safeRightShift(T x, int32_t n) {
+ return (x >> safeShiftCount(sizeof(x) * 8, n));
+ }
+};
+
+inline void BitSet::set(int32_t fromIndex, int32_t toIndex) {
+ assert(fromIndex < toIndex && fromIndex >= 0 && toIndex >= 0);
+ if (fromIndex == toIndex) {
+ return;
+ }
+
+ auto startWordIndex = wordIndex(fromIndex);
+ auto endWordIndex = wordIndex(toIndex - 1);
+ expandTo(endWordIndex);
+
+ auto firstWordMask = safeLeftShift(WORD_MASK, fromIndex);
+ auto lastWordMask = safeRightShift(WORD_MASK, -toIndex);
+ if (startWordIndex == endWordIndex) {
+ // Case 1: One word
+ words_[startWordIndex] |= (firstWordMask & lastWordMask);
+ } else {
+ // Case 2: Multiple words
+ // Handle first word
+ words_[startWordIndex] |= firstWordMask;
+
+ // Handle intermediate words, if any
+ for (int32_t i = startWordIndex + 1; i < endWordIndex; i++) {
+ words_[i] = WORD_MASK;
+ }
+
+ // Handle last word (restores invariants)
+ words_[endWordIndex] |= lastWordMask;
+ }
+}
+
+inline void BitSet::clear(int32_t fromIndex, int32_t toIndex) {
+ assert(fromIndex < toIndex && fromIndex >= 0 && toIndex >= 0);
+ if (fromIndex == toIndex) {
+ return;
+ }
+
+ auto startWordIndex = wordIndex(fromIndex);
+ if (startWordIndex >= wordsInUse_) {
+ return;
+ }
+
+ auto endWordIndex = wordIndex(toIndex - 1);
+ if (endWordIndex >= wordsInUse_) {
+ toIndex = length();
+ endWordIndex = wordsInUse_ - 1;
+ }
+
+ auto firstWordMask = safeLeftShift(WORD_MASK, fromIndex);
+ auto lastWordMask = safeRightShift(WORD_MASK, -toIndex);
+
+ if (startWordIndex == endWordIndex) {
+ // Case 1: One word
+ words_[startWordIndex] &= ~(firstWordMask & lastWordMask);
+ } else {
+ // Case 2: Multiple words
+ // Handle first word
+ words_[startWordIndex] &= ~firstWordMask;
+
+ // Handle intermediate words, if any
+ for (int32_t i = startWordIndex + 1; i < endWordIndex; i++) {
+ words_[i] = 0;
+ }
+
+ // Handle last word
+ words_[endWordIndex] &= ~lastWordMask;
+ }
+
+ recalculateWordsInUse();
+}
+
+inline void BitSet::clear(int32_t bitIndex) {
+ assert(bitIndex >= 0);
+
+ auto i = wordIndex(bitIndex);
+ if (i >= wordsInUse_) {
+ return;
+ }
+
+ words_[i] &= ~(safeLeftShift(1L, bitIndex));
+
+ recalculateWordsInUse();
+}
+
+} // namespace pulsar
diff --git a/lib/Commands.cc b/lib/Commands.cc
index 5bb9587..3cd97e2 100644
--- a/lib/Commands.cc
+++ b/lib/Commands.cc
@@ -26,6 +26,8 @@
#include <algorithm>
#include <mutex>
+#include "BatchMessageAcker.h"
+#include "BatchedMessageIdImpl.h"
#include "LogUtils.h"
#include "MessageImpl.h"
#include "PulsarApi.pb.h"
@@ -809,7 +811,7 @@ uint64_t
Commands::serializeSingleMessageInBatchWithPayload(const Message& msg,
}
Message Commands::deSerializeSingleMessageInBatch(Message& batchedMessage,
int32_t batchIndex,
- int32_t batchSize) {
+ int32_t batchSize, const
BatchMessageAckerPtr& acker) {
SharedBuffer& uncompressedPayload = batchedMessage.impl_->payload;
// Format of batch message
@@ -827,14 +829,17 @@ Message
Commands::deSerializeSingleMessageInBatch(Message& batchedMessage, int32
uncompressedPayload.consume(payloadSize);
const MessageId& m = batchedMessage.impl_->messageId;
- auto singleMessageId =
MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
- Message singleMessage(singleMessageId, batchedMessage.impl_->metadata,
payload, metadata,
+ auto messageId =
MessageIdBuilder::from(m).batchIndex(batchIndex).batchSize(batchSize).build();
+ auto batchedMessageId =
std::make_shared<BatchedMessageIdImpl>(*(messageId.impl_), acker);
+ Message singleMessage(MessageId{batchedMessageId},
batchedMessage.impl_->metadata, payload, metadata,
batchedMessage.impl_->getTopicName());
singleMessage.impl_->cnx_ = batchedMessage.impl_->cnx_;
return singleMessage;
}
+MessageIdImplPtr Commands::getMessageIdImpl(const MessageId& messageId) {
return messageId.impl_; }
+
bool Commands::peerSupportsGetLastMessageId(int32_t peerVersion) { return
peerVersion >= proto::v12; }
bool Commands::peerSupportsActiveConsumerListener(int32_t peerVersion) {
return peerVersion >= proto::v12; }
diff --git a/lib/Commands.h b/lib/Commands.h
index bbe96fd..48dbcf9 100644
--- a/lib/Commands.h
+++ b/lib/Commands.h
@@ -35,6 +35,11 @@ using namespace pulsar;
namespace pulsar {
+class BatchMessageAcker;
+using BatchMessageAckerPtr = std::shared_ptr<BatchMessageAcker>;
+class MessageIdImpl;
+using MessageIdImplPtr = std::shared_ptr<MessageIdImpl>;
+
namespace proto {
class BaseCommand;
class MessageIdData;
@@ -131,7 +136,9 @@ class Commands {
const Message& msg, SharedBuffer& batchPayLoad, unsigned long
maxMessageSizeInBytes);
static Message deSerializeSingleMessageInBatch(Message& batchedMessage,
int32_t batchIndex,
- int32_t batchSize);
+ int32_t batchSize, const
BatchMessageAckerPtr& acker);
+
+ static MessageIdImplPtr getMessageIdImpl(const MessageId& messageId);
static SharedBuffer newConsumerStats(uint64_t consumerId, uint64_t
requestId);
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index cf01e36..e277d99 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -25,6 +25,8 @@
#include "AckGroupingTracker.h"
#include "AckGroupingTrackerDisabled.h"
#include "AckGroupingTrackerEnabled.h"
+#include "BatchMessageAcker.h"
+#include "BatchedMessageIdImpl.h"
#include "ClientConnection.h"
#include "ClientImpl.h"
#include "Commands.h"
@@ -75,7 +77,6 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const
std::string& topic,
consumerId_(client->newConsumerId()),
consumerName_(config_.getConsumerName()),
messageListenerRunning_(true),
- batchAcknowledgementTracker_(topic_, subscriptionName,
(long)consumerId_),
negativeAcksTracker_(client, *this, conf),
ackGroupingTrackerPtr_(std::make_shared<AckGroupingTracker>()),
readCompacted_(conf.isReadCompacted()),
@@ -198,7 +199,6 @@ void ConsumerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
lockForMessageId.unlock();
unAckedMessageTrackerPtr_->clear();
- batchAcknowledgementTracker_.clear();
ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
@@ -328,7 +328,7 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback
originalCallback) {
void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId,
bool autoAck) {
if (autoAck) {
- doAcknowledgeIndividual(messageId, [uuid, messageId](Result result) {
+ acknowledgeAsync(messageId, [uuid, messageId](Result result) {
if (result != ResultOk) {
LOG_WARN("Failed to acknowledge discarded chunk, uuid: " <<
uuid
<< ",
messageId: " << messageId);
@@ -622,17 +622,17 @@ void ConsumerImpl::notifyPendingReceivedCallback(Result
result, Message& msg,
// Zero Queue size is not supported with Batch Messages
uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const
ClientConnectionPtr& cnx,
Message&
batchedMessage, int redeliveryCount) {
- unsigned int batchSize =
batchedMessage.impl_->metadata.num_messages_in_batch();
- batchAcknowledgementTracker_.receivedMessage(batchedMessage);
+ auto batchSize = batchedMessage.impl_->metadata.num_messages_in_batch();
LOG_DEBUG("Received Batch messages of size - " << batchSize
<< " -- msgId: " <<
batchedMessage.getMessageId());
const auto startMessageId = startMessageId_.get();
int skippedMessages = 0;
+ auto acker = BatchMessageAcker::create(batchSize);
for (int i = 0; i < batchSize; i++) {
// This is a cheap copy since message contains only one shared pointer
(impl_)
- Message msg =
Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize);
+ Message msg =
Commands::deSerializeSingleMessageInBatch(batchedMessage, i, batchSize, acker);
msg.impl_->setRedeliveryCount(redeliveryCount);
msg.impl_->setTopicName(batchedMessage.getTopicName());
msg.impl_->convertPayloadToKeyValue(config_.getSchema());
@@ -997,53 +997,51 @@ inline CommandSubscribe_InitialPosition
ConsumerImpl::getInitialPosition() {
BOOST_THROW_EXCEPTION(std::logic_error("Invalid InitialPosition
enumeration value"));
}
-void ConsumerImpl::statsAckCallback(Result res, ResultCallback callback,
CommandAck_AckType ackType,
- uint32_t numAcks) {
- consumerStatsBasePtr_->messageAcknowledged(res, ackType, numAcks);
- if (callback) {
- callback(res);
- }
-}
-
void ConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback
callback) {
- ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback,
get_shared_this_ptr(),
- std::placeholders::_1, callback,
CommandAck_AckType_Individual, 1);
- if (msgId.batchIndex() != -1 &&
- !batchAcknowledgementTracker_.isBatchReady(msgId,
CommandAck_AckType_Individual)) {
- cb(ResultOk);
- return;
+ auto pair = prepareIndividualAck(msgId);
+ const auto& msgIdToAck = pair.first;
+ const bool readyToAck = pair.second;
+ if (readyToAck) {
+ ackGroupingTrackerPtr_->addAcknowledge(msgIdToAck);
+ }
+ if (callback) {
+ callback(ResultOk);
}
- doAcknowledgeIndividual(msgId, cb);
}
void ConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdList,
ResultCallback callback) {
- ResultCallback cb =
- std::bind(&ConsumerImpl::statsAckCallback, get_shared_this_ptr(),
std::placeholders::_1, callback,
- proto::CommandAck_AckType_Individual, messageIdList.size());
- // Currently not supported batch message id individual index ack.
- this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdList);
- this->unAckedMessageTrackerPtr_->remove(messageIdList);
- cb(ResultOk);
+ MessageIdList messageIdListToAck;
+ for (auto&& msgId : messageIdList) {
+ auto pair = prepareIndividualAck(msgId);
+ const auto& msgIdToAck = pair.first;
+ const bool readyToAck = pair.second;
+ if (readyToAck) {
+ messageIdListToAck.emplace_back(msgIdToAck);
+ }
+ }
+ this->ackGroupingTrackerPtr_->addAcknowledgeList(messageIdListToAck);
+ if (callback) {
+ callback(ResultOk);
+ }
}
void ConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId,
ResultCallback callback) {
- ResultCallback cb = std::bind(&ConsumerImpl::statsAckCallback,
get_shared_this_ptr(),
- std::placeholders::_1, callback,
CommandAck_AckType_Cumulative, 1);
if (!isCumulativeAcknowledgementAllowed(config_.getConsumerType())) {
- cb(ResultCumulativeAcknowledgementNotAllowedError);
+ if (callback) {
+ callback(ResultCumulativeAcknowledgementNotAllowedError);
+ }
return;
}
- if (msgId.batchIndex() != -1 &&
- !batchAcknowledgementTracker_.isBatchReady(msgId,
CommandAck_AckType_Cumulative)) {
- MessageId messageId =
batchAcknowledgementTracker_.getGreatestCumulativeAckReady(msgId);
- if (messageId == MessageId()) {
- // Nothing to ACK, because the batch that msgId belongs to is NOT
completely consumed.
- cb(ResultOk);
- } else {
- doAcknowledgeCumulative(messageId, cb);
- }
- } else {
- doAcknowledgeCumulative(msgId, cb);
+ auto pair = prepareCumulativeAck(msgId);
+ const auto& msgIdToAck = pair.first;
+ const auto& readyToAck = pair.second;
+ if (readyToAck) {
+ consumerStatsBasePtr_->messageAcknowledged(ResultOk,
CommandAck_AckType_Cumulative, 1);
+ unAckedMessageTrackerPtr_->removeMessagesTill(msgIdToAck);
+ ackGroupingTrackerPtr_->addAcknowledgeCumulative(msgIdToAck);
+ }
+ if (callback) {
+ callback(ResultOk);
}
}
@@ -1051,18 +1049,34 @@ bool
ConsumerImpl::isCumulativeAcknowledgementAllowed(ConsumerType consumerType)
return consumerType != ConsumerKeyShared && consumerType != ConsumerShared;
}
-void ConsumerImpl::doAcknowledgeIndividual(const MessageId& messageId,
ResultCallback callback) {
- this->unAckedMessageTrackerPtr_->remove(messageId);
- this->batchAcknowledgementTracker_.deleteAckedMessage(messageId,
proto::CommandAck::Individual);
- this->ackGroupingTrackerPtr_->addAcknowledge(messageId);
- callback(ResultOk);
+std::pair<MessageId, bool> ConsumerImpl::prepareIndividualAck(const MessageId&
messageId) {  // Returns {id to ack, readyToAck}; when ready it also updates the ack stats and the un-acked tracker.
+ auto messageIdImpl = Commands::getMessageIdImpl(messageId);
+ auto batchedMessageIdImpl =
std::dynamic_pointer_cast<BatchedMessageIdImpl>(messageIdImpl);  // null when the id is not a BatchedMessageIdImpl
+
+ auto batchSize = messageId.batchSize();
+ if (!batchedMessageIdImpl ||
batchedMessageIdImpl->ackIndividual(messageId.batchIndex())) {  // non-batched id, or ackIndividual() returned true
+ consumerStatsBasePtr_->messageAcknowledged(ResultOk,
CommandAck_AckType_Individual,
+ (batchSize > 0) ? batchSize
: 1);  // counts batchSize acks when batchSize > 0, otherwise a single ack
+ unAckedMessageTrackerPtr_->remove(messageId);
+ return std::make_pair(discardBatch(messageId), true);  // NOTE(review): discardBatch() presumably drops the batch index - confirm
+ } else {
+ return std::make_pair(MessageId{}, false);  // not ready: default-constructed id, caller must not send an ack
+ }
}
-void ConsumerImpl::doAcknowledgeCumulative(const MessageId& messageId,
ResultCallback callback) {
- this->unAckedMessageTrackerPtr_->removeMessagesTill(messageId);
- this->batchAcknowledgementTracker_.deleteAckedMessage(messageId,
proto::CommandAck::Cumulative);
- this->ackGroupingTrackerPtr_->addAcknowledgeCumulative(messageId);
- callback(ResultOk);
+std::pair<MessageId, bool> ConsumerImpl::prepareCumulativeAck(const MessageId&
messageId) {  // Returns {id to ack cumulatively, readyToAck}; unlike prepareIndividualAck it touches no stats or trackers.
+ auto messageIdImpl = Commands::getMessageIdImpl(messageId);
+ auto batchedMessageIdImpl =
std::dynamic_pointer_cast<BatchedMessageIdImpl>(messageIdImpl);  // null when the id is not a BatchedMessageIdImpl
+
+ if (!batchedMessageIdImpl ||
batchedMessageIdImpl->ackCumulative(messageId.batchIndex())) {  // non-batched id, or ackCumulative() returned true
+ return std::make_pair(discardBatch(messageId), true);
+ } else {
+ if (batchedMessageIdImpl->shouldAckPreviousMessageId()) {  // ackCumulative() returned false: fall back to the previous id if the impl asks for it
+ return
std::make_pair(batchedMessageIdImpl->getPreviousMessageId(), true);
+ } else {
+ return std::make_pair(MessageId{}, false);  // nothing to ack
+ }
+ }
}
void ConsumerImpl::negativeAcknowledge(const MessageId& messageId) {
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index e9af3f6..d2480b8 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -24,8 +24,8 @@
#include <boost/optional.hpp>
#include <functional>
#include <memory>
+#include <utility>
-#include "BatchAcknowledgementTracker.h"
#include "BrokerConsumerStatsImpl.h"
#include "Commands.h"
#include "CompressionCodec.h"
@@ -41,7 +41,6 @@ namespace pulsar {
class UnAckedMessageTrackerInterface;
class ExecutorService;
class ConsumerImpl;
-class BatchAcknowledgementTracker;
class MessageCrypto;
class GetLastMessageIdResponse;
typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
@@ -85,21 +84,8 @@ class ConsumerImpl : public ConsumerImplBase {
inline CommandSubscribe_SubType getSubType();
inline CommandSubscribe_InitialPosition getInitialPosition();
- /**
- * Send individual ACK request of given message ID to broker.
- * @param[in] messageId ID of the message to be ACKed.
- * @param[in] callback call back function, which is called after sending
ACK. For now, it's
- * always provided with ResultOk.
- */
- void doAcknowledgeIndividual(const MessageId& messageId, ResultCallback
callback);
-
- /**
- * Send cumulative ACK request of given message ID to broker.
- * @param[in] messageId ID of the message to be ACKed.
- * @param[in] callback call back function, which is called after sending
ACK. For now, it's
- * always provided with ResultOk.
- */
- void doAcknowledgeCumulative(const MessageId& messageId, ResultCallback
callback);
+ std::pair<MessageId, bool /* readyToAck */> prepareIndividualAck(const
MessageId& messageId);
+ std::pair<MessageId, bool /* readyToAck */> prepareCumulativeAck(const
MessageId& messageId);
// overrided methods from ConsumerImplBase
Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture()
override;
@@ -184,8 +170,6 @@ class ConsumerImpl : public ConsumerImplBase {
Result receiveHelper(Message& msg);
Result receiveHelper(Message& msg, int timeout);
void executeNotifyCallback(Message& msg);
- void statsAckCallback(Result res, ResultCallback callback,
CommandAck_AckType ackType,
- uint32_t numAcks = 1);
void notifyPendingReceivedCallback(Result result, Message& message, const
ReceiveCallback& callback);
void failPendingReceiveCallback();
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
@@ -223,7 +207,6 @@ class ConsumerImpl : public ConsumerImplBase {
std::atomic_bool messageListenerRunning_;
CompressionCodecProvider compressionCodecProvider_;
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
- BatchAcknowledgementTracker batchAcknowledgementTracker_;
BrokerConsumerStatsImpl brokerConsumerStats_;
NegativeAcksTracker negativeAcksTracker_;
AckGroupingTrackerPtr ackGroupingTrackerPtr_;
diff --git a/lib/MessageBatch.cc b/lib/MessageBatch.cc
index f61b56a..3ddd7d6 100644
--- a/lib/MessageBatch.cc
+++ b/lib/MessageBatch.cc
@@ -18,6 +18,7 @@
*/
#include <pulsar/MessageBatch.h>
+#include "BatchMessageAcker.h"
#include "Commands.h"
#include "MessageImpl.h"
#include "SharedBuffer.h"
@@ -46,8 +47,9 @@ MessageBatch& MessageBatch::parseFrom(const SharedBuffer&
payload, uint32_t batc
impl_->metadata.set_num_messages_in_batch(batchSize);
batch_.clear();
+ auto acker = BatchMessageAcker::create(batchSize);
for (int i = 0; i < batchSize; ++i) {
-
batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i,
batchSize));
+
batch_.push_back(Commands::deSerializeSingleMessageInBatch(batchMessage_, i,
batchSize, acker));
}
return *this;
}
diff --git a/lib/MessageIdImpl.h b/lib/MessageIdImpl.h
index 57d1c4e..2cf3f75 100644
--- a/lib/MessageIdImpl.h
+++ b/lib/MessageIdImpl.h
@@ -33,6 +33,8 @@ class MessageIdImpl {
partition_(partition),
batchIndex_(batchIndex),
topicName_() {}
+ virtual ~MessageIdImpl() {}
+
int64_t ledgerId_ = -1;
int64_t entryId_ = -1;
int32_t partition_ = -1;
diff --git a/tests/AcknowledgeTest.cc b/tests/AcknowledgeTest.cc
index e0746be..7d42db9 100644
--- a/tests/AcknowledgeTest.cc
+++ b/tests/AcknowledgeTest.cc
@@ -19,6 +19,11 @@
#include <gtest/gtest.h>
#include <pulsar/Client.h>
+#include <chrono>
+#include <set>
+#include <thread>
+
+#include "ConsumerWrapper.h"
#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "lib/LogUtils.h"
@@ -151,4 +156,59 @@ TEST_P(AcknowledgeTest, testAckMsgListWithMultiConsumer) {
client.close();
}
+TEST_F(AcknowledgeTest, testBatchedMessageId) {  // Checks redelivery position and ack stats for individual and cumulative acks on batched messages.
+ Client client(lookupUrl);
+
+ const std::string topic = "test-batched-message-id-" + unique_str();
+ constexpr int batchingMaxMessages = 3;
+ constexpr int numMessages = batchingMaxMessages * 3;
+
+ Producer producer;
+ ASSERT_EQ(ResultOk, client.createProducer(topic,
+ ProducerConfiguration()
+
.setBatchingMaxMessages(batchingMaxMessages)
+
.setBatchingMaxPublishDelayMs(3600 * 1000 /* 1h */),
+ producer));
+ std::vector<ConsumerWrapper> consumers{4};  // four consumers, one per scenario below, each on its own subscription
+ for (size_t i = 0; i < consumers.size(); i++) {
+ consumers[i].initialize(client, topic, "sub-" + std::to_string(i));
+ }
+ for (size_t i = 0; i < numMessages; i++) {  // 9 messages; with batchingMaxMessages = 3 these are expected to form 3 batches
+ producer.sendAsync(MessageBuilder().setContent("msg-" +
std::to_string(i)).build(), nullptr);
+ }
+ for (size_t i = 0; i < consumers.size(); i++) {
+ consumers[i].receiveAtMost(numMessages);
+ if (i > 0) {  // every subscription must observe the same message ids as the first one
+ ASSERT_EQ(consumers[i].messageIdList(),
consumers[0].messageIdList());
+ }
+ }
+
+ Message msg;
+ // ack 2 messages of the batch that has 3 messages
+ consumers[0].acknowledgeAndRedeliver({0, 2},
CommandAck_AckType_Individual);
+ ASSERT_EQ(consumers[0].receive(msg), ResultOk);
+ EXPECT_EQ(msg.getMessageId(), consumers[0].messageIdList()[0]);  // the partially acked batch is redelivered from its first message
+ ASSERT_EQ(consumers[0].getNumAcked(CommandAck_AckType_Individual), 0);  // stats are untouched until the whole batch is acked
+
+ // ack the whole batch
+ consumers[1].acknowledgeAndRedeliver({0, 1, 2},
CommandAck_AckType_Individual);
+ ASSERT_EQ(consumers[1].receive(msg), ResultOk);
+ EXPECT_EQ(msg.getMessageId(),
consumers[1].messageIdList()[batchingMaxMessages]);  // redelivery starts at the first message after the acked batch
+ ASSERT_EQ(consumers[1].getNumAcked(CommandAck_AckType_Individual), 3);
+
+ // ack cumulatively the previous message id
+ consumers[2].acknowledgeAndRedeliver({batchingMaxMessages,
batchingMaxMessages + 1},
+ CommandAck_AckType_Cumulative);
+ ASSERT_EQ(consumers[2].receive(msg), ResultOk);
+ EXPECT_EQ(msg.getMessageId(),
consumers[2].messageIdList()[batchingMaxMessages]);
+ // the previous message id will only be acknowledged once
+ ASSERT_EQ(consumers[2].getNumAcked(CommandAck_AckType_Cumulative), 1);
+
+ // the whole 2nd batch is acknowledged
+ consumers[3].acknowledgeAndRedeliver({batchingMaxMessages + 2},
CommandAck_AckType_Cumulative);
+ ASSERT_EQ(consumers[3].receive(msg), ResultOk);
+ EXPECT_EQ(msg.getMessageId(),
consumers[3].messageIdList()[batchingMaxMessages * 2]);  // redelivery starts at index 6, right after the 2nd batch
+ ASSERT_EQ(consumers[3].getNumAcked(CommandAck_AckType_Cumulative), 1);
+}
+
INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest,
testing::Values(100, 0));
diff --git a/tests/BasicEndToEndTest.cc b/tests/BasicEndToEndTest.cc
index b801d38..d193017 100644
--- a/tests/BasicEndToEndTest.cc
+++ b/tests/BasicEndToEndTest.cc
@@ -1100,6 +1100,7 @@ TEST(BasicEndToEndTest, testStatsLatencies) {
int i = 0;
ConsumerStatsImplPtr consumerStatsImplPtr =
PulsarFriend::getConsumerStatsPtr(consumer);
+ unsigned long numAcks = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i);
LOG_DEBUG("Received Message with [ content - " <<
receivedMsg.getDataAsString() << "] [ messageID = "
@@ -1107,9 +1108,16 @@ TEST(BasicEndToEndTest, testStatsLatencies) {
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalReceivedMsgMap()), i);
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
-
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalAckedMsgMap()), i -
1);
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
-
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalAckedMsgMap()), i);
+
+ auto msgId = receivedMsg.getMessageId();
+ if (msgId.batchIndex() < 0) {
+ numAcks++;
+ } else if (msgId.batchIndex() + 1 == msgId.batchSize()) {
+ // The stats will only be updated after all messages in the batch
are acknowledged
+ numAcks += msgId.batchSize();
+ }
+ ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()),
numAcks);
}
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
diff --git a/tests/BitSetTest.cc b/tests/BitSetTest.cc
new file mode 100644
index 0000000..d13398d
--- /dev/null
+++ b/tests/BitSetTest.cc
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <map>
+#include <vector>
+
+#include "lib/BatchMessageAcker.h"
+#include "lib/BitSet.h"
+
+using namespace pulsar;
+
+static std::vector<uint64_t> toLongVector(const BitSet& bitSet) {  // Copies the values yielded by iterating `bitSet` into a vector, in iteration order.
+ std::vector<uint64_t> v;
+ for (uint64_t x : bitSet) {
+ v.emplace_back(x);
+ }
+ return v;
+}
+
+TEST(BitSetTest, testFill) {  // Setting bits [0, n) must produce the expected 64-bit words for each tested size n.
+ // A word has 64 bits, so besides one small size (7) we test 64*N + {-1, 0, 1}
+ std::map<int, std::vector<uint64_t>> expectedResults;
+ expectedResults[7] = {0x7f};
+ expectedResults[63] = {0x7fffffffffffffff};
+ expectedResults[64] = {0xffffffffffffffff};
+ expectedResults[65] = {0xffffffffffffffff, 1};
+ expectedResults[127] = {0xffffffffffffffff, 0x7fffffffffffffff};
+ expectedResults[128] = {0xffffffffffffffff, 0xffffffffffffffff};
+ expectedResults[129] = {0xffffffffffffffff, 0xffffffffffffffff, 1};
+
+ std::map<int, std::vector<uint64_t>> actualResults;
+ for (const auto& kv : expectedResults) {
+ BitSet bitSet(kv.first);
+ ASSERT_TRUE(toLongVector(bitSet).empty());  // a freshly constructed BitSet iterates over no words
+ bitSet.set(0, kv.first);  // sets bits [0, kv.first); the end index is exclusive (size 7 yields 0x7f)
+ actualResults[kv.first] = toLongVector(bitSet);
+ }
+ ASSERT_EQ(actualResults, expectedResults);
+}
+
+TEST(BitSetTest, testSet) {  // Range set() within a single word and across several words.
+ BitSet bitSet(64 * 5 + 1); // 6 words
+ ASSERT_TRUE(toLongVector(bitSet).empty());
+
+ // the range lies within a single word
+ bitSet.set(3, 29);  // bits [3, 29) of word 0
+ ASSERT_EQ(toLongVector(bitSet), std::vector<uint64_t>{0x1ffffff8});
+
+ // the range spans multiple words
+ bitSet.set(64 * 2 + 11, 64 * 4 + 19);  // from bit 11 of word 2 up to (excluding) bit 19 of word 4; word 1 stays 0
+ ASSERT_EQ(toLongVector(bitSet),
+ (std::vector<uint64_t>{0x1ffffff8, 0, 0xfffffffffffff800,
0xffffffffffffffff, 0x7ffff}));
+}
+
+TEST(BitSetTest, testRangeClear) {  // Range clear() must drop trailing words that become zero from the iterated result.
+ BitSet bitSet(64 * 5 + 1); // 6 words
+ bitSet.set(0, 64 * 5 + 1);  // start from a fully set bit set
+ ASSERT_EQ(toLongVector(bitSet),
+ (std::vector<uint64_t>{0xffffffffffffffff, 0xffffffffffffffff,
0xffffffffffffffff,
+ 0xffffffffffffffff, 0xffffffffffffffff,
1}));
+
+ // the range lies within a single word
+ bitSet.clear(64 * 5, 64 * 5 + 1);  // clears the only bit of the 6th word, which then disappears
+ ASSERT_EQ(toLongVector(bitSet),
+ (std::vector<uint64_t>{0xffffffffffffffff, 0xffffffffffffffff,
0xffffffffffffffff,
+ 0xffffffffffffffff, 0xffffffffffffffff}));
+
+ // the range spans multiple words
+ bitSet.clear(64 * 2 + 13, 64 * 5);  // keeps bits 0..12 of word 2 (0x1fff) and removes words 3 and 4
+ ASSERT_EQ(toLongVector(bitSet), (std::vector<uint64_t>{0xffffffffffffffff,
0xffffffffffffffff, 0x1fff}));
+}
+
+TEST(BitSetTest, testSingleClear) {  // Single-bit clear(): once when it empties the last word, once when it does not.
+ BitSet bitSet(64 * 2 + 1); // 3 words
+ bitSet.set(0, 64 * 2 + 1);
+ ASSERT_EQ(toLongVector(bitSet), (std::vector<uint64_t>{0xffffffffffffffff,
0xffffffffffffffff, 1}));
+
+ // the number of words in use shrinks: the last word becomes zero
+ bitSet.clear(64 * 2);
+ ASSERT_EQ(toLongVector(bitSet), (std::vector<uint64_t>{0xffffffffffffffff,
0xffffffffffffffff}));
+
+ // the number of words in use doesn't change: only bit 13 of word 0 is cleared
+ bitSet.clear(13);
+ ASSERT_EQ(toLongVector(bitSet), (std::vector<uint64_t>{0xffffffffffffdfff,
0xffffffffffffffff}));
+}
diff --git a/tests/ConsumerWrapper.cc b/tests/ConsumerWrapper.cc
new file mode 100644
index 0000000..82f8787
--- /dev/null
+++ b/tests/ConsumerWrapper.cc
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "ConsumerWrapper.h"
+
+#include "PulsarFriend.h"
+
+unsigned long ConsumerWrapper::getNumAcked(CommandAck_AckType ackType) const {  // Looks up the (ResultOk, ackType) entry of the consumer's acked-message stats map.
+ try {
+ return
PulsarFriend::getConsumerStatsPtr(consumer_)->getAckedMsgMap().at(
+ std::make_pair(ResultOk, ackType));
+ } catch (const std::out_of_range&) {  // at() found no entry for this key: report zero acks
+ return 0;
+ }
+}
diff --git a/tests/ConsumerWrapper.h b/tests/ConsumerWrapper.h
new file mode 100644
index 0000000..279a0cb
--- /dev/null
+++ b/tests/ConsumerWrapper.h
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include <chrono>
+#include <thread>
+
+#include "lib/ProtoApiEnums.h"
+
+using namespace pulsar;
+
+class ConsumerWrapper {  // Test helper that wraps a Consumer and records the ids of the messages it received.
+ public:
+ void initialize(Client& client, const std::string& topic, const
std::string& subscription) {  // Subscribes consumer_ to `topic`; asserts that the subscription succeeds.
+ // Enable the stats for cumulative ack
+ ConsumerConfiguration conf;
+ conf.setUnAckedMessagesTimeoutMs(10000);
+ ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf,
consumer_));
+ }
+
+ const std::vector<MessageId>& messageIdList() const noexcept { return
messageIdList_; }  // ids recorded by receiveAtMost(), in receive order
+
+ Result receive(Message& msg) { return consumer_.receive(msg, 3000); }  // does not record the id; timeout argument is 3000
+
+ void receiveAtMost(int numMessages) {  // Despite the name, asserts that each of the numMessages receives succeeds.
+ Message msg;
+ for (int i = 0; i < numMessages; i++) {
+ ASSERT_EQ(ResultOk, consumer_.receive(msg, 3000));
+ messageIdList_.emplace_back(msg.getMessageId());
+ }
+ }
+
+ unsigned long getNumAcked(CommandAck_AckType ackType) const;
+
+ void acknowledgeAndRedeliver(const std::vector<size_t>& indexes,
CommandAck_AckType ackType) {  // `indexes` are positions in messageIdList_
+ for (size_t index : indexes) {
+ if (ackType == CommandAck_AckType_Individual) {
+ consumer_.acknowledge(messageIdList_.at(index));  // at() throws on an out-of-range index
+ } else {  // any other ack type is treated as cumulative
+ consumer_.acknowledgeCumulative(messageIdList_.at(index));
+ }
+ }
+ // Wait until the acknowledge command is sent
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ consumer_.redeliverUnacknowledgedMessages();
+ }
+
+ private:
+ Consumer consumer_;
+ std::vector<MessageId> messageIdList_;
+};