This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new caa02b47113 [refactor](be) Rename runtime filter expression wrapper
(#63489)
caa02b47113 is described below
commit caa02b47113ced112ef166ae885c7de152f213be
Author: Jerry Hu <[email protected]>
AuthorDate: Fri May 22 12:14:34 2026 +0800
[refactor](be) Rename runtime filter expression wrapper (#63489)
### What problem does this PR solve?
Issue Number: None
Related PR: None
Problem Summary: Rename the BE runtime filter expression wrapper from
`VRuntimeFilterWrapper` to `RuntimeFilterExpr`. Also rename the
implementation/header/test files to `runtime_filter_expr.*` and update
the associated pointer alias, usages, includes, comments, and tests to
use the new name consistently.
### Release note
None
### Check List (For Author)
- Test: Compile check / Format check
- `DORIS_HOME=/mnt/disk7/hushenggang/doris ninja -C be/ut_build_ASAN
src/exprs/CMakeFiles/Exprs.dir/runtime_filter_expr.cpp.o
src/exec/CMakeFiles/Exec.dir/runtime_filter/runtime_filter_consumer.cpp.o
src/exec/CMakeFiles/Exec.dir/runtime_filter/runtime_filter_consumer_helper.cpp.o
src/exec/CMakeFiles/Exec.dir/operator/scan_operator.cpp.o
src/format/CMakeFiles/Format.dir/parquet/vparquet_reader.cpp.o
src/format/CMakeFiles/Format.dir/orc/vorc_reader.cpp.o
test/CMakeFiles/doris_be_test.dir/exec/runtime_filter/runtime_filter_expr_sampling_test.cpp.o
test/CMakeFiles/doris_be_test.dir/exec/runtime_filter/runtime_filter_consumer_test.cpp.o`
- `build-support/clang-format.sh`
- `build-support/check-format.sh`
- `git diff --check`
- Behavior changed: No
- Does this need documentation: No
---
be/src/exec/operator/scan_operator.cpp | 4 +--
.../runtime_filter/runtime_filter_consumer.cpp | 24 +++++++--------
.../exec/runtime_filter/runtime_filter_consumer.h | 12 ++++----
.../runtime_filter_consumer_helper.cpp | 6 ++--
.../runtime_filter_consumer_helper.h | 4 +--
.../exec/runtime_filter/runtime_filter_wrapper.h | 4 +--
...efilter_wrapper.cpp => runtime_filter_expr.cpp} | 36 ++++++++++------------
...ntimefilter_wrapper.h => runtime_filter_expr.h} | 16 +++++-----
be/src/format/orc/vorc_reader.cpp | 6 ++--
be/src/format/parquet/vparquet_reader.cpp | 4 +--
be/src/storage/predicate/bloom_filter_predicate.h | 2 +-
be/src/storage/predicate/column_predicate.h | 4 +--
.../runtime_filter_consumer_test.cpp | 8 ++---
...t.cpp => runtime_filter_expr_sampling_test.cpp} | 30 +++++++++---------
be/test/exprs/hybrid_set_test.cpp | 2 +-
15 files changed, 80 insertions(+), 82 deletions(-)
diff --git a/be/src/exec/operator/scan_operator.cpp
b/be/src/exec/operator/scan_operator.cpp
index 7f72b4be146..3821b89aca5 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -37,6 +37,7 @@
#include "exec/runtime_filter/runtime_filter_consumer_helper.h"
#include "exec/scan/scanner_context.h"
#include "exprs/function/in.h"
+#include "exprs/runtime_filter_expr.h"
#include "exprs/vcast_expr.h"
#include "exprs/vectorized_fn_call.h"
#include "exprs/vexpr.h"
@@ -44,7 +45,6 @@
#include "exprs/vexpr_fwd.h"
#include "exprs/vin_predicate.h"
#include "exprs/virtual_slot_ref.h"
-#include "exprs/vruntimefilter_wrapper.h"
#include "exprs/vslot_ref.h"
#include "exprs/vtopn_pred.h"
#include "runtime/descriptors.h"
@@ -386,7 +386,7 @@ Status
ScanLocalState<Derived>::_normalize_predicate(VExprContext* context, cons
{
Defer attach_defer = [&]() {
if (pdt != PushDownType::UNACCEPTABLE &&
root->is_rf_wrapper()) {
- auto* rf_expr =
assert_cast<VRuntimeFilterWrapper*>(root.get());
+ auto* rf_expr =
assert_cast<RuntimeFilterExpr*>(root.get());
_slot_id_to_predicates[slot->id()].back()->attach_profile_counter(
rf_expr->filter_id(),
rf_expr->predicate_filtered_rows_counter(),
diff --git a/be/src/exec/runtime_filter/runtime_filter_consumer.cpp
b/be/src/exec/runtime_filter/runtime_filter_consumer.cpp
index 11bf31f4bc3..d1a6cd7d357 100644
--- a/be/src/exec/runtime_filter/runtime_filter_consumer.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_consumer.cpp
@@ -25,7 +25,7 @@
#include "runtime/runtime_profile.h"
namespace doris {
-Status
RuntimeFilterConsumer::_apply_ready_expr(std::vector<VRuntimeFilterPtr>&
push_exprs) {
+Status
RuntimeFilterConsumer::_apply_ready_expr(std::vector<RuntimeFilterExprPtr>&
push_exprs) {
_check_state({State::READY});
_set_state(State::APPLIED);
@@ -43,7 +43,7 @@ Status
RuntimeFilterConsumer::_apply_ready_expr(std::vector<VRuntimeFilterPtr>&
return Status::OK();
}
-Status RuntimeFilterConsumer::acquire_expr(std::vector<VRuntimeFilterPtr>&
push_exprs) {
+Status RuntimeFilterConsumer::acquire_expr(std::vector<RuntimeFilterExprPtr>&
push_exprs) {
std::unique_lock<std::recursive_mutex> l(_rmtx);
if (_rf_state == State::READY) {
RETURN_IF_ERROR(_apply_ready_expr(push_exprs));
@@ -74,7 +74,7 @@ std::shared_ptr<RuntimeFilterTimer>
RuntimeFilterConsumer::create_filter_timer(
return timer;
}
-Status RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>&
container,
+Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<RuntimeFilterExprPtr>&
container,
const TExpr& probe_expr) {
// TODO: `VExprContextSPtr` is not need, we should just create an expr.
VExprContextSPtr probe_ctx;
@@ -84,7 +84,7 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
bool null_aware = _wrapper->contain_null();
// Determine sampling frequency for the always_true optimization.
- // This will be propagated to VExprContext in
VRuntimeFilterWrapper::open().
+ // This will be propagated to VExprContext in RuntimeFilterExpr::open().
int sampling_frequency = _wrapper->disable_always_true_logic()
?
RuntimeFilterSelectivity::DISABLE_SAMPLING
:
config::runtime_filter_sampling_frequency;
@@ -103,7 +103,7 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
node.__set_is_nullable(false);
auto in_pred = VDirectInPredicate::create_shared(node,
_wrapper->hybrid_set());
in_pred->add_child(probe_ctx->root());
- auto wrapper = VRuntimeFilterWrapper::create_shared(
+ auto wrapper = RuntimeFilterExpr::create_shared(
node, in_pred,
get_in_list_ignore_thredhold(_wrapper->hybrid_set()->size()),
null_aware, _wrapper->filter_id(), sampling_frequency);
container.push_back(wrapper);
@@ -123,7 +123,7 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
if (null_aware) {
return Status::InternalError("only min predicate do not support
null aware");
}
- container.push_back(VRuntimeFilterWrapper::create_shared(
+ container.push_back(RuntimeFilterExpr::create_shared(
min_pred_node, min_pred, get_comparison_ignore_thredhold(),
null_aware,
_wrapper->filter_id(), sampling_frequency));
break;
@@ -142,7 +142,7 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
if (null_aware) {
return Status::InternalError("only max predicate do not support
null aware");
}
- container.push_back(VRuntimeFilterWrapper::create_shared(
+ container.push_back(RuntimeFilterExpr::create_shared(
max_pred_node, max_pred, get_comparison_ignore_thredhold(),
null_aware,
_wrapper->filter_id(), sampling_frequency));
break;
@@ -158,7 +158,7 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
_wrapper->minmax_func()->get_max(),
max_literal));
max_pred->add_child(probe_ctx->root());
max_pred->add_child(max_literal);
- container.push_back(VRuntimeFilterWrapper::create_shared(
+ container.push_back(RuntimeFilterExpr::create_shared(
max_pred_node, max_pred, get_comparison_ignore_thredhold(),
null_aware,
_wrapper->filter_id(), sampling_frequency));
@@ -175,7 +175,7 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
_wrapper->minmax_func()->get_min(),
min_literal));
min_pred->add_child(new_probe_ctx->root());
min_pred->add_child(min_literal);
- container.push_back(VRuntimeFilterWrapper::create_shared(
+ container.push_back(RuntimeFilterExpr::create_shared(
min_pred_node, min_pred, get_comparison_ignore_thredhold(),
null_aware,
_wrapper->filter_id(), sampling_frequency));
break;
@@ -192,7 +192,7 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
auto bloom_pred = VBloomPredicate::create_shared(node);
bloom_pred->set_filter(_wrapper->bloom_filter_func());
bloom_pred->add_child(probe_ctx->root());
- auto wrapper = VRuntimeFilterWrapper::create_shared(
+ auto wrapper = RuntimeFilterExpr::create_shared(
node, bloom_pred, get_bloom_filter_ignore_thredhold(),
null_aware,
_wrapper->filter_id(), sampling_frequency);
container.push_back(wrapper);
@@ -213,8 +213,8 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<VRuntimeFilterPtr>& co
if (null_aware) {
return Status::InternalError("bitmap predicate do not support null
aware");
}
- auto wrapper = VRuntimeFilterWrapper::create_shared(
- node, bitmap_pred, 0, null_aware, _wrapper->filter_id(),
sampling_frequency);
+ auto wrapper = RuntimeFilterExpr::create_shared(node, bitmap_pred, 0,
null_aware,
+ _wrapper->filter_id(),
sampling_frequency);
container.push_back(wrapper);
break;
}
diff --git a/be/src/exec/runtime_filter/runtime_filter_consumer.h
b/be/src/exec/runtime_filter/runtime_filter_consumer.h
index c3fdbb37f17..a5f6af795cb 100644
--- a/be/src/exec/runtime_filter/runtime_filter_consumer.h
+++ b/be/src/exec/runtime_filter/runtime_filter_consumer.h
@@ -29,7 +29,7 @@
namespace doris {
// Work on ScanNode or MultiCastDataStreamSource, RuntimeFilterConsumerHelper
will manage all RuntimeFilterConsumer
-// Used to create VRuntimeFilterWrapper to filter data
+// Used to create RuntimeFilterExpr to filter data
class RuntimeFilterConsumer : public RuntimeFilter {
public:
// NOT_READY (-> TIMEOUT) -> READY -> APPLIED
@@ -55,7 +55,7 @@ public:
std::shared_ptr<Dependency> dependencies);
// Called after `State` is ready (e.g. signaled)
- Status acquire_expr(std::vector<VRuntimeFilterPtr>& push_exprs);
+ Status acquire_expr(std::vector<RuntimeFilterExprPtr>& push_exprs);
std::string debug_string() override {
std::unique_lock<std::recursive_mutex> l(_rmtx);
@@ -100,9 +100,9 @@ private:
DorisMetrics::instance()->runtime_filter_consumer_num->increment(1);
}
- Status _apply_ready_expr(std::vector<VRuntimeFilterPtr>& push_exprs);
+ Status _apply_ready_expr(std::vector<RuntimeFilterExprPtr>& push_exprs);
- Status _get_push_exprs(std::vector<VRuntimeFilterPtr>& container, const
TExpr& probe_expr);
+ Status _get_push_exprs(std::vector<RuntimeFilterExprPtr>& container, const
TExpr& probe_expr);
void _check_state(std::vector<State> assumed_states) {
if (!check_state_impl<RuntimeFilterConsumer>(_rf_state,
assumed_states)) {
throw Exception(ErrorCode::INTERNAL_ERROR,
@@ -139,9 +139,9 @@ private:
std::make_shared<RuntimeProfile::Counter>(TUnit::TIME_NS, 0);
//_rf_filter is used to record the number of rows filtered by the runtime
filter.
//It aggregates the filtering statistics from both the Storage and
Execution.
- // Counter will be shared by RuntimeFilterConsumer & VRuntimeFilterWrapper
+ // Counter will be shared by RuntimeFilterConsumer & RuntimeFilterExpr
// OperatorLocalState's close method will collect the statistics from
RuntimeFilterConsumer
- // VRuntimeFilterWrapper will update the statistics.
+ // RuntimeFilterExpr will update the statistics.
std::shared_ptr<RuntimeProfile::Counter> _rf_filter =
std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0, 1);
std::shared_ptr<RuntimeProfile::Counter> _rf_input =
diff --git a/be/src/exec/runtime_filter/runtime_filter_consumer_helper.cpp
b/be/src/exec/runtime_filter/runtime_filter_consumer_helper.cpp
index bf68b9d4054..202ba27c353 100644
--- a/be/src/exec/runtime_filter/runtime_filter_consumer_helper.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_consumer_helper.cpp
@@ -67,7 +67,7 @@ Status
RuntimeFilterConsumerHelper::acquire_runtime_filter(RuntimeState* state,
VExprContextSPtrs&
conjuncts,
const
RowDescriptor& row_descriptor) {
SCOPED_TIMER(_acquire_runtime_filter_timer.get());
- std::vector<VRuntimeFilterPtr> vexprs;
+ std::vector<RuntimeFilterExprPtr> vexprs;
for (const auto& consumer : _consumers) {
RETURN_IF_ERROR(consumer->acquire_expr(vexprs));
if (!consumer->is_applied()) {
@@ -79,7 +79,7 @@ Status
RuntimeFilterConsumerHelper::acquire_runtime_filter(RuntimeState* state,
}
Status RuntimeFilterConsumerHelper::_append_rf_into_conjuncts(
- RuntimeState* state, const std::vector<VRuntimeFilterPtr>& vexprs,
+ RuntimeState* state, const std::vector<RuntimeFilterExprPtr>& vexprs,
VExprContextSPtrs& conjuncts, const RowDescriptor& row_descriptor) {
if (vexprs.empty()) {
return Status::OK();
@@ -112,7 +112,7 @@ Status
RuntimeFilterConsumerHelper::try_append_late_arrival_runtime_filter(
}
// 1. Check if are runtime filter ready but not applied.
- std::vector<VRuntimeFilterPtr> exprs;
+ std::vector<RuntimeFilterExprPtr> exprs;
int current_arrived_rf_num = 0;
for (const auto& consumer : _consumers) {
RETURN_IF_ERROR(consumer->acquire_expr(exprs));
diff --git a/be/src/exec/runtime_filter/runtime_filter_consumer_helper.h
b/be/src/exec/runtime_filter/runtime_filter_consumer_helper.h
index 71d0027d3a4..f08636cc074 100644
--- a/be/src/exec/runtime_filter/runtime_filter_consumer_helper.h
+++ b/be/src/exec/runtime_filter/runtime_filter_consumer_helper.h
@@ -20,7 +20,7 @@
#include <mutex>
#include "exec/pipeline/dependency.h"
-#include "exprs/vruntimefilter_wrapper.h"
+#include "exprs/runtime_filter_expr.h"
#include "runtime/runtime_profile.h"
namespace doris {
@@ -56,7 +56,7 @@ public:
private:
// Append late-arrival runtime filters to the vconjunct_ctx.
Status _append_rf_into_conjuncts(RuntimeState* state,
- const std::vector<VRuntimeFilterPtr>&
vexprs,
+ const std::vector<RuntimeFilterExprPtr>&
vexprs,
VExprContextSPtrs& conjuncts,
const RowDescriptor& row_descriptor);
diff --git a/be/src/exec/runtime_filter/runtime_filter_wrapper.h
b/be/src/exec/runtime_filter/runtime_filter_wrapper.h
index 723bebeb807..1bacfa1b417 100644
--- a/be/src/exec/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/exec/runtime_filter/runtime_filter_wrapper.h
@@ -27,8 +27,8 @@
namespace doris {
class BloomFilterFuncBase;
-class VRuntimeFilterWrapper;
-using VRuntimeFilterPtr = std::shared_ptr<VRuntimeFilterWrapper>;
+class RuntimeFilterExpr;
+using RuntimeFilterExprPtr = std::shared_ptr<RuntimeFilterExpr>;
// This class is a wrapper of runtime predicate function
class RuntimeFilterWrapper {
diff --git a/be/src/exprs/vruntimefilter_wrapper.cpp
b/be/src/exprs/runtime_filter_expr.cpp
similarity index 79%
rename from be/src/exprs/vruntimefilter_wrapper.cpp
rename to be/src/exprs/runtime_filter_expr.cpp
index baa6b5a4761..9a58421553c 100644
--- a/be/src/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/exprs/runtime_filter_expr.cpp
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include "exprs/vruntimefilter_wrapper.h"
+#include "exprs/runtime_filter_expr.h"
#include <fmt/format.h>
@@ -54,9 +54,8 @@ namespace doris {
class VExprContext;
-VRuntimeFilterWrapper::VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr
impl,
- double ignore_thredhold, bool
null_aware,
- int filter_id, int
sampling_frequency)
+RuntimeFilterExpr::RuntimeFilterExpr(const TExprNode& node, VExprSPtr impl,
double ignore_thredhold,
+ bool null_aware, int filter_id, int
sampling_frequency)
: VExpr(node),
_impl(std::move(impl)),
_ignore_thredhold(ignore_thredhold),
@@ -64,16 +63,16 @@ VRuntimeFilterWrapper::VRuntimeFilterWrapper(const
TExprNode& node, VExprSPtr im
_filter_id(filter_id),
_sampling_frequency(sampling_frequency) {}
-Status VRuntimeFilterWrapper::prepare(RuntimeState* state, const
RowDescriptor& desc,
- VExprContext* context) {
+Status RuntimeFilterExpr::prepare(RuntimeState* state, const RowDescriptor&
desc,
+ VExprContext* context) {
RETURN_IF_ERROR_OR_PREPARED(_impl->prepare(state, desc, context));
- _expr_name = fmt::format("VRuntimeFilterWrapper({})", _impl->expr_name());
+ _expr_name = fmt::format("RuntimeFilterExpr({})", _impl->expr_name());
_prepare_finished = true;
return Status::OK();
}
-Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
- FunctionContext::FunctionStateScope scope) {
+Status RuntimeFilterExpr::open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) {
DCHECK(_prepare_finished);
RETURN_IF_ERROR(_impl->open(state, context, scope));
context->get_runtime_filter_selectivity().set_sampling_frequency(_sampling_frequency);
@@ -81,24 +80,23 @@ Status VRuntimeFilterWrapper::open(RuntimeState* state,
VExprContext* context,
return Status::OK();
}
-void VRuntimeFilterWrapper::close(VExprContext* context,
- FunctionContext::FunctionStateScope scope) {
+void RuntimeFilterExpr::close(VExprContext* context,
FunctionContext::FunctionStateScope scope) {
_impl->close(context, scope);
}
-Status VRuntimeFilterWrapper::execute_column_impl(VExprContext* context, const
Block* block,
- const Selector* selector,
size_t count,
- ColumnPtr& result_column)
const {
- return Status::InternalError("Not implement
VRuntimeFilterWrapper::execute_column_impl");
+Status RuntimeFilterExpr::execute_column_impl(VExprContext* context, const
Block* block,
+ const Selector* selector, size_t
count,
+ ColumnPtr& result_column) const {
+ return Status::InternalError("Not implement
RuntimeFilterExpr::execute_column_impl");
}
-const std::string& VRuntimeFilterWrapper::expr_name() const {
+const std::string& RuntimeFilterExpr::expr_name() const {
return _expr_name;
}
-Status VRuntimeFilterWrapper::execute_filter(VExprContext* context, const
Block* block,
- uint8_t* __restrict
result_filter_data, size_t rows,
- bool accept_null, bool*
can_filter_all) const {
+Status RuntimeFilterExpr::execute_filter(VExprContext* context, const Block*
block,
+ uint8_t* __restrict
result_filter_data, size_t rows,
+ bool accept_null, bool*
can_filter_all) const {
DCHECK(_open_finished);
if (accept_null) {
return Status::InternalError(
diff --git a/be/src/exprs/vruntimefilter_wrapper.h
b/be/src/exprs/runtime_filter_expr.h
similarity index 90%
rename from be/src/exprs/vruntimefilter_wrapper.h
rename to be/src/exprs/runtime_filter_expr.h
index ac96fccf361..ceb4324d9d5 100644
--- a/be/src/exprs/vruntimefilter_wrapper.h
+++ b/be/src/exprs/runtime_filter_expr.h
@@ -46,14 +46,14 @@ namespace doris {
class Block;
class VExprContext;
-class VRuntimeFilterWrapper final : public VExpr {
- ENABLE_FACTORY_CREATOR(VRuntimeFilterWrapper);
+class RuntimeFilterExpr final : public VExpr {
+ ENABLE_FACTORY_CREATOR(RuntimeFilterExpr);
public:
- VRuntimeFilterWrapper(const TExprNode& node, VExprSPtr impl, double
ignore_thredhold,
- bool null_aware, int filter_id,
- int sampling_frequency =
RuntimeFilterSelectivity::DISABLE_SAMPLING);
- ~VRuntimeFilterWrapper() override = default;
+ RuntimeFilterExpr(const TExprNode& node, VExprSPtr impl, double
ignore_thredhold,
+ bool null_aware, int filter_id,
+ int sampling_frequency =
RuntimeFilterSelectivity::DISABLE_SAMPLING);
+ ~RuntimeFilterExpr() override = default;
Status execute_column_impl(VExprContext* context, const Block* block,
const Selector* selector,
size_t count, ColumnPtr& result_column) const
override;
Status prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) override;
@@ -130,6 +130,6 @@ private:
int _sampling_frequency;
};
-using VRuntimeFilterPtr = std::shared_ptr<VRuntimeFilterWrapper>;
+using RuntimeFilterExprPtr = std::shared_ptr<RuntimeFilterExpr>;
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/format/orc/vorc_reader.cpp
b/be/src/format/orc/vorc_reader.cpp
index bcb1a8d70f4..06ffe6302da 100644
--- a/be/src/format/orc/vorc_reader.cpp
+++ b/be/src/format/orc/vorc_reader.cpp
@@ -29,9 +29,9 @@
#include <limits>
#include <list>
+#include "exprs/runtime_filter_expr.h"
#include "exprs/vdirect_in_predicate.h"
#include "exprs/vexpr.h"
-#include "exprs/vruntimefilter_wrapper.h"
#include "exprs/vslot_ref.h"
#include "exprs/vtopn_pred.h"
@@ -75,13 +75,13 @@
#include "exec/scan/file_scanner.h"
#include "exprs/create_predicate_function.h"
#include "exprs/hybrid_set.h"
+#include "exprs/runtime_filter_expr.h"
#include "exprs/vbloom_predicate.h"
#include "exprs/vdirect_in_predicate.h"
#include "exprs/vectorized_fn_call.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "exprs/vin_predicate.h"
-#include "exprs/vruntimefilter_wrapper.h"
#include "format/orc/orc_file_reader.h"
#include "format/table/iceberg_reader.h"
#include "format/table/transactional_hive_common.h"
@@ -1228,7 +1228,7 @@ void OrcReader::_collect_predicate_columns_from_conjuncts(
auto expr = conjunct->root();
if (expr->is_rf_wrapper()) {
- auto* runtime_filter =
static_cast<VRuntimeFilterWrapper*>(expr.get());
+ auto* runtime_filter = static_cast<RuntimeFilterExpr*>(expr.get());
auto filter_impl = runtime_filter->get_impl();
visit_slot(filter_impl.get());
diff --git a/be/src/format/parquet/vparquet_reader.cpp
b/be/src/format/parquet/vparquet_reader.cpp
index a2f2356085b..2565b254338 100644
--- a/be/src/format/parquet/vparquet_reader.cpp
+++ b/be/src/format/parquet/vparquet_reader.cpp
@@ -35,12 +35,12 @@
#include "core/typeid_cast.h"
#include "core/types.h"
#include "exec/scan/file_scanner.h"
+#include "exprs/runtime_filter_expr.h"
#include "exprs/vbloom_predicate.h"
#include "exprs/vdirect_in_predicate.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "exprs/vin_predicate.h"
-#include "exprs/vruntimefilter_wrapper.h"
#include "exprs/vslot_ref.h"
#include "exprs/vtopn_pred.h"
#include "format/column_type_convert.h"
@@ -599,7 +599,7 @@ void
ParquetReader::_collect_predicate_columns_from_conjuncts(
for (const auto& conjunct : _lazy_read_ctx.conjuncts) {
auto expr = conjunct->root();
if (expr->is_rf_wrapper()) {
- VRuntimeFilterWrapper* runtime_filter =
assert_cast<VRuntimeFilterWrapper*>(expr.get());
+ RuntimeFilterExpr* runtime_filter =
assert_cast<RuntimeFilterExpr*>(expr.get());
auto filter_impl = runtime_filter->get_impl();
visit_slot(filter_impl.get());
} else {
diff --git a/be/src/storage/predicate/bloom_filter_predicate.h
b/be/src/storage/predicate/bloom_filter_predicate.h
index 2878175644c..3b7479841fb 100644
--- a/be/src/storage/predicate/bloom_filter_predicate.h
+++ b/be/src/storage/predicate/bloom_filter_predicate.h
@@ -24,7 +24,7 @@
#include "core/column/predicate_column.h"
#include "core/data_type/primitive_type.h"
#include "exprs/bloom_filter_func.h"
-#include "exprs/vruntimefilter_wrapper.h"
+#include "exprs/runtime_filter_expr.h"
#include "storage/predicate/column_predicate.h"
namespace doris {
diff --git a/be/src/storage/predicate/column_predicate.h
b/be/src/storage/predicate/column_predicate.h
index 6ac2c45e93f..9e8d9e1f921 100644
--- a/be/src/storage/predicate/column_predicate.h
+++ b/be/src/storage/predicate/column_predicate.h
@@ -24,7 +24,7 @@
#include "core/column/column.h"
#include "core/data_type/define_primitive_type.h"
#include "exec/runtime_filter/runtime_filter_selectivity.h"
-#include "exprs/vruntimefilter_wrapper.h"
+#include "exprs/runtime_filter_expr.h"
#include "format/parquet/parquet_predicate.h"
#include "runtime/runtime_profile.h"
#include "storage/index/bloom_filter/bloom_filter.h"
@@ -416,7 +416,7 @@ protected:
// TODO: the value is only in delete condition, better be template value
bool _opposite;
int _runtime_filter_id = -1;
- // VRuntimeFilterWrapper and ColumnPredicate share the same logic,
+ // RuntimeFilterExpr and ColumnPredicate share the same logic,
// but it's challenging to unify them, so the code is duplicated.
// _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true
// are variables used to implement the _always_true logic, calculated
periodically
diff --git a/be/test/exec/runtime_filter/runtime_filter_consumer_test.cpp
b/be/test/exec/runtime_filter/runtime_filter_consumer_test.cpp
index a431f2adc00..aeb638da0e2 100644
--- a/be/test/exec/runtime_filter/runtime_filter_consumer_test.cpp
+++ b/be/test/exec/runtime_filter/runtime_filter_consumer_test.cpp
@@ -48,7 +48,7 @@ public:
ASSERT_EQ(e.code(), ErrorCode::INTERNAL_ERROR);
}
- std::vector<VRuntimeFilterPtr> push_exprs;
+ std::vector<RuntimeFilterExprPtr> push_exprs;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs));
ASSERT_NE(push_exprs.size(), 0);
ASSERT_TRUE(consumer->is_applied());
@@ -123,7 +123,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) {
RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
- std::vector<VRuntimeFilterPtr> push_exprs;
+ std::vector<RuntimeFilterExprPtr> push_exprs;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs));
ASSERT_EQ(push_exprs.size(), 0);
ASSERT_FALSE(consumer->is_applied());
@@ -159,7 +159,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_disabled) {
RuntimeFilterProducer::create(_query_ctx.get(), &desc, &producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::DISABLED);
- std::vector<VRuntimeFilterPtr> push_exprs;
+ std::vector<RuntimeFilterExprPtr> push_exprs;
consumer->signal(producer.get());
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(consumer->acquire_expr(push_exprs));
ASSERT_EQ(push_exprs.size(), 0);
@@ -224,7 +224,7 @@ TEST_F(RuntimeFilterConsumerTest,
aquire_signal_at_same_time) {
RuntimeFilterProducer::create(_query_ctx.get(), &desc,
&producer));
producer->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
- std::vector<VRuntimeFilterPtr> push_exprs;
+ std::vector<RuntimeFilterExprPtr> push_exprs;
std::thread thread1(
[&]() { [[maybe_unused]] auto res =
consumer->acquire_expr(push_exprs); });
std::thread thread2([&]() { consumer->signal(producer.get()); });
diff --git
a/be/test/exec/runtime_filter/vruntimefilter_wrapper_sampling_test.cpp
b/be/test/exec/runtime_filter/runtime_filter_expr_sampling_test.cpp
similarity index 87%
rename from be/test/exec/runtime_filter/vruntimefilter_wrapper_sampling_test.cpp
rename to be/test/exec/runtime_filter/runtime_filter_expr_sampling_test.cpp
index 4cb4f4a999f..403ef8713e4 100644
--- a/be/test/exec/runtime_filter/vruntimefilter_wrapper_sampling_test.cpp
+++ b/be/test/exec/runtime_filter/runtime_filter_expr_sampling_test.cpp
@@ -20,12 +20,12 @@
#include "exec/runtime_filter/runtime_filter_selectivity.h"
#include "exec/runtime_filter/runtime_filter_test_utils.h"
+#include "exprs/runtime_filter_expr.h"
#include "exprs/vexpr_context.h"
-#include "exprs/vruntimefilter_wrapper.h"
namespace doris {
-// Minimal VExpr implementation for testing VRuntimeFilterWrapper in isolation.
+// Minimal VExpr implementation for testing RuntimeFilterExpr in isolation.
class StubVExpr : public VExpr {
public:
StubVExpr() : VExpr(make_texpr_node()) {}
@@ -61,13 +61,13 @@ private:
}
};
-class VRuntimeFilterWrapperSamplingTest : public RuntimeFilterTest {};
+class RuntimeFilterExprSamplingTest : public RuntimeFilterTest {};
-// Test that VRuntimeFilterWrapper stores and propagates sampling_frequency
+// Test that RuntimeFilterExpr stores and propagates sampling_frequency
// through open() to VExprContext. This is the core fix for the bug where
// sampling_frequency was lost when _append_rf_into_conjuncts creates a new
// VExprContext via VExprContext::create_shared(expr).
-TEST_F(VRuntimeFilterWrapperSamplingTest, open_propagates_sampling_frequency) {
+TEST_F(RuntimeFilterExprSamplingTest, open_propagates_sampling_frequency) {
auto stub = std::make_shared<StubVExpr>();
auto node = TExprNodeBuilder(TExprNodeType::SLOT_REF,
TTypeDescBuilder()
@@ -80,8 +80,8 @@ TEST_F(VRuntimeFilterWrapperSamplingTest,
open_propagates_sampling_frequency) {
.build();
const int expected_frequency = 32;
- auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4,
false, /*filter_id=*/1,
- expected_frequency);
+ auto wrapper = RuntimeFilterExpr::create_shared(node, stub, 0.4, false,
/*filter_id=*/1,
+ expected_frequency);
// Simulate the VExprContext recreation that happens in
_append_rf_into_conjuncts.
// A fresh VExprContext has default sampling_frequency = DISABLE_SAMPLING
(-1).
@@ -94,7 +94,7 @@ TEST_F(VRuntimeFilterWrapperSamplingTest,
open_propagates_sampling_frequency) {
wrapper->open(_runtime_states[0].get(), context.get(),
FunctionContext::FRAGMENT_LOCAL)
.ok());
- // After open(), sampling_frequency should be propagated from
VRuntimeFilterWrapper
+ // After open(), sampling_frequency should be propagated from
RuntimeFilterExpr
// to VExprContext. Verify by accumulating low-selectivity data and
checking
// that always_true can now be detected.
auto& selectivity = context->get_runtime_filter_selectivity();
@@ -104,7 +104,7 @@ TEST_F(VRuntimeFilterWrapperSamplingTest,
open_propagates_sampling_frequency) {
// Test that default sampling_frequency (DISABLE_SAMPLING) disables the
always_true
// optimization, matching the behavior when disable_always_true_logic is set.
-TEST_F(VRuntimeFilterWrapperSamplingTest,
default_sampling_frequency_disables_optimization) {
+TEST_F(RuntimeFilterExprSamplingTest,
default_sampling_frequency_disables_optimization) {
auto stub = std::make_shared<StubVExpr>();
auto node = TExprNodeBuilder(TExprNodeType::SLOT_REF,
TTypeDescBuilder()
@@ -117,7 +117,7 @@ TEST_F(VRuntimeFilterWrapperSamplingTest,
default_sampling_frequency_disables_op
.build();
// No sampling_frequency argument - uses default DISABLE_SAMPLING
- auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4,
false, /*filter_id=*/1);
+ auto wrapper = RuntimeFilterExpr::create_shared(node, stub, 0.4, false,
/*filter_id=*/1);
auto context = std::make_shared<VExprContext>(wrapper);
RowDescriptor row_desc;
@@ -135,7 +135,7 @@ TEST_F(VRuntimeFilterWrapperSamplingTest,
default_sampling_frequency_disables_op
// Test that sampling_frequency survives VExprContext recreation, which is the
// exact scenario that caused the original bug.
-TEST_F(VRuntimeFilterWrapperSamplingTest,
sampling_frequency_survives_context_recreation) {
+TEST_F(RuntimeFilterExprSamplingTest,
sampling_frequency_survives_context_recreation) {
auto stub = std::make_shared<StubVExpr>();
auto node = TExprNodeBuilder(TExprNodeType::SLOT_REF,
TTypeDescBuilder()
@@ -148,8 +148,8 @@ TEST_F(VRuntimeFilterWrapperSamplingTest,
sampling_frequency_survives_context_re
.build();
const int expected_frequency = 32;
- auto wrapper = VRuntimeFilterWrapper::create_shared(node, stub, 0.4,
false, /*filter_id=*/1,
- expected_frequency);
+ auto wrapper = RuntimeFilterExpr::create_shared(node, stub, 0.4, false,
/*filter_id=*/1,
+ expected_frequency);
// First context - prepare and open work
auto context1 = std::make_shared<VExprContext>(wrapper);
@@ -159,7 +159,7 @@ TEST_F(VRuntimeFilterWrapperSamplingTest,
sampling_frequency_survives_context_re
wrapper->open(_runtime_states[0].get(), context1.get(),
FunctionContext::FRAGMENT_LOCAL)
.ok());
- // Create a brand new non-clone VExprContext with the same
VRuntimeFilterWrapper,
+ // Create a brand new non-clone VExprContext with the same
RuntimeFilterExpr,
// matching the production path in _append_rf_into_conjuncts which calls
// VExprContext::create_shared(expr) then conjunct->prepare() and
conjunct->open().
auto context2 = std::make_shared<VExprContext>(wrapper);
@@ -170,7 +170,7 @@ TEST_F(VRuntimeFilterWrapperSamplingTest,
sampling_frequency_survives_context_re
ASSERT_TRUE(context2->prepare(_runtime_states[0].get(), row_desc).ok());
ASSERT_TRUE(context2->open(_runtime_states[0].get()).ok());
- // After open(), sampling_frequency should be propagated from
VRuntimeFilterWrapper
+ // After open(), sampling_frequency should be propagated from
RuntimeFilterExpr
// to context2. Verify by accumulating low-selectivity data and checking
that
// always_true can be detected — this is the actual behavior the fix
protects.
auto& selectivity = context2->get_runtime_filter_selectivity();
diff --git a/be/test/exprs/hybrid_set_test.cpp
b/be/test/exprs/hybrid_set_test.cpp
index eafe6f38fe5..aed2103d66f 100644
--- a/be/test/exprs/hybrid_set_test.cpp
+++ b/be/test/exprs/hybrid_set_test.cpp
@@ -551,7 +551,7 @@ TEST_F(HybridSetTest, FindBatch) {
ASSERT_EQ(result_column->get_data()[6], 0);
ASSERT_EQ(result_column->get_data()[7], 0);
- // Only bloom fitler need to handle
nullaware(VRuntimeFilterWrapper::execute),
+ // Only bloom fitler need to handle nullaware(RuntimeFilterExpr::execute),
// So HybridSet will return false when find null value.
string_set2->find_batch_nullable(*string_column, string_column->size(),
nullmap_column->get_data(),
result_column->get_data());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]