This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7385602b190 [bug](rf) fix only min/max rf return error when has remote
target (#25588)
7385602b190 is described below
commit 7385602b190bde9c6d1e3ef98043b0217e19cc53
Author: zhangstar333 <[email protected]>
AuthorDate: Thu Oct 19 19:26:29 2023 +0800
[bug](rf) fix only min/max rf return error when has remote target (#25588)
---
be/src/exprs/runtime_filter.cpp | 19 ++++++++++++++++---
be/src/exprs/runtime_filter.h | 4 ++--
be/src/exprs/runtime_filter_slots_cross.h | 5 +++--
be/src/vec/exec/join/vnested_loop_join_node.cpp | 10 +++++-----
gensrc/proto/internal_service.proto | 2 ++
regression-test/data/nereids_p0/join/test_join_13.out | 3 +++
.../suites/nereids_p0/join/test_join_13.groovy | 2 ++
7 files changed, 33 insertions(+), 12 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 3679ce4c84c..aa6ec25f3fc 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -171,6 +171,10 @@ RuntimeFilterType get_type(int filter_type) {
}
case PFilterType::MINMAX_FILTER:
return RuntimeFilterType::MINMAX_FILTER;
+ case PFilterType::MIN_FILTER:
+ return RuntimeFilterType::MIN_FILTER;
+ case PFilterType::MAX_FILTER:
+ return RuntimeFilterType::MAX_FILTER;
default:
return RuntimeFilterType::UNKNOWN_FILTER;
}
@@ -183,6 +187,10 @@ PFilterType get_type(RuntimeFilterType type) {
return PFilterType::IN_FILTER;
case RuntimeFilterType::BLOOM_FILTER:
return PFilterType::BLOOM_FILTER;
+ case RuntimeFilterType::MIN_FILTER:
+ return PFilterType::MIN_FILTER;
+ case RuntimeFilterType::MAX_FILTER:
+ return PFilterType::MAX_FILTER;
case RuntimeFilterType::MINMAX_FILTER:
return PFilterType::MINMAX_FILTER;
case RuntimeFilterType::IN_OR_BLOOM_FILTER:
@@ -1328,6 +1336,8 @@ Status IRuntimeFilter::create_wrapper(QueryContext*
query_ctx,
DCHECK(param->request->has_bloom_filter());
return (*wrapper)->assign(¶m->request->bloom_filter(),
param->data);
}
+ case PFilterType::MIN_FILTER:
+ case PFilterType::MAX_FILTER:
case PFilterType::MINMAX_FILTER: {
DCHECK(param->request->has_minmax_filter());
return (*wrapper)->assign(¶m->request->minmax_filter());
@@ -1369,6 +1379,8 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState*
state, const T* param, Obje
DCHECK(param->request->has_bloom_filter());
return (*wrapper)->assign(¶m->request->bloom_filter(),
param->data);
}
+ case PFilterType::MIN_FILTER:
+ case PFilterType::MAX_FILTER:
case PFilterType::MINMAX_FILTER: {
DCHECK(param->request->has_minmax_filter());
return (*wrapper)->assign(¶m->request->minmax_filter());
@@ -1443,7 +1455,9 @@ Status IRuntimeFilter::serialize_impl(T* request, void**
data, int* len) {
DCHECK(data != nullptr);
request->mutable_bloom_filter()->set_filter_length(*len);
request->mutable_bloom_filter()->set_always_true(false);
- } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER) {
+ } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER ||
+ real_runtime_filter_type == RuntimeFilterType::MIN_FILTER ||
+ real_runtime_filter_type == RuntimeFilterType::MAX_FILTER) {
auto minmax_filter = request->mutable_minmax_filter();
to_protobuf(minmax_filter);
} else {
@@ -1586,8 +1600,7 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
void* min_data = nullptr;
void* max_data = nullptr;
static_cast<void>(_wrapper->get_minmax_filter_desc(&min_data, &max_data));
- DCHECK(min_data != nullptr);
- DCHECK(max_data != nullptr);
+ DCHECK(min_data != nullptr && max_data != nullptr);
filter->set_column_type(to_proto(_wrapper->column_type()));
switch (_wrapper->column_type()) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index a05e594db62..a75d1b84b73 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -77,8 +77,8 @@ enum class RuntimeFilterType {
BLOOM_FILTER = 2,
IN_OR_BLOOM_FILTER = 3,
BITMAP_FILTER = 4,
- MIN_FILTER = 5, // only min // now only support at local
- MAX_FILTER = 6 // only max // now only support at local
+ MIN_FILTER = 5, // only min
+ MAX_FILTER = 6 // only max
};
static RuntimeFilterType get_minmax_filter_type(TMinMaxRuntimeFilterType::type
ttype) {
diff --git a/be/src/exprs/runtime_filter_slots_cross.h
b/be/src/exprs/runtime_filter_slots_cross.h
index 8711ba181cf..4868b27a4ea 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -48,8 +48,9 @@ public:
if (runtime_filter == nullptr) {
return Status::InternalError("runtime filter is nullptr");
}
- // cross join has not remote filter
- if (runtime_filter->has_remote_target()) {
+ // cross join has not remote filter for bitmap filter(non shuffle
join)
+ if (runtime_filter->type() == RuntimeFilterType::BITMAP_FILTER &&
+ runtime_filter->has_remote_target()) {
return Status::InternalError("cross join runtime filter has
remote target");
}
_runtime_filters.push_back(runtime_filter);
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 7996333a40f..c409939aa82 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -80,7 +80,7 @@ Status RuntimeFilterBuild<Parent>::operator()(RuntimeState*
state) {
if (!runtime_filter_slots.empty() && !_parent->build_blocks().empty()) {
SCOPED_TIMER(_parent->push_compute_timer());
for (auto& build_block : _parent->build_blocks()) {
- static_cast<void>(runtime_filter_slots.insert(&build_block));
+ RETURN_IF_ERROR(runtime_filter_slots.insert(&build_block));
}
}
{
@@ -190,7 +190,7 @@ Status
VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) {
std::placeholders::_3)));
}
- static_cast<void>(sink(state, &block, eos));
+ RETURN_IF_ERROR(sink(state, &block, eos));
if (eos) {
break;
@@ -216,7 +216,7 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState*
state, vectorized::Block*
if (eos) {
COUNTER_UPDATE(_build_rows_counter, _build_rows);
-
static_cast<void>(RuntimeFilterBuild<VNestedLoopJoinNode>(this)(state));
+ RETURN_IF_ERROR(RuntimeFilterBuild<VNestedLoopJoinNode>(this)(state));
// optimize `in bitmap`, see
https://github.com/apache/doris/issues/14338
if (_is_output_left_side_only &&
@@ -271,7 +271,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state,
Block* block, bool* eo
RETURN_IF_CANCELLED(state);
while (need_more_input_data()) {
RETURN_IF_ERROR(_fresh_left_block(state));
- static_cast<void>(push(state, &_left_block, _left_side_eos));
+ RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos));
}
return pull(state, block, eos);
@@ -634,7 +634,7 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
// We can close the right child to release its resources because its input
has been
// fully consumed.
- static_cast<void>(child(1)->close(state));
+ RETURN_IF_ERROR(child(1)->close(state));
return Status::OK();
}
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index c757315f0d0..1654876b911 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -495,6 +495,8 @@ enum PFilterType {
MINMAX_FILTER = 2;
IN_FILTER = 3;
IN_OR_BLOOM_FILTER = 4;
+ MIN_FILTER = 5;
+ MAX_FILTER = 6;
};
message PMergeFilterRequest {
diff --git a/regression-test/data/nereids_p0/join/test_join_13.out
b/regression-test/data/nereids_p0/join/test_join_13.out
index 64f08c9be7a..a1f804bfe45 100644
--- a/regression-test/data/nereids_p0/join/test_join_13.out
+++ b/regression-test/data/nereids_p0/join/test_join_13.out
@@ -113,3 +113,6 @@
14
15
+-- !pipelineX_max_rf --
+1 1989 1001 11011902 123.123
+
diff --git a/regression-test/suites/nereids_p0/join/test_join_13.groovy
b/regression-test/suites/nereids_p0/join/test_join_13.groovy
index 1c419796e06..e26a6cc3454 100644
--- a/regression-test/suites/nereids_p0/join/test_join_13.groovy
+++ b/regression-test/suites/nereids_p0/join/test_join_13.groovy
@@ -242,4 +242,6 @@ suite("test_join_13", "nereids_p0") {
qt_right_anti_join_null_1 "select b.k1 from ${tbName1} t right anti join
${tbName2} b on b.k1 > t.k1 order by b.k1"
qt_right_anti_join_null_2 "select b.k1 from ${empty_name} t right anti
join ${tbName2} b on b.k1 > t.k1 order by b.k1"
+
+ qt_pipelineX_max_rf """ select /*+ SET_VAR(runtime_filter_type=4,
experimental_enable_pipeline_x_engine=true)*/ a.k1, a.k2, a.k3, a.k4, a.k5 from
test a left anti join baseall b on a.k1 > b.k1 where a.k2 > 0 and a.k3 != 0 and
a.k6 > "000" order by 1, 2, 3, 4, 5 limit 65535; """
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]