924060929 commented on PR #61184:
URL: https://github.com/apache/doris/pull/61184#issuecomment-4035921429

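   > > > > Hello @wuxueyang96, thank you for submitting this PR. I already submitted the same feature in #59006, but it breaks local shuffle and computes wrong results, so I reverted it in #60823. We should refactor local shuffle first and then support this feature.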
   > > > 
   > > > 
   > > > I'm not sure whether local bucket shuffle is the same thing as this PR. This PR aims to eliminate the shuffle altogether, whether it is a local bucket shuffle or a global shuffle.
   > > 
   > > 
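   > > My PR includes eliminating exchanges under set operations, because supporting bucket shuffle already requires the other side to be distributed according to the storage hash algorithm: the base side needs no shuffle, and the other side needs a bucket shuffle only if it does not satisfy that requirement. If both sides are colocated, neither side needs a shuffle, because both already satisfy the storage-hash distribution requirement. So my PR is a superset of yours, and more abstract.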
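
The rule described in the quoted paragraph above can be sketched as a toy model. This is only an illustration of the reasoning, not Doris planner code: `ChildDistribution`, `Shuffle`, and `plan_set_operation` are hypothetical names, and "satisfies the storage-hash requirement" is simplified to "same hash columns and same colocate group as the base side".

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Optional, Tuple


class Shuffle(Enum):
    NONE = "none"                      # child can be consumed in place
    BUCKET_SHUFFLE = "bucket_shuffle"  # child must be re-bucketed to match the base


@dataclass(frozen=True)
class ChildDistribution:
    """Hypothetical summary of how one set-operation child is laid out."""
    name: str
    storage_hash_columns: Optional[Tuple[str, ...]]  # None: not storage-hash distributed
    colocate_group: Optional[str] = None             # None: not in a colocate group


def plan_set_operation(base: ChildDistribution,
                       others: List[ChildDistribution]) -> Dict[str, Shuffle]:
    """Decide, per child, whether an exchange is needed (simplified assumption)."""
    plan = {base.name: Shuffle.NONE}  # the base side never needs a shuffle
    for child in others:
        satisfied = (
            child.storage_hash_columns is not None
            and child.storage_hash_columns == base.storage_hash_columns
            and child.colocate_group is not None
            and child.colocate_group == base.colocate_group
        )
        plan[child.name] = Shuffle.NONE if satisfied else Shuffle.BUCKET_SHUFFLE
    return plan
```

Under these assumptions, two colocated children of a `UNION ALL` would both map to `Shuffle.NONE`, while a child outside the colocate group would map to `Shuffle.BUCKET_SHUFFLE`.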
   > 
   > 
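   > Actually, I rebuilt the code from [bf2e1c2](https://github.com/apache/doris/commit/bf2e1c2dda944e47a5e9bf34972ae772570ec1c0), and I think I understand what you mean. But if you look at PLAN FRAGMENT 5, it still contains the two exchanges under the union, so I'd like to know what final result you are aiming for.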
   > 
   > ```
   > MySQL [(none)]> show frontends;
   > +-----------------------------------------+---------------+-------------+----------+-----------+---------+--------------------+----------+----------+------------+------+-------+-------------------+---------------------+---------------------+----------+--------+------------------------+------------------+-----------+
   > | Name                                    | Host          | EditLogPort | HttpPort | QueryPort | RpcPort | ArrowFlightSqlPort | Role     | IsMaster | ClusterId  | Join | Alive | ReplayedJournalId | LastStartTime       | LastHeartbeat       | IsHelper | ErrMsg | Version                | CurrentConnected | LiveSince |
   > +-----------------------------------------+---------------+-------------+----------+-----------+---------+--------------------+----------+----------+------------+------+-------+-------------------+---------------------+---------------------+----------+--------+------------------------+------------------+-----------+
   > | fe_781cb7e1_a9c0_49ee_845f_9ffa707ddeeb | 10.37.114.244 | 9010        | 8030     | 9030      | 9020    | 8070               | FOLLOWER | true     | 1202823493 | true | true  | 384               | 2026-03-10 19:50:11 | 2026-03-10 20:15:32 | true     |        | doris-0.0.0-bf2e1c2dda | Yes              | NULL      |
   > +-----------------------------------------+---------------+-------------+----------+-----------+---------+--------------------+----------+----------+------------+------+-------+-------------------+---------------------+---------------------+----------+--------+------------------------+------------------+-----------+
   > 1 row in set (0.017 sec)
   > 
   > MySQL [(none)]> use test;
   > Reading table information for completion of table and column names
   > You can turn off this feature to get a quicker startup with -A
   > 
   > Database changed
   > MySQL [test]> explain select d0.sum_val, d1.val, d1.id from ( select sum(sum_val) as sum_val, id from ( ( SELECT sum(val) as sum_val, id from t2 group by id ) union all ( SELECT sum(val) as sum_val, id from t3 group by id ) ) as l group by id ) as d0 right join ( select id, val from t1 ) as d1 on d0.id = d1.id order by d0.id;
   > 
+----------------------------------------------------------------------------------------------------------------------------------------------------+
   > | Explain String(Nereids Planner)                                          
                                                                          |
   > 
+----------------------------------------------------------------------------------------------------------------------------------------------------+
   > | PLAN FRAGMENT 0                                                          
                                                                          |
   > |   OUTPUT EXPRS:                                                          
                                                                          |
   > |     sum_val[#44]                                                         
                                                                          |
   > |     val[#45]                                                             
                                                                          |
   > |     id[#46]                                                              
                                                                          |
   > |   PARTITION: UNPARTITIONED                                               
                                                                          |
   > |                                                                          
                                                                          |
   > |   HAS_COLO_PLAN_NODE: false                                              
                                                                          |
   > |                                                                          
                                                                          |
   > |   VRESULT SINK                                                           
                                                                          |
   > |      MYSQL_PROTOCOL                                                      
                                                                          |
   > |                                                                          
                                                                          |
   > |   19:VMERGING-EXCHANGE                                                   
                                                                          |
   > |      offset: 0                                                           
                                                                          |
   > |      final projections: sum_val[#43], val[#41], id[#40]                  
                                                                          |
   > |      final project output tuple id: 17                                   
                                                                          |
   > |      distribute expr lists: id[#40]                                      
                                                                          |
   > |                                                                          
                                                                          |
   > | PLAN FRAGMENT 1                                                          
                                                                          |
   > |                                                                          
                                                                          |
   > |   PARTITION: HASH_PARTITIONED: id[#27], r1$c$1[#29]                      
                                                                          |
   > |                                                                          
                                                                          |
   > |   HAS_COLO_PLAN_NODE: false                                              
                                                                          |
   > |                                                                          
                                                                          |
   > |   STREAM DATA SINK                                                       
                                                                          |
   > |     EXCHANGE ID: 19                                                      
                                                                          |
   > |     UNPARTITIONED                                                        
                                                                          |
   > | IS_MERGE: true                                                           
                                                                          |
   > |                                                                          
                                                                          |
   > |   18:VSORT(760)                                                          
                                                                          |
   > |   |  order by: id[#42] ASC                                               
                                                                          |
   > |   |  algorithm: full sort                                                
                                                                          |
   > |   |  local merge sort                                                    
                                                                          |
   > |   |  merge by exchange                                                   
                                                                          |
   > |   |  offset: 0                                                           
                                                                          |
   > |   |  distribute expr lists:                                              
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   17:VHASH JOIN(752)                                                     
                                                                          |
   > |   |  join op: LEFT OUTER JOIN(PARTITIONED)[]                             
                                                                          |
   > |   |  equal join conjunct: (id[#27] = id[#22])                            
                                                                          |
   > |   |  equal join conjunct: (r1$c$1[#29] = r2$c$2[#24])                    
                                                                          |
   > |   |  cardinality=20                                                      
                                                                          |
   > |   |  vec output tuple id: 15                                             
                                                                          |
   > |   |  output tuple id: 15                                                 
                                                                          |
   > |   |  vIntermediate tuple ids: 14                                         
                                                                          |
   > |   |  hash output slot ids: 22 23 27 28                                   
                                                                          |
   > |   |  final projections: id[#30], val[#31], id[#33], sum_val[#34]         
                                                                          |
   > |   |  final project output tuple id: 15                                   
                                                                          |
   > |   |  distribute expr lists: id[#27], r1$c$1[#29]                         
                                                                          |
   > |   |  distribute expr lists: id[#22], r2$c$2[#24]                         
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   |----14:VEXCHANGE                                                      
                                                                          |
   > |   |       offset: 0                                                      
                                                                          |
   > |   |       distribute expr lists: id[#22]                                 
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   16:VEXCHANGE                                                           
                                                                          |
   > |      offset: 0                                                           
                                                                          |
   > |      distribute expr lists: id[#27]                                      
                                                                          |
   > |                                                                          
                                                                          |
   > | PLAN FRAGMENT 2                                                          
                                                                          |
   > |                                                                          
                                                                          |
   > |   PARTITION: HASH_PARTITIONED: id[#25]                                   
                                                                          |
   > |                                                                          
                                                                          |
   > |   HAS_COLO_PLAN_NODE: false                                              
                                                                          |
   > |                                                                          
                                                                          |
   > |   STREAM DATA SINK                                                       
                                                                          |
   > |     EXCHANGE ID: 16                                                      
                                                                          |
   > |     HASH_PARTITIONED: id[#27], r1$c$1[#29]                               
                                                                          |
   > |                                                                          
                                                                          |
   > |   15:VOlapScanNode(680)                                                  
                                                                          |
   > |      TABLE: test.t1(t1), PREAGGREGATION: ON                              
                                                                          |
   > |      partitions=1/1 (t1)                                                 
                                                                          |
   > |      tablets=10/10, tabletList=1773143411383,1773143411386,1773143411389 
...                                                                       |
   > |      cardinality=20, avgRowSize=1952.0, numNodes=2                       
                                                                          |
   > |      pushAggOp=NONE                                                      
                                                                          |
   > |      final projections: id[#25], val[#26], if(id[#25] IN (1, 2, 3, 4, 5, 
6, 7, 8, 9, 10), CAST(random(0, 127) AS smallint), 0)                     |
   > |      final project output tuple id: 13                                   
                                                                          |
   > |                                                                          
                                                                          |
   > | PLAN FRAGMENT 3                                                          
                                                                          |
   > |                                                                          
                                                                          |
   > |   PARTITION: HASH_PARTITIONED: skewValue$c$13[#16]                       
                                                                          |
   > |                                                                          
                                                                          |
   > |   HAS_COLO_PLAN_NODE: true                                               
                                                                          |
   > |                                                                          
                                                                          |
   > |   STREAM DATA SINK                                                       
                                                                          |
   > |     EXCHANGE ID: 14                                                      
                                                                          |
   > |     HASH_PARTITIONED: id[#22], r2$c$2[#24]                               
                                                                          |
   > |                                                                          
                                                                          |
   > |   13:VHASH JOIN(740)                                                     
                                                                          |
   > |   |  join op: RIGHT OUTER JOIN(PARTITIONED)[]                            
                                                                          |
   > |   |  equal join conjunct: (skewValue$c$13[#16] = id[#12])                
                                                                          |
   > |   |  cardinality=50                                                      
                                                                          |
   > |   |  vec output tuple id: 11                                             
                                                                          |
   > |   |  output tuple id: 11                                                 
                                                                          |
   > |   |  vIntermediate tuple ids: 10                                         
                                                                          |
   > |   |  hash output slot ids: 17 12 13                                      
                                                                          |
   > |   |  final projections: id[#20], sum_val[#21], 
if(cast(explodeColumn$c$14 as SMALLINT)[#19] IS NULL, 0, 
cast(explodeColumn$c$14 as SMALLINT)[#19]) |
   > |   |  final project output tuple id: 11                                   
                                                                          |
   > |   |  distribute expr lists: skewValue$c$13[#16]                          
                                                                          |
   > |   |  distribute expr lists: id[#12]                                      
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   |----9:VAGGREGATE (merge finalize)(736)                                
                                                                          |
   > |   |    |  output: sum(partial_sum(sum_val)[#11])[#13]                    
                                                                          |
   > |   |    |  group by: id[#10]                                              
                                                                          |
   > |   |    |  sortByGroupKey:false                                           
                                                                          |
   > |   |    |  cardinality=10                                                 
                                                                          |
   > |   |    |  distribute expr lists: id[#10]                                 
                                                                          |
   > |   |    |                                                                 
                                                                          |
   > |   |    8:VEXCHANGE                                                       
                                                                          |
   > |   |       offset: 0                                                      
                                                                          |
   > |   |       distribute expr lists:                                         
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   12:VEXCHANGE                                                           
                                                                          |
   > |      offset: 0                                                           
                                                                          |
   > |      distribute expr lists:                                              
                                                                          |
   > |                                                                          
                                                                          |
   > | PLAN FRAGMENT 4                                                          
                                                                          |
   > |                                                                          
                                                                          |
   > |   PARTITION: UNPARTITIONED                                               
                                                                          |
   > |                                                                          
                                                                          |
   > |   HAS_COLO_PLAN_NODE: false                                              
                                                                          |
   > |                                                                          
                                                                          |
   > |   STREAM DATA SINK                                                       
                                                                          |
   > |     EXCHANGE ID: 12                                                      
                                                                          |
   > |     HASH_PARTITIONED: skewValue$c$13[#16]                                
                                                                          |
   > |                                                                          
                                                                          |
   > |   11:VTABLE FUNCTION NODE(694)                                           
                                                                          |
   > |   |  table function: explode_numbers(128)                                
                                                                          |
   > |   |  lateral view tuple id: 8                                            
                                                                          |
   > |   |  output slot id: 14 15                                               
                                                                          |
   > |   |  cardinality=-1                                                      
                                                                          |
   > |   |  final projections: skewValue$c$13[#14], 
CAST(explodeColumn$c$14[#15] AS smallint)                                       
                      |
   > |   |  final project output tuple id: 9                                    
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   10:VUNION(690)                                                         
                                                                          |
   > |      constant exprs:                                                     
                                                                          |
   > |          1                                                               
                                                                          |
   > |          2                                                               
                                                                          |
   > |          3                                                               
                                                                          |
   > |          4                                                               
                                                                          |
   > |          5                                                               
                                                                          |
   > |          6                                                               
                                                                          |
   > |          7                                                               
                                                                          |
   > |          8                                                               
                                                                          |
   > |          9                                                               
                                                                          |
   > |          10                                                              
                                                                          |
   > |                                                                          
                                                                          |
   > | PLAN FRAGMENT 5                                                          
                                                                          |
   > |                                                                          
                                                                          |
   > |   PARTITION: RANDOM                                                      
                                                                          |
   > |                                                                          
                                                                          |
   > |   HAS_COLO_PLAN_NODE: false                                              
                                                                          |
   > |                                                                          
                                                                          |
   > |   STREAM DATA SINK                                                       
                                                                          |
   > |     EXCHANGE ID: 08                                                      
                                                                          |
   > |     HASH_PARTITIONED: id[#10]                                            
                                                                          |
   > |                                                                          
                                                                          |
   > |   7:VAGGREGATE (update serialize)(728)                                   
                                                                          |
   > |   |  STREAMING                                                           
                                                                          |
   > |   |  output: partial_sum(sum_val[#9])[#11]                               
                                                                          |
   > |   |  group by: id[#8]                                                    
                                                                          |
   > |   |  sortByGroupKey:false                                                
                                                                          |
   > |   |  cardinality=10                                                      
                                                                          |
   > |   |  distribute expr lists:                                              
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   6:VUNION(724)                                                          
                                                                          |
   > |   |  distribute expr lists:                                              
                                                                          |
   > |   |  distribute expr lists:                                              
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   |----5:VEXCHANGE                                                       
                                                                          |
   > |   |       offset: 0                                                      
                                                                          |
   > |   |       distribute expr lists: id[#6]                                  
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   2:VEXCHANGE                                                            
                                                                          |
   > |      offset: 0                                                           
                                                                          |
   > |      distribute expr lists: id[#2]                                       
                                                                          |
   > |                                                                          
                                                                          |
   > | PLAN FRAGMENT 6                                                          
                                                                          |
   > |                                                                          
                                                                          |
   > |   PARTITION: HASH_PARTITIONED: id[#4]                                    
                                                                          |
   > |                                                                          
                                                                          |
   > |   HAS_COLO_PLAN_NODE: true                                               
                                                                          |
   > |                                                                          
                                                                          |
   > |   STREAM DATA SINK                                                       
                                                                          |
   > |     EXCHANGE ID: 05                                                      
                                                                          |
   > |     RANDOM                                                               
                                                                          |
   > |                                                                          
                                                                          |
   > |   4:VAGGREGATE (merge finalize)(716)                                     
                                                                          |
   > |   |  output: sum(val[#5])[#7]                                            
                                                                          |
   > |   |  group by: id[#4]                                                    
                                                                          |
   > |   |  sortByGroupKey:false                                                
                                                                          |
   > |   |  cardinality=10                                                      
                                                                          |
   > |   |  distribute expr lists: id[#4]                                       
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   3:VOlapScanNode(712)                                                   
                                                                          |
   > |      TABLE: test.t3(t3), PREAGGREGATION: ON                              
                                                                          |
   > |      partitions=1/1 (t3)                                                 
                                                                          |
   > |      tablets=10/10, tabletList=1773143411452,1773143411455,1773143411458 
...                                                                       |
   > |      cardinality=20, avgRowSize=1952.0, numNodes=2                       
                                                                          |
   > |      pushAggOp=NONE                                                      
                                                                          |
   > |                                                                          
                                                                          |
   > | PLAN FRAGMENT 7                                                          
                                                                          |
   > |                                                                          
                                                                          |
   > |   PARTITION: HASH_PARTITIONED: id[#0]                                    
                                                                          |
   > |                                                                          
                                                                          |
   > |   HAS_COLO_PLAN_NODE: true                                               
                                                                          |
   > |                                                                          
                                                                          |
   > |   STREAM DATA SINK                                                       
                                                                          |
   > |     EXCHANGE ID: 02                                                      
                                                                          |
   > |     RANDOM                                                               
                                                                          |
   > |                                                                          
                                                                          |
   > |   1:VAGGREGATE (merge finalize)(707)                                     
                                                                          |
   > |   |  output: sum(val[#1])[#3]                                            
                                                                          |
   > |   |  group by: id[#0]                                                    
                                                                          |
   > |   |  sortByGroupKey:false                                                
                                                                          |
   > |   |  cardinality=10                                                      
                                                                          |
   > |   |  distribute expr lists: id[#0]                                       
                                                                          |
   > |   |                                                                      
                                                                          |
   > |   0:VOlapScanNode(703)                                                   
                                                                          |
   > |      TABLE: test.t2(t2), PREAGGREGATION: ON                              
                                                                          |
   > |      partitions=1/1 (t2)                                                 
                                                                          |
   > |      tablets=10/10, tabletList=1773143411417,1773143411420,1773143411423 
...                                                                       |
   > |      cardinality=20, avgRowSize=1952.0, numNodes=2                       
                                                                          |
   > |      pushAggOp=NONE                                                      
                                                                          |
   > |                                                                          
                                                                          |
   > |                                                                          
                                                                          |
   > |                                                                          
                                                                          |
   > | ========== STATISTICS ==========                                         
                                                                          |
   > 
+----------------------------------------------------------------------------------------------------------------------------------------------------+
   > 228 rows in set (0.030 sec)
   > ```
   
   It seems there are still some scenarios that need to be optimized, but the 
main idea of the optimization is still bucket shuffle: it lets the Cascades 
framework automatically identify lower layers that already satisfy the bucket 
distribution and skip the exchange for them.
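
As a rough illustration of that idea (this is not Doris code; `Distribution`, `colocate_group`, `same_bucket_layout`, and `exchanges_for_set_operation` are hypothetical names invented for this sketch), the per-child decision under a set operation could be modeled as follows. The first child that keeps its storage bucket layout is treated as the base side and needs no exchange, a child in the same colocate group needs none either, and any other child gets a bucket shuffle.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Distribution:
    """Hypothetical summary of how a child's output rows are distributed."""
    kind: str                             # "storage_bucket" or "random"
    colocate_group: Optional[str] = None  # assumed: same group means same bucket layout


def same_bucket_layout(child: Distribution, base: Distribution) -> bool:
    """True if the child already follows the base side's storage hash layout."""
    return (
        child.kind == "storage_bucket"
        and child.colocate_group is not None
        and child.colocate_group == base.colocate_group
    )


def exchanges_for_set_operation(children: List[Distribution]) -> List[str]:
    """Return one decision per child: "NONE", "BUCKET_SHUFFLE" or "HASH_SHUFFLE"."""
    # Pick the first child that still has its storage bucket layout as the base side.
    base_index = next(
        (i for i, c in enumerate(children) if c.kind == "storage_bucket"), None
    )
    if base_index is None:
        # No child can act as the base, so fall back to a regular hash shuffle.
        return ["HASH_SHUFFLE"] * len(children)

    base = children[base_index]
    decisions = []
    for i, child in enumerate(children):
        if i == base_index or same_bucket_layout(child, base):
            decisions.append("NONE")            # distribution already satisfied
        else:
            decisions.append("BUCKET_SHUFFLE")  # redistribute to the base layout
    return decisions


if __name__ == "__main__":
    colocated = [
        Distribution("storage_bucket", "g1"),
        Distribution("storage_bucket", "g1"),
    ]
    print(exchanges_for_set_operation(colocated))
```

In this toy model, two children in the same colocate group need no exchange at all, while a bucketed child paired with a randomly distributed one keeps the bucketed side in place and bucket-shuffles only the other side.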


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

