morrySnow commented on code in PR #63366:
URL: https://github.com/apache/doris/pull/63366#discussion_r3411827399


##########
gensrc/thrift/Partitions.thrift:
##########
@@ -55,6 +55,40 @@ enum TPartitionType {
   MERGE_PARTITIONED = 9
 }
 
+enum TLocalPartitionType {
+  NOOP = 0,
+  // used to resume the global hash distribution because other distribution break the global hash distribution,
+  // such as PASSTHROUGH. and then JoinNode can shuffle data by the same hash distribution.
+  //
+  // for example:                                   look here, need resume to GLOBAL_EXECUTION_HASH_SHUFFLE
+  //                                                                            ↓
+  //   Node -> LocalExchangeNode(PASSTHROUGH) → JoinNode →  LocalExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) → JoinNode
+  //                  ExchangeNode(BROADCAST) ↗                                                                  ↑
+  //                                                                         ExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE)
+  GLOBAL_EXECUTION_HASH_SHUFFLE = 1,
+  // used to rebalance data for rebalance data and add parallelism
+  //
+  // for example:          look here, need use LOCAL_EXECUTION_HASH_SHUFFLE to rebalance data
+  //                                         ↓
+  //  Scan(hash(id)) -> LocalExchangeNode(LOCAL_EXECUTION_HASH_SHUFFLE(id, name)) → AggregationNode(group by(id,name))

Review Comment:
   This example is not rigorous. A `hash(id)` distribution already guarantees that
   tuples with the same `id` and `name` are assigned to the same instance, so the
   extra shuffle is not needed for `group by(id, name)`. It would be more
   reasonable to change the example to "scan(hash(id, name))" and "agg(hash(id))".



##########
gensrc/thrift/Partitions.thrift:
##########
@@ -55,6 +55,40 @@ enum TPartitionType {
   MERGE_PARTITIONED = 9
 }
 
+enum TLocalPartitionType {
+  NOOP = 0,
+  // used to resume the global hash distribution because other distribution break the global hash distribution,
+  // such as PASSTHROUGH. and then JoinNode can shuffle data by the same hash distribution.
+  //
+  // for example:                                   look here, need resume to GLOBAL_EXECUTION_HASH_SHUFFLE
+  //                                                                            ↓
+  //   Node -> LocalExchangeNode(PASSTHROUGH) → JoinNode →  LocalExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) → JoinNode
+  //                  ExchangeNode(BROADCAST) ↗                                                                  ↑
+  //                                                                         ExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE)
+  GLOBAL_EXECUTION_HASH_SHUFFLE = 1,
+  // used to rebalance data for rebalance data and add parallelism
+  //
+  // for example:          look here, need use LOCAL_EXECUTION_HASH_SHUFFLE to rebalance data
+  //                                         ↓
+  //  Scan(hash(id)) -> LocalExchangeNode(LOCAL_EXECUTION_HASH_SHUFFLE(id, name)) → AggregationNode(group by(id,name))
+  //
+  // the LOCAL_EXECUTION_HASH_SHUFFLE is necessary because the hash distribution of scan node is based on id,
+  // but the hash distribution of aggregation node is based on id and name, so we need to rebalance data by both
+  // id and name to make sure the data with same id and name can be sent to the same instance of aggregation node.
+  // and we can not use GLOBAL_EXECUTION_HASH_SHUFFLE(id, name) here, because
+  // `TPipelineFragmentParams.shuffle_idx_to_instance_idx` is used to mapping partial global instance index to local
+  // instance index, and discard the other backend's instance index, the data not belong to the local instance will be
+  // discarded, which cause data loss.
+  LOCAL_EXECUTION_HASH_SHUFFLE = 2,
+  BUCKET_HASH_SHUFFLE = 3,
+  // round-robin partition, used to rebalance data for rebalance data and add parallelism
+  PASSTHROUGH = 4,
+  ADAPTIVE_PASSTHROUGH = 5,
+  BROADCAST = 6,
+  PASS_TO_ONE = 7,
+  LOCAL_MERGE_SORT = 8

Review Comment:
   Every one of these enum values should have a comment explaining what it does.



##########
gensrc/thrift/PlanNodes.thrift:
##########
@@ -1427,6 +1428,24 @@ struct TExchangeNode {
   4: optional Partitions.TPartitionType partition_type
 }
 
+struct TLocalExchangeNode {
+  1: required Partitions.TLocalPartitionType partition_type

Review Comment:
   All fields should be `optional`, not `required`.
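
   A minimal sketch of the suggested change, showing only the one field visible
   in this hunk (any other fields of the struct are not shown here, and the
   `include` line is an assumption added to keep the fragment self-contained):

   ```thrift
   include "Partitions.thrift"

   struct TLocalExchangeNode {
     // optional instead of required, so the field can evolve without
     // breaking readers built against a different version of the schema
     1: optional Partitions.TLocalPartitionType partition_type
   }
   ```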



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

