This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f87a3ccba2 [fix](runtime_filter) runtime_profile was not initialized in multi_cast_data_stream_source (#21690)
f87a3ccba2 is described below
commit f87a3ccba2852d46d28bd11d9dbc4522a347848f
Author: Jerry Hu <[email protected]>
AuthorDate: Tue Jul 11 00:16:29 2023 +0800
[fix](runtime_filter) runtime_profile was not initialized in multi_cast_data_stream_source (#21690)
---
be/src/pipeline/exec/multi_cast_data_stream_source.cpp | 3 ++-
be/src/vec/exec/runtime_filter_consumer.cpp | 9 +++++++++
be/src/vec/exec/runtime_filter_consumer.h | 2 ++
be/src/vec/exec/scan/vscan_node.cpp | 7 +------
4 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 924db8a729..216c626330 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -55,7 +55,8 @@ MultiCastDataStreamerSourceOperator::MultiCastDataStreamerSourceOperator(
Status MultiCastDataStreamerSourceOperator::prepare(doris::RuntimeState* state) {
RETURN_IF_ERROR(vectorized::RuntimeFilterConsumer::init(state));
- _register_runtime_filter();
+ // init profile for runtime filter
+ RuntimeFilterConsumer::_init_profile(_multi_cast_data_streamer->profile());
if (_t_data_stream_sink.__isset.output_exprs) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.output_exprs,
_output_expr_contexts));
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp b/be/src/vec/exec/runtime_filter_consumer.cpp
index 4f51d99ccc..b05ebf0476 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -34,6 +34,15 @@ Status RuntimeFilterConsumer::init(RuntimeState* state) {
return Status::OK();
}
+void RuntimeFilterConsumer::_init_profile(RuntimeProfile* profile) {
+ std::stringstream ss;
+ for (auto& rf_ctx : _runtime_filter_ctxs) {
+ rf_ctx.runtime_filter->init_profile(profile);
+ ss << rf_ctx.runtime_filter->get_name() << ", ";
+ }
+ profile->add_info_string("RuntimeFilters: ", ss.str());
+}
+
Status RuntimeFilterConsumer::_register_runtime_filter() {
int filter_size = _runtime_filter_descs.size();
_runtime_filter_ctxs.reserve(filter_size);
diff --git a/be/src/vec/exec/runtime_filter_consumer.h b/be/src/vec/exec/runtime_filter_consumer.h
index c938e8510b..4e4d53e818 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -45,6 +45,8 @@ protected:
// Append late-arrival runtime filters to the vconjunct_ctx.
Status _append_rf_into_conjuncts(const VExprSPtrs& vexprs);
+ void _init_profile(RuntimeProfile* profile);
+
void _prepare_rf_timer(RuntimeProfile* profile);
// For runtime filters
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index e9a14aac71..7384225971 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -117,12 +117,7 @@ Status VScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
// init profile for runtime filter
- std::stringstream ss;
- for (auto& rf_ctx : _runtime_filter_ctxs) {
- rf_ctx.runtime_filter->init_profile(_runtime_profile.get());
- ss << rf_ctx.runtime_filter->get_name() << ", ";
- }
- _runtime_profile->add_info_string("RuntimeFilters: ", ss.str());
+ RuntimeFilterConsumer::_init_profile(_runtime_profile.get());
if (_is_pipeline_scan) {
if (_shared_scan_opt) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]