This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new bba1119f741 Opt debug log (#44331)
bba1119f741 is described below
commit bba1119f741fdbb5b6c02a42310f8607c5741e24
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Nov 20 15:34:32 2024 +0800
Opt debug log (#44331)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 2 +-
be/src/pipeline/exec/multi_cast_data_streamer.cpp | 13 ++-
.../exec/partitioned_aggregation_sink_operator.cpp | 6 +-
.../exec/partitioned_aggregation_sink_operator.h | 2 +-
.../partitioned_aggregation_source_operator.cpp | 2 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 28 +++---
.../exec/partitioned_hash_join_sink_operator.cpp | 13 +--
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 6 +-
.../pipeline/exec/spill_sort_source_operator.cpp | 8 +-
be/src/pipeline/exec/spill_utils.h | 4 +-
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
be/src/pipeline/pipeline_task.cpp | 39 ++++----
be/src/runtime/fragment_mgr.cpp | 4 +-
.../workload_group/workload_group_manager.cpp | 101 +++++++++++++--------
.../workload_group/workload_group_manager.h | 4 +-
.../workload_management/workload_action.cpp | 2 +-
16 files changed, 130 insertions(+), 106 deletions(-)
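Before the full diff, a minimal standalone sketch (not part of this commit, and using placeholder values) of the glog-level pattern the change moves toward: routine, retryable events such as failed memory reservations drop to a verbose VLOG level, while LOG(WARNING) stays reserved for genuinely unexpected states. Doris's VLOG_DEBUG is assumed here to be a thin wrapper over a VLOG verbosity level.

#include <glog/logging.h>
#include <string>

int main(int argc, char** argv) {
    google::InitGoogleLogging(argv[0]);
    FLAGS_logtostderr = true;
    FLAGS_v = 2;  // enable verbose levels up to 2 for this demo only

    const std::string query_id = "hypothetical-query-id";  // placeholder, not a real Doris query id
    const size_t reserve_size = 64UL * 1024 * 1024;        // placeholder reservation size

    // Routine, retryable event: verbose-only, silent unless verbosity is raised.
    VLOG(2) << "Query: " << query_id << ", try to reserve: " << reserve_size
            << " bytes failed, will pause and retry";

    // Genuinely unexpected state: always emitted.
    LOG(WARNING) << "Query: " << query_id
                 << ", on_channel_finished called on an already finished channel";
    return 0;
}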
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 52c55ccabbc..0e26570d9d4 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -106,7 +106,7 @@ void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
std::lock_guard<std::mutex> lock(_finished_channels_mutex);
if (_finished_channels.contains(channel_id)) {
- LOG(WARNING) << "query: " << print_id(_state->query_id())
+ LOG(WARNING) << "Query: " << print_id(_state->query_id())
<< ", on_channel_finished on already finished channel: "
<< channel_id;
return;
} else {
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index f1e399a3289..e1484c64614 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -126,7 +126,7 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectoriz
const auto end = _multi_cast_blocks.end();
if (pos_to_pull == end) {
_block_reading(sender_idx);
- VLOG_DEBUG << "query: " << print_id(state->query_id())
+ VLOG_DEBUG << "Query: " << print_id(state->query_id())
<< ", pos_to_pull end: " << (void*)(_write_dependency);
*eos = _eos;
return Status::OK();
@@ -151,8 +151,6 @@ Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectoriz
_cumulative_mem_size.fetch_sub(mem_size);
_multi_cast_blocks.pop_front();
_write_dependency->set_ready();
- VLOG_DEBUG << "**** query: " << print_id(state->query_id())
- << ", set ready: " << (void*)(_write_dependency);
} else {
_copy_block(block, *un_finish_copy);
}
@@ -175,6 +173,11 @@ void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_f
}
Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state,
bool* triggered) {
+ if (!state->enable_spill() && !state->enable_force_spill()) {
+ *triggered = false;
+ return Status::OK();
+ }
+
vectorized::SpillStreamSPtr spill_stream;
*triggered = false;
if (_cumulative_mem_size.load() >= config::exchg_node_buffer_size_bytes &&
@@ -245,7 +248,7 @@ Status MultiCastDataStreamer::_submit_spill_task(RuntimeState* state,
RETURN_IF_ERROR(spill_stream->spill_block(state, block, false));
}
-    VLOG_DEBUG << "query: " << print_id(state->query_id()) << " multi cast write "
+    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << " multi cast write "
<< blocks_count << " blocks";
return spill_stream->spill_eof();
};
@@ -256,7 +259,7 @@ Status MultiCastDataStreamer::_submit_spill_task(RuntimeState* state,
_write_dependency->set_ready();
if (!status.ok()) {
- LOG(WARNING) << "query: " << query_id
+ LOG(WARNING) << "Query: " << query_id
<< " multi cast write failed: " << status.to_string()
<< ", dependency: " << (void*)_spill_dependency.get();
} else {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index de2f3b29d36..bc3c1fccba5 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -268,7 +268,7 @@ size_t PartitionedAggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bo
Status PartitionedAggSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
const auto size_to_revoke = _parent->revocable_mem_size(state);
- VLOG_DEBUG << "query " << print_id(state->query_id()) << " agg node "
+ VLOG_DEBUG << "Query " << print_id(state->query_id()) << " agg node "
<< Base::_parent->node_id()
<< " revoke_memory, size: " << _parent->revocable_mem_size(state)
<< ", eos: " << _eos;
@@ -318,13 +318,13 @@ Status PartitionedAggSinkLocalState::revoke_memory(
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
-                LOG(WARNING) << "query " << print_id(query_id) << " agg node "
+                LOG(WARNING) << "Query " << print_id(query_id) << " agg node "
<< Base::_parent->node_id()
<< " revoke_memory error: " << status;
}
_shared_state->close();
} else {
-            VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
+            VLOG_DEBUG << "Query " << print_id(query_id) << " agg node "
<< Base::_parent->node_id() << " revoke_memory finish, size: "
<< _parent->revocable_mem_size(state) << ", eos: " << _eos;
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 8b7836134fa..2c77ed15436 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -83,7 +83,7 @@ public:
std::max<size_t>(4096,
vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM *
total_rows / size_to_revoke_));
-        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
+        VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", spill_batch_rows: " << spill_batch_rows << ", total rows: " << total_rows
<< ", size_to_revoke: " << size_to_revoke;
size_t row_count = 0;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 027e726e358..046f8df44ae 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -249,7 +249,7 @@ Status PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
-                LOG(WARNING) << "query " << print_id(query_id) << " agg node "
+                LOG(WARNING) << "Query " << print_id(query_id) << " agg node "
<< _parent->node_id() << " recover agg data error: " << status;
}
_shared_state->close();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index bdba90aac37..1e3c1d18a71 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -237,7 +237,7 @@ Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(
}
COUNTER_SET(_probe_blocks_bytes, int64_t(not_revoked_size));
- VLOG_DEBUG << "query: " << print_id(query_id)
+ VLOG_DEBUG << "Query: " << print_id(query_id)
<< " hash probe revoke done, node: " << p.node_id()
<< ", task: " << state->task_id();
return Status::OK();
@@ -285,7 +285,7 @@ Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(RuntimeState* state,
uint32_t partition_index,
bool& has_data) {
-    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
+    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< " recover_build_blocks_from_disk";
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
@@ -301,7 +301,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
SCOPED_TIMER(_recovery_build_timer);
bool eos = false;
-        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
+        VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< ", recoverying build data";
Status status;
@@ -348,7 +348,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
if (eos) {
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
_shared_state->spilled_streams[partition_index].reset();
- VLOG_DEBUG << "query: " << print_id(state->query_id())
+ VLOG_DEBUG << "Query: " << print_id(state->query_id())
<< ", node: " << _parent->node_id() << ", task id: " << state->task_id()
<< ", partition: " << partition_index;
}
@@ -379,7 +379,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
auto* pipeline_task = state->get_task();
if (pipeline_task) {
auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
-        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << p.node_id()
+        VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << p.node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< ", dependency: " << _dependency
<< ", task debug_string: " << pipeline_task->debug_string();
@@ -396,7 +396,7 @@ Status PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
auto spill_runnable = std::make_shared<SpillRecoverRunnable>(
state, _spill_dependency, _runtime_profile.get(), _shared_state->shared_from_this(),
exception_catch_func);
-    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
+    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " << partition_index
<< " recover_build_blocks_from_disk submit func";
return spill_io_pool->submit(std::move(spill_runnable));
@@ -466,7 +466,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
}
}
if (eos) {
- VLOG_DEBUG << "query: " << print_id(query_id)
+ VLOG_DEBUG << "Query: " << print_id(query_id)
<< ", recovery probe data done: " << spilled_stream->get_spill_dir();
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
@@ -683,7 +683,7 @@ Status PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
});
RETURN_IF_ERROR(_inner_sink_operator->sink(local_state._runtime_state.get(), &block, true));
- VLOG_DEBUG << "query: " << print_id(state->query_id())
+ VLOG_DEBUG << "Query: " << print_id(state->query_id())
<< ", internal build operator finished, node id: " << node_id()
<< ", task id: " << state->task_id()
<< ", partition: " << local_state._partition_cursor << "rows: " << block.rows()
@@ -744,7 +744,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
if (!has_data) {
vectorized::Block block;
RETURN_IF_ERROR(_inner_probe_operator->push(runtime_state, &block, true));
-                VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << node_id()
+                VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id()
<< ", task: " << state->task_id() << "partition: " << partition_index
<< " has no data to recovery";
break;
@@ -765,7 +765,7 @@ Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
*eos = false;
if (in_mem_eos) {
-            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " << node_id()
+            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", node: " << node_id()
<< ", task: " << state->task_id()
<< ", partition: " << local_state._partition_cursor;
local_state._partition_cursor++;
@@ -858,11 +858,11 @@ size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* sta
Status PartitionedHashJoinProbeOperatorX::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
auto& local_state = get_local_state(state);
-    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
+    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
<< ", task: " << state->task_id() << ", child eos: " << local_state._child_eos;
if (local_state._child_eos) {
-        VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
+        VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
<< ", task: " << state->task_id() << ", child eos: " << local_state._child_eos
<< ", will not revoke size: " << revocable_mem_size(state);
return Status::OK();
@@ -878,7 +878,7 @@ Status PartitionedHashJoinProbeOperatorX::revoke_memory(
Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
auto& local_state = get_local_state(state);
-    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
+    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash probe node: " << node_id()
<< ", task: " << state->task_id();
RETURN_IF_ERROR(local_state.spill_probe_blocks(state));
@@ -915,7 +915,7 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
#ifndef NDEBUG
Defer eos_check_defer([&] {
if (*eos) {
- LOG(INFO) << "query: " << print_id(state->query_id())
+ LOG(INFO) << "Query: " << print_id(state->query_id())
<< ", hash probe node: " << node_id() << ", task: " << state->task_id()
<< ", eos with child eos: " << local_state._child_eos
<< ", need spill: " << need_to_spill;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 6f2f7c8bc15..cabcfd7d450 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -291,7 +291,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
Status PartitionedHashJoinSinkLocalState::revoke_memory(
RuntimeState* state, const std::shared_ptr<SpillContext>& spill_context) {
SCOPED_TIMER(_spill_total_timer);
-    VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " << state->task_id()
+    VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", task: " << state->task_id()
<< " hash join sink " << _parent->node_id() << " revoke_memory"
<< ", eos: " << _child_eos;
DCHECK_EQ(_spilling_task_count, 0);
@@ -317,7 +317,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
Status status;
if (_child_eos) {
-        VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) << ", hash join sink "
+        VLOG_DEBUG << "Query:" << print_id(this->state()->query_id()) << ", hash join sink "
<< _parent->node_id() << " set_ready_to_read"
<< ", task id: " << state->task_id();
std::for_each(_shared_state->partitioned_build_blocks.begin(),
@@ -403,7 +403,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
}
if (_child_eos) {
-        VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join sink "
+        VLOG_DEBUG << "Query:" << print_id(state->query_id()) << ", hash join sink "
<< _parent->node_id() << " set_ready_to_read"
<< ", task id: " << state->task_id();
std::for_each(_shared_state->partitioned_build_blocks.begin(),
@@ -493,9 +493,6 @@ Status PartitionedHashJoinSinkLocalState::_spill_to_disk(
status = spilling_stream->spill_block(state(), block, false);
}
-    VLOG_DEBUG << "query: " << print_id(_state->query_id()) << ", task: " << _state->task_id()
- << ", join sink " << _parent->node_id() << " revoke done";
-
return status;
}
@@ -597,7 +594,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
const auto need_to_spill = local_state._shared_state->need_to_spill;
if (rows == 0) {
if (eos) {
-            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink "
+            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash join sink "
<< node_id() << " sink eos, set_ready_to_read"
<< ", task id: " << state->task_id() << ", need spill: " << need_to_spill;
@@ -655,7 +652,7 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, vectorized::B
local_state._shared_state->inner_runtime_state.get(), in_block, eos));
if (eos) {
-            VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash join sink "
+            VLOG_DEBUG << "Query: " << print_id(state->query_id()) << ", hash join sink "
<< node_id() << " sink eos, set_ready_to_read"
<< ", task id: " << state->task_id();
local_state._dependency->set_ready_to_read();
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 83c7ccfc1a3..cd40e6a9ded 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -207,7 +207,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
profile()->add_info_string("Spilled", "true");
}
- VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node "
+ VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node "
<< Base::_parent->node_id() << " revoke_memory"
<< ", eos: " << _eos;
if (!_shared_state->_spill_status.ok()) {
@@ -235,12 +235,12 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
-                LOG(WARNING) << "query " << print_id(query_id) << " sort node "
+                LOG(WARNING) << "Query " << print_id(query_id) << " sort node "
<< _parent->node_id() << " revoke memory error: " << status;
}
_shared_state->close();
} else {
-            VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id()
+            VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id()
<< " revoke memory finish";
}
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index b0b5ebbcbd7..447c306c9ba 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -73,7 +73,7 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
}
Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* state) {
auto& parent = Base::_parent->template cast<Parent>();
-    VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node " << _parent->node_id()
+    VLOG_DEBUG << "Query " << print_id(state->query_id()) << " sort node " << _parent->node_id()
<< " merge spill data";
_spill_dependency->Dependency::block();
@@ -85,7 +85,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
Defer defer {[&]() {
if (!status.ok() || state->is_cancelled()) {
if (!status.ok()) {
-                LOG(WARNING) << "query " << print_id(query_id) << " sort node "
+                LOG(WARNING) << "Query " << print_id(query_id) << " sort node "
<< _parent->node_id() << " merge spill data error: " << status;
}
_shared_state->close();
@@ -94,7 +94,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
}
_current_merging_streams.clear();
} else {
-            VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id()
+            VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id()
<< " merge spill data finish";
}
}};
@@ -102,7 +102,7 @@ Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
vectorized::SpillStreamSPtr tmp_stream;
while (!state->is_cancelled()) {
int max_stream_count = _calc_spill_blocks_to_merge();
-        VLOG_DEBUG << "query " << print_id(query_id) << " sort node " << _parent->node_id()
+        VLOG_DEBUG << "Query " << print_id(query_id) << " sort node " << _parent->node_id()
<< " merge spill streams, streams count: " << _shared_state->sorted_streams.size()
<< ", curren merge max stream count: " << max_stream_count;
diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h
index bf877382129..9986031ccc3 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -49,11 +49,11 @@ struct SpillContext {
~SpillContext() {
LOG_IF(WARNING, running_tasks_count.load() != 0)
- << "query: " << print_id(query_id)
+ << "Query: " << print_id(query_id)
<< " not all spill tasks finished, remaining tasks: " << running_tasks_count.load();
LOG_IF(WARNING, _running_non_sink_tasks_count.load() != 0)
- << "query: " << print_id(query_id)
+ << "Query: " << print_id(query_id)
<< " not all spill tasks(non sink tasks) finished, remaining tasks: "
<< _running_non_sink_tasks_count.load();
}
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 944374b66e2..74008ccd527 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1843,7 +1843,7 @@ size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const
for (const auto& task_instances : _tasks) {
for (const auto& task : task_instances) {
if (task->is_running() || task->is_revoking()) {
- LOG_EVERY_N(INFO, 50) << "query: " << print_id(_query_id)
+ LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
<< " is running, task: " << (void*)task.get()
<< ", task->is_revoking(): " << task->is_revoking() << ", "
<< task->is_running();
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 69a1c490911..cbc0f491dc2 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -300,11 +300,6 @@ bool PipelineTask::_is_blocked() {
}
// If all dependencies are ready for this operator, we can execute this task if no datum is needed from upstream operators.
if (!_operators[i]->need_more_input_data(_state)) {
- // if (VLOG_DEBUG_IS_ON) {
- // VLOG_DEBUG << "query: " << print_id(_state->query_id())
-            //     VLOG_DEBUG << ", task id: " << _index << ", operator " << i
- // << " not need_more_input_data";
- // }
break;
}
}
@@ -408,7 +403,7 @@ Status PipelineTask::execute(bool* eos) {
// _state->get_query_ctx()->update_low_memory_mode();
if (_pending_block) [[unlikely]] {
- LOG(INFO) << "query: " << print_id(query_id)
+ LOG(INFO) << "Query: " << print_id(query_id)
<< " has pending block, size: " << _pending_block->allocated_bytes();
_block = std::move(_pending_block);
block = _block.get();
@@ -432,16 +427,17 @@ Status PipelineTask::execute(bool* eos) {
COUNTER_UPDATE(_memory_reserve_times, 1);
if (!st.ok()) {
COUNTER_UPDATE(_memory_reserve_failed_times, 1);
-                    LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: "
-                              << PrettyPrinter::print(reserve_size, TUnit::BYTES)
- << ", sink name: " << _sink->get_name()
- << ", node id: " << _sink->node_id()
- << ", task id: " << _state->task_id()
- << ", failed: " << st.to_string()
-                              << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
+                    VLOG_DEBUG << "Query: " << print_id(query_id) << ", try to reserve: "
+                               << PrettyPrinter::print(reserve_size, TUnit::BYTES)
+ << ", sink name: " << _sink->get_name()
+ << ", node id: " << _sink->node_id()
+ << ", task id: " << _state->task_id()
+ << ", failed: " << st.to_string()
+                               << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
_state->get_query_ctx()->update_paused_reason(st);
_state->get_query_ctx()->set_low_memory_mode();
+ _state->get_query_ctx()->set_memory_sufficient(false);
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), reserve_size);
continue;
@@ -462,15 +458,16 @@ Status PipelineTask::execute(bool* eos) {
status = thread_context()->try_reserve_memory(sink_reserve_size);
if (!status.ok()) {
COUNTER_UPDATE(_memory_reserve_failed_times, 1);
-                    LOG(INFO) << "query: " << print_id(query_id) << ", try to reserve: "
-                              << PrettyPrinter::print(sink_reserve_size, TUnit::BYTES)
- << ", sink name: " << _sink->get_name()
- << ", node id: " << _sink->node_id()
- << ", task id: " << _state->task_id()
- << ", failed: " << status.to_string()
-                              << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
+                    VLOG_DEBUG << "Query: " << print_id(query_id) << ", try to reserve: "
+                               << PrettyPrinter::print(sink_reserve_size, TUnit::BYTES)
+ << ", sink name: " << _sink->get_name()
+ << ", node id: " << _sink->node_id()
+ << ", task id: " << _state->task_id()
+ << ", failed: " << status.to_string()
+                               << ", debug info: " << GlobalMemoryArbitrator::process_mem_log_str();
_state->get_query_ctx()->update_paused_reason(status);
_state->get_query_ctx()->set_low_memory_mode();
+ _state->get_query_ctx()->set_memory_sufficient(false);
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
_state->get_query_ctx()->shared_from_this(), sink_reserve_size);
DCHECK_EQ(_pending_block.get(), nullptr);
@@ -617,7 +614,7 @@ Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>& spill_co
RETURN_IF_ERROR(_sink->revoke_memory(_state, spill_context));
} else if (spill_context) {
spill_context->on_task_finished();
-        LOG(INFO) << "query: " << print_id(_state->query_id()) << ", task: " << ((void*)this)
+        LOG(INFO) << "Query: " << print_id(_state->query_id()) << ", task: " << ((void*)this)
<< " has not enough data to revoke: " << revocable_size;
}
return Status::OK();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 18aacb452a6..113b08c5a9f 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -755,11 +755,11 @@ std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
QuerySource query_source, const FinishCallback& cb) {
-    VLOG_ROW << "query: " << print_id(params.query_id) << " exec_plan_fragment params is "
+    VLOG_ROW << "Query: " << print_id(params.query_id) << " exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(params).c_str();
// sometimes TExecPlanFragmentParams debug string is too long and glog
// will truncate the log line, so print query options seperately for debuggin purpose
- VLOG_ROW << "query: " << print_id(params.query_id) << "query options is "
+ VLOG_ROW << "Query: " << print_id(params.query_id) << "query options is "
<< apache::thrift::ThriftDebugString(params.query_options).c_str();
std::shared_ptr<QueryContext> query_ctx;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp b/be/src/runtime/workload_group/workload_group_manager.cpp
index 8d82817b3b0..b22eb21d31d 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -264,6 +264,8 @@ void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>& que
}
}
+constexpr int64_t TIMEOUT_IN_QUEUE_LIMIT = 1000L * 60;
+
/**
* Strategy 1: A revocable query should not have any running task(PipelineTask).
* strategy 2: If the workload group has any task exceed workload group memlimit, then set all queryctx's memlimit
@@ -281,13 +283,15 @@ void WorkloadGroupMgr::handle_paused_queries() {
}
}
}
- const int64_t TIMEOUT_IN_QUEUE = 1000L * 3;
+
std::unique_lock<std::mutex> lock(_paused_queries_lock);
bool has_revoked_from_other_group = false;
for (auto it = _paused_queries_list.begin(); it != _paused_queries_list.end();) {
auto& queries_list = it->second;
const auto& wg = it->first;
+        LOG_EVERY_T(INFO, 120) << "Paused queries count: " << queries_list.size();
+
bool is_low_watermark = false;
bool is_high_watermark = false;
wg->check_mem_used(&is_low_watermark, &is_high_watermark);
@@ -302,10 +306,11 @@ void WorkloadGroupMgr::handle_paused_queries() {
// The query is finished during in paused list.
if (query_ctx == nullptr) {
query_it = queries_list.erase(query_it);
+                LOG(INFO) << "Query: " << query_it->query_id() << " is nullptr, erase it.";
continue;
}
if (query_ctx->is_cancelled()) {
- LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+ LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
<< " was canceled, remove from paused list";
query_it = queries_list.erase(query_it);
continue;
@@ -314,8 +319,9 @@ void WorkloadGroupMgr::handle_paused_queries() {
if (query_ctx->paused_reason().is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
// Streamload, kafka load, group commit will never have query memory exceeded error because
// their query limit is very large.
-                bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_,
-                                                      query_ctx->paused_reason());
+                bool spill_res =
+                        handle_single_query_(query_ctx, query_it->reserve_size_,
+                                             query_it->elapsed_time(), query_ctx->paused_reason());
if (!spill_res) {
++query_it;
continue;
@@ -331,7 +337,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
// the wg is converted to soft limit.
// So that should resume the query.
LOG(WARNING)
- << "query: " << print_id(query_ctx->query_id())
+ << "Query: " << print_id(query_ctx->query_id())
<< " reserve memory failed because exceed workload group memlimit, it "
"should not happen, resume it again. paused reason: "
<< query_ctx->paused_reason();
@@ -346,7 +352,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
query_ctx->get_mem_tracker()->consumption() + query_it->reserve_size_) {
query_ctx->set_mem_limit(query_ctx->expected_mem_limit());
query_ctx->set_memory_sufficient(true);
-                LOG(INFO) << "workload group memory reserve failed because "
+                LOG(INFO) << "Workload group memory reserve failed because "
<< query_ctx->debug_string() << " reserve size "
<< PrettyPrinter::print_bytes(query_it->reserve_size_)
<< " is too large, set hard limit to "
@@ -368,8 +374,9 @@ void WorkloadGroupMgr::handle_paused_queries() {
if (!has_changed_hard_limit) {
update_queries_limit_(wg, true);
has_changed_hard_limit = true;
- LOG(INFO) << "query: " << print_id(query_ctx->query_id())
-                              << " reserve memory failed due to workload group memory exceed, "
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id()) << " reserve memory("
+                              << PrettyPrinter::print_bytes(query_it->reserve_size_)
+                              << ") failed due to workload group memory exceed, "
"should set the workload group work in memory insufficent mode, "
"so that other query will reduce their memory. wg: "
<< wg->debug_string();
@@ -380,6 +387,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
// not encourage not enable slot memory.
// TODO should kill the query that exceed limit.
bool spill_res = handle_single_query_(query_ctx, query_it->reserve_size_,
+                                                     query_it->elapsed_time(),
query_ctx->paused_reason());
if (!spill_res) {
++query_it;
@@ -391,9 +399,9 @@ void WorkloadGroupMgr::handle_paused_queries() {
} else {
// Should not put the query back to task scheduler immediately, because when wg's memory not sufficient,
// and then set wg's flag, other query may not free memory very quickly.
- if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE) {
+ if (query_it->elapsed_time() > TIMEOUT_IN_QUEUE_LIMIT) {
// set wg's memory to insufficent, then add it back to task scheduler to run.
-                    LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+                    LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
<< " will be resume.";
query_ctx->set_memory_sufficient(true);
query_it = queries_list.erase(query_it);
@@ -441,7 +449,8 @@ void WorkloadGroupMgr::handle_paused_queries() {
continue;
} else {
bool spill_res = handle_single_query_(
-                            query_ctx, query_it->reserve_size_, query_ctx->paused_reason());
+                            query_ctx, query_it->reserve_size_, query_it->elapsed_time(),
+ query_ctx->paused_reason());
if (spill_res) {
query_it = queries_list.erase(query_it);
continue;
@@ -461,7 +470,7 @@ void WorkloadGroupMgr::handle_paused_queries() {
if (doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted <
0.001 &&
query_it->cache_ratio_ > 0.001) {
- LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+ LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
<< " will be resume after cache adjust.";
query_ctx->set_memory_sufficient(true);
query_it = queries_list.erase(query_it);
@@ -613,15 +622,16 @@ int64_t WorkloadGroupMgr::cancel_top_query_in_overcommit_group_(int64_t need_fre
// If the query could release some memory, for example, spill disk, then the return value is true.
// If the query could not release memory, then cancel the query, the return value is true.
// If the query is not ready to do these tasks, it means just wait, then return value is false.
-bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_ctx,
-                                            size_t size_to_reserve, Status paused_reason) {
+bool WorkloadGroupMgr::handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx,
+                                            size_t size_to_reserve, int64_t time_in_queue,
+ Status paused_reason) {
size_t revocable_size = 0;
size_t memory_usage = 0;
bool has_running_task = false;
const auto query_id = print_id(query_ctx->query_id());
query_ctx->get_revocable_info(&revocable_size, &memory_usage, &has_running_task);
if (has_running_task) {
- LOG(INFO) << "query: " << print_id(query_ctx->query_id())
+ LOG(INFO) << "Query: " << print_id(query_ctx->query_id())
<< " is paused, but still has running task, skip it.";
return false;
}
@@ -633,14 +643,14 @@ bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_
// During waiting time, another operator in the query may finished and release
// many memory and we could run.
if ((memory_usage + size_to_reserve) < limit) {
-            LOG(INFO) << "query: " << query_id << ", usage(" << memory_usage << " + "
+            LOG(INFO) << "Query: " << query_id << ", usage(" << memory_usage << " + "
<< size_to_reserve << ") less than limit(" << limit << "), resume it.";
query_ctx->set_memory_sufficient(true);
return true;
- } else {
+ } else if (time_in_queue >= TIMEOUT_IN_QUEUE_LIMIT) {
// Use MEM_LIMIT_EXCEEDED so that FE could parse the error code and do try logic
auto msg1 = fmt::format(
-                    "query {} reserve memory failed, but could not find memory that could "
+                    "Query {} reserve memory failed, but could not find memory that could "
"release or spill to disk. Query memory usage: {}, limit: {}, process "
"memory info: {}"
", wg info: {}.",
@@ -657,37 +667,54 @@ bool WorkloadGroupMgr::handle_single_query_(std::shared_ptr<QueryContext> query_
MemTrackerLimiter::Type::LOAD));
LOG(INFO) << msg2;
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
+ } else {
+ return false;
+ }
+    } else if (paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
+ if (!query_ctx->workload_group()->exceed_limit()) {
+ LOG(INFO) << "Query: " << query_id
+                      << " paused caused by WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it.";
+ query_ctx->set_memory_sufficient(true);
+ return true;
+ } else if (time_in_queue > TIMEOUT_IN_QUEUE_LIMIT) {
+            LOG(INFO) << "Query: " << query_id << ", workload group exceeded, info: "
+                      << GlobalMemoryArbitrator::process_memory_used_details_str()
+                      << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
+            query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
+                    "The query({}) reserved memory failed because workload group limit "
+                    "exceeded, and there is no cache now. And could not find task to spill. "
+                    "Maybe you should set the workload group's limit to a lower value.",
+ query_id));
+ } else {
+ return false;
}
} else {
// Should not consider about process memory. For example, the query's limit is 100g, workload
// group's memlimit is 10g, process memory is 20g. The query reserve will always failed in wg
// limit, and process is always have memory, so that it will resume and failed reserve again.
- /*
if (!GlobalMemoryArbitrator::is_exceed_hard_mem_limit()) {
- LOG(INFO) << "query: " << query_id
+ LOG(INFO) << "Query: " << query_id
<< ", process limit not exceeded now, resume this query"
<< ", process memory info: "
<< GlobalMemoryArbitrator::process_memory_used_details_str()
<< ", wg info: " << query_ctx->workload_group()->memory_debug_string();
query_ctx->set_memory_sufficient(true);
return true;
+ } else if (time_in_queue > TIMEOUT_IN_QUEUE_LIMIT) {
+            LOG(INFO) << "Query: " << query_id << ", process limit exceeded, info: "
+                      << GlobalMemoryArbitrator::process_memory_used_details_str()
+                      << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
+            query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
+                    "The query({}) reserved memory failed because process limit exceeded, "
+                    "and "
+                    "there is no cache now. And could not find task to spill. Maybe you "
+ "should "
+ "set "
+ "the workload group's limit to a lower value.",
+ query_id));
+ } else {
+ return false;
}
-
-        LOG(INFO) << "query: " << query_id << ", process limit exceeded, info: "
-                  << GlobalMemoryArbitrator::process_memory_used_details_str()
-                  << ", wg info: " << query_ctx->workload_group()->memory_debug_string();
-        query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
-                "The query({}) reserved memory failed because process limit exceeded, and "
-                "there is no cache now. And could not find task to spill. Maybe you should "
- "set "
- "the workload group's limit to a lower value.",
- query_id));
- */
-        query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
-                "The query({}) reserved memory failed and could not find task to spill. Maybe "
-                "you should "
-                "set the query's memlimit or workload group's limit to a lower value.",
- query_id));
}
} else {
SCOPED_ATTACH_TASK(query_ctx.get());
@@ -759,7 +786,7 @@ void WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::FIXED) {
if (total_slot_count < 1) {
LOG(WARNING)
- << "query " << print_id(query_ctx->query_id())
+ << "Query " << print_id(query_ctx->query_id())
<< " enabled hard limit, but the slot count < 1, could not take affect";
} else {
// If the query enable hard limit, then not use weighted info any more, just use the settings limit.
diff --git a/be/src/runtime/workload_group/workload_group_manager.h b/be/src/runtime/workload_group/workload_group_manager.h
index 065528c66ec..9e6ac17b5dc 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -110,8 +110,8 @@ private:
RuntimeProfile* profile);
int64_t flush_memtable_from_current_group_(std::shared_ptr<QueryContext> requestor,
WorkloadGroupPtr wg, int64_t need_free_mem);
-    bool handle_single_query_(std::shared_ptr<QueryContext> query_ctx, size_t size_to_reserve,
- Status paused_reason);
+ bool handle_single_query_(const std::shared_ptr<QueryContext>& query_ctx,
+                              size_t size_to_reserve, int64_t time_in_queue, Status paused_reason);
int64_t revoke_memory_from_other_group_(std::shared_ptr<QueryContext> requestor,
bool hard_limit, int64_t need_free_mem);
int64_t revoke_overcommited_memory_(std::shared_ptr<QueryContext> requestor,
diff --git a/be/src/runtime/workload_management/workload_action.cpp b/be/src/runtime/workload_management/workload_action.cpp
index 8e6e3b19e2c..895269870ea 100644
--- a/be/src/runtime/workload_management/workload_action.cpp
+++ b/be/src/runtime/workload_management/workload_action.cpp
@@ -23,7 +23,7 @@ namespace doris {
void WorkloadActionCancelQuery::exec(WorkloadQueryInfo* query_info) {
std::stringstream msg;
- msg << "query " << query_info->query_id
+ msg << "Query " << query_info->query_id
<< " cancelled by workload policy: " << query_info->policy_name
<< ", id:" << query_info->policy_id;
std::string msg_str = msg.str();