drusso opened a new pull request #8222:
URL: https://github.com/apache/arrow/pull/8222


   This is a proposal for an initial and partial implementation of the 
`DISTINCT` keyword. Only `COUNT(DISTINCT)` is supported, with the following 
conditions:
   
   (a) only one argument, i.e. `COUNT(DISTINCT col)`, but not `COUNT(DISTINCT 
col, other)`,
   (b) the argument is an integer type, and
   (c) the query must have a `GROUP BY` clause.
   
   **Implementation Overview:**
   
   The `Expr::AggregateFunction` variant has a new field, `distinct`, which 
mirrors the `distinct` flag from `SQLExpr::Function` (up until now this flag 
was unused). Any `Expr::AggregateFunction` may have its `distinct` flag 
switched to `true` if the keyword is present in the SQL query. However, the 
physical planner respects it only for `COUNT` expressions.
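   
   As a rough illustration of how the flag travels from the logical expression 
to the planner's decision, here is a minimal sketch. The types below are 
simplified stand-ins, not the actual DataFusion definitions: 
`PhysicalChoice` and `choose_physical` are hypothetical names, and the 
one-argument check simply restates condition (a) above.

```rust
// Simplified, hypothetical stand-ins for the logical plan types.
#[derive(Debug, Clone, PartialEq)]
enum AggregateFunction {
    Count,
    Sum,
}

#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    AggregateFunction {
        fun: AggregateFunction,
        args: Vec<Expr>,
        // Mirrors the `distinct` flag produced by the SQL parser.
        distinct: bool,
    },
}

// Hypothetical summary of which physical aggregate would be planned.
#[derive(Debug, PartialEq)]
enum PhysicalChoice {
    Regular,
    DistinctCount,
}

fn choose_physical(expr: &Expr) -> Result<PhysicalChoice, String> {
    match expr {
        // The flag is honoured only for COUNT, and only with one argument.
        Expr::AggregateFunction {
            fun: AggregateFunction::Count,
            args,
            distinct: true,
        } => {
            if args.len() == 1 {
                Ok(PhysicalChoice::DistinctCount)
            } else {
                Err("COUNT(DISTINCT) takes exactly one argument".to_string())
            }
        }
        // Any other aggregate may carry the flag, but it is ignored here.
        Expr::AggregateFunction { .. } => Ok(PhysicalChoice::Regular),
        other => Err(format!("not an aggregate expression: {:?}", other)),
    }
}
```

   In this sketch a `SUM` with `distinct: true` falls through to the regular 
path, which matches the behaviour described above: the flag is carried on the 
expression but is not acted upon.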
   
   The count distinct aggregation slots into the existing physical plans as a 
new set of `AggregateExpr`. To demonstrate, below are examples of the physical 
plans for the following query, where `c1` may be any data type, and `c2` is a 
`UInt8` column:
   
   ```
   SELECT c1, COUNT(DISTINCT c2) FROM t1 GROUP BY c1
   ```
   
   (a) Multiple Partitions:
   
       HashAggregateExec:
         mode: Final
         group_expr:
           Column(c1)
         aggr_expr:
           DistinctCountReduce(Column(c2))
         schema:
           c1: any
           c2: UInt64
         input:
           MergeExec:
             input:
               HashAggregateExec:
                 mode: Partial
                 group_expr:
                   Column(c1)
                 aggr_expr:
                   DistinctCount(Column(c2))
                 schema:
                   c1: any
                   c2: LargeList(UInt8)
                 input:
                   CsvExec:
                     schema:
                       c1: any
                       c2: UInt8
   
   `DistinctCount` accumulates each `UInt8` value into a list of distinct 
`UInt8` values. No counts are computed yet; the partial result is only the 
lists of distinct values. In the `RecordBatch`, this is a `LargeListArray<UInt8>` 
column. After the `MergeExec`, each list in `LargeListArray<UInt8>` is 
accumulated by `DistinctCountReduce` (via `accumulate_batch()`), producing the 
_final_ sets of distinct values. Finally, given the finalized sets of distinct 
values, the counts are computed (always as `UInt64`).
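   
   The two-stage flow for a single group can be sketched with plain Rust 
collections. This is only an illustration of the idea, not the code in this 
PR: `partial_distinct` and `final_distinct_count` are hypothetical names, a 
`Vec<u8>` stands in for one entry of the `LargeListArray<UInt8>` column, and 
the choice of `BTreeSet` is an assumption made for deterministic output.

```rust
use std::collections::BTreeSet;

// Partial stage (per partition): reduce the group's values to a list of
// distinct values. No counting happens here.
fn partial_distinct(values: &[u8]) -> Vec<u8> {
    let set: BTreeSet<u8> = values.iter().copied().collect();
    set.into_iter().collect()
}

// Final stage (after the merge): union the per-partition lists into one
// set of distinct values, then count it as an unsigned 64-bit integer.
fn final_distinct_count(partials: &[Vec<u8>]) -> u64 {
    let mut set = BTreeSet::new();
    for list in partials {
        set.extend(list.iter().copied());
    }
    set.len() as u64
}
```

   Counting has to wait for the final stage because the same value can appear 
in several partitions: summing per-partition distinct counts would count it 
more than once.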
   
   (b) Single Partition:
   
       HashAggregateExec:
         mode: NoPartial
         group_expr:
           Column(c1)
         aggr_expr:
           DistinctCountReduce(Column(c2))
         schema:
           c1: any
           c2: UInt64
         input:
           CsvExec:
             schema:
               c1: any
               c2: UInt8
   
   This scenario is unlike the multiple partition scenario: `DistinctCount` is 
_not_ used, and there are no partial sets of distinct values. Rather, in a 
single `HashAggregateExec` stage, each `UInt8` is accumulated into a distinct 
value set, then the counts are computed at the end of the stage. 
`DistinctCountReduce` is used, but note that unlike the multiple partition 
case, it accumulates scalars via `accumulate_scalar()`.
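   
   For comparison, the single-stage path can be sketched as one pass that 
feeds each scalar into its group's set and counts at the end. Again this is 
a simplified illustration under assumptions: 
`single_stage_distinct_count` is a hypothetical name, the group key `c1` is 
modelled as a string, and rows are modelled as `(c1, c2)` tuples rather than 
Arrow arrays.

```rust
use std::collections::{BTreeMap, BTreeSet};

// One aggregation stage: each (c1, c2) row adds the scalar c2 to the set of
// distinct values kept for group c1; counts are taken once all rows are seen.
fn single_stage_distinct_count(rows: &[(&str, u8)]) -> BTreeMap<String, u64> {
    let mut sets: BTreeMap<String, BTreeSet<u8>> = BTreeMap::new();
    for &(group, value) in rows {
        sets.entry(group.to_string()).or_default().insert(value);
    }
    sets.into_iter()
        .map(|(group, set)| (group, set.len() as u64))
        .collect()
}
```

   No intermediate list column is materialized in this variant, since there 
is no second stage to hand partial results to.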
   
   There is a new aggregation mode: `NoPartial`. In summary, the modes are:
   
   - `NoPartial`: used in single-stage aggregations
   - `Partial`: used as the first stage of two-stage aggregations
   - `Final`: used as the second stage of two-stage aggregations
   
   Before `NoPartial` was introduced, `Partial` covered what are now the 
responsibilities of both `Partial` and `NoPartial`. No distinction was 
required because, for _non-distinct_ aggregations (such as count, sum, min, 
max, and avg), the first aggregation stage is always the same, regardless of 
whether the aggregation is one-stage or two-stage. This is not the case for a 
_distinct_ count aggregation, as the physical plans above show.
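   
   A minimal sketch of how the three modes relate to the number of input 
partitions, following the two example plans above. The enum mirrors the mode 
names listed in this description, while `stages_for_partitions` is a 
hypothetical helper that only restates which modes appear in each plan; it 
is not the planner's actual logic.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum AggregateMode {
    NoPartial,
    Partial,
    Final,
}

// Returns the aggregation stages in execution order (input side first).
fn stages_for_partitions(partitions: usize) -> Vec<AggregateMode> {
    if partitions <= 1 {
        // Single partition: one stage computes the final result directly.
        vec![AggregateMode::NoPartial]
    } else {
        // Multiple partitions: per-partition partial results, then a final
        // stage over the merged output.
        vec![AggregateMode::Partial, AggregateMode::Final]
    }
}
```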
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

