berkaysynnada commented on code in PR #14120:
URL: https://github.com/apache/datafusion/pull/14120#discussion_r1916044324
##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -144,7 +144,10 @@ pub fn remove_unnecessary_projections(
} else if let Some(cross_join) = input.downcast_ref::<CrossJoinExec>()
{
try_swapping_with_cross_join(projection, cross_join)?
} else if let Some(nl_join) =
input.downcast_ref::<NestedLoopJoinExec>() {
- try_swapping_with_nested_loop_join(projection, nl_join)?
+ try_pushdown_through_nested_loop_join(projection, nl_join)?.map_or_else(
Review Comment:
I know the same pattern exists in the HashJoin part too, but would it take much
effort to unify this "first try pushdown; if that is not possible, embed"
approach? That is, "embed and push down", whichever is possible.
##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -2402,6 +2413,73 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_nested_loop_join_after_projection() -> Result<()> {
+ let left_csv = create_simple_csv_exec();
+ let right_csv = create_simple_csv_exec();
+
+ let col_left_a = col("a", &left_csv.schema())?;
+ let col_right_b = col("b", &right_csv.schema())?;
+ let col_left_c = col("c", &left_csv.schema())?;
+ // left_a < right_b
+ let filter_expr =
+ binary(col_left_a, Operator::Lt, col_right_b, &Schema::empty())?;
+ let filter_column_indices = vec![
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 1,
+ side: JoinSide::Right,
+ },
+ ColumnIndex {
+ index: 2,
+ side: JoinSide::Right,
+ },
+ ];
+ let filter_schema = Schema::new(vec![
+ Field::new("a", DataType::Int32, true),
+ Field::new("b", DataType::Int32, true),
+ Field::new("c", DataType::Int32, true),
+ ]);
+
+ let join: Arc<dyn ExecutionPlan> = Arc::new(NestedLoopJoinExec::try_new(
+ left_csv,
+ right_csv,
+ Some(JoinFilter::new(
+ filter_expr,
+ filter_column_indices,
+ filter_schema,
+ )),
+ &JoinType::Inner,
+ None,
+ )?);
+
+ let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
+ vec![(col_left_c, "c".to_string())],
+ Arc::clone(&join),
+ )?);
+ let initial = get_plan_string(&projection);
+ let expected_initial = [
+ "ProjectionExec: expr=[c@2 as c]",
+ "  NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1",
+ "    CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
+ "    CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
+ ];
+ assert_eq!(initial, expected_initial);
+
+ let after_optimize =
+ ProjectionPushdown::new().optimize(projection, &ConfigOptions::new())?;
+ let expected = [
+ "NestedLoopJoinExec: join_type=Inner, filter=a@0 < b@1, projection=[c@2]",
+ "  CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
+ "  CsvExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], has_header=false",
Review Comment:
A concrete example of what I am looking for: these plans should project only
a and c from the first CsvExec, and only b from the second CsvExec. Embedding
the projection into CsvExec is already done; we just need to push the
projection down below NestedLoopJoinExec.
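To make the "a and c" / "b only" split concrete, the columns each child must keep can be derived from the projection plus the columns the join filter reads. Below is a minimal, dependency-free sketch, not DataFusion code: `Side` and `required_columns` are hypothetical names, the join output is assumed to be all left columns followed by all right columns (as for `JoinType::Inner`), and only columns the filter expression actually reads are counted. Note that the test's `filter_column_indices` also lists the right-side `c` (index 2), which `a@0 < b@1` never reads; a real implementation would have to prune or keep such entries.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for DataFusion's `JoinSide`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
    Left,
    Right,
}

/// Returns the sorted child-column indices the left and right inputs must keep.
/// `projection` holds join-output indices (left columns first, then right);
/// `filter_refs` holds (side, child index) pairs read by the join filter.
pub fn required_columns(
    projection: &[usize],
    filter_refs: &[(Side, usize)],
    left_len: usize,
) -> (Vec<usize>, Vec<usize>) {
    let mut left = BTreeSet::new();
    let mut right = BTreeSet::new();
    // Split projected join-output indices into per-child indices.
    for &i in projection {
        if i < left_len { left.insert(i); } else { right.insert(i - left_len); }
    }
    // Filter references are already expressed per child.
    for &(side, i) in filter_refs {
        match side {
            Side::Left => { left.insert(i); }
            Side::Right => { right.insert(i); }
        }
    }
    (left.into_iter().collect(), right.into_iter().collect())
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn nested_loop_join_example() {
        // Projection c@2 (left `c`); filter a@0 < b@1 reads left `a` and right `b`.
        let (left, right) =
            required_columns(&[2], &[(Side::Left, 0), (Side::Right, 1)], 5);
        assert_eq!(left, vec![0, 2]); // a, c from the first CsvExec
        assert_eq!(right, vec![1]); // b from the second CsvExec
    }
}
```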
##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -626,6 +635,63 @@ fn collect_column_indices(exprs: &[(Arc<dyn PhysicalExpr>, String)]) -> Vec<usiz
indices
}
+fn try_pushdown_through_nested_loop_join(
Review Comment:
What I mean is: can we do whatever pushdown is possible and the embedding at
the same time in this function?
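One possible reading of "pushdown and embed at the same time": in a single pass, compute the columns to push into each child and the projection that must stay embedded in the join, embedding only when pushdown alone cannot produce the requested output. A minimal sketch in plain Rust, assuming the same left-then-right output layout; `JoinProjectionPlan` and `push_down_and_embed` are hypothetical names, and remapping the `JoinFilter` column indices is left out.

```rust
/// Hypothetical result of handling a projection over a nested loop join in one pass.
#[derive(Debug, PartialEq, Eq)]
pub struct JoinProjectionPlan {
    /// Child-column indices to push into the left input.
    pub left_keep: Vec<usize>,
    /// Child-column indices to push into the right input.
    pub right_keep: Vec<usize>,
    /// Projection left embedded in the join, over the narrowed join output;
    /// `None` when the narrowed output already matches the projection.
    pub embedded: Option<Vec<usize>>,
}

fn sorted_unique(mut v: Vec<usize>) -> Vec<usize> {
    v.sort_unstable();
    v.dedup();
    v
}

pub fn push_down_and_embed(
    projection: &[usize],   // join-output indices
    left_filter: &[usize],  // left child indices read by the filter
    right_filter: &[usize], // right child indices read by the filter
    left_len: usize,
) -> JoinProjectionPlan {
    // Pushdown part: collect every column each child must still produce.
    let mut left = left_filter.to_vec();
    let mut right = right_filter.to_vec();
    for &i in projection {
        if i < left_len { left.push(i) } else { right.push(i - left_len) }
    }
    let left_keep = sorted_unique(left);
    let right_keep = sorted_unique(right);

    // Embed part: remap each projected column to its position in the narrowed
    // join output (kept left columns, then kept right columns). `unwrap` cannot
    // fail because every projected index was added to a keep list above.
    let remapped: Vec<usize> = projection
        .iter()
        .map(|&i| {
            if i < left_len {
                left_keep.binary_search(&i).unwrap()
            } else {
                left_keep.len() + right_keep.binary_search(&(i - left_len)).unwrap()
            }
        })
        .collect();

    // Only embed a projection when the narrowed output is not already the answer.
    let width = left_keep.len() + right_keep.len();
    let is_identity =
        remapped.len() == width && remapped.iter().enumerate().all(|(pos, &i)| pos == i);
    JoinProjectionPlan {
        left_keep,
        right_keep,
        embedded: if is_identity { None } else { Some(remapped) },
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn pushes_down_and_embeds() {
        let plan = push_down_and_embed(&[2], &[0], &[1], 5);
        assert_eq!(plan.left_keep, vec![0, 2]);
        assert_eq!(plan.right_keep, vec![1]);
        assert_eq!(plan.embedded, Some(vec![1]));
    }
}
```

For the test in this PR (projection `c@2`, filter reading left `a` and right `b`), the sketch keeps [0, 2] on the left and [1] on the right and embeds a projection selecting index 1 of the narrowed join output. A projection that already matches the narrowed output, such as [0, 5] with no filter, needs no embedded projection at all, so one function covers both the "pushdown only" and "pushdown plus embed" cases.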