alamb commented on code in PR #16985:
URL: https://github.com/apache/datafusion/pull/16985#discussion_r2436520010
##########
datafusion/physical-plan/src/unnest.rs:
##########
@@ -99,11 +107,64 @@ impl UnnestExec {
/// This function creates the cache object that stores the plan properties
such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
+ list_column_indices: &[ListUnnest],
+ struct_column_indices: &[usize],
schema: SchemaRef,
) -> PlanProperties {
+ // Find out which indices are not unnested, such that they can be
copied over from the input plan
+ let input_schema = input.schema();
+ let mut unnested_indices =
BooleanBufferBuilder::new(input_schema.fields().len());
+ unnested_indices.append_n(input_schema.fields().len(), false);
+ for list_unnest in list_column_indices {
+ unnested_indices.set_bit(list_unnest.index_in_input_schema, true);
+ }
+ for list_unnest in struct_column_indices {
Review Comment:
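Maybe calling this loop variable `struct_unnest` would be clearer, since this loop iterates over `struct_column_indices` rather than `list_column_indices`.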
##########
datafusion/sqllogictest/test_files/unnest.slt:
##########
@@ -941,3 +941,242 @@ where min_height * width1 = (
)
----
4 7 4 28
+
+## Unnest with ordering on unrelated column is preserved
+query TT
+EXPLAIN WITH unnested AS (SELECT
+ ROW_NUMBER() OVER () AS generated_id,
+ unnest(array[value]) as ar
+ FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
+----
+logical_plan
+01)Projection: array_agg(unnested.ar)
+02)--Aggregate: groupBy=[[unnested.generated_id]],
aggr=[[array_agg(unnested.ar)]]
+03)----SubqueryAlias: unnested
+04)------Projection: generated_id,
__unnest_placeholder(make_array(range().value),depth=1) AS
UNNEST(make_array(range().value)) AS ar
+05)--------Unnest:
lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
+06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS
__unnest_placeholder(make_array(range().value))
+07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]
+08)--------------TableScan: range() projection=[value]
+physical_plan
+01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as
generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+03)----SortExec: expr=[generated_id@0 ASC NULLS LAST],
preserve_partitioning=[true]
Review Comment:
This plan shows the data being sorted (note the `SortExec`), but the test
comment suggests it should not be 🤔
Could you please explain in more detail what you expect this explain plan to
show? Given there is no `ORDER BY` in the query (or in the `OVER` clause),
it is not clear why this is testing ordering.
##########
datafusion/sqllogictest/test_files/unnest.slt:
##########
@@ -941,3 +941,242 @@ where min_height * width1 = (
)
----
4 7 4 28
+
+## Unnest with ordering on unrelated column is preserved
+query TT
+EXPLAIN WITH unnested AS (SELECT
+ ROW_NUMBER() OVER () AS generated_id,
+ unnest(array[value]) as ar
+ FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
+----
+logical_plan
+01)Projection: array_agg(unnested.ar)
+02)--Aggregate: groupBy=[[unnested.generated_id]],
aggr=[[array_agg(unnested.ar)]]
+03)----SubqueryAlias: unnested
+04)------Projection: generated_id,
__unnest_placeholder(make_array(range().value),depth=1) AS
UNNEST(make_array(range().value)) AS ar
+05)--------Unnest:
lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
+06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS
__unnest_placeholder(make_array(range().value))
+07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]
+08)--------------TableScan: range() projection=[value]
+physical_plan
+01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as
generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+03)----SortExec: expr=[generated_id@0 ASC NULLS LAST],
preserve_partitioning=[true]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4),
input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as
generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+07)------------ProjectionExec: expr=[generated_id@0 as generated_id,
__unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
+08)--------------UnnestExec
+09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as
__unnest_placeholder(make_array(range().value))]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame:
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+12)----------------------LazyMemoryExec: partitions=1,
batch_generators=[range: start=1, end=5, batch_size=8192]
+
+## Unnest with ordering on unrelated column is preserved
+query TT
+EXPLAIN WITH unnested AS (SELECT
+ ROW_NUMBER() OVER () AS generated_id,
+ unnest(array[value]) as ar
+ FROM range(1,5)) SELECT array_agg(ar) FROM unnested group by generated_id;
+----
+logical_plan
+01)Projection: array_agg(unnested.ar)
+02)--Aggregate: groupBy=[[unnested.generated_id]],
aggr=[[array_agg(unnested.ar)]]
+03)----SubqueryAlias: unnested
+04)------Projection: generated_id,
__unnest_placeholder(make_array(range().value),depth=1) AS
UNNEST(make_array(range().value)) AS ar
+05)--------Unnest:
lists[__unnest_placeholder(make_array(range().value))|depth=1] structs[]
+06)----------Projection: row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND
UNBOUNDED FOLLOWING AS generated_id, make_array(range().value) AS
__unnest_placeholder(make_array(range().value))
+07)------------WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]
+08)--------------TableScan: range() projection=[value]
+physical_plan
+01)ProjectionExec: expr=[array_agg(unnested.ar)@1 as array_agg(unnested.ar)]
+02)--AggregateExec: mode=FinalPartitioned, gby=[generated_id@0 as
generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+03)----SortExec: expr=[generated_id@0 ASC NULLS LAST],
preserve_partitioning=[true]
+04)------CoalesceBatchesExec: target_batch_size=8192
+05)--------RepartitionExec: partitioning=Hash([generated_id@0], 4),
input_partitions=4
+06)----------AggregateExec: mode=Partial, gby=[generated_id@0 as
generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
+07)------------ProjectionExec: expr=[generated_id@0 as generated_id,
__unnest_placeholder(make_array(range().value),depth=1)@1 as ar]
+08)--------------UnnestExec
+09)----------------ProjectionExec: expr=[row_number() ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@1 as generated_id, make_array(value@0) as
__unnest_placeholder(make_array(range().value))]
+10)------------------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1
+11)--------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { name: "row_number() ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64,
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame:
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+12)----------------------LazyMemoryExec: partitions=1,
batch_generators=[range: start=1, end=5, batch_size=8192]
+
+# Unnest array where data is already ordered by column2 (100, 200, 300, 400)
+statement ok
+COPY (
+ SELECT * FROM VALUES
+ ([1,2,3], 100),
+ ([3], 200),
+ ([], 300),
+ ([3,1], 400)
+ ORDER BY column2
+ ) TO 'test_files/scratch/unnest/ordered_array.parquet';
+
+statement ok
+CREATE EXTERNAL TABLE t
+STORED AS PARQUET
+LOCATION 'test_files/scratch/unnest/ordered_array.parquet'
+WITH ORDER (column2)
+
+query ?I
+SELECT * FROM t;
+----
+[1, 2, 3] 100
+[3] 200
+[] 300
+[3, 1] 400
+
+# Data is sorted on column2 already, so no need to sort again
+query II
+SELECT UNNEST(column1), column2 FROM t ORDER BY column2;
+----
+1 100
+2 100
+3 100
+3 200
+3 400
+1 400
+
+# Explain should not have a SortExec
Review Comment:
Could you also please add two additional tests:
1. A negative test case here: order by the output of the unnest and verify
that it is in fact sorted correctly.
2. A case with the ordering column as the first index (e.g. tuples like
`(100, [3,2,1], 'a')`), and then order by that first column (the `100` values).
##########
datafusion/physical-plan/src/unnest.rs:
##########
@@ -99,11 +107,64 @@ impl UnnestExec {
/// This function creates the cache object that stores the plan properties
such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
+ list_column_indices: &[ListUnnest],
+ struct_column_indices: &[usize],
schema: SchemaRef,
) -> PlanProperties {
+ let list_column_indices: Vec<usize> = list_column_indices
+ .iter()
+ .map(|list_unnest| list_unnest.index_in_input_schema)
+ .collect();
+ let non_unnested_indices: Vec<usize> = input
+ .schema()
+ .fields()
+ .iter()
+ .enumerate()
+ .filter(|(idx, _)| {
+ !list_column_indices.contains(idx) &&
!struct_column_indices.contains(idx)
+ })
+ .map(|(idx, _)| idx)
+ .collect();
+
+ // Manually build projection mapping from non-unnested input columns
to their positions in the output
+ let input_schema = input.schema();
+ let projection_mapping: ProjectionMapping = non_unnested_indices
+ .iter()
+ .map(|&input_idx| {
+ // Find what index the input column has in the output schema
+ let input_field = input_schema.field(input_idx);
+ let output_idx = schema
+ .fields()
+ .iter()
+ .position(|output_field| output_field.name() ==
input_field.name())
Review Comment:
It would be nice to avoid the `expect` here so that any potential bugs result
in errors rather than panics.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]