[ https://issues.apache.org/jira/browse/DRILL-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14589218#comment-14589218 ]

Aman Sinha commented on DRILL-3298:
-----------------------------------

[~adeneche] were you asking about the final sort or the sort that is input to 
the window function? Also, what are the semantics of a window aggregate if 
there is no partition-by clause but there is an order-by? If the semantics are 
that the order-by column should be treated as a 'partitioning' column, then we 
would need to insert a HashToRandomExchange below the Sort that is input to the 
Window operator. 
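
For reference, my understanding of the standard semantics: with an order-by but 
no partition-by, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW over a single, unpartitioned input, so the aggregate is a running 
total and all peer rows (rows with equal order-by values) must share the same 
value. A minimal sketch of that expectation (standard SQL, can be checked 
against Postgres, which test.res was generated with):
{code}
-- running count with the default RANGE frame: peer rows share the same count
select x, count(*) over (order by x) as cnt
from (values (10), (10), (20)) as t(x);
-- expected: 10 -> 2, 10 -> 2, 20 -> 3
{code}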

Let me illustrate the issue with a simpler example below. I removed the final 
order-by, added a filter condition to reduce the result set, and used a COUNT 
instead of a SUM.  

slice_target = 1:  // incorrect result
{code}
select c_integer, count(*) over (order by c_integer) as cnt
 from j1 where c_integer in (447503383, 452697297, 456808172) ;

+------------+------+
| c_integer  | cnt  |
+------------+------+
| 456808172  | 1    |
| 447503383  | 2    |
| 447503383  | 2    |
| 452697297  | 4    |
| 452697297  | 4    |
| 456808172  | 5    |
| 447503383  | 2    |
| 447503383  | 2    |
| 456808172  | 2    |
| 456808172  | 2    |
+------------+------+
{code}

slice_target = 100000:  // correct result
{code}
+------------+------+
| c_integer  | cnt  |
+------------+------+
| 447503383  | 4    |
| 447503383  | 4    |
| 447503383  | 4    |
| 447503383  | 4    |
| 452697297  | 6    |
| 452697297  | 6    |
| 456808172  | 10   |
| 456808172  | 10   |
| 456808172  | 10   |
| 456808172  | 10   |
+------------+------+
{code}

Note that in the incorrect result, not only are the 'cnt' values wrong, they 
are not even the same for all rows of 456808172.  
So it seems that each minor fragment of the Window operator is computing its 
own COUNT, and that count is not consistent across identical values of 
c_integer. If you can verify this, I think adding a distribution step below 
the Sort would resolve it. I can work on adding that. 
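
As an independent cross-check (just a sketch, assuming the default RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW frame), the expected counts can 
also be reproduced without any window function, e.g. against Postgres, which 
test.res was generated with:
{code}
-- for each qualifying row, count the qualifying rows whose value is <= its own;
-- with the default RANGE frame this must equal count(*) over (order by c_integer)
select t.c_integer,
       (select count(*)
          from j1 u
         where u.c_integer in (447503383, 452697297, 456808172)
           and u.c_integer <= t.c_integer) as cnt
  from j1 t
 where t.c_integer in (447503383, 452697297, 456808172);
{code}
That yields 4, 6 and 10 for the three distinct values, matching the 
slice_target = 100000 output above.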
 

> Wrong result with SUM window function and order by without partition by in the OVER clause
> ------------------------------------------------------------------------------------------
>
>                 Key: DRILL-3298
>                 URL: https://issues.apache.org/jira/browse/DRILL-3298
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Execution - Flow
>    Affects Versions: 1.0.0
>            Reporter: Victoria Markman
>            Assignee: Deneche A. Hakim
>            Priority: Critical
>              Labels: window_function
>             Fix For: 1.1.0
>
>         Attachments: j1.tar, test.res
>
>
> This query returns an incorrect result when planner.slice_target = 1
> {code}
> select
>         j1.c_integer,
>         sum(j1.c_integer) over w
> from j1
> window  w as (order by c_integer desc)
> order by
>         1, 2;
> {code}
> Query plan with planner.slice_target = 1
> {noformat}
> 00-01      Project(c_integer=[$0], EXPR$1=[$1])
> 00-02        SingleMergeExchange(sort0=[0 ASC], sort1=[1 ASC])
> 01-01          SelectionVectorRemover
> 01-02            Sort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
> 01-03              Project(c_integer=[$0], EXPR$1=[$1])
> 01-04                HashToRandomExchange(dist0=[[$0]], dist1=[[$1]])
> 02-01                  UnorderedMuxExchange
> 03-01                    Project(c_integer=[$0], EXPR$1=[$1], E_X_P_R_H_A_S_H_F_I_E_L_D=[castInt(hash64AsDouble($1, hash64AsDouble($0)))])
> 03-02                      Project(c_integer=[$0], EXPR$1=[CASE(>($1, 0), CAST($2):ANY, null)])
> 03-03                        Window(window#0=[window(partition {} order by [0 DESC] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0)])])
> 03-04                          SelectionVectorRemover
> 03-05                            Sort(sort0=[$0], dir0=[DESC])
> 03-06                              Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=maprfs:///drill/testdata/subqueries/j1]], selectionRoot=/drill/testdata/subqueries/j1, numFiles=1, columns=[`c_integer`]]])
> {noformat}
> Query plan with planner.slice_target = 100000;
> {noformat}
> 00-01      Project(c_integer=[$0], EXPR$1=[$1])
> 00-02        SelectionVectorRemover
> 00-03          Sort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
> 00-04            Project(c_integer=[$0], EXPR$1=[CASE(>($1, 0), CAST($2):ANY, null)])
> 00-05              Window(window#0=[window(partition {} order by [0 DESC] range between UNBOUNDED PRECEDING and CURRENT ROW aggs [COUNT($0), $SUM0($0)])])
> 00-06                SelectionVectorRemover
> 00-07                  Sort(sort0=[$0], dir0=[DESC])
> 00-08                    Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=maprfs:///drill/testdata/subqueries/j1]], selectionRoot=/drill/testdata/subqueries/j1, numFiles=1, columns=[`c_integer`]]])
> {noformat}
> Attached:
>         * table j1
>         * test.res - result generated with postgres



