This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 884ad96  Retry after 20ms if receiving message is throttled (#79)
884ad96 is described below

commit 884ad960b6be1a544777f60c25cfb2a47a6feb45
Author: Zhanhui Li <[email protected]>
AuthorDate: Thu Jul 28 11:52:42 2022 +0800

    Retry after 20ms if receiving message is throttled (#79)
    
    * Polish readme and build examples with cmake
    
    * Retry after 20ms once receiving message is throttled
---
 cpp/source/client/ReceiveMessageStreamReader.cpp       |  2 +-
 cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp    | 18 ++++++++++++------
 .../rocketmq/include/AsyncReceiveMessageCallback.h     |  2 +-
 3 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/cpp/source/client/ReceiveMessageStreamReader.cpp 
b/cpp/source/client/ReceiveMessageStreamReader.cpp
index 4eccb9f..03204c6 100644
--- a/cpp/source/client/ReceiveMessageStreamReader.cpp
+++ b/cpp/source/client/ReceiveMessageStreamReader.cpp
@@ -30,7 +30,7 @@ 
ReceiveMessageStreamReader::ReceiveMessageStreamReader(std::weak_ptr<ClientManag
                                                        std::string 
peer_address,
                                                        
rmq::ReceiveMessageRequest request,
                                                        
std::unique_ptr<ReceiveMessageContext> context)
-    : client_manager_(client_manager),
+    : client_manager_(std::move(client_manager)),
       stub_(stub),
       peer_address_(std::move(peer_address)),
       request_(std::move(request)),
diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp 
b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
index 1e03802..1c96b09 100644
--- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -24,6 +24,7 @@
 #include "spdlog/spdlog.h"
 #include "ProcessQueue.h"
 #include "PushConsumerImpl.h"
+#include "rocketmq/ErrorCode.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -44,9 +45,15 @@ void AsyncReceiveMessageCallback::onCompletion(const 
std::error_code& ec, const
     return;
   }
 
+  if (ec == ErrorCode::TooManyRequests) {
+    SPDLOG_WARN("Action of receiving message is throttled. Retry after 20ms. 
Queue={}", process_queue->simpleName());
+    receiveMessageLater(std::chrono::milliseconds(20));
+    return;
+  }
+
   if (ec) {
-    SPDLOG_WARN("Receive message from {} failed. Cause: {}. Attempt later.", 
process_queue->simpleName(), ec.message());
-    receiveMessageLater();
+    SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 
second.", process_queue->simpleName(), ec.message());
+    receiveMessageLater(std::chrono::seconds (1));
     return;
   }
 
@@ -70,14 +77,14 @@ void 
AsyncReceiveMessageCallback::checkThrottleThenReceive() {
     SPDLOG_INFO("Number of messages in {} exceeds throttle threshold. Receive 
messages later.",
                 process_queue->simpleName());
     process_queue->syncIdleState();
-    receiveMessageLater();
+    receiveMessageLater(std::chrono::seconds(1));
   } else {
     // Receive message immediately
     receiveMessageImmediately();
   }
 }
 
-void AsyncReceiveMessageCallback::receiveMessageLater() {
+void 
AsyncReceiveMessageCallback::receiveMessageLater(std::chrono::milliseconds 
delay) {
   auto process_queue = process_queue_.lock();
   if (!process_queue) {
     return;
@@ -93,8 +100,7 @@ void AsyncReceiveMessageCallback::receiveMessageLater() {
     }
   };
 
-  client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, 
std::chrono::seconds(1),
-                                            std::chrono::seconds(0));
+  client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, 
delay, std::chrono::seconds(0));
 }
 
 void AsyncReceiveMessageCallback::receiveMessageImmediately() {
diff --git a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h 
b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
index b3fe2b1..5a13442 100644
--- a/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
+++ b/cpp/source/rocketmq/include/AsyncReceiveMessageCallback.h
@@ -31,7 +31,7 @@ public:
 
   void onCompletion(const std::error_code& ec, const ReceiveMessageResult& 
result);
 
-  void receiveMessageLater();
+  void receiveMessageLater(std::chrono::milliseconds delay);
 
   void receiveMessageImmediately();
 

Reply via email to