morningman commented on code in PR #51690:
URL: https://github.com/apache/doris/pull/51690#discussion_r2214245449
##########
be/src/common/config.h:
##########
@@ -340,6 +340,18 @@ DECLARE_Int32(be_service_threads);
DECLARE_mInt32(pipeline_status_report_interval);
// Time slice for pipeline task execution (ms)
DECLARE_mInt32(pipeline_task_exec_time_slice);
+
+// task executor inital split concurrency
Review Comment:
This comment needs a more detailed explanation.
##########
be/src/vec/CMakeLists.txt:
##########
@@ -21,8 +21,38 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/vec")
file(GLOB_RECURSE VEC_FILES CONFIGURE_DEPENDS *.cpp)
+list(FILTER VEC_FILES EXCLUDE REGEX ".*exec/executor/simulator/.*")
+
add_library(Vec STATIC
${VEC_FILES}
)
pch_reuse(Vec)
+
+if (${BUILD_TASK_EXECUTOR_SIMULATOR} STREQUAL "ON")
Review Comment:
This needs documentation. Maybe we can add a page under
https://doris.apache.org/community/join-community -> Developer Guide
##########
be/src/vec/exec/scan/scanner_scheduler.cpp:
##########
@@ -359,4 +361,29 @@ int ScannerScheduler::get_remote_scan_thread_queue_size() {
return config::doris_remote_scanner_thread_pool_queue_size;
}
+Result<SharedListenableFuture<Void>>
ScannerSplitRunner::process_for(std::chrono::nanoseconds) {
Review Comment:
Is the `std::chrono::nanoseconds` argument ignored? You do pass `SPLIT_RUN_QUANTA` at the call site.
##########
be/src/common/config.cpp:
##########
@@ -293,6 +293,27 @@ DEFINE_Int32(be_service_threads, "64");
// The pipeline task has a high concurrency, therefore reducing its report
frequency
DEFINE_mInt32(pipeline_status_report_interval, "10");
DEFINE_mInt32(pipeline_task_exec_time_slice, "100");
+
+DEFINE_Int32(task_executor_initial_split_concurrency, "-1");
+DEFINE_Validator(task_executor_initial_split_concurrency, [](const int config)
-> bool {
+ if (config == -1) {
+ CpuInfo::init();
+ task_executor_initial_split_concurrency = std::max(48,
CpuInfo::num_cores() * 2);
+ }
+ return true;
+});
+
+DEFINE_Int32(task_executor_min_concurrency_per_task, "1");
+DEFINE_Int32(task_executor_max_concurrency_per_task, "-1");
+DEFINE_Validator(task_executor_max_concurrency_per_task, [](const int config)
-> bool {
+ if (config == -1) {
+ task_executor_max_concurrency_per_task =
std::numeric_limits<int>::max();
+ }
+ return true;
+});
+DEFINE_Bool(enable_task_executor_in_internal_table, "false");
Review Comment:
I suggest setting the default to true for internal tables, at least on the
master branch.
##########
be/src/vec/exec/executor/listenable_future.h:
##########
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <exception>
+#include <functional>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "common/status.h"
+#include "glog/logging.h"
+
+namespace doris {
+namespace vectorized {
+
+struct Void {
+ bool operator==(const Void&) const { return true; }
+ bool operator!=(const Void&) const { return false; }
+};
+
+template <typename T>
+class SharedListenableFuture;
+
+template <typename T>
+class ListenableFuture {
+public:
+ using Callback = std::function<void(const T&, const doris::Status&)>;
+
+ ListenableFuture(const ListenableFuture&) = delete;
+ ListenableFuture& operator=(const ListenableFuture&) = delete;
+
+ ListenableFuture(ListenableFuture&& other) noexcept
+ : _ready(other._ready),
+ _value(std::move(other._value)),
+ _status(std::move(other._status)),
+ _callbacks(std::move(other._callbacks)) {
+ other._ready = false;
+ }
+
+ ListenableFuture& operator=(ListenableFuture&& other) noexcept {
+ if (this != &other) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ std::lock_guard<std::mutex> other_lock(other._mutex);
+ _ready = other._ready;
+ _value = std::move(other._value);
+ _status = std::move(other._status);
+ _callbacks = std::move(other._callbacks);
+ other._ready = false;
+ }
+ return *this;
+ }
+
+ ListenableFuture() : _ready(false) {}
+
+ void set_value(const T& value) { _execute(value, doris::Status::OK()); }
+
+ void set_error(const doris::Status& status) { _execute({}, status); }
+
+ void add_callback(Callback cb) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ if (_ready) {
+ cb(_value, _status);
Review Comment:
Could invoking the callback while holding the lock impact performance?
Just make sure the callback itself is lightweight.
##########
be/src/vec/exec/scan/scanner_context.h:
##########
@@ -79,6 +86,8 @@ class ScanTask {
public:
std::weak_ptr<ScannerDelegate> scanner;
std::list<std::pair<vectorized::BlockUPtr, size_t>> cached_blocks;
+ bool is_first_schedule = true;
+ std::weak_ptr<SplitRunner> split_runner;
Review Comment:
Add a comment explaining why a weak_ptr is needed here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]