zhuqi-lucas commented on issue #16888:
URL: https://github.com/apache/datafusion/issues/16888#issuecomment-3125608299
Thank you @crepererum. I can't reproduce this; it appears to be caused by the typo "ODER BY k,
time;", which needs to be changed to "ORDER BY k, time;".
With that fixed, here is the result, and it is correct:
```sql
statement ok
set datafusion.execution.batch_size = 1;
query TPI
SELECT
k,
time,
COUNT(v) OVER (
PARTITION BY k
ORDER BY time
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS normal_count
FROM t
ORDER BY k, time;
----
a 1970-01-01T00:01:00Z 1
a 1970-01-01T00:02:00Z 2
a 1970-01-01T00:03:00Z 4
a 1970-01-01T00:03:00Z 4
a 1970-01-01T00:04:00Z 4
b 1970-01-01T00:01:00Z 1
b 1970-01-01T00:02:00Z 2
b 1970-01-01T00:03:00Z 4
b 1970-01-01T00:03:00Z 4
query TT
EXPLAIN VERBOSE SELECT
k,
time,
COUNT(v) OVER (
PARTITION BY k
ORDER BY time
RANGE BETWEEN INTERVAL '2 minutes' PRECEDING AND CURRENT ROW
) AS normal_count
FROM t
ORDER BY k, time;
----
initial_logical_plan
01)Sort: t.k ASC NULLS LAST, t.time ASC NULLS LAST
02)--Projection: t.k, t.time, count(t.v) PARTITION BY [t.k] ORDER BY [t.time
ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS
normal_count
03)----WindowAggr: windowExpr=[[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW]]
04)------TableScan: t
logical_plan after resolve_grouping_function SAME TEXT AS ABOVE
logical_plan after type_coercion
01)Sort: t.k ASC NULLS LAST, t.time ASC NULLS LAST
02)--Projection: t.k, t.time, count(t.v) PARTITION BY [t.k] ORDER BY [t.time
ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS
normal_count
03)----WindowAggr: windowExpr=[[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days:
0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(t.v)
PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes
PRECEDING AND CURRENT ROW]]
04)------TableScan: t
analyzed_logical_plan SAME TEXT AS ABOVE
logical_plan after eliminate_nested_union SAME TEXT AS ABOVE
logical_plan after simplify_expressions SAME TEXT AS ABOVE
logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE
logical_plan after eliminate_join SAME TEXT AS ABOVE
logical_plan after decorrelate_predicate_subquery SAME TEXT AS ABOVE
logical_plan after scalar_subquery_to_join SAME TEXT AS ABOVE
logical_plan after decorrelate_lateral_join SAME TEXT AS ABOVE
logical_plan after extract_equijoin_predicate SAME TEXT AS ABOVE
logical_plan after eliminate_duplicated_expr SAME TEXT AS ABOVE
logical_plan after eliminate_filter SAME TEXT AS ABOVE
logical_plan after eliminate_cross_join SAME TEXT AS ABOVE
logical_plan after eliminate_limit SAME TEXT AS ABOVE
logical_plan after propagate_empty_relation SAME TEXT AS ABOVE
logical_plan after eliminate_one_union SAME TEXT AS ABOVE
logical_plan after filter_null_join_keys SAME TEXT AS ABOVE
logical_plan after eliminate_outer_join SAME TEXT AS ABOVE
logical_plan after push_down_limit SAME TEXT AS ABOVE
logical_plan after push_down_filter SAME TEXT AS ABOVE
logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE
logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE
logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
logical_plan after optimize_projections
01)Sort: t.k ASC NULLS LAST, t.time ASC NULLS LAST
02)--Projection: t.k, t.time, count(t.v) PARTITION BY [t.k] ORDER BY [t.time
ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS
normal_count
03)----WindowAggr: windowExpr=[[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days:
0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(t.v)
PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes
PRECEDING AND CURRENT ROW]]
04)------TableScan: t projection=[k, v, time]
logical_plan after eliminate_nested_union SAME TEXT AS ABOVE
logical_plan after simplify_expressions SAME TEXT AS ABOVE
logical_plan after replace_distinct_aggregate SAME TEXT AS ABOVE
logical_plan after eliminate_join SAME TEXT AS ABOVE
logical_plan after decorrelate_predicate_subquery SAME TEXT AS ABOVE
logical_plan after scalar_subquery_to_join SAME TEXT AS ABOVE
logical_plan after decorrelate_lateral_join SAME TEXT AS ABOVE
logical_plan after extract_equijoin_predicate SAME TEXT AS ABOVE
logical_plan after eliminate_duplicated_expr SAME TEXT AS ABOVE
logical_plan after eliminate_filter SAME TEXT AS ABOVE
logical_plan after eliminate_cross_join SAME TEXT AS ABOVE
logical_plan after eliminate_limit SAME TEXT AS ABOVE
logical_plan after propagate_empty_relation SAME TEXT AS ABOVE
logical_plan after eliminate_one_union SAME TEXT AS ABOVE
logical_plan after filter_null_join_keys SAME TEXT AS ABOVE
logical_plan after eliminate_outer_join SAME TEXT AS ABOVE
logical_plan after push_down_limit SAME TEXT AS ABOVE
logical_plan after push_down_filter SAME TEXT AS ABOVE
logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE
logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE
logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE
logical_plan after optimize_projections SAME TEXT AS ABOVE
logical_plan
01)Sort: t.k ASC NULLS LAST, t.time ASC NULLS LAST
02)--Projection: t.k, t.time, count(t.v) PARTITION BY [t.k] ORDER BY [t.time
ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW AS
normal_count
03)----WindowAggr: windowExpr=[[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN IntervalMonthDayNano { months: 0, days:
0, nanoseconds: 120000000000 } PRECEDING AND CURRENT ROW AS count(t.v)
PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes
PRECEDING AND CURRENT ROW]]
04)------TableScan: t projection=[k, v, time]
initial_physical_plan
01)SortExec: expr=[k@0 ASC NULLS LAST, time@1 ASC NULLS LAST],
preserve_partitioning=[false]
02)--ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v) PARTITION BY
[t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND
CURRENT ROW@3 as normal_count]
03)----BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted]
04)------DataSourceExec: partitions=1, partition_sizes=[9]
initial_physical_plan_with_stats
01)SortExec: expr=[k@0 ASC NULLS LAST, time@1 ASC NULLS LAST],
preserve_partitioning=[false], statistics=[Rows=Exact(9), Bytes=Absent,
[(Col[0]: Null=Exact(0)),(Col[1]: Null=Exact(0)),(Col[2]:)]]
02)--ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v) PARTITION BY
[t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND
CURRENT ROW@3 as normal_count], statistics=[Rows=Exact(9), Bytes=Absent,
[(Col[0]: Null=Exact(0)),(Col[1]: Null=Exact(0)),(Col[2]:)]]
03)----BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted], statistics=[Rows=Exact(9),
Bytes=Absent, [(Col[0]: Null=Exact(0)),(Col[1]: Null=Exact(0)),(Col[2]:
Null=Exact(0)),(Col[3]:)]]
04)------DataSourceExec: partitions=1, partition_sizes=[9],
statistics=[Rows=Exact(9), Bytes=Exact(6840), [(Col[0]: Null=Exact(0)),(Col[1]:
Null=Exact(0)),(Col[2]: Null=Exact(0))]]
initial_physical_plan_with_schema
01)SortExec: expr=[k@0 ASC NULLS LAST, time@1 ASC NULLS LAST],
preserve_partitioning=[false], schema=[k:Utf8View;N, time:Timestamp(Nanosecond,
Some("+00:00"));N, normal_count:Int64]
02)--ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v) PARTITION BY
[t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND
CURRENT ROW@3 as normal_count], schema=[k:Utf8View;N,
time:Timestamp(Nanosecond, Some("+00:00"));N, normal_count:Int64]
03)----BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted], schema=[k:Utf8View;N, v:Int32;N,
time:Timestamp(Nanosecond, Some("+00:00"));N, count(t.v) PARTITION BY [t.k]
ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT
ROW:Int64]
04)------DataSourceExec: partitions=1, partition_sizes=[9],
schema=[k:Utf8View;N, v:Int32;N, time:Timestamp(Nanosecond, Some("+00:00"));N]
physical_plan after OutputRequirements
01)OutputRequirementExec: order_by=[(k@0, asc), (time@1, asc)],
dist_by=SinglePartition
02)--SortExec: expr=[k@0 ASC NULLS LAST, time@1 ASC NULLS LAST],
preserve_partitioning=[false]
03)----ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v) PARTITION
BY [t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND
CURRENT ROW@3 as normal_count]
04)------BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted]
05)--------DataSourceExec: partitions=1, partition_sizes=[9]
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
physical_plan after join_selection SAME TEXT AS ABOVE
physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
physical_plan after FilterPushdown SAME TEXT AS ABOVE
physical_plan after EnforceDistribution
01)OutputRequirementExec: order_by=[(k@0, asc), (time@1, asc)],
dist_by=SinglePartition
02)--SortExec: expr=[k@0 ASC NULLS LAST, time@1 ASC NULLS LAST],
preserve_partitioning=[false]
03)----CoalescePartitionsExec
04)------ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v)
PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes
PRECEDING AND CURRENT ROW@3 as normal_count]
05)--------BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted]
06)----------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2
07)------------DataSourceExec: partitions=2, partition_sizes=[5, 4]
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
physical_plan after EnforceSorting
01)OutputRequirementExec: order_by=[(k@0, asc), (time@1, asc)],
dist_by=SinglePartition
02)--SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST]
03)----ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v) PARTITION
BY [t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND
CURRENT ROW@3 as normal_count]
04)------BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted]
05)--------SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST],
preserve_partitioning=[true]
06)----------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2
07)------------DataSourceExec: partitions=2, partition_sizes=[5, 4]
physical_plan after OptimizeAggregateOrder SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after coalesce_batches
01)OutputRequirementExec: order_by=[(k@0, asc), (time@1, asc)],
dist_by=SinglePartition
02)--SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST]
03)----ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v) PARTITION
BY [t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND
CURRENT ROW@3 as normal_count]
04)------BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted]
05)--------SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST],
preserve_partitioning=[true]
06)----------CoalesceBatchesExec: target_batch_size=1
07)------------RepartitionExec: partitioning=Hash([k@0], 2),
input_partitions=2
08)--------------DataSourceExec: partitions=2, partition_sizes=[5, 4]
physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
physical_plan after OutputRequirements
01)SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST]
02)--ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v) PARTITION BY
[t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND
CURRENT ROW@3 as normal_count]
03)----BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted]
04)------SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST],
preserve_partitioning=[true]
05)--------CoalesceBatchesExec: target_batch_size=1
06)----------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2
07)------------DataSourceExec: partitions=2, partition_sizes=[5, 4]
physical_plan after LimitAggregation SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after EnsureCooperative SAME TEXT AS ABOVE
physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
physical_plan
01)SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST]
02)--ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v) PARTITION BY
[t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND
CURRENT ROW@3 as normal_count]
03)----BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted]
04)------SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST],
preserve_partitioning=[true]
05)--------CoalesceBatchesExec: target_batch_size=1
06)----------RepartitionExec: partitioning=Hash([k@0], 2), input_partitions=2
07)------------DataSourceExec: partitions=2, partition_sizes=[5, 4]
physical_plan_with_stats
01)SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST],
statistics=[Rows=Exact(9), Bytes=Absent, [(Col[0]: Null=Exact(0)),(Col[1]:
Null=Exact(0)),(Col[2]:)]]
02)--ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v) PARTITION BY
[t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND
CURRENT ROW@3 as normal_count], statistics=[Rows=Exact(9), Bytes=Absent,
[(Col[0]: Null=Exact(0)),(Col[1]: Null=Exact(0)),(Col[2]:)]]
03)----BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted], statistics=[Rows=Exact(9),
Bytes=Absent, [(Col[0]: Null=Exact(0)),(Col[1]: Null=Exact(0)),(Col[2]:
Null=Exact(0)),(Col[3]:)]]
04)------SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST],
preserve_partitioning=[true], statistics=[Rows=Exact(9), Bytes=Exact(6840),
[(Col[0]: Null=Exact(0)),(Col[1]: Null=Exact(0)),(Col[2]: Null=Exact(0))]]
05)--------CoalesceBatchesExec: target_batch_size=1,
statistics=[Rows=Exact(9), Bytes=Exact(6840), [(Col[0]: Null=Exact(0)),(Col[1]:
Null=Exact(0)),(Col[2]: Null=Exact(0))]]
06)----------RepartitionExec: partitioning=Hash([k@0], 2),
input_partitions=2, statistics=[Rows=Exact(9), Bytes=Exact(6840), [(Col[0]:
Null=Exact(0)),(Col[1]: Null=Exact(0)),(Col[2]: Null=Exact(0))]]
07)------------DataSourceExec: partitions=2, partition_sizes=[5, 4],
statistics=[Rows=Exact(9), Bytes=Exact(6840), [(Col[0]: Null=Exact(0)),(Col[1]:
Null=Exact(0)),(Col[2]: Null=Exact(0))]]
physical_plan_with_schema
01)SortPreservingMergeExec: [k@0 ASC NULLS LAST, time@1 ASC NULLS LAST],
schema=[k:Utf8View;N, time:Timestamp(Nanosecond, Some("+00:00"));N,
normal_count:Int64]
02)--ProjectionExec: expr=[k@0 as k, time@2 as time, count(t.v) PARTITION BY
[t.k] ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND
CURRENT ROW@3 as normal_count], schema=[k:Utf8View;N,
time:Timestamp(Nanosecond, Some("+00:00"));N, normal_count:Int64]
03)----BoundedWindowAggExec: wdw=[count(t.v) PARTITION BY [t.k] ORDER BY
[t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW:
Field { name: "count(t.v) PARTITION BY [t.k] ORDER BY [t.time ASC NULLS LAST]
RANGE BETWEEN 2 minutes PRECEDING AND CURRENT ROW", data_type: Int64, nullable:
false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: RANGE BETWEEN
IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 120000000000 }
PRECEDING AND CURRENT ROW], mode=[Sorted], schema=[k:Utf8View;N, v:Int32;N,
time:Timestamp(Nanosecond, Some("+00:00"));N, count(t.v) PARTITION BY [t.k]
ORDER BY [t.time ASC NULLS LAST] RANGE BETWEEN 2 minutes PRECEDING AND CURRENT
ROW:Int64]
04)------SortExec: expr=[k@0 ASC NULLS LAST, time@2 ASC NULLS LAST],
preserve_partitioning=[true], schema=[k:Utf8View;N, v:Int32;N,
time:Timestamp(Nanosecond, Some("+00:00"));N]
05)--------CoalesceBatchesExec: target_batch_size=1, schema=[k:Utf8View;N,
v:Int32;N, time:Timestamp(Nanosecond, Some("+00:00"));N]
06)----------RepartitionExec: partitioning=Hash([k@0], 2),
input_partitions=2, schema=[k:Utf8View;N, v:Int32;N, time:Timestamp(Nanosecond,
Some("+00:00"));N]
07)------------DataSourceExec: partitions=2, partition_sizes=[5, 4],
schema=[k:Utf8View;N, v:Int32;N, time:Timestamp(Nanosecond, Some("+00:00"));N]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]