To close the loop on this: Rui just added a check that rejects distinct
aggregations for now[1]. I wrote up BEAM-7306[2] to track this feature
going forward.

[1] https://github.com/apache/beam/pull/8498
[2] https://issues.apache.org/jira/browse/BEAM-7306
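
For anyone skimming the thread, here is a minimal sketch of what "reject for now" amounts to. It is not the code in the pull request: `AggCall`, `validate_agg_calls`, and the error text are hypothetical names made up for illustration. The only point is that a query carrying the distinct flag fails loudly instead of being evaluated as a non-distinct aggregation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AggCall:
    """Hypothetical stand-in for one aggregation call in a logical plan."""
    function: str
    column: str
    distinct: bool = False


def validate_agg_calls(calls):
    """Raise instead of silently treating DISTINCT aggregations as ALL."""
    for call in calls:
        if call.distinct:
            raise NotImplementedError(
                f"{call.function}(DISTINCT {call.column}) is not supported yet"
            )
    return calls


# Why silently ignoring the flag is wrong: the two sums differ on duplicates.
values = [1, 1, 2, 3, 3]
sum_all = sum(values)            # result if the DISTINCT flag is ignored
sum_distinct = sum(set(values))  # result SUM(DISTINCT v) should produce
```

With duplicates in the input the two results differ, which is why an explicit error is safer than the old behavior.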

*From: *Mingmin Xu <mingm...@gmail.com>
*Date: *Mon, May 6, 2019 at 10:16 AM
*To: *dev@beam.apache.org

> Good point to reject DISTINCT operations for now, as they are not handled
> yet. There may be more similar cases that need to be revised and documented well.
>
> Regarding how to support DISTINCT, I was confused by "stateful CombineFn"
> at first. To keep it simple, we can extend support step by step, for
> example by rejecting DISTINCT+nonDISTINCT cases:
> 1. A customized CombineFn looks good and could even support the
> DISTINCT+nonDISTINCT case. I'm just not sure about the performance impact,
> and it looks odd when users write a UDAF.
> 2. For queries with only one DISTINCT, it's not hard to handle with a
> Calcite rule or during the compile step, using two GBKs. To handle the
> DISTINCT+nonDISTINCT case this way, maybe we can try
> JOIN(GBK_for_non_DISTINCT, 2_GBK_for_DISTINCT); this needs more tests to confirm.
>
> Mingmin
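
A rough sketch of the JOIN(GBK_for_non_DISTINCT, 2_GBK_for_DISTINCT) idea, using plain Python dictionaries in place of a pipeline. The `group_by_key` helper and the sample rows are assumptions made up for illustration; nothing here is Beam or Calcite code.

```python
from collections import defaultdict


def group_by_key(pairs):
    """Simulate a GBK: collect all values that share a key."""
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped)


# Rows of (k, x, y) for: SELECT k, SUM(x), COUNT(DISTINCT y) FROM t GROUP BY k
rows = [("a", 1, "p"), ("a", 2, "p"), ("a", 3, "q"), ("b", 4, "r")]

# Branch 1: one GBK for the non-DISTINCT aggregation.
sum_x = {k: sum(xs) for k, xs in group_by_key((k, x) for k, x, _ in rows).items()}

# Branch 2: two GBKs for the DISTINCT aggregation.
# The first GBK on (k, y) de-dupes; the second GBK on k alone aggregates.
deduped = group_by_key(((k, y), None) for k, _, y in rows).keys()
count_distinct_y = {k: len(ys) for k, ys in group_by_key(deduped).items()}

# Join the two branches on the GROUP BY key.
joined = {k: (sum_x[k], count_distinct_y[k]) for k in sum_x}
```

The sketch needs three grouping steps plus a join for one mixed query, which gives a feel for the extra plan complexity.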
>
> On Fri, May 3, 2019 at 2:38 PM Rui Wang <ruw...@google.com> wrote:
>
>> A compromise solution would be to use SELECT DISTINCT or GROUP BY to
>> deduplicate before applying aggregations. It takes two shuffles and works on
>> non-floating-point columns. The good thing is that no code change is needed;
>> the downsides are that users need to write more complicated queries and
>> floating point data is not supported.
>>
>>
>> -Rui
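
To make the workaround concrete, here is a small sketch of the query rewrite. It runs on SQLite only because that ships with Python; the table `t` and its data are made up, and this says nothing about how BeamSQL plans either query.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (k TEXT, v INTEGER)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?)",
    [("a", 1), ("a", 1), ("a", 2), ("b", 5), ("b", 5)],
)

# What the user would like to write.
direct = conn.execute(
    "SELECT k, SUM(DISTINCT v) FROM t GROUP BY k ORDER BY k"
).fetchall()

# The workaround: de-duplicate in a subquery, then aggregate without DISTINCT.
workaround = conn.execute(
    "SELECT k, SUM(v) FROM (SELECT DISTINCT k, v FROM t) GROUP BY k ORDER BY k"
).fetchall()
```

Both queries return the same rows here. The rewrite gets harder to write by hand once a query mixes DISTINCT and non-DISTINCT aggregations.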
>>
>> On Fri, May 3, 2019 at 1:23 PM Rui Wang <ruw...@google.com> wrote:
>>
>>> Fair point. BeamSQL lacks proper benchmarks to test the
>>> performance and scalability of implementations.
>>>
>>>
>>> -Rui
>>>
>>> On Fri, May 3, 2019 at 12:56 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Back to the original point: I'm very skeptical of adding something that
>>>> does not scale at all. In our experience, users get far more upset with an
>>>> advertised feature that doesn't work for them (e.g. their workers OOM) than
>>>> with a missing feature.
>>>>
>>>> Reuven
>>>>
>>>> On Fri, May 3, 2019 at 12:41 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> All good points. My version of the two shuffle approach does not work
>>>>> at all.
>>>>>
>>>>> On Fri, May 3, 2019 at 11:38 AM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Rui's point about FLOAT/DOUBLE columns is interesting as well. We
>>>>>> couldn't support distinct aggregations on floating point columns with the
>>>>>> two-shuffle approach, but we could with the CombineFn approach. I'm not
>>>>>> sure if that's a good thing or not, it seems like an anti-pattern to do a
>>>>>> distinct aggregation on floating point numbers but I suppose the spec
>>>>>> allows it.
>>>>>>
>>>>>
>>>>> I can't find the Jira, but grouping on doubles has been discussed at
>>>>> some length before. Many DBMSs do not provide this, so it is not generally
>>>>> expected by SQL users. That is good, because mathematically it is
>>>>> questionable - floating point is usually used as a stand-in for real
>>>>> numbers, where computing equality is not generally possible. So any code
>>>>> that actually depends on equality of floating points is likely susceptible
>>>>> to rounding errors, other quirks of floating point, and also is probably
>>>>> misguided because the underlying thing that floats are approximating
>>>>> already cannot be checked for equality.
>>>>>
>>>>> Kenn
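
A short illustration of the floating point concern, as a sketch in plain Python rather than anything BeamSQL-specific. Two values that a user would consider equal end up as separate groups under exact equality.

```python
import math

a = 0.1 + 0.2
b = 0.3

same_bits = (a == b)               # exact comparison of the two doubles
close_enough = math.isclose(a, b)  # comparison under a relative tolerance

# A DISTINCT or GROUP BY on these values uses exact equality,
# so it sees two groups where the user probably expects one.
distinct_values = {a, b}
```

Here `same_bits` is False while `close_enough` is True, and the set holds two elements.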
>>>>>
>>>>>
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>>
>>>>>> On Fri, May 3, 2019 at 10:52 AM Rui Wang <ruw...@google.com> wrote:
>>>>>>
>>>>>>> To clarify what I said, "So two shuffle approach will lead to two
>>>>>>> different implementation for tables with and without FLOAT/DOUBLE
>>>>>>> column.":
>>>>>>>
>>>>>>> Basically I wanted to say that the two-shuffle approach will be an
>>>>>>> implementation for some cases, and it will co-exist with the CombineFn
>>>>>>> approach. In the future, when we start cost-based optimization in
>>>>>>> BeamSQL, the CBO is supposed to compare different plans.
>>>>>>>
>>>>>>> -Rui
>>>>>>>
>>>>>>> On Fri, May 3, 2019 at 10:40 AM Rui Wang <ruw...@google.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>> As to the distinct aggregations: At the least, these queries
>>>>>>>>> should be rejected, not evaluated incorrectly.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Yes. The least we can do is not support it and throw a clear message
>>>>>>>> saying so. (The current implementation ignores DISTINCT and executes all
>>>>>>>> aggregations as ALL.)
>>>>>>>>
>>>>>>>>
>>>>>>>>> The term "stateful CombineFn" is not one I would use, as the
>>>>>>>>> nature of state is linearity and the nature of CombineFn is
>>>>>>>>> parallelism. So I don't totally understand this proposal. If I replace
>>>>>>>>> stateful CombineFn with stateful DoFn with one combining state per
>>>>>>>>> column, then I think I understand. FWIW on a runner with scalable
>>>>>>>>> SetState or MapState it will not be any risk at all.
>>>>>>>>>
>>>>>>>> I see. "Stateful" is indeed misleading. In this thread, it was all
>>>>>>>> about using a simple CombineFn to achieve DISTINCT aggregation with
>>>>>>>> massive parallelism.
>>>>>>>>
>>>>>>>>> But if you go the two shuffle route, you don't have to separate the
>>>>>>>>> aggregations and re-join them. You just have to incur the cost of the
>>>>>>>>> GBK + DISTINCT for all columns, and just drop the secondary key for the
>>>>>>>>> second shuffle, no?
>>>>>>>>>
>>>>>>>> The two-shuffle approach cannot be the unified approach because it
>>>>>>>> requires building a key of group_by_key + table_row to deduplicate, but
>>>>>>>> table_row might contain floating point numbers, which cannot be used
>>>>>>>> as a key in GBK. So two shuffle approach will lead to two different
>>>>>>>> implementation for tables with and without FLOAT/DOUBLE column.
>>>>>>>>
>>>>>>>> CombineFn is the unified approach for distinct and non distinct
>>>>>>>> aggregation: each aggregation call will be a CombineFn.
>>>>>>>>
>>>>>>>>
>>>>>>>> -Rui
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Brian's first proposal is challenging, partly because in
>>>>>>>>>>> BeamSQL there is no good practice for dealing with complex SQL plans.
>>>>>>>>>>> Ideally we need enough rules and SQL plan nodes in Beam to construct
>>>>>>>>>>> easy-to-transform plans for different cases. I had a similar
>>>>>>>>>>> situation before when I needed to separate the logical plans of
>>>>>>>>>>> "JOIN ON a OR b" and "JOIN ON a AND b", because their implementations
>>>>>>>>>>> are too different to fit into the same JoinRelNode. This seems to be a
>>>>>>>>>>> similar situation: when both distinct and non-distinct aggregations
>>>>>>>>>>> are mixed, it is hard for one single AggregationRelNode to encapsulate
>>>>>>>>>>> the complex logic.
>>>>>>>>>>>
>>>>>>>>>>> We will need a detailed plan to re-think about RelNodes and
>>>>>>>>>>> Rules in BeamSQL, which is out of scope for supporting DISTINCT.
>>>>>>>>>>>
>>>>>>>>>>> I would favor the second proposal because BeamSQL uses Beam
>>>>>>>>>>> schema and row. Schema itself uses Java primitives for most of its
>>>>>>>>>>> types (int, long, float, etc.), which limits the size of each element.
>>>>>>>>>>> Considering per-key and per-window combine, there is a good chance
>>>>>>>>>>> that stateful combine works for some (if not most) cases.
>>>>>>>>>>>
>>>>>>>>>>> Could we use @Experimental to tag stateful combine for
>>>>>>>>>>> supporting DISTINCT in aggregation so that users have a chance
>>>>>>>>>>> to test it?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Rui
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <
>>>>>>>>>>> bhule...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ahmet -
>>>>>>>>>>>> I think it would only require observing each key's partition of
>>>>>>>>>>>> the input independently, and the size of the state would only be
>>>>>>>>>>>> proportional to the number of distinct elements, not the entire
>>>>>>>>>>>> input. Note the pipeline would be a GBK with a key based on the
>>>>>>>>>>>> GROUP BY, followed by a Combine.GroupedValues with a (possibly very
>>>>>>>>>>>> stateful) CombineFn.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> Got it. Distinct elements could be proportional to the entire
>>>>>>>>>> input; however, if this is a reasonable requirement from a product
>>>>>>>>>> perspective, that is fine. Rui's suggestion of using the experimental
>>>>>>>>>> tag is also a good idea. I suppose that will give us the ability to
>>>>>>>>>> change the implementation if it becomes necessary.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Luke -
>>>>>>>>>>>> Here's a little background on why I think (1) is harder (it may
>>>>>>>>>>>> also just be that it looks daunting to me as someone who's not that
>>>>>>>>>>>> familiar with the code).
>>>>>>>>>>>>
>>>>>>>>>>>> An aggregation node can have multiple aggregations. So, for
>>>>>>>>>>>> example, the query `SELECT k, SUM(x), COUNT(DISTINCT y),
>>>>>>>>>>>> AVG(DISTINCT z) FROM ...` would yield a logical plan that has a single
>>>>>>>>>>>> aggregation node with three different aggregations. We then take that
>>>>>>>>>>>> node and build up a CombineFn that is a composite of all of the
>>>>>>>>>>>> aggregations we need to make a combining PTransform [1]. To implement
>>>>>>>>>>>> (1) we would need to distinguish between all the DISTINCT and
>>>>>>>>>>>> non-DISTINCT aggregations, and come up with a way to unify the 2-GBK
>>>>>>>>>>>> DISTINCT pipeline and the 1-GBK non-DISTINCT pipeline.
>>>>>>>>>>>>
>>>>>>>>>>>> That's certainly not unsolvable, but approach (2) is much
>>>>>>>>>>>> simpler - it just requires implementing some variations on the
>>>>>>>>>>>> CombineFns that already exist [2] and re-using the existing logic for
>>>>>>>>>>>> converting an aggregation node to a combining PTransform.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Hopefully that makes sense, let me know if I need to clarify
>>>>>>>>>>>> further :)
>>>>>>>>>>>> Brian
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
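
A minimal sketch of what a "variation on an existing CombineFn" could look like for approach (2). The classes below only mimic the four-method CombineFn shape (create accumulator, add input, merge accumulators, extract output); `SumFn`, `DistinctFn`, and `combine_in_bundles` are hypothetical names and not the classes in BeamBuiltinAggregations.

```python
class SumFn:
    """Plain sum, shaped like a CombineFn (hypothetical, not Beam's class)."""

    def create_accumulator(self):
        return 0

    def add_input(self, acc, value):
        return acc + value

    def merge_accumulators(self, accs):
        return sum(accs)

    def extract_output(self, acc):
        return acc


class DistinctFn:
    """Wraps another combiner; the accumulator is the set of values seen."""

    def __init__(self, inner):
        self.inner = inner

    def create_accumulator(self):
        return set()

    def add_input(self, acc, value):
        acc.add(value)
        return acc

    def merge_accumulators(self, accs):
        return set().union(*accs)

    def extract_output(self, acc):
        # Only here, once duplicates are gone, is the inner combiner applied.
        inner_acc = self.inner.create_accumulator()
        for value in acc:
            inner_acc = self.inner.add_input(inner_acc, value)
        return self.inner.extract_output(inner_acc)


def combine_in_bundles(fn, bundles):
    """Simulate parallel partial combining followed by a merge."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for value in bundle:
            acc = fn.add_input(acc, value)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


bundles = [[1, 2, 2], [2, 3], [3, 3]]
sum_all = combine_in_bundles(SumFn(), bundles)
sum_distinct = combine_in_bundles(DistinctFn(SumFn()), bundles)
```

The accumulator in `DistinctFn` holds every distinct value for a key and window, so its size grows with the number of distinct elements. That is the memory risk discussed in this thread.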
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Can you also go into more detail why you think 1) is more
>>>>>>>>>>>>> challenging to implement?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> From my limited understanding, would not the stateful
>>>>>>>>>>>>>> CombineFn option require observing the whole input before being
>>>>>>>>>>>>>> able to combine, so that the risk of blowing up memory is actually
>>>>>>>>>>>>>> very high except for trivial inputs?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <
>>>>>>>>>>>>>> bhule...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations.
>>>>>>>>>>>>>>> These are queries like:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> These are represented in Calcite's logical plan with a
>>>>>>>>>>>>>>> distinct flag on aggregation calls, but we ignore the flag when
>>>>>>>>>>>>>>> converting to a pipeline. I see two different ways that we could
>>>>>>>>>>>>>>> support this:
>>>>>>>>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on
>>>>>>>>>>>>>>> the <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr,
>>>>>>>>>>>>>>> followed by a second GBK on just <GROUP BY key> to perform the
>>>>>>>>>>>>>>> aggregation.
>>>>>>>>>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>>>>>>>>>> combiners used for SUM, COUNT, etc. [1] that maintains some state
>>>>>>>>>>>>>>> that tracks previously seen elements and uses it to de-dupe new
>>>>>>>>>>>>>>> elements.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Of course, something like (1) is much more scalable, but it
>>>>>>>>>>>>>>> is also much more challenging to implement, while (2) is trivial to
>>>>>>>>>>>>>>> implement but runs the risk of blowing up a worker's memory usage.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Personally, I think it could be worthwhile to provide
>>>>>>>>>>>>>>> support for DISTINCT quickly with approach (2), and implement (1)
>>>>>>>>>>>>>>> as an optimization later. This combiner's state would be
>>>>>>>>>>>>>>> partitioned by key and by window, so I think we would be pretty
>>>>>>>>>>>>>>> safe from OOM'ing a worker except in some extreme cases (long
>>>>>>>>>>>>>>> windows, hot keys, very large batch pipelines, ...).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> But I understand this could be controversial, so I wanted to
>>>>>>>>>>>>>>> open it up for discussion first: Would it be worthwhile to
>>>>>>>>>>>>>>> provide support for DISTINCT aggregations quickly with approach #2,
>>>>>>>>>>>>>>> or is it better to just reject these queries until we can implement
>>>>>>>>>>>>>>> them safely with approach #1?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Brian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>
> --
> ----
> Mingmin
>
