wuxueyang96 opened a new pull request, #61184:
URL: https://github.com/apache/doris/pull/61184

   ### What problem does this PR solve?
   
   Currently, consider a SQL query like:
   
   ```sql
   select d0.sum_val,
       d1.val,
       d1.id
   from (
           select sum(sum_val) as sum_val,
               id
           from (
                   (
                       SELECT sum(val) as sum_val,
                           id
                       from t2
                       group by id
                   )
                   union all
                   (
                       SELECT sum(val) as sum_val,
                           id
                       from t3
                       group by id
                   )
               ) as l
           group by id
       ) as d0
       right join (
           select id,
               val
           from t1
       ) as d1 on d0.id = d1.id
   order by d0.id;
   ```
   
   The final plan currently looks like this:
   
   ```
   
   Explain String(Nereids Planner)
   PLAN FRAGMENT 0
     OUTPUT EXPRS:
       sum_val[#28]
       val[#29]
       id[#30]
     PARTITION: UNPARTITIONED

     HAS_COLO_PLAN_NODE: false

     VRESULT SINK
        MYSQL_PROTOCOL

     14:VMERGING-EXCHANGE
        offset: 0
        final projections: sum_val[#27], val[#25], id[#24]
        final project output tuple id: 11
        distribute expr lists: id[#24]

   PLAN FRAGMENT 1

     PARTITION: HASH_PARTITIONED: id[#14]

     HAS_COLO_PLAN_NODE: false

     STREAM DATA SINK
       EXCHANGE ID: 14
       UNPARTITIONED
   IS_MERGE: true

     13:VSORT(599)
     |  order by: id[#26] ASC
     |  algorithm: full sort
     |  local merge sort
     |  merge by exchange
     |  offset: 0
     |  distribute expr lists:
     |
     12:VHASH JOIN(595)
     |  join op: LEFT OUTER JOIN(BROADCAST)[]
     |  equal join conjunct: (id[#14] = id[#12])
     |  cardinality=20
     |  vec output tuple id: 9
     |  output tuple id: 9
     |  vIntermediate tuple ids: 8
     |  hash output slot ids: 12 13 14 15
     |  final projections: id[#16], val[#17], id[#18], sum_val[#19]
     |  final project output tuple id: 9
     |  distribute expr lists: id[#14]
     |  distribute expr lists:
     |
     |----10:VEXCHANGE
     |       offset: 0
     |       distribute expr lists: id[#12]
     |
     11:VOlapScanNode(553)
        TABLE: test.t1(t1), PREAGGREGATION: ON
        partitions=1/1 (t1)
        tablets=4/4, tabletList=1772677938221,1772677938224,1772677938227 ...
        cardinality=20, avgRowSize=978.0, numNodes=2
        pushAggOp=NONE

   PLAN FRAGMENT 2

     PARTITION: HASH_PARTITIONED: id[#10]

     HAS_COLO_PLAN_NODE: true

     STREAM DATA SINK
       EXCHANGE ID: 10
       UNPARTITIONED

     9:VAGGREGATE (merge finalize)(587)
     |  output: sum(partial_sum(sum_val)[#11])[#13]
     |  group by: id[#10]
     |  sortByGroupKey:false
     |  cardinality=4
     |  distribute expr lists: id[#10]
     |
     8:VEXCHANGE
        offset: 0
        distribute expr lists:

   PLAN FRAGMENT 3

     PARTITION: RANDOM

     HAS_COLO_PLAN_NODE: false

     STREAM DATA SINK
       EXCHANGE ID: 08
       HASH_PARTITIONED: id[#10]

     7:VAGGREGATE (update serialize)(579)
     |  STREAMING
     |  output: partial_sum(sum_val[#9])[#11]
     |  group by: id[#8]
     |  sortByGroupKey:false
     |  cardinality=4
     |  distribute expr lists:
     |
     6:VUNION(575)
     |  distribute expr lists:
     |  distribute expr lists:
     |
     |----5:VEXCHANGE
     |       offset: 0
     |       distribute expr lists: id[#6]
     |
     2:VEXCHANGE
        offset: 0
        distribute expr lists: id[#2]

   PLAN FRAGMENT 4

     PARTITION: HASH_PARTITIONED: id[#4]

     HAS_COLO_PLAN_NODE: true

     STREAM DATA SINK
       EXCHANGE ID: 05
       RANDOM

     4:VAGGREGATE (merge finalize)(567)
     |  output: sum(val[#5])[#7]
     |  group by: id[#4]
     |  sortByGroupKey:false
     |  cardinality=6
     |  distribute expr lists: id[#4]
     |
     3:VOlapScanNode(563)
        TABLE: test.t3(t3), PREAGGREGATION: ON
        partitions=1/1 (t3)
        tablets=4/4, tabletList=1772677938253,1772677938256,1772677938259 ...
        cardinality=20, avgRowSize=978.0, numNodes=2
        pushAggOp=NONE

   PLAN FRAGMENT 5

     PARTITION: HASH_PARTITIONED: id[#0]

     HAS_COLO_PLAN_NODE: true

     STREAM DATA SINK
       EXCHANGE ID: 02
       RANDOM

     1:VAGGREGATE (merge finalize)(558)
     |  output: sum(val[#1])[#3]
     |  group by: id[#0]
     |  sortByGroupKey:false
     |  cardinality=6
     |  distribute expr lists: id[#0]
     |
     0:VOlapScanNode(554)
        TABLE: test.t2(t2), PREAGGREGATION: ON
        partitions=1/1 (t2)
        tablets=4/4, tabletList=1772677938237,1772677938240,1772677938243 ...
        cardinality=20, avgRowSize=978.0, numNodes=2
        pushAggOp=NONE

   ========== STATISTICS ==========
   planned with unknown column statistics
   164 rows in set (0.135 sec)
   ```
   
   All of the tables mentioned above are in the same colocate group, and
   their distribution key is the same as the group key. The two exchanges in
   plan fragment 3 are therefore unnecessary: because the distribution keys and
   the aggregation keys are identical, all rows with the same aggregation key
   from the two colocated tables reside in the corresponding tablets, so the
   aggregation can be performed locally on a single machine after the union of
   the two tables and still produce the correct result.
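
The argument above can be checked with a small simulation. The sketch below is only a toy model, not Doris code: the class name, the bucket count and the `bucketOf` function are assumptions made for illustration. It buckets two tables with the same function on `id`, aggregates each bucket after the union without moving any row, and compares the result with a global aggregation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model only: none of these names are Doris classes.
class ColocatedUnionAggSketch {
    // Hypothetical bucket count shared by all tables of the colocate group.
    static final int BUCKETS = 4;

    // Both tables use the same bucketing function on id, as colocated tables do.
    static int bucketOf(int id) {
        return Math.floorMod(Integer.hashCode(id), BUCKETS);
    }

    // Split rows of {id, val} into buckets by id.
    static List<List<int[]>> bucketize(int[][] rows) {
        List<List<int[]>> buckets = new ArrayList<>();
        for (int b = 0; b < BUCKETS; b++) {
            buckets.add(new ArrayList<>());
        }
        for (int[] row : rows) {
            buckets.get(bucketOf(row[0])).add(row);
        }
        return buckets;
    }

    // sum(val) group by id over a batch of rows, accumulated into out.
    static void sumById(List<int[]> rows, Map<Integer, Long> out) {
        for (int[] row : rows) {
            out.merge(row[0], (long) row[1], Long::sum);
        }
    }

    // Aggregate bucket by bucket; no row ever leaves its bucket.
    static Map<Integer, Long> localAggregate(int[][] t2, int[][] t3) {
        List<List<int[]>> b2 = bucketize(t2);
        List<List<int[]>> b3 = bucketize(t3);
        Map<Integer, Long> result = new TreeMap<>();
        for (int b = 0; b < BUCKETS; b++) {
            Map<Integer, Long> local = new HashMap<>();
            sumById(b2.get(b), local); // "union all" of both inputs in this bucket
            sumById(b3.get(b), local);
            result.putAll(local); // an id lives in exactly one bucket, so nothing to merge
        }
        return result;
    }

    // Reference result: aggregate over all rows at once.
    static Map<Integer, Long> globalAggregate(int[][] t2, int[][] t3) {
        Map<Integer, Long> result = new TreeMap<>();
        sumById(Arrays.asList(t2), result);
        sumById(Arrays.asList(t3), result);
        return result;
    }

    public static void main(String[] args) {
        int[][] t2 = {{1, 10}, {2, 20}, {5, 7}};
        int[][] t3 = {{1, 1}, {5, 3}, {9, 4}};
        System.out.println(localAggregate(t2, t3).equals(globalAggregate(t2, t3)));
    }
}
```

The sketch sums raw rows directly instead of summing per-table partial sums first; for `sum` the two are equivalent, which is why the nested `sum(sum_val)` in the query can be modelled this way.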
   
   The current implementation adds a `PhysicalDistribute` operator when an
   operator requires a distribution spec of `DistributionSpecAny` and its child
   nodes have a distribution spec of `DistributionSpecHash` with a shuffle type
   of `NATURAL`.
   
   The added operator has a distribution spec of `DistributionSpecAny`, so the
   properties of `DistributionSpecHash` cannot be propagated up to the
   `SetOperation` (`UNION`/`EXCEPT`/`INTERSECT`) operator.
   
   This PR revises the current logic: for such scenarios, the
   `PhysicalDistribute` operator will not be added if and only if all child
   nodes of the `SetOperation` belong to the same colocate group, have a
   distribution spec of `DistributionSpecHash` and use a shuffle type of
   `NATURAL`.
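
The condition can be sketched roughly as follows. This is not the code in the PR: `ChildProps`, `SpecKind`, the two-value `ShuffleType` and `canSkipDistribute` are hypothetical stand-ins for the planner's physical properties, introduced only to spell out the three checks.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the planner's properties; not the actual Doris classes.
class SetOperationDistributeSketch {
    enum SpecKind { HASH, ANY, OTHER }

    enum ShuffleType { NATURAL, OTHER }

    static final class ChildProps {
        final SpecKind kind;
        final ShuffleType shuffleType;
        final long colocateGroupId; // hypothetical identifier of the colocate group

        ChildProps(SpecKind kind, ShuffleType shuffleType, long colocateGroupId) {
            this.kind = kind;
            this.shuffleType = shuffleType;
            this.colocateGroupId = colocateGroupId;
        }
    }

    // True only when every child is hash-distributed with a NATURAL shuffle type
    // and all children share one colocate group; only then is the distribute skipped.
    static boolean canSkipDistribute(List<ChildProps> children) {
        if (children.isEmpty()) {
            return false;
        }
        long group = children.get(0).colocateGroupId;
        for (ChildProps child : children) {
            if (child.kind != SpecKind.HASH
                    || child.shuffleType != ShuffleType.NATURAL
                    || child.colocateGroupId != group) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ChildProps left = new ChildProps(SpecKind.HASH, ShuffleType.NATURAL, 1L);
        ChildProps right = new ChildProps(SpecKind.HASH, ShuffleType.NATURAL, 1L);
        ChildProps reshuffled = new ChildProps(SpecKind.HASH, ShuffleType.OTHER, 1L);
        System.out.println(canSkipDistribute(Arrays.asList(left, right)));
        System.out.println(canSkipDistribute(Arrays.asList(left, reshuffled)));
    }
}
```

A child whose data has already been reshuffled on another key, or that belongs to a different colocate group, fails the check, so the distribute is kept for it.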
   
   For example, for a SQL query like:
   
   ```sql
   SELECT sum(val) as sum_val,
       id
   from t2
   group by id
   union all
   SELECT sum(val) as sum_val,
       id
   from t3
   group by id;
   ```
   
   the planner will produce a plan like:
   
   ```
   
   Explain String(Nereids Planner)
   PLAN FRAGMENT 0
     OUTPUT EXPRS:
       sum_val[#8]
       id[#9]
     PARTITION: HASH_PARTITIONED: id[#4]

     HAS_COLO_PLAN_NODE: true

     VRESULT SINK
        MYSQL_PROTOCOL

     4:VUNION(169)
     |  distribute expr lists: id[#2]
     |  distribute expr lists: id[#6]
     |
     |----3:VAGGREGATE (merge finalize)(165)
     |    |  output: sum(val[#5])[#7]
     |    |  group by: id[#4]
     |    |  sortByGroupKey:false
     |    |  cardinality=6
     |    |  distribute expr lists: id[#4]
     |    |
     |    2:VOlapScanNode(161)
     |       TABLE: test.t3(t3), PREAGGREGATION: ON
     |       partitions=1/1 (t3)
     |       tablets=4/4, tabletList=1772677938253,1772677938256,1772677938259 ...
     |       cardinality=20, avgRowSize=489.0, numNodes=2
     |       pushAggOp=NONE
     |
     1:VAGGREGATE (merge finalize)(160)
     |  output: sum(val[#1])[#3]
     |  group by: id[#0]
     |  sortByGroupKey:false
     |  cardinality=6
     |  distribute expr lists: id[#0]
     |
     0:VOlapScanNode(156)
        TABLE: test.t2(t2), PREAGGREGATION: ON
        partitions=1/1 (t2)
        tablets=4/4, tabletList=1772677938237,1772677938240,1772677938243 ...
        cardinality=20, avgRowSize=489.0, numNodes=2
        pushAggOp=NONE

   ========== STATISTICS ==========
   planned with unknown column statistics
   ```
   
   But for a SQL query like:
   
   ```sql
   SELECT sum(val) as sum_val,
       id
   from t2
   group by id
   union all
   SELECT max(id) as sum_val,
     val as id
   from t3
   group by val;
   ```
   
   the planner will use a plan like the one below:
   
   ```
   
   Explain String(Nereids Planner)
   PLAN FRAGMENT 0
     OUTPUT EXPRS:
       sum_val[#12]
       id[#13]
     PARTITION: HASH_PARTITIONED: val[#6]

     HAS_COLO_PLAN_NODE: true

     VRESULT SINK
        MYSQL_PROTOCOL

     7:VUNION(234)
     |  distribute expr lists:
     |  distribute expr lists: id[#11]
     |
     |----6:VAGGREGATE (merge finalize)(226)
     |    |  output: max(partial_max(id)[#7])[#9]
     |    |  group by: val[#6]
     |    |  sortByGroupKey:false
     |    |  cardinality=1
     |    |  final projections: CAST(sum_val[#9] AS bigint), val[#8]
     |    |  final project output tuple id: 5
     |    |  distribute expr lists: val[#6]
     |    |
     |    5:VEXCHANGE
     |       offset: 0
     |       distribute expr lists:
     |
     2:VEXCHANGE
        offset: 0
        distribute expr lists: id[#2]

   PLAN FRAGMENT 1

     PARTITION: HASH_PARTITIONED: id[#4]

     HAS_COLO_PLAN_NODE: false

     STREAM DATA SINK
       EXCHANGE ID: 05
       HASH_PARTITIONED: val[#6]

     4:VAGGREGATE (update serialize)(218)
     |  STREAMING
     |  output: partial_max(id[#4])[#7]
     |  group by: val[#5]
     |  sortByGroupKey:false
     |  cardinality=1
     |  distribute expr lists: id[#4]
     |
     3:VOlapScanNode(214)
        TABLE: test.t3(t3), PREAGGREGATION: ON
        partitions=1/1 (t3)
        tablets=4/4, tabletList=1772677938253,1772677938256,1772677938259 ...
        cardinality=1, avgRowSize=0.0, numNodes=2
        pushAggOp=NONE

   PLAN FRAGMENT 2

     PARTITION: HASH_PARTITIONED: id[#0]

     HAS_COLO_PLAN_NODE: true

     STREAM DATA SINK
       EXCHANGE ID: 02
       RANDOM

     1:VAGGREGATE (merge finalize)(209)
     |  output: sum(val[#1])[#3]
     |  group by: id[#0]
     |  sortByGroupKey:false
     |  cardinality=1
     |  distribute expr lists: id[#0]
     |
     0:VOlapScanNode(205)
        TABLE: test.t2(t2), PREAGGREGATION: ON
        partitions=1/1 (t2)
        tablets=4/4, tabletList=1772677938237,1772677938240,1772677938243 ...
        cardinality=1, avgRowSize=0.0, numNodes=2
        pushAggOp=NONE

   ========== STATISTICS ==========
   planned with unknown column statistics
   ```
   
   ### Release note
   
   None
   
   ### Check List (For Author)
   
   - Test <!-- At least one of them must be included. -->
       - [ ] Regression test
       - [ ] Unit Test
       - [ ] Manual test (add detailed scripts or steps below)
       - [ ] No need to test or manual test. Explain why:
           - [ ] This is a refactor/code format and no logic has been changed.
           - [ ] Previous test can cover this change.
           - [ ] No code files have been changed.
           - [ ] Other reason <!-- Add your reason?  -->
   
   - Behavior changed:
       - [x] No.
       - [ ] Yes. <!-- Explain the behavior change -->
   
   - Does this need documentation?
       - [x] No.
       - [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
   
   ### Check List (For Reviewer who merge this PR)
   
   - [ ] Confirm the release note
   - [ ] Confirm test cases
   - [ ] Confirm document
   - [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

