zhuqi-lucas commented on code in PR #14418:
URL: https://github.com/apache/datafusion/pull/14418#discussion_r1940594604
##########
datafusion/physical-optimizer/src/limit_pushdown.rs:
##########
@@ -146,6 +146,15 @@ pub fn pushdown_limit_helper(
global_state.skip = skip;
global_state.fetch = fetch;
+ if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
Review Comment:
Thank you @mertak-synnada for the review:

> While I agree with checking via API suggestion, please also check with the `combines_input_partitions()` helper function so that SortPreservingMerge can be affected as well.

I agree. I already checked `SortPreservingMergeExec`; it supports `with_fetch()` and `fetch()`, so I think it is not affected:
```rust
impl SortPreservingMergeExec {
    /// Create a new sort execution plan
    pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
        let cache = Self::compute_properties(&input, expr.clone());
        Self {
            input,
            expr,
            metrics: ExecutionPlanMetricsSet::new(),
            fetch: None,
            cache,
            enable_round_robin_repartition: true,
        }
    }

    /// Sets the number of rows to fetch
    pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
        self.fetch = fetch;
        self
    }

    /// Sets the selection strategy of tied winners of the loser tree algorithm
    ///
    /// If true (the default) equal output rows are placed in the merged stream
    /// in round robin fashion. This approach consumes input streams at more
    /// even rates when there are many rows with the same sort key.
    ///
    /// If false, equal output rows are always placed in the merged stream in
    /// the order of the inputs, resulting in potentially slower execution but a
    /// stable output order.
    pub fn with_round_robin_repartition(
        mut self,
        enable_round_robin_repartition: bool,
    ) -> Self {
        self.enable_round_robin_repartition = enable_round_robin_repartition;
        self
    }

    /// Input schema
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

    /// Sort expressions
    pub fn expr(&self) -> &LexOrdering {
        self.expr.as_ref()
    }

    /// Fetch
    pub fn fetch(&self) -> Option<usize> {
        self.fetch
    }

    /// Creates the cache object that stores the plan properties
    /// such as schema, equivalence properties, ordering, partitioning, etc.
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        ordering: LexOrdering,
    ) -> PlanProperties {
        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.clear_per_partition_constants();
        eq_properties.add_new_orderings(vec![ordering]);
        PlanProperties::new(
            eq_properties,                        // Equivalence Properties
            Partitioning::UnknownPartitioning(1), // Output Partitioning
            input.pipeline_behavior(),            // Pipeline Behavior
            input.boundedness(),                  // Boundedness
        )
    }
}
```
> But in the end, I think rather than adding a global limit, we should be able to limit in the CoalescePartitionsExec or in SortPreservingMerge so that it won't unnecessarily push more data.

I totally agree with this! So I created a follow-up issue, https://github.com/apache/datafusion/issues/14446, to support a limit in `CoalescePartitionsExec`; `SortPreservingMerge` already supports this, per the code above.
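To illustrate the decision being discussed, here is a minimal, hypothetical sketch (not the actual DataFusion optimizer code; the `Plan` enum and `push_limit` function are invented for this example): if the operator that combines input partitions can absorb a fetch itself, push the limit into it; otherwise, wrap it in a global limit.

```rust
// Hypothetical, simplified model of the pushdown decision; these types
// stand in for SortPreservingMergeExec / CoalescePartitionsExec and are
// NOT real DataFusion APIs.
#[derive(Debug, PartialEq)]
enum Plan {
    /// Merges partitions and can stop early itself
    /// (like SortPreservingMergeExec, which has `with_fetch`).
    MergeWithFetch { fetch: Option<usize> },
    /// Merges partitions but cannot stop early
    /// (like CoalescePartitionsExec before the follow-up issue).
    MergeWithoutFetch,
    /// A global limit wrapped around a child plan.
    GlobalLimit { fetch: usize, input: Box<Plan> },
}

/// Push the fetch into the merge operator when it supports one;
/// otherwise add a global limit on top.
fn push_limit(plan: Plan, fetch: usize) -> Plan {
    match plan {
        Plan::MergeWithFetch { .. } => Plan::MergeWithFetch { fetch: Some(fetch) },
        other => Plan::GlobalLimit { fetch, input: Box::new(other) },
    }
}

fn main() {
    // SortPreservingMergeExec-like: the limit is absorbed into the merge.
    assert_eq!(
        push_limit(Plan::MergeWithFetch { fetch: None }, 10),
        Plan::MergeWithFetch { fetch: Some(10) }
    );
    // CoalescePartitionsExec-like: a global limit must be added on top.
    assert_eq!(
        push_limit(Plan::MergeWithoutFetch, 10),
        Plan::GlobalLimit { fetch: 10, input: Box::new(Plan::MergeWithoutFetch) }
    );
}
```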
> So if the plan is combining input partitions, we're only adding a global limit if skip information is there, maybe we can identify if the local limits are enough or not and then decide to add the global limit at there.

This is a good point; we can create another issue to try to improve this!
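As a hedged sketch of why `skip` currently forces a global limit (the helper names below are invented for illustration, not DataFusion code): a per-partition local limit cannot apply the skip, because it does not know which of its rows the merged stream will discard, so each partition must keep `skip + fetch` rows and a single global operator applies the skip after merging.

```rust
/// A local, per-partition limit may only truncate; with a skip in play
/// it must conservatively keep `skip + fetch` rows.
fn local_limit(partition: &[i64], skip: usize, fetch: usize) -> Vec<i64> {
    partition.iter().take(skip + fetch).copied().collect()
}

/// The global limit applies the skip exactly once, over the merged stream.
fn global_limit(merged: Vec<i64>, skip: usize, fetch: usize) -> Vec<i64> {
    merged.into_iter().skip(skip).take(fetch).collect()
}

fn main() {
    let (skip, fetch) = (2, 3);
    let partitions = vec![vec![1, 2, 3, 4, 5, 6], vec![10, 20, 30, 40, 50]];

    // Each partition keeps skip + fetch = 5 rows.
    let mut merged: Vec<i64> = partitions
        .iter()
        .flat_map(|p| local_limit(p, skip, fetch))
        .collect();
    merged.sort(); // stand-in for the sort-preserving merge step

    // Only now can the skip be applied safely.
    let result = global_limit(merged, skip, fetch);
    assert_eq!(result, vec![3, 4, 5]);
}
```

Whether the local limits alone are "enough" is exactly the case `skip == 0`, where no global pass is needed.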
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]