This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch set-runtime-filter
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/set-runtime-filter by this
push:
new 3b7d5d3c716 support rf on set operator: be part
3b7d5d3c716 is described below
commit 3b7d5d3c71619fd11489ddd22a04debf1704810d
Author: BiteTheDDDDt <[email protected]>
AuthorDate: Tue Mar 25 14:11:52 2025 +0800
support rf on set operator: be part
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.h | 2 +-
.../exec/nested_loop_join_build_operator.cpp | 2 +-
.../exec/nested_loop_join_build_operator.h | 2 +-
be/src/pipeline/exec/set_sink_operator.cpp | 54 +++++++++++++-----
be/src/pipeline/exec/set_sink_operator.h | 19 +++++--
.../runtime_filter_producer_helper_set.h | 65 ++++++++++++++++++++++
7 files changed, 121 insertions(+), 25 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 15153d1df40..3f2a262a84c 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -92,7 +92,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo
// Hash Table Init
RETURN_IF_ERROR(_hash_table_init(state));
- _runtime_filter_producer_helper =
std::make_shared<RuntimeFilterProducerHelper>(
+ _runtime_filter_producer_helper =
std::make_unique<RuntimeFilterProducerHelper>(
profile(), _should_build_hash_table, p._is_broadcast_join);
RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state,
_build_expr_ctxs,
p._runtime_filter_descs));
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 7ac62160bbd..44d19014d94 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -84,7 +84,7 @@ protected:
size_t _build_side_rows = 0;
vectorized::MutableBlock _build_side_mutable_block;
- std::shared_ptr<RuntimeFilterProducerHelper>
_runtime_filter_producer_helper;
+ std::unique_ptr<RuntimeFilterProducerHelper>
_runtime_filter_producer_helper;
/*
* The comparison result of a null value with any other value is null,
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index 7b8647f2232..c6325f10102 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -41,7 +41,7 @@ Status NestedLoopJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkSta
RETURN_IF_ERROR(p._filter_src_expr_ctxs[i]->clone(state,
_filter_src_expr_ctxs[i]));
}
- _runtime_filter_producer_helper =
std::make_shared<RuntimeFilterProducerHelperCross>(profile());
+ _runtime_filter_producer_helper =
std::make_unique<RuntimeFilterProducerHelperCross>(profile());
RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state,
_filter_src_expr_ctxs,
p._runtime_filter_descs));
return Status::OK();
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index 42274276fbb..2600d5085f6 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -47,7 +47,7 @@ private:
friend class NestedLoopJoinBuildSinkOperatorX;
vectorized::VExprContextSPtrs _filter_src_expr_ctxs;
- std::shared_ptr<RuntimeFilterProducerHelperCross>
_runtime_filter_producer_helper;
+ std::unique_ptr<RuntimeFilterProducerHelperCross>
_runtime_filter_producer_helper;
};
class NestedLoopJoinBuildSinkOperatorX final
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index 4faeb975ef9..f5236884c70 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -20,12 +20,35 @@
#include <memory>
#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
#include "vec/common/hash_table/hash_table_set_build.h"
#include "vec/core/materialize_block.h"
namespace doris::pipeline {
#include "common/compile_check_begin.h"
+template <bool is_intersect>
+// Closes the set sink local state. When the build side reached eos and a
+// runtime-filter helper exists, the runtime filters are built from
+// _shared_state->build_block and published before delegating to Base::close.
+// Exceptions thrown while processing the filters are converted into an
+// InternalError status.
+Status SetSinkLocalState<is_intersect>::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    // Skip runtime-filter production when there is no helper, the query was
+    // cancelled, or this sink never saw eos (the build block may be partial).
+    // NOTE(review): on this path the filters are neither marked ready nor
+    // ignored here; confirm consumers do not wait on them until timeout.
+    if (!_runtime_filter_producer_helper || state->is_cancelled() || !_eos) {
+        return Base::close(state, exec_status);
+    }
+
+    try {
+        RETURN_IF_ERROR(
+                _runtime_filter_producer_helper->process(state, &_shared_state->build_block));
+    } catch (const Exception& e) {
+        // This diff shows no initialization of _finish_dependency, so guard it:
+        // building the error message must not itself dereference a null pointer.
+        return Status::InternalError(
+                "rf process meet error: {}, wake_up_early: {}, _finish_dependency: {}",
+                e.to_string(), state->get_task()->wake_up_early(),
+                _finish_dependency ? _finish_dependency->debug_string()
+                                   : std::string("nullptr"));
+    }
+    return Base::close(state, exec_status);
+}
+
template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state,
vectorized::Block* in_block,
bool eos) {
@@ -57,23 +80,21 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState*
state, vectorized::Blo
local_state._mutable_block.clear();
if (eos) {
- if constexpr (is_intersect) {
- valid_element_in_hash_tbl = 0;
- } else {
- std::visit(
- [&](auto&& arg) {
- using HashTableCtxType =
std::decay_t<decltype(arg)>;
- if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- valid_element_in_hash_tbl =
arg.hash_table->size();
- }
- },
-
local_state._shared_state->hash_table_variants->method_variant);
- }
+ uint64_t hash_table_size = 0;
+ std::visit(
+ [&](auto&& arg) {
+ using HashTableCtxType = std::decay_t<decltype(arg)>;
+ if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
+ hash_table_size = arg.hash_table->size();
+ }
+ },
+
local_state._shared_state->hash_table_variants->method_variant);
+ valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
+
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
->set_ready();
- if (_child_quantity == 1) {
- local_state._dependency->set_ready_to_read();
- }
+
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
+ state, hash_table_size, local_state._finish_dependency));
}
}
return Status::OK();
@@ -175,6 +196,9 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState*
state, LocalSinkState
RETURN_IF_ERROR(_shared_state->update_build_not_ignore_null(_child_exprs));
+ _runtime_filter_producer_helper =
std::make_unique<RuntimeFilterProducerHelperSet>(profile());
+ RETURN_IF_ERROR(_runtime_filter_producer_helper->init(state, _child_exprs,
+
parent._runtime_filter_descs));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index 08f789f702a..c87007fabf8 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -17,10 +17,8 @@
#pragma once
-#include <stdint.h>
-
-#include "olap/olap_common.h"
#include "operator.h"
+#include "runtime_filter/runtime_filter_producer_helper_set.h"
namespace doris {
#include "common/compile_check_begin.h"
@@ -46,6 +44,7 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override;
+ Status close(RuntimeState* state, Status exec_status) override;
private:
friend class SetSinkOperatorX<is_intersect>;
@@ -57,6 +56,9 @@ private:
RuntimeProfile::Counter* _merge_block_timer = nullptr;
RuntimeProfile::Counter* _build_timer = nullptr;
+
+ std::unique_ptr<RuntimeFilterProducerHelperSet>
_runtime_filter_producer_helper;
+ std::shared_ptr<CountedFinishDependency> _finish_dependency;
};
template <bool is_intersect>
@@ -71,14 +73,17 @@ public:
SetSinkOperatorX(int child_id, int sink_id, int dest_id, ObjectPool* pool,
const TPlanNode& tnode, const DescriptorTbl& descs)
: Base(sink_id, tnode.node_id, dest_id),
- _cur_child_id(child_id),
_child_quantity(tnode.node_type ==
TPlanNodeType::type::INTERSECT_NODE
?
tnode.intersect_node.result_expr_lists.size()
:
tnode.except_node.result_expr_lists.size()),
_is_colocate(is_intersect ? tnode.intersect_node.is_colocate
: tnode.except_node.is_colocate),
_partition_exprs(is_intersect ?
tnode.intersect_node.result_expr_lists[child_id]
- :
tnode.except_node.result_expr_lists[child_id]) {}
+ :
tnode.except_node.result_expr_lists[child_id]),
+ _runtime_filter_descs(tnode.runtime_filters) {
+ DCHECK_EQ(child_id, _cur_child_id);
+ DCHECK_GT(_child_quantity, 1);
+ }
~SetSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TDataSink",
@@ -107,13 +112,15 @@ private:
vectorized::Block& block,
vectorized::ColumnRawPtrs& raw_ptrs,
size_t& rows);
- const int _cur_child_id;
+ const int _cur_child_id = 0;
const size_t _child_quantity;
// every child has its result expr list
vectorized::VExprContextSPtrs _child_exprs;
const bool _is_colocate;
const std::vector<TExpr> _partition_exprs;
using OperatorBase::_child;
+
+ const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
};
#include "common/compile_check_end.h"
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper_set.h
b/be/src/runtime_filter/runtime_filter_producer_helper_set.h
new file mode 100644
index 00000000000..39739aabccf
--- /dev/null
+++ b/be/src/runtime_filter/runtime_filter_producer_helper_set.h
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/runtime_state.h"
+#include "runtime_filter/runtime_filter.h"
+#include "runtime_filter/runtime_filter_mgr.h"
+#include "runtime_filter/runtime_filter_producer_helper.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+// Runtime filter producer used by the set (INTERSECT / EXCEPT) sink node.
+// It reuses RuntimeFilterProducerHelper with should_build_hash_table = true and
+// is_broadcast_join = false, and builds the filters from the whole build block.
+class RuntimeFilterProducerHelperSet : public RuntimeFilterProducerHelper {
+public:
+    ~RuntimeFilterProducerHelperSet() override = default;
+
+    explicit RuntimeFilterProducerHelperSet(RuntimeProfile* profile)
+            : RuntimeFilterProducerHelper(profile, true, false) {}
+
+    // Builds the runtime filters from `block` (the set build block) and publishes them.
+    //
+    // If the task was woken up early, no data is inserted and every filter is
+    // published as IGNORED. Ignoring a runtime filter only loses pruning, never
+    // correctness. Otherwise the filters are sized by the block's row count,
+    // filled with its rows, and published as READY.
+    //
+    // `block` may be null; it is then treated as an empty build side.
+    Status process(RuntimeState* state, const vectorized::Block* block) {
+        if (_skip_runtime_filters_process) {
+            return Status::OK();
+        }
+
+        const bool wake_up_early = state->get_task()->wake_up_early();
+        if (!wake_up_early) {
+            // The build side is complete, so the filters now have a global size.
+            const uint64_t build_rows = block ? block->rows() : 0;
+            RETURN_IF_ERROR(_init_filters(state, build_rows));
+            // Only insert when there is data: this also keeps a null `block`
+            // from ever reaching _insert.
+            if (build_rows > 0) {
+                RETURN_IF_ERROR(_insert(block, 0));
+            }
+        }
+
+        const auto wrapper_state = wake_up_early ? RuntimeFilterWrapper::State::IGNORED
+                                                 : RuntimeFilterWrapper::State::READY;
+        for (const auto& filter : _producers) {
+            filter->set_wrapper_state_and_ready_to_publish(wrapper_state);
+        }
+
+        RETURN_IF_ERROR(_publish(state));
+        return Status::OK();
+    }
+};
+#include "common/compile_check_end.h"
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]