This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f7e01ceffa863dcf548df43afb94f1bb6f9d5167
Author: zhangstar333 <[email protected]>
AuthorDate: Mon Jan 29 21:21:02 2024 +0800

    [bug](node) add dependency for set operation node (#30203)
    
    These sinks must complete one by one, in order; e.g., child(1) must wait for child(0) to finish its build.
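    
    As a minimal sketch of the chaining this patch sets up (Pipeline and
    add_dependency below are simplified stand-ins, not the real Doris classes):
    
        #include <memory>
        #include <vector>
        
        // Hypothetical stand-in for a pipeline whose sink may only run
        // once all of its dependencies have finished.
        struct Pipeline {
            std::vector<std::shared_ptr<Pipeline>> dependencies;
            void add_dependency(const std::shared_ptr<Pipeline>& dep) {
                dependencies.push_back(dep);
            }
        };
        
        // Chain the probe pipelines of a set operation node so that
        // child(i) waits for child(i - 1) to finish.
        void chain_pipelines(std::vector<std::shared_ptr<Pipeline>>& pipelines) {
            for (size_t i = 1; i < pipelines.size(); ++i) {
                pipelines[i]->add_dependency(pipelines[i - 1]);
            }
        }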
---
 be/src/pipeline/pipeline_fragment_context.cpp      |  6 +-
 be/src/vec/exec/vset_operation_node.cpp            | 11 ++--
 .../data/nereids_syntax_p0/set_operation.out       |  4 ++
 .../suites/nereids_syntax_p0/set_operation.groovy  | 74 ++++++++++++++++++++++
 4 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 5e8af4940b8..64572f03692 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -693,7 +693,8 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
     OperatorBuilderPtr sink_builder =
             std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(), node);
     RETURN_IF_ERROR(build_pipeline->set_sink_builder(sink_builder));
-
+    std::vector<PipelinePtr> all_pipelines;
+    all_pipelines.emplace_back(build_pipeline);
     for (int child_id = 1; child_id < node->children_count(); ++child_id) {
         auto probe_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(node->child(child_id), probe_pipeline));
@@ -701,6 +702,9 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode
                 std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(), child_id,
                                                                             node);
         RETURN_IF_ERROR(probe_pipeline->set_sink_builder(probe_sink_builder));
+        // These sinks must complete one by one, in order; e.g., child(1) must wait for child(0) to finish its build.
+        probe_pipeline->add_dependency(all_pipelines[child_id - 1]);
+        all_pipelines.emplace_back(probe_pipeline);
     }
 
     OperatorBuilderPtr source_builder =
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index 75317b4c933..16c267b26df 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -352,10 +352,13 @@ Status VSetOperationNode<is_intersect>::sink_probe(RuntimeState* state, int chil
                                                    bool eos) {
     SCOPED_TIMER(_exec_timer);
     SCOPED_TIMER(_probe_timer);
-    CHECK(_build_finished) << "cannot sink probe data before build finished";
-    if (child_id > 1) {
-        CHECK(_probe_finished_children_index[child_id - 1])
-                << fmt::format("child with id: {} should be probed first", child_id);
+    if (!_build_finished) {
+        return Status::RuntimeError("cannot sink probe data before build 
finished " +
+                                    std::to_string(child_id));
+    }
+    if (child_id > 1 && !_probe_finished_children_index[child_id - 1]) {
+        return Status::RuntimeError("the child with id should be probed first 
" +
+                                    std::to_string(child_id - 1));
     }
     auto probe_rows = block->rows();
     if (probe_rows > 0) {
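
The hunk above also replaces hard CHECK asserts, which abort the whole BE
process when violated, with recoverable Status errors the caller can
propagate. A minimal sketch of that behavioral difference (the Status type
below is a simplified stand-in for doris::Status, not the real class):

    #include <cassert>
    #include <string>

    // Hypothetical minimal Status, standing in for doris::Status.
    struct Status {
        bool is_ok = true;
        std::string msg;
        static Status OK() { return {}; }
        static Status RuntimeError(std::string m) { return {false, std::move(m)}; }
        bool ok() const { return is_ok; }
    };

    // Before: a violated precondition kills the process.
    void sink_probe_with_check(bool build_finished) {
        assert(build_finished && "cannot sink probe data before build finished");
    }

    // After: the caller receives an error it can handle or propagate.
    Status sink_probe_with_status(bool build_finished, int child_id) {
        if (!build_finished) {
            return Status::RuntimeError("cannot sink probe data before build finished " +
                                        std::to_string(child_id));
        }
        return Status::OK();
    }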
diff --git a/regression-test/data/nereids_syntax_p0/set_operation.out b/regression-test/data/nereids_syntax_p0/set_operation.out
index 28f286ea3d6..0ce9a4b4c99 100644
--- a/regression-test/data/nereids_syntax_p0/set_operation.out
+++ b/regression-test/data/nereids_syntax_p0/set_operation.out
@@ -596,3 +596,7 @@ hell0
 2.0554876421875E8      3601
 5.395085565625E7       3602
 
+-- !intersect_case --
+0
+1
+
diff --git a/regression-test/suites/nereids_syntax_p0/set_operation.groovy b/regression-test/suites/nereids_syntax_p0/set_operation.groovy
index b23811727bf..6d7f4e18ff8 100644
--- a/regression-test/suites/nereids_syntax_p0/set_operation.groovy
+++ b/regression-test/suites/nereids_syntax_p0/set_operation.groovy
@@ -329,4 +329,78 @@ suite("test_nereids_set_operation") {
         union
        select avg(tap), potno from dwd_daytable where potno=3602 and ddate >= '2023-08-01' group by potno limit 10;
     """
+
+    sql "DROP TABLE IF EXISTS 
table_22_undef_partitions2_keys3_properties4_distributed_by54"
+    sql """
+    create table table_22_undef_partitions2_keys3_properties4_distributed_by54 (
+        `col_int_undef_signed_not_null` int  not null ,
+        `col_date_undef_signed_not_null` date  not null ,
+        `pk` int,
+        `col_int_undef_signed` int  null ,
+        `col_date_undef_signed` date  null ,
+        `col_varchar_10__undef_signed` varchar(10)  null ,
+        `col_varchar_10__undef_signed_not_null` varchar(10)  not null ,
+        `col_varchar_1024__undef_signed` varchar(1024)  null ,
+        `col_varchar_1024__undef_signed_not_null` varchar(1024)  not null 
+    ) engine=olap
+    DUPLICATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk)
+    PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21')))
+    distributed by hash(pk) buckets 10
+    properties("replication_num" = "1");
+    """
+
+    sql """
+        insert into table_22_undef_partitions2_keys3_properties4_distributed_by54(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,0,2,"2023-12-10","2023-12-09","ok",'f','a','l'),(1,1,9,"2023-12-19","2023-12-10",'x',"it","is",'b'),(2,null,7,"2023-12-18","2023-12-13","on",'p',"why",'x' [...]
+    """
+
+    sql "DROP TABLE IF EXISTS 
table_3_undef_partitions2_keys3_properties4_distributed_by54"
+    sql """
+    create table table_3_undef_partitions2_keys3_properties4_distributed_by54 (
+        `col_int_undef_signed_not_null` int  not null ,
+        `col_date_undef_signed_not_null` date  not null ,
+        `pk` int,
+        `col_int_undef_signed` int MIN  null ,
+        `col_date_undef_signed` date REPLACE  null ,
+        `col_varchar_10__undef_signed` varchar(10) REPLACE  null ,
+        `col_varchar_10__undef_signed_not_null` varchar(10) MIN  not null ,
+        `col_varchar_1024__undef_signed` varchar(1024) REPLACE  null ,
+        `col_varchar_1024__undef_signed_not_null` varchar(1024) REPLACE  not null
+    ) engine=olap
+    AGGREGATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk)
+    PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21')))
+    distributed by hash(pk) buckets 10
+    properties("replication_num" = "1");
+    """
+
+    sql """
+        insert into table_3_undef_partitions2_keys3_properties4_distributed_by54(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,null,5,"2023-12-13","2023-12-15",'c','c',"ok","had"),(1,null,7,"2023-12-12","2023-12-19","up",'e',"why",'c'),(2,3,2,"2023-12-15","2023-12-18","hey",'b',"as [...]
+    """
+
+
+    sql "DROP TABLE IF EXISTS 
table_2_undef_partitions2_keys3_properties4_distributed_by55"
+    sql """
+    create table table_2_undef_partitions2_keys3_properties4_distributed_by55 (
+        `col_int_undef_signed_not_null` int  not null ,
+        `col_date_undef_signed_not_null` date  not null ,
+        `pk` int,
+        `col_int_undef_signed` int REPLACE  null ,
+        `col_date_undef_signed` date MIN  null ,
+        `col_varchar_10__undef_signed` varchar(10) MIN  null ,
+        `col_varchar_10__undef_signed_not_null` varchar(10) MAX  not null ,
+        `col_varchar_1024__undef_signed` varchar(1024) REPLACE  null ,
+        `col_varchar_1024__undef_signed_not_null` varchar(1024) MIN  not null 
+    ) engine=olap
+    AGGREGATE KEY(col_int_undef_signed_not_null, col_date_undef_signed_not_null, pk)
+    PARTITION BY RANGE(col_int_undef_signed_not_null, col_date_undef_signed_not_null) (PARTITION p0 VALUES [('-10000', '2023-12-01'), ('3', '2023-12-10')), PARTITION p1 VALUES [('3', '2023-12-10'), ('6', '2023-12-15')), PARTITION p2 VALUES [('6', '2023-12-15'), ('10000', '2023-12-21')))
+    distributed by hash(pk) buckets 10
+    properties("replication_num" = "1");
+    """
+
+    sql """
+        insert into table_2_undef_partitions2_keys3_properties4_distributed_by55(pk,col_int_undef_signed,col_int_undef_signed_not_null,col_date_undef_signed,col_date_undef_signed_not_null,col_varchar_10__undef_signed,col_varchar_10__undef_signed_not_null,col_varchar_1024__undef_signed,col_varchar_1024__undef_signed_not_null) values (0,null,2,"2023-12-20","2023-12-11",'m','g','t',"in"),(1,1,4,"2023-12-09","2023-12-19","had",'b',"was","didn't");
+    """
+
+    qt_intersect_case """
+        SELECT subq1.`pk` AS pk1 FROM ( (  SELECT t1.`pk`  FROM table_22_undef_partitions2_keys3_properties4_distributed_by54 AS t1 INNER JOIN table_3_undef_partitions2_keys3_properties4_distributed_by54 AS alias1 ON t1 . `pk` = alias1 . `pk`   ) INTERSECT (  SELECT t1.`pk`  FROM table_22_undef_partitions2_keys3_properties4_distributed_by54 AS t1 INNER JOIN table_2_undef_partitions2_keys3_properties4_distributed_by55 AS alias2 ON t1 . `pk` = alias2 . `pk`   ) ) subq1 GROUP BY subq1.`pk` [...]
+    """
 }

