Good point about rejecting DISTINCT operations for now, since they are not
handled yet. There could be more similar cases that we need to revisit and
document well.
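
As a rough illustration of rejecting these queries instead of silently running them as ALL, here is a minimal Python sketch. AggCall, UnsupportedDistinctError and validate_agg_calls are hypothetical names made up for this example; they are not BeamSQL or Calcite APIs.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class AggCall:
    """Hypothetical stand-in for one aggregation call in a logical plan."""
    function: str          # e.g. "SUM", "COUNT"
    column: str
    distinct: bool = False


class UnsupportedDistinctError(ValueError):
    """Raised instead of silently evaluating a DISTINCT aggregation as ALL."""


def validate_agg_calls(calls: List[AggCall]) -> None:
    # Fail fast with a clear message on the first DISTINCT aggregation found.
    for call in calls:
        if call.distinct:
            raise UnsupportedDistinctError(
                f"{call.function}(DISTINCT {call.column}) is not supported yet"
            )
```

A plan containing only non-DISTINCT calls passes through unchanged, while any DISTINCT call stops the conversion before a pipeline is built.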

Regarding how to support DISTINCT, I was confused by "stateful CombineFn"
at first. To keep it simple, we can reject these queries first and then
extend support step by step:
1. A customized CombineFn looks good and could even support the
DISTINCT + non-DISTINCT case. I'm just not sure about the performance impact,
and it looks odd when users write a UDAF;
2. For queries with only one DISTINCT, it's not hard to handle with a
Calcite rule or during the compile step, using two GBKs. To handle the
DISTINCT + non-DISTINCT case this way, maybe we can try
JOIN(GBK_for_non_DISTINCT, 2_GBK_for_DISTINCT); this needs more tests to
confirm.


On Fri, May 3, 2019 at 2:38 PM Rui Wang <> wrote:

> A compromise solution would be to use SELECT DISTINCT or GROUP BY to
> deduplicate before applying aggregations. It takes two shuffles and works on
> non-floating-point columns. The good thing is that no code change is needed,
> but the downsides are that users need to write more complicated queries and
> floating-point data is not supported.
> -Rui
> On Fri, May 3, 2019 at 1:23 PM Rui Wang <> wrote:
>> Fair point. BeamSQL lacks proper benchmarks to test the performance
>> and scalability of implementations.
>> -Rui
>> On Fri, May 3, 2019 at 12:56 PM Reuven Lax <> wrote:
>>> Back to the original point: I'm very skeptical of adding something that
>>> does not scale at all. In our experience, users get far more upset with an
>>> advertised feature that doesn't work for them (e.g. their workers OOM) than
>>> with a missing feature.
>>> Reuven
>>> On Fri, May 3, 2019 at 12:41 PM Kenneth Knowles <> wrote:
>>>> All good points. My version of the two shuffle approach does not work
>>>> at all.
>>>> On Fri, May 3, 2019 at 11:38 AM Brian Hulette <>
>>>> wrote:
>>>>> Rui's point about FLOAT/DOUBLE columns is interesting as well. We
>>>>> couldn't support distinct aggregations on floating point columns with the
>>>>> two-shuffle approach, but we could with the CombineFn approach. I'm not
>>>>> sure if that's a good thing or not. It seems like an anti-pattern to do a
>>>>> distinct aggregation on floating point numbers, but I suppose the spec
>>>>> allows it.
>>>> I can't find the Jira, but grouping on doubles has been discussed at
>>>> some length before. Many DBMSs do not provide this, so it is not generally
>>>> expected by SQL users. That is good, because mathematically it is
>>>> questionable - floating point is usually used as a stand-in for real
>>>> numbers, where computing equality is not generally possible. So any code
>>>> that actually depends on equality of floating points is likely susceptible
>>>> to rounding errors, other quirks of floating point, and also is probably
>>>> misguided because the underlying thing that floats are approximating
>>>> already cannot be checked for equality.
>>>> Kenn
>>>>> Brian
>>>>> On Fri, May 3, 2019 at 10:52 AM Rui Wang <> wrote:
>>>>>> To clarify what I meant by "So two shuffle approach will lead to two
>>>>>> different implementation for tables with and without FLOAT/DOUBLE
>>>>>> column.":
>>>>>> Basically I wanted to say that the two-shuffle approach will be an
>>>>>> implementation for some cases, and it will co-exist with the CombineFn
>>>>>> approach. In the future, when we start cost-based optimization (CBO) in
>>>>>> BeamSQL, the CBO is supposed to compare the different plans.
>>>>>> -Rui
>>>>>> On Fri, May 3, 2019 at 10:40 AM Rui Wang <> wrote:
>>>>>>>> As to the distinct aggregations: At the least, these queries should
>>>>>>>> be rejected, not evaluated incorrectly.
>>>>>>> Yes. The least we can do is not support it and throw a clear message
>>>>>>> saying so. (The current implementation ignores DISTINCT and executes all
>>>>>>> aggregations as ALL.)
>>>>>>>> The term "stateful CombineFn" is not one I would use, as the nature
>>>>>>>> of state is linearity and the nature of CombineFn is parallelism. So I
>>>>>>>> don't totally understand this proposal. If I replace stateful CombineFn
>>>>>>>> with stateful DoFn with one combining state per column, then I think I
>>>>>>>> understand. FWIW, on a runner with scalable SetState or MapState there
>>>>>>>> will not be any risk at all.
>>>>>>> I see. "Stateful" is indeed misleading. In this thread, it was all
>>>>>>> about using a simple CombineFn to achieve DISTINCT aggregation with
>>>>>>> massive parallelism.
>>>>>>>> But if you go the two shuffle route, you don't have to separate the
>>>>>>>> aggregations and re-join them. You just have to incur the cost of the
>>>>>>>> GBK + DISTINCT for all columns, and just drop the secondary key for the
>>>>>>>> second shuffle, no?
>>>>>>> Two shuffle approach cannot be the unified approach because it
>>>>>>> requires building a key of group_by_key + table_row to deduplicate, but
>>>>>>> table_row might contain floating point numbers, which cannot be used as
>>>>>>> a key in GBK. So two shuffle approach will lead to two different
>>>>>>> implementation for tables with and without FLOAT/DOUBLE column.
>>>>>>> CombineFn is the unified approach for distinct and non distinct
>>>>>>> aggregation: each aggregation call will be a CombineFn.
>>>>>>> -Rui
>>>>>>>> Kenn
>>>>>>>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <>
>>>>>>>> wrote:
>>>>>>>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <> wrote:
>>>>>>>>>> Brian's first proposal is also challenging partly because BeamSQL
>>>>>>>>>> has no good practice for dealing with complex SQL plans. Ideally
>>>>>>>>>> we need enough rules and SQL plan nodes in Beam to construct
>>>>>>>>>> easy-to-transform plans for different cases. I had a similar
>>>>>>>>>> situation before when I needed to separate the logical plans of
>>>>>>>>>> "JOIN ON a OR b" and "JOIN ON a AND b", because their
>>>>>>>>>> implementations are too different to fit into the same JoinRelNode.
>>>>>>>>>> This looks like a similar situation: when distinct and non-distinct
>>>>>>>>>> aggregations are mixed, it is hard for one single AggregationRelNode
>>>>>>>>>> to encapsulate the complex logic.
>>>>>>>>>> We will need a detailed plan to re-think RelNodes and Rules
>>>>>>>>>> in BeamSQL, which is out of scope for supporting DISTINCT.
>>>>>>>>>> I would favor the second proposal because BeamSQL uses Beam schema
>>>>>>>>>> and row. Schema itself uses Java primitives for most of its types
>>>>>>>>>> (int, long, float, etc.), which limits the size of each element.
>>>>>>>>>> Considering that the combine is per key and per window, there is a
>>>>>>>>>> good chance that stateful combine works for some (if not most) cases.
>>>>>>>>>> Could we use @Experimental to tag stateful combine for supporting
>>>>>>>>>> DISTINCT in aggregation, so that users have a chance to test it?
>>>>>>>>>> -Rui
>>>>>>>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <>
>>>>>>>>>> wrote:
>>>>>>>>>>> Ahmet -
>>>>>>>>>>> I think it would only require observing each key's partition of
>>>>>>>>>>> the input independently, and the size of the state would only be
>>>>>>>>>>> proportional to the number of distinct elements, not the entire
>>>>>>>>>>> input. Note the pipeline would be a GBK with a key based on the
>>>>>>>>>>> GROUP BY, followed by a Combine.GroupedValues with a (possibly very
>>>>>>>>>>> stateful) CombineFn.
>>>>>>>>> Got it. Distinct elements could be proportional to the entire
>>>>>>>>> input; however, if this is a reasonable requirement from a product
>>>>>>>>> perspective, that is fine. Rui's suggestion of using the experimental
>>>>>>>>> tag is also a good idea. I suppose that will give us the ability to
>>>>>>>>> change the implementation if it becomes necessary.
>>>>>>>>>>> Luke -
>>>>>>>>>>> Here's a little background on why I think (1) is harder (it may
>>>>>>>>>>> also just be that it looks daunting to me as someone who's not that
>>>>>>>>>>> familiar with the code).
>>>>>>>>>>> An aggregation node can have multiple aggregations. So, for
>>>>>>>>>>> example, the query `SELECT k, SUM(x), COUNT(DISTINCT y),
>>>>>>>>>>> AVG(DISTINCT z) FROM ...` would yield a logical plan that has a
>>>>>>>>>>> single aggregation node with three different aggregations. We then
>>>>>>>>>>> take that node and build up a CombineFn that is a composite of all
>>>>>>>>>>> of the aggregations we need to make a combining PTransform [1]. To
>>>>>>>>>>> implement (1) we would need to distinguish between all the DISTINCT
>>>>>>>>>>> and non-DISTINCT aggregations, and come up with a way to unify the
>>>>>>>>>>> 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT pipeline.
>>>>>>>>>>> That's certainly not unsolvable, but approach (2) is much
>>>>>>>>>>> simpler - it just requires implementing some variations on the
>>>>>>>>>>> CombineFns that already exist [2] and re-using the existing logic
>>>>>>>>>>> for converting an aggregation node to a combining PTransform.
>>>>>>>>>>> Hopefully that makes sense, let me know if I need to clarify
>>>>>>>>>>> further :)
>>>>>>>>>>> Brian
>>>>>>>>>>> [1]
>>>>>>>>>>> [2]
>>>>>>>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <>
>>>>>>>>>>> wrote:
>>>>>>>>>>>> Can you also go into more detail why you think 1) is more
>>>>>>>>>>>> challenging to implement?
>>>>>>>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> From my limited understanding, would the stateful CombineFn
>>>>>>>>>>>>> option not require observing the whole input before being able
>>>>>>>>>>>>> to combine, so that the risk of blowing up memory is actually
>>>>>>>>>>>>> very high except for trivial inputs?
>>>>>>>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations.
>>>>>>>>>>>>>> These are queries like:
>>>>>>>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>>>>>>> These are represented in Calcite's logical plan with a
>>>>>>>>>>>>>> distinct flag on aggregation calls, but we ignore the flag when
>>>>>>>>>>>>>> converting to a pipeline. I see two different ways that we could
>>>>>>>>>>>>>> support this:
>>>>>>>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on
>>>>>>>>>>>>>> the <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr,
>>>>>>>>>>>>>> followed by a second GBK on just <GROUP BY key> to perform the
>>>>>>>>>>>>>> aggregation.
>>>>>>>>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>>>>>>>>> combiners used for SUM, COUNT, etc [1] that maintains some state
>>>>>>>>>>>>>> that tracks previously seen elements and uses it to de-dupe new
>>>>>>>>>>>>>> elements.
>>>>>>>>>>>>>> Of course, something like (1) is much more scalable, but it
>>>>>>>>>>>>>> is also much more challenging to implement, while (2) is trivial
>>>>>>>>>>>>>> to implement but runs the risk of blowing up a worker's memory
>>>>>>>>>>>>>> usage.
>>>>>>>>>>>>>> Personally, I think it could be worthwhile to provide support
>>>>>>>>>>>>>> for DISTINCT quickly with approach (2), and implement (1) as an
>>>>>>>>>>>>>> optimization later. This combiner's state would be partitioned
>>>>>>>>>>>>>> by key and by window, so I think we would be pretty safe from
>>>>>>>>>>>>>> OOM'ing a worker except in some extreme cases (long windows, hot
>>>>>>>>>>>>>> keys, very large batch pipelines, ...).
>>>>>>>>>>>>>> But I understand this could be controversial, so I wanted to
>>>>>>>>>>>>>> open it up for discussion first: Would it be worthwhile to
>>>>>>>>>>>>>> provide support for DISTINCT aggregations quickly with approach
>>>>>>>>>>>>>> #2, or is it better to just reject these queries until we can
>>>>>>>>>>>>>> implement them safely with approach #1?
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Brian
>>>>>>>>>>>>>> [1]

