This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f9a1396306 Fix bug and add new test (#7099)
f9a1396306 is described below
commit f9a13963061b16dfc9af2a3d1ebdb4968fdf70a4
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Jul 26 22:32:25 2023 +0300
Fix bug and add new test (#7099)
---
datafusion/core/src/physical_plan/projection.rs | 5 +-
.../tests/sqllogictests/test_files/groupby.slt | 62 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index dac5227503..5c4b661143 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -97,7 +97,7 @@ impl ProjectionExec {
// construct a map from the input columns to the output columns of the Projection
let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
- for (expression, name) in expr.iter() {
+ for (expr_idx, (expression, name)) in expr.iter().enumerate() {
if let Some(column) = expression.as_any().downcast_ref::<Column>()
{
// For some executors, logical and physical plan schema fields
// are not the same. The information in a `Column` comes from
@@ -107,11 +107,10 @@ impl ProjectionExec {
let idx = column.index();
let matching_input_field = input_schema.field(idx);
let matching_input_column =
Column::new(matching_input_field.name(), idx);
- let new_col_idx = schema.index_of(name)?;
let entry = columns_map
.entry(matching_input_column)
.or_insert_with(Vec::new);
- entry.push(Column::new(name, new_col_idx));
+ entry.push(Column::new(name, expr_idx));
};
}
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index dae48a9464..d65433c4fb 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -2568,6 +2568,52 @@ TUR 100 75 175
GRC 80 30 110
FRA 200 50 250
+query TT
+EXPLAIN SELECT s.zip_code, s.country, s.sn, s.ts, s.currency,
LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate
+FROM sales_global AS s
+JOIN sales_global AS e
+ ON s.currency = e.currency AND
+ s.ts >= e.ts
+GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency
+ORDER BY s.sn
+----
+logical_plan
+Sort: s.sn ASC NULLS LAST
+--Projection: s.zip_code, s.country, s.sn, s.ts, s.currency,
LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST] AS last_rate
+----Aggregate: groupBy=[[s.sn, s.zip_code, s.country, s.ts, s.currency]],
aggr=[[LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]]]
+------Projection: s.zip_code, s.country, s.sn, s.ts, s.currency, e.sn, e.amount
+--------Inner Join: s.currency = e.currency Filter: s.ts >= e.ts
+----------SubqueryAlias: s
+------------TableScan: sales_global projection=[zip_code, country, sn, ts,
currency]
+----------SubqueryAlias: e
+------------TableScan: sales_global projection=[sn, ts, currency, amount]
+physical_plan
+SortExec: expr=[sn@2 ASC NULLS LAST]
+--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as
sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC
NULLS LAST]@5 as last_rate]
+----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code,
country@1 as country, ts@3 as ts, currency@4 as currency],
aggr=[LAST_VALUE(e.amount)]
+------SortExec: expr=[sn@5 ASC NULLS LAST]
+--------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country,
sn@2 as sn, ts@3 as ts, currency@4 as currency, sn@5 as sn, amount@8 as amount]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@4,
currency@2)], filter=ts@0 >= ts@1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ITIPTR
+SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount
ORDER BY e.sn) AS last_rate
+FROM sales_global AS s
+JOIN sales_global AS e
+ ON s.currency = e.currency AND
+ s.ts >= e.ts
+GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency
+ORDER BY s.sn
+----
+0 GRC 0 2022-01-01T06:00:00 EUR 30
+1 FRA 1 2022-01-01T08:00:00 EUR 50
+1 TUR 2 2022-01-01T11:30:00 TRY 75
+1 FRA 3 2022-01-02T12:00:00 EUR 200
+0 GRC 4 2022-01-03T10:00:00 EUR 80
+1 TUR 4 2022-01-03T10:00:00 TRY 100
+
# Run order-sensitive aggregators in multiple partitions
statement ok
set datafusion.execution.target_partitions = 8;
@@ -2847,3 +2893,19 @@ SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
FRA [200.0, 50.0] 50 50
GRC [80.0, 30.0] 30 30
TUR [100.0, 75.0] 75 75
+
+query ITIPTR
+SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount
ORDER BY e.sn) AS last_rate
+FROM sales_global AS s
+JOIN sales_global AS e
+ ON s.currency = e.currency AND
+ s.ts >= e.ts
+GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency
+ORDER BY s.sn
+----
+0 GRC 0 2022-01-01T06:00:00 EUR 30
+1 FRA 1 2022-01-01T08:00:00 EUR 50
+1 TUR 2 2022-01-01T11:30:00 TRY 75
+1 FRA 3 2022-01-02T12:00:00 EUR 200
+0 GRC 4 2022-01-03T10:00:00 EUR 80
+1 TUR 4 2022-01-03T10:00:00 TRY 100