This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch cpp in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit fad375e843278e29260dda98f243dcd7498341e2 Author: Li Zhanhui <[email protected]> AuthorDate: Mon Jul 4 11:37:33 2022 +0800 Fix to make it compile on Windows --- cpp/.bazelrc | 4 +--- cpp/api/rocketmq/Tracing.h | 10 ++++++++-- cpp/src/main/cpp/base/Tracing.cpp | 26 ------------------------- cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp | 14 +++++++------ cpp/src/main/cpp/client/RpcClientImpl.cpp | 4 +--- cpp/src/main/cpp/client/include/ClientConfig.h | 7 ++++--- cpp/src/main/cpp/client/include/RpcClient.h | 7 ++----- cpp/src/main/cpp/client/include/RpcClientImpl.h | 7 ++----- cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp | 2 +- cpp/src/main/cpp/rocketmq/ProducerImpl.cpp | 13 +++++++------ cpp/src/main/cpp/rocketmq/include/ClientImpl.h | 6 ++++++ 11 files changed, 40 insertions(+), 60 deletions(-) diff --git a/cpp/.bazelrc b/cpp/.bazelrc index d5ec2a9..6b7b1da 100644 --- a/cpp/.bazelrc +++ b/cpp/.bazelrc @@ -13,8 +13,6 @@ run --color=yes build --color=yes build --host_force_python=PY3 -build --host_javabase=@bazel_tools//tools/jdk:remote_jdk11 -build --javabase=@bazel_tools//tools/jdk:remote_jdk11 # https://docs.bazel.build/versions/main/command-line-reference.html#flag--enable_platform_specific_config # If true, Bazel picks up host-OS-specific config lines from bazelrc files. For example, if the host OS is Linux and @@ -48,7 +46,7 @@ build --action_env=LD_LIBRARY_PATH build --action_env=LLVM_CONFIG build --action_env=PATH -build --copt=-maes +build:linux --copt=-maes # Common flags for sanitizers build:sanitizer --define tcmalloc=disabled diff --git a/cpp/api/rocketmq/Tracing.h b/cpp/api/rocketmq/Tracing.h index bc12362..cdec455 100644 --- a/cpp/api/rocketmq/Tracing.h +++ b/cpp/api/rocketmq/Tracing.h @@ -16,12 +16,18 @@ */ #pragma once -#include "opencensus/trace/sampler.h" +#include <memory> #include "RocketMQ.h" +#include "opencensus/trace/sampler.h" ROCKETMQ_NAMESPACE_BEGIN -opencensus::trace::Sampler* traceSampler() __attribute__((weak)); +class TracingSamplerProvider { +public: + virtual ~TracingSamplerProvider() = default; + + virtual std::unique_ptr<opencensus::trace::Sampler> tracingSampler() = 0; +}; ROCKETMQ_NAMESPACE_END diff --git a/cpp/src/main/cpp/base/Tracing.cpp b/cpp/src/main/cpp/base/Tracing.cpp deleted file mode 100644 index 9eb7335..0000000 --- a/cpp/src/main/cpp/base/Tracing.cpp +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "rocketmq/Tracing.h" - -ROCKETMQ_NAMESPACE_BEGIN - -opencensus::trace::Sampler* traceSampler() { - static opencensus::trace::NeverSampler sampler; - return &sampler; -} - -ROCKETMQ_NAMESPACE_END diff --git a/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp b/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp index 168b87a..1e3c7bf 100644 --- a/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp +++ b/cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp @@ -20,12 +20,14 @@ ROCKETMQ_NAMESPACE_BEGIN TEST(RetryPolicyTest, testBackoff) { - RetryPolicy policy{.max_attempt = 3, - .strategy = BackoffStrategy::Customized, - .initial = absl::Milliseconds(0), - .max = absl::Milliseconds(0), - .multiplier = 0.0f, - .next = {absl::Milliseconds(10), absl::Milliseconds(100), absl::Milliseconds(500)}}; + RetryPolicy policy; + policy.max_attempt = 3; + policy.strategy = BackoffStrategy::Customized; + policy.initial = absl::Milliseconds(0); + policy.max = absl::Milliseconds(0); + policy.multiplier = 0.0f; + policy.next = {absl::Milliseconds(10), absl::Milliseconds(100), absl::Milliseconds(500)}; + ASSERT_EQ(policy.backoff(1), 10); ASSERT_EQ(policy.backoff(2), 100); ASSERT_EQ(policy.backoff(3), 500); diff --git a/cpp/src/main/cpp/client/RpcClientImpl.cpp b/cpp/src/main/cpp/client/RpcClientImpl.cpp index 623547e..35016c3 100644 --- a/cpp/src/main/cpp/client/RpcClientImpl.cpp +++ b/cpp/src/main/cpp/client/RpcClientImpl.cpp @@ -21,14 +21,12 @@ #include <sstream> #include <thread> -#include "absl/time/time.h" - #include "ClientManager.h" #include "ReceiveMessageStreamReader.h" #include "RpcClient.h" #include "TelemetryBidiReactor.h" #include "TlsHelper.h" -#include "include/ReceiveMessageContext.h" +#include "absl/time/time.h" ROCKETMQ_NAMESPACE_BEGIN diff --git a/cpp/src/main/cpp/client/include/ClientConfig.h b/cpp/src/main/cpp/client/include/ClientConfig.h index 66ca230..bade9a4 100644 --- a/cpp/src/main/cpp/client/include/ClientConfig.h +++ b/cpp/src/main/cpp/client/include/ClientConfig.h @@ -21,12 +21,12 @@ #include <string> #include <vector> -#include "absl/container/flat_hash_map.h" -#include "absl/time/time.h" - #include "Protocol.h" #include "RetryPolicy.h" +#include "absl/container/flat_hash_map.h" +#include "absl/time/time.h" #include "rocketmq/CredentialsProvider.h" +#include "rocketmq/Tracing.h" ROCKETMQ_NAMESPACE_BEGIN @@ -61,6 +61,7 @@ struct ClientConfig { PublisherConfig publisher; SubscriberConfig subscriber; Metric metric; + std::unique_ptr<opencensus::trace::Sampler> sampler_; }; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/src/main/cpp/client/include/RpcClient.h b/cpp/src/main/cpp/client/include/RpcClient.h index 2fc8448..fbb3017 100644 --- a/cpp/src/main/cpp/client/include/RpcClient.h +++ b/cpp/src/main/cpp/client/include/RpcClient.h @@ -21,15 +21,12 @@ #include <memory> #include <string> -#include "ReceiveMessageResult.h" +#include "Protocol.h" +#include "ReceiveMessageContext.h" #include "absl/container/flat_hash_map.h" #include "absl/strings/string_view.h" #include "grpcpp/grpcpp.h" -#include "InvocationContext.h" -#include "Protocol.h" -#include "ReceiveMessageContext.h" - ROCKETMQ_NAMESPACE_BEGIN using Channel = grpc::Channel; diff --git a/cpp/src/main/cpp/client/include/RpcClientImpl.h b/cpp/src/main/cpp/client/include/RpcClientImpl.h index 6406c74..35316ec 100644 --- a/cpp/src/main/cpp/client/include/RpcClientImpl.h +++ b/cpp/src/main/cpp/client/include/RpcClientImpl.h @@ -18,14 +18,11 @@ #include <memory> -#include "InvocationContext.h" -#include "ReceiveMessageCallback.h" -#include "ReceiveMessageContext.h" -#include "absl/container/flat_hash_map.h" - #include "Client.h" #include "ClientManager.h" +#include "ReceiveMessageContext.h" #include "RpcClient.h" +#include "absl/container/flat_hash_map.h" ROCKETMQ_NAMESPACE_BEGIN diff --git a/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp b/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp index 0eeb08e..9dc38ed 100644 --- a/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp +++ b/cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp @@ -113,7 +113,7 @@ void ProcessQueueImpl::popMessage() { std::weak_ptr<AsyncReceiveMessageCallback> cb{receive_callback_}; auto callback = [cb](const std::error_code& ec, const ReceiveMessageResult& result) { - auto recv_cb = cb.lock(); + std::shared_ptr<AsyncReceiveMessageCallback> recv_cb = cb.lock(); if (recv_cb) { recv_cb->onCompletion(ec, result); } diff --git a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp index 4b6c3f0..65cc5ba 100644 --- a/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp +++ b/cpp/src/main/cpp/rocketmq/ProducerImpl.cpp @@ -290,16 +290,17 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) { { // Trace Send RPC - if (context->message_->traceContext().has_value()) { + if (context->message_->traceContext().has_value() && client_config_.sampler_) { auto span_context = opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext().value()); auto span = opencensus::trace::Span::BlankSpan(); std::string span_name = resourceNamespace() + "/" + context->message_->topic() + " " + MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION; if (span_context.IsValid()) { - span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {traceSampler()}); + span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, + {client_config_.sampler_.get()}); } else { - span = opencensus::trace::Span::StartSpan(span_name, nullptr, {traceSampler()}); + span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()}); } span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION); @@ -380,7 +381,7 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt bool completed = false; bool success = false; auto span = opencensus::trace::Span::BlankSpan(); - if (!transaction.traceContext().empty()) { + if (!transaction.traceContext().empty() && client_config_.sampler_) { // Trace transactional message opencensus::trace::SpanContext span_context = opencensus::trace::propagation::FromTraceParentHeader(transaction.traceContext()); @@ -389,9 +390,9 @@ bool ProducerImpl::endTransaction0(const Transaction& transaction, TransactionSt : MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION; std::string span_name = resourceNamespace() + "/" + transaction.topic() + " " + trace_operation_name; if (span_context.IsValid()) { - span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {traceSampler()}); + span = opencensus::trace::Span::StartSpanWithRemoteParent(span_name, span_context, {client_config_.sampler_.get()}); } else { - span = opencensus::trace::Span::StartSpan(span_name, nullptr, {traceSampler()}); + span = opencensus::trace::Span::StartSpan(span_name, nullptr, {client_config_.sampler_.get()}); } span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION, trace_operation_name); span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION, trace_operation_name); diff --git a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h index ff61af3..c136cc9 100644 --- a/cpp/src/main/cpp/rocketmq/include/ClientImpl.h +++ b/cpp/src/main/cpp/rocketmq/include/ClientImpl.h @@ -110,6 +110,12 @@ public: virtual void buildClientSettings(rmq::Settings& settings) { } + void registerTracingSampler(TracingSamplerProvider *provider) { + if (provider) { + client_config_.sampler_ = provider->tracingSampler(); + } + } + protected: ClientConfig client_config_;
