NGA-TRAN commented on code in PR #20331:
URL: https://github.com/apache/datafusion/pull/20331#discussion_r2804289136
##########
datafusion/common/src/config.rs:
##########
@@ -996,6 +996,39 @@ config_namespace! {
///
/// Note: This may reduce parallelism, rooting from the I/O level, if
the number of distinct
/// partitions is less than the target_partitions.
+ ///
+ /// Note for partitioned hash join dynamic filtering:
+ /// preserving file partitions can allow partition-index routing (`i
-> i`) instead of
+ /// CASE-hash routing, but this assumes build/probe partition indices
stay aligned for
+ /// partition hash join / dynamic filter consumers.
+ ///
+ /// Misaligned Partitioned Hash Join Example:
+ /// ```text
+ /// ┌───────────────────────────┐
+ /// │ HashJoinExec │
+ /// │ mode=Partitioned │
+ /// │┌───────┐┌───────┐┌───────┐│
+ /// ││ Hash ││ Hash ││ Hash ││
+ /// ││Table 1││Table 2││Table 2││
Review Comment:
```suggestion
/// ││Table 1││Table 2││Table 3││
```
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -366,6 +370,62 @@ fn hash_join_exec(
.unwrap()
}
+fn partitioned_hash_join_exec(
Review Comment:
```suggestion
// Build a partitioned hash join for 2 given inputs
fn build_partitioned_hash_join(
```
##########
datafusion/common/src/config.rs:
##########
@@ -996,6 +996,39 @@ config_namespace! {
///
/// Note: This may reduce parallelism, rooting from the I/O level, if
the number of distinct
/// partitions is less than the target_partitions.
+ ///
+ /// Note for partitioned hash join dynamic filtering:
+ /// preserving file partitions can allow partition-index routing (`i
-> i`) instead of
+ /// CASE-hash routing, but this assumes build/probe partition indices
stay aligned for
+ /// partition hash join / dynamic filter consumers.
+ ///
+ /// Misaligned Partitioned Hash Join Example:
+ /// ```text
+ /// ┌───────────────────────────┐
+ /// │ HashJoinExec │
+ /// │ mode=Partitioned │
+ /// │┌───────┐┌───────┐┌───────┐│
+ /// ││ Hash ││ Hash ││ Hash ││
+ /// ││Table 1││Table 2││Table 2││
+ /// ││ ││ ││ ││
+ /// ││ key=A ││ key=B ││ key=C ││
+ /// │└───▲───┘└───▲───┘└───▲───┘│
+ /// └────┴────────┼────────┼────┘
+ /// ... Misaligned! Misaligned!
+ /// │ │
+ /// ... ┌───────┼────────┴───────────────┐
+ /// ┌────────┼───────┴───────────────┐ │
+ /// │ │ │ │ │ │
+ ///┌────┴────────┴────────┴────┐ ┌───┴─────────┴────────┴────┐
+ ///│ DataSourceExec │ │ DataSourceExec │
+ ///│┌───────┐┌───────┐┌───────┐│ │┌───────┐┌───────┐┌───────┐│
+ ///││ File ││ File ││ File ││ ││ File ││ File ││ File ││
+ ///││Group 1││Group 2││Group 3││ ││Group 1││Group 2││Group 3││
+ ///││ ││ ││ ││ ││ ││ ││ ││
+ ///││ key=A ││ key=B ││ key=C ││ ││ key=A ││ key=C ││ key=B ││
+ ///│└───────┘└───────┘└───────┘│ │└───────┘└───────┘└───────┘│
+ ///└───────────────────────────┘ └───────────────────────────┘
Review Comment:
Nice diagram
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -737,6 +814,182 @@ fn multi_hash_joins() -> Result<()> {
Ok(())
}
+#[test]
+fn enforce_distribution_switches_to_partition_index_without_hash_repartition()
Review Comment:
```suggestion
// Verify that if the join’s inputs are not explicitly direct/indirect hash
repartitioned,
// its `dynamic_filter_routing_mode` must be
`DynamicFilterRoutingMode::PartitionIndex`.
fn
enforce_distribution_switches_to_partition_index_without_hash_repartition()
```
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -737,6 +814,182 @@ fn multi_hash_joins() -> Result<()> {
Ok(())
}
+#[test]
+fn enforce_distribution_switches_to_partition_index_without_hash_repartition()
+-> Result<()> {
+ let left = parquet_exec();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 1)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ ");
+ let (hash_join, direct_hash_repartition_children) =
+ first_hash_join_and_direct_hash_repartition_children(&optimized)
+ .expect("expected HashJoinExec");
+
+ assert_eq!(
+ hash_join.dynamic_filter_routing_mode,
+ DynamicFilterRoutingMode::PartitionIndex,
+ );
+ assert_eq!(direct_hash_repartition_children, 0);
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_disables_dynamic_filtering_for_misaligned_partitioning()
+-> Result<()> {
+ let left = parquet_exec_multiple();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // One side starts with multiple partitions while target is 1.
EnforceDistribution inserts a
+ // hash repartition on the left child. The partitioning schemes are now
misaligned:
+ // - Left: hash-repartitioned (repartitioned=true)
+ // - Right: file-grouped (repartitioned=false)
+ // This is a correctness bug, so we expect an error.
Review Comment:
I think I understand this test, but it will be clearer if you can explain the
flow with an example. Maybe add comments right before the test name, using an
example to describe the query plan, why EnforceDistribution inserts a hash
repartition there, and then where the error is caught and thrown.
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1291,6 +1301,194 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}
+#[tokio::test]
+async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() {
+ use datafusion_common::JoinType;
+
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+
+ // Build side: different data per partition so bounds differ.
+ // Partition 0: ("aa", "ba") -> bounds a:[aa,aa], b:[ba,ba]
+ // Partition 1: ("zz", "zz") -> bounds a:[zz,zz], b:[zz,zz]
+ let build_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa"]),
+ ("b", Utf8, ["ba"]),
+ ("c", Float64, [1.0])
+ )
+ .unwrap(),
+ ];
+ let build_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz"]),
+ ("b", Utf8, ["zz"]),
+ ("c", Float64, [2.0])
+ )
+ .unwrap(),
+ ];
+
+ // Probe side: each partition has a mix of matching and non-matching rows.
+ // Partition 0: ("aa","ba") matches p0 bounds, ("zz","zz") does not
+ // Partition 1: ("zz","zz") matches p1 bounds, ("aa","ba") does not
+ let probe_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "zz"]),
+ ("b", Utf8, ["ba", "zz"]),
+ ("e", Float64, [10.0, 20.0])
+ )
+ .unwrap(),
+ ];
+ let probe_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz", "aa"]),
Review Comment:
```suggestion
("a", Utf8, ["zz", "zz"]),
```
##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -272,6 +274,61 @@ impl DynamicFilterPhysicalExpr {
});
}
+ /// Update the dynamic filter with per-partition filter expressions.
+ ///
+ /// This stores one filter per partition, indexed by partition number.
+ pub fn update_partitioned(&self, partition_exprs: PartitionedFilters) ->
Result<()> {
+ let mut current = self.inner.write();
+ let new_generation = current.generation + 1;
+ current.generation = new_generation;
+ current.partitioned_exprs = partition_exprs;
+ drop(current);
+
+ // Broadcast the new state.
+ let _ = self.state_watch.send(FilterState::InProgress {
+ generation: new_generation,
+ });
+ Ok(())
+ }
+
+ /// Get the filter expression for a specific partition.
+ ///
+ /// Semantics when per-partition filters are present:
+ /// - `Some(Some(expr))`: use the partition-local filter.
+ /// - `Some(None)`: the build partition is known empty, so return `false`.
+ /// - `None` (out-of-range): return `true` (fail-open) to avoid incorrect
pruning if
+ /// partition alignment/count assumptions are violated by a source.
Review Comment:
Can you add a comment here describing what the returned value means?
Something like this:
- Ok(Expr) : dynamic filter expression will be used for the given partition
- Ok(false): will filter everything on the probe side because the build
side is empty
- Ok(true): will not filter anything from the probe side and return as-is
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1291,6 +1301,194 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}
+#[tokio::test]
+async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() {
+ use datafusion_common::JoinType;
+
Review Comment:
@gene-bordegaray : We want test data that makes sense. With the suggestion
above — the data is partitioned on column `a` and the query joins on columns
`[a, b]` — you will be able to have the realistic tests you wanted.
All of my suggestions below are for this purpose; you will need to update the
test output accordingly.
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -366,6 +370,62 @@ fn hash_join_exec(
.unwrap()
}
+fn partitioned_hash_join_exec(
+ left: Arc<dyn ExecutionPlan>,
+ right: Arc<dyn ExecutionPlan>,
+ join_on: &JoinOn,
+ join_type: &JoinType,
+) -> Arc<dyn ExecutionPlan> {
+ Arc::new(
+ HashJoinExecBuilder::new(left, right, join_on.clone(), *join_type)
+ .with_partition_mode(PartitionMode::Partitioned)
+ .with_null_equality(NullEquality::NullEqualsNothing)
+ .build()
+ .unwrap(),
+ )
+}
+
+fn first_hash_join_and_direct_hash_repartition_children(
Review Comment:
Does this comment sound correct? Fix it as you see fit
```suggestion
// Traversing down the plan and returning the first hash join with direct
repartition children
fn first_hash_join_and_direct_hash_repartition_children(
```
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -366,6 +370,62 @@ fn hash_join_exec(
.unwrap()
}
+fn partitioned_hash_join_exec(
+ left: Arc<dyn ExecutionPlan>,
+ right: Arc<dyn ExecutionPlan>,
+ join_on: &JoinOn,
+ join_type: &JoinType,
+) -> Arc<dyn ExecutionPlan> {
+ Arc::new(
+ HashJoinExecBuilder::new(left, right, join_on.clone(), *join_type)
+ .with_partition_mode(PartitionMode::Partitioned)
+ .with_null_equality(NullEquality::NullEqualsNothing)
+ .build()
+ .unwrap(),
+ )
+}
+
+fn first_hash_join_and_direct_hash_repartition_children(
+ plan: &Arc<dyn ExecutionPlan>,
+) -> Option<(&HashJoinExec, usize)> {
+ if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
+ let direct_hash_repartition_children = hash_join
+ .children()
+ .into_iter()
+ .filter(|child| {
+ child
+ .as_any()
+ .downcast_ref::<RepartitionExec>()
+ .is_some_and(|repartition| {
+ matches!(repartition.partitioning(),
Partitioning::Hash(_, _))
+ })
+ })
+ .count();
+ return Some((hash_join, direct_hash_repartition_children));
+ }
+
+ for child in plan.children() {
+ if let Some(result) =
first_hash_join_and_direct_hash_repartition_children(child)
+ {
+ return Some(result);
+ }
+ }
+ None
+}
+
+fn hash_repartition_on_column(
Review Comment:
The purpose of this function is to add a RepartitionExec on top of the input
plan. How about renaming it to:
```suggestion
// Add RepartitionExec for the given input
fn add_repartition(
```
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -279,15 +275,15 @@ async fn
test_dynamic_filter_pushdown_through_hash_join_with_topk() {
// Iterate one batch
stream.next().await.unwrap().unwrap();
- // Test that filters are pushed down correctly to each side of the join
- // NOTE: We dropped the CASE expression here because we now optimize that
away if there's only 1 partition
+ // Test that filters are pushed down correctly to each side of the join.
+ // This test has no RepartitionExec, so partition-index routing is used.
insta::assert_snapshot!(
format_plan_for_test(&plan),
@r"
- SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false],
filter=[e@4 IS NULL OR e@4 < bb]
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, e, f], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa, ab]) ]
AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, e, f], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ {0: d@0 >= aa AND d@0 <= ab AND d@0 IN (SET) ([aa,
ab])} ] AND DynamicFilter [ e@1 IS NULL OR e@1 < bb ]
Review Comment:
Nice
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -405,6 +465,23 @@ fn ensure_distribution_helper(
ensure_distribution(distribution_context, &config).map(|item|
item.data.plan)
}
+/// Like [`ensure_distribution_helper`] but uses bottom-up `transform_up`.
Review Comment:
It will be useful to also explain what will be transformed. Add an example if
that makes it easier to explain.
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -405,6 +465,23 @@ fn ensure_distribution_helper(
ensure_distribution(distribution_context, &config).map(|item|
item.data.plan)
}
+/// Like [`ensure_distribution_helper`] but uses bottom-up `transform_up`.
+fn ensure_distribution_helper_transform_up(
+ plan: Arc<dyn ExecutionPlan>,
+ target_partitions: usize,
+) -> Result<Arc<dyn ExecutionPlan>> {
+ let distribution_context = DistributionContext::new_default(plan);
+ let mut config = ConfigOptions::new();
+ config.execution.target_partitions = target_partitions;
+ config.optimizer.enable_round_robin_repartition = false;
+ config.optimizer.repartition_file_scans = false;
+ config.optimizer.repartition_file_min_size = 1024;
+ config.optimizer.prefer_existing_sort = false;
Review Comment:
It will be clearer if you add comments explaining why you need these settings
and for which tests.
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -737,6 +814,182 @@ fn multi_hash_joins() -> Result<()> {
Ok(())
}
+#[test]
+fn enforce_distribution_switches_to_partition_index_without_hash_repartition()
+-> Result<()> {
+ let left = parquet_exec();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 1)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ ");
+ let (hash_join, direct_hash_repartition_children) =
+ first_hash_join_and_direct_hash_repartition_children(&optimized)
+ .expect("expected HashJoinExec");
+
+ assert_eq!(
+ hash_join.dynamic_filter_routing_mode,
+ DynamicFilterRoutingMode::PartitionIndex,
+ );
+ assert_eq!(direct_hash_repartition_children, 0);
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_disables_dynamic_filtering_for_misaligned_partitioning()
+-> Result<()> {
+ let left = parquet_exec_multiple();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // One side starts with multiple partitions while target is 1.
EnforceDistribution inserts a
+ // hash repartition on the left child. The partitioning schemes are now
misaligned:
+ // - Left: hash-repartitioned (repartitioned=true)
+ // - Right: file-grouped (repartitioned=false)
+ // This is a correctness bug, so we expect an error.
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let result = ensure_distribution_helper_transform_up(join, 1);
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("incompatible partitioning schemes"),
+ "Expected error about incompatible partitioning, got: {err}",
+ );
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_uses_case_hash_with_hidden_hash_repartition_through_aggregate()
Review Comment:
```suggestion
// Verify that if the join inputs are direct/indirect hash repartitioned,
// its `dynamic_filter_routing_mode` must be
`DynamicFilterRoutingMode::CaseHash`
fn
enforce_distribution_uses_case_hash_with_hidden_hash_repartition_through_aggregate()
```
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -737,6 +814,182 @@ fn multi_hash_joins() -> Result<()> {
Ok(())
}
+#[test]
+fn enforce_distribution_switches_to_partition_index_without_hash_repartition()
+-> Result<()> {
+ let left = parquet_exec();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 1)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ ");
+ let (hash_join, direct_hash_repartition_children) =
+ first_hash_join_and_direct_hash_repartition_children(&optimized)
+ .expect("expected HashJoinExec");
+
+ assert_eq!(
+ hash_join.dynamic_filter_routing_mode,
+ DynamicFilterRoutingMode::PartitionIndex,
+ );
+ assert_eq!(direct_hash_repartition_children, 0);
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_disables_dynamic_filtering_for_misaligned_partitioning()
+-> Result<()> {
+ let left = parquet_exec_multiple();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // One side starts with multiple partitions while target is 1.
EnforceDistribution inserts a
+ // hash repartition on the left child. The partitioning schemes are now
misaligned:
+ // - Left: hash-repartitioned (repartitioned=true)
+ // - Right: file-grouped (repartitioned=false)
+ // This is a correctness bug, so we expect an error.
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let result = ensure_distribution_helper_transform_up(join, 1);
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("incompatible partitioning schemes"),
+ "Expected error about incompatible partitioning, got: {err}",
+ );
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_uses_case_hash_with_hidden_hash_repartition_through_aggregate()
+-> Result<()> {
+ let left = projection_exec_with_alias(
+ hash_repartition_on_column(parquet_exec(), "a", 4),
+ vec![("a".to_string(), "a".to_string())],
+ );
+
+ let right = aggregate_exec_with_alias(
+ hash_repartition_on_column(parquet_exec(), "a", 4),
+ vec![("a".to_string(), "a".to_string())],
+ );
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // Both sides are already hash repartitioned, but the hash repartitions
are below other
+ // operators not directly under the join. EnforceDistribution should
detect both sides are
+ // repartitioned and set CaseHash routing mode.
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 4)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+ ProjectionExec: expr=[a@0 as a]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
Review Comment:
Nice test
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1291,6 +1301,194 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}
+#[tokio::test]
+async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() {
+ use datafusion_common::JoinType;
+
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+
+ // Build side: different data per partition so bounds differ.
+ // Partition 0: ("aa", "ba") -> bounds a:[aa,aa], b:[ba,ba]
+ // Partition 1: ("zz", "zz") -> bounds a:[zz,zz], b:[zz,zz]
+ let build_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa"]),
+ ("b", Utf8, ["ba"]),
+ ("c", Float64, [1.0])
Review Comment:
```suggestion
("a", Utf8, ["aa", "kk"]),
("b", Utf8, ["ba", "whatever"]),
("c", Float64, [1.0, 2.0] )
```
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -737,6 +814,182 @@ fn multi_hash_joins() -> Result<()> {
Ok(())
}
+#[test]
+fn enforce_distribution_switches_to_partition_index_without_hash_repartition()
+-> Result<()> {
+ let left = parquet_exec();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 1)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ ");
+ let (hash_join, direct_hash_repartition_children) =
+ first_hash_join_and_direct_hash_repartition_children(&optimized)
+ .expect("expected HashJoinExec");
+
+ assert_eq!(
+ hash_join.dynamic_filter_routing_mode,
+ DynamicFilterRoutingMode::PartitionIndex,
+ );
+ assert_eq!(direct_hash_repartition_children, 0);
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_disables_dynamic_filtering_for_misaligned_partitioning()
+-> Result<()> {
+ let left = parquet_exec_multiple();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // One side starts with multiple partitions while target is 1.
EnforceDistribution inserts a
+ // hash repartition on the left child. The partitioning schemes are now
misaligned:
+ // - Left: hash-repartitioned (repartitioned=true)
+ // - Right: file-grouped (repartitioned=false)
+ // This is a correctness bug, so we expect an error.
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let result = ensure_distribution_helper_transform_up(join, 1);
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("incompatible partitioning schemes"),
+ "Expected error about incompatible partitioning, got: {err}",
+ );
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_uses_case_hash_with_hidden_hash_repartition_through_aggregate()
+-> Result<()> {
+ let left = projection_exec_with_alias(
+ hash_repartition_on_column(parquet_exec(), "a", 4),
+ vec![("a".to_string(), "a".to_string())],
+ );
+
+ let right = aggregate_exec_with_alias(
+ hash_repartition_on_column(parquet_exec(), "a", 4),
+ vec![("a".to_string(), "a".to_string())],
+ );
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // Both sides are already hash repartitioned, but the hash repartitions
are below other
+ // operators not directly under the join. EnforceDistribution should
detect both sides are
+ // repartitioned and set CaseHash routing mode.
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 4)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+ ProjectionExec: expr=[a@0 as a]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ ");
+ let (hash_join, direct_hash_repartition_children) =
+ first_hash_join_and_direct_hash_repartition_children(&optimized)
+ .expect("expected HashJoinExec");
+
+ assert_eq!(
+ hash_join.dynamic_filter_routing_mode,
+ DynamicFilterRoutingMode::CaseHash,
+ );
+ assert_eq!(direct_hash_repartition_children, 1);
+
+ Ok(())
+}
+
+#[test]
+fn enforce_distribution_ignores_hash_repartition_off_dynamic_filter_path() ->
Result<()> {
+ // This hash repartition is in the probe subtree but off the dynamic
filter pushdown path
+ // because the top filter references `a` while this branch only exposes
`a2`.
Review Comment:
```suggestion
// Create alias `a2` for column `a`
```
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -737,6 +814,182 @@ fn multi_hash_joins() -> Result<()> {
Ok(())
}
+#[test]
+fn enforce_distribution_switches_to_partition_index_without_hash_repartition()
+-> Result<()> {
+ let left = parquet_exec();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 1)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d,
e], file_type=parquet
+ ");
+ let (hash_join, direct_hash_repartition_children) =
+ first_hash_join_and_direct_hash_repartition_children(&optimized)
+ .expect("expected HashJoinExec");
+
+ assert_eq!(
+ hash_join.dynamic_filter_routing_mode,
+ DynamicFilterRoutingMode::PartitionIndex,
+ );
+ assert_eq!(direct_hash_repartition_children, 0);
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_disables_dynamic_filtering_for_misaligned_partitioning()
+-> Result<()> {
+ let left = parquet_exec_multiple();
+ let right = parquet_exec();
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // One side starts with multiple partitions while target is 1.
EnforceDistribution inserts a
+ // hash repartition on the left child. The partitioning schemes are now
misaligned:
+ // - Left: hash-repartitioned (repartitioned=true)
+ // - Right: file-grouped (repartitioned=false)
+ // This is a correctness bug, so we expect an error.
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let result = ensure_distribution_helper_transform_up(join, 1);
+ assert!(result.is_err());
+ let err = result.unwrap_err();
+ assert!(
+ err.to_string()
+ .contains("incompatible partitioning schemes"),
+ "Expected error about incompatible partitioning, got: {err}",
+ );
+
+ Ok(())
+}
+
+#[test]
+fn
enforce_distribution_uses_case_hash_with_hidden_hash_repartition_through_aggregate()
+-> Result<()> {
+ let left = projection_exec_with_alias(
+ hash_repartition_on_column(parquet_exec(), "a", 4),
+ vec![("a".to_string(), "a".to_string())],
+ );
+
+ let right = aggregate_exec_with_alias(
+ hash_repartition_on_column(parquet_exec(), "a", 4),
+ vec![("a".to_string(), "a".to_string())],
+ );
+
+ let join_on = vec![(
+ Arc::new(Column::new_with_schema("a", &left.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ Arc::new(Column::new_with_schema("a", &right.schema()).unwrap())
+ as Arc<dyn PhysicalExpr>,
+ )];
+
+ // Both sides are already hash repartitioned, but the hash repartitions
are below other
+ // operators not directly under the join. EnforceDistribution should
detect both sides are
+ // repartitioned and set CaseHash routing mode.
+ let join = partitioned_hash_join_exec(left, right, &join_on,
&JoinType::Inner);
+
+ let optimized = ensure_distribution_helper_transform_up(join, 4)?;
+ assert_plan!(optimized, @r"
+ HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0)]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+ ProjectionExec: expr=[a@0 as a]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=1
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b, c,
d, e], file_type=parquet
+ ");
+ let (hash_join, direct_hash_repartition_children) =
+ first_hash_join_and_direct_hash_repartition_children(&optimized)
+ .expect("expected HashJoinExec");
+
+ assert_eq!(
+ hash_join.dynamic_filter_routing_mode,
+ DynamicFilterRoutingMode::CaseHash,
+ );
+ assert_eq!(direct_hash_repartition_children, 1);
+
+ Ok(())
+}
+
+#[test]
+fn enforce_distribution_ignores_hash_repartition_off_dynamic_filter_path() ->
Result<()> {
Review Comment:
```suggestion
// Verify dynamic_filter_routing_mode works correctly with alias
fn enforce_distribution_ignores_hash_repartition_off_dynamic_filter_path()
-> Result<()> {
```
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -2004,4 +2020,129 @@ mod test {
"Reverse scan with non-contiguous row groups should correctly map
RowSelection"
);
}
+
+ #[tokio::test]
+ async fn test_partition_snapshot_in_opener() {
+ let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+
+ let batch = record_batch!(("a", Int32, vec![Some(1), Some(2),
Some(3)])).unwrap();
+ let data_size =
+ write_parquet(Arc::clone(&store), "test.parquet",
batch.clone()).await;
+
+ let schema = batch.schema();
+ let file = PartitionedFile::new(
+ "test.parquet".to_string(),
+ u64::try_from(data_size).unwrap(),
+ );
+
+ let col_a = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
+
+ let dynamic_filter = Arc::new(DynamicFilterPhysicalExpr::new(
+ vec![Arc::clone(&col_a)],
+ datafusion_physical_expr::expressions::lit(true) as Arc<dyn
PhysicalExpr>,
+ ));
+
+ // Creates a DynamicFilterPhysicalExpr with per-partition bounds:
+ // - Partition 0: a >= 1 AND a <= 3 (matches all rows)
+ // - Partition 1: a >= 10 AND a <= 20 (excludes all rows via row group
stats)
+ // - Partition 2: a >= 2 AND a <= 4 (matches some rows)
Review Comment:
So Partitions 0 and 2 overlap? Why do you need this unit test? Haven't you
already thrown an error if this happens in the query plan?
For a unit test, we can make the data whatever we like, but I suggest we have
a test that makes sense to us. Test non-overlapping partitions.
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1291,6 +1301,194 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}
+#[tokio::test]
+async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() {
+ use datafusion_common::JoinType;
+
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+
+ // Build side: different data per partition so bounds differ.
+ // Partition 0: ("aa", "ba") -> bounds a:[aa,aa], b:[ba,ba]
+ // Partition 1: ("zz", "zz") -> bounds a:[zz,zz], b:[zz,zz]
Review Comment:
```suggestion
// Build side:
// Partition 0 (on column a): bounds a["aa", "kk"]
// Partition 1 (on column a): bounds a:["zz","zz"]
```
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1291,6 +1301,194 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}
+#[tokio::test]
+async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() {
+ use datafusion_common::JoinType;
+
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+
+ // Build side: different data per partition so bounds differ.
+ // Partition 0: ("aa", "ba") -> bounds a:[aa,aa], b:[ba,ba]
+ // Partition 1: ("zz", "zz") -> bounds a:[zz,zz], b:[zz,zz]
+ let build_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa"]),
+ ("b", Utf8, ["ba"]),
+ ("c", Float64, [1.0])
+ )
+ .unwrap(),
+ ];
+ let build_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz"]),
+ ("b", Utf8, ["zz"]),
+ ("c", Float64, [2.0])
+ )
+ .unwrap(),
+ ];
+
+ // Probe side: each partition has a mix of matching and non-matching rows.
+ // Partition 0: ("aa","ba") matches p0 bounds, ("zz","zz") does not
+ // Partition 1: ("zz","zz") matches p1 bounds, ("aa","ba") does not
+ let probe_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "zz"]),
Review Comment:
```suggestion
("a", Utf8, ["aa", "kk"]),
```
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1291,6 +1301,194 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}
+#[tokio::test]
+async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() {
+ use datafusion_common::JoinType;
+
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+
+ // Build side: different data per partition so bounds differ.
+ // Partition 0: ("aa", "ba") -> bounds a:[aa,aa], b:[ba,ba]
+ // Partition 1: ("zz", "zz") -> bounds a:[zz,zz], b:[zz,zz]
+ let build_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa"]),
+ ("b", Utf8, ["ba"]),
+ ("c", Float64, [1.0])
+ )
+ .unwrap(),
+ ];
+ let build_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz"]),
+ ("b", Utf8, ["zz"]),
+ ("c", Float64, [2.0])
+ )
+ .unwrap(),
+ ];
+
+ // Probe side: each partition has a mix of matching and non-matching rows.
+ // Partition 0: ("aa","ba") matches p0 bounds, ("zz","zz") does not
+ // Partition 1: ("zz","zz") matches p1 bounds, ("aa","ba") does not
Review Comment:
```suggestion
// Probe side:
// Partition 0 (on column a): bounds a["aa", "kk"]
// Partition 1 (on column a): bounds a:["zz","zz"]
```
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1291,6 +1301,194 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}
+#[tokio::test]
+async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() {
+ use datafusion_common::JoinType;
+
Review Comment:
It is a bit hard to review without knowing what you plan to build
```suggestion
// Simulate the query plan of this:
// SELECT p.a
// FROM build_side AS b, probe_side AS p
// WHERE b.a = p.a and b.b = p.b
// ORDER BY p.a
// Both `build_side` and `probe_side` are partitioned on column `a`
// Note: the join is on `[a, b]`
```
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1603,24 +1776,164 @@ async fn
test_nested_hashjoin_dynamic_filter_pushdown() {
// Execute to populate the dynamic filters
stream.next().await.unwrap().unwrap();
- // Verify that both the inner and outer join have updated dynamic filters
+ // No RepartitionExec on either join, so partition-index routing is used.
insta::assert_snapshot!(
format!("{}", format_plan_for_test(&plan)),
@r"
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
- DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, x], file_type=test, pushdown_supported=true
- HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[b, c, y], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa, ab]) ]
- - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, z], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca, cb]) ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[b, c, y], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ {0: b@0 >= aa AND b@0 <= ab AND b@0 IN (SET) ([aa,
ab])} ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, z], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ {0: d@0 >= ca AND d@0 <= cb AND d@0 IN (SET) ([ca,
cb])} ]
"
);
}
#[tokio::test]
-async fn test_hashjoin_parent_filter_pushdown() {
+async fn test_nested_hashjoin_with_repartition_dynamic_filter_pushdown() {
use datafusion_common::JoinType;
- use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+ use datafusion_physical_plan::Partitioning;
+ use datafusion_physical_plan::joins::PartitionMode;
+ use datafusion_physical_plan::repartition::RepartitionExec;
+ // Tests remap_children through repartition when nested join is build side.
+ let t1_batches = vec![
+ record_batch!(("a", Utf8, ["aa", "ab"]), ("x", Float64, [1.0,
2.0])).unwrap(),
+ ];
+ let t1_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("x", DataType::Float64, false),
+ ]));
+ let t1_scan = TestScanBuilder::new(Arc::clone(&t1_schema))
+ .with_support(true)
+ .with_batches(t1_batches)
+ .build();
+
+ let t2_batches = vec![
+ record_batch!(
+ ("b", Utf8, ["aa", "ab", "ac", "ad", "ae"]),
+ ("c", Utf8, ["ca", "cb", "cc", "cd", "ce"]),
+ ("y", Float64, [1.0, 2.0, 3.0, 4.0, 5.0])
+ )
+ .unwrap(),
+ ];
+ let t2_schema = Arc::new(Schema::new(vec![
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Utf8, false),
+ Field::new("y", DataType::Float64, false),
+ ]));
+ let t2_scan = TestScanBuilder::new(Arc::clone(&t2_schema))
+ .with_support(true)
+ .with_batches(t2_batches)
+ .build();
+
+ let t3_batches = vec![
+ record_batch!(
+ ("d", Utf8, ["ca", "cb", "cc", "cd", "ce", "cf", "cg", "ch"]),
+ ("z", Float64, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0])
+ )
+ .unwrap(),
+ ];
+ let t3_schema = Arc::new(Schema::new(vec![
+ Field::new("d", DataType::Utf8, false),
+ Field::new("z", DataType::Float64, false),
+ ]));
+ let t3_scan = TestScanBuilder::new(Arc::clone(&t3_schema))
+ .with_support(true)
+ .with_batches(t3_batches)
+ .build();
+
+ // Create Join1: t2.c = t3.d
+ let join1_on = vec![(col("c", &t2_schema).unwrap(), col("d",
&t3_schema).unwrap())];
+ let join1 = Arc::new(
+ HashJoinExecBuilder::new(t2_scan, t3_scan, join1_on, JoinType::Inner)
+ .with_partition_mode(PartitionMode::Partitioned)
+
.with_null_equality(datafusion_common::NullEquality::NullEqualsNothing)
+
.with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::PartitionIndex)
+ .build()
+ .unwrap(),
+ );
+
+ // Add RepartitionExec to change partitioning key from 'c' to 'b'
+ // Join1 is on 'c=d', but Join2 will be on 'a=b'
+ let join1_schema = join1.schema();
+ let repartition = Arc::new(
+ RepartitionExec::try_new(
+ join1 as Arc<dyn ExecutionPlan>,
+ Partitioning::Hash(vec![col("b", &join1_schema).unwrap()], 1),
+ )
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>;
+
+ // Create Join2 with repartitioned nested join as build side
+ let join2_on = vec![(
+ col("b", &repartition.schema()).unwrap(),
+ col("a", &t1_schema).unwrap(),
+ )];
+ let join2 = Arc::new(
+ HashJoinExecBuilder::new(repartition, t1_scan, join2_on,
JoinType::Inner)
+ .with_partition_mode(PartitionMode::Partitioned)
+
.with_null_equality(datafusion_common::NullEquality::NullEqualsNothing)
+
.with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::CaseHash)
+ .build()
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>;
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&join2),
FilterPushdown::new_post_optimization(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@0, a@0)]
+ - RepartitionExec: partitioning=Hash([b@0], 1), input_partitions=1
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[b, c, y], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, z], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, x], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@0, a@0)]
+ - RepartitionExec: partitioning=Hash([b@0], 1), input_partitions=1
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1,
d@0)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[b, c, y], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, z], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ empty ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, x], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ empty ]
+ ",
+ );
+
+ let session_config = dynamic_filter_session_config();
+ let plan = FilterPushdown::new_post_optimization()
+ .optimize(join2, session_config.options())
+ .unwrap();
+ let session_ctx = SessionContext::new_with_config(session_config);
+ session_ctx.register_object_store(
+ ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+ Arc::new(InMemory::new()),
+ );
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+ stream.next().await.unwrap().unwrap();
+
+ // Verify dynamic filters are populated correctly despite the repartition.
+ // The filter on t1 proves that:
+ // - Join2's build side (the nested join after repartition) successfully
creates filters
+ // - Those filters successfully propagate through the RepartitionExec to
t1 (probe side)
+ // - remap_children correctly handles the expression remapping through
repartition
+ insta::assert_snapshot!(
+ format!("{}", format_plan_for_test(&plan)),
+ @r"
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(b@0, a@0)]
+ - RepartitionExec: partitioning=Hash([b@0], 1), input_partitions=1
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[b, c, y], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[d, z], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ {0: d@0 >= ca AND d@0 <= ce AND d@0 IN (SET) ([ca,
cb, cc, cd, ce])} ]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, x], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ae AND a@0 IN (SET) ([aa, ab,
ac, ad, ae]) ]
+ "
Review Comment:
Nice test
##########
datafusion/core/tests/physical_optimizer/filter_pushdown.rs:
##########
@@ -1291,6 +1301,194 @@ async fn
test_hashjoin_dynamic_filter_pushdown_partitioned() {
);
}
+#[tokio::test]
+async fn test_partitioned_hashjoin_no_repartition_dynamic_filter_pushdown() {
+ use datafusion_common::JoinType;
+
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+
+ // Build side: different data per partition so bounds differ.
+ // Partition 0: ("aa", "ba") -> bounds a:[aa,aa], b:[ba,ba]
+ // Partition 1: ("zz", "zz") -> bounds a:[zz,zz], b:[zz,zz]
+ let build_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa"]),
+ ("b", Utf8, ["ba"]),
+ ("c", Float64, [1.0])
+ )
+ .unwrap(),
+ ];
+ let build_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz"]),
+ ("b", Utf8, ["zz"]),
+ ("c", Float64, [2.0])
+ )
+ .unwrap(),
+ ];
+
+ // Probe side: each partition has a mix of matching and non-matching rows.
+ // Partition 0: ("aa","ba") matches p0 bounds, ("zz","zz") does not
+ // Partition 1: ("zz","zz") matches p1 bounds, ("aa","ba") does not
+ let probe_p0 = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "zz"]),
+ ("b", Utf8, ["ba", "zz"]),
+ ("e", Float64, [10.0, 20.0])
+ )
+ .unwrap(),
+ ];
+ let probe_p1 = vec![
+ record_batch!(
+ ("a", Utf8, ["zz", "aa"]),
+ ("b", Utf8, ["zz", "ba"]),
+ ("e", Float64, [30.0, 40.0])
+ )
+ .unwrap(),
+ ];
+
+ // Use 2 file groups per side (2 partitions) with no RepartitionExec.
+ // This triggers partition-index routing and simulates what happens when
+ // `preserve_file_partitions` is enabled and declares Hash partitioning on
the DataSourceExec.
+ let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+ .with_support(true)
+ .with_batches_for_partition(build_p0)
+ .with_batches_for_partition(build_p1)
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "build_0.parquet",
+ 123,
+ )]))
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "build_1.parquet",
+ 123,
+ )]))
+ .build();
+
+ let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+ .with_support(true)
+ .with_batches_for_partition(probe_p0)
+ .with_batches_for_partition(probe_p1)
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "probe_0.parquet",
+ 123,
+ )]))
+ .with_file_group(FileGroup::new(vec![PartitionedFile::new(
+ "probe_1.parquet",
+ 123,
+ )]))
+ .build();
+
+ let on = vec![
+ (
+ col("a", &build_side_schema).unwrap(),
+ col("a", &probe_side_schema).unwrap(),
+ ),
+ (
+ col("b", &build_side_schema).unwrap(),
+ col("b", &probe_side_schema).unwrap(),
+ ),
+ ];
+ let hash_join = Arc::new(
+ HashJoinExecBuilder::new(
+ build_scan,
+ Arc::clone(&probe_scan),
+ on,
+ JoinType::Inner,
+ )
+ .with_partition_mode(PartitionMode::Partitioned)
+ .with_null_equality(datafusion_common::NullEquality::NullEqualsNothing)
+
.with_dynamic_filter_routing_mode(DynamicFilterRoutingMode::PartitionIndex)
+ .build()
+ .unwrap(),
+ );
+
+ // Top-level CoalescePartitionsExec
+ let cp = Arc::new(CoalescePartitionsExec::new(hash_join)) as Arc<dyn
ExecutionPlan>;
+ // Add a sort for deterministic output
+ let plan = Arc::new(SortExec::new(
+ LexOrdering::new(vec![PhysicalSortExpr::new(
+ col("a", &probe_side_schema).unwrap(),
+ SortOptions::new(true, false), // descending, nulls_first
+ )])
+ .unwrap(),
+ cp,
+ )) as Arc<dyn ExecutionPlan>;
+
+ // expect the predicate to be pushed down into the probe side DataSource
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan),
FilterPushdown::new_post_optimization(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0),
(b@1, b@1)]
+ - DataSourceExec: file_groups={2 groups: [[build_0.parquet],
[build_1.parquet]]}, projection=[a, b, c], file_type=test,
pushdown_supported=true
+ - DataSourceExec: file_groups={2 groups: [[probe_0.parquet],
[probe_1.parquet]]}, projection=[a, b, e], file_type=test,
pushdown_supported=true
+ output:
+ Ok:
+ - SortExec: expr=[a@0 DESC NULLS LAST], preserve_partitioning=[false]
+ - CoalescePartitionsExec
+ - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0,
a@0), (b@1, b@1)]
+ - DataSourceExec: file_groups={2 groups: [[build_0.parquet],
[build_1.parquet]]}, projection=[a, b, c], file_type=test,
pushdown_supported=true
+ - DataSourceExec: file_groups={2 groups: [[probe_0.parquet],
[probe_1.parquet]]}, projection=[a, b, e], file_type=test,
pushdown_supported=true, predicate=DynamicFilter [ empty ]
Review Comment:
The plan looks good. We only need to make the test data partitioned correctly,
as I suggested above.
##########
datafusion/physical-expr/src/expressions/dynamic_filters.rs:
##########
@@ -86,6 +90,11 @@ struct Inner {
/// This is redundant with the watch channel state, but allows us to
return immediately
/// from `wait_complete()` without subscribing if already complete.
is_complete: bool,
+ /// Per-partition filter expressions for partition-index routing.
+ /// When both sides of a hash join preserve their file partitioning (no
RepartitionExec(Hash)),
+ /// build-partition i corresponds to probe-partition i. This allows
storing per-partition
+ /// filters so that each partition only sees its own bounds, giving
tighter filtering.
+ partitioned_exprs: PartitionedFilters,
Review Comment:
👍
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1454,21 +1481,86 @@ pub fn ensure_distribution(
plan.with_new_children(children_plans)?
};
+ // For partitioned hash joins, decide dynamic filter routing mode.
+ //
+ // Dynamic filtering requires matching partitioning schemes on both sides:
+ // - PartitionIndex: Both sides use file-grouped partitioning
(value-based).
+ // Partition i on build corresponds to partition i on probe by partition
value.
+ // - CaseHash: Both sides use hash repartitioning (hash-based).
+ // Uses CASE expression with hash(row) % N to route to correct partition
filter.
+ //
+ // NOTE: If partitioning schemes are misaligned (one file-grouped, one
hash-repartitioned),
+ // the partitioned join itself is incorrect.
+ // Partition assignments don't match:
+ // - File-grouped: partition 0 = all rows where column="A" (value-based)
+ // - Hash-repartitioned: partition 0 = all rows where hash(column) % N ==
0 (hash-based)
+ // These are incompatible, so the join will miss matching rows.
+ plan = if let Some(hash_join) =
plan.as_any().downcast_ref::<HashJoinExec>()
+ && matches!(hash_join.mode, PartitionMode::Partitioned)
+ {
+ let routing_mode = match (
+ children[0].data.repartitioned,
+ children[1].data.repartitioned,
+ ) {
+ (false, false) => DynamicFilterRoutingMode::PartitionIndex,
+ (true, true) => DynamicFilterRoutingMode::CaseHash,
+ _ => {
+ // Misaligned partitioning schemes
+ return plan_err!(
+ "Partitioned hash join has incompatible partitioning
schemes: \
+ left side is {}, right side is {}.",
+ if children[0].data.repartitioned {
+ "hash-repartitioned"
+ } else {
+ "file-grouped"
+ },
+ if children[1].data.repartitioned {
+ "hash-repartitioned"
+ } else {
+ "file-grouped"
+ }
+ );
+ }
Review Comment:
Reasonable check — throwing an error here makes sense 👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]