royguo commented on code in PR #3168:
URL: https://github.com/apache/brpc/pull/3168#discussion_r2601363703


##########
test/bthread_idle_unittest.cpp:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <bthread/bthread.h>
+#include <bthread/task_group.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <set>
+#include <mutex>
+
+namespace {
+
+// Mock context to simulate per-thread state (e.g., io_uring ring)
+struct MockWorkerContext {
+    int worker_id;
+    int poll_count;
+
+    MockWorkerContext() : worker_id(-1), poll_count(0) {}
+};
+
+// Thread-local storage to simulate "Share-nothing" architecture

Review Comment:
   fixed



##########
src/bthread/task_group.cpp:
##########
@@ -179,7 +213,24 @@ bool TaskGroup::wait_task(bthread_t* tid) {
         if (steal_task(tid)) {
             return true;
         }
-        _pl->wait(st);
+        // Instead of waiting for signal, we shall wake up if there's a user idle task here.
+        // To avoid the current task never wake and missed the user's idle task.
+        if (_worker_idle_cb) {
+            if (HandleIdleTask()) {
+                if (_rq.pop(tid)) {
+                    return true;
+                }
+                if (steal_task(tid)) {
+                    return true;
+                }
+            }
+            timespec wait_time;
+            wait_time.tv_sec = _worker_idle_timeout_us / 1000000;
+            wait_time.tv_nsec = (_worker_idle_timeout_us % 1000000) * 1000;
+            _pl->wait(st, &wait_time);

Review Comment:
   fixed



##########
src/bthread/task_group.cpp:
##########
@@ -167,7 +184,24 @@ bool TaskGroup::wait_task(bthread_t* tid) {
         if (_last_pl_state.stopped()) {
             return false;
         }
-        _pl->wait(_last_pl_state);
+        // Instead of waiting for signal, we shall wake up if there's a user idle task here.
+        // To avoid the current task never wake and missed the user's idle task.
+        if (_worker_idle_cb) {
+            if (HandleIdleTask()) {
+                if (_rq.pop(tid)) {
+                    return true;
+                }
+                if (steal_task(tid)) {
+                    return true;
+                }
+            }
+            timespec wait_time;
+            wait_time.tv_sec = _worker_idle_timeout_us / 1000000;
+            wait_time.tv_nsec = (_worker_idle_timeout_us % 1000000) * 1000;
+            _pl->wait(_last_pl_state, &wait_time);

Review Comment:
   fixed



##########
src/bthread/task_group.cpp:
##########
@@ -167,7 +184,24 @@ bool TaskGroup::wait_task(bthread_t* tid) {
         if (_last_pl_state.stopped()) {
             return false;
         }
-        _pl->wait(_last_pl_state);
+        // Instead of waiting for signal, we shall wake up if there's a user idle task here.
+        // To avoid the current task never wake and missed the user's idle task.
+        if (_worker_idle_cb) {
+            if (HandleIdleTask()) {
+                if (_rq.pop(tid)) {
+                    return true;
+                }
+                if (steal_task(tid)) {
+                    return true;
+                }
+            }

Review Comment:
   fixed



##########
src/bthread/task_group.cpp:
##########
@@ -78,6 +78,23 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);
 
 const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
 
+TaskGroup::OnWorkerIdleFn TaskGroup::_worker_idle_cb = NULL;
+void* TaskGroup::_worker_idle_ctx = NULL;
+uint64_t TaskGroup::_worker_idle_timeout_us = 1000;

Review Comment:
   fixed by using atomic flag and std::call_once



##########
test/bthread_idle_unittest.cpp:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <bthread/bthread.h>
+#include <bthread/task_group.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <set>
+#include <mutex>
+
+namespace {
+
+// Mock context to simulate per-thread state (e.g., io_uring ring)
+struct MockWorkerContext {
+    int worker_id;
+    int poll_count;
+
+    MockWorkerContext() : worker_id(-1), poll_count(0) {}
+};
+
+// Thread-local storage to simulate "Share-nothing" architecture
+// In a real scenario, this would hold the something like io_uring instance.

Review Comment:
   fixed



##########
src/bthread/task_group.cpp:
##########
@@ -78,6 +78,23 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);
 
 const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
 
+TaskGroup::OnWorkerIdleFn TaskGroup::_worker_idle_cb = NULL;
+void* TaskGroup::_worker_idle_ctx = NULL;
+uint64_t TaskGroup::_worker_idle_timeout_us = 1000;
+
+// Set the global static idle task, we can use thread local variables to distinct different
+// task group's target resource (for example, users' can init thread local iouring per task group).
+void TaskGroup::SetWorkerIdleCallback(OnWorkerIdleFn fn, void* user_ctx,
+                                      uint64_t timeout_us) {

Review Comment:
   fixed



##########
src/bthread/task_group.cpp:
##########
@@ -78,6 +78,23 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);
 
 const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
 
+TaskGroup::OnWorkerIdleFn TaskGroup::_worker_idle_cb = NULL;
+void* TaskGroup::_worker_idle_ctx = NULL;
+uint64_t TaskGroup::_worker_idle_timeout_us = 1000;
+
+// Set the global static idle task, we can use thread local variables to distinct different
+// task group's target resource (for example, users' can init thread local iouring per task group).

Review Comment:
   fixed



##########
src/bthread/task_group.h:
##########
@@ -115,6 +115,15 @@ class TaskGroup {
     static void sched_to(TaskGroup** pg, bthread_t next_tid);
     static void exchange(TaskGroup** pg, TaskMeta* next_meta);
 
+    typedef bool (*OnWorkerIdleFn)(void* user_ctx);
+    // Set a callback to run when a worker has no task to run.
+    // If the callback returns true, it means some work is done and the worker
+    // should check the runqueue again immediately.
+    // |timeout_us|: The timeout for waiting if the callback returns false.
+    //               0 means infinite wait (original behavior).

Review Comment:
   fixed, 0 is not acceptable



##########
src/bthread/task_group.cpp:
##########
@@ -78,6 +78,23 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, NULL);
 
 const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
 
+TaskGroup::OnWorkerIdleFn TaskGroup::_worker_idle_cb = NULL;
+void* TaskGroup::_worker_idle_ctx = NULL;
+uint64_t TaskGroup::_worker_idle_timeout_us = 1000;
+
+// Set the global static idle task, we can use thread local variables to distinct different
+// task group's target resource (for example, users' can init thread local iouring per task group).
+void TaskGroup::SetWorkerIdleCallback(OnWorkerIdleFn fn, void* user_ctx,
+                                      uint64_t timeout_us) {
+    _worker_idle_cb = fn;
+    _worker_idle_ctx = user_ctx;
+    _worker_idle_timeout_us = timeout_us;
+}
+
+bool TaskGroup::HandleIdleTask() {
+    return _worker_idle_cb(_worker_idle_ctx);

Review Comment:
   fixed



##########
test/bthread_idle_unittest.cpp:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <bthread/bthread.h>
+#include <bthread/task_group.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <set>
+#include <mutex>
+
+namespace {
+
+// Mock context to simulate per-thread state (e.g., io_uring ring)
+struct MockWorkerContext {
+    int worker_id;
+    int poll_count;
+
+    MockWorkerContext() : worker_id(-1), poll_count(0) {}
+};
+
+// Thread-local storage to simulate "Share-nothing" architecture
+// In a real scenario, this would hold the something like io_uring instance.
+static __thread MockWorkerContext* tls_context = nullptr;
+
+// Set to collect all unique worker IDs we've seen
+static std::set<int> observed_worker_ids;
+static std::mutex stats_mutex;
+
+// The idle callback function
+bool MockIdlePoller(void* global_ctx) {
+    if (!tls_context) {
+        tls_context = new MockWorkerContext();
+        // Use pthread_self or a counter to assign a unique ID
+        static std::atomic<int> global_worker_counter(0);
+        tls_context->worker_id = global_worker_counter.fetch_add(1);
+        
+        std::lock_guard<std::mutex> lock(stats_mutex);
+        observed_worker_ids.insert(tls_context->worker_id);
+        LOG(INFO) << "Worker thread " << pthread_self() << " initialized with ID " << tls_context->worker_id;
+    }
+
+    tls_context->poll_count++;
+    
+    // Simulate some work occasionally to wake up the worker immediately
+    // For this test, we mostly want to verify it runs and has correct context
+    if (tls_context->poll_count % 100 == 0) {
+        return true; // Pretend we found work
+    }
+
+    return false; // Sleep with timeout
+}
+
+class IdleCallbackTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        // Reset state
+        observed_worker_ids.clear();
+    }
+
+    void TearDown() override {
+        // Clean up global callback to avoid affecting other tests
+        bthread::TaskGroup::SetWorkerIdleCallback(nullptr, nullptr);
+    }
+};
+
+void* dummy_task(void* arg) {
+    bthread_usleep(1000); // Sleep 1ms to allow workers to go idle
+    return nullptr;
+}
+
+TEST_F(IdleCallbackTest, WorkerIsolationAndExecution) {
+    // 1. Set the idle callback with a short timeout (e.g., 1ms)
+    bthread::TaskGroup::SetWorkerIdleCallback(MockIdlePoller, nullptr, 1000);
+
+    // 2. Determine number of workers (concurrency)
+    int concurrency = bthread_getconcurrency();
+    LOG(INFO) << "Current concurrency: " << concurrency;
+
+    // 3. Create enough bthreads to ensure all workers are activated at least once
+    // but also give them time to become idle.
+    std::vector<bthread_t> tids;
+    for (int i = 0; i < concurrency * 2; ++i) {
+        bthread_t tid;
+        bthread_start_background(&tid, nullptr, dummy_task, nullptr);
+        tids.push_back(tid);
+    }
+
+    // 4. Wait for all tasks to complete
+    for (bthread_t tid : tids) {
+        bthread_join(tid, nullptr);
+    }
+
+    // 5. Sleep a bit to ensure all workers have had a chance to hit the idle loop
+    usleep(50 * 1000); // 50ms
+
+    // 6. Verify results
+    std::lock_guard<std::mutex> lock(stats_mutex);
+    LOG(INFO) << "Observed " << observed_worker_ids.size() << " unique worker contexts.";
+    
+    // We expect at least one worker to have initialized its context.
+    // In a highly concurrent test environment, usually most workers will initialize.
+    ASSERT_GT(observed_worker_ids.size(), 0);
+    
+    // Check that we saw different IDs if concurrency > 1 (though not strictly guaranteed
+    // that ALL workers will run if the OS scheduler is quirky, but >1 is highly likely)
+    if (concurrency > 1) {
+         EXPECT_GT(observed_worker_ids.size(), 0);

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to