This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit f7e01ceffa863dcf548df43afb94f1bb6f9d5167
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Jan 29 21:21:02 2024 +0800

    [bug](node) add dependency for set operation node (#30203)

    These sinks must be completed one by one in order,
    eg: child(1) must wait child(0) build finish
---
 be/src/pipeline/pipeline_fragment_context.cpp      |  6 +-
 be/src/vec/exec/vset_operation_node.cpp            | 11 ++--
 .../data/nereids_syntax_p0/set_operation.out       |  4 ++
 .../suites/nereids_syntax_p0/set_operation.groovy  | 74 ++++++++++++++++++++++
 4 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 5e8af4940b8..64572f03692 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -693,7 +693,8 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
     OperatorBuilderPtr sink_builder =
             std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(), node);
     RETURN_IF_ERROR(build_pipeline->set_sink_builder(sink_builder));
-
+    std::vector<PipelinePtr> all_pipelines;
+    all_pipelines.emplace_back(build_pipeline);
     for (int child_id = 1; child_id < node->children_count(); ++child_id) {
         auto probe_pipeline = add_pipeline();
         RETURN_IF_ERROR(_build_pipelines(node->child(child_id), probe_pipeline));
@@ -701,6 +702,9 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
                 std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(), child_id, node);
         RETURN_IF_ERROR(probe_pipeline->set_sink_builder(probe_sink_builder));
 
+        //eg: These sinks must be completed one by one in order, child(1) must wait child(0) build finish
+        probe_pipeline->add_dependency(all_pipelines[child_id - 1]);
+        all_pipelines.emplace_back(probe_pipeline);
     }
 
     OperatorBuilderPtr source_builder =
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 75317b4c933..16c267b26df 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -352,10 +352,13 @@ Status VSetOperationNode<is_intersect>::sink_probe(RuntimeState* state, int chil
                                                    bool eos) {
     SCOPED_TIMER(_exec_timer);
     SCOPED_TIMER(_probe_timer);
-    CHECK(_build_finished) << "cannot sink probe data before build finished";
-    if (child_id > 1) {
-        CHECK(_probe_finished_children_index[child_id - 1])
-                << fmt::format("child with id: {} should be probed first", child_id);
+    if (!_build_finished) {
+        return Status::RuntimeError("cannot sink probe data before build finished " +
+                                    std::to_string(child_id));
+    }
+    if (child_id > 1 && !_probe_finished_children_index[child_id - 1]) {
+        return Status::RuntimeError("the child with id should be probed first " +
+                                    std::to_string(child_id - 1));
     }
     auto probe_rows = block->rows();
     if (probe_rows > 0) {
diff --git a/regression-test/data/nereids_syntax_p0/set_operation.out b/regression-test/data/nereids_syntax_p0/set_operation.out
index 28f286ea3d6..0ce9a4b4c99 100644
--- a/regression-test/data/nereids_syntax_p0/set_operation.out
+++ b/regression-test/data/nereids_syntax_p0/set_operation.out
@@ -596,3 +596,7 @@ hell0
 2.0554876421875E8	3601
 5.395085565625E7	3602
 
+-- !intersect_case --
+0
+1
+
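Aside on the pipeline_fragment_context.cpp hunk above: the fix serializes the set-operation sinks by giving each probe pipeline a dependency on the previous child's pipeline. Below is a minimal, self-contained C++ sketch of that dependency-chaining pattern, not Doris code: the Pipeline struct, the ready() check, and the toy scheduler are assumptions made purely for illustration, while the all_pipelines vector, the loop over child_id, and the child_id - 1 dependency mirror the patch.

// Minimal, self-contained sketch (not Doris code) of the dependency chaining added in
// pipeline_fragment_context.cpp: probe pipeline i depends on pipeline i - 1, so the
// sinks complete one by one in child order (child(1) waits for child(0)'s build).
#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Pipeline {
    std::string name;
    std::vector<std::shared_ptr<Pipeline>> dependencies;  // pipelines that must finish first
    bool finished = false;

    bool ready() const {
        for (const auto& dep : dependencies) {
            if (!dep->finished) {
                return false;
            }
        }
        return true;
    }
};

int main() {
    const int children_count = 3;  // child(0) builds the hash table, child(1..n-1) probe it

    std::vector<std::shared_ptr<Pipeline>> all_pipelines;
    all_pipelines.push_back(std::make_shared<Pipeline>(Pipeline{"build pipeline (child 0)"}));

    // Same shape as the patched loop: each probe pipeline records a dependency on the
    // previous pipeline, which serializes the set-operation sinks.
    for (int child_id = 1; child_id < children_count; ++child_id) {
        auto probe = std::make_shared<Pipeline>(
                Pipeline{"probe pipeline (child " + std::to_string(child_id) + ")"});
        probe->dependencies.push_back(all_pipelines[child_id - 1]);
        all_pipelines.push_back(probe);
    }

    // Toy scheduler: a pipeline only runs once everything it depends on has finished.
    for (const auto& p : all_pipelines) {
        std::cout << p->name << (p->ready() ? ": ready, running" : ": must wait") << "\n";
        p->finished = true;  // pretend it ran to completion
    }
    return 0;
}

Chaining each probe pipeline on pipeline child_id - 1, rather than only on the build pipeline, is what enforces the strict child order the commit message describes.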
diff --git a/regression-test/suites/nereids_syntax_p0/set_operation.groovy b/regression-test/suites/nereids_syntax_p0/set_operation.groovy
index b23811727bf..6d7f4e18ff8 100644
--- a/regression-test/suites/nereids_syntax_p0/set_operation.groovy
+++ b/regression-test/suites/nereids_syntax_p0/set_operation.groovy
@@ -329,4 +329,78 @@ suite("test_nereids_set_operation") {
         union select avg(tap), potno from dwd_daytable where potno=3602 and ddate >= '2023-08-01' group by potno limit 10;
     """
+
+    sql "DROP TABLE IF EXISTS table_22_undef_partitions2_keys3_properties4_distributed_by54"
+    sql """
+    create table table_22_undef_partitions2_keys3_properties4_distributed_by54 (
+    `col_int_undef_signed_not_null` int not null ,
+    `col_date_undef_signed_not_null` date not null ,
+    `pk` int,
+    `col_int_undef_signed` int null ,
+    `col_date_undef_signed` date null ,
+    `col_varchar_10__undef_signed` varchar(10) null ,
+    `col_varchar_10__undef_signed_not_null` varchar(10) not null ,
+    `col_varchar_1024__undef_signed` varchar(1024) null ,
+    `col_varchar_1024__undef_signed_not_null` varchar(1024) not null
+    ) engine=olap
+    DUPLICATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk)
+    PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21')))
+    distributed by hash(pk) buckets 10
+    properties("replication_num" = "1");
+    """
+
+    sql """
+    insert into table_22_undef_partitions2_keys3_properties4_distributed_by54(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,0,2,"2023-12-10","2023-12-09","ok",'f','a','l'),(1,1,9,"2023-12-19","2023-12-10",'x',"it","is",'b'),(2,null,7,"2023-12-18","2023-12-13","on",'p',"why",'x' [...]
+    """
+
+    sql "DROP TABLE IF EXISTS table_3_undef_partitions2_keys3_properties4_distributed_by54"
+    sql """
+    create table table_3_undef_partitions2_keys3_properties4_distributed_by54 (
+    `col_int_undef_signed_not_null` int not null ,
+    `col_date_undef_signed_not_null` date not null ,
+    `pk` int,
+    `col_int_undef_signed` int MIN null ,
+    `col_date_undef_signed` date REPLACE null ,
+    `col_varchar_10__undef_signed` varchar(10) REPLACE null ,
+    `col_varchar_10__undef_signed_not_null` varchar(10) MIN not null ,
+    `col_varchar_1024__undef_signed` varchar(1024) REPLACE null ,
+    `col_varchar_1024__undef_signed_not_null` varchar(1024) REPLACE not null
+    ) engine=olap
+    AGGREGATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk)
+    PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21')))
+    distributed by hash(pk) buckets 10
+    properties("replication_num" = "1");
+    """
+
+    sql """
+    insert into table_3_undef_partitions2_keys3_properties4_distributed_by54(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,null,5,"2023-12-13","2023-12-15",'c','c',"ok","had"),(1,null,7,"2023-12-12","2023-12-19","up",'e',"why",'c'),(2,3,2,"2023-12-15","2023-12-18","hey",'b',"as [...]
+ """ + + + sql "DROP TABLE IF EXISTS table_2_undef_partitions2_keys3_properties4_distributed_by55" + sql """ + create table table_2_undef_partitions2_keys3_properties4_distributed_by55 ( + `col_int_undef_signed_not_null` int not null , + `col_date_undef_signed_not_null` date not null , + `pk` int, + `col_int_undef_signed` int REPLACE null , + `col_date_undef_signed` date MIN null , + `col_varchar_10__undef_signed` varchar(10) MIN null , + `col_varchar_10__undef_signed_not_null` varchar(10) MAX not null , + `col_varchar_1024__undef_signed` varchar(1024) REPLACE null , + `col_varchar_1024__undef_signed_not_null` varchar(1024) MIN not null + ) engine=olap + AGGREGATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk) + PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21'))) + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + + sql """ + insert into table_2_undef_partitions2_keys3_properties4_distributed_by55(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,null,2,"2023-12-20","2023-12-11",'m','g','t',"in"),(1,1,4,"2023-12-09","2023-12-19","had",'b',"was","didn't"); + """ + + qt_intersect_case """ + SELECT subq1.`pk` AS pk1 FROM ( ( SELECT t1.`pk` FROM table_22_undef_partitions2_keys3_properties4_distributed_by54 AS t1 INNER JOIN table_3_undef_partitions2_keys3_properties4_distributed_by54 AS alias1 ON t1 . `pk` = alias1 . `pk` ) INTERSECT ( SELECT t1.`pk` FROM table_22_undef_partitions2_keys3_properties4_distributed_by54 AS t1 INNER JOIN table_2_undef_partitions2_keys3_properties4_distributed_by55 AS alias2 ON t1 . `pk` = alias2 . `pk` ) ) subq1 GROUP BY subq1.`pk` [...] + """ } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
