jackwener commented on code in PR #4324: URL: https://github.com/apache/arrow-datafusion/pull/4324#discussion_r1029180997
########## datafusion/optimizer/src/eliminate_limit.rs: ########## @@ -157,180 +80,148 @@ mod tests { sum, }; - fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) { - let rule = EliminateLimit::new(); - let optimized_plan = rule + fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> { + let optimized_plan = EliminateLimit::new() + .optimize(plan, &mut OptimizerConfig::new()) + .expect("failed to optimize plan"); + let formatted_plan = format!("{:?}", optimized_plan); + assert_eq!(formatted_plan, expected); + assert_eq!(plan.schema(), optimized_plan.schema()); + Ok(()) + } + + fn assert_optimized_plan_eq_with_pushdown( + plan: &LogicalPlan, + expected: &str, + ) -> Result<()> { + let optimized_plan = LimitPushDown::new() .optimize(plan, &mut OptimizerConfig::new()) .expect("failed to optimize plan"); + let optimized_plan = EliminateLimit::new() + .optimize(&optimized_plan, &mut OptimizerConfig::new()) + .expect("failed to optimize plan"); let formatted_plan = format!("{:?}", optimized_plan); assert_eq!(formatted_plan, expected); assert_eq!(plan.schema(), optimized_plan.schema()); + Ok(()) } #[test] - fn limit_0_root() { + fn limit_0_root() -> Result<()> { let table_scan = test_table_scan().unwrap(); let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, Some(0)) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, Some(0))? 
+ .build()?; // No aggregate / scan / limit let expected = "EmptyRelation"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_eq(&plan, expected) } #[test] - fn limit_0_nested() { - let table_scan = test_table_scan().unwrap(); + fn limit_0_nested() -> Result<()> { + let table_scan = test_table_scan()?; let plan1 = LogicalPlanBuilder::from(table_scan.clone()) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .build()?; let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, Some(0)) - .unwrap() - .union(plan1) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, Some(0))? + .union(plan1)? + .build()?; // Left side is removed let expected = "Union\ \n EmptyRelation\ \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_eq(&plan, expected) } #[test] - fn limit_fetch_with_ancestor_limit_skip() { - let table_scan = test_table_scan().unwrap(); + fn limit_fetch_with_ancestor_limit_fetch() -> Result<()> { + let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, Some(2)) - .unwrap() - .limit(2, None) - .unwrap() - .build() - .unwrap(); - - // No aggregate / scan / limit - let expected = "Limit: skip=2, fetch=None\ - \n EmptyRelation"; - assert_optimized_plan_eq(&plan, expected); - } - - #[test] - fn multi_limit_offset_sort_eliminate() { - let table_scan = test_table_scan().unwrap(); - let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, Some(2)) - .unwrap() - .sort(vec![col("a")]) - .unwrap() - .limit(2, Some(1)) - .unwrap() - .build() - .unwrap(); - - let expected = "Limit: skip=2, fetch=1\ - \n 
Sort: test.a\ - \n EmptyRelation"; - assert_optimized_plan_eq(&plan, expected); - } - - #[test] - fn limit_fetch_with_ancestor_limit_fetch() { - let table_scan = test_table_scan().unwrap(); - let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, Some(2)) - .unwrap() - .sort(vec![col("a")]) - .unwrap() - .limit(0, Some(1)) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, Some(2))? + .sort(vec![col("a")])? + .limit(0, Some(1))? + .build()?; let expected = "Limit: skip=0, fetch=1\ \n Sort: test.a\ \n Limit: skip=0, fetch=2\ \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_eq(&plan, expected) } #[test] - fn limit_with_ancestor_limit() { - let table_scan = test_table_scan().unwrap(); + fn limit_join_with_ancestor_limit() -> Result<()> { + let table_scan = test_table_scan()?; + let table_scan_inner = test_table_scan_with_name("test1")?; let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(2, Some(1)) - .unwrap() - .sort(vec![col("a")]) - .unwrap() - .limit(3, Some(1)) - .unwrap() - .build() - .unwrap(); - - let expected = "Limit: skip=3, fetch=1\ - \n Sort: test.a\ - \n EmptyRelation"; - assert_optimized_plan_eq(&plan, expected); - } - - #[test] - fn limit_join_with_ancestor_limit() { - let table_scan = test_table_scan().unwrap(); - let table_scan_inner = test_table_scan_with_name("test1").unwrap(); - let plan = LogicalPlanBuilder::from(table_scan) - .limit(2, Some(1)) - .unwrap() + .limit(2, Some(1))? .join_using( &table_scan_inner, JoinType::Inner, vec![Column::from_name("a".to_string())], - ) - .unwrap() - .limit(3, Some(1)) - .unwrap() - .build() - .unwrap(); + )? + .limit(3, Some(1))? 
+ .build()?; let expected = "Limit: skip=3, fetch=1\ \n Inner Join: Using test.a = test1.a\ \n Limit: skip=2, fetch=1\ \n TableScan: test\ \n TableScan: test1"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_eq(&plan, expected) } #[test] - fn remove_zero_offset() { - let table_scan = test_table_scan().unwrap(); + fn remove_zero_offset() -> Result<()> { + let table_scan = test_table_scan()?; let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a")], vec![sum(col("b"))]) - .unwrap() - .limit(0, None) - .unwrap() - .build() - .unwrap(); + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, None)? + .build()?; let expected = "Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ \n TableScan: test"; - assert_optimized_plan_eq(&plan, expected); + assert_optimized_plan_eq(&plan, expected) + } + + #[test] + fn limit_fetch_with_ancestor_limit_skip() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(0, Some(2))? + .limit(2, None)? + .build()?; + + // No aggregate / scan / limit + let expected = "EmptyRelation"; + assert_optimized_plan_eq_with_pushdown(&plan, expected) + } + + #[test] + fn limit_with_ancestor_limit() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a")], vec![sum(col("b"))])? + .limit(2, Some(1))? + .sort(vec![col("a")])? + .limit(3, Some(1))? + .build()?; + + // After remove global-state, we don't record the parent <skip, fetch> + // So, bottom don't know parent info, so can't eliminate. + let expected = "Limit: skip=3, fetch=1\ + \n Sort: test.a, fetch=4\ + \n Limit: skip=2, fetch=1\ + \n Aggregate: groupBy=[[test.a]], aggr=[[SUM(test.b)]]\ + \n TableScan: test"; + assert_optimized_plan_eq_with_pushdown(&plan, expected) Review Comment: Some cases need information carried across plan nodes (here, the parent limit's skip/fetch), which is no longer recorded after removing the global state.
That makes this case a regression, but a minor one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org