This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 41b5629e912 [bug](rf) fix only min/max rf return error when has remote
target (#25588) (#25763)
41b5629e912 is described below
commit 41b5629e912a47f983513c93f33998995079d151
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Oct 23 23:25:51 2023 +0800
[bug](rf) fix only min/max rf return error when has remote target (#25588)
(#25763)
---
be/src/exprs/runtime_filter.cpp | 19 ++++++++++++++++---
be/src/exprs/runtime_filter.h | 4 ++--
be/src/exprs/runtime_filter_slots_cross.h | 5 +++--
be/src/vec/exec/join/vnested_loop_join_node.cpp | 10 +++++-----
gensrc/proto/internal_service.proto | 2 ++
5 files changed, 28 insertions(+), 12 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 747785a730c..9450e1c5cda 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -171,6 +171,10 @@ RuntimeFilterType get_type(int filter_type) {
}
case PFilterType::MINMAX_FILTER:
return RuntimeFilterType::MINMAX_FILTER;
+ case PFilterType::MIN_FILTER:
+ return RuntimeFilterType::MIN_FILTER;
+ case PFilterType::MAX_FILTER:
+ return RuntimeFilterType::MAX_FILTER;
default:
return RuntimeFilterType::UNKNOWN_FILTER;
}
@@ -183,6 +187,10 @@ PFilterType get_type(RuntimeFilterType type) {
return PFilterType::IN_FILTER;
case RuntimeFilterType::BLOOM_FILTER:
return PFilterType::BLOOM_FILTER;
+ case RuntimeFilterType::MIN_FILTER:
+ return PFilterType::MIN_FILTER;
+ case RuntimeFilterType::MAX_FILTER:
+ return PFilterType::MAX_FILTER;
case RuntimeFilterType::MINMAX_FILTER:
return PFilterType::MINMAX_FILTER;
case RuntimeFilterType::IN_OR_BLOOM_FILTER:
@@ -1437,6 +1445,8 @@ Status IRuntimeFilter::create_wrapper(QueryContext*
query_ctx,
DCHECK(param->request->has_bloom_filter());
return (*wrapper)->assign(&param->request->bloom_filter(),
param->data);
}
+ case PFilterType::MIN_FILTER:
+ case PFilterType::MAX_FILTER:
case PFilterType::MINMAX_FILTER: {
DCHECK(param->request->has_minmax_filter());
return (*wrapper)->assign(&param->request->minmax_filter());
@@ -1478,6 +1488,8 @@ Status IRuntimeFilter::_create_wrapper(RuntimeState*
state, const T* param, Obje
DCHECK(param->request->has_bloom_filter());
return (*wrapper)->assign(&param->request->bloom_filter(),
param->data);
}
+ case PFilterType::MIN_FILTER:
+ case PFilterType::MAX_FILTER:
case PFilterType::MINMAX_FILTER: {
DCHECK(param->request->has_minmax_filter());
return (*wrapper)->assign(&param->request->minmax_filter());
@@ -1563,7 +1575,9 @@ Status IRuntimeFilter::serialize_impl(T* request, void**
data, int* len) {
DCHECK(data != nullptr);
request->mutable_bloom_filter()->set_filter_length(*len);
request->mutable_bloom_filter()->set_always_true(false);
- } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER) {
+ } else if (real_runtime_filter_type == RuntimeFilterType::MINMAX_FILTER ||
+ real_runtime_filter_type == RuntimeFilterType::MIN_FILTER ||
+ real_runtime_filter_type == RuntimeFilterType::MAX_FILTER) {
auto minmax_filter = request->mutable_minmax_filter();
to_protobuf(minmax_filter);
} else {
@@ -1706,8 +1720,7 @@ void IRuntimeFilter::to_protobuf(PMinMaxFilter* filter) {
void* min_data = nullptr;
void* max_data = nullptr;
_wrapper->get_minmax_filter_desc(&min_data, &max_data);
- DCHECK(min_data != nullptr);
- DCHECK(max_data != nullptr);
+ DCHECK(min_data != nullptr && max_data != nullptr);
filter->set_column_type(to_proto(_wrapper->column_type()));
switch (_wrapper->column_type()) {
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index b92bb4aabd7..1272ac2e7b0 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -76,8 +76,8 @@ enum class RuntimeFilterType {
BLOOM_FILTER = 2,
IN_OR_BLOOM_FILTER = 3,
BITMAP_FILTER = 4,
- MIN_FILTER = 5, // only min // now only support at local
- MAX_FILTER = 6 // only max // now only support at local
+ MIN_FILTER = 5, // only min
+ MAX_FILTER = 6 // only max
};
inline std::string to_string(RuntimeFilterType type) {
diff --git a/be/src/exprs/runtime_filter_slots_cross.h
b/be/src/exprs/runtime_filter_slots_cross.h
index 8711ba181cf..4868b27a4ea 100644
--- a/be/src/exprs/runtime_filter_slots_cross.h
+++ b/be/src/exprs/runtime_filter_slots_cross.h
@@ -48,8 +48,9 @@ public:
if (runtime_filter == nullptr) {
return Status::InternalError("runtime filter is nullptr");
}
- // cross join has not remote filter
- if (runtime_filter->has_remote_target()) {
+ // cross join has not remote filter for bitmap filter(non shuffle
join)
+ if (runtime_filter->type() == RuntimeFilterType::BITMAP_FILTER &&
+ runtime_filter->has_remote_target()) {
return Status::InternalError("cross join runtime filter has
remote target");
}
_runtime_filters.push_back(runtime_filter);
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 6265f6c609c..eee179f7837 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -79,7 +79,7 @@ struct RuntimeFilterBuild {
if (!runtime_filter_slots.empty() &&
!_join_node->_build_blocks.empty()) {
SCOPED_TIMER(_join_node->_push_compute_timer);
for (auto& build_block : _join_node->_build_blocks) {
- runtime_filter_slots.insert(&build_block);
+ RETURN_IF_ERROR(runtime_filter_slots.insert(&build_block));
}
}
{
@@ -187,7 +187,7 @@ Status
VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) {
std::placeholders::_3)));
}
- sink(state, &block, eos);
+ RETURN_IF_ERROR(sink(state, &block, eos));
if (eos) {
break;
@@ -213,7 +213,7 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState*
state, vectorized::Block*
if (eos) {
COUNTER_UPDATE(_build_rows_counter, _build_rows);
- RuntimeFilterBuild(this)(state);
+ RETURN_IF_ERROR(RuntimeFilterBuild(this)(state));
// optimize `in bitmap`, see
https://github.com/apache/doris/issues/14338
if (_is_output_left_side_only &&
@@ -268,7 +268,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state,
Block* block, bool* eo
RETURN_IF_CANCELLED(state);
while (need_more_input_data()) {
RETURN_IF_ERROR(_fresh_left_block(state));
- push(state, &_left_block, _left_side_eos);
+ RETURN_IF_ERROR(push(state, &_left_block, _left_side_eos));
}
return pull(state, block, eos);
@@ -637,7 +637,7 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
RETURN_IF_CANCELLED(state);
// We can close the right child to release its resources because its input
has been
// fully consumed.
- child(1)->close(state);
+ RETURN_IF_ERROR(child(1)->close(state));
return Status::OK();
}
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 8af26a317e3..681bf071dab 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -475,6 +475,8 @@ enum PFilterType {
MINMAX_FILTER = 2;
IN_FILTER = 3;
IN_OR_BLOOM_FILTER = 4;
+ MIN_FILTER = 5;
+ MAX_FILTER = 6;
};
message PMergeFilterRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]