2010YOUY01 commented on code in PR #23239:
URL: https://github.com/apache/datafusion/pull/23239#discussion_r3504695320
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -700,6 +748,201 @@ impl TestConfig {
}
}
+#[test]
+fn range_aggregate_keeps_range_partitioning_at_subset_threshold() ->
Result<()> {
+ let input = parquet_exec_with_output_partitioning(range_partitioning(
+ "a",
+ [10, 20, 30],
+ SortOptions::default(),
+ )?);
+ let aggregate =
+ aggregate_exec_with_alias(input, vec![("a".to_string(),
"a".to_string())]);
+
+ let plan = TestConfig::default()
+ .with_query_execution_partitions(4)
+ .to_plan(aggregate, &DISTRIB_DISTRIB_SORT);
+
+ assert_plan!(
+ plan,
+ @r"
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ DataSourceExec: file_groups={4 groups: [[p0], [p1], [p2], [p3]]},
projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20),
(30)], 4), file_type=parquet
Review Comment:
Thanks, adding SLTs (sqllogictest files) would be enough. However, I would lean
toward removing most of the unit tests added here once the equivalent coverage
has been moved to SLT.
I’ll try to explain the reasoning a bit better, since I realize it is
uncommon to suggest removing tests.
The main concern is that internal module-level tests create maintenance
overhead while providing weaker guarantees:
* If the coverage is moved to SLT, we can also test whether this optimizer
rule works correctly with other optimizer rules in most cases.
* If future features require changing these unit tests, it is often hard to
figure out the original goal of each internal test and to update it correctly.
This matters especially here, since I think this optimizer rule is likely to
change a lot in the future.
So in practice, I would suggest moving most of the test coverage, especially
edge cases, to SLT.
I would only keep module-level unit tests for:
* 1–2 demo-like tests, since they catch errors earlier and make development
easier
* Cases that are difficult to test with end-to-end tests.
Reference: a DuckDB author trying to convince us to avoid most unit tests in
favor of end-to-end tests: https://www.youtube.com/watch?v=BgC79Zt2fPs&t=940s
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -700,6 +748,201 @@ impl TestConfig {
}
}
+#[test]
+fn range_aggregate_keeps_range_partitioning_at_subset_threshold() ->
Result<()> {
+ let input = parquet_exec_with_output_partitioning(range_partitioning(
+ "a",
+ [10, 20, 30],
+ SortOptions::default(),
+ )?);
+ let aggregate =
+ aggregate_exec_with_alias(input, vec![("a".to_string(),
"a".to_string())]);
+
+ let plan = TestConfig::default()
+ .with_query_execution_partitions(4)
+ .to_plan(aggregate, &DISTRIB_DISTRIB_SORT);
+
+ assert_plan!(
+ plan,
+ @r"
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ DataSourceExec: file_groups={4 groups: [[p0], [p1], [p2], [p3]]},
projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20),
(30)], 4), file_type=parquet
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn range_aggregate_keeps_range_subset_partitioning_at_subset_threshold() ->
Result<()> {
+ let input = parquet_exec_with_output_partitioning(range_partitioning(
+ "a",
+ [10, 20, 30],
+ SortOptions::default(),
+ )?);
+ let aggregate = aggregate_exec_with_alias(
+ input,
+ vec![
+ ("a".to_string(), "a".to_string()),
+ ("b".to_string(), "b".to_string()),
+ ],
+ );
+
+ let plan = TestConfig::default()
+ .with_query_execution_partitions(4)
+ .to_plan(aggregate, &DISTRIB_DISTRIB_SORT);
+
+ assert_plan!(
+ plan,
+ @r"
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]
+ AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]
+ DataSourceExec: file_groups={4 groups: [[p0], [p1], [p2], [p3]]},
projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20),
(30)], 4), file_type=parquet
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn range_aggregate_rehashes_below_subset_threshold() -> Result<()> {
+ let input = parquet_exec_with_output_partitioning(range_partitioning(
+ "a",
+ [10, 20],
+ SortOptions::default(),
+ )?);
+ let aggregate =
+ aggregate_exec_with_alias(input, vec![("a".to_string(),
"a".to_string())]);
+
+ let plan = TestConfig::default()
+ .with_query_execution_partitions(4)
+ .to_plan(aggregate, &DISTRIB_DISTRIB_SORT);
+
+ assert_plan!(
+ plan,
+ @r"
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
+ DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]},
projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20)],
3), file_type=parquet
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn range_aggregate_rehashes_preserved_file_partitions_below_target() ->
Result<()> {
+ let input = parquet_exec_with_output_partitioning(range_partitioning(
+ "a",
+ [10, 20],
+ SortOptions::default(),
+ )?);
+ let aggregate =
+ aggregate_exec_with_alias(input, vec![("a".to_string(),
"a".to_string())]);
+
+ let plan = TestConfig::default()
+ .with_query_execution_partitions(4)
+ .with_preserve_file_partitions(1)
Review Comment:
It would be great to add a comment explaining this test case, especially the
implications of this config (`with_preserve_file_partitions(1)`).
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -700,6 +748,201 @@ impl TestConfig {
}
}
+#[test]
+fn range_aggregate_keeps_range_partitioning_at_subset_threshold() ->
Result<()> {
+ let input = parquet_exec_with_output_partitioning(range_partitioning(
+ "a",
+ [10, 20, 30],
+ SortOptions::default(),
+ )?);
+ let aggregate =
+ aggregate_exec_with_alias(input, vec![("a".to_string(),
"a".to_string())]);
+
+ let plan = TestConfig::default()
+ .with_query_execution_partitions(4)
+ .to_plan(aggregate, &DISTRIB_DISTRIB_SORT);
+
+ assert_plan!(
+ plan,
+ @r"
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ DataSourceExec: file_groups={4 groups: [[p0], [p1], [p2], [p3]]},
projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20),
(30)], 4), file_type=parquet
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn range_aggregate_keeps_range_subset_partitioning_at_subset_threshold() ->
Result<()> {
+ let input = parquet_exec_with_output_partitioning(range_partitioning(
+ "a",
+ [10, 20, 30],
+ SortOptions::default(),
+ )?);
+ let aggregate = aggregate_exec_with_alias(
+ input,
+ vec![
+ ("a".to_string(), "a".to_string()),
+ ("b".to_string(), "b".to_string()),
+ ],
+ );
+
+ let plan = TestConfig::default()
+ .with_query_execution_partitions(4)
+ .to_plan(aggregate, &DISTRIB_DISTRIB_SORT);
+
+ assert_plan!(
+ plan,
+ @r"
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a, b@1 as b], aggr=[]
+ AggregateExec: mode=Partial, gby=[a@0 as a, b@1 as b], aggr=[]
+ DataSourceExec: file_groups={4 groups: [[p0], [p1], [p2], [p3]]},
projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20),
(30)], 4), file_type=parquet
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn range_aggregate_rehashes_below_subset_threshold() -> Result<()> {
+ let input = parquet_exec_with_output_partitioning(range_partitioning(
+ "a",
+ [10, 20],
+ SortOptions::default(),
+ )?);
+ let aggregate =
+ aggregate_exec_with_alias(input, vec![("a".to_string(),
"a".to_string())]);
+
+ let plan = TestConfig::default()
+ .with_query_execution_partitions(4)
+ .to_plan(aggregate, &DISTRIB_DISTRIB_SORT);
+
+ assert_plan!(
+ plan,
+ @r"
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
+ DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]},
projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20)],
3), file_type=parquet
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn range_aggregate_rehashes_preserved_file_partitions_below_target() ->
Result<()> {
+ let input = parquet_exec_with_output_partitioning(range_partitioning(
+ "a",
+ [10, 20],
+ SortOptions::default(),
+ )?);
+ let aggregate =
+ aggregate_exec_with_alias(input, vec![("a".to_string(),
"a".to_string())]);
+
+ let plan = TestConfig::default()
+ .with_query_execution_partitions(4)
+ .with_preserve_file_partitions(1)
+ .to_plan(aggregate, &DISTRIB_DISTRIB_SORT);
+
+ assert_plan!(
+ plan,
+ @r"
+ AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
+ AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[]
+ RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
+ DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]},
projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20)],
3), file_type=parquet
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn range_aggregate_rehashes_incompatible_partitioning() -> Result<()> {
+ let input = parquet_exec_with_output_partitioning(range_partitioning(
+ "a",
+ [10, 20],
+ SortOptions::default(),
+ )?);
+ let aggregate =
+ aggregate_exec_with_alias(input, vec![("b".to_string(),
"b".to_string())]);
+
+ let plan = TestConfig::default()
+ .with_query_execution_partitions(3)
+ .to_plan(aggregate, &DISTRIB_DISTRIB_SORT);
+
+ assert_plan!(
+ plan,
+ @r"
+ AggregateExec: mode=FinalPartitioned, gby=[b@0 as b], aggr=[]
+ RepartitionExec: partitioning=Hash([b@0], 3), input_partitions=3
+ AggregateExec: mode=Partial, gby=[b@1 as b], aggr=[]
+ DataSourceExec: file_groups={3 groups: [[p0], [p1], [p2]]},
projection=[a, b, c, d, e], output_partitioning=Range([a@0 ASC], [(10), (20)],
3), file_type=parquet
+ "
+ );
+
+ Ok(())
+}
+
+#[test]
+fn range_grouping_set_aggregate_rehashes_with_grouping_id() -> Result<()> {
Review Comment:
Is it possible to disable this optimization when grouping sets are present?
I’m finding it a bit hard to convince myself that this is implemented safely. I
think it would be better to skip the optimization for this case for now, and
enable it in a separate PR with more edge-case tests targeting grouping sets.
(We have seen quite a few tricky bugs around grouping sets before.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]