alamb commented on code in PR #8281: URL: https://github.com/apache/arrow-datafusion/pull/8281#discussion_r1402147301
########## datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs: ########## @@ -357,15 +357,19 @@ mod tests { let physical_plan = sort_preserving_merge_exec(vec![sort_expr("a", &schema)], sort); - let expected_input = ["SortPreservingMergeExec: [a@0 ASC NULLS LAST]", + let expected_input = [ Review Comment: Did these plans actually change, or are these just whitespace changes? If they are just whitespace changes, I would really appreciate breaking such changes out into their own PRs, as they are much faster / easier to review and merge. ########## datafusion/core/src/physical_optimizer/enforce_distribution.rs: ########## @@ -3787,7 +3787,7 @@ pub(crate) mod tests { fn repartition_transitively_past_sort_with_projection_and_filter() -> Result<()> { let schema = schema(); let sort_key = vec![PhysicalSortExpr { - expr: col("c", &schema).unwrap(), + expr: col("a", &schema).unwrap(), Review Comment: Why was this test changed? ########## datafusion/sqllogictest/test_files/groupby.slt: ########## @@ -3842,6 +3842,51 @@ ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX( --------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y] ----------------------MemoryExec: partitions=1, partition_sizes=[1] +# create an unbounded table that contains ordered timestamp. +statement ok +CREATE UNBOUNDED EXTERNAL TABLE csv_with_timestamps ( + name VARCHAR, + ts TIMESTAMP +) +STORED AS CSV +WITH ORDER (ts DESC) +LOCATION '../core/tests/data/timestamps.csv' + +# below query should work in streaming mode.
+query TT +EXPLAIN SELECT date_bin('15 minutes', ts) as time_chunks + FROM csv_with_timestamps + GROUP BY (date_bin('15 minutes', ts)) + ORDER BY time_chunks DESC + LIMIT 5 +---- +logical_plan +Limit: skip=0, fetch=5 +--Sort: time_chunks DESC NULLS FIRST, fetch=5 +----Projection: date_bin(Utf8("15 minutes"),csv_with_timestamps.ts) AS time_chunks +------Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("900000000000"), csv_with_timestamps.ts) AS date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)]], aggr=[[]] +--------TableScan: csv_with_timestamps projection=[ts] +physical_plan +GlobalLimitExec: skip=0, fetch=5 +--SortPreservingMergeExec: [time_chunks@0 DESC], fetch=5 +----ProjectionExec: expr=[date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 as time_chunks] +------AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 as date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted +--------CoalesceBatchesExec: target_batch_size=2 +----------SortPreservingRepartitionExec: partitioning=Hash([date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0], 8), input_partitions=8, sort_exprs=date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 DESC +------------AggregateExec: mode=Partial, gby=[date_bin(900000000000, ts@0) as date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted +--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------------StreamingTableExec: partition_sizes=1, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC] + +query P +SELECT date_bin('15 minutes', ts) as time_chunks + FROM csv_with_timestamps + GROUP BY (date_bin('15 minutes', ts)) + ORDER BY time_chunks DESC + LIMIT 5 +---- +2018-12-13T12:00:00 +2018-11-13T17:00:00 + Review Comment: Could you also please add a negative test: a query that cannot run in streaming mode because it uses an expression that doesn't preserve the order?
Perhaps something like: ``` SELECT extract(month from ts) as months FROM csv_with_timestamps GROUP BY extract(month from ts) ORDER BY months DESC LIMIT 5 ``` ########## datafusion/sqllogictest/test_files/groupby.slt: ########## @@ -3842,6 +3842,51 @@ ProjectionExec: expr=[SUM(alias1)@1 as SUM(DISTINCT t1.x), MAX(alias1)@2 as MAX( --------------------ProjectionExec: expr=[CAST(x@0 AS Float64) as CAST(t1.x AS Float64)t1.x, y@1 as y] ----------------------MemoryExec: partitions=1, partition_sizes=[1] +# create an unbounded table that contains ordered timestamp. +statement ok +CREATE UNBOUNDED EXTERNAL TABLE csv_with_timestamps ( + name VARCHAR, + ts TIMESTAMP +) +STORED AS CSV +WITH ORDER (ts DESC) +LOCATION '../core/tests/data/timestamps.csv' + +# below query should work in streaming mode. +query TT +EXPLAIN SELECT date_bin('15 minutes', ts) as time_chunks + FROM csv_with_timestamps + GROUP BY (date_bin('15 minutes', ts)) + ORDER BY time_chunks DESC + LIMIT 5 +---- +logical_plan +Limit: skip=0, fetch=5 +--Sort: time_chunks DESC NULLS FIRST, fetch=5 +----Projection: date_bin(Utf8("15 minutes"),csv_with_timestamps.ts) AS time_chunks +------Aggregate: groupBy=[[date_bin(IntervalMonthDayNano("900000000000"), csv_with_timestamps.ts) AS date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)]], aggr=[[]] +--------TableScan: csv_with_timestamps projection=[ts] +physical_plan +GlobalLimitExec: skip=0, fetch=5 +--SortPreservingMergeExec: [time_chunks@0 DESC], fetch=5 +----ProjectionExec: expr=[date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 as time_chunks] +------AggregateExec: mode=FinalPartitioned, gby=[date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0 as date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted +--------CoalesceBatchesExec: target_batch_size=2 +----------SortPreservingRepartitionExec: partitioning=Hash([date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)@0], 8), input_partitions=8, sort_exprs=date_bin(Utf8("15
minutes"),csv_with_timestamps.ts)@0 DESC +------------AggregateExec: mode=Partial, gby=[date_bin(900000000000, ts@0) as date_bin(Utf8("15 minutes"),csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted +--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1 +----------------StreamingTableExec: partition_sizes=1, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC] + +query P +SELECT date_bin('15 minutes', ts) as time_chunks + FROM csv_with_timestamps + GROUP BY (date_bin('15 minutes', ts)) Review Comment: I am confused about the seemingly extra `(` and `)` here and in the query above. Is there any reason it is not: ```suggestion GROUP BY date_bin('15 minutes', ts) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org