This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new da39ef79 fix #521 sync state of consume-message-service according to
start/shutdown lifecycle management (#618)
da39ef79 is described below
commit da39ef79838d56a1880083ce8e3fe7bd6660b94f
Author: Zhanhui Li <[email protected]>
AuthorDate: Sun Oct 8 20:42:44 2023 +0800
fix #521 sync state of consume-message-service according to start/shutdown
lifecycle management (#618)
Signed-off-by: Li Zhanhui <[email protected]>
---
cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp | 16 ++++++---
.../rocketmq/include/ConsumeMessageServiceImpl.h | 41 +++++++++++++---------
cpp/source/rocketmq/tests/BUILD.bazel | 8 +++++
.../rocketmq/tests/ConsumeMessageServiceTest.cpp | 38 ++++++++++++++++++++
4 files changed, 83 insertions(+), 20 deletions(-)
diff --git a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
index 11e14ce3..2b5e7c70 100644
--- a/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
+++ b/cpp/source/rocketmq/ConsumeMessageServiceImpl.cpp
@@ -16,14 +16,16 @@
*/
#include "ConsumeMessageServiceImpl.h"
+#include <atomic>
+
#include "ConsumeStats.h"
#include "ConsumeTask.h"
-#include "rocketmq/Logger.h"
-#include "spdlog/spdlog.h"
#include "PushConsumerImpl.h"
#include "Tag.h"
#include "ThreadPoolImpl.h"
#include "rocketmq/ErrorCode.h"
+#include "rocketmq/Logger.h"
+#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
@@ -41,16 +43,22 @@ void ConsumeMessageServiceImpl::start() {
State expected = State::CREATED;
if (state_.compare_exchange_strong(expected, State::STARTING, std::memory_order_relaxed)) {
pool_->start();
+ state_.store(State::STARTED, std::memory_order_relaxed);
}
}
void ConsumeMessageServiceImpl::shutdown() {
- State expected = State::STOPPING;
- if (state_.compare_exchange_strong(expected, State::STOPPED, std::memory_order_relaxed)) {
+ State expected = State::STARTED;
+ if (state_.compare_exchange_strong(expected, State::STOPPING, std::memory_order_relaxed)) {
pool_->shutdown();
+ state_.store(State::STOPPED, std::memory_order_relaxed);
}
}
+State ConsumeMessageServiceImpl::state() const {
+ return state_.load(std::memory_order_relaxed);
+}
+
void ConsumeMessageServiceImpl::dispatch(std::shared_ptr<ProcessQueue> process_queue,
std::vector<MessageConstSharedPtr> messages) {
auto consumer = consumer_.lock();
diff --git a/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h b/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
index 91979d22..f2f54bbd 100644
--- a/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
+++ b/cpp/source/rocketmq/include/ConsumeMessageServiceImpl.h
@@ -30,49 +30,58 @@ ROCKETMQ_NAMESPACE_BEGIN
class PushConsumerImpl;
-class ConsumeMessageServiceImpl : public ConsumeMessageService,
- public std::enable_shared_from_this<ConsumeMessageServiceImpl> {
+class ConsumeMessageServiceImpl
+ : public ConsumeMessageService,
+ public std::enable_shared_from_this<ConsumeMessageServiceImpl> {
public:
ConsumeMessageServiceImpl(std::weak_ptr<PushConsumerImpl> consumer,
- int thread_count,
- MessageListener message_listener);
+ int thread_count, MessageListener message_listener);
~ConsumeMessageServiceImpl() override = default;
/**
* Make it noncopyable.
*/
- ConsumeMessageServiceImpl(const ConsumeMessageServiceImpl& other) = delete;
- ConsumeMessageServiceImpl& operator=(const ConsumeMessageServiceImpl& other) = delete;
+ ConsumeMessageServiceImpl(const ConsumeMessageServiceImpl &other) = delete;
+ ConsumeMessageServiceImpl &
+ operator=(const ConsumeMessageServiceImpl &other) = delete;
void start() override;
void shutdown() override;
- MessageListener& listener() override {
- return message_listener_;
- }
+ MessageListener &listener() override { return message_listener_; }
- bool preHandle(const Message& message) override;
+ bool preHandle(const Message &message) override;
- bool postHandle(const Message& message, ConsumeResult result) override;
+ bool postHandle(const Message &message, ConsumeResult result) override;
void submit(std::shared_ptr<ConsumeTask> task) override;
- void dispatch(std::shared_ptr<ProcessQueue> process_queue, std::vector<MessageConstSharedPtr> messages) override;
+ void dispatch(std::shared_ptr<ProcessQueue> process_queue,
+ std::vector<MessageConstSharedPtr> messages) override;
- void ack(const Message& message, std::function<void(const std::error_code&)> cb) override;
+ void ack(const Message &message,
+ std::function<void(const std::error_code &)> cb) override;
- void nack(const Message& message, std::function<void(const std::error_code&)> cb) override;
+ void nack(const Message &message,
+ std::function<void(const std::error_code &)> cb) override;
- void forward(const Message& message, std::function<void(const std::error_code&)> cb) override;
+ void forward(const Message &message,
+ std::function<void(const std::error_code &)> cb) override;
- void schedule(std::shared_ptr<ConsumeTask> task, std::chrono::milliseconds delay) override;
+ void schedule(std::shared_ptr<ConsumeTask> task,
+ std::chrono::milliseconds delay) override;
std::size_t maxDeliveryAttempt() override;
std::weak_ptr<PushConsumerImpl> consumer() override;
+ /**
+ * Current state of the consume message service.
+ */
+ State state() const;
+
protected:
std::atomic<State> state_;
diff --git a/cpp/source/rocketmq/tests/BUILD.bazel b/cpp/source/rocketmq/tests/BUILD.bazel
index 99f2fb2a..a8d10e92 100644
--- a/cpp/source/rocketmq/tests/BUILD.bazel
+++ b/cpp/source/rocketmq/tests/BUILD.bazel
@@ -51,4 +51,12 @@ cc_test(
"StaticNameServerResolverTest.cpp",
],
deps = base_deps,
+)
+
+cc_test(
+ name = "consume_message_service_test",
+ srcs = [
+ "ConsumeMessageServiceTest.cpp",
+ ],
+ deps = base_deps,
)
\ No newline at end of file
diff --git a/cpp/source/rocketmq/tests/ConsumeMessageServiceTest.cpp b/cpp/source/rocketmq/tests/ConsumeMessageServiceTest.cpp
new file mode 100644
index 00000000..0b858271
--- /dev/null
+++ b/cpp/source/rocketmq/tests/ConsumeMessageServiceTest.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <memory>
+
+#include "ConsumeMessageServiceImpl.h"
+#include "PushConsumerImpl.h"
+#include "gtest/gtest.h"
+#include "rocketmq/ConsumeResult.h"
+#include "rocketmq/RocketMQ.h"
+
+ROCKETMQ_NAMESPACE_BEGIN
+
+TEST(ConsumeMessageServiceTest, testLifecycle) {
+ auto listener = [](const Message&) { return ConsumeResult::SUCCESS; };
+ std::weak_ptr<PushConsumerImpl> consumer;
+ auto svc = std::make_shared<ConsumeMessageServiceImpl>(consumer, 2, listener);
+ svc->start();
+ ASSERT_EQ(State::STARTED, svc->state());
+
+ svc->shutdown();
+ ASSERT_EQ(State::STOPPED, svc->state());
+}
+
+ROCKETMQ_NAMESPACE_END
\ No newline at end of file