This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a181e1dffa Remove inline table scan analyzer rule (#15201)
a181e1dffa is described below
commit a181e1dffa4c52ddf79cb68434d1524c599902e9
Author: Jay Zhan <[email protected]>
AuthorDate: Wed Mar 19 07:07:23 2025 +0800
Remove inline table scan analyzer rule (#15201)
* add inline table scan
* fix test
* rm rule
* rm code
* dataframe
* remove logic in plan_from_tables
* fix query
---
datafusion/core/tests/dataframe/mod.rs | 24 ++-
datafusion/expr/src/logical_plan/builder.rs | 38 +++-
.../optimizer/src/analyzer/inline_table_scan.rs | 207 ---------------------
datafusion/optimizer/src/analyzer/mod.rs | 3 -
datafusion/sqllogictest/test_files/ddl.slt | 26 +++
datafusion/sqllogictest/test_files/explain.slt | 1 -
6 files changed, 74 insertions(+), 225 deletions(-)
diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs
index a6c06749b5..b19c0b9786 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -1581,14 +1581,18 @@ async fn with_column_join_same_columns() -> Result<()> {
assert_snapshot!(
df_with_column.logical_plan(),
- @r###"
+ @r"
Projection: t1.c1, t2.c1, Boolean(true) AS new_column
Limit: skip=0, fetch=1
Sort: t1.c1 ASC NULLS FIRST
Inner Join: t1.c1 = t2.c1
- TableScan: t1
- TableScan: t2
- "###
+ SubqueryAlias: t1
+ Projection: aggregate_test_100.c1
+ TableScan: aggregate_test_100
+ SubqueryAlias: t2
+ Projection: aggregate_test_100.c1
+ TableScan: aggregate_test_100
+ "
);
assert_snapshot!(
@@ -1748,14 +1752,18 @@ async fn with_column_renamed_join() -> Result<()> {
assert_snapshot!(
df_renamed.logical_plan(),
- @r###"
+ @r"
Projection: t1.c1 AS AAA, t1.c2, t1.c3, t2.c1, t2.c2, t2.c3
Limit: skip=0, fetch=1
Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST
Inner Join: t1.c1 = t2.c1
- TableScan: t1
- TableScan: t2
- "###
+ SubqueryAlias: t1
+ Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3
+ TableScan: aggregate_test_100
+ SubqueryAlias: t2
+ Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3
+ TableScan: aggregate_test_100
+ "
);
assert_snapshot!(
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index a1fa8c6483..39990ffe11 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -467,9 +467,7 @@ impl LogicalPlanBuilder {
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
) -> Result<Self> {
- TableScan::try_new(table_name, table_source, projection, filters, None)
- .map(LogicalPlan::TableScan)
- .map(Self::new)
+ Self::scan_with_filters_inner(table_name, table_source, projection, filters, None)
}
/// Convert a table provider into a builder with a TableScan with filter and fetch
@@ -480,9 +478,37 @@ impl LogicalPlanBuilder {
filters: Vec<Expr>,
fetch: Option<usize>,
) -> Result<Self> {
- TableScan::try_new(table_name, table_source, projection, filters, fetch)
- .map(LogicalPlan::TableScan)
- .map(Self::new)
+ Self::scan_with_filters_inner(
+ table_name,
+ table_source,
+ projection,
+ filters,
+ fetch,
+ )
+ }
+
+ fn scan_with_filters_inner(
+ table_name: impl Into<TableReference>,
+ table_source: Arc<dyn TableSource>,
+ projection: Option<Vec<usize>>,
+ filters: Vec<Expr>,
+ fetch: Option<usize>,
+ ) -> Result<Self> {
+ let table_scan =
+ TableScan::try_new(table_name, table_source, projection, filters, fetch)?;
+
+ // Inline TableScan
+ if table_scan.filters.is_empty() {
+ if let Some(p) = table_scan.source.get_logical_plan() {
+ let sub_plan = p.into_owned();
+ // Ensures that the reference to the inlined table remains the
+ // same, meaning we don't have to change any of the parent nodes
+ // that reference this table.
+ return Self::new(sub_plan).alias(table_scan.table_name);
+ }
+ }
+
+ Ok(Self::new(LogicalPlan::TableScan(table_scan)))
}
/// Wrap a plan in a window
diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs
deleted file mode 100644
index 350e65e1e3..0000000000
--- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs
+++ /dev/null
@@ -1,207 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! Analyzed rule to replace TableScan references
-//! such as DataFrames and Views and inlines the LogicalPlan.
-
-use crate::analyzer::AnalyzerRule;
-
-use datafusion_common::config::ConfigOptions;
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{Column, Result};
-use datafusion_expr::utils::expand_wildcard;
-use datafusion_expr::{logical_plan::LogicalPlan, Expr, LogicalPlanBuilder};
-
-/// Analyzed rule that inlines TableScan that provide a [`LogicalPlan`]
-/// (DataFrame / ViewTable)
-#[derive(Default, Debug)]
-pub struct InlineTableScan;
-
-impl InlineTableScan {
- pub fn new() -> Self {
- Self {}
- }
-}
-
-impl AnalyzerRule for InlineTableScan {
- fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
- plan.transform_up(analyze_internal).data()
- }
-
- fn name(&self) -> &str {
- "inline_table_scan"
- }
-}
-
-fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
- // rewrite any subqueries in the plan first
- let transformed_plan =
- plan.map_subqueries(|plan| plan.transform_up(analyze_internal))?;
-
- let transformed_plan = transformed_plan.transform_data(|plan| {
- match plan {
- // Match only on scans without filter / projection / fetch
- // Views and DataFrames won't have those added
- // during the early stage of planning.
- LogicalPlan::TableScan(table_scan) if table_scan.filters.is_empty() => {
- if let Some(sub_plan) = table_scan.source.get_logical_plan() {
- let sub_plan = sub_plan.into_owned();
- let projection_exprs =
- generate_projection_expr(&table_scan.projection, &sub_plan)?;
- LogicalPlanBuilder::from(sub_plan)
- .project(projection_exprs)?
- // Ensures that the reference to the inlined table remains the
- // same, meaning we don't have to change any of the parent nodes
- // that reference this table.
- .alias(table_scan.table_name)?
- .build()
- .map(Transformed::yes)
- } else {
- Ok(Transformed::no(LogicalPlan::TableScan(table_scan)))
- }
- }
- _ => Ok(Transformed::no(plan)),
- }
- })?;
-
- Ok(transformed_plan)
-}
-
-fn generate_projection_expr(
- projection: &Option<Vec<usize>>,
- sub_plan: &LogicalPlan,
-) -> Result<Vec<Expr>> {
- let mut exprs = vec![];
- if let Some(projection) = projection {
- for i in projection {
- exprs.push(Expr::Column(Column::from(
- sub_plan.schema().qualified_field(*i),
- )));
- }
- } else {
- let expanded = expand_wildcard(sub_plan.schema(), sub_plan, None)?;
- exprs.extend(expanded);
- }
- Ok(exprs)
-}
-
-#[cfg(test)]
-mod tests {
- use std::{borrow::Cow, sync::Arc, vec};
-
- use crate::analyzer::inline_table_scan::InlineTableScan;
- use crate::test::assert_analyzed_plan_eq;
-
- use arrow::datatypes::{DataType, Field, Schema};
- use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder, TableSource};
-
- pub struct RawTableSource {}
-
- impl TableSource for RawTableSource {
- fn as_any(&self) -> &dyn std::any::Any {
- self
- }
-
- fn schema(&self) -> arrow::datatypes::SchemaRef {
- Arc::new(Schema::new(vec![
- Field::new("a", DataType::Int64, false),
- Field::new("b", DataType::Int64, false),
- ]))
- }
-
- fn supports_filters_pushdown(
- &self,
- filters: &[&Expr],
- ) -> datafusion_common::Result<Vec<datafusion_expr::TableProviderFilterPushDown>>
- {
- Ok((0..filters.len())
- .map(|_| datafusion_expr::TableProviderFilterPushDown::Inexact)
- .collect())
- }
- }
-
- pub struct CustomSource {
- plan: LogicalPlan,
- }
-
- impl CustomSource {
- fn new() -> Self {
- Self {
- plan: LogicalPlanBuilder::scan("y", Arc::new(RawTableSource {}), None)
- .unwrap()
- .build()
- .unwrap(),
- }
- }
- }
-
- impl TableSource for CustomSource {
- fn as_any(&self) -> &dyn std::any::Any {
- self
- }
-
- fn supports_filters_pushdown(
- &self,
- filters: &[&Expr],
- ) -> datafusion_common::Result<Vec<datafusion_expr::TableProviderFilterPushDown>>
- {
- Ok((0..filters.len())
- .map(|_| datafusion_expr::TableProviderFilterPushDown::Exact)
- .collect())
- }
-
- fn schema(&self) -> arrow::datatypes::SchemaRef {
- Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
- }
-
- fn get_logical_plan(&self) -> Option<Cow<LogicalPlan>> {
- Some(Cow::Borrowed(&self.plan))
- }
- }
-
- #[test]
- fn inline_table_scan() -> datafusion_common::Result<()> {
- let scan = LogicalPlanBuilder::scan(
- "x".to_string(),
- Arc::new(CustomSource::new()),
- None,
- )?;
- let plan = scan.filter(col("x.a").eq(lit(1)))?.build()?;
- let expected = "Filter: x.a = Int32(1)\
- \n SubqueryAlias: x\
- \n Projection: y.a, y.b\
- \n TableScan: y";
-
- assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
- }
-
- #[test]
- fn inline_table_scan_with_projection() -> datafusion_common::Result<()> {
- let scan = LogicalPlanBuilder::scan(
- "x".to_string(),
- Arc::new(CustomSource::new()),
- Some(vec![0]),
- )?;
-
- let plan = scan.build()?;
- let expected = "SubqueryAlias: x\
- \n Projection: y.a\
- \n TableScan: y";
-
- assert_analyzed_plan_eq(Arc::new(InlineTableScan::new()), plan, expected)
- }
-}
diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs
index 1d199f2faa..2517e3c3a4 100644
--- a/datafusion/optimizer/src/analyzer/mod.rs
+++ b/datafusion/optimizer/src/analyzer/mod.rs
@@ -28,7 +28,6 @@ use datafusion_common::Result;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::{InvariantLevel, LogicalPlan};
-use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::analyzer::resolve_grouping_function::ResolveGroupingFunction;
use crate::analyzer::type_coercion::TypeCoercion;
use crate::utils::log_plan;
@@ -36,7 +35,6 @@ use crate::utils::log_plan;
use self::function_rewrite::ApplyFunctionRewrites;
pub mod function_rewrite;
-pub mod inline_table_scan;
pub mod resolve_grouping_function;
pub mod type_coercion;
@@ -96,7 +94,6 @@ impl Analyzer {
/// Create a new analyzer using the recommended list of rules
pub fn new() -> Self {
let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
- Arc::new(InlineTableScan::new()),
Arc::new(ResolveGroupingFunction::new()),
Arc::new(TypeCoercion::new()),
];
diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt
index bc15f22103..088d0155a6 100644
--- a/datafusion/sqllogictest/test_files/ddl.slt
+++ b/datafusion/sqllogictest/test_files/ddl.slt
@@ -855,3 +855,29 @@ DROP TABLE t1;
statement ok
DROP TABLE t2;
+
+statement count 0
+create table t(a int) as values (1), (2), (3);
+
+statement count 0
+create view v as select a, count(a) from t group by a;
+
+query II rowsort
+select * from v;
+----
+1 1
+2 1
+3 1
+
+query II rowsort
+select "count(t.a)", a from v;
+----
+1 1
+1 2
+1 3
+
+statement count 0
+drop view v;
+
+statement count 0
+drop table t;
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 2d4f504e2d..2e27ebe927 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -174,7 +174,6 @@ EXPLAIN VERBOSE SELECT a, b, c FROM simple_explain_test
initial_logical_plan
01)Projection: simple_explain_test.a, simple_explain_test.b, simple_explain_test.c
02)--TableScan: simple_explain_test
-logical_plan after inline_table_scan SAME TEXT AS ABOVE
logical_plan after resolve_grouping_function SAME TEXT AS ABOVE
logical_plan after type_coercion SAME TEXT AS ABOVE
analyzed_logical_plan SAME TEXT AS ABOVE
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]