This is an automated email from the ASF dual-hosted git repository.
serverglen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 18bb4eb2 Support rvalue task in execution_queue_execute (#2308)
18bb4eb2 is described below
commit 18bb4eb2f5a74f13f64b6b3a29f961b6a161a3c5
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jul 10 13:04:26 2023 +0800
Support rvalue task in execution_queue_execute (#2308)
---
src/bthread/execution_queue.h | 38 ++++++++++++++++-------
src/bthread/execution_queue_inl.h | 43 ++++++++++++++++++++++++--
test/bthread_execution_queue_unittest.cpp | 51 +++++++++++++++++++++++++++++++
3 files changed, 118 insertions(+), 14 deletions(-)
diff --git a/src/bthread/execution_queue.h b/src/bthread/execution_queue.h
index f225f6d0..7fc46be5 100644
--- a/src/bthread/execution_queue.h
+++ b/src/bthread/execution_queue.h
@@ -44,7 +44,7 @@ public:
// more tasks and you can safely release all the related resources ever
// after.
bool is_queue_stopped() const { return _is_stopped; }
- operator bool() const;
+ explicit operator bool() const;
protected:
TaskIteratorBase(TaskNode* head, ExecutionQueueBase* queue,
bool is_stopped, bool high_priority)
@@ -120,7 +120,7 @@ struct TaskOptions {
// If |in_place_if_possible| is true, execution_queue_execute would call
// execute immediately instead of starting a bthread if possible
//
- // Note: Running callbacks in place might cause the dead lock issue, you
+ // Note: Running callbacks in place might cause the deadlock issue, you
// should be very careful turning this flag on.
//
// Default: false
@@ -151,10 +151,10 @@ struct ExecutionQueueOptions {
Executor * executor;
};
-// Start a ExecutionQueue. If |options| is NULL, the queue will be created with
+// Start an ExecutionQueue. If |options| is NULL, the queue will be created
with
// the default options.
// Returns 0 on success, errno otherwise
-// NOTE: type |T| can be non-POD but must be copy-constructible
+// NOTE: type |T| can be non-POD but must be copy-constructible
template <typename T>
int execution_queue_start(
ExecutionQueueId<T>* id,
@@ -168,17 +168,17 @@ int execution_queue_start(
// - The executor will call |execute| with TaskIterator::is_queue_stopped()
being
// true exactly once when all the pending tasks have been executed, and
after
// this point it's ok to release the resource referenced by |meta|.
-// Returns 0 on success, errno othrwise
+// Returns 0 on success, errno otherwise.
template <typename T>
int execution_queue_stop(ExecutionQueueId<T> id);
-// Wait until the the stop task (Iterator::is_queue_stopped() returns true) has
+// Wait until the stop task (Iterator::is_queue_stopped() returns true) has
// been executed
template <typename T>
int execution_queue_join(ExecutionQueueId<T> id);
// Thread-safe and Wait-free.
-// Execute a task with defaut TaskOptions (normal task);
+// Execute a task with default TaskOptions (normal task);
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task);
@@ -187,7 +187,7 @@ int execution_queue_execute(ExecutionQueueId<T> id,
// Execute a task with options. e.g
// bthread::execution_queue_execute(queue, task, &bthread::TASK_OPTIONS_URGENT)
// If |options| is NULL, we will use default options (normal task)
-// If |handle| is not NULL, we will assign it with the hanlder of this task.
+// If |handle| is not NULL, we will assign it with the handler of this task.
template <typename T>
int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
@@ -198,6 +198,22 @@ int execution_queue_execute(ExecutionQueueId<T> id,
const TaskOptions* options,
TaskHandle* handle);
+
+// Rvalue overloads of execution_queue_execute. |task| is taken by rvalue
+// reference and forwarded into the queue, so a move-only |T| can be submitted
+// through these overloads.
+// Execute an rvalue task with default TaskOptions (normal task).
+template <typename T>
+int execution_queue_execute(ExecutionQueueId<T> id,
+ T&& task);
+
+// Execute an rvalue task with options.
+// If |options| is NULL, default options (normal task) are used.
+template <typename T>
+int execution_queue_execute(ExecutionQueueId<T> id,
+ T&& task,
+ const TaskOptions* options);
+
+// Execute an rvalue task with options and an optional |handle| output.
+// Returns EINVAL when no queue can be addressed by |id|; otherwise returns
+// whatever the queue's execute() returns.
+template <typename T>
+int execution_queue_execute(ExecutionQueueId<T> id,
+ T&& task,
+ const TaskOptions* options,
+ TaskHandle* handle);
+
// [Thread safe and ABA free] Cancel the corresponding task.
// Returns:
// -1: The task was executed or h is an invalid handle
@@ -210,15 +226,15 @@ int execution_queue_cancel(const TaskHandle& h);
// ExecutionQueue
//
// |execution_queue_execute| internally fetches a reference of ExecutionQueue
at
-// the begining and releases it at the end, which makes 2 additional cache
+// the beginning and releases it at the end, which makes 2 additional cache
// updates. In some critical situation where the overhead of
// execution_queue_execute matters, you can avoid this by addressing the
-// reference at the begining of every producer, and execute tasks execatly
+// reference at the beginning of every producer, and execute tasks exactly
// through the reference instead of id.
//
// Note: It makes |execution_queue_stop| a little complicated in the user
level,
// as we don't pass the `stop task' to |execute| until no one holds any
reference.
-// If you are not sure about the ownership of the return value (which releasees
+// If you are not sure about the ownership of the return value (which releases
// the reference of the very ExecutionQueue in the destructor) and don't that
// care the overhead of ExecutionQueue, DON'T use this function
template <typename T>
diff --git a/src/bthread/execution_queue_inl.h
b/src/bthread/execution_queue_inl.h
index c67cf659..b932cbc9 100644
--- a/src/bthread/execution_queue_inl.h
+++ b/src/bthread/execution_queue_inl.h
@@ -290,6 +290,16 @@ public:
+    // Enqueues a copy of |task| with the given |options| and |handle|.
+    // |task| is received by const reference, so the caller's object must be
+    // left untouched: copy it into a temporary and hand that temporary to the
+    // rvalue overload. Casting away const and forwarding the caller's lvalue
+    // as T&& would move-construct the queued task from it and silently gut
+    // the caller's object.
+    // Returns whatever the rvalue overload returns.
int execute(typename butil::add_const_reference<T>::type task,
const TaskOptions* options, TaskHandle* handle) {
+        return execute(T(task), options, handle);
+    }
+
+
+    // Convenience overload for rvalue tasks: submits |task| without any
+    // TaskOptions and without asking for a TaskHandle (both passed as NULL).
+    int execute(T&& task) {
+        const TaskOptions* const no_options = NULL;
+        TaskHandle* const no_handle = NULL;
+        return execute(std::forward<T>(task), no_options, no_handle);
+    }
+
+ int execute(T&& task,
+ const TaskOptions* options, TaskHandle* handle) {
if (stopped()) {
return EINVAL;
}
@@ -302,7 +312,7 @@ public:
return_task_node(node);
return ENOMEM;
}
- new (mem) T(task);
+ new (mem) T(std::forward<T>(task));
node->stop_task = false;
TaskOptions opt;
if (options) {
@@ -356,7 +366,7 @@ inline int execution_queue_execute(ExecutionQueueId<T> id,
typename butil::add_const_reference<T>::type task,
const TaskOptions* options,
TaskHandle* handle) {
- typename ExecutionQueue<T>::scoped_ptr_t
+ typename ExecutionQueue<T>::scoped_ptr_t
ptr = ExecutionQueue<T>::address(id);
if (ptr != NULL) {
return ptr->execute(task, options, handle);
@@ -365,6 +375,33 @@ inline int execution_queue_execute(ExecutionQueueId<T> id,
}
}
+// Rvalue task, no options: delegates to the (id, task, options) overload
+// with |options| set to NULL.
+template <typename T>
+inline int execution_queue_execute(ExecutionQueueId<T> id, T&& task) {
+    return execution_queue_execute(
+            id, std::forward<T>(task), NULL);
+}
+
+// Rvalue task with |options| but no handle: delegates to the four-argument
+// overload with |handle| set to NULL.
+template <typename T>
+inline int execution_queue_execute(
+        ExecutionQueueId<T> id, T&& task, const TaskOptions* options) {
+    return execution_queue_execute(
+            id, std::forward<T>(task), options, NULL);
+}
+
+// Rvalue task with explicit |options| and |handle|.
+// Addresses the queue identified by |id| and forwards |task| to its
+// execute(); returns EINVAL when the queue cannot be addressed.
+template <typename T>
+inline int execution_queue_execute(
+        ExecutionQueueId<T> id, T&& task,
+        const TaskOptions* options, TaskHandle* handle) {
+    typedef typename ExecutionQueue<T>::scoped_ptr_t QueuePtr;
+    QueuePtr queue = ExecutionQueue<T>::address(id);
+    return (queue != NULL)
+            ? queue->execute(std::forward<T>(task), options, handle)
+            : EINVAL;
+}
+
template <typename T>
inline int execution_queue_stop(ExecutionQueueId<T> id) {
typename ExecutionQueue<T>::scoped_ptr_t
@@ -518,7 +555,7 @@ inline int ExecutionQueueBase::dereference() {
butil::memory_order_acquire,
butil::memory_order_relaxed)) {
_on_recycle();
- // We don't return m immediatly when the reference count
+ // We don't return m immediately when the reference count
// reaches 0 as there might be in processing tasks. Instead
// _on_recycle would push a `stop_task' after which is executed
// m would be finally returned and reset
diff --git a/test/bthread_execution_queue_unittest.cpp
b/test/bthread_execution_queue_unittest.cpp
index 8fb810d9..627d27c8 100644
--- a/test/bthread_execution_queue_unittest.cpp
+++ b/test/bthread_execution_queue_unittest.cpp
@@ -75,6 +75,57 @@ TEST_F(ExecutionQueueTest, single_thread) {
ASSERT_TRUE(stopped);
}
+// Move-only task type for the rvalue test: copying is disabled, and moving
+// carries |_value| over to the destination object.
+class RValue {
+public:
+    RValue() : _value(0) {}
+    explicit RValue(int v) : _value(v) {}
+
+    // The only state is an int, so the compiler-generated member-wise move
+    // does exactly what a hand-written one would: copy |_value| across.
+    RValue(RValue&&) noexcept = default;
+    RValue& operator=(RValue&&) noexcept = default;
+
+    DISALLOW_COPY_AND_ASSIGN(RValue);
+
+    int value() const { return _value; }
+
+private:
+    int _value;
+};
+
+// Executor of the rvalue test queue.
+// Records in |stopped| whether this invocation is the stop notification and
+// adds the value of every task in |iter| to the accumulator behind |meta|.
+// Always returns 0.
+int add(void* meta, bthread::TaskIterator<RValue> &iter) {
+    stopped = iter.is_queue_stopped();
+    // The rvalue test registers the address of an int64_t as |meta|; read it
+    // back with that same type. Going through an int* would violate aliasing
+    // rules and update only half of the object (the wrong half on big-endian
+    // targets).
+    int64_t* result = static_cast<int64_t*>(meta);
+    for (; iter; ++iter) {
+        *result += iter->value();
+    }
+    return 0;
+}
+
+// Submits 100 move-only tasks through the rvalue overload, then checks that
+// a stopped queue rejects new tasks and that the executor summed every value.
+TEST_F(ExecutionQueueTest, rvalue) {
+    int64_t sum = 0;
+    int64_t expected_sum = 0;
+    stopped = false;
+
+    bthread::ExecutionQueueId<RValue> queue_id;
+    bthread::ExecutionQueueOptions options;
+    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
+                                                add, &sum));
+
+    for (int value = 0; value < 100; ++value) {
+        expected_sum += value;
+        // A temporary is an rvalue, so this selects the T&& overload.
+        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, RValue(value)));
+    }
+
+    LOG(INFO) << "stop";
+    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
+    // Once stopped, the queue must refuse further tasks.
+    ASSERT_NE(0, bthread::execution_queue_execute(queue_id, RValue(0)));
+    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
+
+    ASSERT_EQ(expected_sum, sum);
+    ASSERT_TRUE(stopped);
+}
+
struct PushArg {
bthread::ExecutionQueueId<LongIntTask> id {0};
butil::atomic<int64_t> total_num {0};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]