This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0a2fdcee62e [PipelineX](improvement) Prepare tasks in parallel (#40844)
0a2fdcee62e is described below
commit 0a2fdcee62e76fe3b2d41630de5e7cbf8c83fa55
Author: Gabriel <[email protected]>
AuthorDate: Sat Sep 14 17:46:00 2024 +0800
[PipelineX](improvement) Prepare tasks in parallel (#40844)
---
be/src/common/config.cpp | 2 +-
be/src/exprs/runtime_filter.cpp | 1 +
be/src/pipeline/pipeline.h | 3 +-
be/src/pipeline/pipeline_fragment_context.cpp | 98 ++++++++++++++++------
be/src/pipeline/pipeline_fragment_context.h | 9 +-
be/src/runtime/fragment_mgr.cpp | 3 +-
be/src/vec/sink/vdata_stream_sender.cpp | 8 ++
.../java/org/apache/doris/qe/SessionVariable.java | 9 +-
gensrc/thrift/PaloInternalService.thrift | 1 +
9 files changed, 101 insertions(+), 33 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 68e630fe830..88f7289dfbf 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -517,7 +517,7 @@ DEFINE_Int32(brpc_light_work_pool_max_queue_size, "-1");
DEFINE_mBool(enable_bthread_transmit_block, "true");
// The maximum amount of data that can be processed by a stream load
-DEFINE_mInt64(streaming_load_max_mb, "10240");
+DEFINE_mInt64(streaming_load_max_mb, "102400");
// Some data formats, such as JSON, cannot be streamed.
// Therefore, it is necessary to limit the maximum number of
// such data when using stream load to prevent excessive memory consumption.
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 2bda40caf68..96c99c60656 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1274,6 +1274,7 @@ void IRuntimeFilter::signal() {
}
void
IRuntimeFilter::set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>
timer) {
+ std::unique_lock lock(_inner_mutex);
_filter_timer.push_back(timer);
}
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index c014a090170..dfeb53ae006 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -105,7 +105,6 @@ public:
void set_children(std::vector<std::shared_ptr<Pipeline>> children) {
_children = children; }
void incr_created_tasks() { _num_tasks_created++; }
- bool need_to_create_task() const { return _num_tasks > _num_tasks_created;
}
void set_num_tasks(int num_tasks) {
_num_tasks = num_tasks;
for (auto& op : _operators) {
@@ -158,7 +157,7 @@ private:
// How many tasks should be created ?
int _num_tasks = 1;
// How many tasks are already created?
- int _num_tasks_created = 0;
+ std::atomic<int> _num_tasks_created = 0;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 3513c2ba176..8fb750b9e97 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -139,8 +139,10 @@ PipelineFragmentContext::~PipelineFragmentContext() {
}
}
_tasks.clear();
- for (auto& runtime_state : _task_runtime_states) {
- runtime_state.reset();
+ for (auto& runtime_states : _task_runtime_states) {
+ for (auto& runtime_state : runtime_states) {
+ runtime_state.reset();
+ }
}
_pipelines.clear();
_sink.reset();
@@ -231,7 +233,8 @@ PipelinePtr
PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
return pipeline;
}
-Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams&
request) {
+Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams&
request,
+ ThreadPool* thread_pool) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
@@ -348,7 +351,7 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
{
SCOPED_TIMER(_build_tasks_timer);
// 5. Build pipeline tasks and initialize local state.
- RETURN_IF_ERROR(_build_pipeline_tasks(request));
+ RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool));
}
_init_next_report_time();
@@ -357,17 +360,23 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
return Status::OK();
}
-Status PipelineFragmentContext::_build_pipeline_tasks(
- const doris::TPipelineFragmentParams& request) {
+Status PipelineFragmentContext::_build_pipeline_tasks(const
doris::TPipelineFragmentParams& request,
+ ThreadPool* thread_pool)
{
_total_tasks = 0;
- int target_size = request.local_params.size();
+ const auto target_size = request.local_params.size();
_tasks.resize(target_size);
+ _fragment_instance_ids.resize(target_size);
+ _runtime_filter_states.resize(target_size);
+ _task_runtime_states.resize(_pipelines.size());
+ for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
+ _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
+ }
auto pipeline_id_to_profile =
_runtime_state->build_pipeline_profile(_pipelines.size());
- for (size_t i = 0; i < target_size; i++) {
+ auto pre_and_submit = [&](int i, PipelineFragmentContext* ctx) {
const auto& local_params = request.local_params[i];
auto fragment_instance_id = local_params.fragment_instance_id;
- _fragment_instance_ids.push_back(fragment_instance_id);
+ _fragment_instance_ids[i] = fragment_instance_id;
std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
auto init_runtime_state = [&](std::unique_ptr<RuntimeState>&
runtime_state) {
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
@@ -426,7 +435,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
- _runtime_filter_states.push_back(std::move(filterparams));
+ _runtime_filter_states[i] = std::move(filterparams);
std::map<PipelineId, PipelineTask*> pipeline_id_to_task;
auto get_local_exchange_state = [&](PipelinePtr pipeline)
-> std::map<int,
std::pair<std::shared_ptr<LocalExchangeSharedState>,
@@ -449,13 +458,15 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
auto& pipeline = _pipelines[pip_idx];
- if (pipeline->need_to_create_task()) {
- // build task runtime state
- _task_runtime_states.push_back(RuntimeState::create_unique(
+ if (pipeline->num_tasks() > 1 || i == 0) {
+ DCHECK(_task_runtime_states[pip_idx][i] == nullptr)
+ <<
print_id(_task_runtime_states[pip_idx][i]->fragment_instance_id()) << " "
+ << pipeline->debug_string();
+ _task_runtime_states[pip_idx][i] = RuntimeState::create_unique(
this, local_params.fragment_instance_id,
request.query_id,
request.fragment_id, request.query_options,
_query_ctx->query_globals,
- _exec_env, _query_ctx.get()));
- auto& task_runtime_state = _task_runtime_states.back();
+ _exec_env, _query_ctx.get());
+ auto& task_runtime_state = _task_runtime_states[pip_idx][i];
init_runtime_state(task_runtime_state);
auto cur_task_id = _total_tasks++;
task_runtime_state->set_task_id(cur_task_id);
@@ -529,6 +540,39 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
std::lock_guard<std::mutex> l(_state_map_lock);
_runtime_filter_mgr_map[fragment_instance_id] =
std::move(runtime_filter_mgr);
}
+ return Status::OK();
+ };
+ if (target_size > 1 &&
+ (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
+ target_size >
_runtime_state->query_options().parallel_prepare_threshold)) {
+ std::vector<Status> prepare_status(target_size);
+ std::mutex m;
+ std::condition_variable cv;
+ int prepare_done = 0;
+ for (size_t i = 0; i < target_size; i++) {
+ RETURN_IF_ERROR(thread_pool->submit_func([&, i]() {
+ SCOPED_ATTACH_TASK(_query_ctx.get());
+ prepare_status[i] = pre_and_submit(i, this);
+ std::unique_lock<std::mutex> lock(m);
+ prepare_done++;
+ if (prepare_done == target_size) {
+ cv.notify_one();
+ }
+ }));
+ }
+ std::unique_lock<std::mutex> lock(m);
+ if (prepare_done != target_size) {
+ cv.wait(lock);
+ for (size_t i = 0; i < target_size; i++) {
+ if (!prepare_status[i].ok()) {
+ return prepare_status[i];
+ }
+ }
+ }
+ } else {
+ for (size_t i = 0; i < target_size; i++) {
+ RETURN_IF_ERROR(pre_and_submit(i, this));
+ }
}
_pipeline_parent_map.clear();
_dag.clear();
@@ -1749,8 +1793,12 @@ Status PipelineFragmentContext::send_report(bool done) {
std::vector<RuntimeState*> runtime_states;
- for (auto& task_state : _task_runtime_states) {
- runtime_states.push_back(task_state.get());
+ for (auto& task_states : _task_runtime_states) {
+ for (auto& task_state : task_states) {
+ if (task_state) {
+ runtime_states.push_back(task_state.get());
+ }
+ }
}
ReportStatusRequest req {exec_status,
@@ -1821,15 +1869,17 @@
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
return nullptr;
}
- for (auto& runtime_state : _task_runtime_states) {
- if (runtime_state->runtime_profile() == nullptr) {
- continue;
- }
+ for (auto& runtime_states : _task_runtime_states) {
+ for (auto& runtime_state : runtime_states) {
+ if (runtime_state->runtime_profile() == nullptr) {
+ continue;
+ }
- auto tmp_load_channel_profile =
std::make_shared<TRuntimeProfileTree>();
+ auto tmp_load_channel_profile =
std::make_shared<TRuntimeProfileTree>();
-
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
-
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
+
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
+
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
+ }
}
auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index f46835e95e0..f95eb03fb12 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -88,7 +88,7 @@ public:
// should be protected by lock?
[[nodiscard]] bool is_canceled() const { return
_runtime_state->is_cancelled(); }
- Status prepare(const doris::TPipelineFragmentParams& request);
+ Status prepare(const doris::TPipelineFragmentParams& request, ThreadPool*
thread_pool);
Status submit();
@@ -187,7 +187,8 @@ private:
bool _enable_local_shuffle() const { return
_runtime_state->enable_local_shuffle(); }
- Status _build_pipeline_tasks(const doris::TPipelineFragmentParams&
request);
+ Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
+ ThreadPool* thread_pool);
void _close_fragment_instance();
void _init_next_report_time();
@@ -206,7 +207,7 @@ private:
int _closed_tasks = 0;
// After prepared, `_total_tasks` is equal to the size of `_tasks`.
// When submit fail, `_total_tasks` is equal to the number of tasks
submitted.
- int _total_tasks = 0;
+ std::atomic<int> _total_tasks = 0;
std::unique_ptr<RuntimeProfile> _runtime_profile;
bool _is_report_success = false;
@@ -303,7 +304,7 @@ private:
std::vector<TUniqueId> _fragment_instance_ids;
// Local runtime states for each task
- std::vector<std::unique_ptr<RuntimeState>> _task_runtime_states;
+ std::vector<std::vector<std::unique_ptr<RuntimeState>>>
_task_runtime_states;
std::vector<std::unique_ptr<RuntimeFilterParamsContext>>
_runtime_filter_states;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 4af6b72a220..6324cd31b7e 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -822,7 +822,8 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
{
SCOPED_RAW_TIMER(&duration_ns);
Status prepare_st = Status::OK();
- ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st =
context->prepare(params), prepare_st);
+ ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(params,
_thread_pool.get()),
+ prepare_st);
if (!prepare_st.ok()) {
query_ctx->cancel(prepare_st, params.fragment_id);
query_ctx->set_execution_dependency_ready();
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index dd221c6aaa3..f18467fbad9 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -92,6 +92,14 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
template <typename Parent>
Status Channel<Parent>::open(RuntimeState* state) {
+ if (_is_local) {
+ auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
+ _fragment_instance_id, _dest_node_id, &_local_recvr);
+ if (!st.ok()) {
+ // Recvr not found. Maybe downstream task is finished already.
+ LOG(INFO) << "Recvr is not found : " << st.to_string();
+ }
+ }
_be_number = state->be_number();
_brpc_request = std::make_shared<PTransmitDataParams>();
// initialize brpc request
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 1af6239bf4c..36650001cb0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -296,6 +296,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String AUTO_BROADCAST_JOIN_THRESHOLD =
"auto_broadcast_join_threshold";
+ public static final String PARALLEL_PREPARE_THRESHOLD =
"parallel_prepare_threshold";
+
public static final String ENABLE_PROJECTION = "enable_projection";
public static final String ENABLE_SHORT_CIRCUIT_QUERY =
"enable_short_circuit_query";
@@ -1046,7 +1048,7 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = PARALLEL_SCAN_MIN_ROWS_PER_SCANNER, fuzzy =
true,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
- private long parallelScanMinRowsPerScanner = 16384; // 16K
+ private long parallelScanMinRowsPerScanner = 2097152; // 2M
@VariableMgr.VarAttr(name = IGNORE_STORAGE_DATA_DISTRIBUTION, fuzzy =
false,
varType = VariableAnnotation.EXPERIMENTAL, needForward = true)
@@ -1089,6 +1091,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = AUTO_BROADCAST_JOIN_THRESHOLD)
public double autoBroadcastJoinThreshold = 0.8;
+ @VariableMgr.VarAttr(name = PARALLEL_PREPARE_THRESHOLD, fuzzy = true)
+ public int parallelPrepareThreshold = 32;
+
@VariableMgr.VarAttr(name = ENABLE_COST_BASED_JOIN_REORDER)
private boolean enableJoinReorderBasedCost = false;
@@ -2193,6 +2198,7 @@ public class SessionVariable implements Serializable,
Writable {
Random random = new SecureRandom();
this.parallelExecInstanceNum = random.nextInt(8) + 1;
this.parallelPipelineTaskNum = random.nextInt(8);
+ this.parallelPrepareThreshold = random.nextInt(32) + 1;
this.enableCommonExprPushdown = random.nextBoolean();
this.enableLocalExchange = random.nextBoolean();
// This will cause be dead loop, disable it first
@@ -3645,6 +3651,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setNumScannerThreads(numScannerThreads);
tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
tResult.setMaxColumnReaderNum(maxColumnReaderNum);
+ tResult.setParallelPrepareThreshold(parallelPrepareThreshold);
// TODO chenhao, reservation will be calculated by cost
tResult.setMinReservation(0);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 8ccf7b679e0..871101c5c35 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -342,6 +342,7 @@ struct TQueryOptions {
130: optional bool enable_adaptive_pipeline_task_serial_read_on_limit = true;
131: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000;
+ 132: optional i32 parallel_prepare_threshold = 0;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]