This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch cpp in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit efdd9e10b866ae0d4badb90a50d6419b28cae210 Author: Li Zhanhui <[email protected]> AuthorDate: Thu Jun 30 16:53:08 2022 +0800 Finish error handling of SendMessage --- cpp/api/rocketmq/ErrorCode.h | 2 +- cpp/proto/apache/rocketmq/v2/definition.proto | 55 +++++++---- cpp/src/main/cpp/base/ErrorCategory.cpp | 22 ++++- cpp/src/main/cpp/client/ClientManagerImpl.cpp | 109 +++++++++++++++++++-- .../main/cpp/client/ReceiveMessageStreamReader.cpp | 2 +- 5 files changed, 158 insertions(+), 32 deletions(-) diff --git a/cpp/api/rocketmq/ErrorCode.h b/cpp/api/rocketmq/ErrorCode.h index 14ec504..e911533 100644 --- a/cpp/api/rocketmq/ErrorCode.h +++ b/cpp/api/rocketmq/ErrorCode.h @@ -151,7 +151,7 @@ enum class ErrorCode : int { * amount of time. * */ - TooManyRequest = 42900, + TooManyRequests = 42900, /** * @brief The server is unwilling to process the request because either an diff --git a/cpp/proto/apache/rocketmq/v2/definition.proto b/cpp/proto/apache/rocketmq/v2/definition.proto index ab1fd09..67520fe 100644 --- a/cpp/proto/apache/rocketmq/v2/definition.proto +++ b/cpp/proto/apache/rocketmq/v2/definition.proto @@ -316,31 +316,30 @@ enum Code { ILLEGAL_MESSAGE_GROUP = 40006; // Format of message property key is illegal. ILLEGAL_MESSAGE_PROPERTY_KEY = 40007; - // Message properties total size exceeds the threshold. - MESSAGE_PROPERTIES_TOO_LARGE = 40008; - // Message body size exceeds the threshold. - MESSAGE_BODY_TOO_LARGE = 40009; // Transaction id is invalid. - INVALID_TRANSACTION_ID = 40010; + INVALID_TRANSACTION_ID = 40008; // Format of message id is illegal. - ILLEGAL_MESSAGE_ID = 40011; + ILLEGAL_MESSAGE_ID = 40009; // Format of filter expression is illegal. - ILLEGAL_FILTER_EXPRESSION = 40012; + ILLEGAL_FILTER_EXPRESSION = 40010; // Receipt handle of message is invalid. - INVALID_RECEIPT_HANDLE = 40013; - // Message property is not match the message type. - MESSAGE_PROPERTY_DOES_NOT_MATCH_MESSAGE_TYPE = 40014; + INVALID_RECEIPT_HANDLE = 40011; + // Message property conflicts with its type. + MESSAGE_PROPERTY_CONFLICT_WITH_TYPE = 40012; // Client type could not be recognized. - UNRECOGNIZED_CLIENT_TYPE = 40015; + UNRECOGNIZED_CLIENT_TYPE = 40013; // Message is corrupted. - MESSAGE_CORRUPTED = 40016; + MESSAGE_CORRUPTED = 40014; // Request is rejected due to missing of x-mq-client-id header. - CLIENT_ID_REQUIRED = 40017; + CLIENT_ID_REQUIRED = 40015; // Generic code indicates that the client request lacks valid authentication // credentials for the requested resource. UNAUTHORIZED = 40100; + // Generic code indicates that the account is suspended due to overdue of payment. + PAYMENT_REQUIRED = 40200; + // Generic code for the case that user does not have the permission to operate. FORBIDDEN = 40300; @@ -353,10 +352,29 @@ enum Code { // Consumer group resource does not exist. CONSUMER_GROUP_NOT_FOUND = 40403; + // Generic code representing client side timeout when connecting to, reading data from, or write data to server. + REQUEST_TIMEOUT = 40800; + + // Generic code represents that the request entity is larger than limits defined by server. + PAYLOAD_TOO_LARGE = 41300; + // Message body size exceeds the threshold. + MESSAGE_BODY_TOO_LARGE = 41301; + + // Generic code for use cases where pre-conditions are not met. + // For example, if a producer instance is used to publish messages without prior start() invocation, + // this error code will be raised. + PRECONDITION_FAILED = 42800; + // Generic code indicates that too many requests are made in short period of duration. // Requests are throttled. TOO_MANY_REQUESTS = 42900; + // Generic code for the case that the server is unwilling to process the request because its header fields are too large. + // The request may be resubmitted after reducing the size of the request header fields. + REQUEST_HEADER_FIELDS_TOO_LARGE = 43100; + // Message properties total size exceeds the threshold. + MESSAGE_PROPERTIES_TOO_LARGE = 43101; + // Generic code indicates that server/client encountered an unexpected // condition that prevented it from fulfilling the request. INTERNAL_ERROR = 50000; @@ -377,17 +395,16 @@ enum Code { // functionality required to fulfill the request. NOT_IMPLEMENTED = 50100; - // Generic code for timeout. - TIMEOUT = 50400; + // Generic code represents that the server, which acts as a gateway or proxy, + // does not get an satisfied response in time from its upstream servers. + // See https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/504 + PROXY_TIMEOUT = 50400; // Message persistence timeout. MASTER_PERSISTENCE_TIMEOUT = 50401; // Slave persistence timeout. SLAVE_PERSISTENCE_TIMEOUT = 50402; - // Code indicates that the server, while acting as a gateway or proxy, - // did not get a response in time from the upstream server that - // it needed in order to complete the request. - PROXY_TIMEOUT = 50403; + // Generic code for unsupported operation. UNSUPPORTED = 50500; // Operation is not allowed in current version. VERSION_UNSUPPORTED = 50501; diff --git a/cpp/src/main/cpp/base/ErrorCategory.cpp b/cpp/src/main/cpp/base/ErrorCategory.cpp index 4f1b290..5ca2528 100644 --- a/cpp/src/main/cpp/base/ErrorCategory.cpp +++ b/cpp/src/main/cpp/base/ErrorCategory.cpp @@ -16,6 +16,8 @@ */ #include "rocketmq/ErrorCategory.h" +#include "rocketmq/ErrorCode.h" + ROCKETMQ_NAMESPACE_BEGIN std::string ErrorCategory::message(int code) const { @@ -65,7 +67,7 @@ std::string ErrorCategory::message(int code) const { case ErrorCode::PreconditionRequired: return "State of dependent procedure is not right"; - case ErrorCode::TooManyRequest: + case ErrorCode::TooManyRequests: return "Quota exchausted. The user has sent too many requests in a given " "amount of time."; @@ -107,6 +109,24 @@ std::string ErrorCategory::message(int code) const { case ErrorCode::InsufficientStorage: return "The server is unable to store the representation needed to " "complete the request."; + + case ErrorCode::MultipleResults: + return "Multiple results are available"; + + case ErrorCode::IllegalAccessPoint: + return "Access point is either malformed or invalid"; + + case ErrorCode::IllegalTopic: { + return "Topic is illegal either due to length is too long or invalid character is included"; + } + + case ErrorCode::IllegalConsumerGroup: { + return "ConsumerGroup is illegal due to its length is too long or invalid character is included"; + } + + case ErrorCode::IllegalMessageTag: { + return "Format of message tag is illegal."; + } } return ""; diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp index e4be88b..b6e8a2e 100644 --- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp +++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp @@ -242,7 +242,7 @@ void ClientManagerImpl::heartbeat(const std::string& target_host, const Metadata case rmq::Code::TOO_MANY_REQUESTS: { SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::TooManyRequest; + ec = ErrorCode::TooManyRequests; cb(ec, invocation_context->response); break; } @@ -352,33 +352,122 @@ bool ClientManagerImpl::send(const std::string& target_host, const Metadata& met send_receipt.message_id = invocation_context->response.entries().begin()->message_id(); break; } + + case rmq::Code::ILLEGAL_TOPIC: { + SPDLOG_ERROR("IllegalTopic: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::IllegalTopic; + break; + } + + case rmq::Code::ILLEGAL_MESSAGE_TAG: { + SPDLOG_ERROR("IllegalMessageTag: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::IllegalMessageTag; + break; + } + + case rmq::Code::ILLEGAL_MESSAGE_KEY: { + SPDLOG_ERROR("IllegalMessageKey: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::IllegalMessageKey; + break; + } + + case rmq::Code::ILLEGAL_MESSAGE_GROUP: { + SPDLOG_ERROR("IllegalMessageGroup: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::IllegalMessageGroup; + break; + } + + case rmq::Code::ILLEGAL_MESSAGE_PROPERTY_KEY: { + SPDLOG_ERROR("IllegalMessageProperty: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::IllegalMessageProperty; + break; + } + + case rmq::Code::MESSAGE_PROPERTIES_TOO_LARGE: { + SPDLOG_ERROR("MessagePropertiesTooLarge: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::MessagePropertiesTooLarge; + break; + } + + case rmq::Code::MESSAGE_BODY_TOO_LARGE: { + SPDLOG_ERROR("MessageBodyTooLarge: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::MessageBodyTooLarge; + break; + } + case rmq::Code::TOPIC_NOT_FOUND: { - SPDLOG_WARN("TopicNotFound: {}", status.message()); + SPDLOG_WARN("TopicNotFound: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::TopicNotFound; + break; + } + + case rmq::Code::NOT_FOUND: { + SPDLOG_WARN("NotFound: {}. Host={}", status.message(), invocation_context->remote_address); ec = ErrorCode::NotFound; break; } + case rmq::Code::UNAUTHORIZED: { - SPDLOG_WARN("Unauthenticated: {}", status.message()); + SPDLOG_WARN("Unauthenticated: {}. Host={}", status.message(), invocation_context->remote_address); ec = ErrorCode::Unauthorized; break; } + case rmq::Code::FORBIDDEN: { - SPDLOG_WARN("Forbidden: {}", status.message()); + SPDLOG_WARN("Forbidden: {}. Host={}", status.message(), invocation_context->remote_address); ec = ErrorCode::Forbidden; - cb(ec, send_receipt); break; } + + case rmq::Code::MESSAGE_CORRUPTED: { + SPDLOG_WARN("MessageCorrupted: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::MessageCorrupted; + break; + } + + case rmq::Code::TOO_MANY_REQUESTS: { + SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::TooManyRequests; + break; + } + case rmq::Code::INTERNAL_SERVER_ERROR: { - SPDLOG_WARN("InternalServerError: {}", status.message()); + SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address); ec = ErrorCode::InternalServerError; break; } + + case rmq::Code::HA_NOT_AVAILABLE: { + SPDLOG_WARN("InternalServerError: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::InternalServerError; + break; + } + + case rmq::Code::PROXY_TIMEOUT: { + SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::GatewayTimeout; + break; + } + + case rmq::Code::MASTER_PERSISTENCE_TIMEOUT: { + SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::GatewayTimeout; + break; + } + + case rmq::Code::SLAVE_PERSISTENCE_TIMEOUT: { + SPDLOG_WARN("GatewayTimeout: {}. Host={}", status.message(), invocation_context->remote_address); + ec = ErrorCode::GatewayTimeout; + break; + } + default: { - SPDLOG_WARN("Unsupported status code. Check and upgrade SDK to the latest"); - ec = ErrorCode::NotImplemented; + SPDLOG_WARN("NotSupported: Check and upgrade SDK to the latest. Host={}", invocation_context->remote_address); + ec = ErrorCode::NotSupported; break; } } + cb(ec, send_receipt); }; @@ -537,7 +626,7 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, const Metad case rmq::Code::TOO_MANY_REQUESTS: { SPDLOG_WARN("TooManyRequest: {}. Host={}", status.message(), invocation_context->remote_address); - ec = ErrorCode::TooManyRequest; + ec = ErrorCode::TooManyRequests; cb(ec, nullptr); break; } @@ -1058,7 +1147,7 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe ec = ErrorCode::ServiceUnavailable; } case rmq::Code::TOO_MANY_REQUESTS: { - ec = ErrorCode::TooManyRequest; + ec = ErrorCode::TooManyRequests; } default: { ec = ErrorCode::NotImplemented; diff --git a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp index 4667de4..3ee8f68 100644 --- a/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp +++ b/cpp/src/main/cpp/client/ReceiveMessageStreamReader.cpp @@ -73,7 +73,7 @@ void ReceiveMessageStreamReader::OnReadDone(bool ok) { break; } case rmq::Code::TOO_MANY_REQUESTS: { - ec_ = ErrorCode::TooManyRequest; + ec_ = ErrorCode::TooManyRequests; break; } case rmq::Code::MESSAGE_NOT_FOUND: {
