gene-bordegaray commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2801896206
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1603,24 +1776,164 @@ async fn test_nested_hashjoin_dynamic_filter_pushdown() {
// Execute to populate the dynamic filters
stream.next().await.unwrap().unwrap();
- // Verify that both the inner and outer join have updated dynamic filters
+ // No RepartitionExec on either join, so partition-index routing is used.
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ]
- - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb]) ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ {0: b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab])} ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ {0: d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb])} ]
"
);
}
#[tokio::test]
-async fn test_hashjoin_parent_filter_pushdown() {
+async fn test_nested_hashjoin_with_repartition_dynamic_filter_pushdown() {
use datafusion_common::JoinType;
- use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+ use datafusion_physical_plan::Partitioning;
+ use datafusion_physical_plan::joins::PartitionMode;
+ use datafusion_physical_plan::repartition::RepartitionExec;
+ // Tests remap_children through repartition when nested join is build side.
+ let t1_batches = vec![
+ record_batch!(("a", Utf8, ["aa", "ab"]), ("x", Float64, [1.0,
2.0])).unwrap(),
+ ];
+ let t1_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("x", DataType::Float64, false),
+ ]));
+ let t1_scan = TestScanBuilder::new(Arc::clone(&t1_schema))
+ .with_support(true)
+ .with_batches(t1_batches)
+ .build();
+
+ let t2_batches = vec![
+ record_batch!(
+ ("b", Utf8, ["aa", "ab", "ac", "ad", "ae"]),
+ ("c", Utf8, ["ca", "cb", "cc", "cd", "ce"]),
+ ("y", Float64, [1.0, 2.0, 3.0, 4.0, 5.0])
+ )
+ .unwrap(),
+ ];
+ let t2_schema = Arc::new(Schema::new(vec![
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Utf8, false),
+ Field::new("y", DataType::Float64, false),
+ ]));
+ let t2_scan = TestScanBuilder::new(Arc::clone(&t2_schema))
+ .with_support(true)
+ .with_batches(t2_batches)
+ .build();
+
+ let t3_batches = vec![
+ record_batch!(
+ ("d", Utf8, ["ca", "cb", "cc", "cd", "ce", "cf", "cg", "ch"]),
+ ("z", Float64, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0])
+ )
+ .unwrap(),
+ ];
+ let t3_schema = Arc::new(Schema::new(vec![
+ Field::new("d", DataType::Utf8, false),
+ Field::new("z", DataType::Float64, false),
+ ]));
+ let t3_scan = TestScanBuilder::new(Arc::clone(&t3_schema))
+ .with_support(true)
+ .with_batches(t3_batches)
+ .build();
+
+ // Create Join1: t2.c = t3.d
+ let join1_on = vec![(col("c", &t2_schema).unwrap(), col("d", &t3_schema).unwrap())];
+ let join1 = Arc::new(
+ HashJoinExecBuilder::new(t2_scan, t3_scan, join1_on, JoinType::Inner)
+ .with_partition_mode(PartitionMode::Partitioned)
+ .with_null_equality(datafusion_common::NullEquality::NullEqualsNothing)
+ .with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::PartitionIndex)
+ .build()
+ .unwrap(),
+ );
+
+ // Add RepartitionExec to change partitioning key from 'c' to 'b'
+ // Join1 is on 'c=d', but Join2 will be on 'a=b'
+ let join1_schema = join1.schema();
+ let repartition = Arc::new(
+ RepartitionExec::try_new(
+ join1 as Arc<dyn ExecutionPlan>,
+ Partitioning::Hash(vec![col("b", &join1_schema).unwrap()], 1),
+ )
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>;
+
+ // Create Join2 with repartitioned nested join as build side
+ let join2_on = vec![(
+ col("b", &repartition.schema()).unwrap(),
+ col("a", &t1_schema).unwrap(),
+ )];
+ let join2 = Arc::new(
+ HashJoinExecBuilder::new(repartition, t1_scan, join2_on, JoinType::Inner)
+ .with_partition_mode(PartitionMode::Partitioned)
+ .with_null_equality(datafusion_common::NullEquality::NullEqualsNothing)
+ .with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::CaseHash)
+ .build()
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>;
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&join2), FilterPushdown::new_post_optimization(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@0, a@0)]
+ - RepartitionExec: partitioning=Hash([b@0], 1), input_partitions=1
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@0, a@0)]
+ - RepartitionExec: partitioning=Hash([b@0], 1), input_partitions=1
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]
+ ",
+ );
+
+ let session_config = dynamic_filter_session_config();
+ let plan = FilterPushdown::new_post_optimization()
+ .optimize(join2, session_config.options())
+ .unwrap();
+ let session_ctx = SessionContext::new_with_config(session_config);
+ session_ctx.register_object_store(
+ ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+ Arc::new(InMemory::new()),
+ );
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+ stream.next().await.unwrap().unwrap();
+
+ // Verify dynamic filters are populated correctly despite the repartition.
+ // The filter on t1 proves that:
+ // - Join2's build side (the nested join after repartition) successfully creates filters
+ // - Those filters successfully propagate through the RepartitionExec to t1 (probe side)
+ // - remap_children correctly handles the expression remapping through repartition
+ insta::assert_snapshot!(
+ format!("{}", format_plan_for_test(&plan)),
+ @r"
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@0, a@0)]
+ - RepartitionExec: partitioning=Hash([b@0], 1), input_partitions=1
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true
Review Comment:
no dynamic filter because it's the build side of a build side... took me a second 😂
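
For anyone else who pauses on this line, here is a minimal, self-contained sketch of the rule in plain Rust. The `Plan` enum, `filtered_scans` helper, and leaf names are hypothetical illustrations, not DataFusion's actual types: the point is just that a join's dynamic filter is derived from its build-side keys and pushed only into its probe side, so a scan reachable from the root exclusively through build edges (the t2 scan here) never receives one.

```rust
// Hypothetical model of the plan shape, not DataFusion's API: each join
// pushes the filter built from `build`-side keys into `probe` only.
enum Plan {
    Scan(&'static str),
    Join { build: Box<Plan>, probe: Box<Plan> },
}

// Collect the scans that can receive a dynamic filter: those that sit below
// at least one probe edge on the path from the root.
fn filtered_scans(plan: &Plan, below_probe: bool, out: &mut Vec<&'static str>) {
    match plan {
        Plan::Scan(name) if below_probe => out.push(*name),
        Plan::Scan(_) => {}
        Plan::Join { build, probe } => {
            // Build side: no filter flows in from this join.
            filtered_scans(build, below_probe, out);
            // Probe side: this join's filter is pushed down here.
            filtered_scans(probe, true, out);
        }
    }
}

fn main() {
    // Shape of this test: Join2(build = Join1(build = t2, probe = t3), probe = t1).
    let plan = Plan::Join {
        build: Box::new(Plan::Join {
            build: Box::new(Plan::Scan("t2")),
            probe: Box::new(Plan::Scan("t3")),
        }),
        probe: Box::new(Plan::Scan("t1")),
    };
    let mut out = Vec::new();
    filtered_scans(&plan, false, &mut out);
    // Prints ["t3", "t1"]: t2 is the build side of a build side, so its scan
    // shows no predicate in the snapshot above.
    println!("{out:?}");
}
```

This matches the snapshot: the t3 and t1 scans carry `predicate=DynamicFilter [...]`, while the t2 scan does not.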