Copilot commented on code in PR #3168:
URL: https://github.com/apache/brpc/pull/3168#discussion_r2601105662


##########
test/bthread_idle_unittest.cpp:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <bthread/bthread.h>
+#include <bthread/task_group.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <set>
+#include <mutex>
+
+namespace {
+
+// Mock context to simulate per-thread state (e.g., io_uring ring)
+struct MockWorkerContext {
+    int worker_id;
+    int poll_count;
+
+    MockWorkerContext() : worker_id(-1), poll_count(0) {}
+};
+
+// Thread-local storage to simulate "Share-nothing" architecture

Review Comment:
   The comment says "Share-nothing" but the correct term is "shared-nothing" 
(with a hyphen). "Shared-nothing architecture" is the standard terminology in 
distributed systems and parallel computing.
   ```suggestion
   // Thread-local storage to simulate "shared-nothing" architecture
   ```



##########
test/bthread_idle_unittest.cpp:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <bthread/bthread.h>
+#include <bthread/task_group.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <set>
+#include <mutex>
+
+namespace {
+
+// Mock context to simulate per-thread state (e.g., io_uring ring)
+struct MockWorkerContext {
+    int worker_id;
+    int poll_count;
+
+    MockWorkerContext() : worker_id(-1), poll_count(0) {}
+};
+
+// Thread-local storage to simulate "Share-nothing" architecture
+// In a real scenario, this would hold the something like io_uring instance.

Review Comment:
   Grammar issue in the comment: "would hold the something like io_uring 
instance" should be "would hold something like an io_uring instance" (remove 
"the" and add "an").
   ```suggestion
   // In a real scenario, this would hold something like an io_uring instance.
   ```



##########
src/bthread/task_group.cpp:
##########
@@ -78,6 +78,23 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, 
NULL);
 
 const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
 
+TaskGroup::OnWorkerIdleFn TaskGroup::_worker_idle_cb = NULL;
+void* TaskGroup::_worker_idle_ctx = NULL;
+uint64_t TaskGroup::_worker_idle_timeout_us = 1000;
+
+// Set the global static idle task, we can use thread local variables to 
distinct different
+// task group's target resource (for example, users' can init thread local 
iouring per task group).

Review Comment:
   Grammar issues in the comment: "users' can init" should be "users can init" 
(remove apostrophe), and "thread local" should be hyphenated as "thread-local" 
when used as an adjective.
   ```suggestion
   // Set the global static idle task, we can use thread-local variables to 
distinct different
   // task group's target resource (for example, users can init thread-local 
iouring per task group).
   ```



##########
src/bthread/task_group.cpp:
##########
@@ -167,7 +184,24 @@ bool TaskGroup::wait_task(bthread_t* tid) {
         if (_last_pl_state.stopped()) {
             return false;
         }
-        _pl->wait(_last_pl_state);
+        // Instead of waiting for signal, we shall wake up if there's a user 
idle task here.
+        // To avoid the current task never wake and missed the user's idle 
task.
+        if (_worker_idle_cb) {
+            if (HandleIdleTask()) {
+                if (_rq.pop(tid)) {
+                    return true;
+                }
+                if (steal_task(tid)) {
+                    return true;
+                }
+            }
+            timespec wait_time;
+            wait_time.tv_sec = _worker_idle_timeout_us / 1000000;
+            wait_time.tv_nsec = (_worker_idle_timeout_us % 1000000) * 1000;
+            _pl->wait(_last_pl_state, &wait_time);

Review Comment:
   The timeout conversion logic has a potential issue. If 
`_worker_idle_timeout_us` is 0, the comment in the header states it means 
"infinite wait (original behavior)", but the current code will set both 
`tv_sec` and `tv_nsec` to 0, which is not an infinite wait - it's an immediate 
timeout. This contradicts the documented behavior.
   
   The logic should handle the 0 case explicitly to pass `NULL` to `wait()` for 
infinite wait, or update the documentation to clarify that 0 is not supported 
for infinite wait.



##########
src/bthread/task_group.h:
##########
@@ -115,6 +115,15 @@ class TaskGroup {
     static void sched_to(TaskGroup** pg, bthread_t next_tid);
     static void exchange(TaskGroup** pg, TaskMeta* next_meta);
 
+    typedef bool (*OnWorkerIdleFn)(void* user_ctx);
+    // Set a callback to run when a worker has no task to run.
+    // If the callback returns true, it means some work is done and the worker
+    // should check the runqueue again immediately.
+    // |timeout_us|: The timeout for waiting if the callback returns false.
+    //               0 means infinite wait (original behavior).
+    static void SetWorkerIdleCallback(OnWorkerIdleFn fn, void* user_ctx,
+                                      uint64_t timeout_us = 1000);

Review Comment:
   The `SetWorkerIdleCallback` function is a global setter that affects all 
TaskGroups, but it's not thread-safe and provides no mechanism to prevent 
concurrent modifications. This API design makes it difficult to use safely in 
multi-threaded environments where different components might want to register 
or unregister callbacks.
   
   Consider:
   1. Making this a per-TaskGroup configuration rather than global static
   2. If it must be global, add thread-safety guarantees and document the 
thread-safety requirements
   3. Provide a mechanism to atomically swap callbacks or use a registration 
pattern that prevents races



##########
test/bthread_idle_unittest.cpp:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <bthread/bthread.h>
+#include <bthread/task_group.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <set>
+#include <mutex>
+
+namespace {
+
+// Mock context to simulate per-thread state (e.g., io_uring ring)
+struct MockWorkerContext {
+    int worker_id;
+    int poll_count;
+
+    MockWorkerContext() : worker_id(-1), poll_count(0) {}
+};
+
+// Thread-local storage to simulate "Share-nothing" architecture
+// In a real scenario, this would hold the something like io_uring instance.
+static __thread MockWorkerContext* tls_context = nullptr;
+
+// Set to collect all unique worker IDs we've seen
+static std::set<int> observed_worker_ids;
+static std::mutex stats_mutex;
+
+// The idle callback function
+bool MockIdlePoller(void* global_ctx) {
+    if (!tls_context) {
+        tls_context = new MockWorkerContext();
+        // Use pthread_self or a counter to assign a unique ID
+        static std::atomic<int> global_worker_counter(0);
+        tls_context->worker_id = global_worker_counter.fetch_add(1);
+        
+        std::lock_guard<std::mutex> lock(stats_mutex);
+        observed_worker_ids.insert(tls_context->worker_id);
+        LOG(INFO) << "Worker thread " << pthread_self() << " initialized with 
ID " << tls_context->worker_id;
+    }
+
+    tls_context->poll_count++;
+    
+    // Simulate some work occasionally to wake up the worker immediately
+    // For this test, we mostly want to verify it runs and has correct context
+    if (tls_context->poll_count % 100 == 0) {
+        return true; // Pretend we found work
+    }
+
+    return false; // Sleep with timeout
+}
+
+class IdleCallbackTest : public ::testing::Test {
+protected:
+    void SetUp() override {
+        // Reset state
+        observed_worker_ids.clear();
+    }
+
+    void TearDown() override {
+        // Clean up global callback to avoid affecting other tests
+        bthread::TaskGroup::SetWorkerIdleCallback(nullptr, nullptr);
+    }
+};
+
+void* dummy_task(void* arg) {
+    bthread_usleep(1000); // Sleep 1ms to allow workers to go idle
+    return nullptr;
+}
+
+TEST_F(IdleCallbackTest, WorkerIsolationAndExecution) {
+    // 1. Set the idle callback with a short timeout (e.g., 1ms)
+    bthread::TaskGroup::SetWorkerIdleCallback(MockIdlePoller, nullptr, 1000);
+
+    // 2. Determine number of workers (concurrency)
+    int concurrency = bthread_getconcurrency();
+    LOG(INFO) << "Current concurrency: " << concurrency;
+
+    // 3. Create enough bthreads to ensure all workers are activated at least 
once
+    // but also give them time to become idle.
+    std::vector<bthread_t> tids;
+    for (int i = 0; i < concurrency * 2; ++i) {
+        bthread_t tid;
+        bthread_start_background(&tid, nullptr, dummy_task, nullptr);
+        tids.push_back(tid);
+    }
+
+    // 4. Wait for all tasks to complete
+    for (bthread_t tid : tids) {
+        bthread_join(tid, nullptr);
+    }
+
+    // 5. Sleep a bit to ensure all workers have had a chance to hit the idle 
loop
+    usleep(50 * 1000); // 50ms
+
+    // 6. Verify results
+    std::lock_guard<std::mutex> lock(stats_mutex);
+    LOG(INFO) << "Observed " << observed_worker_ids.size() << " unique worker 
contexts.";
+    
+    // We expect at least one worker to have initialized its context.
+    // In a highly concurrent test environment, usually most workers will 
initialize.
+    ASSERT_GT(observed_worker_ids.size(), 0);
+    
+    // Check that we saw different IDs if concurrency > 1 (though not strictly 
guaranteed 
+    // that ALL workers will run if the OS scheduler is quirky, but >1 is 
highly likely)
+    if (concurrency > 1) {
+         EXPECT_GT(observed_worker_ids.size(), 0);

Review Comment:
   The test assertion on line 122 is redundant with the assertion on line 117. 
Both check that `observed_worker_ids.size() > 0`. The second check doesn't add 
any value since if concurrency > 1 and the first assertion passed, this one 
will always pass too.
   
   This appears to be a copy-paste error. Either remove this redundant check or 
modify it to check for a meaningful condition like 
`EXPECT_GT(observed_worker_ids.size(), 1)` to verify that multiple workers 
actually ran when concurrency > 1.
   ```suggestion
            EXPECT_GT(observed_worker_ids.size(), 1);
   ```



##########
src/bthread/task_group.cpp:
##########
@@ -179,7 +213,24 @@ bool TaskGroup::wait_task(bthread_t* tid) {
         if (steal_task(tid)) {
             return true;
         }
-        _pl->wait(st);
+        // Instead of waiting for signal, we shall wake up if there's a user 
idle task here.
+        // To avoid the current task never wake and missed the user's idle 
task.
+        if (_worker_idle_cb) {
+            if (HandleIdleTask()) {
+                if (_rq.pop(tid)) {
+                    return true;
+                }
+                if (steal_task(tid)) {
+                    return true;
+                }
+            }
+            timespec wait_time;
+            wait_time.tv_sec = _worker_idle_timeout_us / 1000000;
+            wait_time.tv_nsec = (_worker_idle_timeout_us % 1000000) * 1000;
+            _pl->wait(st, &wait_time);

Review Comment:
   Duplicate timeout conversion logic. The same timeout calculation appears in 
two places (lines 198-201 and 227-230). This code duplication makes maintenance 
harder and increases the risk of inconsistencies if one is updated but not the 
other.
   
   Consider extracting this logic into a helper function:
   ```cpp
   static timespec ConvertTimeout(uint64_t timeout_us) {
       timespec wait_time;
       wait_time.tv_sec = timeout_us / 1000000;
       wait_time.tv_nsec = (timeout_us % 1000000) * 1000;
       return wait_time;
   }
   ```



##########
src/bthread/task_group.h:
##########
@@ -115,6 +115,15 @@ class TaskGroup {
     static void sched_to(TaskGroup** pg, bthread_t next_tid);
     static void exchange(TaskGroup** pg, TaskMeta* next_meta);
 
+    typedef bool (*OnWorkerIdleFn)(void* user_ctx);
+    // Set a callback to run when a worker has no task to run.
+    // If the callback returns true, it means some work is done and the worker
+    // should check the runqueue again immediately.
+    // |timeout_us|: The timeout for waiting if the callback returns false.
+    //               0 means infinite wait (original behavior).

Review Comment:
   The documentation comment says "0 means infinite wait (original behavior)" 
but the default parameter value is 1000, not 0. This is inconsistent and 
confusing. Either the default should be 0 to match the documented behavior, or 
the documentation should be updated to reflect that the default is 1000 
microseconds (1ms).
   ```suggestion
       //               Default is 1000 microseconds (1ms). If 0 is specified, 
it means infinite wait (original behavior).
   ```



##########
src/bthread/task_group.cpp:
##########
@@ -78,6 +78,23 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, 
NULL);
 
 const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
 
+TaskGroup::OnWorkerIdleFn TaskGroup::_worker_idle_cb = NULL;
+void* TaskGroup::_worker_idle_ctx = NULL;
+uint64_t TaskGroup::_worker_idle_timeout_us = 1000;

Review Comment:
   The static variables `_worker_idle_cb`, `_worker_idle_ctx`, and 
`_worker_idle_timeout_us` are accessed and modified without any synchronization 
mechanism. The `SetWorkerIdleCallback` function can be called while worker 
threads are concurrently reading these variables in `wait_task()` and 
`HandleIdleTask()`. This creates a data race and undefined behavior.
   
   Consider adding proper synchronization (e.g., atomic operations for the 
callback pointer with memory ordering, or a mutex) to ensure thread-safe access 
to these shared static variables.



##########
src/bthread/task_group.cpp:
##########
@@ -167,7 +184,24 @@ bool TaskGroup::wait_task(bthread_t* tid) {
         if (_last_pl_state.stopped()) {
             return false;
         }
-        _pl->wait(_last_pl_state);
+        // Instead of waiting for signal, we shall wake up if there's a user 
idle task here.
+        // To avoid the current task never wake and missed the user's idle 
task.
+        if (_worker_idle_cb) {
+            if (HandleIdleTask()) {
+                if (_rq.pop(tid)) {
+                    return true;
+                }
+                if (steal_task(tid)) {
+                    return true;
+                }
+            }

Review Comment:
   The duplicated logic for checking the idle callback, calling 
`HandleIdleTask()`, and attempting to get work from queues appears in two 
locations (lines 189-197 and 218-226). This code duplication makes the function 
harder to maintain.
   
   Consider refactoring this repeated pattern into a helper function to reduce 
duplication and improve maintainability.



##########
test/bthread_idle_unittest.cpp:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <bthread/bthread.h>
+#include <bthread/task_group.h>
+#include <butil/logging.h>
+#include <butil/time.h>
+#include <set>
+#include <mutex>
+
+namespace {
+
+// Mock context to simulate per-thread state (e.g., io_uring ring)
+struct MockWorkerContext {
+    int worker_id;
+    int poll_count;
+
+    MockWorkerContext() : worker_id(-1), poll_count(0) {}
+};
+
+// Thread-local storage to simulate "Share-nothing" architecture
+// In a real scenario, this would hold the something like io_uring instance.
+static __thread MockWorkerContext* tls_context = nullptr;
+
+// Set to collect all unique worker IDs we've seen
+static std::set<int> observed_worker_ids;
+static std::mutex stats_mutex;

Review Comment:
   The global state in the test (static variables `observed_worker_ids`, 
`stats_mutex`, and thread-local `tls_context`) is not cleaned up properly. The 
`TearDown()` method clears the idle callback but doesn't clean up the 
thread-local `tls_context`, which will leak memory if the same worker threads 
are reused across tests. Additionally, `observed_worker_ids` is cleared in 
`SetUp()` but the thread-local contexts persist.
   
   Consider adding cleanup logic for thread-local contexts, or documenting this 
as a known limitation of the test.



##########
src/bthread/task_group.cpp:
##########
@@ -78,6 +78,23 @@ BAIDU_VOLATILE_THREAD_LOCAL(void*, tls_unique_user_ptr, 
NULL);
 
 const TaskStatistics EMPTY_STAT = { 0, 0, 0 };
 
+TaskGroup::OnWorkerIdleFn TaskGroup::_worker_idle_cb = NULL;
+void* TaskGroup::_worker_idle_ctx = NULL;
+uint64_t TaskGroup::_worker_idle_timeout_us = 1000;
+
+// Set the global static idle task, we can use thread local variables to 
distinct different
+// task group's target resource (for example, users' can init thread local 
iouring per task group).
+void TaskGroup::SetWorkerIdleCallback(OnWorkerIdleFn fn, void* user_ctx,
+                                      uint64_t timeout_us) {
+    _worker_idle_cb = fn;
+    _worker_idle_ctx = user_ctx;
+    _worker_idle_timeout_us = timeout_us;
+}
+
+bool TaskGroup::HandleIdleTask() {
+    return _worker_idle_cb(_worker_idle_ctx);

Review Comment:
   The `HandleIdleTask()` function doesn't check if `_worker_idle_cb` is null 
before dereferencing it. While the calling code checks for null, if there's a 
race condition where `SetWorkerIdleCallback` is called with `nullptr` between 
the check and the call to `HandleIdleTask()`, this will result in a null 
pointer dereference and crash.
   
   Add a null check inside `HandleIdleTask()`:
   ```cpp
   bool TaskGroup::HandleIdleTask() {
       if (_worker_idle_cb) {
           return _worker_idle_cb(_worker_idle_ctx);
       }
       return false;
   }
   ```
   ```suggestion
       if (_worker_idle_cb) {
           return _worker_idle_cb(_worker_idle_ctx);
       }
       return false;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to