NGA-TRAN commented on code in PR #19304:
URL: https://github.com/apache/datafusion/pull/19304#discussion_r2620172650
##########
datafusion/common/src/config.rs:
##########
@@ -1000,6 +1000,29 @@ config_namespace! {
/// ```
pub repartition_sorts: bool, default = true
+ /// Should DataFusion still repartition if the required partitioning
+ /// expression is a subset of the current partition expression.
+ ///
+ /// How the option is used:
+ /// - repartition_subset_satisfactions=true: Always repartition on
a subset satisfaction.
+ /// - repartition_subset_satisfactions=false: Only repartition if
target_partitions > current partitions (increases parallelism).
Review Comment:
Can we have a third option so that, when it is false, we do not repartition
even if target_partitions > current partitions? This would be useful in
certain use cases.
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -339,4 +407,173 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_subset_partitioning_normal() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Int64, false),
+ Field::new("b", DataType::Int64, false),
+ Field::new("c", DataType::Int64, false),
+ ]));
+
+ let col_a: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("a", &schema)?);
+ let col_b: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("b", &schema)?);
+ let col_c: Arc<dyn PhysicalExpr> =
+ Arc::new(Column::new_with_schema("c", &schema)?);
+ let eq_properties = EquivalenceProperties::new(Arc::clone(&schema));
+
+ let test_cases = vec![
+ // Overlap: requirement is superset of partitions
+ (
+ "Overlap: Hash([a]) vs Hash([a, b])",
+ Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_a),
+ Arc::clone(&col_b),
+ ]),
+ PartitioningSatisfaction::Subset,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ (
+ "Overlap: Hash([a]) vs Hash([a, b, c])",
+ Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_a),
+ Arc::clone(&col_b),
+ Arc::clone(&col_c),
+ ]),
+ PartitioningSatisfaction::Subset,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ (
+ "Overlap: Hash([a, b]) vs Hash([a, b, c])",
+ Partitioning::Hash(vec![Arc::clone(&col_a),
Arc::clone(&col_b)], 4),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_a),
+ Arc::clone(&col_b),
+ Arc::clone(&col_c),
+ ]),
+ PartitioningSatisfaction::Subset,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ // No overlap: partitions and requirements are disjoint
+ (
+ "No overlap: Hash([a]) vs Hash([b, c])",
+ Partitioning::Hash(vec![Arc::clone(&col_a)], 4),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_b),
+ Arc::clone(&col_c),
+ ]),
+ PartitioningSatisfaction::NotSatisfied,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ (
+ "No overlap: Hash([a, b]) vs Hash([c])",
+ Partitioning::Hash(vec![Arc::clone(&col_a),
Arc::clone(&col_b)], 4),
+ Distribution::HashPartitioned(vec![Arc::clone(&col_c)]),
+ PartitioningSatisfaction::NotSatisfied,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
+ // Partial overlap: not a strict subset
+ (
+ "Partial overlap: Hash([a, c]) vs Hash([a, b])",
+ Partitioning::Hash(vec![Arc::clone(&col_a),
Arc::clone(&col_c)], 4),
+ Distribution::HashPartitioned(vec![
+ Arc::clone(&col_a),
+ Arc::clone(&col_b),
+ ]),
+ PartitioningSatisfaction::NotSatisfied,
+ PartitioningSatisfaction::NotSatisfied,
+ ),
Review Comment:
Can you add 3 tests: unknown on each side, and unknown on both sides?
Will we ever see `Hash([])`? If so, we should test that, too.
##########
datafusion/physical-expr/src/partitioning.rs:
##########
@@ -148,51 +170,87 @@ impl Partitioning {
}
}
- /// Returns true when the guarantees made by this [`Partitioning`] are
sufficient to
- /// satisfy the partitioning scheme mandated by the `required`
[`Distribution`].
+ /// Returns true if `subset_exprs` is a subset of `superset_exprs`.
+ fn is_subset_partitioning(
+ subset_exprs: &[Arc<dyn PhysicalExpr>],
+ superset_exprs: &[Arc<dyn PhysicalExpr>],
Review Comment:
I think the naming here is a bit redundant. I would name the first one
`subset_exprs` and the second `exprs`, or the first one `exprs` and the second
`superset_exprs`.
##########
datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part:
##########
@@ -72,24 +72,21 @@ physical_plan
05)--------CoalesceBatchesExec: target_batch_size=8192
06)----------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1,
p_size@2], 4), input_partitions=4
07)------------AggregateExec: mode=Partial, gby=[p_brand@0 as p_brand,
p_type@1 as p_type, p_size@2 as p_size], aggr=[count(alias1)]
-08)--------------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as
p_brand, p_type@1 as p_type, p_size@2 as p_size, alias1@3 as alias1], aggr=[]
-09)----------------CoalesceBatchesExec: target_batch_size=8192
-10)------------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1,
p_size@2, alias1@3], 4), input_partitions=4
-11)--------------------AggregateExec: mode=Partial, gby=[p_brand@1 as p_brand,
p_type@2 as p_type, p_size@3 as p_size, ps_suppkey@0 as alias1], aggr=[]
-12)----------------------HashJoinExec: mode=Partitioned, join_type=LeftAnti,
on=[(ps_suppkey@0, s_suppkey@0)]
+08)--------------AggregateExec: mode=SinglePartitioned, gby=[p_brand@1 as
p_brand, p_type@2 as p_type, p_size@3 as p_size, ps_suppkey@0 as alias1],
aggr=[]
Review Comment:
🎉 : data is already partitioned by `ps_suppkey`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]