This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d38693f39a5 [refactor](spill) Refactor logics of spilling (#37120)
d38693f39a5 is described below
commit d38693f39a5d13da63be3e883104a3e92d01bd6e
Author: Gabriel <[email protected]>
AuthorDate: Thu Jul 4 16:52:54 2024 +0800
[refactor](spill) Refactor logics of spilling (#37120)
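Refactor the spilling sort operator and remove redundant code. The sort algorithm (heap sort, top-N sort, or full sort) is now chosen by the FE and sent to the BE through the new optional `TSortNode.algorithm` field. The BE creates the spill sort operators only for full sort, and only when sort spilling is enabled. `WorkloadGroupInfo::parse_topic_info` now returns a `WorkloadGroupInfo` by value, flagged with `valid`. It previously filled an out-parameter and returned a `Status`.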
---
be/src/agent/workload_group_listener.cpp | 12 +++---
be/src/pipeline/exec/sort_sink_operator.cpp | 34 ++++-----------
be/src/pipeline/exec/sort_sink_operator.h | 11 ++---
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 44 +++++--------------
be/src/pipeline/exec/spill_sort_sink_operator.h | 1 -
.../pipeline/exec/spill_sort_source_operator.cpp | 10 ++---
be/src/pipeline/pipeline_fragment_context.cpp | 8 +++-
be/src/pipeline/pipeline_task.cpp | 6 +--
be/src/pipeline/pipeline_task.h | 12 ++++--
be/src/runtime/workload_group/workload_group.cpp | 50 +++++++++++-----------
be/src/runtime/workload_group/workload_group.h | 34 ++++++++-------
be/src/vec/common/sort/sorter.h | 2 +-
.../java/org/apache/doris/planner/SortNode.java | 19 ++++++++
.../main/java/org/apache/doris/qe/Coordinator.java | 9 ++--
gensrc/thrift/PlanNodes.thrift | 7 +++
15 files changed, 124 insertions(+), 135 deletions(-)
diff --git a/be/src/agent/workload_group_listener.cpp
b/be/src/agent/workload_group_listener.cpp
index 15c61be5156..61af4543196 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -35,16 +35,18 @@ void WorkloadGroupListener::handle_topic_info(const
std::vector<TopicInfo>& topi
is_set_workload_group_info = true;
// 1 parse topic info to group info
- WorkloadGroupInfo workload_group_info;
- Status ret =
WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info,
- &workload_group_info);
+ WorkloadGroupInfo workload_group_info =
+
WorkloadGroupInfo::parse_topic_info(topic_info.workload_group_info);
// it means FE has this wg, but may parse failed, so we should not
delete it.
if (workload_group_info.id != 0) {
current_wg_ids.insert(workload_group_info.id);
}
- if (!ret.ok()) {
+ if (!workload_group_info.valid) {
LOG(INFO) << "[topic_publish_wg]parse topic info failed, wg_id="
- << workload_group_info.id << ", reason:" <<
ret.to_string();
+ << workload_group_info.id << ", reason:
[tworkload_group_info.__isset.id: "
+ << topic_info.workload_group_info.__isset.id
+ << ", tworkload_group_info.__isset.version: "
+ << topic_info.workload_group_info.__isset.version << "]";
continue;
}
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index f2224383f86..7230116a1a0 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -43,19 +43,19 @@ Status SortSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(p._vsort_exec_exprs.clone(state, _vsort_exec_exprs));
switch (p._algorithm) {
- case SortAlgorithm::HEAP_SORT: {
+ case TSortAlgorithm::HEAP_SORT: {
_shared_state->sorter = vectorized::HeapSorter::create_unique(
_vsort_exec_exprs, p._limit, p._offset, p._pool,
p._is_asc_order, p._nulls_first,
p._child_x->row_desc());
break;
}
- case SortAlgorithm::TOPN_SORT: {
+ case TSortAlgorithm::TOPN_SORT: {
_shared_state->sorter = vectorized::TopNSorter::create_unique(
_vsort_exec_exprs, p._limit, p._offset, p._pool,
p._is_asc_order, p._nulls_first,
p._child_x->row_desc(), state, _profile);
break;
}
- case SortAlgorithm::FULL_SORT: {
+ case TSortAlgorithm::FULL_SORT: {
_shared_state->sorter = vectorized::FullSorter::create_unique(
_vsort_exec_exprs, p._limit, p._offset, p._pool,
p._is_asc_order, p._nulls_first,
p._child_x->row_desc(), state, _profile);
@@ -73,14 +73,13 @@ Status SortSinkLocalState::open(RuntimeState* state) {
}
SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const
TPlanNode& tnode,
- const DescriptorTbl& descs, bool
require_bucket_distribution)
+ const DescriptorTbl& descs,
+ const bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id),
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
_pool(pool),
- _reuse_mem(true),
_limit(tnode.limit),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
- _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
_merge_by_exchange(tnode.sort_node.merge_by_exchange),
_is_colocate(tnode.sort_node.__isset.is_colocate &&
tnode.sort_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution),
@@ -88,7 +87,10 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int
operator_id, const TP
? tnode.sort_node.is_analytic_sort
: false),
_partition_exprs(tnode.__isset.distribute_expr_lists ?
tnode.distribute_expr_lists[0]
- :
std::vector<TExpr> {}) {}
+ :
std::vector<TExpr> {}),
+ _algorithm(tnode.sort_node.__isset.algorithm ?
tnode.sort_node.algorithm
+ :
TSortAlgorithm::FULL_SORT),
+ _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {}
Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
@@ -105,24 +107,6 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
}
Status SortSinkOperatorX::prepare(RuntimeState* state) {
- const auto& row_desc = _child_x->row_desc();
-
- // If `limit` is smaller than HEAP_SORT_THRESHOLD, we consider using heap
sort in priority.
- // To do heap sorting, each income block will be filtered by heap-top row.
There will be some
- // `memcpy` operations. To ensure heap sort will not incur performance
fallback, we should
- // exclude cases which incoming blocks has string column which is
sensitive to operations like
- // `filter` and `memcpy`
- if (_limit > 0 && _limit + _offset <
vectorized::HeapSorter::HEAP_SORT_THRESHOLD &&
- (_use_two_phase_read ||
state->get_query_ctx()->has_runtime_predicate(_node_id) ||
- !row_desc.has_varlen_slots())) {
- _algorithm = SortAlgorithm::HEAP_SORT;
- _reuse_mem = false;
- } else if (_limit > 0 && row_desc.has_varlen_slots() &&
- _limit + _offset < vectorized::TopNSorter::TOPN_SORT_THRESHOLD)
{
- _algorithm = SortAlgorithm::TOPN_SORT;
- } else {
- _algorithm = SortAlgorithm::FULL_SORT;
- }
return _vsort_exec_exprs.prepare(state, _child_x->row_desc(),
_row_descriptor);
}
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index fa59b1715dc..b842a56f2ad 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -24,8 +24,6 @@
namespace doris::pipeline {
-enum class SortAlgorithm { HEAP_SORT, TOPN_SORT, FULL_SORT };
-
class SortSinkOperatorX;
class SortSinkLocalState : public PipelineXSinkLocalState<SortSharedState> {
@@ -53,7 +51,7 @@ private:
class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
public:
SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode&
tnode,
- const DescriptorTbl& descs, bool
require_bucket_distribution);
+ const DescriptorTbl& descs, const bool
require_bucket_distribution);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<SortSinkLocalState>::_name);
@@ -77,8 +75,6 @@ public:
}
bool require_data_distribution() const override { return _is_colocate; }
- bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT;
}
-
size_t get_revocable_mem_size(RuntimeState* state) const;
Status prepare_for_spill(RuntimeState* state);
@@ -99,17 +95,16 @@ private:
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
- bool _reuse_mem;
const int64_t _limit;
- SortAlgorithm _algorithm;
const RowDescriptor _row_descriptor;
- const bool _use_two_phase_read;
const bool _merge_by_exchange;
const bool _is_colocate = false;
const bool _require_bucket_distribution = false;
const bool _is_analytic_sort = false;
const std::vector<TExpr> _partition_exprs;
+ const TSortAlgorithm::type _algorithm;
+ const bool _reuse_mem;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index b7fae82ca54..4c6eb290ef1 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -25,9 +25,8 @@
namespace doris::pipeline {
SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase*
parent, RuntimeState* state)
: Base(parent, state) {
- _finish_dependency =
- std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
- parent->get_name() +
"_SPILL_DEPENDENCY", true);
+ _finish_dependency = std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
+ parent->get_name() +
"_SPILL_DEPENDENCY");
}
Status SpillSortSinkLocalState::init(doris::RuntimeState* state,
@@ -40,12 +39,8 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState*
state,
RETURN_IF_ERROR(setup_in_memory_sort_op(state));
- auto& parent = Base::_parent->template cast<Parent>();
- Base::_shared_state->enable_spill = parent._enable_spill;
-
Base::_shared_state->in_mem_shared_state->sorter->set_enable_spill(parent._enable_spill);
- if (parent._enable_spill) {
- _finish_dependency->block();
- }
+ Base::_shared_state->in_mem_shared_state->sorter->set_enable_spill();
+ _finish_dependency->block();
return Status::OK();
}
@@ -78,10 +73,7 @@ void SpillSortSinkLocalState::update_profile(RuntimeProfile*
child_profile) {
}
Status SpillSortSinkLocalState::close(RuntimeState* state, Status
execsink_status) {
- auto& parent = Base::_parent->template cast<Parent>();
- if (parent._enable_spill) {
- dec_running_big_mem_op_num(state);
- }
+ dec_running_big_mem_op_num(state);
return Status::OK();
}
Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
@@ -133,8 +125,6 @@ Status SpillSortSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state)
Status SpillSortSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::prepare(state));
RETURN_IF_ERROR(_sort_sink_operator->prepare(state));
- _enable_spill = _sort_sink_operator->is_full_sort();
- LOG(INFO) << "spill sort sink, enable spill: " << _enable_spill;
return Status::OK();
}
Status SpillSortSinkOperatorX::open(RuntimeState* state) {
@@ -142,16 +132,10 @@ Status SpillSortSinkOperatorX::open(RuntimeState* state) {
return _sort_sink_operator->open(state);
}
Status SpillSortSinkOperatorX::revoke_memory(RuntimeState* state) {
- if (!_enable_spill) {
- return Status::OK();
- }
auto& local_state = get_local_state(state);
return local_state.revoke_memory(state);
}
size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
- if (!_enable_spill) {
- return 0;
- }
auto& local_state = get_local_state(state);
if (!local_state.Base::_shared_state->sink_status.ok()) {
return UINT64_MAX;
@@ -161,9 +145,7 @@ size_t
SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state,
vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
- if (_enable_spill) {
- local_state.inc_running_big_mem_op_num(state);
- }
+ local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
@@ -177,17 +159,10 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
local_state._mem_tracker->set_consumption(
local_state._shared_state->in_mem_shared_state->sorter->data_size());
if (eos) {
- if (_enable_spill) {
- if (local_state._shared_state->is_spilled) {
- if (revocable_mem_size(state) > 0) {
- RETURN_IF_ERROR(revoke_memory(state));
- } else {
- local_state._dependency->set_ready_to_read();
- local_state._finish_dependency->set_ready();
- }
+ if (local_state._shared_state->is_spilled) {
+ if (revocable_mem_size(state) > 0) {
+ RETURN_IF_ERROR(revoke_memory(state));
} else {
- RETURN_IF_ERROR(
-
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
@@ -195,6 +170,7 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
RETURN_IF_ERROR(
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
local_state._dependency->set_ready_to_read();
+ local_state._finish_dependency->set_ready();
}
}
return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 5347f22d11f..c5b70d6fcea 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -96,6 +96,5 @@ public:
private:
friend class SpillSortSinkLocalState;
std::unique_ptr<SortSinkOperatorX> _sort_sink_operator;
- bool _enable_spill = false;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index b322f33caa2..72304291f6d 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -66,9 +66,7 @@ Status SpillSortLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
- if (Base::_shared_state->enable_spill) {
- dec_running_big_mem_op_num(state);
- }
+ dec_running_big_mem_op_num(state);
return Base::close(state);
}
int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
@@ -274,13 +272,11 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::Bloc
local_state._current_merging_streams.clear();
}
}};
- if (local_state.Base::_shared_state->enable_spill) {
- local_state.inc_running_big_mem_op_num(state);
- }
+ local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);
- if (local_state.Base::_shared_state->enable_spill &&
local_state._shared_state->is_spilled) {
+ if (local_state._shared_state->is_spilled) {
if (!local_state._merger) {
local_state._status =
local_state.initiate_merge_sort_spill_streams(state);
return local_state._status;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 0968de7951e..9ef551df6db 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -105,6 +105,8 @@
#include "util/container_util.hpp"
#include "util/debug_util.h"
#include "util/uid_util.h"
+#include "vec/common/sort/heap_sorter.h"
+#include "vec/common/sort/topn_sorter.h"
#include "vec/runtime/vdata_stream_mgr.h"
namespace doris::pipeline {
@@ -1332,7 +1334,9 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
break;
}
case TPlanNodeType::SORT_NODE: {
- if (_runtime_state->enable_sort_spill()) {
+ const auto should_spill = _runtime_state->enable_sort_spill() &&
+ tnode.sort_node.algorithm ==
TSortAlgorithm::FULL_SORT;
+ if (should_spill) {
op.reset(new SpillSortSourceOperatorX(pool, tnode,
next_operator_id(), descs));
} else {
op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(),
descs));
@@ -1347,7 +1351,7 @@ Status
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
- if (_runtime_state->enable_sort_spill()) {
+ if (should_spill) {
sink.reset(new SpillSortSinkOperatorX(pool,
next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
} else {
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 52951e1c9c0..20c225dcba6 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -59,8 +59,8 @@ PipelineTask::PipelineTask(
_fragment_context(fragment_context),
_parent_profile(parent_profile),
_operators(pipeline->operator_xs()),
- _source(_operators.front()),
- _root(_operators.back()),
+ _source(_operators.front().get()),
+ _root(_operators.back().get()),
_sink(pipeline->sink_shared_pointer()),
_le_state_map(std::move(le_state_map)),
_task_idx(task_idx),
@@ -414,7 +414,7 @@ bool PipelineTask::should_revoke_memory(RuntimeState*
state, int64_t revocable_m
DCHECK(big_memory_operator_num >= 0);
int64_t mem_limit_of_op;
if (0 == big_memory_operator_num) {
- mem_limit_of_op = int64_t(query_weighted_limit * 0.8);
+ return false;
} else {
mem_limit_of_op = query_weighted_limit / big_memory_operator_num;
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 9983b315e82..63f464c03ad 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -132,8 +132,6 @@ public:
DataSinkOperatorXPtr sink() const { return _sink; }
- OperatorXPtr source() const { return _source; }
-
int task_id() const { return _index; };
bool is_finalized() const { return _finalized; }
@@ -178,6 +176,12 @@ public:
void set_core_id(int core_id) { this->_core_id = core_id; }
int get_core_id() const { return this->_core_id; }
+ /**
+ * Return true if:
+ * 1. `enable_force_spill` is true which forces this task to spill data.
+ * 2. Or memory consumption reaches the high water mark of current
workload group (80% of memory limitation by default) and revocable_mem_bytes is
bigger than min_revocable_mem_bytes.
+ * 3. Or memory consumption is higher than the low water mark of current
workload group (50% of memory limitation by default) and
`query_weighted_consumption >= query_weighted_limit` and revocable memory is
big enough.
+ */
static bool should_revoke_memory(RuntimeState* state, int64_t
revocable_mem_bytes);
void put_in_runnable_queue() {
@@ -278,8 +282,8 @@ private:
MonotonicStopWatch _pipeline_task_watcher;
OperatorXs _operators; // left is _source, right is _root
- OperatorXPtr _source;
- OperatorXPtr _root;
+ OperatorXBase* _source;
+ OperatorXBase* _root;
DataSinkOperatorXPtr _sink;
// `_read_dependencies` is stored as same order as `_operators`
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 1c496cde8d0..64a5c7aeffb 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -245,46 +245,41 @@ int64_t WorkloadGroup::gc_memory(int64_t need_free_mem,
RuntimeProfile* profile,
return freed_mem;
}
-Status WorkloadGroupInfo::parse_topic_info(const TWorkloadGroupInfo&
tworkload_group_info,
- WorkloadGroupInfo*
workload_group_info) {
+WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
+ const TWorkloadGroupInfo& tworkload_group_info) {
// 1 id
- int tg_id = 0;
+ uint64_t tg_id = 0;
if (tworkload_group_info.__isset.id) {
tg_id = tworkload_group_info.id;
} else {
- return Status::InternalError<false>("workload group id is required");
+ return {.valid = false};
}
- workload_group_info->id = tg_id;
// 2 name
std::string name = "INVALID_NAME";
if (tworkload_group_info.__isset.name) {
name = tworkload_group_info.name;
}
- workload_group_info->name = name;
// 3 version
int version = 0;
if (tworkload_group_info.__isset.version) {
version = tworkload_group_info.version;
} else {
- return Status::InternalError<false>("workload group version is
required");
+ return {.valid = false};
}
- workload_group_info->version = version;
// 4 cpu_share
uint64_t cpu_share = CPU_SHARE_DEFAULT_VALUE;
if (tworkload_group_info.__isset.cpu_share) {
cpu_share = tworkload_group_info.cpu_share;
}
- workload_group_info->cpu_share = cpu_share;
// 5 cpu hard limit
int cpu_hard_limit = CPU_HARD_LIMIT_DEFAULT_VALUE;
if (tworkload_group_info.__isset.cpu_hard_limit) {
cpu_hard_limit = tworkload_group_info.cpu_hard_limit;
}
- workload_group_info->cpu_hard_limit = cpu_hard_limit;
// 6 mem_limit
std::string mem_limit_str = MEMORY_LIMIT_DEFAULT_VALUE;
@@ -294,44 +289,37 @@ Status WorkloadGroupInfo::parse_topic_info(const
TWorkloadGroupInfo& tworkload_g
bool is_percent = true;
int64_t mem_limit =
ParseUtil::parse_mem_spec(mem_limit_str, -1, MemInfo::mem_limit(),
&is_percent);
- workload_group_info->memory_limit = mem_limit;
// 7 mem overcommit
bool enable_memory_overcommit = ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE;
if (tworkload_group_info.__isset.enable_memory_overcommit) {
enable_memory_overcommit =
tworkload_group_info.enable_memory_overcommit;
}
- workload_group_info->enable_memory_overcommit = enable_memory_overcommit;
// 8 cpu soft limit or hard limit
bool enable_cpu_hard_limit = false;
if (tworkload_group_info.__isset.enable_cpu_hard_limit) {
enable_cpu_hard_limit = tworkload_group_info.enable_cpu_hard_limit;
}
- workload_group_info->enable_cpu_hard_limit = enable_cpu_hard_limit;
// 9 scan thread num
- workload_group_info->scan_thread_num =
config::doris_scanner_thread_pool_thread_num;
+ int scan_thread_num = config::doris_scanner_thread_pool_thread_num;
if (tworkload_group_info.__isset.scan_thread_num &&
tworkload_group_info.scan_thread_num > 0) {
- workload_group_info->scan_thread_num =
tworkload_group_info.scan_thread_num;
+ scan_thread_num = tworkload_group_info.scan_thread_num;
}
// 10 max remote scan thread num
- workload_group_info->max_remote_scan_thread_num =
- vectorized::ScannerScheduler::get_remote_scan_thread_num();
+ int max_remote_scan_thread_num =
vectorized::ScannerScheduler::get_remote_scan_thread_num();
if (tworkload_group_info.__isset.max_remote_scan_thread_num &&
tworkload_group_info.max_remote_scan_thread_num > 0) {
- workload_group_info->max_remote_scan_thread_num =
- tworkload_group_info.max_remote_scan_thread_num;
+ max_remote_scan_thread_num =
tworkload_group_info.max_remote_scan_thread_num;
}
// 11 min remote scan thread num
- workload_group_info->min_remote_scan_thread_num =
- vectorized::ScannerScheduler::get_remote_scan_thread_num();
+ int min_remote_scan_thread_num =
vectorized::ScannerScheduler::get_remote_scan_thread_num();
if (tworkload_group_info.__isset.min_remote_scan_thread_num &&
tworkload_group_info.min_remote_scan_thread_num > 0) {
- workload_group_info->min_remote_scan_thread_num =
- tworkload_group_info.min_remote_scan_thread_num;
+ min_remote_scan_thread_num =
tworkload_group_info.min_remote_scan_thread_num;
}
// 12 spill low watermark
@@ -339,16 +327,26 @@ Status WorkloadGroupInfo::parse_topic_info(const
TWorkloadGroupInfo& tworkload_g
if (tworkload_group_info.__isset.spill_threshold_low_watermark) {
spill_low_watermark =
tworkload_group_info.spill_threshold_low_watermark;
}
- workload_group_info->spill_low_watermark = spill_low_watermark;
// 13 spil high watermark
int spill_high_watermark = SPILL_HIGH_WATERMARK_DEFAULT_VALUE;
if (tworkload_group_info.__isset.spill_threshold_high_watermark) {
spill_high_watermark =
tworkload_group_info.spill_threshold_high_watermark;
}
- workload_group_info->spill_high_watermark = spill_high_watermark;
- return Status::OK();
+ return {tg_id,
+ name,
+ cpu_share,
+ mem_limit,
+ enable_memory_overcommit,
+ version,
+ cpu_hard_limit,
+ enable_cpu_hard_limit,
+ scan_thread_num,
+ max_remote_scan_thread_num,
+ min_remote_scan_thread_num,
+ spill_low_watermark,
+ spill_high_watermark};
}
void WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv*
exec_env) {
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 971cc1cb023..a82efab0904 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -168,7 +168,9 @@ private:
const uint64_t _id;
std::string _name;
int64_t _version;
- int64_t _memory_limit; // bytes
+ int64_t _memory_limit; // bytes
+ // `_weighted_mem_used` is a rough memory usage in this group,
+ // because we can only get a precise memory usage by MemTracker which is
not include page cache.
std::atomic_int64_t _weighted_mem_used = 0; // bytes
bool _enable_memory_overcommit;
std::atomic<uint64_t> _cpu_share;
@@ -197,25 +199,25 @@ private:
using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
struct WorkloadGroupInfo {
- uint64_t id;
- std::string name;
- uint64_t cpu_share;
- int64_t memory_limit;
- bool enable_memory_overcommit;
- int64_t version;
- int cpu_hard_limit;
- bool enable_cpu_hard_limit;
- int scan_thread_num;
- int max_remote_scan_thread_num;
- int min_remote_scan_thread_num;
- int spill_low_watermark;
- int spill_high_watermark;
+ const uint64_t id = 0;
+ const std::string name;
+ const uint64_t cpu_share = 0;
+ const int64_t memory_limit = 0;
+ const bool enable_memory_overcommit = false;
+ const int64_t version = 0;
+ const int cpu_hard_limit = 0;
+ const bool enable_cpu_hard_limit = false;
+ const int scan_thread_num = 0;
+ const int max_remote_scan_thread_num = 0;
+ const int min_remote_scan_thread_num = 0;
+ const int spill_low_watermark = 0;
+ const int spill_high_watermark = 0;
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;
+ const bool valid = true;
- static Status parse_topic_info(const TWorkloadGroupInfo&
tworkload_group_info,
- WorkloadGroupInfo* workload_group_info);
+ static WorkloadGroupInfo parse_topic_info(const TWorkloadGroupInfo&
tworkload_group_info);
};
} // namespace doris
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 2525ca8c0c1..478e91c0783 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -136,7 +136,7 @@ public:
int64_t limit() const { return _limit; }
int64_t offset() const { return _offset; }
- void set_enable_spill(bool b) { _enable_spill = b; }
+ void set_enable_spill() { _enable_spill = true; }
protected:
Status partial_sort(Block& src_block, Block& dest_block);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index 24b384d4453..4cdc04d1f1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -34,6 +34,7 @@ import org.apache.doris.statistics.StatsRecursiveDerive;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
+import org.apache.doris.thrift.TSortAlgorithm;
import org.apache.doris.thrift.TSortInfo;
import org.apache.doris.thrift.TSortNode;
@@ -63,6 +64,7 @@ public class SortNode extends PlanNode {
private final boolean useTopN;
private boolean useTopnOpt = false;
private boolean useTwoPhaseReadOpt;
+ private boolean hasRuntimePredicate = false;
// If mergeByexchange is set to true, the sort information is pushed to the
// exchange node, and the sort node is used for the ORDER BY .
@@ -323,6 +325,19 @@ public class SortNode extends PlanNode {
msg.sort_node.setMergeByExchange(this.mergeByexchange);
msg.sort_node.setIsAnalyticSort(isAnalyticSort);
msg.sort_node.setIsColocate(isColocate);
+
+ boolean isFixedLength = info.getOrderingExprs().stream().allMatch(e ->
!e.getType().isStringType()
+ && !e.getType().isCollectionType());
+ TSortAlgorithm algorithm;
+ if (limit > 0 && limit + offset < 1024 && (useTwoPhaseReadOpt ||
hasRuntimePredicate
+ || isFixedLength)) {
+ algorithm = TSortAlgorithm.HEAP_SORT;
+ } else if (limit > 0 && !isFixedLength && limit + offset < 256) {
+ algorithm = TSortAlgorithm.TOPN_SORT;
+ } else {
+ algorithm = TSortAlgorithm.FULL_SORT;
+ }
+ msg.sort_node.setAlgorithm(algorithm);
}
@Override
@@ -348,4 +363,8 @@ public class SortNode extends PlanNode {
public void setColocate(boolean colocate) {
isColocate = colocate;
}
+
+ public void setHasRuntimePredicate() {
+ this.hasRuntimePredicate = true;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 2c2d4437441..9e7431e07c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -65,6 +65,7 @@ import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.planner.RuntimeFilterId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SetOperationNode;
+import org.apache.doris.planner.SortNode;
import org.apache.doris.planner.UnionNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PExecPlanFragmentResult;
@@ -3012,10 +3013,12 @@ public class Coordinator implements CoordInterface {
List<TExecPlanFragmentParams> toThrift(int backendNum) {
List<TExecPlanFragmentParams> paramsList = Lists.newArrayList();
- Set<Integer> topnFilterSources = scanNodes.stream()
+ Set<SortNode> topnSortNodes = scanNodes.stream()
.filter(scanNode -> scanNode instanceof OlapScanNode)
- .flatMap(scanNode -> ((OlapScanNode)
scanNode).getTopnFilterSortNodes().stream())
- .map(sort ->
sort.getId().asInt()).collect(Collectors.toSet());
+ .flatMap(scanNode ->
scanNode.getTopnFilterSortNodes().stream()).collect(Collectors.toSet());
+ topnSortNodes.forEach(SortNode::setHasRuntimePredicate);
+ Set<Integer> topnFilterSources = topnSortNodes.stream().map(
+ sort -> sort.getId().asInt()).collect(Collectors.toSet());
for (int i = 0; i < instanceExecParams.size(); ++i) {
final FInstanceExecParam instanceExecParam =
instanceExecParams.get(i);
TExecPlanFragmentParams params = new TExecPlanFragmentParams();
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1281a7fba49..cdc5e49decc 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -944,6 +944,12 @@ struct TPreAggregationNode {
2: required list<Exprs.TExpr> aggregate_exprs
}
+enum TSortAlgorithm {
+ HEAP_SORT,
+ TOPN_SORT,
+ FULL_SORT
+ }
+
struct TSortNode {
1: required TSortInfo sort_info
// Indicates whether the backend service should use topn vs. sorting
@@ -957,6 +963,7 @@ struct TSortNode {
8: optional bool merge_by_exchange
9: optional bool is_analytic_sort
10: optional bool is_colocate
+ 11: optional TSortAlgorithm algorithm
}
enum TopNAlgorithm {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]