Dandandan commented on a change in pull request #55:
URL: https://github.com/apache/arrow-datafusion/pull/55#discussion_r619791415
##########
File path: datafusion/src/physical_optimizer/coalesce_batches.rs
##########
@@ -58,7 +59,17 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// See https://issues.apache.org/jira/browse/ARROW-11068
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
- || plan_any.downcast_ref::<RepartitionExec>().is_some();
+ || {
+ match plan_any.downcast_ref::<RepartitionExec>() {
+ Some(p) => match p.partitioning() {
+ // do not coalesce hash partitions since other plans like partitioned hash
+ // join depends on it empty batches for outter joins
Review comment:
```suggestion
// join depend on empty batches for outer joins
```
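To illustrate how coalescing can remove the empty batches this diff comment relies on, here is a minimal sketch of a batch coalescer. It is a simplified model, not DataFusion's `CoalesceBatchesExec`: batches are plain `Vec<i32>`s, and `coalesce`, `Batch` and `target_rows` are hypothetical names. Assuming, as the hash join comment in this PR implies, that an empty batch is absorbed rather than forwarded, a partition containing only an empty batch yields no output batch at all, so a downstream join sees end-of-stream instead of an empty batch.

```rust
/// Hypothetical stand-in for a record batch: just a vector of row values.
type Batch = Vec<i32>;

/// Simplified sketch of batch coalescing: buffer input batches until at least
/// `target_rows` rows have accumulated, then emit them as one batch.
/// Empty input batches add nothing to the buffer, and the final flush only
/// happens when the buffer is non-empty, so an input made up solely of empty
/// batches produces no output batches at all.
fn coalesce(input: Vec<Batch>, target_rows: usize) -> Vec<Batch> {
    let mut output = Vec::new();
    let mut buffer: Batch = Vec::new();
    for batch in input {
        buffer.extend(batch);
        if buffer.len() >= target_rows {
            // Move the buffered rows out and start a fresh buffer.
            output.push(std::mem::take(&mut buffer));
        }
    }
    // Flush the remainder, but only if there is something to flush.
    if !buffer.is_empty() {
        output.push(buffer);
    }
    output
}
```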
##########
File path: datafusion/src/physical_optimizer/coalesce_batches.rs
##########
@@ -58,7 +59,17 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// See https://issues.apache.org/jira/browse/ARROW-11068
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
- || plan_any.downcast_ref::<RepartitionExec>().is_some();
+ || {
+ match plan_any.downcast_ref::<RepartitionExec>() {
+ Some(p) => match p.partitioning() {
+ // do not coalesce hash partitions since other plans like partitioned hash
Review comment:
I think this should be solved in the hash join implementation; if we merge
this as is, I think it deserves a GitHub issue.
Also, you might want to use the `matches!` macro here:
`if matches!(p.partitioning(), Partitioning::Hash(_, _))`
##########
File path: datafusion/src/physical_optimizer/coalesce_batches.rs
##########
@@ -58,7 +59,17 @@ impl PhysicalOptimizerRule for CoalesceBatches {
// See https://issues.apache.org/jira/browse/ARROW-11068
let wrap_in_coalesce = plan_any.downcast_ref::<FilterExec>().is_some()
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
- || plan_any.downcast_ref::<RepartitionExec>().is_some();
+ || {
+ match plan_any.downcast_ref::<RepartitionExec>() {
+ Some(p) => match p.partitioning() {
+ // do not coalesce hash partitions since other plans like partitioned hash
Review comment:
I think this should be solved in the hash join implementation; if we merge
this as is, I think it deserves a GitHub issue.
Also, you might want to use the `matches!` macro here:
`!matches!(p.partitioning(), Partitioning::Hash(_, _))`
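A minimal, self-contained sketch of the `!matches!` form replacing the nested `match` that returns booleans. `Partitioning` and `RepartitionExec` here are hypothetical, simplified stand-ins for the DataFusion types (the real `Hash` variant holds physical expressions rather than column names), and `should_coalesce` is a hypothetical helper name:

```rust
/// Hypothetical, simplified mirror of a partitioning enum; the real
/// variants carry physical expressions, replaced here by column names.
#[derive(Debug)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<String>, usize),
}

/// Hypothetical stand-in for a repartition operator.
struct RepartitionExec {
    partitioning: Partitioning,
}

impl RepartitionExec {
    fn partitioning(&self) -> &Partitioning {
        &self.partitioning
    }
}

/// Coalesce after a repartition unless it hash-partitions; a missing
/// repartition (`None`) means there is nothing to wrap.
fn should_coalesce(repartition: Option<&RepartitionExec>) -> bool {
    match repartition {
        Some(p) => !matches!(p.partitioning(), Partitioning::Hash(_, _)),
        None => false,
    }
}
```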
##########
File path: datafusion/src/execution/dataframe_impl.rs
##########
@@ -252,7 +252,12 @@ mod tests {
let right = test_table()?.select_columns(&["c1", "c3"])?;
let left_rows = left.collect().await?;
let right_rows = right.collect().await?;
- let join = left.join(right, JoinType::Inner, &["c1"], &["c1"])?;
+ let join = left.join(
+ right,
+ JoinType::Inner,
+ vec![Column::from_name("c1".to_string())],
Review comment:
I think it would be useful to keep supporting the string syntax, to keep
the DataFrame API simple, while also supporting qualified references.
Maybe the API could take `columns: Vec<impl Into<Column>>` and include a
`From<&str>` implementation for `Column`?
##########
File path: datafusion/src/execution/dataframe_impl.rs
##########
@@ -252,7 +252,12 @@ mod tests {
let right = test_table()?.select_columns(&["c1", "c3"])?;
let left_rows = left.collect().await?;
let right_rows = right.collect().await?;
- let join = left.join(right, JoinType::Inner, &["c1"], &["c1"])?;
+ let join = left.join(
+ right,
+ JoinType::Inner,
+ vec![Column::from_name("c1".to_string())],
Review comment:
I think it would be useful to keep supporting the string syntax, to keep
the DataFrame API simple, while also supporting qualified names.
Maybe the API could take `columns: Vec<impl Into<Column>>` and include a
`From<&str>` implementation for `Column`?
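A rough sketch of that API shape, assuming a simplified `Column` with an optional `relation` qualifier. `join_keys` is a hypothetical helper, and parsing `"t.c1"` as relation `t`, column `c1` is an assumption of this sketch rather than existing behavior:

```rust
/// Hypothetical column reference with an optional relation qualifier.
#[derive(Debug, Clone, PartialEq)]
struct Column {
    relation: Option<String>,
    name: String,
}

impl From<&str> for Column {
    /// Assumption for this sketch: "t.c1" is parsed as relation `t`, column
    /// `c1`; a string without a dot is an unqualified column name.
    fn from(s: &str) -> Self {
        match s.split_once('.') {
            Some((relation, name)) => Column {
                relation: Some(relation.to_string()),
                name: name.to_string(),
            },
            None => Column { relation: None, name: s.to_string() },
        }
    }
}

/// Hypothetical join-key normalization: each side may be given as `&str`s
/// or as `Column`s (via the blanket `From<T> for T`); both become `Column`.
fn join_keys(
    left: Vec<impl Into<Column>>,
    right: Vec<impl Into<Column>>,
) -> (Vec<Column>, Vec<Column>) {
    (
        left.into_iter().map(Into::into).collect(),
        right.into_iter().map(Into::into).collect(),
    )
}
```

One design caveat: a single `Vec<impl Into<Column>>` must be homogeneous, so a caller can pass all `&str`s or all `Column`s for one side, but not a mix of both.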
##########
File path: datafusion/src/physical_plan/hash_join.rs
##########
@@ -891,6 +898,36 @@ impl Stream for HashJoinStream {
}
Some(result)
}
+ // If maybe_batch is None and num_output_rows is 0, that means right side batch was
+ // empty and has been coalesced to None. Fill right side with Null if preserve_left
+ // is true.
+ None if self.preserve_left && self.num_output_rows == 0 => {
Review comment:
I think this partially resolves a more general issue with the left join:
it doesn't keep track of unmatched left rows across batches.
https://issues.apache.org/jira/browse/ARROW-10971
Maybe we can add a TODO here (or open an issue) to generalize this so that
it produces all left rows that were not matched.
This looks like a great start for that 👍
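One way the general fix could work is to keep a per-left-row "visited" flag that survives across right-side batches, and to emit unmatched left rows only once the right side is exhausted. A minimal sketch over plain tuples; `Row`, `left_join` and the data layout are hypothetical and do not reflect the actual `HashJoinStream` internals:

```rust
use std::collections::HashMap;

/// Hypothetical simplified row: (join key, payload).
type Row = (i32, &'static str);

/// Left outer join that tracks which left rows were matched across *all*
/// right-side batches, then pads the unmatched ones with `None` at the end.
fn left_join(left: &[Row], right_batches: &[Vec<Row>]) -> Vec<(Row, Option<Row>)> {
    // Build side: key -> indices of left rows with that key.
    let mut index: HashMap<i32, Vec<usize>> = HashMap::new();
    for (i, row) in left.iter().enumerate() {
        index.entry(row.0).or_default().push(i);
    }
    // One flag per left row, shared across all right batches.
    let mut visited = vec![false; left.len()];
    let mut output = Vec::new();
    for batch in right_batches {
        for right_row in batch {
            if let Some(indices) = index.get(&right_row.0) {
                for &i in indices {
                    visited[i] = true;
                    output.push((left[i], Some(*right_row)));
                }
            }
        }
    }
    // Only after the last batch do we know which left rows never matched.
    for (i, row) in left.iter().enumerate() {
        if !visited[i] {
            output.push((*row, None));
        }
    }
    output
}
```

With this approach an empty (or fully coalesced-away) right side needs no special case: every left row is simply unvisited and is padded with `None`.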
--
This is an automated message from the Apache Git Service.