This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 74ef1a0  [feat] Support expiration for chunked messages (#71)
74ef1a0 is described below

commit 74ef1a01f5c7a4604d251de6d040c433f9bbf56b
Author: Zike Yang <[email protected]>
AuthorDate: Mon Nov 7 12:09:22 2022 +0800

    [feat] Support expiration for chunked messages (#71)
    
    ### Motivation
    
    Add support for checking expiration for incomplete chunked messages.
    
    ### Modifications
    
    * Add configuration `expireTimeOfIncompleteChunkedMessageMs` to the consumer.
    * Add a timer to check for the expiration of incomplete chunked messages.
    
    Co-authored-by: Yunze Xu <[email protected]>
---
 include/pulsar/ConsumerConfiguration.h | 20 +++++++++++++
 include/pulsar/Message.h               |  1 +
 lib/ConsumerConfiguration.cc           | 10 +++++++
 lib/ConsumerConfigurationImpl.h        |  1 +
 lib/ConsumerImpl.cc                    | 53 +++++++++++++++++++++++++++++++++-
 lib/ConsumerImpl.h                     | 11 +++++++
 lib/MapCache.h                         | 17 +++++++++++
 tests/MapCacheTest.cc                  | 30 +++++++++++++++++++
 tests/MessageChunkingTest.cc           | 48 ++++++++++++++++++++++++++++++
 tests/PulsarFriend.h                   |  7 +++++
 tests/WaitUtils.h                      |  5 ++--
 11 files changed, 200 insertions(+), 3 deletions(-)

diff --git a/include/pulsar/ConsumerConfiguration.h 
b/include/pulsar/ConsumerConfiguration.h
index 0418cfa..520901c 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -519,6 +519,26 @@ class PULSAR_PUBLIC ConsumerConfiguration {
      */
     bool isAutoAckOldestChunkedMessageOnQueueFull() const;
 
+    /**
+     * If the producer fails to publish all the chunks of a message, the consumer can expire the incomplete
+     * chunks when the remaining chunks are not received within this time. Use 0 to disable this feature.
+     *
+     * Default: 60000, which means 1 minute
+     *
+     * @param expireTimeOfIncompleteChunkedMessageMs expire time in milliseconds
+     * @return Consumer Configuration
+     */
+    ConsumerConfiguration& setExpireTimeOfIncompleteChunkedMessageMs(
+        long expireTimeOfIncompleteChunkedMessageMs);
+
+    /**
+     *
+     * Get the expire time of incomplete chunked messages in milliseconds
+     *
+     * @return the expire time of incomplete chunked messages in milliseconds (0 means the feature is disabled)
+     */
+    long getExpireTimeOfIncompleteChunkedMessageMs() const;
+
     /**
      * Set the consumer to include the given position of any reset operation 
like Consumer::seek.
      *
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 74427a2..a778660 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -200,6 +200,7 @@ class PULSAR_PUBLIC Message {
 
     friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const 
StringMap& map);
     friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const 
Message& msg);
+    friend class PulsarFriend;
 };
 }  // namespace pulsar
 
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index 6298245..f37e042 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -262,6 +262,16 @@ bool 
ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const {
     return impl_->autoAckOldestChunkedMessageOnQueueFull;
 }
 
+ConsumerConfiguration& 
ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs(
+    long expireTimeOfIncompleteChunkedMessageMs) {
+    impl_->expireTimeOfIncompleteChunkedMessageMs = 
expireTimeOfIncompleteChunkedMessageMs;
+    return *this;
+}
+
+long ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs() const {
+    return impl_->expireTimeOfIncompleteChunkedMessageMs;
+}
+
 ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool 
startMessageIdInclusive) {
     impl_->startMessageIdInclusive = startMessageIdInclusive;
     return *this;
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index 444fedf..259b935 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -55,6 +55,7 @@ struct ConsumerConfigurationImpl {
     size_t maxPendingChunkedMessage{10};
     bool autoAckOldestChunkedMessageOnQueueFull{false};
     bool startMessageIdInclusive{false};
+    long expireTimeOfIncompleteChunkedMessageMs{60000};
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 5698b46..0966729 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -78,7 +78,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       readCompacted_(conf.isReadCompacted()),
       startMessageId_(startMessageId),
       maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
-      
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull())
 {
+      
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
+      
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs())
 {
     std::stringstream consumerStrStream;
     consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << 
consumerId_ << "] ";
     consumerStr_ = consumerStrStream.str();
@@ -109,6 +110,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, 
const std::string& topic,
     if (conf.isEncryptionEnabled()) {
         msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false);
     }
+
+    checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
 }
 
 ConsumerImpl::~ConsumerImpl() {
@@ -319,6 +322,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback 
originalCallback) {
     }
 }
 
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    checkExpiredChunkedTimer_->expires_from_now(
+        
boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const 
boost::system::error_code& ec) -> void {
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed 
or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+        Lock lock(chunkProcessMutex_);
+        long currentTimeMs = TimeUtils::currentTimeMillis();
+        chunkedMessageCache_.removeOldestValuesIf(
+            [this, currentTimeMs](const std::string& uuid, const 
ChunkedMessageCtx& ctx) -> bool {
+                bool expired =
+                    currentTimeMs > ctx.getReceivedTimeMs() + 
expireTimeOfIncompleteChunkedMessageMs_;
+                if (!expired) {
+                    return false;
+                }
+                for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
+                    LOG_INFO("Removing expired chunk messages: uuid: " << uuid 
<< ", messageId: " << msgId);
+                    doAcknowledgeIndividual(msgId, [uuid, msgId](Result 
result) {
+                        if (result != ResultOk) {
+                            LOG_WARN("Failed to acknowledge discarded chunk, 
uuid: "
+                                     << uuid << ", messageId: " << msgId);
+                        }
+                    });
+                }
+                return true;
+            });
+        triggerCheckExpiredChunkedTimer();
+        return;
+    });
+}
+
 Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer& 
payload,
                                                          const 
proto::MessageMetadata& metadata,
                                                          const MessageId& 
messageId,
@@ -331,6 +373,14 @@ Optional<SharedBuffer> 
ConsumerImpl::processMessageChunk(const SharedBuffer& pay
                                                  << payload.readableBytes() << 
" bytes");
 
     Lock lock(chunkProcessMutex_);
+
+    // Lazy task scheduling to expire incomplete chunk message
+    bool expected = false;
+    if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
+        expireChunkMessageTaskScheduled_.compare_exchange_strong(expected, 
true)) {
+        triggerCheckExpiredChunkedTimer();
+    }
+
     auto it = chunkedMessageCache_.find(uuid);
 
     if (chunkId == 0) {
@@ -1448,6 +1498,7 @@ std::shared_ptr<ConsumerImpl> 
ConsumerImpl::get_shared_this_ptr() {
 void ConsumerImpl::cancelTimers() noexcept {
     boost::system::error_code ec;
     batchReceiveTimer_->cancel(ec);
+    checkExpiredChunkedTimer_->cancel(ec);
 }
 
 } /* namespace pulsar */
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 693180f..9ba6577 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -33,6 +33,7 @@
 #include "NegativeAcksTracker.h"
 #include "Synchronized.h"
 #include "TestUtil.h"
+#include "TimeUtils.h"
 #include "UnboundedBlockingQueue.h"
 
 namespace pulsar {
@@ -259,6 +260,7 @@ class ConsumerImpl : public ConsumerImplBase {
         void appendChunk(const MessageId& messageId, const SharedBuffer& 
payload) {
             chunkedMessageIds_.emplace_back(messageId);
             chunkedMsgBuffer_.write(payload.data(), payload.readableBytes());
+            receivedTimeMs_ = TimeUtils::currentTimeMillis();
         }
 
         bool isCompleted() const noexcept { return totalChunks_ == 
numChunks(); }
@@ -267,6 +269,8 @@ class ConsumerImpl : public ConsumerImplBase {
 
         const std::vector<MessageId>& getChunkedMessageIds() const noexcept { 
return chunkedMessageIds_; }
 
+        long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }
+
         friend std::ostream& operator<<(std::ostream& os, const 
ChunkedMessageCtx& ctx) {
             return os << "ChunkedMessageCtx " << 
ctx.chunkedMsgBuffer_.readableBytes() << " of "
                       << ctx.chunkedMsgBuffer_.writerIndex() << " bytes, " << 
ctx.numChunks() << " of "
@@ -277,6 +281,7 @@ class ConsumerImpl : public ConsumerImplBase {
         const int totalChunks_;
         SharedBuffer chunkedMsgBuffer_;
         std::vector<MessageId> chunkedMessageIds_;
+        long receivedTimeMs_;
 
         int numChunks() const noexcept { return 
static_cast<int>(chunkedMessageIds_.size()); }
     };
@@ -297,6 +302,12 @@ class ConsumerImpl : public ConsumerImplBase {
     MapCache<std::string, ChunkedMessageCtx> chunkedMessageCache_;
     mutable std::mutex chunkProcessMutex_;
 
+    const long expireTimeOfIncompleteChunkedMessageMs_;
+    DeadlineTimerPtr checkExpiredChunkedTimer_;
+    std::atomic_bool expireChunkMessageTaskScheduled_{false};
+
+    void triggerCheckExpiredChunkedTimer();
+
     /**
      * Process a chunk. If the chunk is the last chunk of a message, 
concatenate all buffered chunks into the
      * payload and return it.
diff --git a/lib/MapCache.h b/lib/MapCache.h
index b9a0069..55d58f6 100644
--- a/lib/MapCache.h
+++ b/lib/MapCache.h
@@ -73,6 +73,23 @@ class MapCache {
         }
     }
 
+    // Removes entries starting from the oldest key (front of keys_) for as long as `condition` returns
+    // true, stopping at the first entry it rejects. Keys whose values are already gone are just dropped.
+    void removeOldestValuesIf(const std::function<bool(const Key&, const Value&)>& condition) {
+        if (!condition) return;
+        while (!keys_.empty()) {
+            auto it = map_.find(keys_.front());
+            if (it != map_.end()) {
+                if (!condition(it->first, it->second)) {
+                    return;
+                }
+                map_.erase(it);
+            }
+            // Always pop the front key: a stale key (no longer in map_) must not stall the loop forever.
+            keys_.pop_front();
+        }
+    }
+
     void remove(const Key& key) {
         auto it = map_.find(key);
         if (it != map_.end()) {
diff --git a/tests/MapCacheTest.cc b/tests/MapCacheTest.cc
index 2140937..69496c4 100644
--- a/tests/MapCacheTest.cc
+++ b/tests/MapCacheTest.cc
@@ -77,3 +77,33 @@ TEST(MapCacheTest, testRemoveAllValues) {
     ASSERT_TRUE(cache.getKeys().empty());
     ASSERT_EQ(cache.size(), 0);
 }
+
+TEST(MapCacheTest, testRemoveOldestValuesIf) {
+    MapCache<int, MoveOnlyInt> cache;
+    cache.putIfAbsent(1, {100});
+    cache.putIfAbsent(2, {200});
+    cache.putIfAbsent(3, {300});
+    int expireTime = 100;
+
+    auto checkCondition = [&expireTime](const int& key, const MoveOnlyInt& 
value) -> bool {
+        return expireTime > value.x;
+    };
+
+    cache.removeOldestValuesIf(nullptr);
+    ASSERT_EQ(cache.size(), 3);
+
+    cache.removeOldestValuesIf(checkCondition);
+    ASSERT_EQ(cache.size(), 3);
+
+    expireTime = 200;
+    cache.removeOldestValuesIf(checkCondition);
+
+    auto keys = cache.getKeys();
+    ASSERT_EQ(cache.size(), 2);
+    ASSERT_EQ(cache.find(2)->second.x, 200);
+    ASSERT_EQ(cache.find(3)->second.x, 300);
+
+    expireTime = 400;
+    cache.removeOldestValuesIf(checkCondition);
+    ASSERT_EQ(cache.size(), 0);
+}
diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc
index 61a9714..8675886 100644
--- a/tests/MessageChunkingTest.cc
+++ b/tests/MessageChunkingTest.cc
@@ -23,6 +23,7 @@
 #include <random>
 
 #include "PulsarFriend.h"
+#include "WaitUtils.h"
 #include "lib/LogUtils.h"
 
 DECLARE_LOG_OBJECT()
@@ -81,6 +82,10 @@ class MessageChunkingTest : public 
::testing::TestWithParam<CompressionType> {
         ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
     }
 
+    void createConsumer(const std::string& topic, Consumer& consumer, 
ConsumerConfiguration& conf) {
+        ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, 
consumer));
+    }
+
    private:
     Client client_{lookupUrl};
 };
@@ -130,6 +135,49 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
     // Verify the cache has been cleared
     auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
     ASSERT_EQ(chunkedMessageCache.size(), 0);
+
+    producer.close();
+    consumer.close();
+}
+
+TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) {
+    // This test is time-consuming and is not related to the compressionType. 
So skip other compressionType
+    // here.
+    if (toString(GetParam()) != "None") {
+        return;
+    }
+    const std::string topic = 
"MessageChunkingTest-testExpireIncompleteChunkMessage-" + toString(GetParam()) +
+                              std::to_string(time(nullptr));
+    Consumer consumer;
+    ConsumerConfiguration consumerConf;
+    consumerConf.setExpireTimeOfIncompleteChunkedMessageMs(5000);
+    consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true);
+    createConsumer(topic, consumer, consumerConf);
+    Producer producer;
+    createProducer(topic, producer);
+
+    auto msg = MessageBuilder().setContent("test-data").build();
+    auto& metadata = PulsarFriend::getMessageMetadata(msg);
+    metadata.set_num_chunks_from_msg(2);
+    metadata.set_chunk_id(0);
+    metadata.set_total_chunk_msg_size(100);
+
+    producer.send(msg);
+
+    auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
+
+    waitUntil(
+        std::chrono::seconds(2), [&] { return chunkedMessageCache.size() > 0; 
}, 1000);
+    ASSERT_EQ(chunkedMessageCache.size(), 1);
+
+    // Wait for triggering the check of the expiration.
+    // Need to wait for 2 * expireTime because there may be a gap in checking 
the expiration time.
+    waitUntil(
+        std::chrono::seconds(10), [&] { return chunkedMessageCache.size() == 
0; }, 1000);
+    ASSERT_EQ(chunkedMessageCache.size(), 0);
+
+    producer.close();
+    consumer.close();
 }
 
 // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't 
have INSTANTIATE_TEST_SUITE_P
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 938b284..3272bce 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -16,12 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+#ifndef PULSAR_FRIEND_HPP_
+#define PULSAR_FRIEND_HPP_
 
 #include <string>
 
 #include "lib/ClientConnection.h"
 #include "lib/ClientImpl.h"
 #include "lib/ConsumerImpl.h"
+#include "lib/MessageImpl.h"
 #include "lib/MultiTopicsConsumerImpl.h"
 #include "lib/NamespaceName.h"
 #include "lib/PartitionedProducerImpl.h"
@@ -180,5 +183,9 @@ class PulsarFriend {
     static size_t getNumberOfPendingTasks(const RetryableLookupService& 
lookupService) {
         return lookupService.backoffTimers_.size();
     }
+
+    static proto::MessageMetadata& getMessageMetadata(Message& message) { 
return message.impl_->metadata; }
 };
 }  // namespace pulsar
+
+#endif /* PULSAR_FRIEND_HPP_ */
diff --git a/tests/WaitUtils.h b/tests/WaitUtils.h
index abe3efc..d7db82e 100644
--- a/tests/WaitUtils.h
+++ b/tests/WaitUtils.h
@@ -25,14 +25,15 @@
 namespace pulsar {
 
 template <typename Rep, typename Period>
-inline void waitUntil(std::chrono::duration<Rep, Period> timeout, 
std::function<bool()> condition) {
+inline void waitUntil(std::chrono::duration<Rep, Period> timeout, const 
std::function<bool()>& condition,
+                      long durationMs = 10) {
     auto timeoutMs = 
std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
     while (timeoutMs > 0) {
         auto now = std::chrono::high_resolution_clock::now();
         if (condition()) {
             break;
         }
-        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        std::this_thread::sleep_for(std::chrono::milliseconds(durationMs));
         auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
                            std::chrono::high_resolution_clock::now() - now)
                            .count();

Reply via email to