This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 55730dcece Feat: Add fetch to CoalescePartitionsExec (#14499)
55730dcece is described below

commit 55730dcecebe5475af40b9e8a5c4805a73c31b11
Author: mertak-synnada <[email protected]>
AuthorDate: Thu Feb 6 10:07:03 2025 +0000

    Feat: Add fetch to CoalescePartitionsExec (#14499)
    
    * add fetch info to CoalescePartitionsExec
    
    * use Statistics with_fetch API on CoalescePartitionsExec
    
    * check limit_reached only if fetch is assigned
---
 .../tests/physical_optimizer/limit_pushdown.rs     | 20 ++++-----
 datafusion/core/tests/sql/explain_analyze.rs       |  7 ---
 .../physical-optimizer/src/limit_pushdown.rs       |  9 ----
 .../physical-plan/src/coalesce_partitions.rs       | 50 +++++++++++++++++-----
 datafusion/physical-plan/src/stream.rs             | 31 +++++++++++++-
 datafusion/physical-plan/src/union.rs              | 12 +++++-
 datafusion/sqllogictest/test_files/aggregate.slt   | 23 +++++-----
 datafusion/sqllogictest/test_files/limit.slt       |  5 +--
 datafusion/sqllogictest/test_files/repartition.slt | 11 +++--
 datafusion/sqllogictest/test_files/union.slt       | 43 +++++++++----------
 10 files changed, 128 insertions(+), 83 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs 
b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
index 49490b2a3d..dd2c1960a6 100644
--- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
@@ -233,12 +233,11 @@ fn 
transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limi
         LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
 
     let expected = [
-            "GlobalLimitExec: skip=0, fetch=5",
-            "  CoalescePartitionsExec",
-            "    CoalesceBatchesExec: target_batch_size=8192, fetch=5",
-            "      FilterExec: c3@2 > 0",
-            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          StreamingTableExec: partition_sizes=1, projection=[c1, 
c2, c3], infinite_source=true"
+        "CoalescePartitionsExec: fetch=5",
+        "  CoalesceBatchesExec: target_batch_size=8192, fetch=5",
+        "    FilterExec: c3@2 > 0",
+        "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+        "        StreamingTableExec: partition_sizes=1, projection=[c1, c2, 
c3], infinite_source=true"
         ];
     assert_eq!(get_plan_string(&after_optimize), expected);
 
@@ -378,11 +377,10 @@ fn 
keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions() -> R
         LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
 
     let expected = [
-            "GlobalLimitExec: skip=0, fetch=5",
-            "  CoalescePartitionsExec",
-            "    FilterExec: c3@2 > 0",
-            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        StreamingTableExec: partition_sizes=1, projection=[c1, 
c2, c3], infinite_source=true"
+            "CoalescePartitionsExec: fetch=5",
+            "  FilterExec: c3@2 > 0",
+            "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "      StreamingTableExec: partition_sizes=1, projection=[c1, c2, 
c3], infinite_source=true"
         ];
     assert_eq!(get_plan_string(&after_optimize), expected);
 
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index 5fb0b98526..dce175d04b 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -69,11 +69,6 @@ async fn explain_analyze_baseline_metrics() {
         "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
         "metrics=[output_rows=99, elapsed_compute="
     );
-    assert_metrics!(
-        &formatted,
-        "GlobalLimitExec: skip=0, fetch=3, ",
-        "metrics=[output_rows=3, elapsed_compute="
-    );
     assert_metrics!(
         &formatted,
         "ProjectionExec: expr=[count(*)",
@@ -101,9 +96,7 @@ async fn explain_analyze_baseline_metrics() {
 
         plan.as_any().downcast_ref::<sorts::sort::SortExec>().is_some()
             || 
plan.as_any().downcast_ref::<physical_plan::aggregates::AggregateExec>().is_some()
-            // CoalescePartitionsExec doesn't do any work so is not included
             || 
plan.as_any().downcast_ref::<physical_plan::filter::FilterExec>().is_some()
-            || 
plan.as_any().downcast_ref::<physical_plan::limit::GlobalLimitExec>().is_some()
             || 
plan.as_any().downcast_ref::<physical_plan::limit::LocalLimitExec>().is_some()
             || 
plan.as_any().downcast_ref::<physical_plan::projection::ProjectionExec>().is_some()
             || 
plan.as_any().downcast_ref::<physical_plan::coalesce_batches::CoalesceBatchesExec>().is_some()
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs 
b/datafusion/physical-optimizer/src/limit_pushdown.rs
index bc0b64cdd7..5887cb51a7 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -146,15 +146,6 @@ pub fn pushdown_limit_helper(
         global_state.skip = skip;
         global_state.fetch = fetch;
 
-        if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
-            // If the child is a `CoalescePartitionsExec`, we should not 
remove the limit
-            // the push_down through the `CoalescePartitionsExec` to each 
partition will not guarantee the limit.
-            // TODO: we may have a better solution if we can support 
with_fetch for limit inside CoalescePartitionsExec.
-            // Follow-up issue: 
https://github.com/apache/datafusion/issues/14446
-            global_state.satisfied = true;
-            return Ok((Transformed::no(pushdown_plan), global_state));
-        }
-
         // Now the global state has the most recent information, we can remove
         // the `LimitExec` plan. We will decide later if we should add it again
         // or not.
diff --git a/datafusion/physical-plan/src/coalesce_partitions.rs 
b/datafusion/physical-plan/src/coalesce_partitions.rs
index 3900bd1ddc..9a955155c0 100644
--- a/datafusion/physical-plan/src/coalesce_partitions.rs
+++ b/datafusion/physical-plan/src/coalesce_partitions.rs
@@ -43,6 +43,8 @@ pub struct CoalescePartitionsExec {
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     cache: PlanProperties,
+    /// Optional number of rows to fetch. Stops producing rows after this fetch
+    pub(crate) fetch: Option<usize>,
 }
 
 impl CoalescePartitionsExec {
@@ -53,6 +55,7 @@ impl CoalescePartitionsExec {
             input,
             metrics: ExecutionPlanMetricsSet::new(),
             cache,
+            fetch: None,
         }
     }
 
@@ -83,9 +86,12 @@ impl DisplayAs for CoalescePartitionsExec {
         f: &mut std::fmt::Formatter,
     ) -> std::fmt::Result {
         match t {
-            DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                write!(f, "CoalescePartitionsExec")
-            }
+            DisplayFormatType::Default | DisplayFormatType::Verbose => match 
self.fetch {
+                Some(fetch) => {
+                    write!(f, "CoalescePartitionsExec: fetch={fetch}")
+                }
+                None => write!(f, "CoalescePartitionsExec"),
+            },
         }
     }
 }
@@ -116,9 +122,9 @@ impl ExecutionPlan for CoalescePartitionsExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(CoalescePartitionsExec::new(Arc::clone(
-            &children[0],
-        ))))
+        let mut plan = CoalescePartitionsExec::new(Arc::clone(&children[0]));
+        plan.fetch = self.fetch;
+        Ok(Arc::new(plan))
     }
 
     fn execute(
@@ -164,7 +170,11 @@ impl ExecutionPlan for CoalescePartitionsExec {
                 }
 
                 let stream = builder.build();
-                Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)))
+                Ok(Box::pin(ObservedStream::new(
+                    stream,
+                    baseline_metrics,
+                    self.fetch,
+                )))
             }
         }
     }
@@ -174,7 +184,7 @@ impl ExecutionPlan for CoalescePartitionsExec {
     }
 
     fn statistics(&self) -> Result<Statistics> {
-        self.input.statistics()
+        Statistics::with_fetch(self.input.statistics()?, self.schema(), 
self.fetch, 0, 1)
     }
 
     fn supports_limit_pushdown(&self) -> bool {
@@ -197,8 +207,28 @@ impl ExecutionPlan for CoalescePartitionsExec {
             return Ok(None);
         }
         // CoalescePartitionsExec always has a single child, so zero indexing 
is safe.
-        make_with_child(projection, projection.input().children()[0])
-            .map(|e| Some(Arc::new(CoalescePartitionsExec::new(e)) as _))
+        make_with_child(projection, projection.input().children()[0]).map(|e| {
+            if self.fetch.is_some() {
+                let mut plan = CoalescePartitionsExec::new(e);
+                plan.fetch = self.fetch;
+                Some(Arc::new(plan) as _)
+            } else {
+                Some(Arc::new(CoalescePartitionsExec::new(e)) as _)
+            }
+        })
+    }
+
+    fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
+    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn 
ExecutionPlan>> {
+        Some(Arc::new(CoalescePartitionsExec {
+            input: Arc::clone(&self.input),
+            fetch: limit,
+            metrics: self.metrics.clone(),
+            cache: self.cache.clone(),
+        }))
     }
 }
 
diff --git a/datafusion/physical-plan/src/stream.rs 
b/datafusion/physical-plan/src/stream.rs
index 331cded165..5c941c76ae 100644
--- a/datafusion/physical-plan/src/stream.rs
+++ b/datafusion/physical-plan/src/stream.rs
@@ -444,18 +444,44 @@ impl Stream for EmptyRecordBatchStream {
 pub(crate) struct ObservedStream {
     inner: SendableRecordBatchStream,
     baseline_metrics: BaselineMetrics,
+    fetch: Option<usize>,
+    produced: usize,
 }
 
 impl ObservedStream {
     pub fn new(
         inner: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
+        fetch: Option<usize>,
     ) -> Self {
         Self {
             inner,
             baseline_metrics,
+            fetch,
+            produced: 0,
         }
     }
+
+    fn limit_reached(
+        &mut self,
+        poll: Poll<Option<Result<RecordBatch>>>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        let Some(fetch) = self.fetch else { return poll };
+
+        if self.produced >= fetch {
+            return Poll::Ready(None);
+        }
+
+        if let Poll::Ready(Some(Ok(batch))) = &poll {
+            if self.produced + batch.num_rows() > fetch {
+                let batch = batch.slice(0, 
fetch.saturating_sub(self.produced));
+                self.produced += batch.num_rows();
+                return Poll::Ready(Some(Ok(batch)));
+            };
+            self.produced += batch.num_rows()
+        }
+        poll
+    }
 }
 
 impl RecordBatchStream for ObservedStream {
@@ -471,7 +497,10 @@ impl Stream for ObservedStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        let poll = self.inner.poll_next_unpin(cx);
+        let mut poll = self.inner.poll_next_unpin(cx);
+        if self.fetch.is_some() {
+            poll = self.limit_reached(poll);
+        }
         self.baseline_metrics.record_poll(poll)
     }
 }
diff --git a/datafusion/physical-plan/src/union.rs 
b/datafusion/physical-plan/src/union.rs
index a41336ea6e..91d2f2c9e8 100644
--- a/datafusion/physical-plan/src/union.rs
+++ b/datafusion/physical-plan/src/union.rs
@@ -237,7 +237,11 @@ impl ExecutionPlan for UnionExec {
             if partition < input.output_partitioning().partition_count() {
                 let stream = input.execute(partition, context)?;
                 debug!("Found a Union partition to execute");
-                return Ok(Box::pin(ObservedStream::new(stream, 
baseline_metrics)));
+                return Ok(Box::pin(ObservedStream::new(
+                    stream,
+                    baseline_metrics,
+                    None,
+                )));
             } else {
                 partition -= input.output_partitioning().partition_count();
             }
@@ -448,7 +452,11 @@ impl ExecutionPlan for InterleaveExec {
                 self.schema(),
                 input_stream_vec,
             ));
-            return Ok(Box::pin(ObservedStream::new(stream, baseline_metrics)));
+            return Ok(Box::pin(ObservedStream::new(
+                stream,
+                baseline_metrics,
+                None,
+            )));
         }
 
         warn!("Error in InterleaveExec: Partition {} not found", partition);
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt 
b/datafusion/sqllogictest/test_files/aggregate.slt
index c8c544785d..bb2ddf0da4 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -5032,18 +5032,17 @@ logical_plan
 03)----Aggregate: groupBy=[[aggregate_test_100.c3]], 
aggr=[[min(aggregate_test_100.c1)]]
 04)------TableScan: aggregate_test_100 projection=[c1, c3]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, 
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-04)------CoalesceBatchesExec: target_batch_size=8192
-05)--------RepartitionExec: partitioning=Hash([c3@0, 
min(aggregate_test_100.c1)@1], 4), input_partitions=4
-06)----------AggregateExec: mode=Partial, gby=[c3@0 as c3, 
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
-07)------------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], 
aggr=[min(aggregate_test_100.c1)]
-08)--------------CoalesceBatchesExec: target_batch_size=8192
-09)----------------RepartitionExec: partitioning=Hash([c3@0], 4), 
input_partitions=4
-10)------------------AggregateExec: mode=Partial, gby=[c3@1 as c3], 
aggr=[min(aggregate_test_100.c1)]
-11)--------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
-12)----------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c3], has_header=true
+01)CoalescePartitionsExec: fetch=5
+02)--AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3, 
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([c3@0, 
min(aggregate_test_100.c1)@1], 4), input_partitions=4
+05)--------AggregateExec: mode=Partial, gby=[c3@0 as c3, 
min(aggregate_test_100.c1)@1 as min(aggregate_test_100.c1)], aggr=[], lim=[5]
+06)----------AggregateExec: mode=FinalPartitioned, gby=[c3@0 as c3], 
aggr=[min(aggregate_test_100.c1)]
+07)------------CoalesceBatchesExec: target_batch_size=8192
+08)--------------RepartitionExec: partitioning=Hash([c3@0], 4), 
input_partitions=4
+09)----------------AggregateExec: mode=Partial, gby=[c3@1 as c3], 
aggr=[min(aggregate_test_100.c1)]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+11)--------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c3], has_header=true
 
 
 #
diff --git a/datafusion/sqllogictest/test_files/limit.slt 
b/datafusion/sqllogictest/test_files/limit.slt
index 65f35d40fc..a999149418 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -852,9 +852,8 @@ physical_plan
 01)ProjectionExec: expr=[foo@0 as foo]
 02)--SortExec: TopK(fetch=1000), expr=[part_key@1 ASC NULLS LAST], 
preserve_partitioning=[false]
 03)----ProjectionExec: expr=[1 as foo, part_key@0 as part_key]
-04)------GlobalLimitExec: skip=0, fetch=1
-05)--------CoalescePartitionsExec
-06)----------ParquetExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]},
 projection=[part_key], limit=1
+04)------CoalescePartitionsExec: fetch=1
+05)--------ParquetExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]},
 projection=[part_key], limit=1
 
 query I
 with selection as (
diff --git a/datafusion/sqllogictest/test_files/repartition.slt 
b/datafusion/sqllogictest/test_files/repartition.slt
index 630674bb09..36a326928f 100644
--- a/datafusion/sqllogictest/test_files/repartition.slt
+++ b/datafusion/sqllogictest/test_files/repartition.slt
@@ -121,12 +121,11 @@ logical_plan
 02)--Filter: sink_table.c3 > Int16(0)
 03)----TableScan: sink_table projection=[c1, c2, c3]
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=5
-02)--CoalescePartitionsExec
-03)----CoalesceBatchesExec: target_batch_size=8192, fetch=5
-04)------FilterExec: c3@2 > 0
-05)--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
-06)----------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true
+01)CoalescePartitionsExec: fetch=5
+02)--CoalesceBatchesExec: target_batch_size=8192, fetch=5
+03)----FilterExec: c3@2 > 0
+04)------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1
+05)--------StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true
 
 # Start repratition on empty column test.
 # See https://github.com/apache/datafusion/issues/12057
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index cbd19bf380..5517af5475 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -510,28 +510,27 @@ logical_plan
 19)------------Projection: Int64(1) AS c1
 20)--------------EmptyRelation
 physical_plan
-01)GlobalLimitExec: skip=0, fetch=3
-02)--CoalescePartitionsExec
-03)----UnionExec
-04)------ProjectionExec: expr=[count(*)@0 as cnt]
-05)--------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
-06)----------CoalescePartitionsExec
-07)------------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
-08)--------------ProjectionExec: expr=[]
-09)----------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[]
-10)------------------CoalesceBatchesExec: target_batch_size=2
-11)--------------------RepartitionExec: partitioning=Hash([c1@0], 4), 
input_partitions=4
-12)----------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
-13)------------------------CoalesceBatchesExec: target_batch_size=2
-14)--------------------------FilterExec: c13@1 != 
C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
-15)----------------------------RepartitionExec: 
partitioning=RoundRobinBatch(4), input_partitions=1
-16)------------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c13], has_header=true
-17)------ProjectionExec: expr=[1 as cnt]
-18)--------PlaceholderRowExec
-19)------ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
-20)--------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: 
"lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", 
data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: 
{} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), 
end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
-21)----------ProjectionExec: expr=[1 as c1]
-22)------------PlaceholderRowExec
+01)CoalescePartitionsExec: fetch=3
+02)--UnionExec
+03)----ProjectionExec: expr=[count(*)@0 as cnt]
+04)------AggregateExec: mode=Final, gby=[], aggr=[count(*)]
+05)--------CoalescePartitionsExec
+06)----------AggregateExec: mode=Partial, gby=[], aggr=[count(*)]
+07)------------ProjectionExec: expr=[]
+08)--------------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], 
aggr=[]
+09)----------------CoalesceBatchesExec: target_batch_size=2
+10)------------------RepartitionExec: partitioning=Hash([c1@0], 4), 
input_partitions=4
+11)--------------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[]
+12)----------------------CoalesceBatchesExec: target_batch_size=2
+13)------------------------FilterExec: c13@1 != 
C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0]
+14)--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+15)----------------------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c13], has_header=true
+16)----ProjectionExec: expr=[1 as cnt]
+17)------PlaceholderRowExec
+18)----ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt]
+19)------BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "lead(b.c1,Int64(1)) ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
+20)--------ProjectionExec: expr=[1 as c1]
+21)----------PlaceholderRowExec
 
 
 ########


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to