xudong963 commented on code in PR #21107:
URL: https://github.com/apache/datafusion/pull/21107#discussion_r2986959269
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -928,6 +928,47 @@ fn add_hash_on_top(
///
/// * `input`: Current node.
///
+/// Checks whether preserving the child's ordering enables the parent to
+/// run in streaming mode. Compares the parent's pipeline behavior with
+/// the ordered child vs. an unordered (coalesced) child. If removing the
+/// ordering would cause the parent to switch from streaming to blocking,
+/// keeping the order-preserving variant is beneficial.
+///
+/// Only applicable to single-child operators; returns false for multi-child
+/// operators (e.g. joins) where child substitution semantics are ambiguous.
+fn preserving_order_enables_streaming(
+ parent: &Arc<dyn ExecutionPlan>,
+ ordered_child: &Arc<dyn ExecutionPlan>,
+) -> bool {
+ // Only applicable to single-child operators that maintain input order
+ // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
+ // maintain input order (e.g. SortExec) handle ordering themselves —
+ // preserving SPM for them is unnecessary.
+ if parent.children().len() != 1 {
+ return false;
+ }
+ if !parent.maintains_input_order()[0] {
+ return false;
+ }
+ // Build parent with the ordered child
+ let with_ordered =
+ match
Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
+ Ok(p) => p,
+ Err(_) => return false,
+ };
+ if with_ordered.pipeline_behavior() == EmissionType::Final {
+ // Parent is blocking even with ordering — no benefit
+ return false;
+ }
+ // Build parent with an unordered child (simulating CoalescePartitionsExec)
Review Comment:
yeah, it's legacy
##########
datafusion/physical-optimizer/src/enforce_distribution.rs:
##########
@@ -928,6 +928,47 @@ fn add_hash_on_top(
///
/// * `input`: Current node.
///
+/// Checks whether preserving the child's ordering enables the parent to
+/// run in streaming mode. Compares the parent's pipeline behavior with
+/// the ordered child vs. an unordered (coalesced) child. If removing the
+/// ordering would cause the parent to switch from streaming to blocking,
+/// keeping the order-preserving variant is beneficial.
+///
+/// Only applicable to single-child operators; returns false for multi-child
+/// operators (e.g. joins) where child substitution semantics are ambiguous.
+fn preserving_order_enables_streaming(
+ parent: &Arc<dyn ExecutionPlan>,
+ ordered_child: &Arc<dyn ExecutionPlan>,
+) -> bool {
+ // Only applicable to single-child operators that maintain input order
+ // (e.g. AggregateExec in PartiallySorted mode). Operators that don't
+ // maintain input order (e.g. SortExec) handle ordering themselves —
+ // preserving SPM for them is unnecessary.
+ if parent.children().len() != 1 {
+ return false;
+ }
+ if !parent.maintains_input_order()[0] {
+ return false;
+ }
+ // Build parent with the ordered child
+ let with_ordered =
+ match
Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
+ Ok(p) => p,
+ Err(_) => return false,
Review Comment:
agree!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]