This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f9a1396306 Fix bug and add new test (#7099)
f9a1396306 is described below

commit f9a13963061b16dfc9af2a3d1ebdb4968fdf70a4
Author: Mustafa Akur <[email protected]>
AuthorDate: Wed Jul 26 22:32:25 2023 +0300

    Fix bug and add new test (#7099)
---
 datafusion/core/src/physical_plan/projection.rs    |  5 +-
 .../tests/sqllogictests/test_files/groupby.slt     | 62 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/projection.rs b/datafusion/core/src/physical_plan/projection.rs
index dac5227503..5c4b661143 100644
--- a/datafusion/core/src/physical_plan/projection.rs
+++ b/datafusion/core/src/physical_plan/projection.rs
@@ -97,7 +97,7 @@ impl ProjectionExec {
 
         // construct a map from the input columns to the output columns of the 
Projection
         let mut columns_map: HashMap<Column, Vec<Column>> = HashMap::new();
-        for (expression, name) in expr.iter() {
+        for (expr_idx, (expression, name)) in expr.iter().enumerate() {
             if let Some(column) = expression.as_any().downcast_ref::<Column>() 
{
                 // For some executors, logical and physical plan schema fields
                 // are not the same. The information in a `Column` comes from
@@ -107,11 +107,10 @@ impl ProjectionExec {
                 let idx = column.index();
                 let matching_input_field = input_schema.field(idx);
                 let matching_input_column = 
Column::new(matching_input_field.name(), idx);
-                let new_col_idx = schema.index_of(name)?;
                 let entry = columns_map
                     .entry(matching_input_column)
                     .or_insert_with(Vec::new);
-                entry.push(Column::new(name, new_col_idx));
+                entry.push(Column::new(name, expr_idx));
             };
         }
 
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index dae48a9464..d65433c4fb 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -2568,6 +2568,52 @@ TUR 100 75 175
 GRC 80 30 110
 FRA 200 50 250
 
+query TT
+EXPLAIN SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, 
LAST_VALUE(e.amount ORDER BY e.sn) AS last_rate
+FROM sales_global AS s
+JOIN sales_global AS e
+  ON s.currency = e.currency AND
+  s.ts >= e.ts
+GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency
+ORDER BY s.sn
+----
+logical_plan
+Sort: s.sn ASC NULLS LAST
+--Projection: s.zip_code, s.country, s.sn, s.ts, s.currency, 
LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST] AS last_rate
+----Aggregate: groupBy=[[s.sn, s.zip_code, s.country, s.ts, s.currency]], 
aggr=[[LAST_VALUE(e.amount) ORDER BY [e.sn ASC NULLS LAST]]]
+------Projection: s.zip_code, s.country, s.sn, s.ts, s.currency, e.sn, e.amount
+--------Inner Join: s.currency = e.currency Filter: s.ts >= e.ts
+----------SubqueryAlias: s
+------------TableScan: sales_global projection=[zip_code, country, sn, ts, 
currency]
+----------SubqueryAlias: e
+------------TableScan: sales_global projection=[sn, ts, currency, amount]
+physical_plan
+SortExec: expr=[sn@2 ASC NULLS LAST]
+--ProjectionExec: expr=[zip_code@1 as zip_code, country@2 as country, sn@0 as 
sn, ts@3 as ts, currency@4 as currency, LAST_VALUE(e.amount) ORDER BY [e.sn ASC 
NULLS LAST]@5 as last_rate]
+----AggregateExec: mode=Single, gby=[sn@2 as sn, zip_code@0 as zip_code, 
country@1 as country, ts@3 as ts, currency@4 as currency], 
aggr=[LAST_VALUE(e.amount)]
+------SortExec: expr=[sn@5 ASC NULLS LAST]
+--------ProjectionExec: expr=[zip_code@0 as zip_code, country@1 as country, 
sn@2 as sn, ts@3 as ts, currency@4 as currency, sn@5 as sn, amount@8 as amount]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(currency@4, 
currency@2)], filter=ts@0 >= ts@1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query ITIPTR
+SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount 
ORDER BY e.sn) AS last_rate
+FROM sales_global AS s
+JOIN sales_global AS e
+  ON s.currency = e.currency AND
+  s.ts >= e.ts
+GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency
+ORDER BY s.sn
+----
+0 GRC 0 2022-01-01T06:00:00 EUR 30
+1 FRA 1 2022-01-01T08:00:00 EUR 50
+1 TUR 2 2022-01-01T11:30:00 TRY 75
+1 FRA 3 2022-01-02T12:00:00 EUR 200
+0 GRC 4 2022-01-03T10:00:00 EUR 80
+1 TUR 4 2022-01-03T10:00:00 TRY 100
+
 # Run order-sensitive aggregators in multiple partitions
 statement ok
 set datafusion.execution.target_partitions = 8;
@@ -2847,3 +2893,19 @@ SELECT country, ARRAY_AGG(amount ORDER BY amount DESC) AS amounts,
 FRA [200.0, 50.0] 50 50
 GRC [80.0, 30.0] 30 30
 TUR [100.0, 75.0] 75 75
+
+query ITIPTR
+SELECT s.zip_code, s.country, s.sn, s.ts, s.currency, LAST_VALUE(e.amount 
ORDER BY e.sn) AS last_rate
+FROM sales_global AS s
+JOIN sales_global AS e
+  ON s.currency = e.currency AND
+  s.ts >= e.ts
+GROUP BY s.sn, s.zip_code, s.country, s.ts, s.currency
+ORDER BY s.sn
+----
+0 GRC 0 2022-01-01T06:00:00 EUR 30
+1 FRA 1 2022-01-01T08:00:00 EUR 50
+1 TUR 2 2022-01-01T11:30:00 TRY 75
+1 FRA 3 2022-01-02T12:00:00 EUR 200
+0 GRC 4 2022-01-03T10:00:00 EUR 80
+1 TUR 4 2022-01-03T10:00:00 TRY 100

Reply via email to