This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch cpp in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit f40067beeffe26fcbd94b470b06d1f20b0ee0e94 Author: Li Zhanhui <[email protected]> AuthorDate: Thu Jun 30 19:15:22 2022 +0800 Add error handling for EndTransaction --- cpp/api/rocketmq/ErrorCode.h | 16 ++-- cpp/src/main/cpp/client/ClientManagerImpl.cpp | 113 +++++++++++++++++++++++--- 2 files changed, 112 insertions(+), 17 deletions(-) diff --git a/cpp/api/rocketmq/ErrorCode.h b/cpp/api/rocketmq/ErrorCode.h index e911533..4f05ded 100644 --- a/cpp/api/rocketmq/ErrorCode.h +++ b/cpp/api/rocketmq/ErrorCode.h @@ -31,29 +31,35 @@ enum class ErrorCode : int { * @brief Client state not as expected. Call Producer#start() first. * */ - IllegalState = 1, + IllegalState = 10000, /** * @brief To publish FIFO messages, only synchronous API is supported. */ - BadRequestAsyncPubFifoMessage = 10001, + BadRequestAsyncPubFifoMessage = 10100, + + /** + * @brief 102XX is used for client side error. + * + */ + InternalClientError = 10200, /** * @brief Broker has processed the request but is not going to return any content. */ - NoContent = 204, + NoContent = 20400, /** * @brief Bad configuration. For example, negative max-attempt-times. * */ - BadConfiguration = 300, + BadConfiguration = 30000, /** * @brief Generic code, representing multiple return results. * */ - MultipleResults = 30000, + MultipleResults = 30100, /** * @brief The server cannot process the request due to apparent client-side diff --git a/cpp/src/main/cpp/client/ClientManagerImpl.cpp b/cpp/src/main/cpp/client/ClientManagerImpl.cpp index 3f572b3..1602769 100644 --- a/cpp/src/main/cpp/client/ClientManagerImpl.cpp +++ b/cpp/src/main/cpp/client/ClientManagerImpl.cpp @@ -1188,27 +1188,72 @@ void ClientManagerImpl::endTransaction( } auto&& status = invocation_context->response.status(); + auto&& peer_address = invocation_context->remote_address; switch (status.code()) { case rmq::Code::OK: { SPDLOG_DEBUG("endTransaction completed OK. Response: {}, host={}", invocation_context->response.DebugString(), - invocation_context->remote_address); - } break; + peer_address); + break; + } + + case rmq::Code::ILLEGAL_TOPIC: { + SPDLOG_WARN("IllegalTopic: {}, host={}", status.message(), peer_address); + ec = ErrorCode::IllegalTopic; + break; + } + + case rmq::Code::ILLEGAL_CONSUMER_GROUP: { + SPDLOG_WARN("IllegalConsumerGroup: {}, host={}", status.message(), peer_address); + ec = ErrorCode::IllegalConsumerGroup; + break; + } + + case rmq::Code::INVALID_TRANSACTION_ID: { + SPDLOG_WARN("InvalidTransactionId: {}, host={}", status.message(), peer_address); + ec = ErrorCode::InvalidTransactionId; + break; + } + + case rmq::Code::CLIENT_ID_REQUIRED: { + SPDLOG_WARN("ClientIdRequired: {}, host={}", status.message(), peer_address); + ec = ErrorCode::ClientIdRequired; + break; + } + + case rmq::Code::TOPIC_NOT_FOUND: { + SPDLOG_WARN("TopicNotFound: {}, host={}", status.message(), peer_address); + ec = ErrorCode::TopicNotFound; + break; + } + case rmq::Code::UNAUTHORIZED: { - SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), invocation_context->remote_address); + SPDLOG_WARN("Unauthorized: {}, host={}", status.message(), peer_address); ec = ErrorCode::Unauthorized; - } break; + break; + } + case rmq::Code::FORBIDDEN: { - SPDLOG_WARN("Forbidden: {}, host={}", status.message(), invocation_context->remote_address); + SPDLOG_WARN("Forbidden: {}, host={}", status.message(), peer_address); ec = ErrorCode::Forbidden; - } break; + break; + } + case rmq::Code::INTERNAL_SERVER_ERROR: { - SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), invocation_context->remote_address); + SPDLOG_WARN("InternalServerError: {}, host={}", status.message(), peer_address); ec = ErrorCode::InternalServerError; - } break; + break; + } + + case rmq::Code::PROXY_TIMEOUT: { + SPDLOG_WARN("GatewayTimeout: {}, host={}", status.message(), peer_address); + ec = ErrorCode::GatewayTimeout; + break; + } + default: { - SPDLOG_WARN("NotImplemented: please upgrade SDK to latest release. {}, host={}", status.message(), - invocation_context->remote_address); - ec = ErrorCode::NotImplemented; + SPDLOG_WARN("NotSupported: please upgrade SDK to latest release. {}, host={}", status.message(), peer_address); + ec = ErrorCode::NotSupported; + break; } } cb(ec, invocation_context->response); @@ -1246,22 +1291,66 @@ void ClientManagerImpl::forwardMessageToDeadLetterQueue(const std::string& targe SPDLOG_DEBUG("Received forwardToDeadLetterQueue response from server[host={}]", invocation_context->remote_address); std::error_code ec; - switch (invocation_context->response.status().code()) { + auto&& status = invocation_context->response.status(); + auto&& peer_address = invocation_context->remote_address; + switch (status.code()) { case rmq::Code::OK: { break; } + case rmq::Code::ILLEGAL_TOPIC: { + SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address); + ec = ErrorCode::IllegalTopic; + break; + } + + case rmq::Code::ILLEGAL_CONSUMER_GROUP: { + SPDLOG_WARN("IllegalConsumerGroup: {}. Host={}", status.message(), peer_address); + ec = ErrorCode::IllegalConsumerGroup; + break; + } + + case rmq::Code::INVALID_RECEIPT_HANDLE: { + SPDLOG_WARN("IllegalReceiptHandle: {}. Host={}", status.message(), peer_address); + ec = ErrorCode::InvalidReceiptHandle; + break; + } + + case rmq::Code::CLIENT_ID_REQUIRED: { + SPDLOG_WARN("IllegalTopic: {}. Host={}", status.message(), peer_address); + + // TODO: translate to client internal error? + ec = ErrorCode::InternalServerError; + break; + } + + case rmq::Code::TOPIC_NOT_FOUND: { + ec = ErrorCode::TopicNotFound; + break; + } + case rmq::Code::INTERNAL_SERVER_ERROR: { ec = ErrorCode::ServiceUnavailable; + break; } + case rmq::Code::TOO_MANY_REQUESTS: { ec = ErrorCode::TooManyRequests; + break; + } + + case rmq::Code::PROXY_TIMEOUT: { + ec = ErrorCode::GatewayTimeout; + break; } + default: { ec = ErrorCode::NotImplemented; + break; } } cb(ec); }; + invocation_context->callback = callback; client->asyncForwardMessageToDeadLetterQueue(request, invocation_context); }
