This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch fifo_opt in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit f705f03401d84cc40404b9a15f89c625697ae35e Author: Li Zhanhui <[email protected]> AuthorDate: Sun Apr 14 17:00:26 2024 +0800 Prepare to optimize FIFO publishing Signed-off-by: Li Zhanhui <[email protected]> --- cpp/include/rocketmq/SendReceipt.h | 6 +- cpp/source/client/ClientManagerImpl.cpp | 88 ++++++++++++-------------- cpp/source/client/include/ClientManager.h | 10 +-- cpp/source/client/include/ClientManagerImpl.h | 14 ++-- cpp/source/client/include/SendResult.h | 17 +++++ cpp/source/client/include/SendResultCallback.h | 11 ++++ cpp/source/rocketmq/ProducerImpl.cpp | 21 ++---- cpp/source/rocketmq/SendContext.cpp | 13 ++-- cpp/source/rocketmq/include/SendContext.h | 10 +-- 9 files changed, 96 insertions(+), 94 deletions(-) diff --git a/cpp/include/rocketmq/SendReceipt.h b/cpp/include/rocketmq/SendReceipt.h index 489df5ec..6c95ecc0 100644 --- a/cpp/include/rocketmq/SendReceipt.h +++ b/cpp/include/rocketmq/SendReceipt.h @@ -16,20 +16,18 @@ */ #pragma once -#include <cstdint> #include <string> -#include <utility> #include "RocketMQ.h" ROCKETMQ_NAMESPACE_BEGIN struct SendReceipt { + std::string target; + std::string message_id; std::string transaction_id; - - std::string target; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp index 643d3741..7d724c7b 100644 --- a/cpp/source/client/ClientManagerImpl.cpp +++ b/cpp/source/client/ClientManagerImpl.cpp @@ -27,32 +27,27 @@ #include "InvocationContext.h" #include "LogInterceptor.h" #include "LogInterceptorFactory.h" -#include "MessageExt.h" -#include "MetadataConstants.h" #include "MixAll.h" #include "Protocol.h" #include "ReceiveMessageContext.h" #include "RpcClient.h" #include "RpcClientImpl.h" #include "Scheduler.h" -#include "TlsHelper.h" +#include "SchedulerImpl.h" #include "UtilAll.h" -#include "apache/rocketmq/v2/definition.pb.h" #include "google/protobuf/util/time_util.h" #include "grpcpp/create_channel.h" #include "rocketmq/ErrorCode.h" -#include "rocketmq/Logger.h" -#include "rocketmq/SendReceipt.h" #include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN -ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool withSsl) +ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_ssl) : scheduler_(std::make_shared<SchedulerImpl>()), resource_namespace_(std::move(resource_namespace)), state_(State::CREATED), callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())), - withSsl_(withSsl) { + with_ssl_(with_ssl) { certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>(); tls_channel_credential_options_.set_verify_server_certs(false); tls_channel_credential_options_.set_check_call_host(false); @@ -285,7 +280,7 @@ void ClientManagerImpl::doHeartbeat() { bool ClientManagerImpl::send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, - SendCallback cb) { + SendResultCallback cb) { assert(cb); SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString()); RpcClientSharedPtr client = getRpcClient(target_host); @@ -311,15 +306,14 @@ bool ClientManagerImpl::send(const std::string& target_host, return; } - SendReceipt send_receipt = {}; - send_receipt.target = target_host; - std::error_code ec; + SendResult send_result = {}; + send_result.target = target_host; if (!invocation_context->status.ok()) { SPDLOG_WARN("Failed to send message to {} due to gRPC error. gRPC code: {}, gRPC error message: {}", invocation_context->remote_address, invocation_context->status.error_code(), invocation_context->status.error_message()); - ec = ErrorCode::RequestTimeout; - cb(ec, send_receipt); + send_result.ec = ErrorCode::RequestTimeout; + cb(send_result); return; } @@ -328,8 +322,8 @@ bool ClientManagerImpl::send(const std::string& target_host, case rmq::Code::OK: { if (!invocation_context->response.entries().empty()) { auto first = invocation_context->response.entries().begin(); - send_receipt.message_id = first->message_id(); - send_receipt.transaction_id = first->transaction_id(); + send_result.message_id = first->message_id(); + send_result.transaction_id = first->transaction_id(); } else { SPDLOG_ERROR("Unexpected send-message-response: {}", invocation_context->response.DebugString()); } @@ -338,127 +332,127 @@ bool ClientManagerImpl::send(const std::string& target_host, case rmq::Code::ILLEGAL_TOPIC: { SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalTopic; + send_result.ec = ErrorCode::IllegalTopic; break; } case rmq::Code::ILLEGAL_MESSAGE_TAG: { SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageTag; + send_result.ec = ErrorCode::IllegalMessageTag; break; } case rmq::Code::ILLEGAL_MESSAGE_KEY: { SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageKey; + send_result.ec = ErrorCode::IllegalMessageKey; break; } case rmq::Code::ILLEGAL_MESSAGE_GROUP: { SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageGroup; + send_result.ec = ErrorCode::IllegalMessageGroup; break; } case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: { SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::IllegalMessageProperty; + send_result.ec = ErrorCode::IllegalMessageProperty; break; } case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: { SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::MessagePropertiesTooLarge; + send_result.ec = ErrorCode::MessagePropertiesTooLarge; break; } case rmq::Code::MESSAGE_BODY_TOO_LARGE: { SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::MessageBodyTooLarge; + send_result.ec = ErrorCode::MessageBodyTooLarge; break; } case rmq::Code::TOPIC_NOT_FOUND: { SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::TopicNotFound; + send_result.ec = ErrorCode::TopicNotFound; break; } case rmq::Code::NOT_FOUND: { SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::NotFound; + send_result.ec = ErrorCode::NotFound; break; } case rmq::Code::UNAUTHORIZED: { SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::Unauthorized; + send_result.ec = ErrorCode::Unauthorized; break; } case rmq::Code::FORBIDDEN: { SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::Forbidden; + send_result.ec = ErrorCode::Forbidden; break; } case rmq::Code::MESSAGE_CORRUPTED: { SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::MessageCorrupted; + send_result.ec = ErrorCode::MessageCorrupted; break; } case rmq::Code::TOO_MANY_REQUESTS: { SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::TooManyRequests; + send_result.ec = ErrorCode::TooManyRequests; break; } case rmq::Code::INTERNAL_SERVER_ERROR: { SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::InternalServerError; + send_result.ec = ErrorCode::InternalServerError; break; } case rmq::Code::HA_NOT_AVAILABLE: { SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::InternalServerError; + send_result.ec = ErrorCode::InternalServerError; break; } case rmq::Code::PROXY_TIMEOUT: { SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::GatewayTimeout; + send_result.ec = ErrorCode::GatewayTimeout; break; } case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: { SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::GatewayTimeout; + send_result.ec = ErrorCode::GatewayTimeout; break; } case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: { SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::GatewayTimeout; + send_result.ec = ErrorCode::GatewayTimeout; break; } case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: { SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address, invocation_context->response.DebugString()); - ec = ErrorCode::MessagePropertyConflictWithType; + send_result.ec = ErrorCode::MessagePropertyConflictWithType; break; } default: { SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", invocation_context->remote_address); - ec = ErrorCode::NotSupported; + send_result.ec = ErrorCode::NotSupported; break; } } - cb(ec, send_receipt); + cb(send_result); }; invocation_context->callback = completion_callback; @@ -476,7 +470,7 @@ std::shared_ptr<grpc::Channel> ClientManagerImpl::createChannel(const std::strin std::vector<std::unique_ptr<grpc::experimental::ClientInterceptorFactoryInterface>> interceptor_factories; interceptor_factories.emplace_back(absl::make_unique<LogInterceptorFactory>()); auto channel = grpc::experimental::CreateCustomChannelWithInterceptors( - target_host, withSsl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, + target_host, with_ssl_ ? channel_credential_ : grpc::InsecureChannelCredentials(), channel_arguments_, std::move(interceptor_factories)); return channel; } @@ -520,28 +514,28 @@ void ClientManagerImpl::cleanRpcClients() { rpc_clients_.clear(); } -SendReceipt ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue, - const SendMessageResponse& response, - std::error_code& ec) { - SendReceipt send_receipt; +SendResult ClientManagerImpl::processSendResponse(const rmq::MessageQueue& message_queue, + const SendMessageResponse& response, + std::error_code& ec) { + SendResult send_result; switch (response.status().code()) { case rmq::Code::OK: { assert(response.entries_size() > 0); - send_receipt.message_id = response.entries().begin()->message_id(); - send_receipt.transaction_id = response.entries().begin()->transaction_id(); - return send_receipt; + send_result.message_id = response.entries().begin()->message_id(); + send_result.transaction_id = response.entries().begin()->transaction_id(); + return send_result; } case rmq::Code::ILLEGAL_TOPIC: { ec = ErrorCode::BadRequest; - return send_receipt; + return send_result; } default: { // TODO: handle other cases. break; } } - return send_receipt; + return send_result; } void ClientManagerImpl::addClientObserver(std::weak_ptr<Client> client) { diff --git a/cpp/source/client/include/ClientManager.h b/cpp/source/client/include/ClientManager.h index 56325fa4..02b232b2 100644 --- a/cpp/source/client/include/ClientManager.h +++ b/cpp/source/client/include/ClientManager.h @@ -22,14 +22,12 @@ #include <system_error> #include "Client.h" -#include "MessageExt.h" #include "Metadata.h" #include "ReceiveMessageCallback.h" #include "RpcClient.h" #include "Scheduler.h" -#include "TelemetryBidiReactor.h" +#include "SendResultCallback.h" #include "TopicRouteData.h" -#include "rocketmq/SendCallback.h" #include "rocketmq/State.h" ROCKETMQ_NAMESPACE_BEGIN @@ -93,8 +91,10 @@ public: virtual void receiveMessage(const std::string& target, const Metadata& metadata, const ReceiveMessageRequest& request, std::chrono::milliseconds timeout, ReceiveMessageCallback callback) = 0; - virtual bool send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, - SendCallback cb) = 0; + virtual bool send(const std::string& target_host, + const Metadata& metadata, + SendMessageRequest& request, + SendResultCallback cb) = 0; virtual std::error_code notifyClientTermination(const std::string& target_host, const Metadata& metadata, const NotifyClientTerminationRequest& request, diff --git a/cpp/source/client/include/ClientManagerImpl.h b/cpp/source/client/include/ClientManagerImpl.h index 653fcad3..5f1b27ca 100644 --- a/cpp/source/client/include/ClientManagerImpl.h +++ b/cpp/source/client/include/ClientManagerImpl.h @@ -20,7 +20,6 @@ #include <chrono> #include <cstdint> #include <functional> -#include <future> #include <memory> #include <string> #include <system_error> @@ -29,18 +28,13 @@ #include "Client.h" #include "ClientManager.h" #include "InsecureCertificateVerifier.h" -#include "InvocationContext.h" #include "ReceiveMessageCallback.h" #include "RpcClientImpl.h" -#include "SchedulerImpl.h" -#include "SendMessageContext.h" -#include "TelemetryBidiReactor.h" #include "ThreadPoolImpl.h" #include "TopicRouteData.h" #include "absl/base/thread_annotations.h" #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" -#include "absl/strings/string_view.h" #include "absl/synchronization/mutex.h" #include "rocketmq/State.h" @@ -54,7 +48,7 @@ public: * effectively. * @param resource_namespace Abstract resource namespace, in which this client manager lives. */ - explicit ClientManagerImpl(std::string resource_namespace, bool withSsl = true); + explicit ClientManagerImpl(std::string resource_namespace, bool with_ssl = true); ~ClientManagerImpl() override; @@ -89,7 +83,7 @@ public: bool send(const std::string& target_host, const Metadata& metadata, SendMessageRequest& request, - SendCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_); + SendResultCallback cb) override LOCKS_EXCLUDED(rpc_clients_mtx_); /** * Get a RpcClient according to the given target hosts, which follows scheme specified @@ -105,7 +99,7 @@ public: RpcClientSharedPtr getRpcClient(const std::string& target_host, bool need_heartbeat = true) override LOCKS_EXCLUDED(rpc_clients_mtx_); - static SendReceipt processSendResponse(const rmq::MessageQueue& message_queue, + static SendResult processSendResponse(const rmq::MessageQueue& message_queue, const SendMessageResponse& response, std::error_code& ec); @@ -242,7 +236,7 @@ private: grpc::ChannelArguments channel_arguments_; bool trace_{false}; - bool withSsl_; + bool with_ssl_; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/include/SendResult.h b/cpp/source/client/include/SendResult.h new file mode 100644 index 00000000..d5f85277 --- /dev/null +++ b/cpp/source/client/include/SendResult.h @@ -0,0 +1,17 @@ +#pragma once + +#include <system_error> + +#include "rocketmq/RocketMQ.h" + +ROCKETMQ_NAMESPACE_BEGIN + +struct SendResult { + std::error_code ec; + std::string target; + + std::string message_id; + std::string transaction_id; +}; + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/client/include/SendResultCallback.h b/cpp/source/client/include/SendResultCallback.h new file mode 100644 index 00000000..3dd5bda7 --- /dev/null +++ b/cpp/source/client/include/SendResultCallback.h @@ -0,0 +1,11 @@ +#pragma once + +#include <functional> + +#include "SendResult.h" + +ROCKETMQ_NAMESPACE_BEGIN + +using SendResultCallback = std::function<void(const SendResult&)>; + +ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp index 73130161..d5e6e485 100644 --- a/cpp/source/rocketmq/ProducerImpl.cpp +++ b/cpp/source/rocketmq/ProducerImpl.cpp @@ -17,38 +17,27 @@ #include "ProducerImpl.h" #include <algorithm> -#include <apache/rocketmq/v2/definition.pb.h> - #include <atomic> #include <cassert> #include <chrono> -#include <limits> #include <memory> #include <system_error> #include <utility> -#include "Client.h" -#include "MessageGroupQueueSelector.h" -#include "MetadataConstants.h" +#include "apache/rocketmq/v2/definition.pb.h" #include "MixAll.h" #include "Protocol.h" #include "PublishInfoCallback.h" -#include "RpcClient.h" #include "SendContext.h" -#include "SendMessageContext.h" #include "Signature.h" -#include "Tag.h" #include "TracingUtility.h" #include "TransactionImpl.h" -#include "UniqueIdGenerator.h" #include "UtilAll.h" -#include "absl/strings/str_join.h" #include "opencensus/trace/propagation/trace_context.h" #include "opencensus/trace/span.h" #include "rocketmq/ErrorCode.h" #include "rocketmq/Message.h" #include "rocketmq/SendReceipt.h" -#include "rocketmq/Tracing.h" #include "rocketmq/Transaction.h" #include "rocketmq/TransactionChecker.h" @@ -338,12 +327,12 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) { Metadata metadata; Signature::sign(client_config_, metadata); - auto callback = [context](const std::error_code& ec, const SendReceipt& send_receipt) { - if (ec) { - context->onFailure(ec); + auto callback = [context](const SendResult& send_result) { + if (send_result.ec) { + context->onFailure(send_result.ec); return; } - context->onSuccess(send_receipt); + context->onSuccess(send_result); }; client_manager_->send(target, metadata, request, callback); diff --git a/cpp/source/rocketmq/SendContext.cpp b/cpp/source/rocketmq/SendContext.cpp index 385a1a99..00f51c10 100644 --- a/cpp/source/rocketmq/SendContext.cpp +++ b/cpp/source/rocketmq/SendContext.cpp @@ -22,15 +22,14 @@ #include "PublishStats.h" #include "Tag.h" #include "TransactionImpl.h" -#include "opencensus/trace/propagation/trace_context.h" #include "opencensus/trace/span.h" -#include "rocketmq/Logger.h" #include "rocketmq/SendReceipt.h" #include "spdlog/spdlog.h" +#include "rocketmq/ErrorCode.h" ROCKETMQ_NAMESPACE_BEGIN -void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept { +void SendContext::onSuccess(const SendResult& send_result) noexcept { { // Mark end of send-message span. span_.SetStatus(opencensus::trace::StatusCode::OK); @@ -54,8 +53,12 @@ void SendContext::onSuccess(const SendReceipt& send_receipt) noexcept { } // send_receipt.traceContext(opencensus::trace::propagation::ToTraceParentHeader(span_.context())); - std::error_code ec; - callback_(ec, send_receipt); + SendReceipt send_receipt = {}; + send_receipt.target = send_result.target; + send_receipt.message_id = send_result.message_id; + send_receipt.transaction_id = send_result.transaction_id; + + callback_(send_result.ec, send_receipt); } void SendContext::onFailure(const std::error_code& ec) noexcept { diff --git a/cpp/source/rocketmq/include/SendContext.h b/cpp/source/rocketmq/include/SendContext.h index 4c05cebe..4067532b 100644 --- a/cpp/source/rocketmq/include/SendContext.h +++ b/cpp/source/rocketmq/include/SendContext.h @@ -19,16 +19,12 @@ #include <memory> #include <system_error> -#include "absl/container/flat_hash_map.h" -#include "absl/synchronization/mutex.h" -#include "opencensus/trace/span.h" - #include "Protocol.h" +#include "SendResult.h" #include "TransactionImpl.h" -#include "rocketmq/ErrorCode.h" +#include "opencensus/trace/span.h" #include "rocketmq/Message.h" #include "rocketmq/SendCallback.h" -#include "rocketmq/SendReceipt.h" ROCKETMQ_NAMESPACE_BEGIN @@ -47,7 +43,7 @@ public: span_(opencensus::trace::Span::BlankSpan()) { } - void onSuccess(const SendReceipt& send_receipt) noexcept; + void onSuccess(const SendResult& send_result) noexcept; void onFailure(const std::error_code& ec) noexcept;
