github-actions[bot] commented on code in PR #16031:
URL: https://github.com/apache/doris/pull/16031#discussion_r1072323733
##########
be/src/util/async_io.h:
##########
@@ -0,0 +1,89 @@
+#pragma once
+
+#include <bthread/bthread.h>
+
+#include "io/fs/file_system.h"
+#include "olap/olap_define.h"
+#include "priority_thread_pool.hpp"
+#include "runtime/threadlocal.h"
+
+namespace doris {
+
+struct AsyncIOCtx {
+ int nice;
+};
+
+/**
+ * Separate task from bthread to pthread, specific for IO task.
+ */
+class AsyncIO {
+public:
+ AsyncIO() {
+ _io_thread_pool = new
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
+
config::doris_scanner_thread_pool_queue_size,
+ "async_io_thread_pool");
+ _remote_thread_pool = new PriorityThreadPool(
+ config::doris_remote_scanner_thread_pool_thread_num,
+ config::doris_remote_scanner_thread_pool_queue_size,
"async_remote_thread_pool");
+ }
+
+ ~AsyncIO() {
+ SAFE_DELETE(_io_thread_pool);
+ SAFE_DELETE(_remote_thread_pool);
+ }
+
+ AsyncIO& operator=(const AsyncIO&) = delete;
+ AsyncIO(const AsyncIO&) = delete;
+
+ static AsyncIO& instance() {
+ static AsyncIO instance;
+ return instance;
+ }
+
+ // This function should run on the bthread, and it will put the task into
+ // thread_pool and release the bthread_worker at cv.wait. When the task is
completed,
+ // the bthread will continue to execute.
+ static void run_task(std::function<void()> fn, io::FileSystemType
file_type) {
+ DCHECK(bthread_self() != 0);
+ doris::Mutex mutex;
+ doris::ConditionVariable cv;
+ std::unique_lock l(mutex);
+
+ AsyncIOCtx* ctx =
static_cast<AsyncIOCtx*>(bthread_getspecific(btls_io_ctx_key));
+ int nice = -1;
+ if (ctx == nullptr) {
+ nice = 18;
+ } else {
+ nice = ctx->nice;
+ }
+
+ PriorityThreadPool::Task task;
+ task.priority = nice;
+ task.work_function = [&] {
+ fn();
+ std::unique_lock l(mutex);
+ cv.notify_one();
+ };
+
+ if (file_type == io::FileSystemType::S3) {
+ AsyncIO::instance().remote_thread_pool()->offer(task);
+ } else {
+ AsyncIO::instance().io_thread_pool()->offer(task);
+ }
+ cv.wait(l);
+ }
+
+ inline static bthread_key_t btls_io_ctx_key;
+
+ static void io_ctx_key_deleter(void* d) { delete
static_cast<AsyncIOCtx*>(d); }
+
+private:
+ PriorityThreadPool* _io_thread_pool = nullptr;
+ PriorityThreadPool* _remote_thread_pool = nullptr;
+
+private:
Review Comment:
warning: redundant access specifier has the same accessibility as the
previous access specifier [readability-redundant-access-specifiers]
```suggestion
```
**be/src/util/async_io.h:79:** previously declared here
```cpp
private:
^
```
##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -97,7 +98,7 @@
// Any work Offer()'ed during DrainAndshutdown may or may not be processed.
void drain_and_shutdown() override {
{
- std::unique_lock<std::mutex> l(_lock);
+ std::unique_lock l(_lock);
Review Comment:
warning: use of undeclared identifier '_lock'; did you mean 'clock'?
[clang-diagnostic-error]
```suggestion
std::unique_lock l(clock);
```
**/usr/include/time.h:71:** 'clock' declared here
```cpp
extern clock_t clock (void) __THROW;
^
```
##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -97,7 +98,7 @@
// Any work Offer()'ed during DrainAndshutdown may or may not be processed.
void drain_and_shutdown() override {
{
- std::unique_lock<std::mutex> l(_lock);
+ std::unique_lock l(_lock);
while (get_queue_size() != 0) {
_empty_cv.wait(l);
Review Comment:
warning: use of undeclared identifier '_empty_cv' [clang-diagnostic-error]
```cpp
_empty_cv.wait(l);
^
```
##########
be/src/util/priority_work_stealing_thread_pool.hpp:
##########
@@ -97,7 +98,7 @@ class PriorityWorkStealingThreadPool : public
PriorityThreadPool {
// Any work Offer()'ed during DrainAndshutdown may or may not be processed.
void drain_and_shutdown() override {
Review Comment:
warning: only virtual member functions can be marked 'override'
[clang-diagnostic-error]
```suggestion
void drain_and_shutdown() {
```
##########
be/src/util/once.h:
##########
@@ -54,10 +55,16 @@ class DorisCallOnce {
// lambda and stores its return value. Otherwise, returns the stored
Status.
template <typename Fn>
ReturnType call(Fn fn) {
- std::call_once(_once_flag, [this, fn] {
- _status = fn();
- _has_called.store(true, std::memory_order_release);
- });
+ if (!_has_called.load(std::memory_order_acquire)) {
+ do {
+ std::lock_guard l(_mutex);
+ if (_has_called.load(std::memory_order_acquire)) break;
Review Comment:
warning: statement should be inside braces
[readability-braces-around-statements]
```suggestion
if (_has_called.load(std::memory_order_acquire)) { break;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]