This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch tpc_preview4-external
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/tpc_preview4-external by this
push:
new db13e417337 [refine](exec)Provide an execute_filter interface in expr
to execute execute_conjuncts. (#59409)
db13e417337 is described below
commit db13e417337ffa5b6fa5980d2a4e96714474d2f5
Author: Mryange <[email protected]>
AuthorDate: Wed Dec 31 17:50:35 2025 +0800
[refine](exec)Provide an execute_filter interface in expr to execute
execute_conjuncts. (#59409)
---
be/src/common/config.cpp | 2 +-
be/src/olap/column_predicate.h | 5 +-
be/src/runtime_filter/runtime_filter_selectivity.h | 96 +++++++++
be/src/vec/exprs/vexpr.cpp | 55 +++++
be/src/vec/exprs/vexpr.h | 10 +-
be/src/vec/exprs/vexpr_context.cpp | 90 +--------
be/src/vec/exprs/vexpr_context.h | 9 +
be/src/vec/exprs/vruntimefilter_wrapper.cpp | 117 ++++++++---
be/src/vec/exprs/vruntimefilter_wrapper.h | 46 +----
.../runtime_filter_selectivity_test.cpp | 222 +++++++++++++++++++++
10 files changed, 495 insertions(+), 157 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 29f84798fcf..03a25a6f891 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1037,7 +1037,7 @@ DEFINE_mInt64(big_column_size_buffer, "65535");
DEFINE_mInt64(small_column_size_buffer, "100");
// Perform the always_true check at intervals determined by
runtime_filter_sampling_frequency
-DEFINE_mInt32(runtime_filter_sampling_frequency, "64");
+DEFINE_mInt32(runtime_filter_sampling_frequency, "32");
DEFINE_mInt32(execution_max_rpc_timeout_sec, "3600");
DEFINE_mBool(execution_ignore_eovercrowded, "true");
// cooldown task configs
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 6e6671ff337..7162a96399d 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -25,6 +25,7 @@
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/rowset/segment_v2/inverted_index_iterator.h"
#include "runtime/define_primitive_type.h"
+#include "runtime_filter/runtime_filter_selectivity.h"
#include "util/defer_op.h"
#include "util/runtime_profile.h"
#include "vec/columns/column.h"
@@ -372,8 +373,8 @@ protected:
if (!_always_true) {
_judge_filter_rows += filter_rows;
_judge_input_rows += input_rows;
- vectorized::VRuntimeFilterWrapper::judge_selectivity(
- get_ignore_threshold(), _judge_filter_rows,
_judge_input_rows, _always_true);
+
RuntimeFilterSelectivity::judge_selectivity(get_ignore_threshold(),
_judge_filter_rows,
+ _judge_input_rows,
_always_true);
}
}
diff --git a/be/src/runtime_filter/runtime_filter_selectivity.h
b/be/src/runtime_filter/runtime_filter_selectivity.h
new file mode 100644
index 00000000000..1b0a82143de
--- /dev/null
+++ b/be/src/runtime_filter/runtime_filter_selectivity.h
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+
+#include "common/config.h"
+#include "common/logging.h"
+
+namespace doris {
+
+// Tracks how effective a runtime filter is, so that an ineffective filter can be skipped.
+//
+// The "filter rate" is filtered_rows / input_rows accumulated over one sampling period.
+// Once more than min_judge_input_rows input rows have been observed and the filter rate
+// is below the caller-supplied ignore threshold, the filter is marked `always_true` and
+// maybe_always_true_can_ignore() reports that evaluation may be skipped.
+// Because the data distribution can change, the accumulated state is cleared on the
+// (config::runtime_filter_sampling_frequency + 1)-th call to update_judge_counter(),
+// which starts a new sampling period.
+//
+// Note: this is not a thread-safe class.
+class RuntimeFilterSelectivity {
+public:
+    RuntimeFilterSelectivity() = default;
+
+    // Non-copyable. Deleting only the copy constructor would still leave an implicitly
+    // generated copy-assignment operator, so both are deleted.
+    RuntimeFilterSelectivity(const RuntimeFilterSelectivity&) = delete;
+    RuntimeFilterSelectivity& operator=(const RuntimeFilterSelectivity&) = delete;
+
+    // Advances the sampling-period counter; intended to be called once per evaluation.
+    // The counter is compared *before* it is incremented, so the state is reset on the
+    // (runtime_filter_sampling_frequency + 1)-th call of every period.
+    void update_judge_counter() {
+        if ((_judge_counter++) >= config::runtime_filter_sampling_frequency) {
+            reset_judge_selectivity();
+        }
+    }
+
+    // Accumulates the rows filtered / seen by one evaluation and re-judges whether the
+    // filter is ineffective. Once always_true is set, accumulation stops until the next
+    // reset performed by update_judge_counter().
+    void update_judge_selectivity(int filter_id, uint64_t filter_rows, uint64_t input_rows,
+                                  double ignore_threshold) {
+        if (!_always_true) {
+            _judge_filter_rows += static_cast<int64_t>(filter_rows);
+            _judge_input_rows += static_cast<int64_t>(input_rows);
+            judge_selectivity(ignore_threshold, _judge_filter_rows, _judge_input_rows,
+                              _always_true);
+        }
+
+        // The ratio is guarded so an empty sample logs 0 instead of NaN; it is only
+        // computed when VLOG_ROW is enabled.
+        VLOG_ROW << fmt::format(
+                "Runtime filter[{}] selectivity update: filter_rows: {}, input_rows: {}, filter "
+                "rate: {}, "
+                "ignore_thredhold: {}, counter: {} , always_true: {}",
+                filter_id, _judge_filter_rows, _judge_input_rows,
+                _judge_input_rows > 0 ? static_cast<double>(_judge_filter_rows) /
+                                                static_cast<double>(_judge_input_rows)
+                                      : 0.0,
+                ignore_threshold, _judge_counter, _always_true);
+    }
+
+    // Returns true when the current sampling period judged the filter ineffective.
+    // Always false when runtime_filter_sampling_frequency <= 0 (feature disabled).
+    bool maybe_always_true_can_ignore() const {
+        /// TODO: maybe we can use session variable to control this behavior ?
+        if (config::runtime_filter_sampling_frequency <= 0) {
+            return false;
+        }
+        return _always_true;
+    }
+
+    // Sets always_true to (filter_rows / input_rows < ignore_threshold), but only when
+    // input_rows exceeds min_judge_input_rows. Smaller samples are considered unreliable
+    // and leave always_true unchanged.
+    static void judge_selectivity(double ignore_threshold, int64_t filter_rows, int64_t input_rows,
+                                  bool& always_true) {
+        if (input_rows > min_judge_input_rows) {
+            always_true = (static_cast<double>(filter_rows) / static_cast<double>(input_rows)) <
+                          ignore_threshold;
+        }
+    }
+
+private:
+    // Starts a new sampling period.
+    void reset_judge_selectivity() {
+        _always_true = false;
+        _judge_counter = 0;
+        _judge_input_rows = 0;
+        _judge_filter_rows = 0;
+    }
+
+    int64_t _judge_input_rows = 0;
+    int64_t _judge_filter_rows = 0;
+    int _judge_counter = 0;
+    bool _always_true = false;
+
+    // Minimum number of accumulated input rows before the filter rate is trusted.
+    constexpr static int64_t min_judge_input_rows = 4096 * 10;
+};
+
+} // namespace doris
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 1bafe01ad71..52d4ca01eac 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -1015,5 +1015,60 @@ bool VExpr::ann_dist_is_fulfilled() const {
return _virtual_column_is_fulfilled;
}
+Status VExpr::execute_filter(VExprContext* context, const Block* block,
+                             uint8_t* __restrict result_filter_data, size_t rows, bool accept_null,
+                             bool* can_filter_all) const {
+    // Default conjunct evaluation: compute this expression as a boolean column and AND it
+    // into result_filter_data (one byte per row). With accept_null a NULL result passes,
+    // otherwise it is rejected. *can_filter_all is only ever set here, never cleared.
+    ColumnPtr filter_column;
+    RETURN_IF_ERROR(execute_column(context, block, filter_column));
+
+    if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
+        // const(nullable) or const(bool): one decision applies to every row.
+        const bool keep_rows =
+                const_column->is_null_at(0) ? accept_null : const_column->get_bool(0);
+        if (!keep_rows) {
+            memset(result_filter_data, 0, rows);
+            *can_filter_all = true;
+        }
+        // A passing constant leaves result_filter_data untouched and is not re-scanned.
+        return Status::OK();
+    }
+
+    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
+        // nullable(bool)
+        const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
+        const auto* __restrict values =
+                assert_cast<const ColumnUInt8&>(*nested_column).get_data().data();
+        const auto* __restrict nulls = nullable_column->get_null_map_data().data();
+        if (accept_null) {
+            for (size_t row = 0; row < rows; ++row) {
+                result_filter_data[row] &= nulls[row] || values[row];
+            }
+        } else {
+            for (size_t row = 0; row < rows; ++row) {
+                result_filter_data[row] &= (!nulls[row]) & values[row];
+            }
+        }
+    } else {
+        // bool
+        const auto* __restrict values =
+                assert_cast<const ColumnUInt8&>(*filter_column).get_data().data();
+        for (size_t row = 0; row < rows; ++row) {
+            result_filter_data[row] &= values[row];
+        }
+    }
+
+    // No surviving row: tell the caller it can stop evaluating further conjuncts.
+    if (memchr(result_filter_data, 0x1, rows) == nullptr) {
+        *can_filter_all = true;
+    }
+    return Status::OK();
+}
+
#include "common/compile_check_end.h"
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index 35a0d3733b0..2a0abe439f9 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -147,6 +147,10 @@ public:
// Therefore we need a function like this to return the actual type
produced by execution.
virtual DataTypePtr execute_type(const Block* block) const { return
_data_type; }
+ virtual Status execute_filter(VExprContext* context, const Block* block,
+ uint8_t* __restrict result_filter_data,
size_t rows,
+ bool accept_null, bool* can_filter_all)
const;
+
// `is_blockable` means this expr will be blocked in `execute` (e.g. AI
Function, Remote Function)
[[nodiscard]] virtual bool is_blockable() const {
return std::any_of(_children.begin(), _children.end(),
@@ -204,12 +208,6 @@ public:
[](VExprSPtr child) { return
child->is_rf_wrapper(); });
}
- virtual void do_judge_selectivity(uint64_t filter_rows, uint64_t
input_rows) {
- for (auto child : _children) {
- child->do_judge_selectivity(filter_rows, input_rows);
- }
- }
-
static Status create_expr_tree(const TExpr& texpr, VExprContextSPtr& ctx);
static Status create_expr_trees(const std::vector<TExpr>& texprs,
VExprContextSPtrs& ctxs);
diff --git a/be/src/vec/exprs/vexpr_context.cpp
b/be/src/vec/exprs/vexpr_context.cpp
index a7b71b77646..2a9c049e303 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -199,7 +199,12 @@ Status VExprContext::execute_conjuncts(const
VExprContextSPtrs& ctxs,
return execute_conjuncts(ctxs, filters, false, block, result_filter,
can_filter_all);
}
-// TODO: Performance Optimization
+// Evaluates this context's root expression as a filter by delegating to the root's
+// execute_filter(), passing this context so the expression can reach per-context state.
+Status VExprContext::execute_filter(const Block* block, uint8_t* __restrict result_filter_data,
+                                    size_t rows, bool accept_null, bool* can_filter_all) {
+    const auto& root_expr = _root;
+    return root_expr->execute_filter(this, block, result_filter_data, rows, accept_null,
+                                     can_filter_all);
+}
+
Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
const std::vector<IColumn::Filter*>*
filters,
bool accept_null, const Block* block,
@@ -209,85 +214,10 @@ Status VExprContext::execute_conjuncts(const
VExprContextSPtrs& ctxs,
*can_filter_all = false;
auto* __restrict result_filter_data = result_filter->data();
for (const auto& ctx : ctxs) {
- // Statistics are only required when an rf wrapper exists in the expr.
- bool is_rf_wrapper = ctx->root()->is_rf_wrapper();
- ColumnPtr filter_column;
- RETURN_IF_ERROR(ctx->execute(block, filter_column));
- if (const auto* nullable_column =
check_and_get_column<ColumnNullable>(*filter_column)) {
- size_t column_size = nullable_column->size();
- if (column_size == 0) {
- *can_filter_all = true;
- return Status::OK();
- } else {
- const ColumnPtr& nested_column =
nullable_column->get_nested_column_ptr();
- const IColumn::Filter& filter =
- assert_cast<const
ColumnUInt8&>(*nested_column).get_data();
- const auto* __restrict filter_data = filter.data();
- const auto* __restrict null_map_data =
nullable_column->get_null_map_data().data();
-
- size_t input_rows =
- rows - (is_rf_wrapper
- ?
simd::count_zero_num((int8_t*)result_filter_data, rows)
- : 0);
-
- if (accept_null) {
- for (size_t i = 0; i < rows; ++i) {
- result_filter_data[i] &= (null_map_data[i]) ||
filter_data[i];
- }
- } else {
- for (size_t i = 0; i < rows; ++i) {
- result_filter_data[i] &= (!null_map_data[i]) &
filter_data[i];
- }
- }
-
- size_t output_rows =
- rows - (is_rf_wrapper
- ?
simd::count_zero_num((int8_t*)result_filter_data, rows)
- : 0);
-
- if (is_rf_wrapper) {
- ctx->root()->do_judge_selectivity(input_rows -
output_rows, input_rows);
- }
-
- if ((is_rf_wrapper && output_rows == 0) ||
- (!is_rf_wrapper && memchr(result_filter_data, 0x1, rows)
== nullptr)) {
- *can_filter_all = true;
- return Status::OK();
- }
- }
- } else if (const auto* const_column =
check_and_get_column<ColumnConst>(*filter_column)) {
- // filter all
- if (!const_column->get_bool(0)) {
- *can_filter_all = true;
- memset(result_filter_data, 0, result_filter->size());
- return Status::OK();
- }
- } else {
- const IColumn::Filter& filter =
- assert_cast<const ColumnUInt8&>(*filter_column).get_data();
- const auto* __restrict filter_data = filter.data();
-
- size_t input_rows =
- rows -
- (is_rf_wrapper ?
simd::count_zero_num((int8_t*)result_filter_data, rows) : 0);
-
- for (size_t i = 0; i < rows; ++i) {
- result_filter_data[i] &= filter_data[i];
- }
-
- size_t output_rows =
- rows -
- (is_rf_wrapper ?
simd::count_zero_num((int8_t*)result_filter_data, rows) : 0);
-
- if (is_rf_wrapper) {
- ctx->root()->do_judge_selectivity(input_rows - output_rows,
input_rows);
- }
-
- if ((is_rf_wrapper && output_rows == 0) ||
- (!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) ==
nullptr)) {
- *can_filter_all = true;
- return Status::OK();
- }
+ RETURN_IF_ERROR(
+ ctx->execute_filter(block, result_filter_data, rows,
accept_null, can_filter_all));
+ if (*can_filter_all) {
+ return Status::OK();
}
}
if (filters != nullptr) {
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 3179526ec54..349f199af23 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -33,6 +33,7 @@
#include "olap/rowset/segment_v2/inverted_index_reader.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
+#include "runtime_filter/runtime_filter_selectivity.h"
#include "udf/udf.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
@@ -210,6 +211,9 @@ public:
bool all_expr_inverted_index_evaluated();
+ Status execute_filter(const Block* block, uint8_t* __restrict
result_filter_data, size_t rows,
+ bool accept_null, bool* can_filter_all);
+
[[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block*
block);
[[nodiscard]] static Status filter_block(const VExprContextSPtrs&
expr_contexts, Block* block,
@@ -246,6 +250,8 @@ public:
return _last_result_column_id;
}
+ RuntimeFilterSelectivity& get_runtime_filter_selectivity() { return
*_rf_selectivity; }
+
FunctionContext::FunctionStateScope get_function_state_scope() const {
return _is_clone ? FunctionContext::THREAD_LOCAL :
FunctionContext::FRAGMENT_LOCAL;
}
@@ -337,5 +343,8 @@ private:
segment_v2::AnnRangeSearchRuntime _ann_range_search_runtime;
bool _suitable_for_ann_index = true;
+
+ std::unique_ptr<RuntimeFilterSelectivity> _rf_selectivity =
+ std::make_unique<RuntimeFilterSelectivity>();
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index 8e915ffff67..b24df4860da 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -62,9 +62,7 @@ VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode&
node, VExprSPtr im
_impl(std::move(impl)),
_ignore_thredhold(ignore_thredhold),
_null_aware(null_aware),
- _filter_id(filter_id) {
- reset_judge_selectivity();
-}
+ _filter_id(filter_id) {}
Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const
RowDescriptor& desc,
VExprContext* context) {
@@ -89,38 +87,105 @@ void VRuntimeFilterWrapper::close(VExprContext* context,
Status VRuntimeFilterWrapper::execute_column(VExprContext* context, const
Block* block,
ColumnPtr& result_column) const {
- DCHECK(_open_finished || _getting_const_col);
- if (_judge_counter.fetch_sub(1) == 0) {
- reset_judge_selectivity();
+ return Status::InternalError("Not implement
VRuntimeFilterWrapper::execute_column"); // NOTE(review): RF wrappers are now evaluated only via execute_filter(); confirm no caller still reaches execute()/execute_column() (the removed code handled _getting_const_col here)
+}
+
+const std::string& VRuntimeFilterWrapper::expr_name() const {
+ return _expr_name;
+}
+// Evaluates the runtime filter and ANDs its result into result_filter_data (one byte per row); updates the per-VExprContext selectivity statistics and sets *can_filter_all when no row survives.
+Status VRuntimeFilterWrapper::execute_filter(VExprContext* context, const
Block* block,
+ uint8_t* __restrict
result_filter_data, size_t rows,
+ bool accept_null, bool*
can_filter_all) const {
+ DCHECK(_open_finished);
+ if (accept_null) {
+ return Status::InternalError(
+ "Runtime filter does not support accept_null in
execute_filter"); // the nullable branch below always treats NULL as filtered out
}
- if (_always_true) {
- size_t size = block->rows();
- result_column = create_always_true_column(size,
_data_type->is_nullable());
- COUNTER_UPDATE(_always_true_filter_rows, size);
+ // Selectivity state is kept per VExprContext; the Defer advances its sampling counter on every return path, including the early skip below.
+ auto& rf_selectivity = context->get_runtime_filter_selectivity();
+ Defer auto_update_judge_counter = [&]() {
rf_selectivity.update_judge_counter(); };
+ // Judged ineffective for the current sampling period: let every row pass without evaluating the filter.
+ // if always true, skip evaluate runtime filter
+ if (rf_selectivity.maybe_always_true_can_ignore()) {
+ COUNTER_UPDATE(_always_true_filter_rows, rows);
return Status::OK();
- } else {
- if (_getting_const_col) {
- _impl->set_getting_const_col(true);
+ }
+ // arg_column is only consumed by the null-aware fix-up below.
+ ColumnPtr filter_column;
+ ColumnPtr arg_column = nullptr;
+ RETURN_IF_ERROR(_impl->execute_runtime_filter(context, block,
filter_column, &arg_column));
+
+ // bloom filter will handle null aware inside itself
+ if (_null_aware && TExprNodeType::BLOOM_PRED != node_type()) {
+ DCHECK(arg_column);
+ change_null_to_true(filter_column->assume_mutable(), arg_column);
+ }
+ // The non-const branches count only rows still set in result_filter_data as input rows.
+ if (const auto* const_column =
check_and_get_column<ColumnConst>(*filter_column)) {
+ // const(nullable) or const(bool)
+ if (!const_column->get_bool(0)) { // NOTE(review): no is_null_at(0) check here, unlike VExpr::execute_filter; confirm get_bool(0) is false for a constant NULL
+ // filter all
+ COUNTER_UPDATE(_rf_filter_rows, rows);
+ COUNTER_UPDATE(_rf_input_rows, rows); // NOTE(review): const branches count all `rows`, not only rows still passing result_filter_data as the branches below do
+ rf_selectivity.update_judge_selectivity(_filter_id, rows, rows,
_ignore_thredhold);
+ *can_filter_all = true;
+ memset(result_filter_data, 0, rows);
+ return Status::OK();
+ } else {
+ // filter none
+ COUNTER_UPDATE(_rf_input_rows, rows);
+ rf_selectivity.update_judge_selectivity(_filter_id, 0, rows,
_ignore_thredhold);
+ return Status::OK();
}
+ } else if (const auto* nullable_column =
check_and_get_column<ColumnNullable>(*filter_column)) {
+ // nullable(bool)
+ const ColumnPtr& nested_column =
nullable_column->get_nested_column_ptr();
+ const IColumn::Filter& filter = assert_cast<const
ColumnUInt8&>(*nested_column).get_data();
+ const auto* __restrict filter_data = filter.data();
+ const auto* __restrict null_map_data =
nullable_column->get_null_map_data().data();
+ // A NULL result does not pass (accept_null was rejected above).
+ const size_t input_rows = rows -
simd::count_zero_num((int8_t*)result_filter_data, rows);
- ColumnPtr arg_column = nullptr;
- RETURN_IF_ERROR(_impl->execute_runtime_filter(context, block,
result_column, &arg_column));
- if (_getting_const_col) {
- _impl->set_getting_const_col(false);
+ for (size_t i = 0; i < rows; ++i) {
+ result_filter_data[i] &= (!null_map_data[i]) & filter_data[i];
}
- // bloom filter will handle null aware inside itself
- if (_null_aware && TExprNodeType::BLOOM_PRED != node_type()) {
- DCHECK(arg_column);
- change_null_to_true(result_column->assume_mutable(), arg_column);
+ const size_t output_rows = rows -
simd::count_zero_num((int8_t*)result_filter_data, rows);
+
+ COUNTER_UPDATE(_rf_filter_rows, input_rows - output_rows);
+ COUNTER_UPDATE(_rf_input_rows, input_rows);
+ rf_selectivity.update_judge_selectivity(_filter_id, input_rows -
output_rows, input_rows,
+ _ignore_thredhold);
+
+ if (output_rows == 0) {
+ *can_filter_all = true;
+ return Status::OK();
}
+ } else {
+ // bool
+ const IColumn::Filter& filter = assert_cast<const
ColumnUInt8&>(*filter_column).get_data();
+ const auto* __restrict filter_data = filter.data();
- return Status::OK();
- }
-}
+ const size_t input_rows = rows -
simd::count_zero_num((int8_t*)result_filter_data, rows);
-const std::string& VRuntimeFilterWrapper::expr_name() const {
- return _expr_name;
+ for (size_t i = 0; i < rows; ++i) {
+ result_filter_data[i] &= filter_data[i];
+ }
+
+ const size_t output_rows = rows -
simd::count_zero_num((int8_t*)result_filter_data, rows);
+
+ COUNTER_UPDATE(_rf_filter_rows, input_rows - output_rows);
+ COUNTER_UPDATE(_rf_input_rows, input_rows);
+ rf_selectivity.update_judge_selectivity(_filter_id, input_rows -
output_rows, input_rows,
+ _ignore_thredhold);
+
+ if (output_rows == 0) {
+ *can_filter_all = true;
+ return Status::OK();
+ }
+ }
+ return Status::OK();
}
#include "common/compile_check_end.h"
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h
b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 3535898915b..09bc8a815c7 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -63,6 +63,10 @@ public:
const std::string& expr_name() const override;
const VExprSPtrs& children() const override { return _impl->children(); }
+ Status execute_filter(VExprContext* context, const Block* block,
+ uint8_t* __restrict result_filter_data, size_t rows,
bool accept_null,
+ bool* can_filter_all) const override;
+
uint64_t get_digest(uint64_t seed) const override {
seed = _impl->get_digest(seed);
if (seed) {
@@ -91,33 +95,10 @@ public:
}
}
- void update_counters(int64_t filter_rows, int64_t input_rows) {
- COUNTER_UPDATE(_rf_filter_rows, filter_rows);
- COUNTER_UPDATE(_rf_input_rows, input_rows);
- }
-
- template <typename T>
- static void judge_selectivity(double ignore_threshold, int64_t
filter_rows, int64_t input_rows,
- T& always_true) {
- always_true = static_cast<double>(filter_rows) /
static_cast<double>(input_rows) <
- ignore_threshold;
- }
-
bool is_rf_wrapper() const override { return true; }
int filter_id() const { return _filter_id; }
- void do_judge_selectivity(uint64_t filter_rows, uint64_t input_rows)
override {
- update_counters(filter_rows, input_rows);
-
- if (!_always_true) {
- _judge_filter_rows += filter_rows;
- _judge_input_rows += input_rows;
- judge_selectivity(_ignore_thredhold, _judge_filter_rows,
_judge_input_rows,
- _always_true);
- }
- }
-
std::shared_ptr<RuntimeProfile::Counter> predicate_filtered_rows_counter()
const {
return _rf_filter_rows;
}
@@ -129,26 +110,7 @@ public:
}
private:
- void reset_judge_selectivity() const {
- _always_true = false;
- _judge_counter = config::runtime_filter_sampling_frequency;
- _judge_input_rows = 0;
- _judge_filter_rows = 0;
- }
-
VExprSPtr _impl;
- // VRuntimeFilterWrapper and ColumnPredicate share the same logic,
- // but it's challenging to unify them, so the code is duplicated.
- // _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true
- // are variables used to implement the _always_true logic, calculated
periodically
- // based on runtime_filter_sampling_frequency. During each period, if
_always_true
- // is evaluated as true, the logic for always_true is applied for the rest
of that period
- // without recalculating. At the beginning of the next period,
- // reset_judge_selectivity is used to reset these variables.
- mutable std::atomic_int _judge_counter = 0;
- mutable std::atomic_uint64_t _judge_input_rows = 0;
- mutable std::atomic_uint64_t _judge_filter_rows = 0;
- mutable std::atomic_int _always_true = false;
std::shared_ptr<RuntimeProfile::Counter> _rf_input_rows =
std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
diff --git a/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
b/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
new file mode 100644
index 00000000000..b8504f950c2
--- /dev/null
+++ b/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime_filter/runtime_filter_selectivity.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+namespace doris {
+
+// Unit tests for RuntimeFilterSelectivity. Tests that change
+// config::runtime_filter_sampling_frequency rely on the fixture to restore it.
+class RuntimeFilterSelectivityTest : public testing::Test {
+protected:
+    void SetUp() override {
+        // Save original config value
+        _original_sampling_frequency = config::runtime_filter_sampling_frequency;
+    }
+
+    void TearDown() override {
+        // Restore original config value
+        config::runtime_filter_sampling_frequency = _original_sampling_frequency;
+    }
+
+    int _original_sampling_frequency = 0;
+};
+
+TEST_F(RuntimeFilterSelectivityTest, basic_initialization) {
+    RuntimeFilterSelectivity selectivity;
+    // Initially should be false (not always_true)
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, disabled_sampling_frequency) {
+    RuntimeFilterSelectivity selectivity;
+    config::runtime_filter_sampling_frequency = 0;
+
+    // Even if conditions are met, should return false when sampling is disabled
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, negative_sampling_frequency) {
+    RuntimeFilterSelectivity selectivity;
+    config::runtime_filter_sampling_frequency = -1;
+
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_below_threshold) {
+    bool always_true = false;
+    // filter_rows/input_rows = 5/50000 = 0.0001 < 0.1
+    // input_rows (50000) > min_judge_input_rows (40960)
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 5, 50000, always_true);
+    EXPECT_TRUE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_above_threshold) {
+    bool always_true = false;
+    // filter_rows/input_rows = 25000/50000 = 0.5 >= 0.1
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 25000, 50000, always_true);
+    EXPECT_FALSE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_insufficient_input_rows) {
+    bool always_true = false;
+    // Even though 5/100 = 0.05 < 0.1, input_rows (100) < min_judge_input_rows (40960)
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 5, 100, always_true);
+    EXPECT_FALSE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, update_with_low_selectivity) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // filter_rows/input_rows = 2000/50000 = 0.04 < 0.1
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, update_with_high_selectivity) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // filter_rows/input_rows = 45000/50000 = 0.9 >= 0.1
+    selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, once_always_true_stays_true) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // First update: low selectivity
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+
+    // Second update: high selectivity, but should be ignored
+    selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, accumulated_selectivity_low) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // First update alone is too small to judge: 25000 <= min_judge_input_rows (40960)
+    selectivity.update_judge_selectivity(-1, 500, 25000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    // Accumulated (500+500)/(25000+25000) = 0.02 < 0.1, and 50000 > 40960 input rows
+    selectivity.update_judge_selectivity(-1, 500, 25000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, accumulated_selectivity_high) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // First update: 20000/50000 = 0.4
+    selectivity.update_judge_selectivity(-1, 20000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    // Second update: accumulated (20000+20000)/(50000+50000) = 0.4
+    selectivity.update_judge_selectivity(-1, 20000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, counter_triggers_reset) {
+    config::runtime_filter_sampling_frequency = 3;
+    RuntimeFilterSelectivity selectivity;
+
+    // Mark as always_true
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+
+    // update_judge_counter() compares the counter before incrementing it, so a
+    // sampling period lasts runtime_filter_sampling_frequency + 1 calls.
+    selectivity.update_judge_counter(); // sees 0
+    selectivity.update_judge_counter(); // sees 1
+    selectivity.update_judge_counter(); // sees 2
+    // Still inside the sampling period: the always_true verdict is kept
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+
+    selectivity.update_judge_counter(); // sees 3 >= 3, triggers reset
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, reset_allows_reevaluation) {
+    config::runtime_filter_sampling_frequency = 2;
+    RuntimeFilterSelectivity selectivity;
+
+    // First cycle: mark as always_true
+    selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+
+    // Trigger reset: the third call is the first to see counter >= 2
+    selectivity.update_judge_counter(); // sees 0
+    selectivity.update_judge_counter(); // sees 1
+    selectivity.update_judge_counter(); // sees 2, triggers reset
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    // Second cycle: high selectivity is now evaluated instead of being ignored
+    selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, edge_case_zero_rows) {
+    bool always_true = false;
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 0, 0, always_true);
+    EXPECT_FALSE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, edge_case_exact_threshold) {
+    bool always_true = false;
+    // Exactly at threshold: 5000/50000 = 0.1, NOT less than 0.1
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 5000, 50000, always_true);
+    EXPECT_FALSE(always_true);
+
+    // Just below threshold: 4999/50000 = 0.09998 < 0.1
+    RuntimeFilterSelectivity::judge_selectivity(0.1, 4999, 50000, always_true);
+    EXPECT_TRUE(always_true);
+}
+
+TEST_F(RuntimeFilterSelectivityTest, multiple_updates_before_threshold) {
+    config::runtime_filter_sampling_frequency = 100;
+    RuntimeFilterSelectivity selectivity;
+
+    // Multiple updates with insufficient rows each time
+    selectivity.update_judge_selectivity(-1, 100, 1000, 0.1); // 100/1000, insufficient
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    selectivity.update_judge_selectivity(-1, 200, 2000, 0.1); // 300/3000, insufficient
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+
+    // Now accumulated rows are sufficient: 300+2000 = 2300, 3000+40000 = 43000
+    selectivity.update_judge_selectivity(-1, 2000, 40000, 0.1); // 2300/43000 = 0.053 < 0.1
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+}
+
+TEST_F(RuntimeFilterSelectivityTest, different_thresholds) {
+    config::runtime_filter_sampling_frequency = 100;
+
+    // Test with threshold 0.05
+    {
+        RuntimeFilterSelectivity selectivity;
+        selectivity.update_judge_selectivity(-1, 2000, 50000, 0.05); // 0.04 < 0.05
+        EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
+    }
+
+    // Test with threshold 0.03
+    {
+        RuntimeFilterSelectivity selectivity;
+        selectivity.update_judge_selectivity(-1, 2000, 50000, 0.03); // 0.04 >= 0.03
+        EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
+    }
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]