This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new c019e79  ARROW-10885: [Rust][DataFusion] Optimize hash join build vs 
probe order based on number of rows
c019e79 is described below

commit c019e7944cfbdfaad8de33370be2aa7db48fdb69
Author: Heres, Daniel <[email protected]>
AuthorDate: Sun Dec 20 13:42:52 2020 -0700

    ARROW-10885: [Rust][DataFusion] Optimize hash join build vs probe order 
based on number of rows
    
    This PR uses the `num_rows` statistics to implement a common optimization 
to use the smallest table for the build phase.
    This is a good heuristic, as having the smallest table in the build 
phase leads to fewer items being inserted into the hash table, in particular if 
the sizes of the tables are very imbalanced.
    
    Some notes:
    
    * The optimization works on the `LogicalPlan` by swapping left and right, 
the join type and the key order. This seems currently the easiest place to add 
it, as there is no cost based optimizer and/or optimizers on the physical plan 
yet. The optimization rule assumes that the left part of the join will be used 
for the build phase and the right part for the probe phase.
    * It requires the number of rows to be exactly known, so it will not work 
whenever there is a transformation changing the number of rows, except for 
`limit`. The idea here is that in other cases, it is very hard to estimate the 
number of resulting rows.
    * The impact currently is measurable on queries with a bigger left side of 
an (inner) join
    
     FYI @andygrove @jorgecarleitao
    
    Closes #8961 from Dandandan/rows_hash
    
    Lead-authored-by: Heres, Daniel <[email protected]>
    Co-authored-by: Daniël Heres <[email protected]>
    Signed-off-by: Andy Grove <[email protected]>
---
 rust/datafusion/src/execution/context.rs           |   2 +
 rust/datafusion/src/logical_plan/plan.rs           |   2 +-
 .../src/optimizer/hash_build_probe_order.rs        | 226 +++++++++++++++++++++
 rust/datafusion/src/optimizer/mod.rs               |   1 +
 .../src/optimizer/projection_push_down.rs          |   2 +-
 rust/datafusion/src/optimizer/utils.rs             |   2 +-
 rust/datafusion/src/physical_plan/hash_join.rs     |   4 +-
 rust/datafusion/src/physical_plan/hash_utils.rs    |   2 +-
 rust/datafusion/src/physical_plan/planner.rs       |   1 +
 rust/datafusion/src/sql/parser.rs                  |   2 +-
 rust/datafusion/src/sql/planner.rs                 |   2 +-
 11 files changed, 238 insertions(+), 8 deletions(-)

diff --git a/rust/datafusion/src/execution/context.rs 
b/rust/datafusion/src/execution/context.rs
index 7be35a7..fcfd883 100644
--- a/rust/datafusion/src/execution/context.rs
+++ b/rust/datafusion/src/execution/context.rs
@@ -17,6 +17,7 @@
 
 //! ExecutionContext contains methods for registering data sources and 
executing queries
 
+use crate::optimizer::hash_build_probe_order::HashBuildProbeOrder;
 use std::fs;
 use std::path::Path;
 use std::string::String;
@@ -316,6 +317,7 @@ impl ExecutionContext {
         // Apply standard rewrites and optimizations
         let mut plan = ProjectionPushDown::new().optimize(&plan)?;
         plan = FilterPushDown::new().optimize(&plan)?;
+        plan = HashBuildProbeOrder::new().optimize(&plan)?;
 
         self.state
             .lock()
diff --git a/rust/datafusion/src/logical_plan/plan.rs 
b/rust/datafusion/src/logical_plan/plan.rs
index 9caabac..358c4ca 100644
--- a/rust/datafusion/src/logical_plan/plan.rs
+++ b/rust/datafusion/src/logical_plan/plan.rs
@@ -34,7 +34,7 @@ use super::extension::UserDefinedLogicalNode;
 use crate::logical_plan::dfschema::DFSchemaRef;
 
 /// Join type
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Copy)]
 pub enum JoinType {
     /// Inner join
     Inner,
diff --git a/rust/datafusion/src/optimizer/hash_build_probe_order.rs 
b/rust/datafusion/src/optimizer/hash_build_probe_order.rs
new file mode 100644
index 0000000..df05076
--- /dev/null
+++ b/rust/datafusion/src/optimizer/hash_build_probe_order.rs
@@ -0,0 +1,226 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License
+
+//! Optimizer rule to switch build and probe order of hash join
+//! based on statistics of a `TableProvider`. If the number of
+//! rows of both sources is known, the order can be switched
+//! for a faster hash join.
+
+use std::sync::Arc;
+
+use crate::logical_plan::LogicalPlan;
+use crate::optimizer::optimizer::OptimizerRule;
+use crate::{error::Result, prelude::JoinType};
+
+use super::utils;
+
+/// BuildProbeOrder reorders the build and probe phase of
+/// hash joins. This uses the amount of rows that a datasource has.
+/// The rule optimizes the order such that the left (build) side of the join
+/// is the smallest.
+/// If the information is not available, the order stays the same,
+/// so that it could be optimized manually in a query.
+pub struct HashBuildProbeOrder {}
+
+// Gets exact number of rows, if known by the statistics of the underlying
+fn get_num_rows(logical_plan: &LogicalPlan) -> Option<usize> {
+    match logical_plan {
+        LogicalPlan::Projection { input, .. } => get_num_rows(input),
+        LogicalPlan::Sort { input, .. } => get_num_rows(input),
+        LogicalPlan::TableScan { source, .. } => source.statistics().num_rows,
+        LogicalPlan::EmptyRelation {
+            produce_one_row, ..
+        } => {
+            if *produce_one_row {
+                Some(1)
+            } else {
+                Some(0)
+            }
+        }
+        LogicalPlan::Limit { n: limit, input } => {
+            let num_rows_input = get_num_rows(input);
+            num_rows_input.map(|rows| std::cmp::min(*limit, rows))
+        }
+        _ => None,
+    }
+}
+
+// Finds out whether to swap left vs right order based on statistics
+fn should_swap_join_order(left: &LogicalPlan, right: &LogicalPlan) -> bool {
+    let left_rows = get_num_rows(left);
+    let right_rows = get_num_rows(right);
+
+    match (left_rows, right_rows) {
+        (Some(l), Some(r)) => l > r,
+        _ => false,
+    }
+}
+
+impl OptimizerRule for HashBuildProbeOrder {
+    fn name(&self) -> &str {
+        "hash_build_probe_order"
+    }
+
+    fn optimize(&mut self, plan: &LogicalPlan) -> Result<LogicalPlan> {
+        match plan {
+            // Main optimization rule, swaps order of left and right
+            // based on number of rows in each table
+            LogicalPlan::Join {
+                left,
+                right,
+                on,
+                join_type,
+                schema,
+            } => {
+                let left = self.optimize(left)?;
+                let right = self.optimize(right)?;
+                if should_swap_join_order(&left, &right) {
+                    // Swap left and right, change join type and (equi-)join 
key order
+                    Ok(LogicalPlan::Join {
+                        left: Arc::new(right),
+                        right: Arc::new(left),
+                        on: on
+                            .iter()
+                            .map(|(l, r)| (r.to_string(), l.to_string()))
+                            .collect(),
+                        join_type: swap_join_type(*join_type),
+                        schema: schema.clone(),
+                    })
+                } else {
+                    // Keep join as is
+                    Ok(LogicalPlan::Join {
+                        left: Arc::new(left),
+                        right: Arc::new(right),
+                        on: on.clone(),
+                        join_type: *join_type,
+                        schema: schema.clone(),
+                    })
+                }
+            }
+            // Rest: recurse into plan, apply optimization where possible
+            LogicalPlan::Projection { .. }
+            | LogicalPlan::Aggregate { .. }
+            | LogicalPlan::TableScan { .. }
+            | LogicalPlan::Limit { .. }
+            | LogicalPlan::Filter { .. }
+            | LogicalPlan::EmptyRelation { .. }
+            | LogicalPlan::Sort { .. }
+            | LogicalPlan::CreateExternalTable { .. }
+            | LogicalPlan::Explain { .. }
+            | LogicalPlan::Extension { .. } => {
+                let expr = utils::expressions(plan);
+
+                // apply the optimization to all inputs of the plan
+                let inputs = utils::inputs(plan);
+                let new_inputs = inputs
+                    .iter()
+                    .map(|plan| self.optimize(plan))
+                    .collect::<Result<Vec<_>>>()?;
+
+                utils::from_plan(plan, &expr, &new_inputs)
+            }
+        }
+    }
+}
+
+impl HashBuildProbeOrder {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+fn swap_join_type(join_type: JoinType) -> JoinType {
+    match join_type {
+        JoinType::Inner => JoinType::Inner,
+        JoinType::Left => JoinType::Right,
+        JoinType::Right => JoinType::Left,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::sync::Arc;
+
+    use crate::{
+        datasource::{datasource::Statistics, TableProvider},
+        logical_plan::{DFSchema, Expr},
+        test::*,
+    };
+
+    struct TestTableProvider {
+        num_rows: usize,
+    }
+
+    impl TableProvider for TestTableProvider {
+        fn as_any(&self) -> &dyn std::any::Any {
+            unimplemented!()
+        }
+        fn schema(&self) -> arrow::datatypes::SchemaRef {
+            unimplemented!()
+        }
+
+        fn scan(
+            &self,
+            _projection: &Option<Vec<usize>>,
+            _batch_size: usize,
+            _filters: &[Expr],
+        ) -> Result<std::sync::Arc<dyn crate::physical_plan::ExecutionPlan>> {
+            unimplemented!()
+        }
+        fn statistics(&self) -> crate::datasource::datasource::Statistics {
+            Statistics {
+                num_rows: Some(self.num_rows),
+                total_byte_size: None,
+            }
+        }
+    }
+
+    #[test]
+    fn test_num_rows() -> Result<()> {
+        let table_scan = test_table_scan()?;
+
+        assert_eq!(get_num_rows(&table_scan), Some(0));
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_swap_order() -> Result<()> {
+        let lp_left = LogicalPlan::TableScan {
+            table_name: "left".to_string(),
+            projection: None,
+            source: Arc::new(TestTableProvider { num_rows: 1000 }),
+            projected_schema: Arc::new(DFSchema::empty()),
+            filters: vec![],
+        };
+
+        let lp_right = LogicalPlan::TableScan {
+            table_name: "right".to_string(),
+            projection: None,
+            source: Arc::new(TestTableProvider { num_rows: 100 }),
+            projected_schema: Arc::new(DFSchema::empty()),
+            filters: vec![],
+        };
+
+        assert!(should_swap_join_order(&lp_left, &lp_right));
+        assert!(!should_swap_join_order(&lp_right, &lp_left));
+
+        Ok(())
+    }
+}
diff --git a/rust/datafusion/src/optimizer/mod.rs 
b/rust/datafusion/src/optimizer/mod.rs
index dffae53..91a338e 100644
--- a/rust/datafusion/src/optimizer/mod.rs
+++ b/rust/datafusion/src/optimizer/mod.rs
@@ -19,6 +19,7 @@
 //! some simple rules to a logical plan, such as "Projection Push Down" and 
"Type Coercion".
 
 pub mod filter_push_down;
+pub mod hash_build_probe_order;
 pub mod optimizer;
 pub mod projection_push_down;
 pub mod utils;
diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs 
b/rust/datafusion/src/optimizer/projection_push_down.rs
index 7f1c358..92236b9 100644
--- a/rust/datafusion/src/optimizer/projection_push_down.rs
+++ b/rust/datafusion/src/optimizer/projection_push_down.rs
@@ -182,7 +182,7 @@ fn optimize_plan(
                     true,
                 )?),
 
-                join_type: join_type.clone(),
+                join_type: *join_type,
                 on: on.clone(),
                 schema: schema.clone(),
             })
diff --git a/rust/datafusion/src/optimizer/utils.rs 
b/rust/datafusion/src/optimizer/utils.rs
index f482a17..da13b73 100644
--- a/rust/datafusion/src/optimizer/utils.rs
+++ b/rust/datafusion/src/optimizer/utils.rs
@@ -217,7 +217,7 @@ pub fn from_plan(
         } => Ok(LogicalPlan::Join {
             left: Arc::new(inputs[0].clone()),
             right: Arc::new(inputs[1].clone()),
-            join_type: join_type.clone(),
+            join_type: *join_type,
             on: on.clone(),
             schema: schema.clone(),
         }),
diff --git a/rust/datafusion/src/physical_plan/hash_join.rs 
b/rust/datafusion/src/physical_plan/hash_join.rs
index 7fb28f2..0724d37 100644
--- a/rust/datafusion/src/physical_plan/hash_join.rs
+++ b/rust/datafusion/src/physical_plan/hash_join.rs
@@ -108,7 +108,7 @@ impl HashJoinExec {
             left,
             right,
             on,
-            join_type: join_type.clone(),
+            join_type: *join_type,
             schema,
         })
     }
@@ -190,7 +190,7 @@ impl ExecutionPlan for HashJoinExec {
         Ok(Box::pin(HashJoinStream {
             schema: self.schema.clone(),
             on_right,
-            join_type: self.join_type.clone(),
+            join_type: self.join_type,
             left_data: (left_data.0, left_data.1),
             right: stream,
         }))
diff --git a/rust/datafusion/src/physical_plan/hash_utils.rs 
b/rust/datafusion/src/physical_plan/hash_utils.rs
index 5c13dfc..b26ff9b 100644
--- a/rust/datafusion/src/physical_plan/hash_utils.rs
+++ b/rust/datafusion/src/physical_plan/hash_utils.rs
@@ -22,7 +22,7 @@ use arrow::datatypes::{Field, Schema};
 use std::collections::HashSet;
 
 /// All valid types of joins.
-#[derive(Clone, Debug)]
+#[derive(Clone, Copy, Debug)]
 pub enum JoinType {
     /// Inner join
     Inner,
diff --git a/rust/datafusion/src/physical_plan/planner.rs 
b/rust/datafusion/src/physical_plan/planner.rs
index f02fc4e..1d3a479 100644
--- a/rust/datafusion/src/physical_plan/planner.rs
+++ b/rust/datafusion/src/physical_plan/planner.rs
@@ -274,6 +274,7 @@ impl DefaultPhysicalPlanner {
                     JoinType::Left => hash_utils::JoinType::Left,
                     JoinType::Right => hash_utils::JoinType::Right,
                 };
+
                 Ok(Arc::new(HashJoinExec::try_new(
                     left,
                     right,
diff --git a/rust/datafusion/src/sql/parser.rs 
b/rust/datafusion/src/sql/parser.rs
index e71d1a8..fb421f8 100644
--- a/rust/datafusion/src/sql/parser.rs
+++ b/rust/datafusion/src/sql/parser.rs
@@ -34,7 +34,7 @@ macro_rules! parser_err {
 }
 
 /// Types of files to parse as DataFrames
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, Copy, PartialEq)]
 pub enum FileType {
     /// Newline-delimited JSON
     NdJson,
diff --git a/rust/datafusion/src/sql/planner.rs 
b/rust/datafusion/src/sql/planner.rs
index 2d946ca..379da66 100644
--- a/rust/datafusion/src/sql/planner.rs
+++ b/rust/datafusion/src/sql/planner.rs
@@ -153,7 +153,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             schema: schema.to_dfschema_ref()?,
             name: name.clone(),
             location: location.clone(),
-            file_type: file_type.clone(),
+            file_type: *file_type,
             has_header: *has_header,
         })
     }

Reply via email to