LiaCastaneda commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2803810495
##########
datafusion/common/src/config.rs:
##########
@@ -996,6 +996,39 @@ config_namespace! {
///
/// Note: This may reduce parallelism, rooting from the I/O level, if
the number of distinct
/// partitions is less than the target_partitions.
+ ///
+ /// Note for partitioned hash join dynamic filtering:
Review Comment:
nice explanation!
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -737,6 +814,182 @@ fn multi_hash_joins() -> Result<()> {
Ok(())
}
+#[test]
+fn enforce_distribution_switches_to_partition_index_without_hash_repartition()
+-> Result<()> {
+ let left = parquet_exec();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 1)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ ");
+ let (hash_join, direct_hash_repartition_children) =
+ first_hash_join_and_direct_hash_repartition_children(&optimized)
+ .expect("expected HashJoinExec");
+
+ assert_eq!(
+ hash_join.dynamic_filter_routing_mode,
+ DynamicFilterRoutingMode::PartitionIndex,
+ );
+ assert_eq!(direct_hash_repartition_children, 0);
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_disables_dynamic_filtering_for_misaligned_partitioning()
+-> Result<()> {
+ let left = parquet_exec_multiple();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // One side starts with multiple partitions while target is 1.
EnforceDistribution inserts a
+ // hash repartition on the left child. The partitioning schemes are now
misaligned:
+ // - Left: hash-repartitioned (repartitioned=true)
+ // - Right: file-grouped (repartitioned=false)
+ // This is a correctness bug, so we expect an error.
Review Comment:
Can we have the other way around as well? That is, a join of type
`Partitioned` where the left side preserves file partitioning and the right side
has a `RepartitionExec`.
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -921,6 +917,38 @@ async fn
test_topk_filter_passes_through_coalesce_partitions() {
);
}
+/// Returns a `SessionConfig` with `pushdown_filters`,
`enable_dynamic_filter_pushdown`, and `batch_size=10`.
+fn dynamic_filter_session_config() -> SessionConfig {
Review Comment:
This was a much-needed helper for all the tests in this file.
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -737,6 +814,182 @@ fn multi_hash_joins() -> Result<()> {
Ok(())
}
+#[test]
+fn enforce_distribution_switches_to_partition_index_without_hash_repartition()
+-> Result<()> {
+ let left = parquet_exec();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 1)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ ");
+ let (hash_join, direct_hash_repartition_children) =
+ first_hash_join_and_direct_hash_repartition_children(&optimized)
+ .expect("expected HashJoinExec");
+
+ assert_eq!(
+ hash_join.dynamic_filter_routing_mode,
+ DynamicFilterRoutingMode::PartitionIndex,
+ );
+ assert_eq!(direct_hash_repartition_children, 0);
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_disables_dynamic_filtering_for_misaligned_partitioning()
+-> Result<()> {
+ let left = parquet_exec_multiple();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // One side starts with multiple partitions while target is 1.
EnforceDistribution inserts a
+ // hash repartition on the left child. The partitioning schemes are now
misaligned:
+ // - Left: hash-repartitioned (repartitioned=true)
+ // - Right: file-grouped (repartitioned=false)
+ // This is a correctness bug, so we expect an error.
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let result = ensure_distribution_helper_transform_up(join, 1);
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("incompatible partitioning schemes"),
+ "Expected error about incompatible partitioning, got: {err}",
+ );
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_uses_case_hash_with_hidden_hash_repartition_through_aggregate()
+-> Result<()> {
+ let left = projection_exec_with_alias(
+ hash_repartition_on_column(parquet_exec(), "a", 4),
+ vec![("a".to_string(), "a".to_string())],
+ );
+
+ let right = aggregate_exec_with_alias(
+ hash_repartition_on_column(parquet_exec(), "a", 4),
+ vec![("a".to_string(), "a".to_string())],
+ );
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // Both sides are already hash repartitioned, but the hash repartitions
are below other
+ // operators not directly under the join. EnforceDistribution should
detect both sides are
+ // repartitioned and set CaseHash routing mode.
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 4)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+ ProjectionExec: expr=[a@0 as a]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ ");
+ let (hash_join, direct_hash_repartition_children) =
+ first_hash_join_and_direct_hash_repartition_children(&optimized)
+ .expect("expected HashJoinExec");
+
+ assert_eq!(
+ hash_join.dynamic_filter_routing_mode,
+ DynamicFilterRoutingMode::CaseHash,
+ );
+ assert_eq!(direct_hash_repartition_children, 1);
+
+ Ok(())
+}
+
+#[test]
+fn enforce_distribution_ignores_hash_repartition_off_dynamic_filter_path() ->
Result<()> {
+ // This hash repartition is in the probe subtree but off the dynamic
filter pushdown path
+ // because the top filter references `a` while this branch only exposes
`a2`.
+ let lower_left = projection_exec_with_alias(
+ hash_repartition_on_column(parquet_exec(), "a", 4),
+ vec![("a".to_string(), "a2".to_string())],
+ );
+ let lower_right: Arc<dyn ExecutionPlan> = parquet_exec();
+
+ let lower_join_on = vec![(
+ Arc::new(Column::new_with_schema("a2", &lower_left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &lower_right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ let lower_join: Arc<dyn ExecutionPlan> = Arc::new(
+ HashJoinExecBuilder::new(
+ lower_left.clone(),
+ lower_right.clone(),
+ lower_join_on,
+ JoinType::Inner,
+ )
+ .with_partition_mode(PartitionMode::CollectLeft)
+ .with_null_equality(NullEquality::NullEqualsNothing)
+ .build()
+ .unwrap(),
+ );
+
+ let left = parquet_exec();
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &lower_join.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // EnforceDistribution should detect no repartition on either side and
switch to PartitionIndex.
+ let join = partitioned_hash_join_exec(left, lower_join, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 1)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@1)]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
Review Comment:
Would it make sense to display whether DataSourceExec is preserving partitioning,
with something like `preserve_partitioning=[bool]`? This would help users
understand why there is no RepartitionExec in the plan even though the mode is
`Partitioned`.
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1603,24 +1776,164 @@ async fn
test_nested_hashjoin_dynamic_filter_pushdown() {
// Execute to populate the dynamic filters
stream.next().await.unwrap().unwrap();
- // Verify that both the inner and outer join have updated dynamic filters
+ // No RepartitionExec on either join, so partition-index routing is used.
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[b, c, y], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ]
Review Comment:
If this was a `Partitioned` join, why was there no `CASE` dynamic filter
before? 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]