This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e56c4f0998a [test](join) Fuzzy disable runtime filters in BE (#45654) (#46963)
e56c4f0998a is described below
commit e56c4f0998ae02d93f6cf8fa6767eca0a17fec22
Author: Pxl <[email protected]>
AuthorDate: Tue Jan 14 16:46:14 2025 +0800
[test](join) Fuzzy disable runtime filters in BE (#45654) (#46963)
pick from #45654
Co-authored-by: Jerry Hu <[email protected]>
---
be/src/exprs/runtime_filter.cpp | 42 +++++++++++++++---
be/src/exprs/runtime_filter.h | 3 ++
be/src/exprs/runtime_filter_slots.h | 25 ++++++++---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 51 +++++++++++++++++++---
be/src/pipeline/exec/hashjoin_build_sink.h | 4 ++
be/src/runtime/runtime_filter_mgr.cpp | 5 ++-
be/src/runtime/runtime_state.h | 5 +++
be/src/vec/runtime/shared_hash_table_controller.h | 1 +
.../java/org/apache/doris/qe/SessionVariable.java | 12 +++++
gensrc/proto/internal_service.proto | 3 ++
gensrc/thrift/PaloInternalService.thrift | 2 +
11 files changed, 134 insertions(+), 19 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 2169ec727b2..74ac7bd18be 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -476,9 +476,15 @@ public:
const TExpr& probe_expr);
Status merge(const RuntimePredicateWrapper* wrapper) {
- if (wrapper->is_ignored()) {
+ if (wrapper->is_disabled()) {
+ set_disabled();
return Status::OK();
}
+
+ if (wrapper->is_ignored() || is_disabled()) {
+ return Status::OK();
+ }
+
_context->ignored = false;
bool can_not_merge_in_or_bloom =
@@ -939,6 +945,10 @@ public:
void set_ignored() { _context->ignored = true; }
+ bool is_disabled() const { return _context->disabled; }
+
+ void set_disabled() { _context->disabled = true; }
+
void batch_assign(const PInFilter* filter,
void (*assign_func)(std::shared_ptr<HybridSetBase>&
_hybrid_set,
PColumnValue&)) {
@@ -1219,9 +1229,10 @@ Status IRuntimeFilter::push_to_remote(RuntimeState*
state, const TNetworkAddress
merge_filter_callback->cntl_->ignore_eovercrowded();
}
- if (get_ignored()) {
+ if (get_ignored() || get_disabled()) {
merge_filter_request->set_filter_type(PFilterType::UNKNOW_FILTER);
- merge_filter_request->set_ignored(true);
+ merge_filter_request->set_ignored(get_ignored());
+ merge_filter_request->set_disabled(get_disabled());
} else {
RETURN_IF_ERROR(serialize(merge_filter_request.get(), &data, &len));
}
@@ -1243,7 +1254,7 @@ Status
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
bool is_late_arrival) {
DCHECK(is_consumer());
auto origin_size = push_exprs.size();
- if (!_wrapper->is_ignored()) {
+ if (!_wrapper->is_ignored() && !_wrapper->is_disabled()) {
_set_push_down(!is_late_arrival);
RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs,
_probe_expr));
}
@@ -1339,12 +1350,21 @@ bool IRuntimeFilter::get_ignored() {
return _wrapper->is_ignored();
}
+void IRuntimeFilter::set_disabled() {
+ _wrapper->set_disabled();
+}
+
+bool IRuntimeFilter::get_disabled() const {
+ return _wrapper->is_disabled();
+}
+
std::string IRuntimeFilter::formatted_state() const {
return fmt::format(
"[Id = {}, IsPushDown = {}, RuntimeFilterState = {},
HasRemoteTarget = {}, "
- "HasLocalTarget = {}, Ignored = {}]",
+ "HasLocalTarget = {}, Ignored = {}, Disabled = {}, Type = {},
WaitTimeMS = {}]",
_filter_id, _is_push_down, _get_explain_state_string(),
_has_remote_target,
- _has_local_target, _wrapper->_context->ignored);
+ _has_local_target, _wrapper->_context->ignored,
_wrapper->_context->disabled,
+ _wrapper->get_real_type(), wait_time_ms());
}
Status IRuntimeFilter::init_with_desc(const TRuntimeFilterDesc* desc, const
TQueryOptions* options,
@@ -1451,6 +1471,11 @@ Status IRuntimeFilter::create_wrapper(const
UpdateRuntimeFilterParamsV2* param,
*wrapper = std::make_shared<RuntimePredicateWrapper>(column_type,
get_type(filter_type),
param->request->filter_id());
+ if (param->request->has_disabled() && param->request->disabled()) {
+ (*wrapper)->set_disabled();
+ return Status::OK();
+ }
+
if (param->request->has_ignored() && param->request->ignored()) {
(*wrapper)->set_ignored();
return Status::OK();
@@ -1497,6 +1522,11 @@ Status IRuntimeFilter::_create_wrapper(const T* param,
*wrapper = std::make_unique<RuntimePredicateWrapper>(column_type,
get_type(filter_type),
param->request->filter_id());
+ if (param->request->has_disabled() && param->request->disabled()) {
+ (*wrapper)->set_disabled();
+ return Status::OK();
+ }
+
if (param->request->has_ignored() && param->request->ignored()) {
(*wrapper)->set_ignored();
return Status::OK();
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 8b52031d765..d0bc9be4145 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -284,6 +284,9 @@ public:
bool get_ignored();
+ void set_disabled();
+ bool get_disabled() const;
+
RuntimeFilterType get_real_type();
bool need_sync_filter_size();
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index d150e4cf98a..160664a45f6 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -67,11 +67,17 @@ public:
: hash_table_size;
}
- Status ignore_filters(RuntimeState* state) {
+ /**
+ Disable meaningless filters, such as filters:
+ RF1: col1 in (1, 3, 5)
+ RF2: col1 min: 1, max: 5
+ We consider RF2 is meaningless, because RF1 has already filtered out all values that RF2 can filter.
+ */
+ Status disable_meaningless_filters(RuntimeState* state) {
// process ignore duplicate IN_FILTER
std::unordered_set<int> has_in_filter;
for (auto filter : _runtime_filters) {
- if (filter->get_ignored()) {
+ if (filter->get_ignored() || filter->get_disabled()) {
continue;
}
if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
@@ -82,7 +88,7 @@ public:
continue;
}
if (has_in_filter.contains(filter->expr_order())) {
- filter->set_ignored();
+ filter->set_disabled();
continue;
}
has_in_filter.insert(filter->expr_order());
@@ -90,14 +96,14 @@ public:
// process ignore filter when it has IN_FILTER on same expr
for (auto filter : _runtime_filters) {
- if (filter->get_ignored()) {
+ if (filter->get_ignored() || filter->get_disabled()) {
continue;
}
if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
!has_in_filter.contains(filter->expr_order())) {
continue;
}
- filter->set_ignored();
+ filter->set_disabled();
}
return Status::OK();
}
@@ -109,6 +115,13 @@ public:
return Status::OK();
}
+ Status disable_all_filters() {
+ for (auto filter : _runtime_filters) {
+ filter->set_disabled();
+ }
+ return Status::OK();
+ }
+
Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto filter : _runtime_filters) {
@@ -140,7 +153,7 @@ public:
int result_column_id =
_build_expr_context[i]->get_last_result_column_id();
const auto& column =
block->get_by_position(result_column_id).column;
for (auto* filter : iter->second) {
- if (filter->get_ignored()) {
+ if (filter->get_ignored() || filter->get_disabled()) {
continue;
}
filter->insert_batch(column, 1);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index ffb51c9bc4b..3f096450204 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -17,6 +17,7 @@
#include "hashjoin_build_sink.h"
+#include <cstdlib>
#include <string>
#include "exprs/bloom_filter_func.h"
@@ -105,6 +106,15 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState*
state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
RETURN_IF_ERROR(JoinBuildSinkLocalState::open(state));
+
+#ifndef NDEBUG
+ if (state->fuzzy_disable_runtime_filter_in_be()) {
+ if ((_parent->operator_id() + random()) % 2 == 0) {
+ RETURN_IF_ERROR(disable_runtime_filters(state));
+ }
+ }
+#endif
+
return Status::OK();
}
@@ -123,7 +133,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
}
}};
- if (!_runtime_filter_slots || _runtime_filters.empty() ||
state->is_cancelled() || !_eos) {
+ if (!_runtime_filter_slots || _runtime_filters.empty() ||
state->is_cancelled() || !_eos ||
+ _runtime_filters_disabled) {
return Base::close(state, exec_status);
}
@@ -138,7 +149,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
{
SCOPED_TIMER(_runtime_filter_init_timer);
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state,
hash_table_size));
- RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
+
RETURN_IF_ERROR(_runtime_filter_slots->disable_meaningless_filters(state));
}
if (hash_table_size > 1) {
SCOPED_TIMER(_runtime_filter_compute_timer);
@@ -168,6 +179,33 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
return Base::close(state, exec_status);
}
+Status HashJoinBuildSinkLocalState::disable_runtime_filters(RuntimeState*
state) {
+ if (_runtime_filters_disabled) {
+ return Status::OK();
+ }
+
+ if (_runtime_filters.empty()) {
+ return Status::OK();
+ }
+
+ if (!_should_build_hash_table) {
+ return Status::OK();
+ }
+
+ if (_runtime_filters.empty()) {
+ return Status::OK();
+ }
+
+ DCHECK(_runtime_filter_slots) << "_runtime_filter_slots should be
initialized";
+
+ _runtime_filters_disabled = true;
+ RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0,
_finish_dependency));
+ RETURN_IF_ERROR(_runtime_filter_slots->disable_all_filters());
+
+ SCOPED_TIMER(_publish_runtime_filter_timer);
+ return _runtime_filter_slots->publish(state, !_should_build_hash_table);
+}
+
bool HashJoinBuildSinkLocalState::build_unique() const {
return _parent->cast<HashJoinBuildSinkOperatorX>()._build_unique;
}
@@ -570,9 +608,12 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state._shared_state->build_block =
std::make_shared<vectorized::Block>(
local_state._build_side_mutable_block.to_block());
- RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size(
- state, local_state._shared_state->build_block->rows(),
- local_state._finish_dependency));
+ if (!local_state._runtime_filters_disabled) {
+
RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size(
+ state, local_state._shared_state->build_block->rows(),
+ local_state._finish_dependency));
+ }
+
RETURN_IF_ERROR(
local_state.process_build_block(state,
(*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index f8634ac4c49..d6dac6d1fc2 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -54,6 +54,8 @@ public:
Status close(RuntimeState* state, Status exec_status) override;
+ Status disable_runtime_filters(RuntimeState* state);
+
protected:
void _hash_table_init(RuntimeState* state);
void _set_build_ignore_flag(vectorized::Block& block, const
std::vector<int>& res_col_ids);
@@ -75,6 +77,8 @@ protected:
bool _should_build_hash_table = true;
+ bool _runtime_filters_disabled = false;
+
size_t _build_side_rows = 0;
vectorized::MutableBlock _build_side_mutable_block;
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index a6883660be7..f7e22cde144 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -429,10 +429,11 @@ Status
RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> que
void* data = nullptr;
int len = 0;
bool has_attachment = false;
- if (!cnt_val->filter->get_ignored()) {
+ if (!cnt_val->filter->get_ignored() &&
!cnt_val->filter->get_disabled()) {
RETURN_IF_ERROR(cnt_val->filter->serialize(&apply_request, &data,
&len));
} else {
- apply_request.set_ignored(true);
+ apply_request.set_ignored(cnt_val->filter->get_ignored());
+ apply_request.set_disabled(cnt_val->filter->get_disabled());
apply_request.set_filter_type(PFilterType::UNKNOW_FILTER);
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index e1891c23a4e..cd9de143522 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -593,6 +593,11 @@ public:
_query_options.enable_local_merge_sort;
}
+ bool fuzzy_disable_runtime_filter_in_be() const {
+ return _query_options.__isset.fuzzy_disable_runtime_filter_in_be &&
+ _query_options.fuzzy_disable_runtime_filter_in_be;
+ }
+
int64_t min_revocable_mem() const {
if (_query_options.__isset.min_revocable_mem) {
return std::max(_query_options.min_revocable_mem, (int64_t)1);
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h
b/be/src/vec/runtime/shared_hash_table_controller.h
index ae5917efe91..421faa4c1d9 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -46,6 +46,7 @@ struct RuntimeFilterContext {
std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
std::shared_ptr<BitmapFilterFuncBase> bitmap_filter_func;
bool ignored = false;
+ bool disabled = false;
std::string err_msg;
};
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 25ca788145a..fcff2697e5c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -569,6 +569,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_FORCE_SPILL = "enable_force_spill";
public static final String DATA_QUEUE_MAX_BLOCKS = "data_queue_max_blocks";
+ public static final String FUZZY_DISABLE_RUNTIME_FILTER_IN_BE =
"fuzzy_disable_runtime_filter_in_be";
+
public static final String GENERATE_STATS_FACTOR = "generate_stats_factor";
public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
@@ -2227,6 +2229,13 @@ public class SessionVariable implements Serializable,
Writable {
needForward = true, fuzzy = true)
public long dataQueueMaxBlocks = 1;
+ @VariableMgr.VarAttr(
+ name = FUZZY_DISABLE_RUNTIME_FILTER_IN_BE,
+ description = {"在 BE 上开启禁用 runtime filter 的随机开关,用于测试",
+ "Disable the runtime filter on the BE for testing
purposes."},
+ needForward = true, fuzzy = false)
+ public boolean fuzzyDisableRuntimeFilterInBE = false;
+
// If the memory consumption of sort node exceed this limit, will trigger spill to disk;
// Set to 0 to disable; min: 128M
public static final long MIN_EXTERNAL_SORT_BYTES_THRESHOLD = 2097152;
@@ -2475,6 +2484,8 @@ public class SessionVariable implements Serializable,
Writable {
this.batchSize = 1024;
this.enableFoldConstantByBe = false;
}
+
+ this.fuzzyDisableRuntimeFilterInBE = true;
}
}
@@ -3953,6 +3964,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableForceSpill(enableForceSpill);
tResult.setMinRevocableMem(minRevocableMem);
tResult.setDataQueueMaxBlocks(dataQueueMaxBlocks);
+
tResult.setFuzzyDisableRuntimeFilterInBe(fuzzyDisableRuntimeFilterInBE);
tResult.setEnableLocalMergeSort(enableLocalMergeSort);
tResult.setEnableParallelResultSink(enableParallelResultSink);
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 254e0740943..150c07fcf9a 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -595,6 +595,7 @@ message PMergeFilterRequest {
optional bool contain_null = 11;
optional bool ignored = 12;
optional uint64 local_merge_time = 13;
+ optional bool disabled = 14;
};
message PMergeFilterResponse {
@@ -615,6 +616,7 @@ message PPublishFilterRequest {
optional PColumnType column_type = 10;
optional bool contain_null = 11;
optional bool ignored = 12;
+ optional bool disabled = 13;
};
message PPublishFilterRequestV2 {
@@ -631,6 +633,7 @@ message PPublishFilterRequestV2 {
optional bool ignored = 11;
repeated int32 fragment_ids = 12;
optional uint64 local_merge_time = 13;
+ optional bool disabled = 14;
};
message PPublishFilterResponse {
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 6184c967ec0..fb0859eb50f 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -356,6 +356,8 @@ struct TQueryOptions {
141: optional bool ignore_runtime_filter_error = false;
+ 146: optional bool fuzzy_disable_runtime_filter_in_be = false;
+
// upgrade options. keep them same in every branch.
200: optional bool new_is_ip_address_in_range = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]