alamb commented on code in PR #4465:
URL: https://github.com/apache/arrow-datafusion/pull/4465#discussion_r1110298562
##########
datafusion/core/tests/sql/avro.rs:
##########
@@ -140,18 +140,16 @@ async fn avro_explain() {
let expected = vec![
vec![
"logical_plan",
- "Projection: COUNT(UInt8(1))\
- \n Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
- \n TableScan: alltypes_plain projection=[id]",
+ "Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]\
+ \n TableScan: alltypes_plain projection=[id]"
],
vec![
"physical_plan",
- "ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))]\
- \n AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
- \n CoalescePartitionsExec\
- \n AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]\
- \n RepartitionExec: partitioning=RoundRobinBatch(NUM_CORES), input_partitions=1\
- \n AvroExec: files={1 group: [[ARROW_TEST_DATA/avro/alltypes_plain.avro]]}, limit=None\
+ "AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]\
Review Comment:
👍 that looks nicer
##########
datafusion/optimizer/tests/integration-test.rs:
##########
@@ -195,9 +195,10 @@ fn between_date64_plus_interval() -> Result<()> {
WHERE col_date64 between '1998-03-18T00:00:00' AND cast('1998-03-18' as date) + INTERVAL '90 days'";
let plan = test_sql(sql)?;
let expected =
- "Projection: COUNT(Int64(1))\n Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]\
+ "Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]\
+ \n Projection: test.col_int32\
\n Filter: test.col_date64 >= Date64(\"890179200000\") AND test.col_date64 <= Date64(\"897955200000\")\
- \n TableScan: test projection=[col_date64]";
+ \n TableScan: test projection=[col_int32, col_date64]";
Review Comment:
Would it be possible to not push a column down if there is already a column being scanned (e.g. in this case, reuse col_date64)?
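For illustration, a minimal sketch of that suggestion (toy types, not DataFusion's actual API; `pick_placeholder_column`, `schema_columns`, and `already_scanned` are hypothetical names): when a `COUNT(*)`-style aggregate needs a placeholder column, prefer one that another operator (e.g. the `Filter` on `col_date64`) already forces the scan to read, and only fall back to the first schema field otherwise.

```rust
use std::collections::HashSet;

/// Hypothetical helper: pick a placeholder column for COUNT(*),
/// reusing a column the scan must read anyway when possible.
fn pick_placeholder_column<'a>(
    schema_columns: &'a [String],      // child schema columns, in order
    already_scanned: &HashSet<String>, // columns required elsewhere (e.g. by a Filter)
) -> Option<&'a String> {
    schema_columns
        .iter()
        .find(|c| already_scanned.contains(*c)) // reuse an already-required column
        .or_else(|| schema_columns.first())     // otherwise fall back to the first field
}

fn main() {
    let schema = vec!["col_int32".to_string(), "col_date64".to_string()];
    let scanned: HashSet<String> = HashSet::from(["col_date64".to_string()]);
    // The filter already requires col_date64, so the placeholder reuses it
    // and the scan could stay projection=[col_date64].
    assert_eq!(
        pick_placeholder_column(&schema, &scanned),
        Some(&"col_date64".to_string())
    );
}
```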
##########
datafusion/core/tests/sql/window.rs:
##########
@@ -2287,13 +2286,17 @@ async fn test_remove_unnecessary_sort_in_sub_query() -> Result<()> {
" CoalescePartitionsExec",
" AggregateExec: mode=Partial, gby=[],
aggr=[COUNT(UInt8(1))]",
" RepartitionExec: partitioning=RoundRobinBatch(8),
input_partitions=8",
- " AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1],
aggr=[COUNT(UInt8(1))]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " RepartitionExec: partitioning=Hash([Column { name:
\"c1\", index: 0 }], 8), input_partitions=8",
- " AggregateExec: mode=Partial, gby=[c1@0 as c1],
aggr=[COUNT(UInt8(1))]",
- " CoalesceBatchesExec: target_batch_size=4096",
- " FilterExec: c13@1 !=
C2GT5KVyOPZpgKVl110TyZO0NcJ434",
- " RepartitionExec:
partitioning=RoundRobinBatch(8), input_partitions=1",
+ " ProjectionExec: expr=[COUNT(UInt8(1))@0 as
COUNT(UInt8(1))]",
Review Comment:
I don't understand why there are multiple new ProjectionExec's in this plan
##########
datafusion/core/tests/sql/parquet.rs:
##########
@@ -309,11 +309,11 @@ async fn parquet_query_with_max_min() {
let sql = "SELECT max(c1) FROM foo";
let actual = execute_to_batches(&ctx, sql).await;
let expected = vec![
- "+-------------+",
- "| MAX(foo.c1) |",
- "+-------------+",
- "| 3 |",
- "+-------------+",
+ "+---------+",
+ "| MAX(c1) |",
Review Comment:
I am not sure what is responsible for removing the relation name `foo` but I
think it is a good change (and more consistent with postgres)
##########
datafusion/optimizer/src/push_down_projection.rs:
##########
@@ -45,445 +42,482 @@ use std::{
#[derive(Default)]
pub struct PushDownProjection {}
+// Get the expr in the order of the schema
+fn get_expr(columns: &HashSet<Column>, schema: &DFSchemaRef) -> Result<Vec<Expr>> {
+ let expr = schema
+ .fields()
+ .iter()
+ .flat_map(|field| {
+ let qc = field.qualified_column();
+ let uqc = field.unqualified_column();
+ if columns.contains(&qc) || columns.contains(&uqc) {
+ Some(Expr::Column(qc))
+ } else {
+ None
+ }
+ })
+ .collect::<Vec<Expr>>();
+ if columns.len() != expr.len() {
+ Err(DataFusionError::Plan(format!(
+ "required columns can't push down, columns: {columns:?}"
+ )))
+ } else {
+ Ok(expr)
+ }
+}
+
impl OptimizerRule for PushDownProjection {
fn try_optimize(
&self,
plan: &LogicalPlan,
- config: &dyn OptimizerConfig,
+ _config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
- // set of all columns referred by the plan (and thus considered required by the root)
- let required_columns = plan
- .schema()
- .fields()
- .iter()
- .map(|f| f.qualified_column())
- .collect::<HashSet<Column>>();
- Ok(Some(optimize_plan(
- self,
- plan,
- &required_columns,
- false,
- config,
- )?))
- }
+ let projection = match plan {
+ LogicalPlan::Projection(projection) => projection,
+ LogicalPlan::Aggregate(agg) => {
+ let mut required_columns = HashSet::new();
+ for e in agg.aggr_expr.iter().chain(agg.group_expr.iter()) {
+ expr_to_columns(e, &mut required_columns)?
+ }
+ if required_columns.is_empty() {
+ let first_col = agg
+ .input
+ .schema()
+ .fields()
+ .get(0)
+ .ok_or(DataFusionError::Internal(
+ "schema of agg child must exist one field".to_string(),
+ ))?
+ .qualified_column();
+ required_columns.insert(first_col);
+ }
+ let new_expr = get_expr(&required_columns, agg.input.schema())?;
+ let projection = LogicalPlan::Projection(Projection::try_new(
+ new_expr,
+ agg.input.clone(),
+ )?);
+ let optimize_plan = plan.with_new_inputs(&[projection])?;
+ return Ok(Some(optimize_plan));
+ }
+ LogicalPlan::TableScan(scan) if scan.projection.is_none() => {
+ return Ok(Some(push_down_scan(&HashSet::new(), scan, false)?));
+ }
+ _ => return Ok(None),
+ };
- fn name(&self) -> &str {
- "push_down_projection"
- }
-}
+ let child_plan = &*projection.input;
-impl PushDownProjection {
- #[allow(missing_docs)]
- pub fn new() -> Self {
- Self {}
- }
-}
+ if projection.expr.is_empty() {
+ return Err(DataFusionError::Plan(
+ "Projection expr is empty.".to_string(),
+ ));
+ }
-/// Recursively transverses the logical plan removing expressions and that are not needed.
-fn optimize_plan(
- _optimizer: &PushDownProjection,
- plan: &LogicalPlan,
- required_columns: &HashSet<Column>, // set of columns required up to this step
- has_projection: bool,
- _config: &dyn OptimizerConfig,
-) -> Result<LogicalPlan> {
- let mut new_required_columns = required_columns.clone();
- let new_plan = match plan {
- LogicalPlan::Projection(Projection {
- input,
- expr,
- schema,
- ..
- }) => {
- // projection:
- // * remove any expression that is not required
- // * construct the new set of required columns
-
- let mut new_expr = Vec::new();
- let mut new_fields = Vec::new();
- // When meet projection, its expr must contain all columns that its child need.
- // So we need create a empty required_columns instead use original new_required_columns.
- // Otherwise it cause redundant columns.
- let mut new_required_columns = HashSet::new();
-
- // Gather all columns needed for expressions in this Projection
- schema.fields().iter().enumerate().for_each(|(i, field)| {
- if required_columns.contains(&field.qualified_column()) {
- new_expr.push(expr[i].clone());
- new_fields.push(field.clone());
+ let new_plan = match child_plan {
+ LogicalPlan::Projection(child_projection) => {
+ // merge projection
+ let replace_map = collect_projection_expr(child_projection);
+ let new_exprs = projection
+ .expr
+ .iter()
+ .map(|expr| replace_cols_by_name(expr.clone(), &replace_map))
+ .enumerate()
+ .map(|(i, e)| {
+ match e {
+ Ok(e) => {
+ let parent_expr = projection
+ .schema
+ .fields()
+ .get(i)
+ .unwrap()
+ .qualified_name();
+ // TODO
Review Comment:
I wonder what this TODO is tracking
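For context, a minimal sketch of the projection-merge substitution this block performs (toy `Expr` type and a simplified `replace_cols_by_name`, not DataFusion's actual implementation): every column reference in the parent projection is rewritten using the child projection's output-name-to-expression map, so the two projections collapse into one.

```rust
use std::collections::HashMap;

// Toy expression type standing in for DataFusion's Expr.
#[derive(Clone, Debug, PartialEq)]
enum Expr {
    Column(String),
    Add(Box<Expr>, Box<Expr>),
    Literal(i64),
}

// Simplified stand-in for replace_cols_by_name: substitute each column
// reference with the child projection's expression for that output name.
fn replace_cols_by_name(expr: Expr, map: &HashMap<String, Expr>) -> Expr {
    match expr {
        Expr::Column(name) => map.get(&name).cloned().unwrap_or(Expr::Column(name)),
        Expr::Add(l, r) => Expr::Add(
            Box::new(replace_cols_by_name(*l, map)),
            Box::new(replace_cols_by_name(*r, map)),
        ),
        other => other,
    }
}

fn main() {
    // Child projection: SELECT a + 1 AS b
    let mut map = HashMap::new();
    map.insert(
        "b".to_string(),
        Expr::Add(Box::new(Expr::Column("a".into())), Box::new(Expr::Literal(1))),
    );
    // Parent projection: SELECT b + 2  =>  merged: SELECT (a + 1) + 2
    let parent = Expr::Add(Box::new(Expr::Column("b".into())), Box::new(Expr::Literal(2)));
    let merged = replace_cols_by_name(parent, &map);
    assert_eq!(
        merged,
        Expr::Add(
            Box::new(Expr::Add(
                Box::new(Expr::Column("a".into())),
                Box::new(Expr::Literal(1))
            )),
            Box::new(Expr::Literal(2))
        )
    );
}
```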
##########
benchmarks/expected-plans/q10.txt:
##########
@@ -1,12 +1,17 @@
Sort: revenue DESC NULLS FIRST
Projection: customer.c_custkey, customer.c_name, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS revenue, customer.c_acctbal, nation.n_name, customer.c_address, customer.c_phone, customer.c_comment
Aggregate: groupBy=[[customer.c_custkey, customer.c_name, customer.c_acctbal, customer.c_phone, nation.n_name, customer.c_address, customer.c_comment]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
- Inner Join: customer.c_nationkey = nation.n_nationkey
- Inner Join: orders.o_orderkey = lineitem.l_orderkey
- Inner Join: customer.c_custkey = orders.o_custkey
- TableScan: customer projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_comment]
- Filter: orders.o_orderdate >= Date32("8674") AND orders.o_orderdate < Date32("8766")
- TableScan: orders projection=[o_orderkey, o_custkey, o_orderdate]
- Filter: lineitem.l_returnflag = Utf8("R")
- TableScan: lineitem projection=[l_orderkey, l_extendedprice, l_discount, l_returnflag]
- TableScan: nation projection=[n_nationkey, n_name]
\ No newline at end of file
+ Projection: customer.c_custkey, customer.c_name, customer.c_address, customer.c_phone, customer.c_acctbal, customer.c_comment, lineitem.l_extendedprice, lineitem.l_discount, nation.n_name
Review Comment:
So the idea is that the `Projection` nodes above the join are added to make it clear which columns coming out of the join are actually needed above it?
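A schematic example of that pattern (illustrative table and column names, not output from this PR): the inserted `Projection` trims the join output to exactly the columns consumed above it.

```
Aggregate: groupBy=[[t1.a]], aggr=[[SUM(t2.x)]]
  Projection: t1.a, t2.x
    Inner Join: t1.id = t2.id
      TableScan: t1 projection=[id, a]
      TableScan: t2 projection=[id, x]
```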