This is an automated email from the ASF dual-hosted git repository.

serverglen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 18bb4eb2 Support rvalue task in execution_queue_execute (#2308)
18bb4eb2 is described below

commit 18bb4eb2f5a74f13f64b6b3a29f961b6a161a3c5
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jul 10 13:04:26 2023 +0800

    Support rvalue task in execution_queue_execute (#2308)
---
 src/bthread/execution_queue.h             | 38 ++++++++++++++++-------
 src/bthread/execution_queue_inl.h         | 43 ++++++++++++++++++++++++--
 test/bthread_execution_queue_unittest.cpp | 51 +++++++++++++++++++++++++++++++
 3 files changed, 118 insertions(+), 14 deletions(-)

diff --git a/src/bthread/execution_queue.h b/src/bthread/execution_queue.h
index f225f6d0..7fc46be5 100644
--- a/src/bthread/execution_queue.h
+++ b/src/bthread/execution_queue.h
@@ -44,7 +44,7 @@ public:
     // more tasks and you can safely release all the related resources ever 
     // after.
     bool is_queue_stopped() const { return _is_stopped; }
-    operator bool() const;
+    explicit operator bool() const;
 protected:
     TaskIteratorBase(TaskNode* head, ExecutionQueueBase* queue,
                      bool is_stopped, bool high_priority)
@@ -120,7 +120,7 @@ struct TaskOptions {
     // If |in_place_if_possible| is true, execution_queue_execute would call 
     // execute immediately instead of starting a bthread if possible
     //
-    // Note: Running callbacks in place might cause the dead lock issue, you
+    // Note: Running callbacks in place might cause the deadlock issue, you
     // should be very careful turning this flag on.
     //
     // Default: false
@@ -151,10 +151,10 @@ struct ExecutionQueueOptions {
     Executor * executor;
 };
 
-// Start a ExecutionQueue. If |options| is NULL, the queue will be created with
+// Start an ExecutionQueue. If |options| is NULL, the queue will be created with
 // the default options. 
 // Returns 0 on success, errno otherwise
-// NOTE: type |T| can be non-POD but must be copy-constructible
+// NOTE: type |T| can be non-POD but must be copy-constructible
 template <typename T>
 int execution_queue_start(
         ExecutionQueueId<T>* id, 
@@ -168,17 +168,17 @@ int execution_queue_start(
 //  - The executor will call |execute| with TaskIterator::is_queue_stopped() being 
 //    true exactly once when all the pending tasks have been executed, and after
 //    this point it's ok to release the resource referenced by |meta|.
-// Returns 0 on success, errno othrwise
+// Returns 0 on success, errno otherwise.
 template <typename T>
 int execution_queue_stop(ExecutionQueueId<T> id);
 
-// Wait until the the stop task (Iterator::is_queue_stopped() returns true) has
+// Wait until the stop task (Iterator::is_queue_stopped() returns true) has
 // been executed
 template <typename T>
 int execution_queue_join(ExecutionQueueId<T> id);
 
 // Thread-safe and Wait-free.
-// Execute a task with defaut TaskOptions (normal task);
+// Execute a task with default TaskOptions (normal task);
 template <typename T>
 int execution_queue_execute(ExecutionQueueId<T> id, 
                             typename butil::add_const_reference<T>::type task);
@@ -187,7 +187,7 @@ int execution_queue_execute(ExecutionQueueId<T> id,
 // Execute a task with options. e.g
 // bthread::execution_queue_execute(queue, task, &bthread::TASK_OPTIONS_URGENT)
 // If |options| is NULL, we will use default options (normal task)
-// If |handle| is not NULL, we will assign it with the hanlder of this task.
+// If |handle| is not NULL, we will assign it with the handler of this task.
 template <typename T>
 int execution_queue_execute(ExecutionQueueId<T> id, 
                             typename butil::add_const_reference<T>::type task,
@@ -198,6 +198,22 @@ int execution_queue_execute(ExecutionQueueId<T> id,
                             const TaskOptions* options,
                             TaskHandle* handle);
 
+
+template <typename T>
+int execution_queue_execute(ExecutionQueueId<T> id,
+                            T&& task);
+
+template <typename T>
+int execution_queue_execute(ExecutionQueueId<T> id,
+                            T&& task,
+                            const TaskOptions* options);
+
+template <typename T>
+int execution_queue_execute(ExecutionQueueId<T> id,
+                            T&& task,
+                            const TaskOptions* options,
+                            TaskHandle* handle);
+
 // [Thread safe and ABA free] Cancel the corresponding task.
 // Returns:
 //  -1: The task was executed or h is an invalid handle
@@ -210,15 +226,15 @@ int execution_queue_cancel(const TaskHandle& h);
 // ExecutionQueue
 //
 // |execution_queue_execute| internally fetches a reference of ExecutionQueue at
-// the begining and releases it at the end, which makes 2 additional cache
+// the beginning and releases it at the end, which makes 2 additional cache
 // updates. In some critical situation where the overhead of
 // execution_queue_execute matters, you can avoid this by addressing the 
-// reference at the begining of every producer, and execute tasks execatly 
+// reference at the beginning of every producer, and execute tasks exactly
 // through the reference instead of id.
 //
 // Note: It makes |execution_queue_stop| a little complicated in the user level,
 // as we don't pass the `stop task' to |execute| until no one holds any reference.
-// If you are not sure about the ownership of the return value (which releasees
+// If you are not sure about the ownership of the return value (which releases
 // the reference of the very ExecutionQueue in the destructor) and don't that
 // care the overhead of ExecutionQueue, DON'T use this function
 template <typename T>
diff --git a/src/bthread/execution_queue_inl.h b/src/bthread/execution_queue_inl.h
index c67cf659..b932cbc9 100644
--- a/src/bthread/execution_queue_inl.h
+++ b/src/bthread/execution_queue_inl.h
@@ -290,6 +290,16 @@ public:
 
     int execute(typename butil::add_const_reference<T>::type task,
                 const TaskOptions* options, TaskHandle* handle) {
+        return execute(std::forward<T>(const_cast<T&>(task)), options, handle);
+    }
+
+
+    int execute(T&& task) {
+        return execute(std::forward<T>(task), NULL, NULL);
+    }
+
+    int execute(T&& task,
+                const TaskOptions* options, TaskHandle* handle) {
         if (stopped()) {
             return EINVAL;
         }
@@ -302,7 +312,7 @@ public:
             return_task_node(node);
             return ENOMEM;
         }
-        new (mem) T(task);
+        new (mem) T(std::forward<T>(task));
         node->stop_task = false;
         TaskOptions opt;
         if (options) {
@@ -356,7 +366,7 @@ inline int execution_queue_execute(ExecutionQueueId<T> id,
                        typename butil::add_const_reference<T>::type task,
                        const TaskOptions* options,
                        TaskHandle* handle) {
-    typename ExecutionQueue<T>::scoped_ptr_t 
+    typename ExecutionQueue<T>::scoped_ptr_t
         ptr = ExecutionQueue<T>::address(id);
     if (ptr != NULL) {
         return ptr->execute(task, options, handle);
@@ -365,6 +375,33 @@ inline int execution_queue_execute(ExecutionQueueId<T> id,
     }
 }
 
+template <typename T>
+inline int execution_queue_execute(ExecutionQueueId<T> id,
+                                   T&& task) {
+    return execution_queue_execute(id, std::forward<T>(task), NULL);
+}
+
+template <typename T>
+inline int execution_queue_execute(ExecutionQueueId<T> id,
+                                   T&& task,
+                                   const TaskOptions* options) {
+    return execution_queue_execute(id, std::forward<T>(task), options, NULL);
+}
+
+template <typename T>
+inline int execution_queue_execute(ExecutionQueueId<T> id,
+                                   T&& task,
+                                   const TaskOptions* options,
+                                   TaskHandle* handle) {
+    typename ExecutionQueue<T>::scoped_ptr_t
+            ptr = ExecutionQueue<T>::address(id);
+    if (ptr != NULL) {
+        return ptr->execute(std::forward<T>(task), options, handle);
+    } else {
+        return EINVAL;
+    }
+}
+
 template <typename T>
 inline int execution_queue_stop(ExecutionQueueId<T> id) {
     typename ExecutionQueue<T>::scoped_ptr_t 
@@ -518,7 +555,7 @@ inline int ExecutionQueueBase::dereference() {
                         butil::memory_order_acquire,
                         butil::memory_order_relaxed)) {
                 _on_recycle();
-                // We don't return m immediatly when the reference count
+                // We don't return m immediately when the reference count
                 // reaches 0 as there might be in processing tasks. Instead
                 // _on_recycle would push a `stop_task' after which is executed
                 // m would be finally returned and reset
diff --git a/test/bthread_execution_queue_unittest.cpp b/test/bthread_execution_queue_unittest.cpp
index 8fb810d9..627d27c8 100644
--- a/test/bthread_execution_queue_unittest.cpp
+++ b/test/bthread_execution_queue_unittest.cpp
@@ -75,6 +75,57 @@ TEST_F(ExecutionQueueTest, single_thread) {
     ASSERT_TRUE(stopped);
 }
 
+class RValue {
+public:
+    RValue() : _value(0) {}
+    explicit RValue(int v) : _value(v) {}
+    RValue(RValue&& rhs)  noexcept : _value(rhs._value) {}
+    RValue& operator=(RValue&& rhs) noexcept {
+        if (this != &rhs) {
+            _value = rhs._value;
+        }
+        return *this;
+    }
+
+    DISALLOW_COPY_AND_ASSIGN(RValue);
+
+    int value() const { return _value; }
+
+
+private:
+    int _value;
+};
+
+int add(void* meta, bthread::TaskIterator<RValue> &iter) {
+    stopped = iter.is_queue_stopped();
+    int* result = (int*)meta;
+    for (; iter; ++iter) {
+        *result += iter->value();
+    }
+    return 0;
+}
+
+TEST_F(ExecutionQueueTest, rvalue) {
+    int64_t result = 0;
+    int64_t expected_result = 0;
+    stopped = false;
+    bthread::ExecutionQueueId<RValue> queue_id;
+    bthread::ExecutionQueueOptions options;
+    ASSERT_EQ(0, bthread::execution_queue_start(&queue_id, &options,
+                                                    add, &result));
+    for (int i = 0; i < 100; ++i) {
+        expected_result += i;
+        RValue v(i);
+        ASSERT_EQ(0, bthread::execution_queue_execute(queue_id, std::move(v)));
+    }
+    LOG(INFO) << "stop";
+    ASSERT_EQ(0, bthread::execution_queue_stop(queue_id));
+    ASSERT_NE(0, bthread::execution_queue_execute(queue_id, RValue(0)));
+    ASSERT_EQ(0, bthread::execution_queue_join(queue_id));
+    ASSERT_EQ(expected_result, result);
+    ASSERT_TRUE(stopped);
+}
+
 struct PushArg {
     bthread::ExecutionQueueId<LongIntTask> id {0};
     butil::atomic<int64_t> total_num {0};


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to