This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 90c2aff44 Add optimizer rule to filter out null keys before a join
(#2740)
90c2aff44 is described below
commit 90c2aff4423a4abca68fb1e0252f24aa62f02716
Author: Andy Grove <[email protected]>
AuthorDate: Sat Jun 18 07:10:01 2022 -0600
Add optimizer rule to filter out null keys before a join (#2740)
---
datafusion/core/src/execution/context.rs | 2 +
datafusion/optimizer/src/filter_null_join_keys.rs | 247 ++++++++++++++++++++++
datafusion/optimizer/src/lib.rs | 1 +
3 files changed, 250 insertions(+)
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index ac7727f64..ad71d4bb8 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -92,6 +92,7 @@ use crate::variable::{VarProvider, VarType};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_expr::TableSource;
+use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
use datafusion_sql::{
parser::DFParser,
planner::{ContextProvider, SqlToRel},
@@ -1210,6 +1211,7 @@ impl SessionState {
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
Arc::new(ProjectionPushDown::new()),
+ Arc::new(FilterNullJoinKeys::default()),
Arc::new(FilterPushDown::new()),
Arc::new(LimitPushDown::new()),
Arc::new(SingleDistinctToGroupBy::new()),
diff --git a/datafusion/optimizer/src/filter_null_join_keys.rs
b/datafusion/optimizer/src/filter_null_join_keys.rs
new file mode 100644
index 000000000..11fb19996
--- /dev/null
+++ b/datafusion/optimizer/src/filter_null_join_keys.rs
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The FilterNullJoinKeys rule will identify inner joins with equi-join
conditions
+//! where the join key is nullable on one side and non-nullable on the other
side
+//! and then insert an `IsNotNull` filter on the nullable side since null
values
+//! can never match.
+
+use crate::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_common::{Column, DFField, DFSchemaRef};
+use datafusion_expr::{
+ and, logical_plan::Filter, logical_plan::JoinType, Expr, LogicalPlan,
+};
+use std::sync::Arc;
+
+/// The FilterNullJoinKeys rule will identify inner joins with equi-join
conditions
+/// where the join key is nullable on one side and non-nullable on the other
side
+/// and then insert an `IsNotNull` filter on the nullable side since null
values
+/// can never match.
+#[derive(Default)]
+pub struct FilterNullJoinKeys {}
+
+impl OptimizerRule for FilterNullJoinKeys {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &OptimizerConfig,
+ ) -> datafusion_common::Result<LogicalPlan> {
+ match plan {
+ LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
+ // recurse down first and optimize inputs
+ let mut join = join.clone();
+ join.left = Arc::new(self.optimize(&join.left,
optimizer_config)?);
+ join.right = Arc::new(self.optimize(&join.right,
optimizer_config)?);
+
+ let left_schema = join.left.schema();
+ let right_schema = join.right.schema();
+
+ let mut left_filters = vec![];
+ let mut right_filters = vec![];
+
+ for (l, r) in &join.on {
+ if let Some((left_field, right_field)) =
+ resolve_join_key_pair(left_schema, right_schema, l, r)
+ {
+ if left_field.is_nullable() &&
!right_field.is_nullable() {
+ left_filters.push(l.clone());
+ } else if !left_field.is_nullable() &&
right_field.is_nullable() {
+ right_filters.push(r.clone());
+ }
+ }
+ }
+
+ if !left_filters.is_empty() {
+ let predicate = create_not_null_predicate(left_filters);
+ join.left = Arc::new(LogicalPlan::Filter(Filter {
+ predicate,
+ input: join.left.clone(),
+ }));
+ }
+ if !right_filters.is_empty() {
+ let predicate = create_not_null_predicate(right_filters);
+ join.right = Arc::new(LogicalPlan::Filter(Filter {
+ predicate,
+ input: join.right.clone(),
+ }));
+ }
+ Ok(LogicalPlan::Join(join))
+ }
+ _ => {
+ // Apply the optimization to all inputs of the plan
+ utils::optimize_children(self, plan, optimizer_config)
+ }
+ }
+ }
+
+ fn name(&self) -> &str {
+ "FilterNullJoinKeys"
+ }
+}
+
+fn create_not_null_predicate(columns: Vec<Column>) -> Expr {
+ let not_null_exprs: Vec<Expr> = columns
+ .into_iter()
+ .map(|c| Expr::IsNotNull(Box::new(Expr::Column(c))))
+ .collect();
+ // combine the IsNotNull expressions with AND
+ not_null_exprs
+ .iter()
+ .skip(1)
+ .fold(not_null_exprs[0].clone(), |a, b| and(a, b.clone()))
+}
+
+fn resolve_join_key_pair(
+ left_schema: &DFSchemaRef,
+ right_schema: &DFSchemaRef,
+ c1: &Column,
+ c2: &Column,
+) -> Option<(DFField, DFField)> {
+ resolve_fields(left_schema, right_schema, c1, c2)
+ .or_else(|| resolve_fields(left_schema, right_schema, c2, c1))
+}
+
+fn resolve_fields(
+ left_schema: &DFSchemaRef,
+ right_schema: &DFSchemaRef,
+ c1: &Column,
+ c2: &Column,
+) -> Option<(DFField, DFField)> {
+ match (
+ left_schema.index_of_column(c1),
+ right_schema.index_of_column(c2),
+ ) {
+ (Ok(left_index), Ok(right_index)) => {
+ let left_field = left_schema.field(left_index);
+ let right_field = right_schema.field(right_index);
+ Some((left_field.clone(), right_field.clone()))
+ }
+ _ => None,
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion_common::{Column, Result};
+ use datafusion_expr::logical_plan::table_scan;
+ use datafusion_expr::{logical_plan::JoinType, LogicalPlanBuilder};
+
+ fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan {
+ let rule = FilterNullJoinKeys::default();
+ rule.optimize(plan, &OptimizerConfig::new())
+ .expect("failed to optimize plan")
+ }
+
+ fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
+ let optimized_plan = optimize_plan(plan);
+ let formatted_plan = format!("{:?}", optimized_plan);
+ assert_eq!(formatted_plan, expected);
+ }
+
+ #[test]
+ fn left_nullable() -> Result<()> {
+ let (t1, t2) = test_tables()?;
+ let plan = build_plan(t1, t2, "t1.optional_id", "t2.id")?;
+ let expected = "Inner Join: #t1.optional_id = #t2.id\
+ \n Filter: #t1.optional_id IS NOT NULL\
+ \n TableScan: t1 projection=None\
+ \n TableScan: t2 projection=None";
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ #[test]
+ fn left_nullable_on_condition_reversed() -> Result<()> {
+ let (t1, t2) = test_tables()?;
+ let plan = build_plan(t1, t2, "t2.id", "t1.optional_id")?;
+ let expected = "Inner Join: #t1.optional_id = #t2.id\
+ \n Filter: #t1.optional_id IS NOT NULL\
+ \n TableScan: t1 projection=None\
+ \n TableScan: t2 projection=None";
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ #[test]
+ fn nested_join_multiple_filter_expr() -> Result<()> {
+ let (t1, t2) = test_tables()?;
+ let plan = build_plan(t1, t2, "t1.optional_id", "t2.id")?;
+ let schema = Schema::new(vec![
+ Field::new("id", DataType::UInt32, false),
+ Field::new("t1_id", DataType::UInt32, true),
+ Field::new("t2_id", DataType::UInt32, true),
+ ]);
+ let t3 = table_scan(Some("t3"), &schema, None)?.build()?;
+ let plan = LogicalPlanBuilder::from(t3)
+ .join(
+ &plan,
+ JoinType::Inner,
+ (
+ vec![
+ Column::from_qualified_name("t3.t1_id"),
+ Column::from_qualified_name("t3.t2_id"),
+ ],
+ vec![
+ Column::from_qualified_name("t1.id"),
+ Column::from_qualified_name("t2.id"),
+ ],
+ ),
+ None,
+ )?
+ .build()?;
+ let expected = "Inner Join: #t3.t1_id = #t1.id, #t3.t2_id = #t2.id\
+ \n Filter: #t3.t1_id IS NOT NULL AND #t3.t2_id IS NOT NULL\
+ \n TableScan: t3 projection=None\
+ \n Inner Join: #t1.optional_id = #t2.id\
+ \n Filter: #t1.optional_id IS NOT NULL\
+ \n TableScan: t1 projection=None\
+ \n TableScan: t2 projection=None";
+ assert_optimized_plan_eq(&plan, expected);
+ Ok(())
+ }
+
+ fn build_plan(
+ left_table: LogicalPlan,
+ right_table: LogicalPlan,
+ left_key: &str,
+ right_key: &str,
+ ) -> Result<LogicalPlan> {
+ LogicalPlanBuilder::from(left_table)
+ .join(
+ &right_table,
+ JoinType::Inner,
+ (
+ vec![Column::from_qualified_name(left_key)],
+ vec![Column::from_qualified_name(right_key)],
+ ),
+ None,
+ )?
+ .build()
+ }
+
+ fn test_tables() -> Result<(LogicalPlan, LogicalPlan)> {
+ let schema = Schema::new(vec![
+ Field::new("id", DataType::UInt32, false),
+ Field::new("optional_id", DataType::UInt32, true),
+ ]);
+ let t1 = table_scan(Some("t1"), &schema, None)?.build()?;
+ let t2 = table_scan(Some("t2"), &schema, None)?.build()?;
+ Ok((t1, t2))
+ }
+}
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index cde355073..c01ea3c63 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -19,6 +19,7 @@ pub mod common_subexpr_eliminate;
pub mod eliminate_filter;
pub mod eliminate_limit;
pub mod expr_simplifier;
+pub mod filter_null_join_keys;
pub mod filter_push_down;
pub mod limit_push_down;
pub mod optimizer;