This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7248420cfd1 [chore](session_variable) Add 'data_queue_max_blocks' to prevent the DataQueue from occupying too much memory. (#34017) (#34395)
7248420cfd1 is described below
commit 7248420cfd1679209651fd0e205a7dacd0e4ca5f
Author: Jerry Hu <[email protected]>
AuthorDate: Sun May 5 21:20:33 2024 +0800
    [chore](session_variable) Add 'data_queue_max_blocks' to prevent the DataQueue from occupying too much memory. (#34017) (#34395)
---
be/src/pipeline/exec/data_queue.cpp | 10 ++++++++-
be/src/pipeline/exec/data_queue.h | 4 ++++
be/src/pipeline/exec/union_sink_operator.cpp | 1 +
be/src/pipeline/pipeline_fragment_context.cpp | 5 +++++
be/src/runtime/runtime_state.h | 5 +++++
.../java/org/apache/doris/qe/SessionVariable.java | 10 +++++++++
gensrc/thrift/PaloInternalService.thrift | 3 +++
.../query_p0/aggregate/distinct_streaming_agg.out | 6 +++++
.../aggregate/distinct_streaming_agg.groovy | 26 ++++++++++++++++++++++
9 files changed, 69 insertions(+), 1 deletion(-)
diff --git a/be/src/pipeline/exec/data_queue.cpp
b/be/src/pipeline/exec/data_queue.cpp
index 06c16c7dfa6..e251ace57a4 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -119,10 +119,13 @@ Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu
}
        _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes();
_cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
+ if (_cur_blocks_nums_in_queue[_flag_queue_idx] == 0 &&
+ _sink_dependencies[_flag_queue_idx] != nullptr) {
+ _sink_dependencies[_flag_queue_idx]->set_ready();
+ }
auto old_value = _cur_blocks_total_nums.fetch_sub(1);
if (old_value == 1 && _source_dependency) {
set_source_block();
- _sink_dependencies[_flag_queue_idx]->set_ready();
}
} else {
if (_is_finished[_flag_queue_idx]) {
@@ -142,6 +145,11 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
_cur_bytes_in_queue[child_idx] += block->allocated_bytes();
_queue_blocks[child_idx].emplace_back(std::move(block));
_cur_blocks_nums_in_queue[child_idx] += 1;
+
+ if (_cur_blocks_nums_in_queue[child_idx] > _max_blocks_in_sub_queue &&
+ _sink_dependencies[child_idx] != nullptr) {
+ _sink_dependencies[child_idx]->block();
+ }
_cur_blocks_total_nums++;
if (_source_dependency) {
set_source_ready();
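
Taken together, the two hunks above implement a simple backpressure loop: push_block() blocks the sink dependency once a sub-queue holds more than _max_blocks_in_sub_queue blocks, and get_block_from_queue() sets it ready again once the sub-queue is drained. Below is a minimal, self-contained C++ sketch of that pattern. The Dependency here is a hypothetical stand-in (a plain ready flag); the real pipeline Dependency parks and wakes pipeline tasks instead.

#include <cstdint>
#include <deque>
#include <iostream>
#include <memory>
#include <mutex>

// Hypothetical stand-in for the pipeline Dependency: just a ready flag.
struct Dependency {
    bool ready = true;
    void block() { ready = false; }    // producer must stop
    void set_ready() { ready = true; } // producer may continue
};

struct Block {};

// One sub-queue of a DataQueue, with the cap this patch introduces.
class SubQueue {
public:
    explicit SubQueue(int64_t max_blocks) : _max_blocks_in_sub_queue(max_blocks) {}

    void push_block(std::unique_ptr<Block> block, Dependency* sink_dep) {
        std::lock_guard<std::mutex> lock(_mutex);
        _queue.emplace_back(std::move(block));
        // Over the cap: block the sink so the producer stops adding blocks.
        if (static_cast<int64_t>(_queue.size()) > _max_blocks_in_sub_queue &&
            sink_dep != nullptr) {
            sink_dep->block();
        }
    }

    std::unique_ptr<Block> get_block(Dependency* sink_dep) {
        std::lock_guard<std::mutex> lock(_mutex);
        if (_queue.empty()) {
            return nullptr;
        }
        auto block = std::move(_queue.front());
        _queue.pop_front();
        // Drained: let the sink produce again.
        if (_queue.empty() && sink_dep != nullptr) {
            sink_dep->set_ready();
        }
        return block;
    }

private:
    std::mutex _mutex;
    std::deque<std::unique_ptr<Block>> _queue;
    int64_t _max_blocks_in_sub_queue = 1;
};

int main() {
    Dependency sink_dep;
    SubQueue queue(/*max_blocks=*/1);
    queue.push_block(std::make_unique<Block>(), &sink_dep);
    queue.push_block(std::make_unique<Block>(), &sink_dep);
    std::cout << "ready after 2 pushes: " << sink_dep.ready << std::endl; // 0
    while (queue.get_block(&sink_dep) != nullptr) {
    }
    std::cout << "ready after drain: " << sink_dep.ready << std::endl; // 1
    return 0;
}

Note that the cap is deliberately soft: a push is never rejected, the producer is only paused after the fact, so a sub-queue can briefly hold one block more than the cap.
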
diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h
index 1a50b7485d1..f5bd84cc278 100644
--- a/be/src/pipeline/exec/data_queue.h
+++ b/be/src/pipeline/exec/data_queue.h
@@ -70,6 +70,8 @@ public:
void set_source_ready();
void set_source_block();
+    void set_max_blocks_in_sub_queue(int64_t max_blocks) { _max_blocks_in_sub_queue = max_blocks; }
+
private:
std::vector<std::unique_ptr<std::mutex>> _queue_blocks_lock;
std::vector<std::deque<std::unique_ptr<vectorized::Block>>> _queue_blocks;
@@ -93,6 +95,8 @@ private:
// only used by streaming agg source operator
bool _data_exhausted = false;
+ int64_t _max_blocks_in_sub_queue = 1;
+
//this only use to record the queue[0] for profile
int64_t _max_bytes_in_queue = 0;
int64_t _max_size_of_queue = 0;
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp
index e466237a375..40344882a84 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -111,6 +111,7 @@ Status UnionSinkLocalState::open(RuntimeState* state) {
for (size_t i = 0; i < p._child_expr.size(); i++) {
RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
}
+
+    _shared_state->data_queue.set_max_blocks_in_sub_queue(state->data_queue_max_blocks());
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index e53492c9fa0..e3f9de5fd03 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -542,6 +542,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
} else {
int child_count = union_node->children_count();
auto data_queue = std::make_shared<DataQueue>(child_count);
+
+        data_queue->set_max_blocks_in_sub_queue(_runtime_state->data_queue_max_blocks());
for (int child_id = 0; child_id < child_count; ++child_id) {
auto new_child_pipeline = add_pipeline();
            RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline));
@@ -564,8 +565,11 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
std::to_string(agg_node->id()) +
": group by and output is empty");
}
+
+        const int64_t data_queue_max_blocks = _runtime_state->data_queue_max_blocks();
if (agg_node->is_aggregate_evaluators_empty() &&
!agg_node->is_probe_expr_ctxs_empty()) {
auto data_queue = std::make_shared<DataQueue>(1);
+ data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks);
OperatorBuilderPtr pre_agg_sink =
std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
data_queue);
@@ -577,6 +581,7 @@ Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur
RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
} else if (agg_node->is_streaming_preagg() &&
!agg_node->is_probe_expr_ctxs_empty()) {
auto data_queue = std::make_shared<DataQueue>(1);
+ data_queue->set_max_blocks_in_sub_queue(data_queue_max_blocks);
OperatorBuilderPtr pre_agg_sink =
std::make_shared<StreamingAggSinkOperatorBuilder>(
node->id(), agg_node, data_queue);
RETURN_IF_ERROR(new_pipe->set_sink_builder(pre_agg_sink));
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e01eb6166dc..e4bec15bf63 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -431,6 +431,11 @@ public:
        return _query_options.__isset.skip_missing_version && _query_options.skip_missing_version;
}
+ int64_t data_queue_max_blocks() const {
+        return _query_options.__isset.data_queue_max_blocks ? _query_options.data_queue_max_blocks
+                                                             : 1;
+ }
+
bool enable_page_cache() const;
int partitioned_hash_join_rows_threshold() const {
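
The accessor added above follows the usual pattern for optional Thrift fields: the generated C++ struct carries a parallel __isset member recording which optional fields were actually received, which lets the BE fall back to a safe default (1) when an older FE does not send the option. A minimal sketch of that pattern, using hand-written stand-ins for the Thrift-generated TQueryOptions types:

#include <cstdint>
#include <iostream>

// Hand-written stand-ins for the Thrift-generated types: Thrift records
// which optional fields were set in a parallel __isset struct.
struct TQueryOptionsIsset {
    bool data_queue_max_blocks = false;
};
struct TQueryOptions {
    int64_t data_queue_max_blocks = 0;
    TQueryOptionsIsset __isset;
};

// Mirrors RuntimeState::data_queue_max_blocks() above: fall back to one
// block per sub-queue when the option was never sent.
int64_t data_queue_max_blocks(const TQueryOptions& opts) {
    return opts.__isset.data_queue_max_blocks ? opts.data_queue_max_blocks : 1;
}

int main() {
    TQueryOptions opts;
    std::cout << data_queue_max_blocks(opts) << std::endl; // 1: unset, use default
    opts.data_queue_max_blocks = 16;
    opts.__isset.data_queue_max_blocks = true;
    std::cout << data_queue_max_blocks(opts) << std::endl; // 16: explicit value
    return 0;
}
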
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index d89ff2ea8df..7ad72283d44 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -493,6 +493,7 @@ public class SessionVariable implements Serializable, Writable {
public static final String ENABLE_JOIN_SPILL = "enable_join_spill";
public static final String ENABLE_SORT_SPILL = "enable_sort_spill";
public static final String ENABLE_AGG_SPILL = "enable_agg_spill";
+ public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
@@ -1745,6 +1746,13 @@ public class SessionVariable implements Serializable, Writable {
needForward = true, fuzzy = true)
public boolean enableAggSpill = false;
+ @VariableMgr.VarAttr(
+ name = DATA_QUEUE_MAX_BLOCKS,
+            description = {"Maximum number of blocks allowed in each sub-queue of the DataQueue",
+                    "Max blocks in DataQueue."},
+ needForward = true, fuzzy = true)
+ public long dataQueueMaxBlocks = 1;
+
    // If the memory consumption of sort node exceed this limit, will trigger spill to disk;
// Set to 0 to disable; min: 128M
public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
@@ -3171,6 +3179,8 @@ public class SessionVariable implements Serializable, Writable {
tResult.setEnableSortSpill(enableSortSpill);
tResult.setEnableAggSpill(enableAggSpill);
tResult.setMinRevocableMem(minRevocableMem);
+ tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
+
return tResult;
}
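
The FE side is thus complete: toThrift() forwards the value to the BE via the generated setDataQueueMaxBlocks(), so a per-session "set data_queue_max_blocks = 16;" (an example value, not a recommendation) takes effect on the next query, and fuzzy = true lets the regression framework randomize it.
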
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 3265cb47b26..2f7886e10c2 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -285,6 +285,9 @@ struct TQueryOptions {
104: optional i64 min_revocable_mem = 0
105: optional i64 spill_streaming_agg_mem_limit = 0;
+
+    // max blocks of each sub-queue in DataQueue.
+ 106: optional i64 data_queue_max_blocks = 0;
// For cloud, to control if the content would be written into file cache
1000: optional bool disable_file_cache = false
diff --git a/regression-test/data/query_p0/aggregate/distinct_streaming_agg.out b/regression-test/data/query_p0/aggregate/distinct_streaming_agg.out
new file mode 100644
index 00000000000..d2ac59bdede
--- /dev/null
+++ b/regression-test/data/query_p0/aggregate/distinct_streaming_agg.out
@@ -0,0 +1,6 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+true
+false
+\N
+
diff --git a/regression-test/suites/query_p0/aggregate/distinct_streaming_agg.groovy b/regression-test/suites/query_p0/aggregate/distinct_streaming_agg.groovy
new file mode 100644
index 00000000000..bd3285a60f9
--- /dev/null
+++ b/regression-test/suites/query_p0/aggregate/distinct_streaming_agg.groovy
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+suite("distinct_streaming_agg") {
+ sql """ use test_query_db; """
+
+ qt_select """
+ select k6 from baseall union select k6 from bigtable order by 1;
+ """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]