drusso opened a new pull request #8222:
URL: https://github.com/apache/arrow/pull/8222
This is a proposal for an initial and partial implementation of the
`DISTINCT` keyword. Only `COUNT(DISTINCT)` is supported, with the following
conditions:
(a) only one argument, i.e. `COUNT(DISTINCT col)`, but not `COUNT(DISTINCT
col, other)`,
(b) the argument is an integer type, and
(c) the query must have a `GROUP BY` clause.
**Implementation Overview:**
The `Expr::AggregateFunction` variant has a new field, `distinct`, which
mirrors the `distinct` flag from `SQLExpr::Function` (up until now this flag
was unused). Any `Expr::AggregateFunction` may have its `distinct` flag
switched to `true` if the keyword is present in the SQL query. However, the
physical planner respects it only for `COUNT` expressions.
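To make the flag concrete, here is a minimal sketch of how it could be carried on the logical expression and consulted during planning. The types below (`AggregateKind`, `PhysicalChoice`, `choose_physical`) are simplified, hypothetical stand-ins and not the actual DataFusion definitions. Silently ignoring the flag for non-`COUNT` functions and returning an error for multiple arguments are both assumptions of this sketch.

```rust
// Simplified, hypothetical stand-ins; the real `Expr` has many more variants.
#[derive(Debug, Clone, PartialEq)]
pub enum AggregateKind {
    Count,
    Sum,
    Min,
    Max,
    Avg,
}

#[derive(Debug, Clone, PartialEq)]
pub enum Expr {
    Column(String),
    AggregateFunction {
        fun: AggregateKind,
        args: Vec<Expr>,
        /// Mirrors the `distinct` flag parsed from the SQL function call.
        distinct: bool,
    },
}

/// Hypothetical outcome of physical planning for one aggregate expression.
#[derive(Debug, Clone, PartialEq)]
pub enum PhysicalChoice {
    DistinctCount,
    Regular,
}

/// Honor `distinct` only for single-argument COUNT (assumed behavior:
/// the flag is ignored for every other aggregate function).
pub fn choose_physical(expr: &Expr) -> Result<PhysicalChoice, String> {
    match expr {
        Expr::AggregateFunction { fun: AggregateKind::Count, args, distinct: true } => {
            if args.len() == 1 {
                Ok(PhysicalChoice::DistinctCount)
            } else {
                Err("COUNT(DISTINCT) supports exactly one argument".to_string())
            }
        }
        Expr::AggregateFunction { .. } => Ok(PhysicalChoice::Regular),
        Expr::Column(_) => Err("not an aggregate expression".to_string()),
    }
}
```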
The count distinct aggregation slots into the existing physical plans as a
new set of `AggregateExpr`. To demonstrate, below are examples of the physical
plans for the following query, where `c1` may be any data type, and `c2` is a
`UInt8` column:
```
SELECT c1, COUNT(DISTINCT c2) FROM t1 GROUP BY c1
```
(a) Multiple Partitions:
    HashAggregateExec:
      mode: Final
      group_expr:
        Column(c1)
      aggr_expr:
        DistinctCountReduce(Column(c2))
      schema:
        c1: any
        c2: UInt64
      input:
        MergeExec:
          input:
            HashAggregateExec:
              mode: Partial
              group_expr:
                Column(c1)
              aggr_expr:
                DistinctCount(Column(c2))
              schema:
                c1: any
                c2: LargeList(UInt8)
              input:
                CsvExec:
                  schema:
                    c1: any
                    c2: UInt8
`DistinctCount` accumulates each `UInt8` value into a list of distinct
`UInt8` values. No counts are computed at this stage: the partial result is
only the lists of distinct values, stored in the `RecordBatch` as a
`LargeListArray<UInt8>` column. After the `MergeExec`, `DistinctCountReduce`
accumulates each list in the `LargeListArray<UInt8>` (via
`accumulate_batch()`), producing the _final_ sets of distinct values. Finally,
the counts are computed from those finalized sets (always as `UInt64`).
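The two-stage flow can be sketched for a single group with plain Rust collections. This illustrates the idea only and is not the Arrow-based implementation in this PR: `partial_distinct` and `final_distinct_count` are hypothetical names standing in for the roles of `DistinctCount` and `DistinctCountReduce`, and `Vec<u8>` stands in for one entry of the `LargeListArray<UInt8>` column.

```rust
use std::collections::BTreeSet;

/// Partial stage (role of `DistinctCount`): reduce one partition's values
/// for a single group to a list of distinct values. No count is taken yet.
pub fn partial_distinct(values: &[u8]) -> Vec<u8> {
    let set: BTreeSet<u8> = values.iter().copied().collect();
    set.into_iter().collect()
}

/// Final stage (role of `DistinctCountReduce`): merge the per-partition
/// lists into one set of distinct values, then count its elements.
pub fn final_distinct_count(partials: &[Vec<u8>]) -> u64 {
    let merged: BTreeSet<u8> = partials.iter().flatten().copied().collect();
    merged.len() as u64
}
```

A value that appears in several partitions survives in each partial list but is counted once after the merge, which is why the partial stage has to ship the values themselves and not a count.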
(b) Single Partition:
    HashAggregateExec:
      mode: NoPartial
      group_expr:
        Column(c1)
      aggr_expr:
        DistinctCountReduce(Column(c2))
      schema:
        c1: any
        c2: UInt64
      input:
        CsvExec:
          schema:
            c1: any
            c2: UInt8
Unlike the multiple partition scenario, `DistinctCount` is _not_ used here,
and there are no partial sets of distinct values. Instead, in a single
`HashAggregateExec` stage, each `UInt8` is accumulated into a set of distinct
values, and the counts are computed at the end of the stage.
`DistinctCountReduce` is still used, but here it accumulates scalars via
`accumulate_scalar()` rather than lists via `accumulate_batch()`.
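As a rough sketch of the single-stage path, the accumulator below takes raw values one at a time and only produces a count at the end. `DistinctCountReduceSketch` and `evaluate` are hypothetical names for illustration; only the method name `accumulate_scalar` comes from the description above, and its real signature may differ.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for `DistinctCountReduce` in single-stage mode.
#[derive(Debug, Default)]
pub struct DistinctCountReduceSketch {
    seen: HashSet<u8>,
}

impl DistinctCountReduceSketch {
    /// Add one raw column value to the set of distinct values.
    pub fn accumulate_scalar(&mut self, value: u8) {
        self.seen.insert(value);
    }

    /// Compute the count once all values have been accumulated.
    pub fn evaluate(&self) -> u64 {
        self.seen.len() as u64
    }
}
```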
There is a new aggregation mode: `NoPartial`. In summary, the modes are:
- `NoPartial`: used in single-stage aggregations
- `Partial`: used as the first stage of two-stage aggregations
- `Final`: used as the second stage of two-stage aggregations
Prior to the new `NoPartial` mode, `Partial` covered what are now the
separate responsibilities of `Partial` and `NoPartial`. That was sufficient
for _non-distinct_ aggregations (such as count, sum, min, max, and avg),
because their first aggregation stage is always the same, regardless of
whether the aggregation is one-stage or two-stage. This is not the case for a
_distinct_ count aggregation, as the physical plans above show: the first
stage produces lists of distinct values in the two-stage plan, but final
counts in the one-stage plan.
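The relationship between the modes and the count-distinct expressions described above can be summarized in a small sketch. The enum variants match the mode names in this PR, but `stages` and `distinct_count_step` are hypothetical helpers, and the rule "one partition means single-stage" is read off the two example plans, not taken from the planner code.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggregateMode {
    NoPartial,
    Partial,
    Final,
}

/// Aggregation stages for a given number of input partitions (assumed rule:
/// a single partition needs one stage, more partitions need two).
pub fn stages(partitions: usize) -> Vec<AggregateMode> {
    if partitions <= 1 {
        vec![AggregateMode::NoPartial]
    } else {
        vec![AggregateMode::Partial, AggregateMode::Final]
    }
}

/// For each mode: which count-distinct expression runs, and what it consumes.
pub fn distinct_count_step(mode: AggregateMode) -> (&'static str, &'static str) {
    match mode {
        AggregateMode::NoPartial => ("DistinctCountReduce", "scalar values"),
        AggregateMode::Partial => ("DistinctCount", "scalar values"),
        AggregateMode::Final => ("DistinctCountReduce", "lists of distinct values"),
    }
}
```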