This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new ed3ca41faa3 Opt the spill logic (#41415)
ed3ca41faa3 is described below
commit ed3ca41faa300562002729006845877f014fad5d
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Sep 27 19:37:16 2024 +0800
Opt the spill logic (#41415)
1. Add some counters in join/agg
2. Optimize the logs and add some debug information
3. Revoke memory from non-sink operators (join probe)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 65 ++++++++++++----------
be/src/pipeline/exec/aggregation_sink_operator.h | 3 +
.../pipeline/exec/aggregation_source_operator.cpp | 13 ++++-
be/src/pipeline/exec/aggregation_source_operator.h | 2 +
be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 6 ++
.../exec/partitioned_aggregation_sink_operator.h | 2 +
.../partitioned_aggregation_source_operator.cpp | 8 ++-
.../exec/partitioned_aggregation_source_operator.h | 1 +
.../exec/partitioned_hash_join_probe_operator.cpp | 41 ++++++++++++--
.../exec/partitioned_hash_join_probe_operator.h | 7 ++-
.../exec/partitioned_hash_join_sink_operator.cpp | 7 ++-
be/src/pipeline/exec/spill_utils.h | 24 ++++++--
.../exec/streaming_aggregation_operator.cpp | 4 +-
be/src/pipeline/pipeline_task.cpp | 15 ++---
be/src/runtime/query_context.cpp | 15 +++--
be/src/runtime/query_context.h | 6 +-
.../workload_group/workload_group_manager.cpp | 32 +++++++----
.../workload_group/workload_group_manager.h | 3 +-
.../java/org/apache/doris/qe/SessionVariable.java | 1 +
20 files changed, 186 insertions(+), 73 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 9597371057e..b8d38d32674 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -24,6 +24,7 @@
#include "pipeline/exec/operator.h"
#include "runtime/primitive_type.h"
#include "runtime/thread_context.h"
+#include "util/runtime_profile.h"
#include "vec/common/hash_table/hash.h"
#include "vec/exprs/vectorized_agg_fn.h"
@@ -58,8 +59,8 @@ Status AggSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
_agg_data = Base::_shared_state->agg_data.get();
_agg_arena_pool = Base::_shared_state->agg_arena_pool.get();
_hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize",
TUnit::UNIT);
- _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"HashTable",
- TUnit::BYTES,
"MemoryUsage", 1);
+ _hash_table_memory_usage =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableMemoryUsage",
TUnit::BYTES, 1);
_serialize_key_arena_memory_usage =
Base::profile()->AddHighWaterMarkCounter(
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
@@ -76,6 +77,9 @@ Status AggSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
_hash_table_input_counter = ADD_COUNTER(Base::profile(),
"HashTableInputCount", TUnit::UNIT);
_max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes",
TUnit::UNIT);
+ _container_memory_usage = ADD_COUNTER(profile(), "ContainerMemoryUsage",
TUnit::BYTES);
+ _arena_memory_usage = ADD_COUNTER(profile(), "ArenaMemoryUsage",
TUnit::BYTES);
+
return Status::OK();
}
@@ -226,32 +230,36 @@ size_t AggSinkLocalState::_memory_usage() const {
}
void AggSinkLocalState::_update_memusage_with_serialized_key() {
- std::visit(vectorized::Overload {
- [&](std::monostate& arg) -> void {
- throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
- },
- [&](auto& agg_method) -> void {
- auto& data = *agg_method.hash_table;
- auto arena_memory_usage =
- _agg_arena_pool->size() +
-
Base::_shared_state->aggregate_data_container->memory_usage() -
-
Base::_shared_state->mem_usage_record.used_in_arena;
- Base::_mem_tracker->consume(arena_memory_usage);
- Base::_mem_tracker->consume(
- data.get_buffer_size_in_bytes() -
-
Base::_shared_state->mem_usage_record.used_in_state);
-
_serialize_key_arena_memory_usage->add(arena_memory_usage);
- COUNTER_UPDATE(
- _hash_table_memory_usage,
- data.get_buffer_size_in_bytes() -
-
Base::_shared_state->mem_usage_record.used_in_state);
- Base::_shared_state->mem_usage_record.used_in_state
=
- data.get_buffer_size_in_bytes();
- Base::_shared_state->mem_usage_record.used_in_arena
=
- _agg_arena_pool->size() +
-
Base::_shared_state->aggregate_data_container->memory_usage();
- }},
- _agg_data->method_variant);
+ std::visit(
+ vectorized::Overload {
+ [&](std::monostate& arg) -> void {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
"uninited hash table");
+ },
+ [&](auto& agg_method) -> void {
+ auto& data = *agg_method.hash_table;
+ auto arena_memory_usage =
+ _agg_arena_pool->size() +
+
Base::_shared_state->aggregate_data_container->memory_usage() -
+
Base::_shared_state->mem_usage_record.used_in_arena;
+ Base::_mem_tracker->consume(arena_memory_usage);
+ Base::_mem_tracker->consume(
+ data.get_buffer_size_in_bytes() -
+
Base::_shared_state->mem_usage_record.used_in_state);
+
_serialize_key_arena_memory_usage->add(arena_memory_usage);
+ COUNTER_SET(_container_memory_usage,
+
Base::_shared_state->aggregate_data_container->memory_usage());
+ COUNTER_SET(_arena_memory_usage,
+
static_cast<int64_t>(_agg_arena_pool->size()));
+ COUNTER_UPDATE(_hash_table_memory_usage,
+ data.get_buffer_size_in_bytes() -
+
Base::_shared_state->mem_usage_record.used_in_state);
+ Base::_shared_state->mem_usage_record.used_in_state =
+ data.get_buffer_size_in_bytes();
+ Base::_shared_state->mem_usage_record.used_in_arena =
+ _agg_arena_pool->size() +
+
Base::_shared_state->aggregate_data_container->memory_usage();
+ }},
+ _agg_data->method_variant);
}
Status AggSinkLocalState::_destroy_agg_status(vectorized::AggregateDataPtr
data) {
@@ -887,6 +895,7 @@ Status AggSinkOperatorX::reset_hash_table(RuntimeState*
state) {
auto& ss = *local_state.Base::_shared_state;
RETURN_IF_ERROR(ss.reset_hash_table());
local_state._agg_arena_pool = ss.agg_arena_pool.get();
+ local_state._serialize_key_arena_memory_usage->set(0);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 39f11f6270f..2fa2d18a7e6 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -21,6 +21,7 @@
#include "pipeline/exec/operator.h"
#include "runtime/exec_env.h"
+#include "util/runtime_profile.h"
namespace doris::pipeline {
@@ -111,6 +112,8 @@ protected:
RuntimeProfile::Counter* _max_row_size_counter = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
+ RuntimeProfile::Counter* _container_memory_usage = nullptr;
+ RuntimeProfile::Counter* _arena_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage =
nullptr;
bool _should_limit_output = false;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 6df089bbb5b..6391be5d70a 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -23,6 +23,7 @@
#include "common/exception.h"
#include "pipeline/exec/operator.h"
#include "runtime/thread_context.h"
+#include "util/runtime_profile.h"
#include "vec/exprs/vectorized_agg_fn.h"
#include "vec/exprs/vexpr_fwd.h"
@@ -44,7 +45,12 @@ Status AggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
_deserialize_data_timer = ADD_TIMER(Base::profile(),
"DeserializeAndMergeTime");
_hash_table_compute_timer = ADD_TIMER(Base::profile(),
"HashTableComputeTime");
_hash_table_emplace_timer = ADD_TIMER(Base::profile(),
"HashTableEmplaceTime");
- _hash_table_input_counter = ADD_COUNTER(Base::profile(),
"HashTableInputCount", TUnit::UNIT);
+ _hash_table_input_counter =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableInputCount",
TUnit::UNIT, 1);
+ _hash_table_memory_usage =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableMemoryUsage",
TUnit::BYTES, 1);
+ _hash_table_size_counter =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableSize",
TUnit::UNIT, 1);
auto& p = _parent->template cast<AggSourceOperatorX>();
if (p._without_key) {
@@ -626,6 +632,11 @@ void
AggLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* place
}
COUNTER_UPDATE(_hash_table_input_counter, num_rows);
+ COUNTER_SET(_hash_table_memory_usage,
+ static_cast<int64_t>(
+
agg_method.hash_table->get_buffer_size_in_bytes()));
+ COUNTER_SET(_hash_table_size_counter,
+
static_cast<int64_t>(agg_method.hash_table->size()));
}},
_shared_state->agg_data->method_variant);
}
diff --git a/be/src/pipeline/exec/aggregation_source_operator.h
b/be/src/pipeline/exec/aggregation_source_operator.h
index a3824a381eb..98ddd6a2142 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.h
+++ b/be/src/pipeline/exec/aggregation_source_operator.h
@@ -82,6 +82,8 @@ protected:
RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
+ RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
+ RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _merge_timer = nullptr;
RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index ef706dd0dea..02e538c4ab3 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -74,7 +74,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
_build_blocks_memory_usage =
ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "BuildBlocks",
TUnit::BYTES, "MemoryUsage", 1);
_hash_table_memory_usage =
- ADD_CHILD_COUNTER_WITH_LEVEL(profile(), "HashTable", TUnit::BYTES,
"MemoryUsage", 1);
+ ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage",
TUnit::BYTES, 1);
_build_arena_memory_usage =
profile()->AddHighWaterMarkCounter("BuildKeyArena", TUnit::BYTES,
"MemoryUsage", 1);
@@ -337,6 +337,8 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
_mem_tracker->consume(arg.hash_table->get_byte_size() -
old_hash_table_size);
_mem_tracker->consume(arg.serialized_keys_size(true) -
old_key_size);
+ COUNTER_SET(_hash_table_memory_usage,
+ int64_t(arg.hash_table->get_byte_size()));
return st;
}},
*_shared_state->hash_table_variants,
_shared_state->join_op_variants,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 6d84b8e8bb5..0bed1d9e13d 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -107,6 +107,10 @@ void PartitionedAggSinkLocalState::_init_counters() {
_hash_table_emplace_timer = ADD_TIMER(Base::profile(),
"HashTableEmplaceTime");
_hash_table_input_counter = ADD_COUNTER(Base::profile(),
"HashTableInputCount", TUnit::UNIT);
_max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes",
TUnit::UNIT);
+ _container_memory_usage =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "ContainerMemoryUsage",
TUnit::BYTES, 1);
+ _arena_memory_usage =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "ArenaMemoryUsage",
TUnit::BYTES, 1);
COUNTER_SET(_max_row_size_counter, (int64_t)0);
_spill_serialize_hash_table_timer =
@@ -133,6 +137,8 @@ void
PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
UPDATE_PROFILE(_hash_table_emplace_timer, "HashTableEmplaceTime");
UPDATE_PROFILE(_hash_table_input_counter, "HashTableInputCount");
UPDATE_PROFILE(_max_row_size_counter, "MaxRowSizeInBytes");
+ UPDATE_PROFILE(_container_memory_usage, "ContainerMemoryUsage");
+ UPDATE_PROFILE(_arena_memory_usage, "ArenaMemoryUsage");
update_max_min_rows_counter();
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 0027754cde0..22001b752a2 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -303,6 +303,8 @@ public:
RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
RuntimeProfile::Counter* _max_row_size_counter = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
+ RuntimeProfile::Counter* _container_memory_usage = nullptr;
+ RuntimeProfile::Counter* _arena_memory_usage = nullptr;
RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage =
nullptr;
RuntimeProfile::Counter* _spill_serialize_hash_table_timer = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index bf7ec22793f..8b281a88684 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -64,13 +64,16 @@ void PartitionedAggLocalState::_init_counters() {
_hash_table_iterate_timer = ADD_TIMER(profile(), "HashTableIterateTime");
_insert_keys_to_column_timer = ADD_TIMER(profile(),
"InsertKeysToColumnTime");
_serialize_data_timer = ADD_TIMER(profile(), "SerializeDataTime");
- _hash_table_size_counter = ADD_COUNTER(profile(), "HashTableSize",
TUnit::UNIT);
+ _hash_table_size_counter = ADD_COUNTER_WITH_LEVEL(profile(),
"HashTableSize", TUnit::UNIT, 1);
_merge_timer = ADD_TIMER(profile(), "MergeTime");
_deserialize_data_timer = ADD_TIMER(profile(), "DeserializeAndMergeTime");
_hash_table_compute_timer = ADD_TIMER(profile(), "HashTableComputeTime");
_hash_table_emplace_timer = ADD_TIMER(profile(), "HashTableEmplaceTime");
- _hash_table_input_counter = ADD_COUNTER(profile(), "HashTableInputCount",
TUnit::UNIT);
+ _hash_table_input_counter =
+ ADD_COUNTER_WITH_LEVEL(profile(), "HashTableInputCount",
TUnit::UNIT, 1);
+ _hash_table_memory_usage =
+ ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage",
TUnit::BYTES, 1);
}
#define UPDATE_PROFILE(counter, name) \
@@ -88,6 +91,7 @@ void PartitionedAggLocalState::update_profile(RuntimeProfile*
child_profile) {
UPDATE_PROFILE(_insert_keys_to_column_timer, "InsertKeysToColumnTime");
UPDATE_PROFILE(_serialize_data_timer, "SerializeDataTime");
UPDATE_PROFILE(_hash_table_size_counter, "HashTableSize");
+ UPDATE_PROFILE(_hash_table_memory_usage, "HashTableMemoryUsage");
}
Status PartitionedAggLocalState::close(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 3505cf7eed8..05f9ff6eff0 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -74,6 +74,7 @@ protected:
RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
+ RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
};
class AggSourceOperatorX;
class PartitionedAggSourceOperatorX : public
OperatorX<PartitionedAggLocalState> {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index fa9e3ff23b7..4b769544b52 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -17,11 +17,13 @@
#include "partitioned_hash_join_probe_operator.h"
+#include <gen_cpp/Metrics_types.h>
#include <glog/logging.h>
#include "pipeline/pipeline_task.h"
#include "runtime/fragment_mgr.h"
#include "util/mem_info.h"
+#include "util/runtime_profile.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris::pipeline {
@@ -60,6 +62,8 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState*
state, LocalStateI
_recovery_probe_blocks = ADD_COUNTER(profile(),
"SpillRecoveryProbeBlocks", TUnit::UNIT);
_recovery_probe_timer = ADD_TIMER_WITH_LEVEL(profile(),
"SpillRecoveryProbeTime", 1);
+ _probe_blocks_bytes = ADD_COUNTER_WITH_LEVEL(profile(),
"ProbeBlocksBytes", TUnit::BYTES, 1);
+
_spill_serialize_block_timer =
ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillSerializeBlockTime",
1);
_spill_write_disk_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteDiskTime", 1);
@@ -82,6 +86,10 @@ Status
PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
_build_expr_call_timer = ADD_CHILD_TIMER(profile(), "BuildExprCallTime",
"BuildPhase");
_build_side_compute_hash_timer =
ADD_CHILD_TIMER(profile(), "BuildSideHashComputingTime",
"BuildPhase");
+
+ _hash_table_memory_usage =
+ ADD_COUNTER_WITH_LEVEL(profile(), "HashTableMemoryUsage",
TUnit::BYTES, 1);
+
_allocate_resource_timer = ADD_CHILD_TIMER(profile(),
"AllocateResourceTime", "BuildPhase");
// Probe phase
@@ -158,7 +166,8 @@ Status
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
return Status::OK();
}
-Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState*
state, bool force) {
+Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
+ RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context, bool force) {
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
auto query_id = state->query_id();
@@ -212,7 +221,7 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
return Status::OK();
};
- auto exception_catch_func = [query_id, spill_func, this]() {
+ auto exception_catch_func = [query_id, spill_func, spill_context, this]() {
SCOPED_TIMER(_spill_timer);
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
{
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
@@ -228,8 +237,14 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
_spill_status = std::move(status);
}
_spill_dependency->set_ready();
+ if (spill_context) {
+ spill_context->on_non_sink_task_finished();
+ }
};
+ if (spill_context) {
+ spill_context->on_non_sink_task_started();
+ }
_spill_dependency->block();
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_submit_func",
{
return Status::Error<INTERNAL_ERROR>(
@@ -575,6 +590,7 @@ Status
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
}
SCOPED_TIMER(local_state._partition_shuffle_timer);
+ int64_t bytes_of_blocks = 0;
for (uint32_t i = 0; i != _partition_count; ++i) {
const auto count = partition_indexes[i].size();
if (UNLIKELY(count == 0)) {
@@ -592,9 +608,17 @@ Status
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
(eos && partitioned_blocks[i]->rows() > 0)) {
local_state._probe_blocks[i].emplace_back(partitioned_blocks[i]->to_block());
partitioned_blocks[i].reset();
+ } else {
+ bytes_of_blocks += partitioned_blocks[i]->allocated_bytes();
+ }
+
+ for (auto& block : local_state._probe_blocks[i]) {
+ bytes_of_blocks += block.allocated_bytes();
}
}
+ COUNTER_SET(local_state._probe_blocks_bytes, bytes_of_blocks);
+
return Status::OK();
}
@@ -604,6 +628,12 @@ Status
PartitionedHashJoinProbeOperatorX::_setup_internal_operator_for_non_spill
local_state._runtime_state =
std::move(local_state._shared_state->inner_runtime_state);
local_state._in_mem_shared_state_sptr =
std::move(local_state._shared_state->inner_shared_state);
+
+ auto* sink_state = local_state._runtime_state->get_sink_local_state();
+ if (sink_state != nullptr) {
+ COUNTER_SET(local_state._hash_table_memory_usage,
+
sink_state->profile()->get_counter("HashTableMemoryUsage")->value());
+ }
return Status::OK();
}
@@ -670,6 +700,9 @@ Status
PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
<< ", partition: " << local_state._partition_cursor << "rows: "
<< block.rows()
<< ", usage: "
<<
_inner_sink_operator->get_memory_usage(local_state._runtime_state.get());
+
+ COUNTER_SET(local_state._hash_table_memory_usage,
+
sink_local_state->profile()->get_counter("HashTableMemoryUsage")->value());
return Status::OK();
}
@@ -805,10 +838,10 @@ Status PartitionedHashJoinProbeOperatorX::revoke_memory(
return Status::OK();
}
- RETURN_IF_ERROR(local_state.spill_probe_blocks(state, true));
+ RETURN_IF_ERROR(local_state.spill_probe_blocks(state, spill_context,
true));
if (_child) {
- return _child->revoke_memory(state, nullptr);
+ return _child->revoke_memory(state, spill_context);
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 621681ca4cf..3effcadfaa1 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -47,7 +47,9 @@ public:
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
- Status spill_probe_blocks(RuntimeState* state, bool force = false);
+ Status spill_probe_blocks(RuntimeState* state,
+ const std::shared_ptr<SpillContext>&
spill_context = nullptr,
+ bool force = false);
Status recovery_build_blocks_from_disk(RuntimeState* state, uint32_t
partition_index,
bool& has_data);
@@ -123,6 +125,9 @@ private:
RuntimeProfile::Counter* _build_side_compute_hash_timer = nullptr;
RuntimeProfile::Counter* _build_side_merge_block_timer = nullptr;
+ RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
+ RuntimeProfile::Counter* _probe_blocks_bytes = nullptr;
+
RuntimeProfile::Counter* _allocate_resource_timer = nullptr;
RuntimeProfile::Counter* _probe_phase_label = nullptr;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 5323d74341a..136466aa6b2 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -287,8 +287,9 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
Status PartitionedHashJoinSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>&
spill_context) {
- LOG(INFO) << "hash join sink " << _parent->node_id() << " revoke_memory"
- << ", eos: " << _child_eos;
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " <<
state->task_id()
+ << " hash join sink " << _parent->node_id() << " revoke_memory"
+ << ", eos: " << _child_eos;
DCHECK_EQ(_spilling_streams_count, 0);
CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
@@ -450,6 +451,8 @@ void PartitionedHashJoinSinkLocalState::_spill_to_disk(
}
}
+ VLOG_DEBUG << "query: " << print_id(_state->query_id()) << ", task: " <<
_state->task_id()
+ << ", join sink " << _parent->node_id() << " revoke done";
auto num = _spilling_streams_count.fetch_sub(1);
DCHECK_GE(_spilling_streams_count, 0);
diff --git a/be/src/pipeline/exec/spill_utils.h
b/be/src/pipeline/exec/spill_utils.h
index 086a6881fcd..d2b157463ae 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -38,10 +38,10 @@ using SpillPartitionerType =
vectorized::Crc32HashPartitioner<vectorized::SpillP
struct SpillContext {
std::atomic_int running_tasks_count;
TUniqueId query_id;
- std::function<void()> all_tasks_finished_callback;
+ std::function<void(SpillContext*)> all_tasks_finished_callback;
SpillContext(int running_tasks_count_, TUniqueId query_id_,
- std::function<void()> all_tasks_finished_callback_)
+ std::function<void(SpillContext*)>
all_tasks_finished_callback_)
: running_tasks_count(running_tasks_count_),
query_id(std::move(query_id_)),
all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}
@@ -50,14 +50,30 @@ struct SpillContext {
LOG_IF(WARNING, running_tasks_count.load() != 0)
<< "query: " << print_id(query_id)
<< " not all spill tasks finished, remaining tasks: " <<
running_tasks_count.load();
+
+ LOG_IF(WARNING, _running_non_sink_tasks_count.load() != 0)
+ << "query: " << print_id(query_id)
+ << " not all spill tasks(non sink tasks) finished, remaining
tasks: "
+ << _running_non_sink_tasks_count.load();
}
void on_task_finished() {
auto count = running_tasks_count.fetch_sub(1);
- if (count == 1) {
- all_tasks_finished_callback();
+ if (count == 1 && _running_non_sink_tasks_count.load() == 0) {
+ all_tasks_finished_callback(this);
}
}
+
+ void on_non_sink_task_started() {
_running_non_sink_tasks_count.fetch_add(1); }
+ void on_non_sink_task_finished() {
+ const auto count = _running_non_sink_tasks_count.fetch_sub(1);
+ if (count == 1 && running_tasks_count.load() == 0) {
+ all_tasks_finished_callback(this);
+ }
+ }
+
+private:
+ std::atomic_int _running_non_sink_tasks_count {0};
};
class SpillRunnable : public Runnable {
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 41424914edc..a959b8c1456 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -87,8 +87,8 @@ Status StreamingAggLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(Base::exec_time_counter());
SCOPED_TIMER(Base::_init_timer);
- _hash_table_memory_usage = ADD_CHILD_COUNTER_WITH_LEVEL(Base::profile(),
"HashTable",
- TUnit::BYTES,
"MemoryUsage", 1);
+ _hash_table_memory_usage =
+ ADD_COUNTER_WITH_LEVEL(Base::profile(), "HashTableMemoryUsage",
TUnit::BYTES, 1);
_serialize_key_arena_memory_usage =
Base::profile()->AddHighWaterMarkCounter(
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage", 1);
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 0028614b22a..aa2b01b741a 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -388,7 +388,7 @@ Status PipelineTask::execute(bool* eos) {
if (!st.ok()) {
LOG(INFO) << "query: " << print_id(query_id)
<< ", try to reserve: " << reserve_size <<
"(sink reserve size:("
- << sink_reserve_size << " )"
+ << sink_reserve_size << ")"
<< ", sink name: " << _sink->get_name()
<< ", node id: " << _sink->node_id() << "
failed: " << st.to_string()
<< ", debug info: " <<
GlobalMemoryArbitrator::process_mem_log_str();
@@ -547,19 +547,20 @@ size_t PipelineTask::get_revocable_size() const {
return 0;
}
- auto revocable_size = _root->revocable_mem_size(_state);
- revocable_size += _sink->revocable_mem_size(_state);
-
- return revocable_size;
+ return _sink->revocable_mem_size(_state) +
_root->revocable_mem_size(_state);
}
Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>&
spill_context) {
- if (_sink->revocable_mem_size(_state) >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+ RETURN_IF_ERROR(_root->revoke_memory(_state, spill_context));
+
+ const auto revocable_size = _sink->revocable_mem_size(_state);
+ if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context));
} else if (spill_context) {
spill_context->on_task_finished();
+ LOG(INFO) << "query: " << print_id(_state->query_id()) << ", task: "
<< ((void*)this)
+ << " has not enough data to revoke: " << revocable_size;
}
-
return Status::OK();
}
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 51e01a071bb..af4cd3d417d 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -486,26 +486,25 @@ Status QueryContext::revoke_memory() {
}
std::weak_ptr<QueryContext> this_ctx = shared_from_this();
- auto spill_context =
- std::make_shared<pipeline::SpillContext>(chosen_tasks.size(),
_query_id, [this_ctx] {
+ auto spill_context = std::make_shared<pipeline::SpillContext>(
+ chosen_tasks.size(), _query_id, [this_ctx](pipeline::SpillContext*
context) {
auto query_context = this_ctx.lock();
if (!query_context) {
return;
}
LOG(INFO) << "query: " << print_id(query_context->_query_id)
- << " all revoking tasks done, resume it.";
+ << ", context: " << ((void*)context)
+ << " all revoking tasks done, resumt it.";
query_context->set_memory_sufficient(true);
});
+ LOG(INFO) << "query: " << print_id(_query_id) << ", context: " <<
((void*)spill_context.get())
+ << " total revoked size: " << revoked_size << ", tasks count: "
<< chosen_tasks.size()
+ << "/" << tasks.size();
for (auto* task : chosen_tasks) {
RETURN_IF_ERROR(task->revoke_memory(spill_context));
}
-
- LOG(INFO) << "query: " << print_id(_query_id) << " total revoked size: "
<< revoked_size
- << ", target_size: " <<
PrettyPrinter::print(target_revoking_size, TUnit::BYTES)
- << ", tasks count: " << chosen_tasks.size() << "/" <<
tasks.size();
-
return Status::OK();
}
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 4ad946562bb..299e4ced55c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -245,13 +245,15 @@ public:
query_mem_tracker->set_limit(std::min<int64_t>(new_mem_limit,
_user_set_mem_limit));
}
+ int64_t get_mem_limit() const { return query_mem_tracker->limit(); }
+
std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return
query_mem_tracker; }
- int32_t get_slot_count() {
+ int32_t get_slot_count() const {
return _query_options.__isset.query_slot_count ?
_query_options.query_slot_count : 1;
}
- bool enable_query_slot_hard_limit() {
+ bool enable_query_slot_hard_limit() const {
return _query_options.__isset.enable_query_slot_hard_limit
? _query_options.enable_query_slot_hard_limit
: false;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index df65124f635..287a6b45729 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -328,7 +328,8 @@ void
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
}
if
(query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
- bool spill_res = handle_single_query(query_ctx,
query_ctx->paused_reason());
+ bool spill_res = handle_single_query(query_ctx,
query_it->reserve_size_,
+
query_ctx->paused_reason());
if (!spill_res) {
++query_it;
continue;
@@ -412,7 +413,8 @@ void
WorkloadGroupMgr::handle_non_overcommit_wg_paused_queries() {
}
if (query_it->cache_ratio_ < 0.001) {
// TODO: Find other exceed limit workload group and cancel
query.
- bool spill_res = handle_single_query(query_ctx,
query_ctx->paused_reason());
+ bool spill_res = handle_single_query(query_ctx,
query_it->reserve_size_,
+
query_ctx->paused_reason());
if (!spill_res) {
++query_it;
continue;
@@ -473,11 +475,12 @@ void
WorkloadGroupMgr::handle_overcommit_wg_paused_queries() {
// If the query could not release memory, then cancel the query, the return
value is true.
// If the query is not ready to do these tasks, it means just wait.
bool WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext>
query_ctx,
- Status paused_reason) {
+ size_t size_to_reserve, Status
paused_reason) {
// TODO: If the query is an insert into select query, should consider
memtable as revoke memory.
size_t revocable_size = 0;
size_t memory_usage = 0;
bool has_running_task = false;
+ const auto query_id = print_id(query_ctx->query_id());
query_ctx->get_revocable_info(&revocable_size, &memory_usage,
&has_running_task);
if (has_running_task) {
LOG(INFO) << "query: " << print_id(query_ctx->query_id())
@@ -488,16 +491,25 @@ bool
WorkloadGroupMgr::handle_single_query(std::shared_ptr<QueryContext> query_c
auto revocable_tasks = query_ctx->get_revocable_tasks();
if (revocable_tasks.empty()) {
if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
- // Use MEM_LIMIT_EXCEEDED so that FE could parse the error code
and do try logic
-
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
- "query reserve memory failed, but could not find memory
that "
- "could "
- "release or spill to disk"));
+ const auto limit = query_ctx->get_mem_limit();
+ if ((memory_usage + size_to_reserve) < limit) {
+ LOG(INFO) << "query: " << query_id << ", usage(" <<
memory_usage << " + "
+ << size_to_reserve << ") less than limit(" << limit
<< "), resume it.";
+ query_ctx->set_memory_sufficient(true);
+ return true;
+ } else {
+ // Use MEM_LIMIT_EXCEEDED so that FE could parse the error
code and do try logic
+
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
+ "query({}) reserve memory failed, but could not find
memory that could "
+ "release or spill to disk(usage:{}, limit: {})",
+ query_id, memory_usage, query_ctx->get_mem_limit()));
+ }
} else {
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
- "The query reserved memory failed because process limit
exceeded, and "
+ "The query({}) reserved memory failed because process
limit exceeded, and "
"there is no cache now. And could not find task to spill.
Maybe you should set "
- "the workload group's limit to a lower value."));
+ "the workload group's limit to a lower value.",
+ query_id));
}
} else {
SCOPED_ATTACH_TASK(query_ctx.get());
diff --git a/be/src/runtime/workload_group/workload_group_manager.h
b/be/src/runtime/workload_group/workload_group_manager.h
index 8f69d5653b4..03f134006f5 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -106,7 +106,8 @@ public:
void update_load_memtable_usage(const std::map<uint64_t, MemtableUsage>&
wg_memtable_usages);
private:
- bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, Status
paused_reason);
+ bool handle_single_query(std::shared_ptr<QueryContext> query_ctx, size_t
size_to_reserve,
+ Status paused_reason);
void handle_non_overcommit_wg_paused_queries();
void handle_overcommit_wg_paused_queries();
void change_query_to_hard_limit(WorkloadGroupPtr wg, bool
enable_hard_limit);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 125cacd9353..a9c9ab2a2ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -3793,6 +3793,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableSortSpill(enableSortSpill);
tResult.setEnableAggSpill(enableAggSpill);
tResult.setEnableForceSpill(enableForceSpill);
+ tResult.setExternalAggPartitionBits(externalAggPartitionBits);
tResult.setMinRevocableMem(minRevocableMem);
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]