This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 5734bbcb287 [cherry-pick](branch-30) execute expr should use local
states instead of operators(#40189) (#41324)
5734bbcb287 is described below
commit 5734bbcb287a4d16cd0a7db545c406d966c714fb
Author: zhangstar333 <[email protected]>
AuthorDate: Fri Sep 27 19:36:45 2024 +0800
[cherry-pick](branch-30) execute expr should use local states instead of
operators(#40189) (#41324)
An operator's exprs cannot be executed concurrently, so execution should use
the local state's exprs instead.
cherry-pick from master https://github.com/apache/doris/pull/40189
---
be/src/pipeline/exec/aggregation_source_operator.cpp | 3 ++-
be/src/pipeline/exec/assert_num_rows_operator.cpp | 3 ++-
be/src/pipeline/exec/assert_num_rows_operator.h | 3 +++
.../exec/distinct_streaming_aggregation_operator.cpp | 4 ++--
be/src/pipeline/exec/multi_cast_data_stream_source.h | 7 ++++---
be/src/pipeline/exec/operator.h | 12 +++++++-----
be/src/pipeline/exec/repeat_operator.cpp | 2 +-
be/src/pipeline/exec/streaming_aggregation_operator.cpp | 4 ++--
regression-test/data/javaudf_p0/test_javaudf_string.out | 3 +++
.../suites/javaudf_p0/test_javaudf_string.groovy | 13 +++++++++++++
10 files changed, 39 insertions(+), 15 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 3264ad56f3c..a5f40a431c5 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -443,7 +443,8 @@ Status AggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* blo
RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
local_state.make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block,
block->columns()));
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
block,
+ block->columns()));
local_state.do_agg_limit(block, eos);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 4a51002beff..5aa27b51c45 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -116,7 +116,8 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
}
COUNTER_SET(local_state.rows_returned_counter(),
local_state.num_rows_returned());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block,
block->columns()));
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
block,
+ block->columns()));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h
b/be/src/pipeline/exec/assert_num_rows_operator.h
index 423bd69144e..dcc64f57878 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -28,6 +28,9 @@ public:
AssertNumRowsLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<FakeSharedState>(state, parent) {}
~AssertNumRowsLocalState() = default;
+
+private:
+ friend class AssertNumRowsOperatorX;
};
class AssertNumRowsOperatorX final : public
StreamingOperatorX<AssertNumRowsLocalState> {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index e8efb51973e..ab71b52ae01 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -462,8 +462,8 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState*
state, vectorized::Bloc
local_state._make_nullable_output_key(block);
if (!_is_streaming_preagg) {
// dispose the having clause, should not be execute in prestreaming agg
- RETURN_IF_ERROR(
- vectorized::VExprContext::filter_block(_conjuncts, block,
block->columns()));
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
block,
+
block->columns()));
}
local_state.add_num_rows_returned(block->rows());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 8ecbd23764d..c71310e3ee3 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -62,6 +62,7 @@ public:
}
private:
+ friend class MultiCastDataStreamerSourceOperatorX;
vectorized::VExprContextSPtrs _output_expr_contexts;
std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;
@@ -95,8 +96,8 @@ public:
if (_t_data_stream_sink.__isset.conjuncts) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts,
- _conjuncts));
- RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state,
_row_desc()));
+ conjuncts()));
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(conjuncts(), state,
_row_desc()));
}
return Status::OK();
}
@@ -107,7 +108,7 @@ public:
RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts,
state));
}
if (_t_data_stream_sink.__isset.conjuncts) {
- RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state));
+ RETURN_IF_ERROR(vectorized::VExpr::open(conjuncts(), state));
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index abed7fb446a..bde2291ec3a 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -760,16 +760,18 @@ protected:
ObjectPool* _pool = nullptr;
std::vector<TupleId> _tuple_ids;
+private:
+ // The expr of operator set to private permissions, as cannot be executed
concurrently,
+ // should use local state's expr.
vectorized::VExprContextSPtrs _conjuncts;
+ vectorized::VExprContextSPtrs _projections;
+ // Used in common subexpression elimination to compute intermediate
results.
+ std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
+protected:
RowDescriptor _row_descriptor;
-
std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
- vectorized::VExprContextSPtrs _projections;
-
std::vector<RowDescriptor> _intermediate_output_row_descriptor;
- // Used in common subexpression elimination to compute intermediate
results.
- std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
/// Resource information sent from the frontend.
const TBackendResourceProfile _resource_profile;
diff --git a/be/src/pipeline/exec/repeat_operator.cpp
b/be/src/pipeline/exec/repeat_operator.cpp
index 48cc427d85b..fe45e85c52d 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -234,7 +234,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state,
vectorized::Block* outp
}
_child_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
}
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts,
output_block,
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
output_block,
output_block->columns()));
*eos = _child_eos && _child_block.rows() == 0;
local_state.reached_limit(output_block, eos);
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 689a361c371..8aa1eb8df97 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1285,8 +1285,8 @@ Status StreamingAggOperatorX::pull(RuntimeState* state,
vectorized::Block* block
RETURN_IF_ERROR(local_state._executor->get_result(&local_state, state,
block, eos));
local_state.make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
- RETURN_IF_ERROR(
- vectorized::VExprContext::filter_block(_conjuncts, block,
block->columns()));
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
block,
+
block->columns()));
}
local_state.reached_limit(block, eos);
diff --git a/regression-test/data/javaudf_p0/test_javaudf_string.out
b/regression-test/data/javaudf_p0/test_javaudf_string.out
index 59f2f7c776d..b42a368b028 100644
--- a/regression-test/data/javaudf_p0/test_javaudf_string.out
+++ b/regression-test/data/javaudf_p0/test_javaudf_string.out
@@ -65,3 +65,6 @@ ab***fg7 ab***fg7
ab***fg8 ab***fg8
ab***fg9 ab***fg9
+-- !select_5 --
+0
+
diff --git a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
index 6517c4b08c2..e6484a1fde1 100644
--- a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
+++ b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
@@ -86,9 +86,22 @@ suite("test_javaudf_string") {
test_javaudf_string
JOIN test_javaudf_string_2 ON test_javaudf_string.user_id =
test_javaudf_string_2.user_id order by 1,2;
"""
+ sql """DROP TABLE IF EXISTS tbl1"""
+ sql """create table tbl1(k1 int, k2 string) distributed by hash(k1)
buckets 1 properties("replication_num" = "1");"""
+ sql """ insert into tbl1 values(1, "5");"""
+ Integer count = 0;
+ Integer maxCount = 20;
+ while (count < maxCount) {
+ sql """ insert into tbl1 select * from tbl1;"""
+ count++
+ sleep(100);
+ }
+ sql """ insert into tbl1 select random()%10000 * 10000, "5" from
tbl1;"""
+ qt_select_5 """ select count(0) from (select k1, max(k2) as k2 from
tbl1 group by k1)v where java_udf_string_test(k2, 0, 1) = "asd" """;
} finally {
try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int,
int);")
try_sql("DROP TABLE IF EXISTS ${tableName}")
+ try_sql("DROP TABLE IF EXISTS tbl1")
try_sql("DROP TABLE IF EXISTS test_javaudf_string_2")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]