This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 7041d39da9f Spill updated (#40798)
7041d39da9f is described below
commit 7041d39da9f7120b67856c753425c1089199206d
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Sep 13 16:10:09 2024 +0800
Spill updated (#40798)
## Proposed changes
Issue Number: close #xxx
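For context on the most self-contained change in this diff: the `be/src/pipeline/dependency.h` hunk switches spill-partition selection from top-bit bucketing to a plain modulo over the partition count. A minimal sketch of the before/after follows; the constant values and helper names are assumptions for illustration, not taken from the code base.

```cpp
#include <cstddef>

// Assumed stand-ins for the members referenced in the dependency.h hunk
// (partition_count_bits, partition_count, max_partition_index).
constexpr size_t kPartitionCountBits = 5;
constexpr size_t kPartitionCount = size_t(1) << kPartitionCountBits; // 32 partitions
constexpr size_t kMaxPartitionIndex = kPartitionCount - 1;

// Old form: bucket by the top kPartitionCountBits of a 32-bit hash value.
constexpr size_t old_partition_index(size_t hash_value) {
    return (hash_value >> (32 - kPartitionCountBits)) & kMaxPartitionIndex;
}

// New form in this commit: plain modulo over the partition count.
constexpr size_t new_partition_index(size_t hash_value) {
    return hash_value % kPartitionCount;
}
```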
---
be/src/pipeline/dependency.h | 3 +-
be/src/pipeline/exec/aggregation_sink_operator.cpp | 8 ++--
be/src/pipeline/exec/aggregation_sink_operator.h | 2 +-
.../pipeline/exec/aggregation_source_operator.cpp | 4 +-
be/src/pipeline/exec/analytic_sink_operator.cpp | 2 +-
be/src/pipeline/exec/analytic_sink_operator.h | 2 +-
be/src/pipeline/exec/analytic_source_operator.cpp | 2 +-
be/src/pipeline/exec/assert_num_rows_operator.cpp | 2 +-
be/src/pipeline/exec/datagen_operator.cpp | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 9 -----
.../exec/nested_loop_join_probe_operator.cpp | 8 ++--
be/src/pipeline/exec/operator.h | 22 +----------
.../exec/partitioned_aggregation_sink_operator.h | 2 +-
.../partitioned_aggregation_source_operator.cpp | 23 +++++++++--
.../exec/partitioned_hash_join_probe_operator.cpp | 3 ++
.../exec/partitioned_hash_join_sink_operator.cpp | 45 ++++++++++++----------
be/src/pipeline/exec/repeat_operator.cpp | 4 +-
be/src/pipeline/exec/set_probe_sink_operator.cpp | 2 +-
be/src/pipeline/exec/set_probe_sink_operator.h | 2 +-
be/src/pipeline/exec/set_source_operator.cpp | 2 +-
be/src/pipeline/exec/sort_source_operator.cpp | 2 +-
.../exec/streaming_aggregation_operator.cpp | 4 +-
be/src/pipeline/exec/union_source_operator.cpp | 2 +-
be/src/pipeline/pipeline_task.cpp | 23 +++++++----
be/src/vec/core/block.cpp | 17 ++++----
25 files changed, 102 insertions(+), 95 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index f7990c097ef..547271d87fb 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -453,7 +453,8 @@ struct PartitionedAggSharedState : public BasicSharedState,
std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
size_t get_partition_index(size_t hash_value) const {
- return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
+ // return (hash_value >> (32 - partition_count_bits)) & max_partition_index;
+ return hash_value % partition_count;
}
};
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 166187ffc6d..e1708421b0d 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -178,7 +178,7 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
DCHECK(_agg_data->without_key != nullptr);
SCOPED_TIMER(_build_timer);
_memory_usage_last_executing = 0;
- ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+ SCOPED_PEAK_MEM(&_memory_usage_last_executing);
for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add(
block,
@@ -191,7 +191,7 @@ Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) {
Status AggSinkLocalState::_merge_with_serialized_key(vectorized::Block* block) {
_memory_usage_last_executing = 0;
- ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+ SCOPED_PEAK_MEM(&_memory_usage_last_executing);
if (_shared_state->reach_limit) {
return _merge_with_serialized_key_helper<true, false>(block);
} else {
@@ -401,7 +401,7 @@ Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) {
DCHECK(_agg_data->without_key != nullptr);
_memory_usage_last_executing = 0;
- ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+ SCOPED_PEAK_MEM(&_memory_usage_last_executing);
for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) {
int col_id = AggSharedState::get_slot_column_id(
@@ -440,7 +440,7 @@ void AggSinkLocalState::_update_memusage_without_key() {
Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) {
_memory_usage_last_executing = 0;
- ScopedMemTracker mem_tracker(_memory_usage_last_executing);
+ SCOPED_PEAK_MEM(&_memory_usage_last_executing);
if (_shared_state->reach_limit) {
return _execute_with_serialized_key_helper<true>(block);
} else {
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index ea54281c40c..0071f7bfc03 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -126,7 +126,7 @@ protected:
std::unique_ptr<ExecutorBase> _executor = nullptr;
- size_t _memory_usage_last_executing = 0;
+ int64_t _memory_usage_last_executing = 0;
};
class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp
index c1cb187f3e6..6df089bbb5b 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -440,7 +440,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- ScopedMemTracker scoped_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
local_state.make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
@@ -482,7 +482,7 @@ void AggLocalState::make_nullable_output_key(vectorized::Block* block) {
template <bool limit>
Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block) {
SCOPED_TIMER(_merge_timer);
- ScopedMemTracker scoped_tracker(_estimate_memory_usage);
+ SCOPED_PEAK_MEM(&_estimate_memory_usage);
size_t key_size = Base::_shared_state->probe_expr_ctxs.size();
vectorized::ColumnRawPtrs key_columns(key_size);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 04f33d2f15b..b7623651cfc 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -269,7 +269,7 @@ Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());
local_state._reserve_mem_size = 0;
- ScopedMemTracker mem_tracker(local_state._reserve_mem_size);
+ SCOPED_PEAK_MEM(&local_state._reserve_mem_size);
local_state._shared_state->input_eos = eos;
if (local_state._shared_state->input_eos && input_block->rows() == 0) {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index 9d812e7cc28..93c68539144 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -62,7 +62,7 @@ private:
std::vector<vectorized::VExprContextSPtrs> _agg_expr_ctxs;
- size_t _reserve_mem_size = 0;
+ int64_t _reserve_mem_size = 0;
};
class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp
index caacf06b8ea..0155190f042 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -512,7 +512,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
if (local_state._shared_state->input_eos &&
(local_state._output_block_index == local_state._shared_state->input_blocks.size() ||
local_state._shared_state->input_total_rows == 0)) {
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 6c6a28029e2..f83569b1b34 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -42,7 +42,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+ SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
local_state.add_num_rows_returned(block->rows());
int64_t num_rows_returned = local_state.num_rows_returned();
bool assert_res = false;
diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp
index ba3ab5e42da..466f16c82fd 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -68,7 +68,7 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+ SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
Status res = local_state._table_func->get_next(state, block, eos);
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns()));
local_state.reached_limit(block, eos);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 6c132649ddb..e4a9e5df72d 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -137,15 +137,6 @@ size_t HashJoinBuildSinkLocalState::get_reserve_mem_size(RuntimeState* state) {
size_to_reserve += rows * sizeof(uint8_t); // JoinHashTable::visited
}
size_to_reserve += _evaluate_mem_usage;
-
- if (size_to_reserve > 2L * 1024 * 1024 * 1024) [[unlikely]] {
- LOG(INFO) << "**** too big reserve size: " << size_to_reserve << ", rows: " << rows
- << ", bucket_size: " << bucket_size
- << ", mutable block size: " << _build_side_mutable_block.allocated_bytes()
- << ", mutable block cols: " << _build_side_mutable_block.columns()
- << ", _build_col_ids.size: " << _build_col_ids.size();
- }
-
return size_to_reserve;
}
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index c5642b8a731..9e5964841ef 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -472,7 +472,7 @@ Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized
bool eos) const {
auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state._probe_rows_counter, block->rows());
- ScopedMemTracker scoped_mem_tracker(local_state.estimate_memory_usage());
+ SCOPED_PEAK_MEM(&local_state.estimate_memory_usage());
local_state._cur_probe_row_visited_flags.resize(block->rows());
std::fill(local_state._cur_probe_row_visited_flags.begin(),
local_state._cur_probe_row_visited_flags.end(), 0);
@@ -499,12 +499,12 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
bool* eos) const {
auto& local_state = get_local_state(state);
if (_is_output_left_side_only) {
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), block));
*eos = local_state._shared_state->left_side_eos;
local_state._need_more_input_data = !local_state._shared_state->left_side_eos;
} else {
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
*eos = ((_match_all_build || _is_right_semi_anti)
? local_state._output_null_idx_build_side ==
local_state._shared_state->build_blocks.size() &&
@@ -537,7 +537,7 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
state, join_op_variants);
};
SCOPED_TIMER(local_state._loop_join_timer);
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
RETURN_IF_ERROR(std::visit(
func, local_state._shared_state->join_op_variants,
vectorized::make_bool_variant(_match_all_build || _is_right_semi_anti),
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 4932504b424..dca3e136526 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -199,7 +199,7 @@ public:
void update_estimate_memory_usage(size_t usage) { _estimate_memory_usage += usage; }
- size_t& estimate_memory_usage() { return _estimate_memory_usage; }
+ int64_t& estimate_memory_usage() { return _estimate_memory_usage; }
void reset_estimate_memory_usage() { _estimate_memory_usage = 0; }
@@ -210,7 +210,7 @@ protected:
ObjectPool* _pool = nullptr;
int64_t _num_rows_returned {0};
- size_t _estimate_memory_usage {0};
+ int64_t _estimate_memory_usage {0};
std::unique_ptr<RuntimeProfile> _runtime_profile;
@@ -243,24 +243,6 @@ protected:
vectorized::Block _origin_block;
};
-class ScopedMemTracker {
-public:
- ScopedMemTracker(size_t& counter) : _counter(counter), _mem_tracker("ScopedMemTracker") {
- thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(&_mem_tracker);
- _peak_usage = _mem_tracker.peak_consumption();
- }
-
- ~ScopedMemTracker() {
- thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
- _counter += (_mem_tracker.peak_consumption() - _peak_usage);
- }
-
-private:
- size_t& _counter;
- size_t _peak_usage = 0;
- MemTracker _mem_tracker;
-};
-
template <typename SharedStateArg = FakeSharedState>
class PipelineXLocalState : public PipelineXLocalStateBase {
public:
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 4bf2e41befc..378ca3b1d20 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -348,6 +348,6 @@ private:
friend class PartitionedAggSinkLocalState;
std::unique_ptr<AggSinkOperatorX> _agg_sink_operator;
- size_t _spill_partition_count_bits = 6;
+ size_t _spill_partition_count_bits = 5;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 333f98a66cb..d83af9f6c4e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -207,7 +207,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
_is_merging = true;
VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node " << _parent->node_id()
- << " merge spilled agg data";
+ << ", task id: " << _state->task_id() << " merge spilled agg data";
RETURN_IF_ERROR(Base::_shared_state->in_mem_shared_state->reset_hash_table());
_spill_dependency->Dependency::block();
@@ -218,6 +218,9 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
submit_timer.start();
auto spill_func = [this, state, query_id, submit_timer] {
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ MonotonicStopWatch execution_timer;
+ execution_timer.start();
+ size_t read_size = 0;
Defer defer {[&]() {
if (!_status.ok() || state->is_cancelled()) {
if (!_status.ok()) {
@@ -226,9 +229,13 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
<< " merge spilled agg data error: " << _status;
}
_shared_state->close();
- } else if (_shared_state->spill_partitions.empty()) {
+ } else {
VLOG_DEBUG << "query " << print_id(query_id) << " agg node " << _parent->node_id()
- << " merge spilled agg data finish";
+ << ", task id: " << _state->task_id()
+ << " merge spilled agg data finish, time used: "
+ << (execution_timer.elapsed_time() / (1000L * 1000 * 1000))
+ << "s, read size: " << read_size << ", "
+ << _shared_state->spill_partitions.size() << " partitions left";
}
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
_is_merging = false;
@@ -261,6 +268,7 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
if (!block.empty()) {
has_agg_data = true;
+ read_size += block.bytes();
_status = parent._agg_source_operator
->merge_with_serialized_key_helper<false>(
_runtime_state.get(),
&block);
@@ -268,6 +276,15 @@ Status PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
}
}
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+
+ if (!has_agg_data) {
+ VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
+ << _parent->node_id() << ", task id: " << _state->task_id()
+ << " merge spilled agg data finish, time used: "
+ << execution_timer.elapsed_time() << ", empty partition "
+ << read_size << ", " << _shared_state->spill_partitions.size()
+ << " partitions left";
+ }
}
_shared_state->spill_partitions.pop_front();
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index bc60d2c376a..042d7aea75e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -809,6 +809,9 @@ Status PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
<< ", task: " << state->task_id() << ", child eos: " << local_state._child_eos;
if (local_state._child_eos) {
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
+ << ", task: " << state->task_id() << ", child eos: " << local_state._child_eos
+ << ", will not revoke size: " << revocable_mem_size(state);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index d166d65b4c9..14c7df63262 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -27,6 +27,7 @@
#include "runtime/fragment_mgr.h"
#include "util/mem_info.h"
#include "util/runtime_profile.h"
+#include "vec/spill/spill_stream.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
@@ -353,21 +354,20 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
}
}
+ std::unique_lock<std::mutex> lock(_spill_lock);
if (_spilling_streams_count > 0) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- if (_spilling_streams_count > 0) {
- _spill_dependency->block();
- } else if (_child_eos) {
- LOG(INFO) << "hash join sink " << _parent->node_id() << " set_ready_to_read"
- << ", task id: " << state->task_id();
- std::for_each(_shared_state->partitioned_build_blocks.begin(),
- _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
- if (block) {
- COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
- }
- });
- _dependency->set_ready_to_read();
- }
+ _spill_dependency->block();
+ } else if (_child_eos) {
+ VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join sink "
+ << _parent->node_id() << " set_ready_to_read"
+ << ", task id: " << state->task_id();
+ std::for_each(_shared_state->partitioned_build_blocks.begin(),
+ _shared_state->partitioned_build_blocks.end(), [&](auto& block) {
+ if (block) {
+ COUNTER_UPDATE(_in_mem_rows_counter, block->rows());
+ }
+ });
+ _dependency->set_ready_to_read();
}
return Status::OK();
}
@@ -438,8 +438,9 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
std::unique_lock<std::mutex> lock(_spill_lock);
_spill_dependency->set_ready();
if (_child_eos) {
- LOG(INFO) << "hash join sink " << _parent->node_id() << " set_ready_to_read"
- << ", task id: " << state()->task_id();
+ VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink "
+ << _parent->node_id() << " set_ready_to_read"
+ << ", task id: " << state()->task_id();
std::for_each(_shared_state->partitioned_build_blocks.begin(),
_shared_state->partitioned_build_blocks.end(), [&](auto& block) {
if (block) {
@@ -553,8 +554,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
const auto need_to_spill = local_state._shared_state->need_to_spill;
if (rows == 0) {
if (eos) {
- LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
- << ", task id: " << state->task_id() << ", need spil: " << need_to_spill;
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink "
+ << node_id() << " sink eos, set_ready_to_read"
+ << ", task id: " << state->task_id() << ", need spill: " << need_to_spill;
if (!need_to_spill) {
if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
@@ -596,6 +598,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
RETURN_IF_ERROR(local_state._partition_block(state, in_block, 0, rows));
if (eos) {
return revoke_memory(state);
+ } else if (revocable_mem_size(state) > vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) {
+ return revoke_memory(state);
}
} else {
if (UNLIKELY(!local_state._shared_state->inner_runtime_state)) {
@@ -613,8 +617,9 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
local_state._shared_state->inner_runtime_state.get(),
in_block, eos));
if (eos) {
- LOG(INFO) << "hash join sink " << node_id() << " sink eos, set_ready_to_read"
- << ", task id: " << state->task_id();
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink "
+ << node_id() << " sink eos, set_ready_to_read"
+ << ", task id: " << state->task_id();
local_state._dependency->set_ready_to_read();
}
}
diff --git a/be/src/pipeline/exec/repeat_operator.cpp b/be/src/pipeline/exec/repeat_operator.cpp
index 07b8bae3fd1..e194b72a852 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -178,7 +178,7 @@ Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* input_block
auto& _expr_ctxs = local_state._expr_ctxs;
DCHECK(!_intermediate_block || _intermediate_block->rows() == 0);
if (input_block->rows() > 0) {
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
_intermediate_block = vectorized::Block::create_unique();
for (auto& expr : _expr_ctxs) {
@@ -205,7 +205,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* outp
auto& _intermediate_block = local_state._intermediate_block;
RETURN_IF_CANCELLED(state);
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
DCHECK(_repeat_id_idx >= 0);
for (const std::vector<int64_t>& v : _grouping_list) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 174d017aa53..a2ed417ed35 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -73,7 +73,7 @@ Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, vectorized
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
- ScopedMemTracker mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
auto probe_rows = in_block->rows();
if (probe_rows > 0) {
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index bab04206d58..05024471585 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -55,7 +55,7 @@ private:
template <class HashTableContext, bool is_intersected>
friend struct vectorized::HashTableProbe;
- size_t _estimate_memory_usage = 0;
+ int64_t _estimate_memory_usage = 0;
//record insert column id during probe
std::vector<uint16_t> _probe_column_inserted_id;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
index 6c3260ba850..d1c42ae3c08 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -74,7 +74,7 @@ Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, vectoriz
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
_create_mutable_cols(local_state, block);
auto st = std::visit(
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp b/be/src/pipeline/exec/sort_source_operator.cpp
index 47ed935ba8d..cb18019b35c 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -61,7 +61,7 @@ Status SortSourceOperatorX::open(RuntimeState* state) {
Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
RETURN_IF_ERROR(local_state._shared_state->sorter->get_next(state, block, eos));
local_state.reached_limit(block, eos);
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index b573a736280..caeb32b4155 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1279,7 +1279,7 @@ Status StreamingAggLocalState::close(RuntimeState* state) {
Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block, bool* eos) const {
auto& local_state = get_local_state(state);
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
if (!local_state._pre_aggregated_block->empty()) {
local_state._pre_aggregated_block->swap(*block);
} else {
@@ -1296,7 +1296,7 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block
Status StreamingAggOperatorX::push(RuntimeState* state, vectorized::Block* in_block,
bool eos) const {
auto& local_state = get_local_state(state);
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
local_state._input_num_rows += in_block->rows();
if (in_block->rows() > 0) {
diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp
index 24d172f6708..18058c95cae 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -141,7 +141,7 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Blo
auto& local_state = state->get_local_state(operator_id())->cast<UnionSourceLocalState>();
DCHECK_LT(local_state._const_expr_list_idx, _const_expr_lists.size());
- ScopedMemTracker scoped_mem_tracker(local_state._estimate_memory_usage);
+ SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
auto& _const_expr_list_idx = local_state._const_expr_list_idx;
vectorized::MutableBlock mblock =
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6c5e018d3ec..d6b5b7504b2 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -396,15 +396,22 @@ Status PipelineTask::execute(bool* eos) {
bool is_high_wartermark = false;
bool is_low_wartermark = false;
workload_group->check_mem_used(&is_low_wartermark, &is_high_wartermark);
- if (is_low_wartermark || is_high_wartermark) {
- /// The larger reserved memory size is likely due to a larger available revocable size.
- /// If the available memory for revoking is large enough, here trigger revoking proactively.
- if (_sink->revocable_mem_size(_state) > 512L * 1024 * 1024) {
- LOG(INFO) << "query: " << print_id(query_id)
- << " has big memory to revoke.";
- RETURN_IF_ERROR(_sink->revoke_memory(_state));
- }
+ /// The larger reserved memory size is likely due to a larger available revocable size.
+ /// If the available memory for revoking is large enough, here trigger revoking proactively.
+ bool need_to_pause = false;
+ const auto revocable_mem_size = _sink->revocable_mem_size(_state);
+ if (revocable_mem_size > 1024L * 1024 * 1024) {
+ LOG(INFO) << "query: " << print_id(query_id)
+ << ", task id: " << _state->task_id()
+ << " has big memory to revoke: " << revocable_mem_size;
+ RETURN_IF_ERROR(_sink->revoke_memory(_state));
+ need_to_pause = true;
+ } else {
+ need_to_pause = is_low_wartermark || is_high_wartermark;
+ }
+
+ if (need_to_pause) {
_memory_sufficient_dependency->block();
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this());
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 8bcce65f229..b84db1c5500 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -90,7 +90,7 @@ Status Block::deserialize(const PBlock& pblock) {
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
const char* buf = nullptr;
- std::string compression_scratch;
+ faststring compression_scratch;
if (pblock.compressed()) {
// Decompress
SCOPED_RAW_TIMER(&_decompress_time_ns);
@@ -113,11 +113,11 @@ Status Block::deserialize(const PBlock& pblock) {
DCHECK(success) << "snappy::GetUncompressedLength failed";
compression_scratch.resize(uncompressed_size);
success = snappy::RawUncompress(compressed_data, compressed_size,
- compression_scratch.data());
+ reinterpret_cast<char*>(compression_scratch.data()));
DCHECK(success) << "snappy::RawUncompress failed";
}
_decompressed_bytes = uncompressed_size;
- buf = compression_scratch.data();
+ buf = reinterpret_cast<char*>(compression_scratch.data());
} else {
buf = pblock.column_values().data();
}
@@ -927,7 +927,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
// serialize data values
// when data type is HLL, content_uncompressed_size maybe larger than real size.
- std::string column_values;
+ faststring column_values;
try {
// TODO: After support c++23, we should use resize_and_overwrite to replace resize
column_values.resize(content_uncompressed_size);
@@ -937,13 +937,14 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
LOG(WARNING) << msg;
return Status::BufferAllocFailed(msg);
}
- char* buf = column_values.data();
+ char* buf = reinterpret_cast<char*>(column_values.data());
for (const auto& c : *this) {
buf = c.type->serialize(*(c.column), buf, pblock->be_exec_version());
}
*uncompressed_bytes = content_uncompressed_size;
- const size_t serialize_bytes = buf - column_values.data() + STREAMVBYTE_PADDING;
+ const size_t serialize_bytes =
+ buf - reinterpret_cast<char*>(column_values.data()) + STREAMVBYTE_PADDING;
*compressed_bytes = serialize_bytes;
column_values.resize(serialize_bytes);
@@ -966,13 +967,13 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
pblock->set_compressed(true);
*compressed_bytes = compressed_size;
} else {
- pblock->set_column_values(std::move(column_values));
+ pblock->set_column_values(column_values.data(), column_values.size());
}
VLOG_ROW << "uncompressed size: " << content_uncompressed_size
<< ", compressed size: " << compressed_size;
} else {
- pblock->set_column_values(std::move(column_values));
+ pblock->set_column_values(column_values.data(), column_values.size());
}
if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) {
return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]