This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new b104e933cd4 [Bug](expr) execute expr should use local states instead
of operators (#40189) (#40219)
b104e933cd4 is described below
commit b104e933cd46a742f7ccc380c1470735c7e5f2ed
Author: zhangstar333 <[email protected]>
AuthorDate: Sun Sep 1 00:41:10 2024 +0800
[Bug](expr) execute expr should use local states instead of operators
(#40189) (#40219)
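## Proposed changes
Cherry-pick of #40189 from master: the affected pipeline operators now
evaluate their filter expressions (conjuncts) through the local state
(`local_state._conjuncts` / `conjuncts()`) instead of the operator's own
`_conjuncts` member.
<!--Describe your changes.-->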
---
be/src/pipeline/exec/aggregation_source_operator.cpp | 3 ++-
be/src/pipeline/exec/assert_num_rows_operator.cpp | 3 ++-
be/src/pipeline/exec/assert_num_rows_operator.h | 3 +++
.../exec/distinct_streaming_aggregation_operator.cpp | 4 ++--
be/src/pipeline/exec/multi_cast_data_stream_source.h | 7 ++++---
be/src/pipeline/exec/repeat_operator.cpp | 2 +-
be/src/pipeline/exec/streaming_aggregation_operator.cpp | 4 ++--
regression-test/data/javaudf_p0/test_javaudf_string.out | 3 +++
.../suites/javaudf_p0/test_javaudf_string.groovy | 13 +++++++++++++
9 files changed, 32 insertions(+), 10 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index fadddee9034..8f19c24589b 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -435,7 +435,8 @@ Status AggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* blo
RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
local_state.make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block,
block->columns()));
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
block,
+ block->columns()));
local_state.reached_limit(block, eos);
return Status::OK();
}
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index ef0efd3f86b..8f86e3ecb87 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -120,7 +120,8 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
}
COUNTER_SET(local_state.rows_returned_counter(),
local_state.num_rows_returned());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts, block,
block->columns()));
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
block,
+ block->columns()));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.h
b/be/src/pipeline/exec/assert_num_rows_operator.h
index 4d6d835f815..3874c1fc771 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.h
+++ b/be/src/pipeline/exec/assert_num_rows_operator.h
@@ -46,6 +46,9 @@ public:
AssertNumRowsLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<FakeSharedState>(state, parent) {}
~AssertNumRowsLocalState() = default;
+
+private:
+ friend class AssertNumRowsOperatorX;
};
class AssertNumRowsOperatorX final : public
StreamingOperatorX<AssertNumRowsLocalState> {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 16c0df07b49..1635d927b6d 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -515,8 +515,8 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState*
state, vectorized::Bloc
local_state._make_nullable_output_key(block);
if (!_is_streaming_preagg) {
// dispose the having clause, should not be execute in prestreaming agg
- RETURN_IF_ERROR(
- vectorized::VExprContext::filter_block(_conjuncts, block,
block->columns()));
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
block,
+
block->columns()));
}
local_state.add_num_rows_returned(block->rows());
COUNTER_UPDATE(local_state.blocks_returned_counter(), 1);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 761e899c3d1..aab5cb96dbe 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -117,6 +117,7 @@ public:
}
private:
+ friend class MultiCastDataStreamerSourceOperatorX;
vectorized::VExprContextSPtrs _output_expr_contexts;
std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;
@@ -151,8 +152,8 @@ public:
if (_t_data_stream_sink.__isset.conjuncts) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_data_stream_sink.conjuncts,
- _conjuncts));
- RETURN_IF_ERROR(vectorized::VExpr::prepare(_conjuncts, state,
_row_desc()));
+ conjuncts()));
+ RETURN_IF_ERROR(vectorized::VExpr::prepare(conjuncts(), state,
_row_desc()));
}
return Status::OK();
}
@@ -163,7 +164,7 @@ public:
RETURN_IF_ERROR(vectorized::VExpr::open(_output_expr_contexts,
state));
}
if (_t_data_stream_sink.__isset.conjuncts) {
- RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state));
+ RETURN_IF_ERROR(vectorized::VExpr::open(conjuncts(), state));
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/repeat_operator.cpp
b/be/src/pipeline/exec/repeat_operator.cpp
index 65eccc3fd4e..991754ba122 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -247,7 +247,7 @@ Status RepeatOperatorX::pull(doris::RuntimeState* state,
vectorized::Block* outp
}
_child_block.clear_column_data(_child_x->row_desc().num_materialized_slots());
}
- RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_conjuncts,
output_block,
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
output_block,
output_block->columns()));
*eos = _child_eos && _child_block.rows() == 0;
local_state.reached_limit(output_block, eos);
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index d7589f59f9f..cfb63aae9a5 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1299,8 +1299,8 @@ Status StreamingAggOperatorX::pull(RuntimeState* state,
vectorized::Block* block
RETURN_IF_ERROR(local_state._executor->get_result(&local_state, state,
block, eos));
local_state.make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
- RETURN_IF_ERROR(
- vectorized::VExprContext::filter_block(_conjuncts, block,
block->columns()));
+
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
block,
+
block->columns()));
}
local_state.reached_limit(block, eos);
diff --git a/regression-test/data/javaudf_p0/test_javaudf_string.out
b/regression-test/data/javaudf_p0/test_javaudf_string.out
index 59f2f7c776d..b42a368b028 100644
--- a/regression-test/data/javaudf_p0/test_javaudf_string.out
+++ b/regression-test/data/javaudf_p0/test_javaudf_string.out
@@ -65,3 +65,6 @@ ab***fg7 ab***fg7
ab***fg8 ab***fg8
ab***fg9 ab***fg9
+-- !select_5 --
+0
+
diff --git a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
index 6517c4b08c2..e6484a1fde1 100644
--- a/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
+++ b/regression-test/suites/javaudf_p0/test_javaudf_string.groovy
@@ -86,9 +86,22 @@ suite("test_javaudf_string") {
test_javaudf_string
JOIN test_javaudf_string_2 ON test_javaudf_string.user_id =
test_javaudf_string_2.user_id order by 1,2;
"""
+ sql """DROP TABLE IF EXISTS tbl1"""
+ sql """create table tbl1(k1 int, k2 string) distributed by hash(k1)
buckets 1 properties("replication_num" = "1");"""
+ sql """ insert into tbl1 values(1, "5");"""
+ Integer count = 0;
+ Integer maxCount = 20;
+ while (count < maxCount) {
+ sql """ insert into tbl1 select * from tbl1;"""
+ count++
+ sleep(100);
+ }
+ sql """ insert into tbl1 select random()%10000 * 10000, "5" from
tbl1;"""
+ qt_select_5 """ select count(0) from (select k1, max(k2) as k2 from
tbl1 group by k1)v where java_udf_string_test(k2, 0, 1) = "asd" """;
} finally {
try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int,
int);")
try_sql("DROP TABLE IF EXISTS ${tableName}")
+ try_sql("DROP TABLE IF EXISTS tbl1")
try_sql("DROP TABLE IF EXISTS test_javaudf_string_2")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]