This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 74198c1f30 [pipeline](task_queue) remove disable steal in task queue
to speed up query (#21737)
74198c1f30 is described below
commit 74198c1f3009d8defe9be7dd56a957b40027f26f
Author: HappenLee <[email protected]>
AuthorDate: Tue Jul 11 23:23:36 2023 +0800
[pipeline](task_queue) remove disable steal in task queue to speed up query
(#21737)
---
be/src/pipeline/pipeline.h | 8 +-------
be/src/pipeline/pipeline_fragment_context.cpp | 1 -
be/src/pipeline/pipeline_task.h | 4 ----
be/src/pipeline/task_queue.cpp | 3 ---
4 files changed, 1 insertion(+), 15 deletions(-)
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 676eb9efa1..73b2c3850c 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -49,10 +49,7 @@ class Pipeline : public
std::enable_shared_from_this<Pipeline> {
public:
Pipeline() = delete;
explicit Pipeline(PipelineId pipeline_id,
std::weak_ptr<PipelineFragmentContext> context)
- : _complete_dependency(0),
- _pipeline_id(pipeline_id),
- _context(context),
- _can_steal(true) {
+ : _complete_dependency(0), _pipeline_id(pipeline_id),
_context(context) {
_init_profile();
}
@@ -84,8 +81,6 @@ public:
RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
- void disable_task_steal() { _can_steal = false; }
-
private:
void _init_profile();
std::atomic<uint32_t> _complete_dependency;
@@ -98,7 +93,6 @@ private:
PipelineId _pipeline_id;
std::weak_ptr<PipelineFragmentContext> _context;
- bool _can_steal;
int _previous_schedule_id = -1;
std::unique_ptr<RuntimeProfile> _pipeline_profile;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 73feac07ac..c697b11928 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -600,7 +600,6 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode*
node, PipelinePtr cur
OperatorBuilderPtr join_sink =
std::make_shared<HashJoinBuildSinkBuilder>(next_operator_builder_id(),
join_node);
RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
- new_pipe->disable_task_steal();
RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
OperatorBuilderPtr join_source =
std::make_shared<HashJoinProbeOperatorBuilder>(
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 65e9ad83ed..acb5d1202f 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -121,7 +121,6 @@ public:
_sink(sink),
_prepared(false),
_opened(false),
- _can_steal(pipeline->_can_steal),
_state(state),
_cur_state(PipelineTaskState::NOT_READY),
_data_state(SourceState::DEPEND_ON_SOURCE),
@@ -159,8 +158,6 @@ public:
bool sink_can_write() { return _sink->can_write(); }
- bool can_steal() const { return _can_steal; }
-
Status finalize();
PipelineFragmentContext* fragment_context() { return _fragment_context; }
@@ -229,7 +226,6 @@ private:
bool _prepared;
bool _opened;
- bool _can_steal;
RuntimeState* _state;
int _previous_schedule_id = -1;
uint32_t _schedule_time = 0;
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 807a344cf9..2c09ac8edd 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -35,9 +35,6 @@ PipelineTask* SubTaskQueue::try_take(bool is_steal) {
return nullptr;
}
auto task = _queue.front();
- if (!task->can_steal() && is_steal) {
- return nullptr;
- }
_queue.pop();
return task;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]