drusso opened a new pull request #8222:
URL: https://github.com/apache/arrow/pull/8222
This is a proposal for an initial, partial implementation of the `DISTINCT` keyword. Only `COUNT(DISTINCT)` is supported, and only under the following conditions:

(a) there is exactly one argument, i.e. `COUNT(DISTINCT col)` but not `COUNT(DISTINCT col, other)`,
(b) the argument is an integer type, and
(c) the query has a `GROUP BY` clause.

**Implementation Overview:**

The `Expr::AggregateFunction` variant has a new field, `distinct`, which mirrors the `distinct` flag from `SQLExpr::Function` (until now this flag was unused). Any `Expr::AggregateFunction` may have its `distinct` flag set to `true` if the keyword is present in the SQL query. However, the physical planner respects it only for `COUNT` expressions.

The count distinct aggregation slots into the existing physical plans as a new set of `AggregateExpr`. To demonstrate, below are the physical plans for the following query, where `c1` may be any data type and `c2` is a `UInt8` column:

```
SELECT c1, COUNT(DISTINCT c2) FROM t1 GROUP BY c1
```

(a) Multiple Partitions:

```
HashAggregateExec:
  mode: Final
  group_expr:
    Column(c1)
  aggr_expr:
    DistinctCountReduce(Column(c2))
  schema:
    c1: any
    c2: UInt64
  input:
    MergeExec:
      input:
        HashAggregateExec:
          mode: Partial
          group_expr:
            Column(c1)
          aggr_expr:
            DistinctCount(Column(c2))
          schema:
            c1: any
            c2: LargeList(UInt8)
          input:
            CsvExec:
              schema:
                c1: any
                c2: UInt8
```

`DistinctCount` accumulates each `UInt8` into a list of distinct `UInt8` values. No counts are collected yet; the partial result is just these lists of distinct values. In the `RecordBatch`, they form a `LargeListArray<UInt8>` column. After the `MergeExec`, each list in the `LargeListArray<UInt8>` is accumulated by `DistinctCountReduce` (via `accumulate_batch()`), producing the _final_ sets of distinct values. Finally, the counts are computed from these finalized sets (always as `UInt64`).
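The two-stage flow can be sketched in plain Rust, outside of DataFusion. This is a minimal, hypothetical model, not the PR's code. Rows are `(c1, c2)` pairs with `c1` assumed to be a `String`. A `Vec<u8>` stands in for one `LargeList(UInt8)` entry. `partial_distinct` and `final_distinct_count` are illustrative names, not the `DistinctCount` / `DistinctCountReduce` APIs. The sketch also shows why the partial stage emits lists rather than counts: summing per-partition distinct counts would overcount values that appear in more than one partition.

```rust
use std::collections::{BTreeSet, HashMap};

// Hypothetical stand-in for one input row: (group key c1, value c2 as u8).
type Row = (String, u8);

/// Partial stage (hypothetical analogue of `DistinctCount`), run once per
/// partition. For each group it emits the list of distinct values seen in
/// this partition (one "LargeList(UInt8)" entry). No counting happens here.
fn partial_distinct(partition: &[Row]) -> HashMap<String, Vec<u8>> {
    let mut sets: HashMap<String, BTreeSet<u8>> = HashMap::new();
    for (key, value) in partition {
        sets.entry(key.clone()).or_default().insert(*value);
    }
    sets.into_iter()
        .map(|(k, set)| (k, set.into_iter().collect()))
        .collect()
}

/// Final stage (hypothetical analogue of `DistinctCountReduce` with
/// `accumulate_batch()`): fold every incoming list into one set per group,
/// and only then compute the counts, as u64.
fn final_distinct_count(partials: &[HashMap<String, Vec<u8>>]) -> HashMap<String, u64> {
    let mut sets: HashMap<String, BTreeSet<u8>> = HashMap::new();
    for partial in partials {
        for (key, list) in partial {
            sets.entry(key.clone()).or_default().extend(list.iter().copied());
        }
    }
    sets.into_iter()
        .map(|(k, set)| (k, set.len() as u64))
        .collect()
}

fn main() {
    // Two hypothetical partitions of t1.
    let p1: Vec<Row> = vec![("a".into(), 1), ("a".into(), 2), ("b".into(), 7)];
    let p2: Vec<Row> = vec![("a".into(), 2), ("a".into(), 3), ("b".into(), 7)];
    let partials = vec![partial_distinct(&p1), partial_distinct(&p2)];
    let counts = final_distinct_count(&partials);
    // Group "a" -> 3 and group "b" -> 1; HashMap print order is unspecified.
    println!("{:?}", counts);
}
```

With these sample partitions, group `a` sees `{1, 2}` in one partition and `{2, 3}` in the other. Adding the per-partition distinct counts would give 4. Merging the lists first gives the correct answer, 3.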
(b) Single Partition:

```
HashAggregateExec:
  mode: NoPartial
  group_expr:
    Column(c1)
  aggr_expr:
    DistinctCountReduce(Column(c2))
  schema:
    c1: any
    c2: UInt64
  input:
    CsvExec:
      schema:
        c1: any
        c2: UInt8
```

This scenario differs from the multiple-partition scenario: `DistinctCount` is _not_ used, and there are no partial sets of distinct values. Instead, within a single `HashAggregateExec` stage, each `UInt8` is accumulated into a distinct value set, and the counts are computed at the end of the stage. `DistinctCountReduce` is used here too, but unlike in the multiple-partition case, it accumulates scalars via `accumulate_scalar()`.

There is a new aggregation mode: `NoPartial`. In summary, the modes are:

- `NoPartial`: used in single-stage aggregations
- `Partial`: used as the first stage of two-stage aggregations
- `Final`: used as the second stage of two-stage aggregations

Before the new `NoPartial` mode existed, `Partial` handled the responsibilities that are now split between `Partial` and `NoPartial`. No distinction was required, because for _non-distinct_ aggregations (such as count, sum, min, max, and avg) the first aggregation stage is the same whether the aggregation has one stage or two. This is not the case for a _distinct_ count aggregation, as the physical plans above show.
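A small, self-contained Rust sketch can illustrate why `NoPartial` is needed. The `Mode` and `StageOutput` enums and both functions below are hypothetical stand-ins, not DataFusion types, and they model only the first stage's output for a single group. For a plain count, the first stage emits the same thing in `NoPartial` and `Partial` mode. For a distinct count, the `Partial` stage must stop at a list of distinct values, and only `NoPartial` can produce the final count directly.

```rust
use std::collections::BTreeSet;

/// Hypothetical stand-in for the three modes summarized above
/// (not the actual DataFusion enum).
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode {
    NoPartial, // single-stage aggregation
    Partial,   // first stage of a two-stage aggregation
    Final,     // second stage of a two-stage aggregation
}

/// Hypothetical per-group output of a first stage.
#[derive(Debug, PartialEq)]
enum StageOutput {
    Count(u64),            // a count (final, or summable by a Final stage)
    DistinctList(Vec<u8>), // a partial list of distinct values
}

/// First stage of a plain COUNT(c2): the output is identical in NoPartial
/// and Partial mode, so the mode can be ignored.
fn count_first_stage(values: &[u8], _mode: Mode) -> StageOutput {
    StageOutput::Count(values.len() as u64)
}

/// First stage of COUNT(DISTINCT c2): the output depends on the mode.
/// NoPartial accumulates scalars into a set and counts at the end of the
/// stage; Partial must emit the distinct values themselves.
fn distinct_count_first_stage(values: &[u8], mode: Mode) -> StageOutput {
    let set: BTreeSet<u8> = values.iter().copied().collect();
    match mode {
        Mode::NoPartial => StageOutput::Count(set.len() as u64),
        Mode::Partial => StageOutput::DistinctList(set.into_iter().collect()),
        Mode::Final => panic!("Final consumes Partial output; it is never a first stage"),
    }
}

fn main() {
    let group_values = [4u8, 4, 9]; // hypothetical c2 values for one group
    for mode in [Mode::NoPartial, Mode::Partial] {
        // NoPartial: count -> Count(3), distinct count -> Count(2)
        // Partial:   count -> Count(3), distinct count -> DistinctList([4, 9])
        println!(
            "{:?}: count -> {:?}, distinct count -> {:?}",
            mode,
            count_first_stage(&group_values, mode),
            distinct_count_first_stage(&group_values, mode)
        );
    }
}
```

The plain count ignores its mode entirely, which is why a single `Partial` mode used to be enough. The distinct count has to branch on it.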