This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7385602b190 [bug](rf) fix only min/max rf return error when has remote 
target (#25588)
7385602b190 is described below

commit 7385602b190bde9c6d1e3ef98043b0217e19cc53
Author: zhangstar333 <[email protected]>
AuthorDate: Thu Oct 19 19:26:29 2023 +0800

    [bug](rf) fix only min/max rf return error when has remote target (#25588)
---
 be/src/exprs/runtime_filter.cpp                       | 19 ++++++++++++++++---
 be/src/exprs/runtime_filter.h                         |  4 ++--
 be/src/exprs/runtime_filter_slots_cross.h             |  5 +++--
 be/src/vec/exec/join/vnested_loop_join_node.cpp       | 10 +++++-----
 gensrc/proto/internal_service.proto                   |  2 ++
 regression-test/data/nereids_p0/join/test_join_13.out |  3 +++
 .../suites/nereids_p0/join/test_join_13.groovy        |  2 ++
 7 files changed, 33 insertions(+), 12 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 3679ce4c84c..aa6ec25f3fc 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -171,6 +171,10 @@ RuntimeFilterType get_type(int filter_type) {
     }
     case PFilterType::MINMAX_FILTER:
         return RuntimeFilterType::MINMAX_FILTER;
+    case PFilterType::MIN_FILTER:
+        return RuntimeFilterType::MIN_FILTER;
+    case PFilterType::MAX_FILTER:
+        return RuntimeFilterType::MAX_FILTER;
     default:
         return RuntimeFilterType::UNKNOWN_FILTER;
     }
@@ -183,6 +187,10 @@ PFilterType get_type(RuntimeFilterType type) {
         return PFilterType::IN_FILTER;
     case RuntimeFilterType::BLOOM_FILTER:
         return PFilterType::BLOOM_FILTER;
+    case RuntimeFilterType::MIN_FILTER:
+        return PFilterType::MIN_FILTER;
+    case RuntimeFilterType::MAX_FILTER:
+        return PFilterType::MAX_FILTER;
     case RuntimeFilterType::MINMAX_FILTER:
         return PFilterType::MINMAX_FILTER;
     case RuntimeFilterType::IN_OR_BLOOM_FILTER:
@@ -1328,6 +1336,8 @@ Status IRuntimeFilter::create_wrapper(QueryContext* 
query_ctx,
         DCHECK(param->request->has_bloom_filter());
         return (*wrapper)->assign(&param->request->bloom_filter(), 
param->data);
     }
+    case PFilterType::MIN_FILTER:
+    case PFilterType::MAX_FILTER:
     case PFilterType::MINMAX_FILTER: {
         DCHECK(param->request->has_minmax_filter());
         return (*wrapper)->assign(&param->request->minmax_filter());
@@ -1369,6 +1379,8 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState* 
state, const T* param, Obje
         DCHECK(param->request->has_bloom_filter());
         return (*wrapper)->assign(&param->request->bloom_filter(), 
param->data);
     }
+    case PFilterType::MIN_FILTER:
+    case PFilterType::MAX_FILTER:
     case PFilterType::MINMAX_FILTER: {
         DCHECK(param->request->has_minmax_filter());
         return (*wrapper)->assign(&param->request->minmax_filter());
@@ -1443,7 +1455,9 @@ Status IRuntimeFilter::serialize_impl(T* request, void** 
data, int* len) {
         DCHECK(data != nullptr);
         request->mutable_bloom_filter()->set_filter_length(*len);
         request->mutable_bloom_filter()->set_always_true(false);
-    } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER) {
+    } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER ||
+               real_runtime_filter_type == RuntimeFilterType::MIN_FILTER ||
+               real_runtime_filter_type == RuntimeFilterType::MAX_FILTER) {
         auto minmax_filter = request->mutable_minmax_filter();
         to_protobuf(minmax_filter);
     } else {
@@ -1586,8 +1600,7 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
     void* min_data = nullptr;
     void* max_data = nullptr;
     static_cast<void>(_wrapper->get_minmax_filter_desc(&min_data, &max_data));
-    DCHECK(min_data != nullptr);
-    DCHECK(max_data != nullptr);
+    DCHECK(min_data != nullptr && max_data != nullptr);
     filter->set_column_type(to_proto(_wrapper->column_type()));
 
     switch (_wrapper->column_type()) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index a05e594db62..a75d1b84b73 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -77,8 +77,8 @@ enum class RuntimeFilterType {
     BLOOM_FILTER = 2,
     IN_OR_BLOOM_FILTER = 3,
     BITMAP_FILTER = 4,
-    MIN_FILTER = 5, // only min // now only support at local
-    MAX_FILTER = 6  // only max // now only support at local
+    MIN_FILTER = 5, // only min
+    MAX_FILTER = 6  // only max
 };
 
 static RuntimeFilterType get_minmax_filter_type(TMinMaxRuntimeFilterType::type 
ttype) {
diff --git a/be/src/exprs/runtime_filter_slots_cross.h 
b/be/src/exprs/runtime_filter_slots_cross.h
index 8711ba181cf..4868b27a4ea 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -48,8 +48,9 @@ public:
             if (runtime_filter == nullptr) {
                 return Status::InternalError("runtime filter is nullptr");
             }
-            // cross join has not remote filter
-            if (runtime_filter->has_remote_target()) {
+            // cross join: a bitmap filter (non-shuffle join) must not have a 
remote target
+            if (runtime_filter->type() == RuntimeFilterType::BITMAP_FILTER &&
+                runtime_filter->has_remote_target()) {
                 return Status::InternalError("cross join runtime filter has 
remote target");
             }
             _runtime_filters.push_back(runtime_filter);
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp 
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 7996333a40f..c409939aa82 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -80,7 +80,7 @@ Status RuntimeFilterBuild<Parent>::operator()(RuntimeState* 
state) {
     if (!runtime_filter_slots.empty() && !_parent->build_blocks().empty()) {
         SCOPED_TIMER(_parent->push_compute_timer());
         for (auto& build_block : _parent->build_blocks()) {
-            static_cast<void>(runtime_filter_slots.insert(&build_block));
+            RETURN_IF_ERROR(runtime_filter_slots.insert(&build_block));
         }
     }
     {
@@ -190,7 +190,7 @@ Status 
VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) {
                               std::placeholders::_3)));
         }
 
-        static_cast<void>(sink(state, &block, eos));
+        RETURN_IF_ERROR(sink(state, &block, eos));
 
         if (eos) {
             break;
@@ -216,7 +216,7 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState* 
state, vectorized::Block*
 
     if (eos) {
         COUNTER_UPDATE(_build_rows_counter, _build_rows);
-        
static_cast<void>(RuntimeFilterBuild<VNestedLoopJoinNode>(this)(state));
+        RETURN_IF_ERROR(RuntimeFilterBuild<VNestedLoopJoinNode>(this)(state));
 
         // optimize `in bitmap`, see 
https://github.com/apache/doris/issues/14338
         if (_is_output_left_side_only &&
@@ -271,7 +271,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state, 
Block* block, bool* eo
     RETURN_IF_CANCELLED(state);
     while (need_more_input_data()) {
         RETURN_IF_ERROR(_fresh_left_block(state));
-        static_cast<void>(push(state, &_left_block, _left_side_eos));
+        RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos));
     }
 
     return pull(state, block, eos);
@@ -634,7 +634,7 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
     RETURN_IF_CANCELLED(state);
     // We can close the right child to release its resources because its input 
has been
     // fully consumed.
-    static_cast<void>(child(1)->close(state));
+    RETURN_IF_ERROR(child(1)->close(state));
     return Status::OK();
 }
 
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index c757315f0d0..1654876b911 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -495,6 +495,8 @@ enum PFilterType {
     MINMAX_FILTER = 2;
     IN_FILTER = 3;
     IN_OR_BLOOM_FILTER = 4;
+    MIN_FILTER = 5;
+    MAX_FILTER = 6;
 };
 
 message PMergeFilterRequest {
diff --git a/regression-test/data/nereids_p0/join/test_join_13.out 
b/regression-test/data/nereids_p0/join/test_join_13.out
index 64f08c9be7a..a1f804bfe45 100644
--- a/regression-test/data/nereids_p0/join/test_join_13.out
+++ b/regression-test/data/nereids_p0/join/test_join_13.out
@@ -113,3 +113,6 @@
 14
 15
 
+-- !pipelineX_max_rf --
+1      1989    1001    11011902        123.123
+
diff --git a/regression-test/suites/nereids_p0/join/test_join_13.groovy 
b/regression-test/suites/nereids_p0/join/test_join_13.groovy
index 1c419796e06..e26a6cc3454 100644
--- a/regression-test/suites/nereids_p0/join/test_join_13.groovy
+++ b/regression-test/suites/nereids_p0/join/test_join_13.groovy
@@ -242,4 +242,6 @@ suite("test_join_13", "nereids_p0") {
     qt_right_anti_join_null_1 "select b.k1 from ${tbName1} t right anti join 
${tbName2} b on b.k1 > t.k1 order by b.k1"
 
     qt_right_anti_join_null_2 "select b.k1 from ${empty_name} t right anti 
join ${tbName2} b on b.k1 > t.k1 order by b.k1"
+
+    qt_pipelineX_max_rf """ select /*+ SET_VAR(runtime_filter_type=4, 
experimental_enable_pipeline_x_engine=true)*/ a.k1, a.k2, a.k3, a.k4, a.k5 from 
test a left anti join baseall b on a.k1 > b.k1 where a.k2 > 0 and a.k3 != 0 and 
a.k6 > "000" order by 1, 2, 3, 4, 5 limit 65535; """
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to