xudong963 commented on code in PR #16956:
URL: https://github.com/apache/datafusion/pull/16956#discussion_r2787472661
##########
datafusion/core/tests/physical_optimizer/partition_statistics.rs:
##########
@@ -1045,189 +760,110 @@ mod test {
}
Review Comment:
It would be good to have the tests call validate_statistics_with_data() so
that the statistics are verified against real execution.
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -1197,21 +953,56 @@ impl ExecutionPlan for HashJoinExec {
}
fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- if partition.is_some() {
- return Ok(Statistics::new_unknown(&self.schema()));
+ match (partition, self.mode) {
Review Comment:
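nit: the match could select only the left/right input statistics, so that
the join estimate and projection are written once instead of per arm: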
```rust
fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
let (left_stats, right_stats) = match (partition, self.mode) {
(Some(p), PartitionMode::CollectLeft) => {
(self.left.partition_statistics(None)?,
self.right.partition_statistics(Some(p))?)
}
(Some(p), PartitionMode::Partitioned) => {
(self.left.partition_statistics(Some(p))?,
self.right.partition_statistics(Some(p))?)
}
(None, _) | (Some(_), PartitionMode::Auto) => {
(self.left.partition_statistics(None)?,
self.right.partition_statistics(None)?)
}
};
let stats = estimate_join_statistics(left_stats, right_stats,
&self.on, &self.join_type, &self.join_schema)?;
Ok(stats.project(self.projection.as_ref()))
}
```
##########
datafusion/physical-plan/src/joins/hash_join.rs:
##########
@@ -892,21 +892,56 @@ impl ExecutionPlan for HashJoinExec {
}
fn partition_statistics(&self, partition: Option<usize>) ->
Result<Statistics> {
- if partition.is_some() {
- return Ok(Statistics::new_unknown(&self.schema()));
+ match (partition, self.mode) {
+ // For CollectLeft mode, the left side is collected into a single
partition,
+ // so all left partitions are available to each output partition.
+ // For the right side, we need the specific partition statistics.
+ (Some(partition), PartitionMode::CollectLeft) => {
+ let left_stats = self.left.partition_statistics(None)?;
+ let right_stats =
self.right.partition_statistics(Some(partition))?;
+
+ let stats = estimate_join_statistics(
+ left_stats,
+ right_stats,
+ self.on.clone(),
+ &self.join_type,
+ &self.join_schema,
+ )?;
+ Ok(stats.project(self.projection.as_ref()))
+ }
+
+ // For Partitioned mode, both sides are partitioned, so each
output partition
+ // only has access to the corresponding partition from both sides.
+ (Some(partition), PartitionMode::Partitioned) => {
+ let left_stats =
self.left.partition_statistics(Some(partition))?;
+ let right_stats =
self.right.partition_statistics(Some(partition))?;
+
+ let stats = estimate_join_statistics(
+ left_stats,
+ right_stats,
+ self.on.clone(),
+ &self.join_type,
+ &self.join_schema,
+ )?;
+ Ok(stats.project(self.projection.as_ref()))
+ }
+
+ // For Auto mode or when no specific partition is requested, fall
back to
Review Comment:
I'm fine with keeping the old way!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]