alamb commented on a change in pull request #965:
URL: https://github.com/apache/arrow-datafusion/pull/965#discussion_r707545447



##########
File path: datafusion/src/datasource/parquet.rs
##########
@@ -440,8 +427,8 @@ mod tests {
             .await;
 
         // test metadata
-        assert_eq!(table.statistics().num_rows, Some(8));
-        assert_eq!(table.statistics().total_byte_size, Some(671));
+        assert_eq!(exec.statistics().num_rows, Some(8));
+        assert_eq!(exec.statistics().total_byte_size, Some(671));

Review comment:
       👍 

##########
File path: datafusion/src/physical_optimizer/aggregate_statistics.rs
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::ExecutionConfig;
+use crate::physical_plan::empty::EmptyExec;
+use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{
+    expressions, AggregateExpr, ColumnStatistics, ExecutionPlan, Statistics,
+};
+use crate::scalar::ScalarValue;
+
+use super::optimizer::PhysicalOptimizerRule;
+use super::utils::optimize_children;
+use crate::error::Result;
+
+/// Optimizer that uses available statistics for aggregate functions
+pub struct AggregateStatistics {}
+
+impl AggregateStatistics {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for AggregateStatistics {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        execution_config: &ExecutionConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(partial_agg_exec) = take_optimizable(&*plan) {
+            let partial_agg_exec = partial_agg_exec
+                .as_any()
+                .downcast_ref::<HashAggregateExec>()
+                .expect("take_optimizable() ensures that this is a 
HashAggregateExec");
+            let stats = partial_agg_exec.input().statistics();
+            let mut projections = vec![];
+            for expr in partial_agg_exec.aggr_expr() {
+                if let Some((num_rows, name)) = take_optimizable_count(&**expr, &stats) {
+                    projections.push((expressions::lit(num_rows), name.to_owned()));
+                } else if let Some((min, name)) = take_optimizable_min(&**expr, &stats) {
+                    projections.push((expressions::lit(min), name.to_owned()));
+                } else if let Some((max, name)) = take_optimizable_max(&**expr, &stats) {
+                    projections.push((expressions::lit(max), name.to_owned()));
+                } else {
+                    // TODO: we need all aggr_expr to be resolved (cf TODO fullres)

Review comment:
       I don't understand the notation `cf TODO fullres`

##########
File path: datafusion/tests/statistics.rs
##########
@@ -0,0 +1,284 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       ❤️ 

##########
File path: datafusion/tests/sql.rs
##########
@@ -1896,6 +1896,28 @@ async fn left_join() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn left_join_unbalanced() -> Result<()> {
+    // the t1_id is larger than t2_id so the hash_build_probe_order optimizer should kick in
+    let mut ctx = create_join_context_unbalanced("t1_id", "t2_id")?;
+    let equivalent_sql = [
+        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t1_id = t2_id ORDER BY t1_id",
+        "SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON t2_id = t1_id ORDER BY t1_id",

Review comment:
       Another good test would be to select columns in a different order than t1 and then t2 -- for example:
   ```
           "SELECT t2_name, t1_name, t1_id  FROM t1 LEFT JOIN t2 ON t2_id = 
t1_id ORDER BY t1_id",
   ```
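   Since the selected column order differs there, the expected output would not match the first two queries, so it probably needs its own assertion rather than the shared `equivalent_sql` loop. A rough sketch, assuming the file's `execute` helper:
   ```
       let sql = "SELECT t2_name, t1_name, t1_id FROM t1 LEFT JOIN t2 ON t2_id = t1_id ORDER BY t1_id";
       let actual = execute(&mut ctx, sql).await;
       // expected rows would list t2_name first, then t1_name, then t1_id
   ```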
   

##########
File path: datafusion/src/physical_optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one

Review comment:
       I also went through the logic in here and it looks good to me

##########
File path: datafusion/src/physical_optimizer/aggregate_statistics.rs
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::ExecutionConfig;
+use crate::physical_plan::empty::EmptyExec;
+use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec};
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{
+    expressions, AggregateExpr, ColumnStatistics, ExecutionPlan, Statistics,
+};
+use crate::scalar::ScalarValue;
+
+use super::optimizer::PhysicalOptimizerRule;
+use super::utils::optimize_children;
+use crate::error::Result;
+
+/// Optimizer that uses available statistics for aggregate functions
+pub struct AggregateStatistics {}
+
+impl AggregateStatistics {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for AggregateStatistics {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        execution_config: &ExecutionConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if let Some(partial_agg_exec) = take_optimizable(&*plan) {
+            let partial_agg_exec = partial_agg_exec
+                .as_any()
+                .downcast_ref::<HashAggregateExec>()
+                .expect("take_optimizable() ensures that this is a 
HashAggregateExec");
+            let stats = partial_agg_exec.input().statistics();
+            let mut projections = vec![];
+            for expr in partial_agg_exec.aggr_expr() {
+                if let Some((num_rows, name)) = take_optimizable_count(&**expr, &stats) {
+                    projections.push((expressions::lit(num_rows), name.to_owned()));
+                } else if let Some((min, name)) = take_optimizable_min(&**expr, &stats) {
+                    projections.push((expressions::lit(min), name.to_owned()));
+                } else if let Some((max, name)) = take_optimizable_max(&**expr, &stats) {
+                    projections.push((expressions::lit(max), name.to_owned()));
+                } else {
+                    // TODO: we need all aggr_expr to be resolved (cf TODO fullres)
+                    break;
+                }
+            }
+
+            // TODO fullres: use statistics even if not all aggr_expr could be resolved

Review comment:
       this is a cool possible follow-on project that would work well for new contributors -- I would be happy to file a ticket if you agree.
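   To make the idea concrete, here is a hypothetical sketch (not part of this PR) of how the loop could partition the aggregates instead of bailing out on the first unresolved one -- `count` shown, `min`/`max` analogous:
   ```
       // Hypothetical sketch: split aggregates into a statistics-resolvable
       // set and a remainder; the remainder would feed a reduced
       // HashAggregateExec whose output is merged with the literals.
       let (resolved, unresolved): (Vec<_>, Vec<_>) = partial_agg_exec
           .aggr_expr()
           .iter()
           .partition(|expr| take_optimizable_count(&***expr, &stats).is_some());
   ```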

##########
File path: datafusion/src/physical_optimizer/aggregate_statistics.rs
##########
@@ -0,0 +1,384 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;

Review comment:
       I went over the logic in this module carefully and I like it and agree with @houqp 

##########
File path: datafusion/src/physical_optimizer/hash_build_probe_order.rs
##########
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilizing exact statistics from sources to avoid scanning data
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+
+use crate::execution::context::ExecutionConfig;
+use crate::logical_plan::JoinType;
+use crate::physical_plan::cross_join::CrossJoinExec;
+use crate::physical_plan::expressions::Column;
+use crate::physical_plan::hash_join::HashJoinExec;
+use crate::physical_plan::projection::ProjectionExec;
+use crate::physical_plan::{ExecutionPlan, PhysicalExpr};
+
+use super::optimizer::PhysicalOptimizerRule;
+use super::utils::optimize_children;
+use crate::error::Result;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+impl HashBuildProbeOrder {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+fn should_swap_join_order(left: &dyn ExecutionPlan, right: &dyn ExecutionPlan) -> bool {
+    let left_rows = left.statistics().num_rows;
+    let right_rows = right.statistics().num_rows;
+
+    match (left_rows, right_rows) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+fn supports_swap(join_type: JoinType) -> bool {
+    match join_type {
+        JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => true,
+        JoinType::Semi | JoinType::Anti => false,
+    }
+}
+
+fn swap_join_type(join_type: JoinType) -> JoinType {
+    match join_type {
+        JoinType::Inner => JoinType::Inner,
+        JoinType::Full => JoinType::Full,
+        JoinType::Left => JoinType::Right,
+        JoinType::Right => JoinType::Left,
+        _ => unreachable!(),
+    }
+}
+
+/// When the order of the join is changed by the optimizer,
+/// the columns in the output should not be impacted.
+/// This helper creates the expressions that will allow to swap
+/// back the values from the original left as first columns and
+/// those on the right next
+fn swap_reverting_projection(
+    left_schema: &Schema,
+    right_schema: &Schema,
+) -> Vec<(Arc<dyn PhysicalExpr>, String)> {
+    let right_cols = right_schema.fields().iter().enumerate().map(|(i, f)| {
+        (
+            Arc::new(Column::new(f.name(), i)) as Arc<dyn PhysicalExpr>,
+            f.name().to_owned(),
+        )
+    });
+    let right_len = right_cols.len();
+    let left_cols = left_schema.fields().iter().enumerate().map(|(i, f)| {
+        (
+            Arc::new(Column::new(f.name(), right_len + i)) as Arc<dyn PhysicalExpr>,
+            f.name().to_owned(),
+        )
+    });
+
+    left_cols.chain(right_cols).collect()
+}
+
+impl PhysicalOptimizerRule for HashBuildProbeOrder {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        execution_config: &ExecutionConfig,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let plan = optimize_children(self, plan, execution_config)?;
+        if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+            let left = hash_join.left();
+            let right = hash_join.right();
+            if should_swap_join_order(&**left, &**right)
+                && supports_swap(*hash_join.join_type())
+            {
+                let new_join = HashJoinExec::try_new(
+                    Arc::clone(right),
+                    Arc::clone(left),
+                    hash_join
+                        .on()
+                        .iter()
+                        .map(|(l, r)| (r.clone(), l.clone()))
+                        .collect(),
+                    &swap_join_type(*hash_join.join_type()),
+                    *hash_join.partition_mode(),
+                )?;
+                let proj = ProjectionExec::try_new(
+                    swap_reverting_projection(&*left.schema(), 
&*right.schema()),
+                    Arc::new(new_join),
+                )?;
+                return Ok(Arc::new(proj));
+            }
+        } else if let Some(cross_join) = 
plan.as_any().downcast_ref::<CrossJoinExec>() {
+            let left = cross_join.left();
+            let right = cross_join.right();
+            if should_swap_join_order(&**left, &**right) {
+                let new_join =
+                    CrossJoinExec::try_new(Arc::clone(right), 
Arc::clone(left))?;
+                let proj = ProjectionExec::try_new(
+                    swap_reverting_projection(&*left.schema(), 
&*right.schema()),
+                    Arc::new(new_join),
+                )?;
+                return Ok(Arc::new(proj));
+            }
+        }
+        Ok(plan)
+    }
+
+    fn name(&self) -> &str {
+        "hash_build_probe_order"
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::{
+        physical_plan::{hash_join::PartitionMode, Statistics},
+        test::exec::StatisticsExec,
+    };
+
+    use super::*;
+    use std::sync::Arc;
+
+    use arrow::datatypes::{DataType, Field, Schema};
+
+    fn create_big_and_small() -> (Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>) {
+        let big = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(100000),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("big_col", DataType::Int32, false)]),
+        ));
+
+        let small = Arc::new(StatisticsExec::new(
+            Statistics {
+                num_rows: Some(10),
+                ..Default::default()
+            },
+            Schema::new(vec![Field::new("small_col", DataType::Int32, false)]),
+        ));
+        (big, small)
+    }
+
+    #[tokio::test]
+    async fn test_join_with_swap() {
+        let (big, small) = create_big_and_small();
+
+        let join = HashJoinExec::try_new(
+            Arc::clone(&big),
+            Arc::clone(&small),
+            vec![(
+                Column::new_with_schema("big_col", &big.schema()).unwrap(),
+                Column::new_with_schema("small_col", &small.schema()).unwrap(),
+            )],
+            &JoinType::Left,
+            PartitionMode::CollectLeft,
+        )
+        .unwrap();
+
+        let optimized_join = HashBuildProbeOrder::new()
+            .optimize(Arc::new(join), &ExecutionConfig::new())
+            .unwrap();
+
+        let swapping_projection = optimized_join
+            .as_any()
+            .downcast_ref::<ProjectionExec>()
+            .expect("A proj is required to swap columns back to their original 
order");
+
+        assert_eq!(swapping_projection.expr().len(), 2);
+        let (col, name) = &swapping_projection.expr()[0];
+        assert_eq!(name, "big_col");
+        assert_col_expr(col, "big_col", 1);
+        let (col, name) = &swapping_projection.expr()[1];
+        assert_eq!(name, "small_col");
+        assert_col_expr(col, "small_col", 0);
+
+        let swapped_join = swapping_projection
+            .input()
+            .as_any()
+            .downcast_ref::<HashJoinExec>()
+            .expect("The type of the plan should not be changed");
+
+        assert_eq!(swapped_join.left().statistics().num_rows, Some(10));
+        assert_eq!(swapped_join.right().statistics().num_rows, Some(100000));
+    }

Review comment:
       I think it would also be prudent to verify the join type has been reversed from `Left` to `Right` as well in this test
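   Something like this, reusing the names from the test above (a sketch; `join_type()` per the accessor used earlier in this file):
   ```
       // verify the optimizer also flipped the join type from Left to Right
       assert_eq!(*swapped_join.join_type(), JoinType::Right);
   ```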




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

