alamb commented on code in PR #14207:
URL: https://github.com/apache/datafusion/pull/14207#discussion_r1934089417
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -932,12 +932,16 @@ fn add_hash_on_top(
/// # Arguments
///
/// * `input`: Current node.
+/// * `fetch`: Possible fetch value
Review Comment:
I think it would help here to also describe what happens to the `fetch`
argument and when it is updated.
Maybe something like the following:
```suggestion
/// * `fetch`: Possible fetch value. If a `SortPreservingMerge` is created
/// its fetch is set to this value and `fetch` is set to `None`
```
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3173,3 +3182,78 @@ fn optimize_away_unnecessary_repartition2() ->
Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
+ // Create a configuration
+ let config = SessionConfig::new();
+ let ctx = SessionContext::new_with_config(config);
+
+ // Create table schema and data
+ let sql = "CREATE EXTERNAL TABLE aggregate_test_100 (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT,
+ c5 INT,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 BIGINT UNSIGNED NOT NULL,
+ c10 VARCHAR NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+ )
+ STORED AS CSV
+ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+ OPTIONS ('format.has_header' 'true')";
+
+ ctx.sql(sql).await?;
+
+ let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL
SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?;
+ let logical_plan = df.logical_plan().clone();
+ let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
+ logical_plan,
+ ctx.state().config_options(),
+ |_, _| (),
+ )?;
+
+ let optimized_logical_plan = ctx.state().optimizer().optimize(
+ analyzed_logical_plan,
+ &ctx.state(),
+ |_, _| (),
+ )?;
+
+ let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+ Arc::new(OutputRequirements::new_add_mode()),
+ Arc::new(EnforceDistribution::new()),
+ Arc::new(EnforceSorting::new()),
+ Arc::new(ProjectionPushdown::new()),
+ Arc::new(CoalesceBatches::new()),
+ Arc::new(EnforceDistribution::new()), // -- Add enforce distribution
rule again
+ Arc::new(OutputRequirements::new_remove_mode()),
+ Arc::new(ProjectionPushdown::new()),
+ Arc::new(LimitPushdown::new()),
+ Arc::new(SanityCheckPlan::new()),
+ ];
+
+ let planner = DefaultPhysicalPlanner::default();
+ let session_state = SessionStateBuilder::new()
+ .with_config(ctx.copied_config())
+ .with_default_features()
+ .with_physical_optimizer_rules(optimizers)
+ .build();
+ let optimized_physical_plan = planner
+ .create_physical_plan(&optimized_logical_plan, &session_state)
+ .await?;
+
+ let mut results = optimized_physical_plan
+ .execute(0, ctx.task_ctx().clone())
+ .unwrap();
+
+ let batch = results.next().await.unwrap()?;
+ // With the fix of https://github.com/apache/datafusion/pull/14207, the
number of rows will be 10
Review Comment:
```suggestion
// Without the fix of https://github.com/apache/datafusion/pull/14207,
the number of rows will be 10
```
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1362,6 +1383,21 @@ pub fn ensure_distribution(
plan.with_new_children(children_plans)?
};
+ // If `fetch` was not consumed, it means that there was
`SortPreservingMergeExec` with fetch before
+ // It was removed by `remove_dist_changing_operators`
+ // and we need to add it back.
+ if fetch.is_some() {
+ plan = Arc::new(
+ SortPreservingMergeExec::new(
+ plan.output_ordering()
+ .unwrap_or(&LexOrdering::default())
+ .clone(),
+ plan,
+ )
+ .with_fetch(fetch.take()),
+ )
Review Comment:
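Why does this add back a `SortPreservingMergeExec` that may have no sort expressions
(it falls back to `LexOrdering::default()` when the plan has no output ordering)?
Wouldn't it be better to use a `GlobalLimitExec` or something similar?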
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3172,3 +3181,78 @@ fn optimize_away_unnecessary_repartition2() ->
Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
Review Comment:
I think it is nice to have this "end to end" style test, but given the
amount of code changed I think it is important to also have more "unit style" tests.
Otherwise it is hard to understand how general this fix is (or whether it only works
for the specified query).
I wonder if you could construct some cases using the same framework as the
tests above? That is, build a plan, run `EnforceDistribution` twice on it, and
check that the resulting plans are correct.
Or perhaps you could update `assert_optimized!` to check that running
`EnforceDistribution` a second time doesn't change the plan again.
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -1020,23 +1033,26 @@ fn remove_dist_changing_operators(
/// ```
fn replace_order_preserving_variants(
mut context: DistributionContext,
-) -> Result<DistributionContext> {
- context.children = context
- .children
- .into_iter()
- .map(|child| {
- if child.data {
- replace_order_preserving_variants(child)
- } else {
- Ok(child)
- }
- })
- .collect::<Result<Vec<_>>>()?;
+) -> Result<(DistributionContext, Option<usize>)> {
+ let mut children = vec![];
+ let mut fetch = None;
+ for child in context.children.into_iter() {
Review Comment:
Since the `DistributionContext` is already passed through most of the
functions in this code, I wonder if you considered adding a `fetch` field, like:
```rust
struct DistributionContext {
...
/// Limit which must be applied to any sort preserving merge that is
created
fetch: Option<usize>
}
```
🤔
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3172,3 +3181,78 @@ fn optimize_away_unnecessary_repartition2() ->
Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
+ // Create a configuration
+ let config = SessionConfig::new();
+ let ctx = SessionContext::new_with_config(config);
+
+ // Create table schema and data
+ let sql = "CREATE EXTERNAL TABLE aggregate_test_100 (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT,
+ c5 INT,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 BIGINT UNSIGNED NOT NULL,
+ c10 VARCHAR NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+ )
+ STORED AS CSV
+ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+ OPTIONS ('format.has_header' 'true')";
+
+ ctx.sql(sql).await?;
+
+ let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL
SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?;
+ let logical_plan = df.logical_plan().clone();
+ let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
+ logical_plan,
+ ctx.state().config_options(),
+ |_, _| (),
+ )?;
+
+ let optimized_logical_plan = ctx.state().optimizer().optimize(
+ analyzed_logical_plan,
+ &ctx.state(),
+ |_, _| (),
+ )?;
+
+ let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
Review Comment:
I am somewhat worried about this test being brittle -- it seems to
require a very specific sequence of optimizer passes, and I
worry that if the default sequence of passes changes, this test might
no longer cover the issue.
I actually tried to reproduce the results by just adding
`EnforceDistribution` at the end of the default list of optimizers, and the
issue did not manifest itself 🤔
```rust
let planner = DefaultPhysicalPlanner::default();
let session_state = SessionStateBuilder::new()
.with_config(ctx.copied_config())
.with_default_features()
.with_physical_optimizer_rule(Arc::new(EnforceDistribution::new()))// -- Add
enforce distribution rule again
.build();
let optimized_physical_plan = planner
.create_physical_plan(&optimized_logical_plan, &session_state)
.await?;
```
##########
datafusion/core/tests/physical_optimizer/enforce_distribution.rs:
##########
@@ -3172,3 +3181,78 @@ fn optimize_away_unnecessary_repartition2() ->
Result<()> {
Ok(())
}
+
+#[tokio::test]
+async fn apply_enforce_distribution_multiple_times() -> Result<()> {
+ // Create a configuration
+ let config = SessionConfig::new();
+ let ctx = SessionContext::new_with_config(config);
+
+ // Create table schema and data
+ let sql = "CREATE EXTERNAL TABLE aggregate_test_100 (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT,
+ c5 INT,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 BIGINT UNSIGNED NOT NULL,
+ c10 VARCHAR NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+ )
+ STORED AS CSV
+ LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+ OPTIONS ('format.has_header' 'true')";
+
+ ctx.sql(sql).await?;
+
+ let df = ctx.sql("SELECT * FROM(SELECT * FROM aggregate_test_100 UNION ALL
SELECT * FROM aggregate_test_100) ORDER BY c13 LIMIT 5").await?;
+ let logical_plan = df.logical_plan().clone();
+ let analyzed_logical_plan = ctx.state().analyzer().execute_and_check(
+ logical_plan,
+ ctx.state().config_options(),
+ |_, _| (),
+ )?;
+
+ let optimized_logical_plan = ctx.state().optimizer().optimize(
+ analyzed_logical_plan,
+ &ctx.state(),
+ |_, _| (),
+ )?;
+
+ let optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Send + Sync>> = vec![
+ Arc::new(OutputRequirements::new_add_mode()),
+ Arc::new(EnforceDistribution::new()),
+ Arc::new(EnforceSorting::new()),
+ Arc::new(ProjectionPushdown::new()),
+ Arc::new(CoalesceBatches::new()),
+ Arc::new(EnforceDistribution::new()), // -- Add enforce distribution
rule again
+ Arc::new(OutputRequirements::new_remove_mode()),
+ Arc::new(ProjectionPushdown::new()),
+ Arc::new(LimitPushdown::new()),
+ Arc::new(SanityCheckPlan::new()),
+ ];
+
+ let planner = DefaultPhysicalPlanner::default();
+ let session_state = SessionStateBuilder::new()
+ .with_config(ctx.copied_config())
+ .with_default_features()
+ .with_physical_optimizer_rules(optimizers)
+ .build();
+ let optimized_physical_plan = planner
+ .create_physical_plan(&optimized_logical_plan, &session_state)
+ .await?;
+
+ let mut results = optimized_physical_plan
+ .execute(0, ctx.task_ctx().clone())
+ .unwrap();
+
+ let batch = results.next().await.unwrap()?;
Review Comment:
I verified that this test fails without the code in this PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]