This is an automated email from the ASF dual-hosted git repository. lizhanhui pushed a commit to branch exclude_absl_from_api in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit c4728bb0ab3cc8b777b17a7ade198cf16a819814 Author: Li Zhanhui <[email protected]> AuthorDate: Tue Sep 27 13:29:09 2022 +0800 Exclude abseil from API definition --- cpp/examples/ExampleProducerWithAsync.cpp | 20 +++++++-------- cpp/include/rocketmq/Message.h | 29 +++++++--------------- cpp/source/base/Message.cpp | 5 +++- cpp/source/base/MessageExt.cpp | 6 +++-- cpp/source/base/tests/MessageBuilderTest.cpp | 4 +-- cpp/source/rocketmq/Producer.cpp | 2 +- cpp/source/rocketmq/ProducerImpl.cpp | 37 ++++++++++++++-------------- cpp/source/trace/TracingUtility.cpp | 8 +++--- 8 files changed, 51 insertions(+), 60 deletions(-) diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp index 3a453a9..ac4610c 100644 --- a/cpp/examples/ExampleProducerWithAsync.cpp +++ b/cpp/examples/ExampleProducerWithAsync.cpp @@ -24,8 +24,6 @@ #include <string> #include <system_error> -#include "absl/base/thread_annotations.h" -#include "absl/synchronization/mutex.h" #include "gflags/gflags.h" #include "rocketmq/Message.h" #include "rocketmq/Producer.h" @@ -43,32 +41,32 @@ public: /** * @brief Acquire a permit. */ - void acquire() LOCKS_EXCLUDED(mtx_) { + void acquire() { while (true) { - absl::MutexLock lk(&mtx_); + std::unique_lock<std::mutex> lk(mtx_); if (permits_ > 0) { permits_--; return; } - cv_.Wait(&mtx_); + cv_.wait(lk, [this]() { return permits_ > 0; }); } } /** * @brief Release the permit back to semaphore. */ - void release() LOCKS_EXCLUDED(mtx_) { - absl::MutexLock lk(&mtx_); + void release() { + std::unique_lock<std::mutex> lk(mtx_); permits_++; if (1 == permits_) { - cv_.Signal(); + cv_.notify_one(); } } private: std::size_t permits_{0}; - absl::Mutex mtx_; - absl::CondVar cv_; + std::mutex mtx_; + std::condition_variable cv_; }; const std::string& alphaNumeric() { @@ -142,7 +140,7 @@ int main(int argc, char* argv[]) { std::mutex mtx; std::condition_variable cv; - auto semaphore = absl::make_unique<Semaphore>(FLAGS_concurrency); + std::unique_ptr<Semaphore> semaphore(new Semaphore(FLAGS_concurrency)); auto send_callback = [&](const std::error_code& ec, const SendReceipt& receipt) { std::unique_lock<std::mutex> lk(mtx); diff --git a/cpp/include/rocketmq/Message.h b/cpp/include/rocketmq/Message.h index 73336dc..3a917b1 100644 --- a/cpp/include/rocketmq/Message.h +++ b/cpp/include/rocketmq/Message.h @@ -26,7 +26,6 @@ #include <vector> #include "RocketMQ.h" -#include "absl/types/optional.h" ROCKETMQ_NAMESPACE_BEGIN @@ -61,25 +60,19 @@ public: return topic_; } - absl::optional<std::string> tag() const { - if (tag_.empty()) { - return {}; - } - return absl::make_optional(tag_); + const std::string& tag() const { + return tag_; } const std::vector<std::string>& keys() const { return keys_; } - absl::optional<std::string> traceContext() const { - if (trace_context_.empty()) { - return {}; - } - return absl::make_optional(trace_context_); + const std::string& traceContext() const { + return trace_context_; } - void traceContext(std::string &&trace_context) { + void traceContext(std::string&& trace_context) { trace_context_ = std::move(trace_context); } @@ -91,7 +84,7 @@ public: return born_time_; } - absl::optional<std::chrono::system_clock::time_point> deliveryTimestamp() const { + std::chrono::system_clock::time_point deliveryTimestamp() const { return delivery_timestamp_; } @@ -103,11 +96,8 @@ public: return properties_; } - absl::optional<std::string> group() const { - if (group_.empty()) { - return {}; - } - return absl::make_optional(group_); + const std::string& group() const { + return group_; } const Extension& extension() const { @@ -121,7 +111,6 @@ public: static MessageBuilder newBuilder(); protected: - friend std::unique_ptr<Message> absl::make_unique<Message>(); friend class MessageBuilder; Message(); @@ -134,7 +123,7 @@ private: std::string trace_context_; std::string born_host_; std::chrono::system_clock::time_point born_time_{std::chrono::system_clock::now()}; - absl::optional<std::chrono::system_clock::time_point> delivery_timestamp_; + std::chrono::system_clock::time_point delivery_timestamp_; std::string body_; std::unordered_map<std::string, std::string> properties_; std::string group_; diff --git a/cpp/source/base/Message.cpp b/cpp/source/base/Message.cpp index e89af54..cd3cc23 100644 --- a/cpp/source/base/Message.cpp +++ b/cpp/source/base/Message.cpp @@ -16,6 +16,9 @@ */ #include "rocketmq/Message.h" +#include <chrono> +#include <memory> + #include "UniqueIdGenerator.h" #include "absl/memory/memory.h" @@ -29,7 +32,7 @@ MessageBuilder Message::newBuilder() { return {}; } -MessageBuilder::MessageBuilder() : message_(absl::make_unique<Message>()) { +MessageBuilder::MessageBuilder() : message_(new Message()) { } MessageBuilder& MessageBuilder::withTopic(std::string topic) { diff --git a/cpp/source/base/MessageExt.cpp b/cpp/source/base/MessageExt.cpp index 5477df7..c2b5e4a 100644 --- a/cpp/source/base/MessageExt.cpp +++ b/cpp/source/base/MessageExt.cpp @@ -16,14 +16,16 @@ */ #include "MessageExt.h" +#include <chrono> + ROCKETMQ_NAMESPACE_BEGIN rmq::MessageType typeOf(const Message& message) { - if (message.group().has_value()) { + if (!message.group().empty()) { return rmq::MessageType::FIFO; } - if (message.deliveryTimestamp().has_value()) { + if (message.deliveryTimestamp().time_since_epoch().count()) { return rmq::MessageType::DELAY; } diff --git a/cpp/source/base/tests/MessageBuilderTest.cpp b/cpp/source/base/tests/MessageBuilderTest.cpp index 9ad4d58..e08bca5 100644 --- a/cpp/source/base/tests/MessageBuilderTest.cpp +++ b/cpp/source/base/tests/MessageBuilderTest.cpp @@ -38,7 +38,7 @@ TEST_F(MessageBuilderTest, testBuilder) { ASSERT_EQ(tag_, message->tag()); ASSERT_TRUE(keys_ == message->keys()); ASSERT_EQ(body_, message->body()); - ASSERT_EQ(false, message->traceContext().has_value()); + ASSERT_EQ(true, message->traceContext().empty()); } TEST_F(MessageBuilderTest, testBuilder2) { @@ -49,7 +49,7 @@ TEST_F(MessageBuilderTest, testBuilder2) { ASSERT_EQ(tag_, message->tag()); ASSERT_TRUE(keys_ == message->keys()); ASSERT_EQ(body_, message->body()); - ASSERT_EQ(false, message->traceContext().has_value()); + ASSERT_EQ(true, message->traceContext().empty()); } } diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp index 147fc8b..49b9f71 100644 --- a/cpp/source/rocketmq/Producer.cpp +++ b/cpp/source/rocketmq/Producer.cpp @@ -54,7 +54,7 @@ void Producer::send(MessageConstPtr message, const SendCallback& callback) noexc return; } - if (message->group().has_value()) { + if (!message->group().empty()) { SendReceipt empty; std::error_code ec = ErrorCode::BadRequestAsyncPubFifoMessage; callback(ec, empty); diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp index 7ad127c..e02cd1a 100644 --- a/cpp/source/rocketmq/ProducerImpl.cpp +++ b/cpp/source/rocketmq/ProducerImpl.cpp @@ -124,8 +124,8 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq auto system_properties = msg->mutable_system_properties(); // Handle Tag - if (message.tag().has_value()) { - system_properties->set_tag(message.tag().value()); + if (!message.tag().empty()) { + system_properties->set_tag(message.tag()); } // Handle Key @@ -135,16 +135,16 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq } // TraceContext - if (message.traceContext().has_value()) { - const auto& trace_context = message.traceContext().value(); + if (!message.traceContext().empty()) { + const auto& trace_context = message.traceContext(); if (!trace_context.empty()) { system_properties->set_trace_context(trace_context); } } // Delivery Timestamp - if (message.deliveryTimestamp().has_value()) { - auto delivery_timestamp = message.deliveryTimestamp().value(); + if (message.deliveryTimestamp().time_since_epoch().count()) { + auto delivery_timestamp = message.deliveryTimestamp(); if (delivery_timestamp.time_since_epoch().count()) { auto duration = delivery_timestamp.time_since_epoch(); system_properties->set_delivery_attempt(std::chrono::duration_cast<std::chrono::milliseconds>(duration).count()); @@ -159,9 +159,9 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq system_properties->set_born_host(UtilAll::hostname()); - if (message.deliveryTimestamp().has_value()) { + if (message.deliveryTimestamp().time_since_epoch().count()) { system_properties->set_message_type(rmq::MessageType::DELAY); - } else if (message.group().has_value()) { + } else if (!message.group().empty()) { system_properties->set_message_type(rmq::MessageType::FIFO); } else if (message.extension().transactional) { system_properties->set_message_type(rmq::MessageType::TRANSACTION); @@ -179,8 +179,8 @@ void ProducerImpl::wrapSendMessageRequest(const Message& message, SendMessageReq system_properties->set_body_encoding(rmq::Encoding::IDENTITY); } - if (message.group().has_value()) { - system_properties->set_message_group(message.group().value()); + if (!message.group().empty()) { + system_properties->set_message_group(message.group()); } system_properties->set_message_id(message.id()); @@ -301,9 +301,8 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) { { // Trace Send RPC - if (context->message_->traceContext().has_value() && client_config_.sampler_) { - auto span_context = - opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext().value()); + if (!context->message_->traceContext().empty() && client_config_.sampler_) { + auto span_context = opencensus::trace::propagation::FromTraceParentHeader(context->message_->traceContext()); auto span = opencensus::trace::Span::BlankSpan(); std::string span_name = resourceNamespace() + "/" + context->message_->topic() + " " + MixAll::SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION; @@ -320,9 +319,9 @@ void ProducerImpl::sendImpl(std::shared_ptr<SendContext> context) { TracingUtility::addUniversalSpanAttributes(*context->message_, config(), span); // Note: attempt-time is 0-based span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT, 1 + context->attempt_times_); - if (context->message_->deliveryTimestamp().has_value()) { + if (context->message_->deliveryTimestamp().time_since_epoch().count()) { span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP, - absl::FormatTime(absl::FromChrono(context->message_->deliveryTimestamp().value()))); + absl::FormatTime(absl::FromChrono(context->message_->deliveryTimestamp()))); } auto ptr = const_cast<Message*>(context->message_.get()); ptr->traceContext(opencensus::trace::propagation::ToTraceParentHeader(span.context())); @@ -463,15 +462,15 @@ void ProducerImpl::isolateEndpoint(const std::string& target) { void ProducerImpl::send(MessageConstPtr message, std::error_code& ec, Transaction& transaction) { MiniTransaction mini = {}; mini.topic = message->topic(); - mini.trace_context = message->traceContext().value_or(""); + mini.trace_context = message->traceContext(); - if (message->group().has_value()) { + if (!message->group().empty()) { ec = ErrorCode::MessagePropertyConflictWithType; SPDLOG_WARN("FIFO message may not be transactional"); return; } - if (message->deliveryTimestamp().has_value()) { + if (message->deliveryTimestamp().time_since_epoch().count()) { ec = ErrorCode::MessagePropertyConflictWithType; SPDLOG_WARN("Timed message may not be transactional"); return; @@ -564,7 +563,7 @@ void ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message) transaction.topic = message->topic(); transaction.message_id = message->id(); transaction.transaction_id = message->extension().transaction_id; - transaction.trace_context = message->traceContext().value_or(""); + transaction.trace_context = message->traceContext(); transaction.target = message->extension().target_endpoint; TransactionState state = transaction_checker_(*message); endTransaction0(transaction, state); diff --git a/cpp/source/trace/TracingUtility.cpp b/cpp/source/trace/TracingUtility.cpp index 4cd7a8d..7b4d8e5 100644 --- a/cpp/source/trace/TracingUtility.cpp +++ b/cpp/source/trace/TracingUtility.cpp @@ -27,8 +27,8 @@ void TracingUtility::addUniversalSpanAttributes(const Message& message, const Cl opencensus::trace::Span& span) { span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_ID, message.id()); span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES, message.body().length()); - if (message.tag().has_value()) { - span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, message.tag().value()); + if (!message.tag().empty()) { + span.AddAttribute(MixAll::SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG, message.tag()); } const std::vector<std::string>& keys = message.keys(); @@ -56,8 +56,8 @@ void TracingUtility::addUniversalSpanAttributes(const Message& message, const Cl break; } - if (message.deliveryTimestamp().has_value()) { - std::chrono::system_clock::time_point timestamp = message.deliveryTimestamp().value(); + if (message.deliveryTimestamp().time_since_epoch().count()) { + std::chrono::system_clock::time_point timestamp = message.deliveryTimestamp(); auto duration = absl::FromChrono(timestamp.time_since_epoch()); int64_t timestamp_millis = absl::ToInt64Milliseconds(duration); if (timestamp_millis > 0) {
