This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new ffcf519d841 memtable flush reserve process mem and improve logs
(#45743)
ffcf519d841 is described below
commit ffcf519d841d295285cc5caef15b47a15acb7d20
Author: TengJianPing <[email protected]>
AuthorDate: Tue Dec 24 16:29:15 2024 +0800
memtable flush reserve process mem and improve logs (#45743)
---
be/src/olap/memtable.cpp | 14 +++
be/src/olap/memtable.h | 1 +
be/src/olap/memtable_flush_executor.cpp | 40 ++++++++
be/src/olap/memtable_flush_executor.h | 19 ++++
be/src/olap/memtable_memory_limiter.cpp | 28 ++++--
be/src/olap/memtable_memory_limiter.h | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 5 +-
be/src/pipeline/pipeline_task.cpp | 10 +-
be/src/pipeline/pipeline_task.h | 2 +
be/src/runtime/memory/global_memory_arbitrator.h | 14 +--
be/src/runtime/memory/mem_tracker_limiter.cpp | 3 +-
be/src/runtime/memory/memory_profile.cpp | 13 ++-
be/src/runtime/memory/memory_profile.h | 1 +
be/src/runtime/memory/thread_mem_tracker_mgr.h | 48 ++++++----
be/src/runtime/runtime_state.h | 14 ---
be/src/runtime/thread_context.h | 11 ++-
be/src/runtime/workload_group/workload_group.cpp | 20 ++--
be/src/runtime/workload_group/workload_group.h | 6 +-
.../workload_group/workload_group_manager.cpp | 101 ++++++++++++---------
.../workload_group/workload_group_manager.h | 3 +-
be/src/vec/exec/scan/scanner_context.h | 2 +
be/src/vec/exec/scan/scanner_scheduler.cpp | 69 ++++++++++----
be/src/vec/exec/scan/vscanner.h | 11 ++-
.../java/org/apache/doris/qe/SessionVariable.java | 15 ---
gensrc/thrift/PaloInternalService.thrift | 1 +
25 files changed, 303 insertions(+), 150 deletions(-)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 995402bc273..bf7c5d53d25 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -611,6 +611,20 @@ bool MemTable::need_agg() const {
return false;
}
+size_t MemTable::get_flush_reserve_memory_size() const {
+ size_t reserve_size = 0;
+ if (_keys_type == KeysType::DUP_KEYS) {
+ if (_tablet_schema->num_key_columns() == 0) {
+ // no need to reserve
+ } else {
+ reserve_size = _input_mutable_block.allocated_bytes();
+ }
+ } else {
+ reserve_size = _input_mutable_block.allocated_bytes();
+ }
+ return reserve_size;
+}
+
Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
size_t same_keys_num = _sort();
if (_keys_type == KeysType::DUP_KEYS || same_keys_num == 0) {
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 77ff2e886bf..09591df2745 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -181,6 +181,7 @@ public:
int64_t tablet_id() const { return _tablet_id; }
size_t memory_usage() const { return _mem_tracker->consumption(); }
+ size_t get_flush_reserve_memory_size() const;
// insert tuple from (row_pos) to (row_pos+num_rows)
Status insert(const vectorized::Block* block, const std::vector<uint32_t>&
row_idxs);
diff --git a/be/src/olap/memtable_flush_executor.cpp
b/be/src/olap/memtable_flush_executor.cpp
index 50ccbb25958..5533a360fac 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -28,6 +28,7 @@
#include "common/signal_handler.h"
#include "olap/memtable.h"
#include "olap/rowset/rowset_writer.h"
+#include "olap/storage_engine.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
@@ -140,6 +141,36 @@ Status FlushToken::wait() {
return Status::OK();
}
+Status FlushToken::_try_reserve_memory(QueryThreadContext
query_thread_context, int64_t size) {
+ auto* thread_context = doris::thread_context();
+ auto* memtable_flush_executor =
+ ExecEnv::GetInstance()->storage_engine().memtable_flush_executor();
+ Status st;
+ do {
+ // only try to reserve process memory
+ st = thread_context->try_reserve_process_memory(size);
+ if (st.ok()) {
+ memtable_flush_executor->inc_flushing_task();
+ break;
+ }
+ if (_is_shutdown() ||
query_thread_context.get_memory_tracker()->is_query_cancelled()) {
+ st = Status::Cancelled("flush memtable already cancelled");
+ break;
+ }
+ // Make sure at least one memtable is flushing even reserve memory
failed.
+ if (memtable_flush_executor->check_and_inc_has_any_flushing_task()) {
+ // If there are already any flushing task, Wait for some time and
retry.
+ LOG_EVERY_T(INFO, 60) << fmt::format(
+ "Failed to reserve memory {} for flush memtable, retry
after 100ms", size);
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+ } else {
+ st = Status::OK();
+ break;
+ }
+ } while (true);
+ return st;
+}
+
Status FlushToken::_do_flush_memtable(MemTable* memtable, int32_t segment_id,
int64_t* flush_size) {
VLOG_CRITICAL << "begin to flush memtable for tablet: " <<
memtable->tablet_id()
<< ", memsize: " << memtable->memory_usage()
@@ -150,10 +181,19 @@ Status FlushToken::_do_flush_memtable(MemTable* memtable,
int32_t segment_id, in
SCOPED_ATTACH_TASK(memtable->query_thread_context());
signal::set_signal_task_id(_rowset_writer->load_id());
signal::tablet_id = memtable->tablet_id();
+
+ DEFER_RELEASE_RESERVED();
+
+ auto reserve_size = memtable->get_flush_reserve_memory_size();
+ RETURN_IF_ERROR(_try_reserve_memory(memtable->query_thread_context(),
reserve_size));
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
memtable->query_thread_context().query_mem_tracker->write_tracker());
SCOPED_CONSUME_MEM_TRACKER(memtable->mem_tracker());
+
+ Defer defer {[&]() {
+
ExecEnv::GetInstance()->storage_engine().memtable_flush_executor()->dec_flushing_task();
+ }};
std::unique_ptr<vectorized::Block> block;
RETURN_IF_ERROR(memtable->to_block(&block));
RETURN_IF_ERROR(_rowset_writer->flush_memtable(block.get(),
segment_id, flush_size));
diff --git a/be/src/olap/memtable_flush_executor.h
b/be/src/olap/memtable_flush_executor.h
index 27e8e8a9b0e..040a8fa5449 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -94,6 +94,8 @@ private:
Status _do_flush_memtable(MemTable* memtable, int32_t segment_id, int64_t*
flush_size);
+ Status _try_reserve_memory(QueryThreadContext query_thread_context,
int64_t size);
+
// Records the current flush status of the tablet.
// Note: Once its value is set to Failed, it cannot return to SUCCESS.
std::shared_mutex _flush_status_lock;
@@ -140,12 +142,29 @@ public:
std::shared_ptr<RowsetWriter> rowset_writer,
bool is_high_priority,
std::shared_ptr<WorkloadGroup> wg_sptr);
+ // return true if it already has any flushing task
+ bool check_and_inc_has_any_flushing_task() {
+ // need to use CAS instead of only `if (0 == _flushing_task_count)`
statement,
+ // to avoid concurrent entries both pass the if statement
+ int expected_count = 0;
+ if (!_flushing_task_count.compare_exchange_strong(expected_count, 1)) {
+ return true;
+ }
+ DCHECK(expected_count == 0 && _flushing_task_count == 1);
+ return false;
+ }
+
+ void inc_flushing_task() { _flushing_task_count++; }
+
+ void dec_flushing_task() { _flushing_task_count--; }
+
private:
void _register_metrics();
static void _deregister_metrics();
std::unique_ptr<ThreadPool> _flush_pool;
std::unique_ptr<ThreadPool> _high_prio_flush_pool;
+ std::atomic<int> _flushing_task_count = 0;
};
} // namespace doris
diff --git a/be/src/olap/memtable_memory_limiter.cpp
b/be/src/olap/memtable_memory_limiter.cpp
index b222c041b34..22b842ec672 100644
--- a/be/src/olap/memtable_memory_limiter.cpp
+++ b/be/src/olap/memtable_memory_limiter.cpp
@@ -124,10 +124,10 @@ void
MemTableMemoryLimiter::handle_workload_group_memtable_flush(WorkloadGroupPt
--sleep_times;
}
// Check process memory again.
- handle_memtable_flush();
+ handle_memtable_flush(wg);
}
-void MemTableMemoryLimiter::handle_memtable_flush() {
+void MemTableMemoryLimiter::handle_memtable_flush(WorkloadGroupPtr wg) {
// Check the soft limit.
DCHECK(_load_soft_mem_limit > 0);
if (!_soft_limit_reached() || _load_usage_low()) {
@@ -150,12 +150,17 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
if (need_flush > 0) {
auto limit = _hard_limit_reached() ? Limit::HARD : Limit::SOFT;
LOG(INFO) << "reached memtable memory " << (limit == Limit::HARD ?
"hard" : "soft")
- << ", " <<
GlobalMemoryArbitrator::process_memory_used_details_str()
+ << ", " <<
GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
+ <<
GlobalMemoryArbitrator::sys_mem_available_details_str()
<< ", load mem: " <<
PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< ", active: " <<
PrettyPrinter::print_bytes(_active_mem_usage)
<< ", queue: " <<
PrettyPrinter::print_bytes(_queue_mem_usage)
- << ", flush: " <<
PrettyPrinter::print_bytes(_flush_mem_usage);
+ << ", flush: " <<
PrettyPrinter::print_bytes(_flush_mem_usage)
+ << ", wg: " << (wg ? wg->debug_string() : "null\n")
+ << doris::ProcessProfile::instance()
+ ->memory_profile()
+ ->process_memory_detail_str();
_flush_active_memtables(0, need_flush);
}
} while (_hard_limit_reached() && !_load_usage_low());
@@ -163,7 +168,16 @@ void MemTableMemoryLimiter::handle_memtable_flush() {
timer.stop();
int64_t time_ms = timer.elapsed_time() / 1000 / 1000;
g_memtable_memory_limit_latency_ms << time_ms;
- LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit";
+ LOG(INFO) << "waited " << time_ms << " ms for memtable memory limit"
+ << ", " <<
GlobalMemoryArbitrator::process_memory_used_details_str() << ", "
+ << GlobalMemoryArbitrator::sys_mem_available_details_str()
+ << ", load mem: " <<
PrettyPrinter::print_bytes(_mem_tracker->consumption())
+ << ", memtable writers num: " << _writers.size()
+ << ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
+ << ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
+ << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
+ << ", wg: " << (wg ? wg->debug_string() : "null.\n")
+ <<
doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str();
}
int64_t MemTableMemoryLimiter::flush_workload_group_memtables(uint64_t wg_id,
int64_t need_flush) {
@@ -270,11 +284,13 @@ void MemTableMemoryLimiter::refresh_mem_tracker() {
_last_limit = limit;
_log_timer.reset();
LOG(INFO) << ss.str() << ", " <<
GlobalMemoryArbitrator::process_memory_used_details_str()
+ << ", " <<
GlobalMemoryArbitrator::sys_mem_available_details_str()
<< ", load mem: " <<
PrettyPrinter::print_bytes(_mem_tracker->consumption())
<< ", memtable writers num: " << _writers.size()
<< ", active: " << PrettyPrinter::print_bytes(_active_mem_usage)
<< ", queue: " << PrettyPrinter::print_bytes(_queue_mem_usage)
- << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage);
+ << ", flush: " << PrettyPrinter::print_bytes(_flush_mem_usage)
<< "\n"
+ <<
doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str();
}
void MemTableMemoryLimiter::_refresh_mem_tracker() {
diff --git a/be/src/olap/memtable_memory_limiter.h
b/be/src/olap/memtable_memory_limiter.h
index de2fb802165..155a1dd424b 100644
--- a/be/src/olap/memtable_memory_limiter.h
+++ b/be/src/olap/memtable_memory_limiter.h
@@ -43,7 +43,7 @@ public:
// If yes, it will flush memtable to try to reduce memory consumption.
// Every write operation will call this API to check if need flush
memtable OR hang
// when memory is not available.
- void handle_memtable_flush();
+ void handle_memtable_flush(WorkloadGroupPtr wg);
int64_t flush_workload_group_memtables(uint64_t wg_id, int64_t
need_flush_bytes);
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 42d2640441e..f86fb491d71 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1821,8 +1821,9 @@ size_t PipelineFragmentContext::get_revocable_size(bool*
has_running_task) const
if (task->is_running() || task->is_revoking()) {
LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id)
<< " is running, task: " <<
(void*)task.get()
- << ", task->is_revoking(): " <<
task->is_revoking() << ", "
- << task->is_running();
+ << ", is_revoking: " <<
task->is_revoking()
+ << ", is_running: " << task->is_running()
+ << ", task info: " <<
task->debug_string();
*has_running_task = true;
return 0;
}
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 0ddc329da3b..3fef0fc4c93 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -437,11 +437,8 @@ Status PipelineTask::execute(bool* eos) {
}
LOG(INFO) << debug_msg;
- _state->get_query_ctx()->update_paused_reason(st);
- _state->get_query_ctx()->set_low_memory_mode();
- _state->get_query_ctx()->set_memory_sufficient(false);
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
- _state->get_query_ctx()->shared_from_this(),
reserve_size);
+ _state->get_query_ctx()->shared_from_this(),
reserve_size, st);
continue;
}
}
@@ -484,11 +481,8 @@ Status PipelineTask::execute(bool* eos) {
DCHECK_EQ(_pending_block.get(), nullptr);
_pending_block = std::move(_block);
_block =
vectorized::Block::create_unique(_pending_block->clone_empty());
- _state->get_query_ctx()->update_paused_reason(status);
- _state->get_query_ctx()->set_low_memory_mode();
- _state->get_query_ctx()->set_memory_sufficient(false);
ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
- _state->get_query_ctx()->shared_from_this(),
sink_reserve_size);
+ _state->get_query_ctx()->shared_from_this(),
sink_reserve_size, status);
_pending_eos = *eos;
*eos = false;
continue;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 99decc05a9d..19315bef89d 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -251,6 +251,8 @@ public:
return _memory_sufficient_dependency.get();
}
+ void inc_memory_reserve_failed_times() {
COUNTER_UPDATE(_memory_reserve_failed_times, 1); }
+
private:
friend class RuntimeFilterDependency;
bool _is_blocked();
diff --git a/be/src/runtime/memory/global_memory_arbitrator.h
b/be/src/runtime/memory/global_memory_arbitrator.h
index 05963132cb1..790301b68f9 100644
--- a/be/src/runtime/memory/global_memory_arbitrator.h
+++ b/be/src/runtime/memory/global_memory_arbitrator.h
@@ -91,12 +91,13 @@ public:
static inline std::string sys_mem_available_details_str() {
auto msg = fmt::format(
"sys available memory {}(= {}[proc/available] - {}[reserved] -
"
- "{}B[waiting_refresh])",
+ "{}B[waiting_refresh] + {}[tc/jemalloc_cache])",
PrettyPrinter::print(sys_mem_available(), TUnit::BYTES),
PrettyPrinter::print(MemInfo::_s_sys_mem_available.load(std::memory_order_relaxed),
TUnit::BYTES),
PrettyPrinter::print(process_reserved_memory(), TUnit::BYTES),
- refresh_interval_memory_growth);
+ refresh_interval_memory_growth,
+
PrettyPrinter::print_bytes(static_cast<uint64_t>(MemInfo::allocator_cache_mem())));
#ifdef ADDRESS_SANITIZER
msg = "[ASAN]" + msg;
#endif
@@ -165,15 +166,16 @@ public:
static std::string process_limit_exceeded_errmsg_str() {
return fmt::format(
- "{} exceed limit {} or {} less than low water mark {}",
process_memory_used_str(),
- MemInfo::mem_limit_str(), sys_mem_available_str(),
+ "{} exceed limit {} or {} less than low water mark {}",
+ process_memory_used_details_str(), MemInfo::mem_limit_str(),
+ sys_mem_available_details_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(),
TUnit::BYTES));
}
static std::string process_soft_limit_exceeded_errmsg_str() {
return fmt::format("{} exceed soft limit {} or {} less than warning
water mark {}.",
- process_memory_used_str(),
MemInfo::soft_mem_limit_str(),
- sys_mem_available_str(),
+ process_memory_used_details_str(),
MemInfo::soft_mem_limit_str(),
+ sys_mem_available_details_str(),
PrettyPrinter::print(MemInfo::sys_mem_available_warning_water_mark(),
TUnit::BYTES));
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 068c3427b84..bd9aa705a33 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -358,7 +358,8 @@ std::string MemTrackerLimiter::tracker_limit_exceeded_str()
{
"{}, peak used {}, current used {}. backend {}, {}.",
label(), type_string(_type), MemCounter::print_bytes(limit()),
MemCounter::print_bytes(peak_consumption()),
MemCounter::print_bytes(consumption()),
- BackendOptions::get_localhost(),
GlobalMemoryArbitrator::process_memory_used_str());
+ BackendOptions::get_localhost(),
+ GlobalMemoryArbitrator::process_memory_used_details_str());
if (_type == Type::QUERY || _type == Type::LOAD) {
err_msg += fmt::format(
" exec node:<{}>, can `set exec_mem_limit=8G` to change limit,
details see "
diff --git a/be/src/runtime/memory/memory_profile.cpp
b/be/src/runtime/memory/memory_profile.cpp
index c7421236c42..fa8b45abfb4 100644
--- a/be/src/runtime/memory/memory_profile.cpp
+++ b/be/src/runtime/memory/memory_profile.cpp
@@ -340,14 +340,17 @@ int64_t MemoryProfile::other_current_usage() {
return memory_other_trackers_sum_bytes.get_value();
}
+std::string MemoryProfile::process_memory_detail_str() const {
+ return fmt::format("Process Memory Summary: {}\n, {}\n, {}\n, {}",
+ GlobalMemoryArbitrator::process_mem_log_str(),
+ print_memory_overview_profile(),
print_global_memory_profile(),
+ print_top_memory_tasks_profile());
+}
+
void MemoryProfile::print_log_process_usage() {
if (_enable_print_log_process_usage) {
_enable_print_log_process_usage = false;
- LOG(WARNING) << "Process Memory Summary: " +
GlobalMemoryArbitrator::process_mem_log_str()
- << "\n"
- << print_memory_overview_profile() << "\n"
- << print_global_memory_profile() << "\n"
- << print_top_memory_tasks_profile();
+ LOG(WARNING) << process_memory_detail_str();
}
}
diff --git a/be/src/runtime/memory/memory_profile.h
b/be/src/runtime/memory/memory_profile.h
index 9f1bab0c02a..bf3a6aa9f39 100644
--- a/be/src/runtime/memory/memory_profile.h
+++ b/be/src/runtime/memory/memory_profile.h
@@ -69,6 +69,7 @@ public:
// process memory changes more than 256M, or the GC ends
void enable_print_log_process_usage() { _enable_print_log_process_usage =
true; }
void print_log_process_usage();
+ std::string process_memory_detail_str() const;
private:
MultiVersion<RuntimeProfile> _memory_overview_profile;
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 83caf753aed..8193d89394a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -83,7 +83,7 @@ public:
void consume(int64_t size, int skip_large_memory_check = 0);
void flush_untracked_mem();
- doris::Status try_reserve(int64_t size);
+ doris::Status try_reserve(int64_t size, bool only_check_process_memory);
void release_reserved();
@@ -278,7 +278,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
_stop_consume = false;
}
-inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size) {
+inline doris::Status ThreadMemTrackerMgr::try_reserve(int64_t size,
+ bool
only_check_process_memory) {
DCHECK(_limiter_tracker);
DCHECK(size >= 0);
CHECK(init());
@@ -286,26 +287,35 @@ inline doris::Status
ThreadMemTrackerMgr::try_reserve(int64_t size) {
// _untracked_mem store bytes that not synchronized to process reserved
memory.
flush_untracked_mem();
auto wg_ptr = _wg_wptr.lock();
- if (!_limiter_tracker->try_reserve(size)) {
- auto err_msg = fmt::format(
- "reserve memory failed, size: {}, because query memory
exceeded, memory tracker "
- "consumption: {}, limit: {}",
- PrettyPrinter::print(size, TUnit::BYTES),
- PrettyPrinter::print(_limiter_tracker->consumption(),
TUnit::BYTES),
- PrettyPrinter::print(_limiter_tracker->limit(), TUnit::BYTES));
- return doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
- }
- if (wg_ptr) {
- if (!wg_ptr->add_wg_refresh_interval_memory_growth(size)) {
+ if (only_check_process_memory) {
+ _limiter_tracker->reserve(size);
+ if (wg_ptr) {
+ wg_ptr->add_wg_refresh_interval_memory_growth(size);
+ }
+ } else {
+ if (!_limiter_tracker->try_reserve(size)) {
auto err_msg = fmt::format(
- "reserve memory failed, size: {}, because workload group
memory exceeded, "
- "workload group: {}",
- PrettyPrinter::print(size, TUnit::BYTES),
wg_ptr->memory_debug_string());
- _limiter_tracker->release(size); // rollback
- _limiter_tracker->release_reserved(size); // rollback
- return
doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
+ "reserve memory failed, size: {}, because query memory
exceeded, memory "
+ "tracker "
+ "consumption: {}, limit: {}",
+ PrettyPrinter::print(size, TUnit::BYTES),
+ PrettyPrinter::print(_limiter_tracker->consumption(),
TUnit::BYTES),
+ PrettyPrinter::print(_limiter_tracker->limit(),
TUnit::BYTES));
+ return
doris::Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>(err_msg);
+ }
+ if (wg_ptr) {
+ if (!wg_ptr->try_add_wg_refresh_interval_memory_growth(size)) {
+ auto err_msg = fmt::format(
+ "reserve memory failed, size: {}, because workload
group memory exceeded, "
+ "workload group: {}",
+ PrettyPrinter::print(size, TUnit::BYTES),
wg_ptr->memory_debug_string());
+ _limiter_tracker->release(size); // rollback
+ _limiter_tracker->release_reserved(size); // rollback
+ return
doris::Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>(err_msg);
+ }
}
}
+
if (!doris::GlobalMemoryArbitrator::try_reserve_process_memory(size)) {
auto err_msg =
fmt::format("reserve memory failed, size: {}, because proccess
memory exceeded, {}",
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 16f500b2fcc..879bd647d96 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -410,20 +410,6 @@ public:
bool enable_page_cache() const;
- int partitioned_hash_join_rows_threshold() const {
- if (!_query_options.__isset.partitioned_hash_join_rows_threshold) {
- return 0;
- }
- return _query_options.partitioned_hash_join_rows_threshold;
- }
-
- int partitioned_hash_agg_rows_threshold() const {
- if (!_query_options.__isset.partitioned_hash_agg_rows_threshold) {
- return 0;
- }
- return _query_options.partitioned_hash_agg_rows_threshold;
- }
-
const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
return _tablet_commit_infos;
}
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index a48bc680925..a9aede24487 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -249,13 +249,22 @@ public:
thread_mem_tracker_mgr->consume(size, skip_large_memory_check);
}
+ doris::Status try_reserve_process_memory(const int64_t size) const {
+#ifdef USE_MEM_TRACKER
+ DCHECK(doris::k_doris_exit ||
!doris::config::enable_memory_orphan_check ||
+ thread_mem_tracker()->label() != "Orphan")
+ << doris::memory_orphan_check_msg;
+#endif
+ return thread_mem_tracker_mgr->try_reserve(size, true);
+ }
+
doris::Status try_reserve_memory(const int64_t size) const {
#ifdef USE_MEM_TRACKER
DCHECK(doris::k_doris_exit ||
!doris::config::enable_memory_orphan_check ||
thread_mem_tracker()->label() != "Orphan")
<< doris::memory_orphan_check_msg;
#endif
- return thread_mem_tracker_mgr->try_reserve(size);
+ return thread_mem_tracker_mgr->try_reserve(size, false);
}
void release_reserved_memory() const {
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 7e9a9812956..c87a927a455 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -118,7 +118,7 @@ std::string WorkloadGroup::debug_string() const {
_remote_scan_bytes_per_second);
}
-bool WorkloadGroup::add_wg_refresh_interval_memory_growth(int64_t size) {
+bool WorkloadGroup::try_add_wg_refresh_interval_memory_growth(int64_t size) {
auto realtime_total_mem_used =
_total_mem_used + _wg_refresh_interval_memory_growth.load() + size;
if ((realtime_total_mem_used >
@@ -137,15 +137,19 @@ std::string WorkloadGroup::memory_debug_string() const {
auto realtime_total_mem_used = _total_mem_used +
_wg_refresh_interval_memory_growth.load();
auto mem_used_ratio = realtime_total_mem_used / ((double)_memory_limit +
1);
return fmt::format(
- "WorkloadGroup[id = {}, name = {}, memory_limit = {},
enable_memory_overcommit = {}, "
- "total_mem_used = {}, wg_refresh_interval_memory_growth = {},
mem_used_ratio = {}, "
- "memory_low_watermark = {}, memory_high_watermark = {}, version =
{}, is_shutdown = "
- "{}, query_num = {}]",
- _id, _name, PrettyPrinter::print(_memory_limit, TUnit::BYTES),
- _enable_memory_overcommit ? "true" : "false",
+ "WorkloadGroup[id = {}, name = {}, version = {},"
+ "total_query_slot_count = {}, "
+ "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio=
{}%, "
+ "enable_memory_overcommit = {}, total_mem_used = {}
(write_buffer_size={}),"
+ "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, "
+ "memory_low_watermark={}, memory_high_watermark={},
is_shutdown={}, query_num={}]",
+ _id, _name, _version, _total_query_slot_count,
+ PrettyPrinter::print(_memory_limit, TUnit::BYTES),
to_string(_slot_mem_policy),
+ _load_buffer_ratio, _enable_memory_overcommit ? "true" : "false",
PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
+ PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES),
PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(),
TUnit::BYTES),
- mem_used_ratio, _memory_low_watermark, _memory_high_watermark,
_version, _is_shutdown,
+ mem_used_ratio, _memory_low_watermark, _memory_high_watermark,
_is_shutdown,
_query_ctxs.size());
}
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 1d617b22bfe..73fc4c965b8 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -113,7 +113,11 @@ public:
return _total_query_slot_count.load(std::memory_order_relaxed);
}
- bool add_wg_refresh_interval_memory_growth(int64_t size);
+ void add_wg_refresh_interval_memory_growth(int64_t size) {
+ _wg_refresh_interval_memory_growth.fetch_add(size);
+ }
+
+ bool try_add_wg_refresh_interval_memory_growth(int64_t size);
void sub_wg_refresh_interval_memory_growth(int64_t size) {
_wg_refresh_interval_memory_growth.fetch_sub(size);
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index ade4f228850..dd3d7d970dd 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -271,9 +271,12 @@ void
WorkloadGroupMgr::get_wg_resource_usage(vectorized::Block* block) {
}
void WorkloadGroupMgr::add_paused_query(const std::shared_ptr<QueryContext>&
query_ctx,
- int64_t reserve_size) {
- std::lock_guard<std::mutex> lock(_paused_queries_lock);
+ int64_t reserve_size, const Status&
status) {
DCHECK(query_ctx != nullptr);
+ query_ctx->update_paused_reason(status);
+ query_ctx->set_low_memory_mode();
+ query_ctx->set_memory_sufficient(false);
+ std::lock_guard<std::mutex> lock(_paused_queries_lock);
auto wg = query_ctx->workload_group();
auto&& [it, inserted] = _paused_queries_list[wg].emplace(
query_ctx,
doris::GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted,
@@ -281,7 +284,6 @@ void WorkloadGroupMgr::add_paused_query(const
std::shared_ptr<QueryContext>& que
// Check if this is an invalid reserve, for example, if the reserve size
is too large, larger than the query limit
// if hard limit is enabled, then not need enable other queries hard limit.
if (inserted) {
- query_ctx->set_memory_sufficient(false);
LOG(INFO) << "Insert one new paused query: " <<
query_ctx->debug_string()
<< ", workload group: " << wg->debug_string();
}
@@ -399,8 +401,13 @@ void WorkloadGroupMgr::handle_paused_queries() {
<<
PrettyPrinter::print_bytes(query_it->reserve_size_)
<< ") failed due to workload group memory
exceed, "
"should set the workload group work in memory
insufficent mode, "
- "so that other query will reduce their
memory. wg: "
- << wg->debug_string();
+ "so that other query will reduce their
memory."
+ << " Query mem limit: "
+ <<
PrettyPrinter::print_bytes(query_ctx->get_mem_limit())
+ << " mem usage: "
+ << PrettyPrinter::print_bytes(
+
query_ctx->get_mem_tracker()->consumption())
+ << ", wg: " << wg->debug_string();
}
if (wg->slot_memory_policy() == TWgSlotMemoryPolicy::NONE) {
// If not enable slot memory policy, then should spill
directly
@@ -657,11 +664,12 @@ bool WorkloadGroupMgr::handle_single_query_(const
std::shared_ptr<QueryContext>&
return false;
}
+ const auto wg = query_ctx->workload_group();
auto revocable_tasks = query_ctx->get_revocable_tasks();
if (revocable_tasks.empty()) {
+ const auto limit = query_ctx->get_mem_limit();
+ const auto reserved_size =
query_ctx->query_mem_tracker->reserved_consumption();
if (paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
- const auto limit = query_ctx->get_mem_limit();
- const auto reserved_size =
query_ctx->query_mem_tracker->reserved_consumption();
// During waiting time, another operator in the query may finished
and release
// many memory and we could run.
if ((memory_usage + size_to_reserve) < limit) {
@@ -674,43 +682,44 @@ bool WorkloadGroupMgr::handle_single_query_(const
std::shared_ptr<QueryContext>&
} else if (time_in_queue >=
config::spill_in_paused_queue_timeout_ms) {
// Use MEM_LIMIT_EXCEEDED so that FE could parse the error
code and do try logic
auto msg1 = fmt::format(
- "Query {} reserve memory failed, but could not find
memory that could "
- "release or spill to disk. Query memory usage: {},
reserved size: {}, try "
- "to reserve: {} "
- ", limit: {} ,process memory info: {}, wg info: {}.",
+ "Query {} failed beause query limit is exceeded, but
could "
+ "not find memory that could release or spill to disk.
Query memory usage: "
+ "{}, limit: {}, reserved "
+ "size: {}, try to reserve: {}, wg info: {}.",
query_id, PrettyPrinter::print_bytes(memory_usage),
+ PrettyPrinter::print_bytes(limit),
PrettyPrinter::print_bytes(reserved_size),
- PrettyPrinter::print_bytes(size_to_reserve),
- PrettyPrinter::print_bytes(query_ctx->get_mem_limit()),
-
GlobalMemoryArbitrator::process_memory_used_details_str(),
- query_ctx->workload_group()->memory_debug_string());
- auto msg2 = msg1 + fmt::format(
- " Query Memory Tracker Summary: {}."
- " Load Memory Tracker Summary: {}",
-
MemTrackerLimiter::make_type_trackers_profile_str(
-
MemTrackerLimiter::Type::QUERY),
-
MemTrackerLimiter::make_type_trackers_profile_str(
-
MemTrackerLimiter::Type::LOAD));
- LOG(INFO) << msg2;
+ PrettyPrinter::print_bytes(size_to_reserve),
wg->memory_debug_string());
+ LOG(INFO) << fmt::format("{}.\n{}", msg1,
+ doris::ProcessProfile::instance()
+ ->memory_profile()
+
->process_memory_detail_str());
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
} else {
return false;
}
} else if
(paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
- if (!query_ctx->workload_group()->exceed_limit()) {
+ if (!wg->exceed_limit()) {
LOG(INFO) << "Query: " << query_id
<< " paused caused by
WORKLOAD_GROUP_MEMORY_EXCEEDED, now resume it.";
query_ctx->set_memory_sufficient(true);
return true;
} else if (time_in_queue >
config::spill_in_paused_queue_timeout_ms) {
- LOG(INFO) << "Query: " << query_id << ", workload group
exceeded, info: "
- <<
GlobalMemoryArbitrator::process_memory_used_details_str()
- << ", wg info: " <<
query_ctx->workload_group()->memory_debug_string();
-
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
- "The query({}) reserved memory failed because workload
group limit "
- "exceeded, and there is no cache now. And could not
find task to spill. "
- "Maybe you should set the workload group's limit to a
lower value.",
- query_id));
+ auto msg1 = fmt::format(
+ "Query {} failed because workload group memory is
exceeded"
+ ", and there is no cache now. And could not find task
to spill. "
+ "Query memory usage: {}, limit: {}, reserved "
+ "size: {}, try to reserve: {}, wg info: {}."
+ " Maybe you should set the workload group's limit to a
lower value.",
+ query_id, PrettyPrinter::print_bytes(memory_usage),
+ PrettyPrinter::print_bytes(limit),
+ PrettyPrinter::print_bytes(reserved_size),
+ PrettyPrinter::print_bytes(size_to_reserve),
wg->memory_debug_string());
+ LOG(INFO) << fmt::format("{}.\n{}", msg1,
+ doris::ProcessProfile::instance()
+ ->memory_profile()
+
->process_memory_detail_str());
+
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
} else {
return false;
}
@@ -724,21 +733,25 @@ bool WorkloadGroupMgr::handle_single_query_(const
std::shared_ptr<QueryContext>&
<< ", process limit not exceeded now, resume this
query"
<< ", process memory info: "
<<
GlobalMemoryArbitrator::process_memory_used_details_str()
- << ", wg info: " <<
query_ctx->workload_group()->memory_debug_string();
+ << ", wg info: " << wg->debug_string();
query_ctx->set_memory_sufficient(true);
return true;
} else if (time_in_queue >
config::spill_in_paused_queue_timeout_ms) {
- LOG(INFO) << "Query: " << query_id << ", process limit
exceeded, info: "
- <<
GlobalMemoryArbitrator::process_memory_used_details_str()
- << ", wg info: " <<
query_ctx->workload_group()->memory_debug_string();
-
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(
- "The query({}) reserved memory failed because process
limit exceeded, "
- "and "
- "there is no cache now. And could not find task to
spill. Maybe you "
- "should "
- "set "
- "the workload group's limit to a lower value.",
- query_id));
+ auto msg1 = fmt::format(
+ "Query {} failed because process memory is exceeded"
+ ", and there is no cache now. And could not find task
to spill. "
+ "Query memory usage: {}, limit: {}, reserved "
+ "size: {}, try to reserve: {}, wg info: {}."
+ " Maybe you should set the workload group's limit to a
lower value.",
+ query_id, PrettyPrinter::print_bytes(memory_usage),
+ PrettyPrinter::print_bytes(limit),
+ PrettyPrinter::print_bytes(reserved_size),
+ PrettyPrinter::print_bytes(size_to_reserve),
wg->memory_debug_string());
+ LOG(INFO) << fmt::format("{}.\n{}", msg1,
+ doris::ProcessProfile::instance()
+ ->memory_profile()
+
->process_memory_detail_str());
+
query_ctx->cancel(doris::Status::Error<ErrorCode::MEM_LIMIT_EXCEEDED>(msg1));
} else {
return false;
}
diff --git a/be/src/runtime/workload_group/workload_group_manager.h
b/be/src/runtime/workload_group/workload_group_manager.h
index fc53bfea858..47ea5540d3f 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -105,7 +105,8 @@ public:
return _workload_groups[INTERNAL_WORKLOAD_GROUP_ID];
}
- void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx,
int64_t reserve_size);
+ void add_paused_query(const std::shared_ptr<QueryContext>& query_ctx,
int64_t reserve_size,
+ const Status& status);
void handle_paused_queries();
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 2e1a4e9a343..ed5224289bd 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -170,6 +170,8 @@ public:
int32_t low_memory_mode_scanners() const { return 4; }
+ pipeline::ScanLocalStateBase* local_state() const { return _local_state; }
+
// the unique id of this context
std::string ctx_id;
TUniqueId _query_id;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index a7bf8600663..d8858dd5aba 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -33,10 +33,12 @@
#include "common/logging.h"
#include "common/status.h"
#include "olap/tablet.h"
+#include "pipeline/pipeline_task.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
+#include "runtime/workload_group/workload_group_manager.h"
#include "util/async_io.h" // IWYU pragma: keep
#include "util/blocking_queue.hpp"
#include "util/cpu_info.h"
@@ -209,6 +211,32 @@ std::unique_ptr<ThreadPoolToken>
ScannerScheduler::new_limited_scan_pool_token(
return _limited_scan_thread_pool->new_token(mode, max_concurrency);
}
+void handle_reserve_memory_failure(RuntimeState* state,
std::shared_ptr<ScannerContext> ctx,
+ const Status& st, size_t reserve_size) {
+ ctx->clear_free_blocks();
+ auto* pipeline_task = state->get_task();
+ auto* local_state = ctx->local_state();
+
+ pipeline_task->inc_memory_reserve_failed_times();
+ auto debug_msg = fmt::format(
+ "Query: {} , scanner try to reserve: {}, operator name {}, "
+ "operator "
+ "id: {}, "
+ "task id: "
+ "{}, revocable mem size: {}, failed: {}",
+ print_id(state->query_id()),
PrettyPrinter::print_bytes(reserve_size),
+ local_state->get_name(), local_state->parent()->node_id(),
state->task_id(),
+ PrettyPrinter::print_bytes(pipeline_task->get_revocable_size()),
st.to_string());
+ // PROCESS_MEMORY_EXCEEDED error msg already contains process_mem_log_str
+ if (!st.is<ErrorCode::PROCESS_MEMORY_EXCEEDED>()) {
+ debug_msg += fmt::format(", debug info: {}",
GlobalMemoryArbitrator::process_mem_log_str());
+ }
+ LOG(INFO) << debug_msg;
+
+ ExecEnv::GetInstance()->workload_group_mgr()->add_paused_query(
+ state->get_query_ctx()->shared_from_this(), reserve_size, st);
+}
+
void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task) {
auto task_lock = ctx->task_exec_ctx();
@@ -246,6 +274,10 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
bool eos = false;
ASSIGN_STATUS_IF_CATCH_EXCEPTION(
RuntimeState* state = ctx->state(); DCHECK(nullptr != state);
+ // scanner->open may alloc plenty amount of memory(read blocks of
data),
+ // so better to also check low memory and clear free blocks here.
+ if (ctx->low_memory_mode()) { ctx->clear_free_blocks(); }
+
if (!scanner->is_init()) {
status = scanner->init();
if (!status.ok()) {
@@ -268,16 +300,17 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
}
size_t raw_bytes_threshold = config::doris_scanner_row_bytes;
- if (ctx->low_memory_mode() &&
- raw_bytes_threshold >
ctx->low_memory_mode_scan_bytes_per_scanner()) {
- raw_bytes_threshold =
ctx->low_memory_mode_scan_bytes_per_scanner();
+ if (ctx->low_memory_mode()) {
+ ctx->clear_free_blocks();
+ if (raw_bytes_threshold >
ctx->low_memory_mode_scan_bytes_per_scanner()) {
+ raw_bytes_threshold =
ctx->low_memory_mode_scan_bytes_per_scanner();
+ }
}
size_t raw_bytes_read = 0;
bool first_read = true;
// If the first block is full, then it is true. Or the first block
+ second block > batch_size
bool has_first_full_block = false;
- size_t block_avg_bytes = ctx->batch_size();
// During low memory mode, every scan task will return at most 2
block to reduce memory usage.
while (!eos && raw_bytes_read < raw_bytes_threshold &&
@@ -298,15 +331,10 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
free_block = ctx->get_free_block(first_read);
} else {
if (state->enable_reserve_memory()) {
- auto reserve_status =
thread_context()->try_reserve_memory(block_avg_bytes);
- if (!reserve_status.ok()) {
- LOG(INFO) << "query: " <<
print_id(state->query_id())
- << ", scanner try to reserve: "
- << PrettyPrinter::print(block_avg_bytes,
TUnit::BYTES)
- << ", failed: " <<
reserve_status.to_string()
- << ", process info: "
- <<
GlobalMemoryArbitrator::process_mem_log_str();
- ctx->clear_free_blocks();
+ size_t block_avg_bytes =
scanner->get_block_avg_bytes();
+ auto st =
thread_context()->try_reserve_memory(block_avg_bytes);
+ if (!st.ok()) {
+ handle_reserve_memory_failure(state, ctx, st,
block_avg_bytes);
break;
}
}
@@ -353,10 +381,17 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
scan_task->cached_blocks.emplace_back(std::move(free_block), free_block_bytes);
}
if (scan_task->cached_blocks.back().first->rows() > 0) {
- block_avg_bytes =
(scan_task->cached_blocks.back().first->allocated_bytes() +
-
scan_task->cached_blocks.back().first->rows() - 1) /
-
scan_task->cached_blocks.back().first->rows() *
- ctx->batch_size();
+ auto block_avg_bytes =
+
(scan_task->cached_blocks.back().first->allocated_bytes() +
+ scan_task->cached_blocks.back().first->rows() -
1) /
+ scan_task->cached_blocks.back().first->rows() *
ctx->batch_size();
+ scanner->update_block_avg_bytes(block_avg_bytes);
+ }
+ if (ctx->low_memory_mode()) {
+ ctx->clear_free_blocks();
+ if (raw_bytes_threshold >
ctx->low_memory_mode_scan_bytes_per_scanner()) {
+ raw_bytes_threshold =
ctx->low_memory_mode_scan_bytes_per_scanner();
+ }
}
} // end for while
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 6c4f3294ce1..dab49b757c8 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -72,7 +72,10 @@ public:
virtual Status init() { return Status::OK(); }
// Not virtual, all children will call this method explicitly
virtual Status prepare(RuntimeState* state, const VExprContextSPtrs&
conjuncts);
- virtual Status open(RuntimeState* state) { return Status::OK(); }
+ virtual Status open(RuntimeState* state) {
+ _block_avg_bytes = state->batch_size() * 8;
+ return Status::OK();
+ }
Status get_block(RuntimeState* state, Block* block, bool* eos);
Status get_block_after_projects(RuntimeState* state, vectorized::Block*
block, bool* eos);
@@ -156,6 +159,10 @@ public:
_query_statistics = query_statistics;
}
+ auto get_block_avg_bytes() const { return _block_avg_bytes; }
+
+ void update_block_avg_bytes(size_t block_avg_bytes) { _block_avg_bytes =
block_avg_bytes; }
+
protected:
void _discard_conjuncts() {
for (auto& conjunct : _conjuncts) {
@@ -211,6 +218,8 @@ protected:
// num of rows return from scanner, after filter block
int64_t _num_rows_return = 0;
+ size_t _block_avg_bytes = 0;
+
// Set true after counter is updated finally
bool _has_updated_counter = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index f0380efd7b6..8de6f379ecd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -397,7 +397,6 @@ public class SessionVariable implements Serializable,
Writable {
public static final String INTERNAL_SESSION = "internal_session";
- public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD =
"partitioned_hash_join_rows_threshold";
public static final String PARTITIONED_HASH_AGG_ROWS_THRESHOLD =
"partitioned_hash_agg_rows_threshold";
public static final String PARTITION_PRUNING_EXPAND_THRESHOLD =
"partition_pruning_expand_threshold";
@@ -1587,10 +1586,6 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = INTERNAL_SESSION)
public boolean internalSession = false;
- // Use partitioned hash join if build side row count >= the threshold . 0
- the threshold is not set.
- @VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD, fuzzy =
true)
- public int partitionedHashJoinRowsThreshold = 0;
-
// Use partitioned hash agg if build side row count >= the threshold . 0
- the threshold is not set.
@VariableMgr.VarAttr(name = PARTITIONED_HASH_AGG_ROWS_THRESHOLD, fuzzy =
true)
public int partitionedHashAggRowsThreshold = 0;
@@ -2377,7 +2372,6 @@ public class SessionVariable implements Serializable,
Writable {
// this.disableJoinReorder = random.nextBoolean();
this.enableCommonExpPushDownForInvertedIndex = random.nextBoolean();
this.disableStreamPreaggregations = random.nextBoolean();
- this.partitionedHashJoinRowsThreshold = random.nextBoolean() ? 8 :
1048576;
this.partitionedHashAggRowsThreshold = random.nextBoolean() ? 8 :
1048576;
this.enableShareHashTableForBroadcastJoin = random.nextBoolean();
// this.enableHashJoinEarlyStartProbe = random.nextBoolean();
@@ -3075,14 +3069,6 @@ public class SessionVariable implements Serializable,
Writable {
this.queryCacheEntryMaxRows = queryCacheEntryMaxRows;
}
- public int getPartitionedHashJoinRowsThreshold() {
- return partitionedHashJoinRowsThreshold;
- }
-
- public void setPartitionedHashJoinRowsThreshold(int threshold) {
- this.partitionedHashJoinRowsThreshold = threshold;
- }
-
// Serialize to thrift object
public boolean getForwardToMaster() {
return forwardToMaster;
@@ -3910,7 +3896,6 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setSkipDeleteBitmap(skipDeleteBitmap);
-
tResult.setPartitionedHashJoinRowsThreshold(partitionedHashJoinRowsThreshold);
tResult.setPartitionedHashAggRowsThreshold(partitionedHashAggRowsThreshold);
tResult.setExternalSortBytesThreshold(externalSortBytesThreshold);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index b44196d3df2..8cf33a6218b 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -182,6 +182,7 @@ struct TQueryOptions {
52: optional i32 be_exec_version = 0
+ // not used any more
53: optional i32 partitioned_hash_join_rows_threshold = 0
54: optional bool enable_share_hash_table_for_broadcast_join
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]