This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new eeeb643d [ISSUE #928] [CPP] Fix some cpp client bug and make logs
cleaner (#929)
eeeb643d is described below
commit eeeb643d8f74779ea38a111a78cdd4b7785f560a
Author: lizhimins <[email protected]>
AuthorDate: Tue Jan 21 21:16:03 2025 +0800
[ISSUE #928] [CPP] Fix some cpp client bug and make logs cleaner (#929)
---
.github/workflows/cpp_build.yml | 2 +-
cpp/.gitignore | 1 +
cpp/examples/ExampleFifoProducer.cpp | 5 ++---
cpp/examples/ExampleProducer.cpp | 4 ++--
cpp/examples/ExampleProducerWithAsync.cpp | 4 ++--
cpp/examples/ExampleProducerWithFifoMessage.cpp | 4 ++--
cpp/examples/ExampleProducerWithTimedMessage.cpp | 5 ++---
.../ExampleProducerWithTransactionalMessage.cpp | 4 ++--
cpp/examples/ExamplePushConsumer.cpp | 6 +++---
cpp/examples/ExampleSimpleConsumer.cpp | 6 +++---
cpp/source/base/include/InvocationContext.h | 4 ++--
cpp/source/client/ClientManagerImpl.cpp | 11 ++++++-----
cpp/source/client/LogInterceptor.cpp | 14 +++++++-------
cpp/source/client/RpcClientImpl.cpp | 2 --
cpp/source/client/SessionImpl.cpp | 2 +-
cpp/source/client/TelemetryBidiReactor.cpp | 22 ++++++++++------------
cpp/source/client/include/TopicRouteData.h | 2 +-
cpp/source/log/LoggerImpl.cpp | 3 ++-
cpp/source/rocketmq/ClientImpl.cpp | 13 +++++--------
cpp/source/rocketmq/ProducerImpl.cpp | 8 ++++++--
cpp/source/rocketmq/include/ClientImpl.h | 1 +
cpp/source/rocketmq/include/ProducerImpl.h | 2 +-
22 files changed, 62 insertions(+), 63 deletions(-)
diff --git a/.github/workflows/cpp_build.yml b/.github/workflows/cpp_build.yml
index 7973881a..5dc0302b 100644
--- a/.github/workflows/cpp_build.yml
+++ b/.github/workflows/cpp_build.yml
@@ -11,7 +11,7 @@ jobs:
# Disable VS 2022 before
https://github.com/bazelbuild/bazel/issues/18592 issue is solved
# Remove macos-11 since there is no such runner available
# os: [ubuntu-20.04, ubuntu-22.04, macos-11, macos-12, windows-2019,
windows-2022]
- os: [ubuntu-20.04, ubuntu-22.04, macos-12, windows-2019]
+ os: [ubuntu-20.04, ubuntu-22.04, windows-2019]
steps:
- uses: actions/checkout@v2
- name: Compile On Linux
diff --git a/cpp/.gitignore b/cpp/.gitignore
index b7f10c0f..551fa09a 100644
--- a/cpp/.gitignore
+++ b/cpp/.gitignore
@@ -19,3 +19,4 @@ bazel-rocketmq-client-cpp
/compile_commands.json
/.cache/
.clangd
+build
\ No newline at end of file
diff --git a/cpp/examples/ExampleFifoProducer.cpp
b/cpp/examples/ExampleFifoProducer.cpp
index 9d99be36..1e7829d4 100644
--- a/cpp/examples/ExampleFifoProducer.cpp
+++ b/cpp/examples/ExampleFifoProducer.cpp
@@ -28,7 +28,6 @@
#include "rocketmq/FifoProducer.h"
#include "rocketmq/Logger.h"
#include "rocketmq/Message.h"
-#include "rocketmq/Producer.h"
#include "rocketmq/SendReceipt.h"
using namespace ROCKETMQ_NAMESPACE;
@@ -93,8 +92,8 @@ std::string randomString(std::string::size_type len) {
return result;
}
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_string(topic, "FifoTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by
your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp
index 5e20cc12..8c6011d8 100644
--- a/cpp/examples/ExampleProducer.cpp
+++ b/cpp/examples/ExampleProducer.cpp
@@ -51,8 +51,8 @@ std::string randomString(std::string::size_type len) {
return result;
}
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by
your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
diff --git a/cpp/examples/ExampleProducerWithAsync.cpp
b/cpp/examples/ExampleProducerWithAsync.cpp
index d88dfc85..a46fdf43 100644
--- a/cpp/examples/ExampleProducerWithAsync.cpp
+++ b/cpp/examples/ExampleProducerWithAsync.cpp
@@ -89,8 +89,8 @@ std::string randomString(std::string::size_type len) {
return result;
}
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by
your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_uint32(concurrency, 128, "Concurrency of async send");
diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp
b/cpp/examples/ExampleProducerWithFifoMessage.cpp
index 4fa34f9d..2d6789ba 100644
--- a/cpp/examples/ExampleProducerWithFifoMessage.cpp
+++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp
@@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) {
return result;
}
-DEFINE_string(topic, "fifo_topic_sample", "Topic to which messages are
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_string(topic, "FifoTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by
your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp
b/cpp/examples/ExampleProducerWithTimedMessage.cpp
index d6237459..ba2a45f7 100644
--- a/cpp/examples/ExampleProducerWithTimedMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp
@@ -17,7 +17,6 @@
#include <algorithm>
#include <atomic>
#include <chrono>
-#include <cstddef>
#include <iostream>
#include <random>
#include <string>
@@ -50,8 +49,8 @@ std::string randomString(std::string::size_type len) {
return result;
}
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_string(topic, "TimerTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by
your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
index 50620c5a..f595c6ef 100644
--- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
+++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp
@@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) {
return result;
}
-DEFINE_string(topic, "tx_topic_sample", "Topic to which messages are
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
+DEFINE_string(topic, "TransTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by
your service provider");
DEFINE_int32(message_body_size, 4096, "Message body size");
DEFINE_uint32(total, 256, "Number of sample messages to publish");
DEFINE_string(access_key, "", "Your access key ID");
diff --git a/cpp/examples/ExamplePushConsumer.cpp
b/cpp/examples/ExamplePushConsumer.cpp
index 66a85f4b..7017ec92 100644
--- a/cpp/examples/ExamplePushConsumer.cpp
+++ b/cpp/examples/ExamplePushConsumer.cpp
@@ -24,9 +24,9 @@
using namespace ROCKETMQ_NAMESPACE;
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
-DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through
your instance management console");
+DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by
your service provider");
+DEFINE_string(group, "PushConsumer", "GroupId, created through your instance
management console");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
diff --git a/cpp/examples/ExampleSimpleConsumer.cpp
b/cpp/examples/ExampleSimpleConsumer.cpp
index aedec71e..41262ad0 100644
--- a/cpp/examples/ExampleSimpleConsumer.cpp
+++ b/cpp/examples/ExampleSimpleConsumer.cpp
@@ -23,9 +23,9 @@
using namespace ROCKETMQ_NAMESPACE;
-DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are
published");
-DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL,
provided by your service provider");
-DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through
your instance management console");
+DEFINE_string(topic, "NormalTopic", "Topic to which messages are published");
+DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by
your service provider");
+DEFINE_string(group, "SimpleConsumer", "GroupId, created through your instance
management console");
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL");
diff --git a/cpp/source/base/include/InvocationContext.h
b/cpp/source/base/include/InvocationContext.h
index 0e138c81..dd6864bf 100644
--- a/cpp/source/base/include/InvocationContext.h
+++ b/cpp/source/base/include/InvocationContext.h
@@ -81,8 +81,8 @@ struct InvocationContext : public BaseInvocationContext {
if (!status.ok() && grpc::StatusCode::DEADLINE_EXCEEDED ==
status.error_code()) {
auto diff =
-
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()
- context.deadline())
- .count();
+ std::chrono::duration_cast<std::chrono::milliseconds>(
+ std::chrono::system_clock::now() - context.deadline()).count();
SPDLOG_WARN("Asynchronous RPC[{}.{}] timed out, elapsing {}ms,
deadline-over-due: {}ms",
absl::FormatTime(created_time, absl::UTCTimeZone()),
elapsed, diff);
}
diff --git a/cpp/source/client/ClientManagerImpl.cpp
b/cpp/source/client/ClientManagerImpl.cpp
index 7d724c7b..bb1e2e67 100644
--- a/cpp/source/client/ClientManagerImpl.cpp
+++ b/cpp/source/client/ClientManagerImpl.cpp
@@ -48,6 +48,7 @@ ClientManagerImpl::ClientManagerImpl(std::string
resource_namespace, bool with_s
state_(State::CREATED),
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
with_ssl_(with_ssl) {
+
certificate_verifier_ =
grpc::experimental::ExternalCertificateVerifier::Create<InsecureCertificateVerifier>();
tls_channel_credential_options_.set_verify_server_certs(false);
tls_channel_credential_options_.set_check_call_host(false);
@@ -78,7 +79,7 @@ ClientManagerImpl::ClientManagerImpl(std::string
resource_namespace, bool with_s
*/
channel_arguments_.SetInt(GRPC_ARG_ENABLE_RETRIES, 0);
- channel_arguments_.SetSslTargetNameOverride("localhost");
+ // channel_arguments_.SetSslTargetNameOverride("localhost");
SPDLOG_INFO("ClientManager[ResourceNamespace={}] created",
resource_namespace_);
}
@@ -282,7 +283,7 @@ bool ClientManagerImpl::send(const std::string& target_host,
SendMessageRequest& request,
SendResultCallback cb) {
assert(cb);
- SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}",
target_host, request.DebugString());
+ SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}",
target_host, request.ShortDebugString());
RpcClientSharedPtr client = getRpcClient(target_host);
// Invocation context will be deleted in its onComplete() method.
auto invocation_context = new InvocationContext<SendMessageResponse>();
@@ -440,7 +441,7 @@ bool ClientManagerImpl::send(const std::string& target_host,
case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: {
SPDLOG_WARN("Message-property-conflict-with-type: Host={},
Response={}", invocation_context->remote_address,
- invocation_context->response.DebugString());
+ invocation_context->response.ShortDebugString());
send_result.ec = ErrorCode::MessagePropertyConflictWithType;
break;
}
@@ -482,7 +483,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const
std::string& target_hos
auto search = rpc_clients_.find(target_host);
if (search == rpc_clients_.end() || !search->second->ok()) {
if (search == rpc_clients_.end()) {
- SPDLOG_INFO("Create a RPC client to {}", target_host.data());
+ SPDLOG_INFO("Create a RPC client to [{}]", target_host.data());
} else if (!search->second->ok()) {
SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one",
target_host);
}
@@ -549,7 +550,7 @@ void ClientManagerImpl::resolveRoute(const std::string&
target_host,
std::chrono::milliseconds timeout,
const std::function<void(const
std::error_code&, const TopicRouteDataPtr&)>& cb) {
SPDLOG_DEBUG("Name server connection URL: {}", target_host);
- SPDLOG_DEBUG("Query route request: {}", request.DebugString());
+ SPDLOG_DEBUG("Query route request: {}", request.ShortDebugString());
RpcClientSharedPtr client = getRpcClient(target_host, false);
if (!client) {
SPDLOG_WARN("Failed to create RPC client for name server[host={}]",
target_host);
diff --git a/cpp/source/client/LogInterceptor.cpp
b/cpp/source/client/LogInterceptor.cpp
index 77028645..9e6eded0 100644
--- a/cpp/source/client/LogInterceptor.cpp
+++ b/cpp/source/client/LogInterceptor.cpp
@@ -52,8 +52,8 @@ void
LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
if
(methods->QueryInterceptionHookPoint(grpc::experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA))
{
std::multimap<std::string, std::string>* metadata =
methods->GetSendInitialMetadata();
if (metadata) {
- SPDLOG_DEBUG("[Outbound]Headers of {}: \n{}", client_rpc_info_->method(),
- absl::StrJoin(*metadata, "\n", absl::PairFormatter(" -->
")));
+ SPDLOG_DEBUG("[Outbound]Headers of {}: {}", client_rpc_info_->method(),
+ absl::StrJoin(*metadata, " ", absl::PairFormatter(" -->
")));
}
}
@@ -73,8 +73,8 @@ void
LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
absl::string_view(it.second.data(),
it.second.length())});
}
if (!response_headers.empty()) {
- SPDLOG_DEBUG("[Inbound]Response Headers of {}:\n{}",
client_rpc_info_->method(),
- absl::StrJoin(response_headers, "\n",
absl::PairFormatter(" --> ")));
+ SPDLOG_DEBUG("[Inbound]Response Headers of {}: {}",
client_rpc_info_->method(),
+ absl::StrJoin(response_headers, " ",
absl::PairFormatter(" --> ")));
} else {
SPDLOG_DEBUG("[Inbound]Response metadata of {} is empty",
client_rpc_info_->method());
}
@@ -85,12 +85,12 @@ void
LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth
void* message = methods->GetRecvMessage();
if (message) {
auto* response = reinterpret_cast<google::protobuf::Message*>(message);
- std::string&& response_text = response->DebugString();
+ std::string&& response_text = response->ShortDebugString();
std::size_t limit = 1024;
if (response_text.size() <= limit) {
- SPDLOG_DEBUG("[Inbound] {}\n{}", client_rpc_info_->method(),
response_text);
+ SPDLOG_DEBUG("[Inbound] {} {}", client_rpc_info_->method(),
response_text);
} else {
- SPDLOG_DEBUG("[Inbound] {}\n{}...", client_rpc_info_->method(),
response_text.substr(0, limit));
+ SPDLOG_DEBUG("[Inbound] {} {}...", client_rpc_info_->method(),
response_text.substr(0, limit));
}
}
}
diff --git a/cpp/source/client/RpcClientImpl.cpp
b/cpp/source/client/RpcClientImpl.cpp
index 35016c34..d9f10212 100644
--- a/cpp/source/client/RpcClientImpl.cpp
+++ b/cpp/source/client/RpcClientImpl.cpp
@@ -16,7 +16,6 @@
*/
#include "RpcClientImpl.h"
-#include <chrono>
#include <functional>
#include <sstream>
#include <thread>
@@ -26,7 +25,6 @@
#include "RpcClient.h"
#include "TelemetryBidiReactor.h"
#include "TlsHelper.h"
-#include "absl/time/time.h"
ROCKETMQ_NAMESPACE_BEGIN
diff --git a/cpp/source/client/SessionImpl.cpp
b/cpp/source/client/SessionImpl.cpp
index b3f8b73b..151f1c3f 100644
--- a/cpp/source/client/SessionImpl.cpp
+++ b/cpp/source/client/SessionImpl.cpp
@@ -34,7 +34,7 @@ bool SessionImpl::await() {
void SessionImpl::syncSettings() {
auto ptr = client_.lock();
- SPDLOG_INFO("Sync client settings to {}", rpc_client_->remoteAddress());
+ SPDLOG_INFO("Request client settings to {}", rpc_client_->remoteAddress());
TelemetryCommand command;
command.mutable_settings()->CopyFrom(ptr->clientSettings());
telemetry_->write(command);
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp
b/cpp/source/client/TelemetryBidiReactor.cpp
index e0a83a28..27557cf2 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -16,15 +16,12 @@
*/
#include "TelemetryBidiReactor.h"
-#include <atomic>
-#include <cstdint>
#include <memory>
#include <utility>
#include "ClientManager.h"
#include "MessageExt.h"
#include "Metadata.h"
-#include "RpcClient.h"
#include "Signature.h"
#include "google/protobuf/util/time_util.h"
#include "rocketmq/Logger.h"
@@ -70,7 +67,7 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) {
RemoveHold();
if (!ok) {
- SPDLOG_WARN("Failed to write telemetry command {} to {}",
writes_.front().DebugString(), peer_address_);
+ SPDLOG_WARN("Failed to write telemetry command {} to {}",
writes_.front().ShortDebugString(), peer_address_);
signalClose();
return;
}
@@ -91,7 +88,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
if (!ok) {
// for read stream
RemoveHold();
- SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_);
+ // SPDLOG_WARN("Failed to read from telemetry stream from {}",
peer_address_);
signalClose();
return;
}
@@ -103,7 +100,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
}
}
- SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_,
read_.DebugString());
+ SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_,
read_.ShortDebugString());
auto client = client_.lock();
if (!client) {
SPDLOG_INFO("Client for {} has destructed", peer_address_);
@@ -114,19 +111,20 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
switch (read_.command_case()) {
case rmq::TelemetryCommand::kSettings: {
auto settings = read_.settings();
- SPDLOG_INFO("Received settings from {}: {}", peer_address_,
settings.DebugString());
+ SPDLOG_INFO("Receive settings from {}: {}", peer_address_,
settings.ShortDebugString());
applySettings(settings);
sync_settings_promise_.set_value(true);
break;
}
+
case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: {
- SPDLOG_DEBUG("Receive orphan transaction command: {}",
read_.DebugString());
- auto message =
client->manager()->wrapMessage(read_.release_verify_message_command()->message());
+ SPDLOG_INFO("Receive orphan transaction command: {}",
read_.ShortDebugString());
+ auto message = client->manager()->wrapMessage(
+ read_.recover_orphaned_transaction_command().message());
auto raw = const_cast<Message*>(message.get());
raw->mutableExtension().target_endpoint = peer_address_;
raw->mutableExtension().transaction_id =
read_.recover_orphaned_transaction_command().transaction_id();
client->recoverOrphanedTransaction(message);
-
break;
}
@@ -156,7 +154,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
}
default: {
- SPDLOG_WARN("Unsupported command");
+ SPDLOG_WARN("Telemetry command receive unsupported command");
break;
}
}
@@ -291,7 +289,7 @@ void TelemetryBidiReactor::tryWriteNext() {
}
if (!writes_.empty()) {
- SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_,
writes_.front().DebugString());
+ SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_,
writes_.front().ShortDebugString());
AddHold();
StartWrite(&(writes_.front()));
}
diff --git a/cpp/source/client/include/TopicRouteData.h
b/cpp/source/client/include/TopicRouteData.h
index 807ac811..aac41f1c 100644
--- a/cpp/source/client/include/TopicRouteData.h
+++ b/cpp/source/client/include/TopicRouteData.h
@@ -43,7 +43,7 @@ public:
std::string debugString() const {
return absl::StrJoin(message_queues_.begin(), message_queues_.end(), ",",
- [](std::string* out, const rmq::MessageQueue& m) {
out->append(m.DebugString()); });
+ [](std::string* out, const rmq::MessageQueue& m) {
out->append(m.ShortDebugString()); });
};
private:
diff --git a/cpp/source/log/LoggerImpl.cpp b/cpp/source/log/LoggerImpl.cpp
index 6ff39f10..cdd2f473 100644
--- a/cpp/source/log/LoggerImpl.cpp
+++ b/cpp/source/log/LoggerImpl.cpp
@@ -131,6 +131,7 @@ Logger& getLogger() {
const std::size_t LoggerImpl::DEFAULT_MAX_LOG_FILE_QUANTITY = 16;
const std::size_t LoggerImpl::DEFAULT_FILE_SIZE = 1048576 * 256;
const char* LoggerImpl::USER_HOME_ENV = "HOME";
-const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n]
[%^---%L---%$] [thread %t] %v %@";
+const char* LoggerImpl::DEFAULT_PATTERN = "%Y-%m-%d %H:%M:%S.%e [%^--%L--%$]
[%7t] %v %@";
+// const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n]
[%^---%L---%$] [thread %t] %v %@";
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ClientImpl.cpp
b/cpp/source/rocketmq/ClientImpl.cpp
index 5be559b6..a4026cf6 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -19,11 +19,9 @@
#include <algorithm>
#include <atomic>
#include <chrono>
-#include <cstdint>
#include <cstdlib>
#include <exception>
#include <functional>
-#include <iterator>
#include <memory>
#include <string>
#include <system_error>
@@ -43,9 +41,6 @@
#include "absl/strings/str_split.h"
#include "fmt/format.h"
#include "opencensus/stats/stats.h"
-#include "rocketmq/Logger.h"
-#include "rocketmq/Message.h"
-#include "rocketmq/MessageListener.h"
#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -175,12 +170,14 @@ void ClientImpl::start() {
auto telemetry_functor = [ptr]() {
std::shared_ptr<ClientImpl> base = ptr.lock();
if (base) {
- SPDLOG_INFO("Sync client settings to servers");
+ SPDLOG_DEBUG("Sync client settings to servers");
base->syncClientSettings();
}
};
- telemetry_handle_ =
client_manager_->getScheduler()->schedule(telemetry_functor,
TELEMETRY_TASK_NAME,
-
std::chrono::minutes(5), std::chrono::minutes(5));
+
+ telemetry_handle_ = client_manager_->getScheduler()->schedule(
+ telemetry_functor, TELEMETRY_TASK_NAME,
+ std::chrono::minutes(5), std::chrono::minutes(5));
auto&& metric_service_endpoint = metricServiceEndpoint();
if (!metric_service_endpoint.empty()) {
diff --git a/cpp/source/rocketmq/ProducerImpl.cpp
b/cpp/source/rocketmq/ProducerImpl.cpp
index 34c5b29c..9b664d59 100644
--- a/cpp/source/rocketmq/ProducerImpl.cpp
+++ b/cpp/source/rocketmq/ProducerImpl.cpp
@@ -227,7 +227,10 @@ SendReceipt ProducerImpl::send(MessageConstPtr message,
std::error_code& ec) noe
auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt&
receipt) mutable {
ec = code;
SendReceipt& receipt_mut = const_cast<SendReceipt&>(receipt);
+ send_receipt.target = std::move(receipt_mut.target);
+ send_receipt.message_id = std::move(receipt_mut.message_id);
send_receipt.message = std::move(receipt_mut.message);
+ send_receipt.transaction_id = std::move(receipt_mut.transaction_id);
{
absl::MutexLock lk(mtx.get());
completed = true;
@@ -354,7 +357,7 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext>
context) {
client_manager_->send(target, metadata, request, callback);
}
-void ProducerImpl::send0(MessageConstPtr message, SendCallback callback,
std::vector<rmq::MessageQueue> list) {
+void ProducerImpl::send0(MessageConstPtr message, const SendCallback&
callback, std::vector<rmq::MessageQueue> list) {
SendReceipt send_receipt;
std::error_code ec;
validate(*message, ec);
@@ -371,7 +374,8 @@ void ProducerImpl::send0(MessageConstPtr message,
SendCallback callback, std::ve
return;
}
- auto context = std::make_shared<SendContext>(shared_from_this(),
std::move(message), callback, std::move(list));
+ auto context = std::make_shared<SendContext>(
+ shared_from_this(), std::move(message), callback, std::move(list));
sendImpl(context);
}
diff --git a/cpp/source/rocketmq/include/ClientImpl.h
b/cpp/source/rocketmq/include/ClientImpl.h
index d7693962..25cef46c 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -147,6 +147,7 @@ protected:
absl::flat_hash_map<std::string, std::vector<std::function<void(const
std::error_code&, const TopicRouteDataPtr&)>>>
inflight_route_requests_ GUARDED_BY(inflight_route_requests_mtx_);
absl::Mutex inflight_route_requests_mtx_
ACQUIRED_BEFORE(topic_route_table_mtx_); // Protects inflight_route_requests_
+
static const char* UPDATE_ROUTE_TASK_NAME;
std::uint32_t route_update_handle_{0};
diff --git a/cpp/source/rocketmq/include/ProducerImpl.h
b/cpp/source/rocketmq/include/ProducerImpl.h
index b572f20d..2c284172 100644
--- a/cpp/source/rocketmq/include/ProducerImpl.h
+++ b/cpp/source/rocketmq/include/ProducerImpl.h
@@ -176,7 +176,7 @@ private:
void validate(const Message& message, std::error_code& ec);
- void send0(MessageConstPtr message, SendCallback callback,
std::vector<rmq::MessageQueue> list);
+ void send0(MessageConstPtr message, const SendCallback& callback,
std::vector<rmq::MessageQueue> list);
void isolatedEndpoints(absl::flat_hash_set<std::string>& endpoints)
LOCKS_EXCLUDED(isolated_endpoints_mtx_);