This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new bd1e7ab7 [ISSUE #967] C++ client support namespace, empty body check 
and recall message (#968)
bd1e7ab7 is described below

commit bd1e7ab73a0c939d8e0d008b0e068bb63543e42b
Author: lizhimins <[email protected]>
AuthorDate: Wed Mar 26 11:18:31 2025 +0800

    [ISSUE #967] C++ client support namespace, empty body check and recall 
message (#968)
    
    * [ISSUE #967] C++ client support namespace, empty body check and recall 
message
---
 cpp/examples/ExampleProducerWithTimedMessage.cpp   |  23 +++-
 cpp/include/rocketmq/Configuration.h               |   7 ++
 cpp/include/rocketmq/ErrorCode.h                   |   5 +
 cpp/include/rocketmq/Producer.h                    |  12 +++
 .../rocketmq/{SendReceipt.h => RecallReceipt.h}    |   9 +-
 cpp/include/rocketmq/SendReceipt.h                 |   2 +
 cpp/source/base/Configuration.cpp                  |   5 +
 cpp/source/base/include/Protocol.h                 |   2 +
 cpp/source/client/ClientManagerImpl.cpp            | 118 ++++++++++++++++++++-
 cpp/source/client/RpcClientImpl.cpp                |   7 ++
 cpp/source/client/Signature.cpp                    |   1 +
 cpp/source/client/include/ClientManager.h          |  11 +-
 cpp/source/client/include/ClientManagerImpl.h      |   6 ++
 cpp/source/client/include/RpcClient.h              |   3 +
 cpp/source/client/include/RpcClientImpl.h          |   3 +
 cpp/source/client/include/SendResult.h             |   1 +
 cpp/source/client/mocks/include/RpcClientMock.h    |   3 +
 cpp/source/log/LoggerImpl.cpp                      |   2 +-
 cpp/source/rocketmq/Producer.cpp                   |   7 +-
 cpp/source/rocketmq/ProducerImpl.cpp               |  87 +++++++++++++--
 cpp/source/rocketmq/PushConsumer.cpp               |   1 +
 cpp/source/rocketmq/PushConsumerImpl.cpp           |  45 ++++++--
 cpp/source/rocketmq/SendContext.cpp                |   1 +
 cpp/source/rocketmq/SimpleConsumer.cpp             |   1 +
 cpp/source/rocketmq/SimpleConsumerImpl.cpp         |  20 +++-
 cpp/source/rocketmq/TopicPublishInfo.cpp           |   7 +-
 cpp/source/rocketmq/include/ClientImpl.h           |   4 +
 cpp/source/rocketmq/include/ProcessQueueImpl.h     |   1 +
 cpp/source/rocketmq/include/ProducerImpl.h         |   6 ++
 cpp/source/rocketmq/include/PushConsumerImpl.h     |   2 +-
 30 files changed, 365 insertions(+), 37 deletions(-)

diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp 
b/cpp/examples/ExampleProducerWithTimedMessage.cpp
index ba2a45f7..f4624deb 100644
--- a/cpp/examples/ExampleProducerWithTimedMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -109,9 +109,30 @@ int main(int argc, char* argv[]) {
                              std::chrono::system_clock::now() +
                              std::chrono::seconds(10))  // This message would 
be available to consumers after 10 seconds
                          .build();
+
       std::error_code ec;
       SendReceipt send_receipt = producer.send(std::move(message), ec);
-      std::cout << "Message-ID: " << send_receipt.message_id << std::endl;
+
+      if (ec) {
+        std::cout << "Message-ID: " << send_receipt.message_id << " send 
error"<< std::endl;
+      } else {
+        std::cout << "Message-ID: " << send_receipt.message_id << ", "
+                  << "Message-Recall-Handle: " << send_receipt.recall_handle 
<< std::endl;
+
+        // To attempt to recall a message, server support is required to 
perform this operation.
+        if (i % 2) {
+          RecallReceipt recall_receipt = producer.recall(FLAGS_topic, 
send_receipt.recall_handle, ec);
+
+          if (ec) {
+            std::cout << "Message-ID: " << send_receipt.message_id
+                      << ", Recall ErrorCode: " << ec << std::endl;
+          } else {
+            std::cout << "Message-ID: " << send_receipt.message_id
+                      << ", Message-Recall-ID: " << recall_receipt.message_id 
<< std::endl;
+          }
+        }
+      }
+
       count++;
     }
   } catch (...) {
diff --git a/cpp/include/rocketmq/Configuration.h 
b/cpp/include/rocketmq/Configuration.h
index 6dcd4137..a653c87a 100644
--- a/cpp/include/rocketmq/Configuration.h
+++ b/cpp/include/rocketmq/Configuration.h
@@ -35,6 +35,10 @@ public:
     return endpoints_;
   }
 
+  const std::string& resourceNamespace() const {
+    return resource_namespace_;
+  }
+
   CredentialsProviderPtr credentialsProvider() const {
     return credentials_provider_;
   }
@@ -54,6 +58,7 @@ protected:
 
 private:
   std::string               endpoints_;
+  std::string               resource_namespace_;
   CredentialsProviderPtr    credentials_provider_;
   std::chrono::milliseconds 
request_timeout_{ConfigurationDefaults::RequestTimeout};
   bool tls_ = true;
@@ -63,6 +68,8 @@ class ConfigurationBuilder {
 public:
   ConfigurationBuilder& withEndpoints(std::string endpoints);
 
+  ConfigurationBuilder& withNamespace(std::string resource_namespace);
+
   ConfigurationBuilder& 
withCredentialsProvider(std::shared_ptr<CredentialsProvider> provider);
 
   ConfigurationBuilder& withRequestTimeout(std::chrono::milliseconds 
request_timeout);
diff --git a/cpp/include/rocketmq/ErrorCode.h b/cpp/include/rocketmq/ErrorCode.h
index 51784e16..eb111926 100644
--- a/cpp/include/rocketmq/ErrorCode.h
+++ b/cpp/include/rocketmq/ErrorCode.h
@@ -143,6 +143,11 @@ enum class ErrorCode : int {
    */
   MessageBodyTooLarge = 41301,
 
+  /**
+   * @brief Message body is empty.
+   */
+  MessageBodyEmpty = 41302,
+
   /**
    * @brief When trying to perform an action whose dependent procedure state is
    * not right, this code will be used.
diff --git a/cpp/include/rocketmq/Producer.h b/cpp/include/rocketmq/Producer.h
index d3de06e9..be1026f8 100644
--- a/cpp/include/rocketmq/Producer.h
+++ b/cpp/include/rocketmq/Producer.h
@@ -22,6 +22,7 @@
 
 #include "Configuration.h"
 #include "Message.h"
+#include "RecallReceipt.h"
 #include "SendCallback.h"
 #include "SendReceipt.h"
 #include "Transaction.h"
@@ -67,6 +68,17 @@ public:
 
   SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
 
+  /**
+   * @brief Attempts to cancel a scheduled message based on the provided topic 
and recall handle.
+   * This operation requires server support to be executed successfully.
+   *
+   * @param topic The topic associated with the scheduled message to be 
canceled.
+   * @param recall_handle A unique identifier or handle for the message 
cancellation operation.
+   * @param ec An error code that will be set if the operation encounters an 
error.
+   * @return RecallReceipt A receipt object indicating the result of the 
cancellation operation.
+   */
+  RecallReceipt recall(std::string& topic, std::string& recall_handle, 
std::error_code& ec) noexcept;
+
 private:
   explicit Producer(std::shared_ptr<ProducerImpl> impl) : 
impl_(std::move(impl)) {
   }
diff --git a/cpp/include/rocketmq/SendReceipt.h 
b/cpp/include/rocketmq/RecallReceipt.h
similarity index 85%
copy from cpp/include/rocketmq/SendReceipt.h
copy to cpp/include/rocketmq/RecallReceipt.h
index 7eef6e79..dbbb0ba7 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/RecallReceipt.h
@@ -19,18 +19,11 @@
 #include <string>
 
 #include "RocketMQ.h"
-#include "rocketmq/Message.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-struct SendReceipt {
-  std::string target;
-
+struct RecallReceipt {
   std::string message_id;
-
-  std::string transaction_id;
-
-  MessageConstPtr message;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/include/rocketmq/SendReceipt.h 
b/cpp/include/rocketmq/SendReceipt.h
index 7eef6e79..4e797770 100644
--- a/cpp/include/rocketmq/SendReceipt.h
+++ b/cpp/include/rocketmq/SendReceipt.h
@@ -30,6 +30,8 @@ struct SendReceipt {
 
   std::string transaction_id;
 
+  std::string recall_handle;
+
   MessageConstPtr message;
 };
 
diff --git a/cpp/source/base/Configuration.cpp 
b/cpp/source/base/Configuration.cpp
index 66cff2e8..13330261 100644
--- a/cpp/source/base/Configuration.cpp
+++ b/cpp/source/base/Configuration.cpp
@@ -28,6 +28,11 @@ ConfigurationBuilder& 
ConfigurationBuilder::withEndpoints(std::string endpoints)
   return *this;
 }
 
+ConfigurationBuilder& ConfigurationBuilder::withNamespace(std::string 
resource_namespace) {
+  configuration_.resource_namespace_ = std::move(resource_namespace);
+  return *this;
+}
+
 ConfigurationBuilder& 
ConfigurationBuilder::withCredentialsProvider(std::shared_ptr<CredentialsProvider>
 provider) {
   configuration_.credentials_provider_ = std::move(provider);
   return *this;
diff --git a/cpp/source/base/include/Protocol.h 
b/cpp/source/base/include/Protocol.h
index 5a8e671f..2c11e70d 100644
--- a/cpp/source/base/include/Protocol.h
+++ b/cpp/source/base/include/Protocol.h
@@ -46,6 +46,8 @@ using HeartbeatRequest = rmq::HeartbeatRequest;
 using HeartbeatResponse = rmq::HeartbeatResponse;
 using EndTransactionRequest = rmq::EndTransactionRequest;
 using EndTransactionResponse = rmq::EndTransactionResponse;
+using RecallMessageRequest = rmq::RecallMessageRequest;
+using RecallMessageResponse = rmq::RecallMessageResponse;
 using RecoverOrphanedTransactionCommand = 
rmq::RecoverOrphanedTransactionCommand;
 using PrintThreadStackTraceCommand = rmq::PrintThreadStackTraceCommand;
 using ThreadStackTrace = rmq::ThreadStackTrace;
diff --git a/cpp/source/client/ClientManagerImpl.cpp 
b/cpp/source/client/ClientManagerImpl.cpp
index 053f7723..3b22ddd6 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -325,8 +325,11 @@ bool ClientManagerImpl::send(const std::string& 
target_host,
           auto first = invocation_context->response.entries().begin();
           send_result.message_id = first->message_id();
           send_result.transaction_id = first->transaction_id();
+          // unique handle to identify a message to recall,
+          // only delay message is supported for now
+          send_result.recall_handle = first->recall_handle();
         } else {
-          SPDLOG_ERROR("Unexpected send-message-response: {}", 
invocation_context->response.DebugString());
+          SPDLOG_ERROR("Unexpected send-message-response: {}", 
invocation_context->response.ShortDebugString());
         }
         break;
       }
@@ -361,6 +364,12 @@ bool ClientManagerImpl::send(const std::string& 
target_host,
         break;
       }
 
+      case rmq::Code::ILLEGAL_DELIVERY_TIME: {
+        SPDLOG_ERROR("IllegalDeliveryTime: {}. Host={}", status.message(), 
invocation_context->remote_address);
+        send_result.ec = ErrorCode::IllegalMessageProperty;
+        break;
+      }
+
       case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: {
         SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", 
status.message(), invocation_context->remote_address);
         send_result.ec = ErrorCode::MessagePropertiesTooLarge;
@@ -373,6 +382,12 @@ bool ClientManagerImpl::send(const std::string& 
target_host,
         break;
       }
 
+      case rmq::Code::MESSAGE_BODY_EMPTY: {  // broker reports an empty body
+        SPDLOG_ERROR("MessageBodyEmpty: {}. Host={}", status.message(), 
invocation_context->remote_address);
+        send_result.ec = ErrorCode::MessageBodyEmpty;
+        break;
+      }
+
       case rmq::Code::TOPIC_NOT_FOUND: {
         SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), 
invocation_context->remote_address);
         send_result.ec = ErrorCode::TopicNotFound;
@@ -1288,6 +1303,124 @@ void ClientManagerImpl::endTransaction(
   client->asyncEndTransaction(request, invocation_context);
 }
 
+/**
+ * Issues an asynchronous RecallMessage RPC to the broker at target_host.
+ *
+ * @param target_host Address used to look up the RPC client.
+ * @param metadata    Key/value pairs added to the gRPC client context.
+ * @param request     Request handed to the RPC client unchanged.
+ * @param timeout     Added to the current time to form the call deadline.
+ * @param cb          Receives the mapped error code and the response. It is
+ *                    called inline, before this function returns, when no
+ *                    RPC client exists for target_host.
+ */
+void ClientManagerImpl::recallMessage(const std::string& target_host, const 
Metadata& metadata,
+                                      const RecallMessageRequest& request, 
std::chrono::milliseconds timeout,
+                                      const std::function<void(const 
std::error_code&, const RecallMessageResponse&)>& cb) {
+
+  SPDLOG_DEBUG("RecallMessage Request: {}", request.ShortDebugString());
+
+  RpcClientSharedPtr client = getRpcClient(target_host);
+  if (!client) {
+    SPDLOG_WARN("No RPC client for {}", target_host);
+    RecallMessageResponse response;
+    std::error_code ec = ErrorCode::BadRequest;
+    cb(ec, response);
+    return;
+  }
+
+  // NOTE(review): raw allocation; presumably released by the RPC client
+  // once the completion callback has run - confirm.
+  auto invocation_context = new InvocationContext<RecallMessageResponse>();
+  invocation_context->task_name =
+      fmt::format("Recall message, topic={}, recall handle={} to {}",
+                  request.topic().ShortDebugString(),
+                  request.recall_handle(), target_host);
+  invocation_context->remote_address = target_host;
+  for (const auto& item : metadata) {
+    invocation_context->context.AddMetadata(item.first, item.second);
+  }
+  invocation_context->context.set_deadline(std::chrono::system_clock::now() + 
timeout);
+
+  auto callback =
+      [target_host, cb](const InvocationContext<RecallMessageResponse>* 
invocation_context) {
+
+    std::error_code ec;
+    // Transport-level failure: the broker status is not consulted.
+    if (!invocation_context->status.ok()) {
+      SPDLOG_WARN("Failed to write RecallMessage to wire. gRPC-code: {}, 
gRPC-message: {}, host={}",
+                  invocation_context->status.error_code(), 
invocation_context->status.error_message(),
+                  invocation_context->remote_address);
+      ec = ErrorCode::BadRequest;
+      cb(ec, invocation_context->response);
+      return;
+    }
+
+    // Map the broker status code onto the client-side ErrorCode.
+    auto&& status = invocation_context->response.status();
+    auto&& peer_address = invocation_context->remote_address;
+    switch (status.code()) {
+      case rmq::Code::OK: {
+        SPDLOG_DEBUG("Recall message OK. Response: {}, host={}",
+                     invocation_context->response.ShortDebugString(), 
peer_address);
+        break;
+      }
+
+      case rmq::Code::ILLEGAL_TOPIC: {
+        SPDLOG_WARN("IllegalTopic: {}, host={}", status.message(), 
peer_address);
+        ec = ErrorCode::IllegalTopic;
+        break;
+      }
+
+      case rmq::Code::CLIENT_ID_REQUIRED: {
+        SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), 
peer_address);
+        ec = ErrorCode::InternalClientError;
+        break;
+      }
+
+      case rmq::Code::TOPIC_NOT_FOUND: {
+        SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), 
peer_address);
+        ec = ErrorCode::TopicNotFound;
+        break;
+      }
+
+      case rmq::Code::UNAUTHORIZED: {
+        SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), 
peer_address);
+        ec = ErrorCode::Unauthorized;
+        break;
+      }
+
+      case rmq::Code::FORBIDDEN: {
+        SPDLOG_WARN("Forbidden: {}, host={}", status.message(), peer_address);
+        ec = ErrorCode::Forbidden;
+        break;
+      }
+
+      case rmq::Code::INTERNAL_SERVER_ERROR: {
+        SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), 
peer_address);
+        ec = ErrorCode::InternalServerError;
+        break;
+      }
+
+      case rmq::Code::PROXY_TIMEOUT: {
+        SPDLOG_WARN("GatewayTimeout: {}, host={}", status.message(), 
peer_address);
+        ec = ErrorCode::GatewayTimeout;
+        break;
+      }
+
+      default: {
+        SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. {}, 
host={}", status.message(), peer_address);
+        ec = ErrorCode::NotSupported;
+        break;
+      }
+    }
+    cb(ec, invocation_context->response);
+  };
+
+  invocation_context->callback = callback;
+  client->asyncRecallMessage(request, invocation_context);
+}
+
 void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& 
target_host,
                                                         const Metadata& 
metadata,
                                                         const 
ForwardMessageToDeadLetterQueueRequest& request,
diff --git a/cpp/source/client/RpcClientImpl.cpp 
b/cpp/source/client/RpcClientImpl.cpp
index d9f10212..2e183f2a 100644
--- a/cpp/source/client/RpcClientImpl.cpp
+++ b/cpp/source/client/RpcClientImpl.cpp
@@ -124,6 +124,13 @@ void RpcClientImpl::asyncEndTransaction(const 
EndTransactionRequest& request,
   stub_->async()->EndTransaction(&invocation_context->context, &request, 
&invocation_context->response, callback);
 }
 
+void RpcClientImpl::asyncRecallMessage(const RecallMessageRequest& request,
+                                       
InvocationContext<RecallMessageResponse>* invocation_context) {
+  std::weak_ptr<RpcClient> rpc_client(shared_from_this());
+  auto callback = std::bind(&RpcClientImpl::asyncCallback, rpc_client, 
invocation_context, std::placeholders::_1);
+  stub_->async()->RecallMessage(&invocation_context->context, &request, 
&invocation_context->response, callback);
+}
+
 bool RpcClientImpl::ok() const {
   return channel_ && grpc_connectivity_state::GRPC_CHANNEL_SHUTDOWN != 
channel_->GetState(false);
 }
diff --git a/cpp/source/client/Signature.cpp b/cpp/source/client/Signature.cpp
index e25dbb54..4b43f216 100644
--- a/cpp/source/client/Signature.cpp
+++ b/cpp/source/client/Signature.cpp
@@ -28,6 +28,7 @@ void Signature::sign(const ClientConfig& client, 
absl::flat_hash_map<std::string
   metadata.insert({MetadataConstants::LANGUAGE_KEY, "CPP"});
   // Add common headers
   metadata.insert({MetadataConstants::CLIENT_ID_KEY, client.client_id});
+  metadata.insert({MetadataConstants::NAMESPACE_KEY, 
client.resource_namespace});
   metadata.insert({MetadataConstants::CLIENT_VERSION_KEY, 
MetadataConstants::CLIENT_VERSION});
   metadata.insert({MetadataConstants::PROTOCOL_VERSION_KEY, 
protocolVersion()});
 
diff --git a/cpp/source/client/include/ClientManager.h 
b/cpp/source/client/include/ClientManager.h
index 02b232b2..c67c6ea8 100644
--- a/cpp/source/client/include/ClientManager.h
+++ b/cpp/source/client/include/ClientManager.h
@@ -81,12 +81,15 @@ public:
                               const EndTransactionRequest& request, 
std::chrono::milliseconds timeout,
                               const std::function<void(const std::error_code&, 
const EndTransactionResponse&)>& cb) = 0;
 
+  virtual void recallMessage(const std::string& target_host, const Metadata& 
metadata,
+                             const RecallMessageRequest& request, 
std::chrono::milliseconds timeout,
+                             const std::function<void(const std::error_code&, 
const RecallMessageResponse&)>& cb) = 0;
+
   virtual void addClientObserver(std::weak_ptr<Client> client) = 0;
 
-  virtual void
-  queryAssignment(const std::string& target, const Metadata& metadata, const 
QueryAssignmentRequest& request,
-                  std::chrono::milliseconds timeout,
-                  const std::function<void(const std::error_code&, const 
QueryAssignmentResponse&)>& cb) = 0;
+  virtual void queryAssignment(const std::string& target, const Metadata& 
metadata,
+                               const QueryAssignmentRequest& request, 
std::chrono::milliseconds timeout,
+                               const std::function<void(const 
std::error_code&, const QueryAssignmentResponse&)>& cb) = 0;
 
   virtual void receiveMessage(const std::string& target, const Metadata& 
metadata, const ReceiveMessageRequest& request,
                               std::chrono::milliseconds timeout, 
ReceiveMessageCallback callback) = 0;
diff --git a/cpp/source/client/include/ClientManagerImpl.h 
b/cpp/source/client/include/ClientManagerImpl.h
index 5f1b27ca..cd862154 100644
--- a/cpp/source/client/include/ClientManagerImpl.h
+++ b/cpp/source/client/include/ClientManagerImpl.h
@@ -176,6 +176,12 @@ public:
                       std::chrono::milliseconds timeout,
                       const std::function<void(const std::error_code&, const 
EndTransactionResponse&)>& cb) override;
 
+  void recallMessage(const std::string& target_host,
+                     const Metadata& metadata,
+                     const RecallMessageRequest& request,
+                     std::chrono::milliseconds timeout,
+                     const std::function<void(const std::error_code&, const 
RecallMessageResponse&)>& cb) override;
+
   std::error_code notifyClientTermination(const std::string& target_host,
                                           const Metadata& metadata,
                                           const 
NotifyClientTerminationRequest& request,
diff --git a/cpp/source/client/include/RpcClient.h 
b/cpp/source/client/include/RpcClient.h
index fbb30171..ff926957 100644
--- a/cpp/source/client/include/RpcClient.h
+++ b/cpp/source/client/include/RpcClient.h
@@ -72,6 +72,9 @@ public:
   virtual void asyncEndTransaction(const EndTransactionRequest& request,
                                    InvocationContext<EndTransactionResponse>* 
invocation_context) = 0;
 
+  virtual void asyncRecallMessage(const RecallMessageRequest& request,
+                                   InvocationContext<RecallMessageResponse>* 
invocation_context) = 0;
+
   virtual std::shared_ptr<TelemetryBidiReactor> 
asyncTelemetry(std::weak_ptr<Client> client) = 0;
 
   virtual void asyncForwardMessageToDeadLetterQueue(
diff --git a/cpp/source/client/include/RpcClientImpl.h 
b/cpp/source/client/include/RpcClientImpl.h
index 35316ec6..9d4483e1 100644
--- a/cpp/source/client/include/RpcClientImpl.h
+++ b/cpp/source/client/include/RpcClientImpl.h
@@ -66,6 +66,9 @@ public:
   void asyncEndTransaction(const EndTransactionRequest& request,
                            InvocationContext<EndTransactionResponse>* 
invocation_context) override;
 
+  void asyncRecallMessage(const RecallMessageRequest& request,
+                          InvocationContext<RecallMessageResponse>* 
invocation_context) override;
+
   std::shared_ptr<TelemetryBidiReactor> asyncTelemetry(std::weak_ptr<Client> 
client) override;
 
   void asyncForwardMessageToDeadLetterQueue(
diff --git a/cpp/source/client/include/SendResult.h 
b/cpp/source/client/include/SendResult.h
index 3596f61f..04999555 100644
--- a/cpp/source/client/include/SendResult.h
+++ b/cpp/source/client/include/SendResult.h
@@ -28,6 +28,7 @@ struct SendResult {
 
   std::string message_id;
   std::string transaction_id;
+  std::string recall_handle;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/mocks/include/RpcClientMock.h 
b/cpp/source/client/mocks/include/RpcClientMock.h
index a8475976..898e8453 100644
--- a/cpp/source/client/mocks/include/RpcClientMock.h
+++ b/cpp/source/client/mocks/include/RpcClientMock.h
@@ -56,6 +56,9 @@ public:
   MOCK_METHOD(void, asyncEndTransaction, (const EndTransactionRequest&, 
InvocationContext<EndTransactionResponse>*),
               (override));
 
+  MOCK_METHOD(void, asyncRecallMessage, (const RecallMessageRequest&, 
InvocationContext<RecallMessageResponse>*),
+              (override));
+
   MOCK_METHOD(void, asyncForwardMessageToDeadLetterQueue,
               (const ForwardMessageToDeadLetterQueueRequest&,
                InvocationContext<ForwardMessageToDeadLetterQueueResponse>*),
diff --git a/cpp/source/log/LoggerImpl.cpp b/cpp/source/log/LoggerImpl.cpp
index cdd2f473..a7047d4a 100644
--- a/cpp/source/log/LoggerImpl.cpp
+++ b/cpp/source/log/LoggerImpl.cpp
@@ -76,7 +76,7 @@ void LoggerImpl::init0() {
       abort();
     }
   }
-  std::cout << "RocketMQ log files path: " << log_dir.c_str() << std::endl;
+  // std::cout << "RocketMQ log files path: " << log_dir.c_str() << std::endl;
 
   if (pattern_.empty()) {
     pattern_ = DEFAULT_PATTERN;
diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp
index 4f29383f..916c47a4 100644
--- a/cpp/source/rocketmq/Producer.cpp
+++ b/cpp/source/rocketmq/Producer.cpp
@@ -16,7 +16,6 @@
  */
 #include "rocketmq/Producer.h"
 
-#include <chrono>
 #include <memory>
 #include <system_error>
 #include <utility>
@@ -26,6 +25,7 @@
 #include "rocketmq/ErrorCode.h"
 #include "rocketmq/SendReceipt.h"
 #include "rocketmq/Transaction.h"
+#include "rocketmq/RecallReceipt.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -68,6 +68,10 @@ SendReceipt Producer::send(MessageConstPtr message, 
std::error_code& ec, Transac
   return impl_->send(std::move(message), ec, transaction);
 }
 
+RecallReceipt Producer::recall(std::string& topic, std::string& recall_handle, 
std::error_code& ec) noexcept {
+  return impl_->recall(topic, recall_handle, ec);
+}
+
 ProducerBuilder Producer::newBuilder() {
   return {};
 }
@@ -77,6 +81,7 @@ ProducerBuilder::ProducerBuilder() : 
impl_(std::make_shared<ProducerImpl>()){};
 ProducerBuilder& ProducerBuilder::withConfiguration(Configuration 
configuration) {
   auto name_server_resolver = 
std::make_shared<StaticNameServerResolver>(configuration.endpoints());
   impl_->withNameServerResolver(std::move(name_server_resolver));
+  impl_->withResourceNamespace(configuration.resourceNamespace());
   impl_->withCredentialsProvider(configuration.credentialsProvider());
   impl_->withRequestTimeout(configuration.requestTimeout());
   impl_->withSsl(configuration.withSsl());
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp 
b/cpp/source/rocketmq/ProducerImpl.cpp
index 3de3d37d..7ea7d4f6 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -17,14 +17,12 @@
 #include "ProducerImpl.h"
 
 #include <algorithm>
-#include <atomic>
 #include <cassert>
 #include <chrono>
 #include <memory>
 #include <system_error>
 #include <utility>
 
-#include "apache/rocketmq/v2/definition.pb.h"
 #include "MixAll.h"
 #include "Protocol.h"
 #include "PublishInfoCallback.h"
@@ -40,6 +38,7 @@
 #include "rocketmq/SendReceipt.h"
 #include "rocketmq/Transaction.h"
 #include "rocketmq/TransactionChecker.h"
+#include "rocketmq/RecallReceipt.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
@@ -97,8 +96,12 @@ void ProducerImpl::validate(const Message& message, 
std::error_code& ec) {
   MixAll::validate(message, ec);
 
   if (!ec) {
+    if (message.body().empty()) {
+      SPDLOG_WARN("Body of the message is null, topic={}", message.topic());
+      ec = ErrorCode::MessageBodyEmpty;
+    }
     if (message.body().length() > client_config_.publisher.max_body_size) {
-      SPDLOG_WARN("Body of the message to send is too large");
+      SPDLOG_WARN("Body of the message to send is too large, topic={}", 
message.topic());
       ec = ErrorCode::PayloadTooLarge;
     }
   }
@@ -224,13 +227,15 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec) noe
   SendReceipt   send_receipt;
 
   // Define callback
-  auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& 
receipt) mutable {
+  auto callback =
+      [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) 
mutable {
     ec = code;
-    SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+    auto& receipt_mut = const_cast<SendReceipt&>(receipt);
     send_receipt.target = std::move(receipt_mut.target);
     send_receipt.message_id = std::move(receipt_mut.message_id);
     send_receipt.message = std::move(receipt_mut.message);
     send_receipt.transaction_id = std::move(receipt_mut.transaction_id);
+    send_receipt.recall_handle = std::move(receipt_mut.recall_handle);
     {
       absl::MutexLock lk(mtx.get());
       completed = true;
@@ -450,8 +455,9 @@ bool ProducerImpl::endTransaction0(const MiniTransaction& 
transaction, Transacti
     }
   };
 
-  client_manager_->endTransaction(transaction.target, metadata, request, 
absl::ToChronoMilliseconds(requestTimeout()),
-                                  cb);
+  client_manager_->endTransaction(
+      transaction.target, metadata, request, 
absl::ToChronoMilliseconds(requestTimeout()), cb);
+
   {
     absl::MutexLock lk(mtx.get());
     cv->Wait(mtx.get());
@@ -509,6 +515,92 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, 
std::error_code& ec, Tra
   return send_receipt;
 }
 
+/**
+ * Blocks until the broker answers a RecallMessage request for the message
+ * identified by recall_handle, or until route/target resolution fails.
+ *
+ * @param topic         Topic whose publish route selects the target broker.
+ * @param recall_handle Handle copied into the RecallMessageRequest.
+ * @param ec            Set to the failure reason when the recall fails.
+ * @return Receipt carrying the message_id from the response on success;
+ *         an empty receipt otherwise.
+ */
+RecallReceipt ProducerImpl::recall(const std::string& topic, std::string& 
recall_handle, std::error_code& ec) noexcept {
+  ensureRunning(ec);
+  if (ec) {
+    SPDLOG_WARN("Producer is not running");
+    return RecallReceipt{};
+  }
+
+  auto topic_publish_info = getPublishInfo(topic);
+  if (!topic_publish_info) {
+    SPDLOG_WARN("Route of topic[{}] is not found", topic);
+    ec = ErrorCode::NotFound;
+    return RecallReceipt{};
+  }
+
+  std::vector<rmq::MessageQueue> message_queue_list;
+  absl::optional<std::string> message_group{};
+
+  if (!topic_publish_info->selectMessageQueues(message_group, 
message_queue_list)) {
+    SPDLOG_WARN("Failed to select an addressable message queue for timer 
topic[{}]", topic);
+    ec = ErrorCode::NotFound;
+    return RecallReceipt{};
+  }
+
+  const std::string& target = urlOf(message_queue_list.front());
+  if (target.empty()) {
+    SPDLOG_WARN("Failed to resolve broker address from MessageQueue");
+    ec = ErrorCode::BadGateway;
+    return RecallReceipt{};
+  }
+
+  RecallMessageRequest request;
+  request.mutable_topic()->set_resource_namespace(resourceNamespace());
+  request.mutable_topic()->set_name(topic);
+  request.set_recall_handle(recall_handle);
+
+  Metadata metadata;
+  Signature::sign(client_config_, metadata);
+
+  auto mtx = std::make_shared<absl::Mutex>();
+  auto cv = std::make_shared<absl::CondVar>();
+  // Guarded by mtx. Needed because the callback may run before we start
+  // waiting (recallMessage invokes it inline when no RPC client exists),
+  // and a signal sent to a CondVar nobody waits on is lost.
+  bool completed = false;
+
+  RecallReceipt recall_receipt;
+  auto callback =
+      [&, mtx, cv](const std::error_code& code,
+                   const RecallMessageResponse& response) {
+    ec = code;
+    if (!ec) {
+      recall_receipt.message_id = response.message_id();
+    }
+
+    {
+      absl::MutexLock lk(mtx.get());
+      completed = true;
+      cv->SignalAll();
+    }
+  };
+
+  client_manager_->recallMessage(
+      target, metadata, request, absl::ToChronoMilliseconds(requestTimeout()), 
callback);
+
+  {
+    absl::MutexLock lk(mtx.get());
+    // Loop on the predicate: tolerates spurious wake-ups and returns
+    // immediately if the callback already completed.
+    while (!completed) {
+      cv->Wait(mtx.get());
+    }
+  }
+
+  return recall_receipt;
+}
+
 void ProducerImpl::getPublishInfoAsync(const std::string& topic, const 
PublishInfoCallback& cb) {
   TopicPublishInfoPtr ptr;
   {
diff --git a/cpp/source/rocketmq/PushConsumer.cpp 
b/cpp/source/rocketmq/PushConsumer.cpp
index 2b1c1566..a0fd0055 100644
--- a/cpp/source/rocketmq/PushConsumer.cpp
+++ b/cpp/source/rocketmq/PushConsumer.cpp
@@ -43,6 +43,7 @@ PushConsumer PushConsumerBuilder::build() {
   }
   impl->consumeThreadPoolSize(consume_thread_);
   
impl->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
+  impl->withResourceNamespace(configuration_.resourceNamespace());
   impl->withSsl(configuration_.withSsl());
   impl->withCredentialsProvider(configuration_.credentialsProvider());
   impl->withRequestTimeout(configuration_.requestTimeout());
diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp 
b/cpp/source/rocketmq/PushConsumerImpl.cpp
index 505854db..712ac814 100644
--- a/cpp/source/rocketmq/PushConsumerImpl.cpp
+++ b/cpp/source/rocketmq/PushConsumerImpl.cpp
@@ -70,13 +70,14 @@ void PushConsumerImpl::start() {
     return;
   }
 
+  client_config_.subscriber.group.set_resource_namespace(resourceNamespace());
   client_manager_->addClientObserver(shared_from_this());
 
   fetchRoutes();
 
-  SPDLOG_INFO("start concurrently consume service: {}", 
client_config_.subscriber.group.name());
-  consume_message_service_ =
-      std::make_shared<ConsumeMessageServiceImpl>(shared_from_this(), 
consume_thread_pool_size_, message_listener_);
+  SPDLOG_INFO("Start concurrently consume service: {}", 
client_config_.subscriber.group.name());
+  consume_message_service_ = std::make_shared<ConsumeMessageServiceImpl>(
+      shared_from_this(), consume_thread_pool_size_, message_listener_);
   consume_message_service_->start();
 
   // Heartbeat depends on initialization of consume-message-service
@@ -91,7 +92,8 @@ void PushConsumerImpl::start() {
   };
 
   scan_assignment_handle_ = client_manager_->getScheduler()->schedule(
-      scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME, 
std::chrono::milliseconds(100), std::chrono::seconds(5));
+      scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME,
+      std::chrono::milliseconds(100), std::chrono::seconds(5));
   SPDLOG_INFO("PushConsumer started, groupName={}", 
client_config_.subscriber.group.name());
 
   auto collect_stats_functor = [consumer_weak_ptr] {
@@ -101,8 +103,9 @@ void PushConsumerImpl::start() {
     }
   };
 
-  collect_stats_handle_ = 
client_manager_->getScheduler()->schedule(collect_stats_functor, 
COLLECT_STATS_TASK_NAME,
-                                                                    
std::chrono::seconds(3), std::chrono::seconds(3));
+  collect_stats_handle_ = client_manager_->getScheduler()->schedule(
+      collect_stats_functor, COLLECT_STATS_TASK_NAME,
+      std::chrono::seconds(3), std::chrono::seconds(3));
 }
 
 const char* PushConsumerImpl::SCAN_ASSIGNMENT_TASK_NAME = 
"scan-assignment-task";
@@ -191,10 +194,33 @@ void PushConsumerImpl::scanAssignments() {
 }
 
 bool PushConsumerImpl::selectBroker(const TopicRouteDataPtr& topic_route_data, 
std::string& broker_host) {
+
+  absl::flat_hash_set<std::string> endpoints;
+  endpointsInUse(endpoints);
+  if (endpoints.empty()) {
+    SPDLOG_WARN("No broker is available");
+    return false;
+  }
+
+  // Prefer a broker whose address is an access point supplied by the user
   if (topic_route_data && !topic_route_data->messageQueues().empty()) {
+    uint32_t queue_count = topic_route_data->messageQueues().size();
     uint32_t index = TopicAssignment::getAndIncreaseQueryWhichBroker();
-    for (uint32_t i = index; i < index + 
topic_route_data->messageQueues().size(); i++) {
-      auto message_queue = topic_route_data->messageQueues().at(i % 
topic_route_data->messageQueues().size());
+    for (uint32_t i = index; i < index + queue_count; i++) {
+      auto message_queue = topic_route_data->messageQueues().at(i % 
queue_count);
+      if (MixAll::MASTER_BROKER_ID != message_queue.broker().id() || 
!readable(message_queue.permission())) {
+        continue;
+      }
+
+      std::string current_host = urlOf(message_queue);
+      if (endpoints.contains(current_host)) {
+        broker_host = current_host;
+        return true;
+      }
+    }
+
+    for (uint32_t i = index; i < index + queue_count; i++) {
+      auto message_queue = topic_route_data->messageQueues().at(i % 
queue_count);
       if (MixAll::MASTER_BROKER_ID != message_queue.broker().id() || 
!readable(message_queue.permission())) {
         continue;
       }
@@ -527,8 +553,7 @@ void PushConsumerImpl::buildClientSettings(rmq::Settings& 
settings) {
 
 void PushConsumerImpl::prepareHeartbeatData(HeartbeatRequest& request) {
   request.set_client_type(rmq::ClientType::PUSH_CONSUMER);
-  request.mutable_group()->set_resource_namespace(resourceNamespace());
-  request.mutable_group()->set_name(groupName());
+  request.mutable_group()->CopyFrom(client_config_.subscriber.group);
 }
 
 void PushConsumerImpl::notifyClientTermination() {
diff --git a/cpp/source/rocketmq/SendContext.cpp 
b/cpp/source/rocketmq/SendContext.cpp
index bd97384d..5f6b8fdc 100644
--- a/cpp/source/rocketmq/SendContext.cpp
+++ b/cpp/source/rocketmq/SendContext.cpp
@@ -57,6 +57,7 @@ void SendContext::onSuccess(const SendResult& send_result) 
noexcept {
   send_receipt.target = send_result.target;
   send_receipt.message_id = send_result.message_id;
   send_receipt.transaction_id = send_result.transaction_id;
+  send_receipt.recall_handle = send_result.recall_handle;
   send_receipt.message = std::move(message_);
   callback_(send_result.ec, send_receipt);
 }
diff --git a/cpp/source/rocketmq/SimpleConsumer.cpp 
b/cpp/source/rocketmq/SimpleConsumer.cpp
index 2b5e79b0..8acf16ac 100644
--- a/cpp/source/rocketmq/SimpleConsumer.cpp
+++ b/cpp/source/rocketmq/SimpleConsumer.cpp
@@ -125,6 +125,7 @@ SimpleConsumer SimpleConsumerBuilder::build() {
 
   simple_consumer.impl_->withRequestTimeout(configuration_.requestTimeout());
   
simple_consumer.impl_->withNameServerResolver(std::make_shared<StaticNameServerResolver>(configuration_.endpoints()));
+  
simple_consumer.impl_->withResourceNamespace(configuration_.resourceNamespace());
   
simple_consumer.impl_->withCredentialsProvider(configuration_.credentialsProvider());
   simple_consumer.impl_->withReceiveMessageTimeout(await_duration_);
   simple_consumer.impl_->withSsl(configuration_.withSsl());
diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp 
b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
index e0a78eeb..2441bb2d 100644
--- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp
+++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp
@@ -85,6 +85,7 @@ void SimpleConsumerImpl::start() {
   ClientImpl::start();
   State expected = State::STARTING;
   if (state_.compare_exchange_strong(expected, State::STARTED, 
std::memory_order_relaxed)) {
+    
client_config_.subscriber.group.set_resource_namespace(resourceNamespace());
     refreshAssignments();
 
     std::weak_ptr<SimpleConsumerImpl> consumer(shared_from_this());
@@ -301,8 +302,23 @@ void SimpleConsumerImpl::receive(std::size_t limit,
       callback(ec, messages);
       return;
     }
-    std::size_t idx = ++assignment_index_ % assignments_.size();
-    assignment.CopyFrom(assignments_[idx]);
+
+    // Pick the first readable assignment, starting at start_index
+    std::size_t start_index = ++assignment_index_ % assignments_.size();
+    for (std::size_t i = 0; i < assignments_.size(); ++i) {
+      const auto& assign = assignments_[(start_index + i) % 
assignments_.size()];
+      if (readable(assign.message_queue().permission())) {
+        assignment.CopyFrom(assign);
+        break;
+      }
+    }
+
+    if (!assignment.IsInitialized()) {
+      std::error_code ec = ErrorCode::NotFound;
+      std::vector<MessageConstSharedPtr> messages;
+      callback(ec, messages);
+      return;
+    }
   }
 
   const auto& target = urlOf(assignment.message_queue());
diff --git a/cpp/source/rocketmq/TopicPublishInfo.cpp 
b/cpp/source/rocketmq/TopicPublishInfo.cpp
index 6eeece4a..a94013a5 100644
--- a/cpp/source/rocketmq/TopicPublishInfo.cpp
+++ b/cpp/source/rocketmq/TopicPublishInfo.cpp
@@ -70,8 +70,13 @@ bool 
TopicPublishInfo::selectMessageQueues(absl::optional<std::string>     messa
     absl::MutexLock lock(&queue_list_mtx_);
     for (std::vector<rmq::MessageQueue>::size_type i = 0; i < 
queue_list_.size(); ++i) {
       const rmq::MessageQueue& message_queue = queue_list_[index++ % 
(queue_list_.size())];
+      if (!writable(message_queue.permission())) {
+        continue;
+      }
+
       if (!producer->isEndpointIsolated(urlOf(message_queue))) {
-        auto search = std::find_if(result.begin(), result.end(), [&](const 
rmq::MessageQueue& item) {
+        auto search = std::find_if(
+            result.begin(), result.end(), [&](const rmq::MessageQueue& item) {
           return item.broker().name() == message_queue.broker().name();
         });
         if (std::end(result) == search) {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h 
b/cpp/source/rocketmq/include/ClientImpl.h
index 25cef46c..9b6a6502 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -90,6 +90,10 @@ public:
     client_config_.credentials_provider = std::move(credentials_provider);
   }
 
+  void withResourceNamespace(std::string resource_namespace) {
+    client_config_.resource_namespace = std::move(resource_namespace);
+  }
+
   void withRequestTimeout(std::chrono::milliseconds request_timeout) {
     client_config_.request_timeout = absl::FromChrono(request_timeout);
   }
diff --git a/cpp/source/rocketmq/include/ProcessQueueImpl.h 
b/cpp/source/rocketmq/include/ProcessQueueImpl.h
index 822f7c04..75a5d2d3 100644
--- a/cpp/source/rocketmq/include/ProcessQueueImpl.h
+++ b/cpp/source/rocketmq/include/ProcessQueueImpl.h
@@ -128,6 +128,7 @@ private:
   std::atomic<uint64_t> cached_message_memory_;
 
   void popMessage();
+
   void wrapPopMessageRequest(absl::flat_hash_map<std::string, std::string>& 
metadata,
                              rmq::ReceiveMessageRequest& request);
 
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h 
b/cpp/source/rocketmq/include/ProducerImpl.h
index 2cb9c44e..237c242e 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -30,6 +30,7 @@
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
 #include "rocketmq/Message.h"
+#include "rocketmq/RecallReceipt.h"
 #include "rocketmq/SendCallback.h"
 #include "rocketmq/SendReceipt.h"
 #include "rocketmq/TransactionChecker.h"
@@ -79,6 +80,11 @@ public:
    */
   SendReceipt send(MessageConstPtr message, std::error_code& ec, Transaction& 
transaction);
 
+  /**
+   * Synchronously recall a message; only delay messages are supported for now.
+   */
+  RecallReceipt recall(const std::string& topic, std::string& recall_handle, 
std::error_code& ec) noexcept;
+
   /**
    * Check if the RPC client for the target host is isolated or not
    * @param endpoint Address of target host.
diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h 
b/cpp/source/rocketmq/include/PushConsumerImpl.h
index 7a4ff1a3..8f360fda 100644
--- a/cpp/source/rocketmq/include/PushConsumerImpl.h
+++ b/cpp/source/rocketmq/include/PushConsumerImpl.h
@@ -72,7 +72,7 @@ public:
 
   void scanAssignments() LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
 
-  static bool selectBroker(const TopicRouteDataPtr& route, std::string& 
broker_host);
+  bool selectBroker(const TopicRouteDataPtr& route, std::string& broker_host);
 
   void wrapQueryAssignmentRequest(const std::string& topic,
                                   const std::string& consumer_group,


Reply via email to