This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 51ba957fd6b [improve](partition_topn) Add partition threshold check in
hash table to control partition nums (#39057)
51ba957fd6b is described below
commit 51ba957fd6b274886d89ad28b6c8c4899bd5bdba
Author: zhangstar333 <[email protected]>
AuthorDate: Fri Sep 20 15:06:21 2024 +0800
[improve](partition_topn) Add partition threshold check in hash table to
control partition nums (#39057)
## Proposed changes
1. Add a session variable to control the partition_topn partition threshold.
2. Move the partition threshold check into the step that emplaces data into
the hash table, so that the partition count is checked on every row.
This improves some bad cases by 50%+. Some cases that performed well before
may see a performance degradation of almost 10% after the check is moved
into the hash table; I think this is within the
acceptable range.
<!--Describe your changes.-->
---
be/src/common/config.cpp | 2 -
be/src/common/config.h | 4 --
.../pipeline/exec/partition_sort_sink_operator.cpp | 61 ++++++++++++++++++----
.../pipeline/exec/partition_sort_sink_operator.h | 7 ++-
.../exec/partition_sort_source_operator.cpp | 6 +--
be/src/runtime/runtime_state.h | 12 +++++
.../java/org/apache/doris/qe/SessionVariable.java | 20 +++++++
gensrc/thrift/PaloInternalService.thrift | 2 +
.../external_table_p0/jdbc/test_jdbc_query_pg.out | 6 +--
.../jdbc/test_jdbc_query_pg.groovy | 2 +-
10 files changed, 98 insertions(+), 24 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index b0b0dd7aafc..58679fbe9b4 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1115,8 +1115,6 @@ DEFINE_mInt32(publish_version_gap_logging_threshold,
"200");
// The secure path with user files, used in the `local` table function.
DEFINE_mString(user_files_secure_path, "${DORIS_HOME}");
-DEFINE_Int32(partition_topn_partition_threshold, "1024");
-
DEFINE_Int32(fe_expire_duration_seconds, "60");
DEFINE_Int32(grace_shutdown_wait_seconds, "120");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4dabccc7db3..56a9357e72e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1185,10 +1185,6 @@ DECLARE_mInt32(publish_version_gap_logging_threshold);
// The secure path with user files, used in the `local` table function.
DECLARE_mString(user_files_secure_path);
-// This threshold determines how many partitions will be allocated for window
function get topn.
-// and if this threshold is exceeded, the remaining data will be pass through
to other node directly.
-DECLARE_Int32(partition_topn_partition_threshold);
-
// If fe's frontend info has not been updated for more than
fe_expire_duration_seconds, it will be regarded
// as an abnormal fe, this will cause be to cancel this fe's related query.
DECLARE_Int32(fe_expire_duration_seconds);
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 94c51e160da..3a850a40b13 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -17,6 +17,8 @@
#include "partition_sort_sink_operator.h"
+#include <cstdint>
+
#include "common/status.h"
#include "partition_sort_source_operator.h"
#include "vec/common/hash_table/hash.h"
@@ -107,8 +109,13 @@ Status PartitionSortSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
for (size_t i = 0; i < p._partition_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(p._partition_expr_ctxs[i]->clone(state,
_partition_expr_ctxs[i]));
}
+ _topn_phase = p._topn_phase;
_partition_exprs_num = p._partition_exprs_num;
_hash_table_size_counter = ADD_COUNTER(_profile, "HashTableSize",
TUnit::UNIT);
+ _serialize_key_arena_memory_usage =
+ _profile->AddHighWaterMarkCounter("SerializeKeyArena",
TUnit::BYTES, "MemoryUsage", 1);
+ _hash_table_memory_usage =
+ ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "HashTable", TUnit::BYTES,
"MemoryUsage", 1);
_build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
_emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
@@ -119,6 +126,8 @@ Status PartitionSortSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order,
p._nulls_first,
p._child->row_desc(), state, _profile, p._has_global_limit,
p._partition_inner_limit,
p._top_n_algorithm, p._topn_phase);
+ _profile->add_info_string("PartitionTopNPhase", to_string(p._topn_phase));
+ _profile->add_info_string("PartitionTopNLimit",
std::to_string(p._partition_inner_limit));
RETURN_IF_ERROR(_init_hash_method());
return Status::OK();
}
@@ -177,11 +186,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
}
local_state._value_places[0]->append_whole_block(input_block,
_child->row_desc());
} else {
- //just simply use partition num to check
- //if is TWO_PHASE_GLOBAL, must be sort all data thought partition
num threshold have been exceeded.
- if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL &&
- local_state._num_partition >
config::partition_topn_partition_threshold &&
- local_state._sorted_partition_input_rows < 10000 *
local_state._num_partition) {
+ if (local_state._is_need_passthrough) {
{
COUNTER_UPDATE(local_state._passthrough_rows_counter,
(int64_t)current_rows);
std::lock_guard<std::mutex>
lock(local_state._shared_state->buffer_mutex);
@@ -193,8 +198,6 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
RETURN_IF_ERROR(_split_block_by_partition(input_block,
local_state, eos));
RETURN_IF_CANCELLED(state);
input_block->clear_column_data();
- local_state._sorted_partition_input_rows =
- local_state._sorted_partition_input_rows +
current_rows;
}
}
}
@@ -225,6 +228,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state._shared_state->sink_eos = true;
local_state._dependency->set_ready_to_read();
}
+ local_state._profile->add_info_string("HasPassThrough",
+ local_state._is_need_passthrough
? "Yes" : "No");
}
return Status::OK();
@@ -245,7 +250,7 @@ Status
PartitionSortSinkOperatorX::_split_block_by_partition(
}
Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
- const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block*
input_block,
+ const vectorized::ColumnRawPtrs& key_columns, vectorized::Block*
input_block,
PartitionSortSinkLocalState& local_state, bool eos) {
return std::visit(
vectorized::Overload {
@@ -280,15 +285,37 @@ Status
PartitionSortSinkOperatorX::_emplace_into_hash_table(
};
SCOPED_TIMER(local_state._emplace_key_timer);
- for (size_t row = 0; row < num_rows; ++row) {
+ int row = num_rows;
+ for (row = row - 1; row >= 0 &&
!local_state._is_need_passthrough; --row) {
auto& mapped = agg_method.lazy_emplace(state, row,
creator,
creator_for_null_key);
mapped->add_row_idx(row);
+ local_state._sorted_partition_input_rows++;
+ local_state._is_need_passthrough =
+
local_state.check_whether_need_passthrough();
}
for (auto* place : local_state._value_places) {
SCOPED_TIMER(local_state._selector_block_timer);
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
}
+ if (local_state._is_need_passthrough) {
+ {
+
COUNTER_UPDATE(local_state._passthrough_rows_counter,
+ (int64_t)(num_rows - row));
+ std::lock_guard<std::mutex> lock(
+
local_state._shared_state->buffer_mutex);
+ // have emplace (num_rows - row) to hashtable,
and now have row remaining needed in block;
+ input_block->set_num_rows(row);
+ local_state._shared_state->blocks_buffer.push(
+ std::move(*input_block));
+ // buffer have data, source could read this.
+ local_state._dependency->set_ready_to_read();
+ }
+ }
+ local_state._serialize_key_arena_memory_usage->set(
+ (int64_t)local_state._agg_arena_pool->size());
+ COUNTER_SET(local_state._hash_table_memory_usage,
+
(int64_t)agg_method.hash_table->get_buffer_size_in_bytes());
return Status::OK();
}},
local_state._partitioned_data->method_variant);
@@ -303,4 +330,20 @@ Status PartitionSortSinkLocalState::_init_hash_method() {
return Status::OK();
}
+// NOLINTBEGIN(readability-simplify-boolean-expr)
+// just simply use partition num to check
+// but if is TWO_PHASE_GLOBAL, must be sort all data thought partition num
threshold have been exceeded.
+// partition_topn_max_partitions default is : 1024
+// partition_topn_per_partition_rows default is : 1000
+bool PartitionSortSinkLocalState::check_whether_need_passthrough() {
+ if (_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL &&
+ _num_partition > _state->partition_topn_max_partitions() &&
+ _sorted_partition_input_rows <
+ _state->partition_topn_per_partition_rows() * _num_partition) {
+ return true;
+ }
+ return false;
+}
+// NOLINTEND(readability-simplify-boolean-expr)
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index e58ac5fea9e..f16df509dca 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -239,6 +239,8 @@ private:
std::unique_ptr<vectorized::Arena> _agg_arena_pool;
int _partition_exprs_num = 0;
std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr;
+ TPartTopNPhase::type _topn_phase;
+ bool _is_need_passthrough = false;
RuntimeProfile::Counter* _build_timer = nullptr;
RuntimeProfile::Counter* _emplace_key_timer = nullptr;
@@ -246,7 +248,10 @@ private:
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _passthrough_rows_counter = nullptr;
RuntimeProfile::Counter* _sorted_partition_input_rows_counter = nullptr;
+ RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
+ RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage =
nullptr;
Status _init_hash_method();
+ bool check_whether_need_passthrough();
};
class PartitionSortSinkOperatorX final : public
DataSinkOperatorX<PartitionSortSinkLocalState> {
@@ -289,7 +294,7 @@ private:
Status _split_block_by_partition(vectorized::Block* input_block,
PartitionSortSinkLocalState& local_state,
bool eos);
Status _emplace_into_hash_table(const vectorized::ColumnRawPtrs&
key_columns,
- const vectorized::Block* input_block,
+ vectorized::Block* input_block,
PartitionSortSinkLocalState& local_state,
bool eos);
};
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 2f94a652a89..f2cd8dea0b9 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -95,12 +95,10 @@ Status
PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
if (local_state._sort_idx <
local_state._shared_state->partition_sorts.size()) {
RETURN_IF_ERROR(local_state._shared_state->partition_sorts[local_state._sort_idx]->get_next(
state, output_block, &current_eos));
+ COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter,
output_block->rows());
}
if (current_eos) {
- //current sort have eos, so get next idx
- auto rows =
local_state._shared_state->partition_sorts[local_state._sort_idx]
- ->get_output_rows();
- COUNTER_UPDATE(local_state._sorted_partition_output_rows_counter,
rows);
+ // current sort have eos, so get next idx
local_state._shared_state->partition_sorts[local_state._sort_idx].reset(nullptr);
local_state._sort_idx++;
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f43d0a163df..90cf1bc34bd 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -490,6 +490,18 @@ public:
: 0;
}
+ int partition_topn_max_partitions() const {
+ return _query_options.__isset.partition_topn_max_partitions
+ ? _query_options.partition_topn_max_partitions
+ : 1024;
+ }
+
+ int partition_topn_per_partition_rows() const {
+ return _query_options.__isset.partition_topn_pre_partition_rows
+ ? _query_options.partition_topn_pre_partition_rows
+ : 1000;
+ }
+
int64_t parallel_scan_min_rows_per_scanner() const {
return _query_options.__isset.parallel_scan_min_rows_per_scanner
? _query_options.parallel_scan_min_rows_per_scanner
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 94cf0cb3469..e4fb6153e33 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -240,6 +240,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String MAX_JOIN_NUMBER_BUSHY_TREE =
"max_join_number_bushy_tree";
public static final String ENABLE_PARTITION_TOPN = "enable_partition_topn";
+ public static final String PARTITION_TOPN_MAX_PARTITIONS =
"partition_topn_max_partitions";
+ public static final String PARTITION_TOPN_PER_PARTITION_ROWS =
"partition_topn_pre_partition_rows";
public static final String GLOBAL_PARTITION_TOPN_THRESHOLD =
"global_partition_topn_threshold";
@@ -1240,6 +1242,22 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_PARTITION_TOPN)
private boolean enablePartitionTopN = true;
+ @VariableMgr.VarAttr(name = PARTITION_TOPN_MAX_PARTITIONS, needForward =
true, description = {
+
"这个阈值决定了partition_topn计算时的最大分区数量,超过这个阈值后且输入总行数少于预估总量,剩余的数据将直接透传给下一个算子",
+ "This threshold determines how many partitions will be allocated
for window function get topn."
+ + " if this threshold is exceeded and input rows less than
the estimated total rows, the remaining"
+ + " data will be pass through to other node directly."
+ })
+ private int partitionTopNMaxPartitions = 1024;
+
+ @VariableMgr.VarAttr(name = PARTITION_TOPN_PER_PARTITION_ROWS, needForward
= true, description = {
+ "这个数值用于partition_topn预估每个分区的行数,用来计算所有分区的预估数据总量,决定是否能透传下一个算子",
+ "This value is used for partition_topn to estimate the number of
rows in each partition, to calculate "
+ + " the estimated total amount of data for all partitions, and to
determine whether the next operator "
+ + " can be passed transparently."
+ })
+ private int partitionTopNPerPartitionRows = 1000;
+
@VariableMgr.VarAttr(name = GLOBAL_PARTITION_TOPN_THRESHOLD)
private double globalPartitionTopNThreshold = 100;
@@ -3661,6 +3679,8 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setBatchSize(batchSize);
tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
tResult.setEnableDistinctStreamingAggregation(enableDistinctStreamingAggregation);
+ tResult.setPartitionTopnMaxPartitions(partitionTopNMaxPartitions);
+
tResult.setPartitionTopnPrePartitionRows(partitionTopNPerPartitionRows);
if (maxScanKeyNum > 0) {
tResult.setMaxScanKeyNum(maxScanKeyNum);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 871101c5c35..48f41e8e0ab 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -343,6 +343,8 @@ struct TQueryOptions {
131: optional i32 adaptive_pipeline_task_serial_read_on_limit = 10000;
132: optional i32 parallel_prepare_threshold = 0;
+ 133: optional i32 partition_topn_max_partitions = 1024;
+ 134: optional i32 partition_topn_pre_partition_rows = 1000;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
diff --git a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out
b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out
index ec2058ca46e..acfe2adad2e 100644
--- a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out
+++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out
@@ -1358,9 +1358,9 @@ true abc def 2022-10-11 1.234 1
2 3 2022-10-22T10:59:59 34.123 true abc def 2022
6
-- !sql87 --
-1 3
-2 0
-3 1
+1 0
+2 1
+3 2
-- !sql88 --
1
diff --git
a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy
b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy
index 492fdeb349b..623bd6e8932 100644
--- a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy
+++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy
@@ -575,7 +575,7 @@ suite("test_jdbc_query_pg",
"p0,external,pg,external_docker,external_docker_pg")
order_qt_sql84 """ SELECT NULL, NULL INTERSECT SELECT NULL, NULL FROM
$jdbcPg14Table1 """
order_qt_sql85 """ SELECT COUNT(*) FROM $jdbcPg14Table1 INTERSECT
SELECT COUNT(k8) FROM $jdbcPg14Table1 HAVING SUM(k7) IS NOT NULL """
order_qt_sql86 """ SELECT k8 FROM $jdbcPg14Table1 WHERE k8 < 7 EXCEPT
SELECT k8 FROM $jdbcPg14Table1 WHERE k8 > 21 """
- order_qt_sql87 """ SELECT row_number() OVER (PARTITION BY k7) rn, k8
FROM $jdbcPg14Table1 LIMIT 3 """
+ order_qt_sql87 """ SELECT row_number() OVER (PARTITION BY k7 order by
k8) rn, k8 FROM $jdbcPg14Table1 LIMIT 3 """
order_qt_sql88 """ SELECT row_number() OVER (PARTITION BY k7 ORDER BY
k8) rn FROM $jdbcPg14Table1 LIMIT 3 """
order_qt_sql89 """ SELECT row_number() OVER (ORDER BY k8) rn FROM
$jdbcPg14Table1 LIMIT 3 """
order_qt_sql90 """ SELECT row_number() OVER () FROM $jdbcPg14Table1 as
a JOIN ${dorisExTable1} as b ON a.k8 = b.id WHERE a.k8 > 111 LIMIT 2 """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]