A compromise solution would be to use SELECT DISTINCT or GROUP BY to
deduplicate before applying aggregations. It takes two shuffles and works
only on non-floating-point columns. The good thing is that no code change is
needed; the downsides are that users need to write more complicated queries
and that floating point data is not supported.
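
A minimal sketch of that workaround, using SQLite from Python's standard
library as a stand-in engine (it is not BeamSQL, and the sample rows are made
up for illustration). The table t and columns k, v follow the example query
from the start of this thread. It only shows that deduplicating in a subquery
and then aggregating gives the same answer as SUM(DISTINCT v):

```python
import sqlite3

# In-memory database standing in for a SQL engine; NOT BeamSQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (k TEXT, v INTEGER)")
conn.executemany(
    "INSERT INTO t VALUES (?, ?)",
    [("a", 1), ("a", 1), ("a", 2), ("b", 5), ("b", 5)],  # made-up sample rows
)

# The query the user would like to write.
direct = conn.execute(
    "SELECT k, SUM(DISTINCT v) FROM t GROUP BY k ORDER BY k"
).fetchall()

# The workaround: deduplicate (k, v) pairs first, then aggregate normally.
rewritten = conn.execute(
    """
    SELECT k, SUM(v)
    FROM (SELECT DISTINCT k, v FROM t)
    GROUP BY k
    ORDER BY k
    """
).fetchall()

print(direct, rewritten)
```

With several DISTINCT aggregations over different columns, each one would
need its own deduplicating subquery, which is where the query gets more
complicated for the user.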


-Rui

On Fri, May 3, 2019 at 1:23 PM Rui Wang <ruw...@google.com> wrote:

> Fair point. BeamSQL lacks proper benchmarks to test the performance
> and scalability of implementations.
>
>
> -Rui
>
> On Fri, May 3, 2019 at 12:56 PM Reuven Lax <re...@google.com> wrote:
>
>> Back to the original point: I'm very skeptical of adding something that
>> does not scale at all. In our experience, users get far more upset with an
>> advertised feature that doesn't work for them (e.g. their workers OOM) than
>> with a missing feature.
>>
>> Reuven
>>
>> On Fri, May 3, 2019 at 12:41 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> All good points. My version of the two shuffle approach does not work at
>>> all.
>>>
>>> On Fri, May 3, 2019 at 11:38 AM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Rui's point about FLOAT/DOUBLE columns is interesting as well. We
>>>> couldn't support distinct aggregations on floating point columns with the
>>>> two-shuffle approach, but we could with the CombineFn approach. I'm not
>>>> sure if that's a good thing or not; it seems like an anti-pattern to do a
>>>> distinct aggregation on floating point numbers, but I suppose the spec
>>>> allows it.
>>>>
>>>
>>> I can't find the Jira, but grouping on doubles has been discussed at
>>> some length before. Many DBMSs do not provide this, so it is not generally
>>> expected by SQL users. That is good, because mathematically it is
>>> questionable - floating point is usually used as a stand-in for real
>>> numbers, where computing equality is not generally possible. So any code
>>> that actually depends on equality of floating points is likely susceptible
>>> to rounding errors, other quirks of floating point, and also is probably
>>> misguided because the underlying thing that floats are approximating
>>> already cannot be checked for equality.
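
A small illustration of the rounding issue described above, in plain Python
(nothing here is Beam-specific, and the values are chosen only for
illustration). Two values that are mathematically equal end up as two
different grouping keys:

```python
import math

a = 0.1 + 0.2   # mathematically 0.3, but carries a rounding error
b = 0.3

# Exact equality is what a grouping key or a DISTINCT would rely on.
same_key = (a == b)

# A set deduplicates by exact equality, so both values survive.
distinct_values = {a, b}

# A tolerance-based comparison treats them as equal, but a tolerance
# cannot be used directly as a grouping key.
close = math.isclose(a, b)

print(same_key, len(distinct_values), close)
```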
>>>
>>> Kenn
>>>
>>>
>>>>
>>>> Brian
>>>>
>>>>
>>>> On Fri, May 3, 2019 at 10:52 AM Rui Wang <ruw...@google.com> wrote:
>>>>
>>>>> To clarify what I said, "So the two-shuffle approach will lead to two
>>>>> different implementations for tables with and without FLOAT/DOUBLE
>>>>> columns.":
>>>>>
>>>>> Basically I wanted to say that the two-shuffle approach will be an
>>>>> implementation for some cases, and it will co-exist with the CombineFn
>>>>> approach. In the future, when we start cost-based optimization in
>>>>> BeamSQL, the CBO is supposed to compare different plans.
>>>>>
>>>>> -Rui
>>>>>
>>>>> On Fri, May 3, 2019 at 10:40 AM Rui Wang <ruw...@google.com> wrote:
>>>>>
>>>>>>
>>>>>>> As to the distinct aggregations: At the least, these queries should
>>>>>>> be rejected, not evaluated incorrectly.
>>>>>>>
>>>>>>
>>>>>> Yes. The least we can do is not support it and throw a clear error
>>>>>> message saying so. (The current implementation ignores DISTINCT and
>>>>>> executes all aggregations as ALL.)
>>>>>>
>>>>>>
>>>>>>> The term "stateful CombineFn" is not one I would use, as the nature
>>>>>>> of state is linearity and the nature of CombineFn is parallelism. So I
>>>>>>> don't totally understand this proposal. If I replace stateful CombineFn
>>>>>>> with stateful DoFn with one combining state per column, then I think I
>>>>>>> understand. FWIW on a runner with scalable SetState or MapState it will
>>>>>>> not be any risk at all.
>>>>>>>
>>>>>> I see. "Stateful" is indeed misleading. In this thread, it was all
>>>>>> about using a simple CombineFn to achieve DISTINCT aggregation with
>>>>>> massive parallelism.
>>>>>>
>>>>>>> But if you go the two shuffle route, you don't have to separate the
>>>>>>> aggregations and re-join them. You just have to incur the cost of the
>>>>>>> GBK + DISTINCT for all columns, and just drop the secondary key for the
>>>>>>> second shuffle, no?
>>>>>>>
>>>>>> The two-shuffle approach cannot be the unified approach because it
>>>>>> requires building a key of group_by_key + table_row to deduplicate, but
>>>>>> table_row might contain floating point numbers, which cannot be used as a
>>>>>> key in GBK. So the two-shuffle approach will lead to two different
>>>>>> implementations for tables with and without FLOAT/DOUBLE columns.
>>>>>>
>>>>>> CombineFn is the unified approach for distinct and non distinct
>>>>>> aggregation: each aggregation call will be a CombineFn.
>>>>>>
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>>
>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Brian's first proposal is challenging, partly because BeamSQL has
>>>>>>>>> no good practice for dealing with complex SQL plans. Ideally
>>>>>>>>> we need enough rules and SQL plan nodes in Beam to construct
>>>>>>>>> easy-to-transform plans for different cases. I had a similar situation
>>>>>>>>> before when I needed to separate the logical plans of "JOIN ON a OR b"
>>>>>>>>> and "JOIN ON a AND b", because their implementations are too
>>>>>>>>> different to fit into the same JoinRelNode. The situation seems similar
>>>>>>>>> here: when distinct and non-distinct aggregations are mixed, it is
>>>>>>>>> hard for one single AggregationRelNode to encapsulate the complex
>>>>>>>>> logic.
>>>>>>>>>
>>>>>>>>> We will need a detailed plan to rethink RelNodes and Rules
>>>>>>>>> in BeamSQL, which is out of scope for supporting DISTINCT.
>>>>>>>>>
>>>>>>>>> I would favor the second proposal because BeamSQL uses Beam schema
>>>>>>>>> and row. Schema itself uses Java primitives for most of its types
>>>>>>>>> (int, long, float, etc.), which limits the size of each element.
>>>>>>>>> Considering that the combine is per key and per window, there is a
>>>>>>>>> good chance that stateful combine works for some (if not most) cases.
>>>>>>>>>
>>>>>>>>> Could we use @Experimental to tag stateful combine for supporting
>>>>>>>>> DISTINCT in aggregation so that we have a chance to have users test
>>>>>>>>> it?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Rui
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Ahmet -
>>>>>>>>>> I think it would only require observing each key's partition of
>>>>>>>>>> the input independently, and the size of the state would only be
>>>>>>>>>> proportional to the number of distinct elements, not the entire
>>>>>>>>>> input. Note the pipeline would be a GBK with a key based on the
>>>>>>>>>> GROUP BY, followed by a Combine.GroupedValues with a (possibly very
>>>>>>>>>> stateful) CombineFn.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>> Got it. Distinct elements could be proportional to the entire
>>>>>>>> input; however, if this is a reasonable requirement from a product
>>>>>>>> perspective, that is fine. Rui's suggestion of using an experimental
>>>>>>>> tag is also a good idea. I suppose that will give us the ability to
>>>>>>>> change the implementation if it becomes necessary.
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Luke -
>>>>>>>>>> Here's a little background on why I think (1) is harder (it may
>>>>>>>>>> also just be that it looks daunting to me as someone who's not that
>>>>>>>>>> familiar with the code).
>>>>>>>>>>
>>>>>>>>>> An aggregation node can have multiple aggregations. So, for
>>>>>>>>>> example, the query `SELECT k, SUM(x), COUNT(DISTINCT y),
>>>>>>>>>> AVG(DISTINCT z) FROM ...` would yield a logical plan that has a single
>>>>>>>>>> aggregation node with three different aggregations. We then take that
>>>>>>>>>> node and build up a CombineFn that is a composite of all of the
>>>>>>>>>> aggregations we need to make a combining PTransform [1]. To implement
>>>>>>>>>> (1) we would need to distinguish between all the DISTINCT and
>>>>>>>>>> non-DISTINCT aggregations, and come up with a way to unify the 2-GBK
>>>>>>>>>> DISTINCT pipeline and the 1-GBK non-DISTINCT pipeline.
>>>>>>>>>>
>>>>>>>>>> That's certainly not unsolvable, but approach (2) is much simpler
>>>>>>>>>> - it just requires implementing some variations on the CombineFns
>>>>>>>>>> that already exist [2] and re-using the existing logic for converting
>>>>>>>>>> an aggregation node to a combining PTransform.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Hopefully that makes sense, let me know if I need to clarify
>>>>>>>>>> further :)
>>>>>>>>>> Brian
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>>>>>>>> [2]
>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Can you also go into more detail why you think 1) is more
>>>>>>>>>>> challenging to implement?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> From my limited understanding, would not the stateful CombineFn
>>>>>>>>>>>> option require observing the whole input before being able to
>>>>>>>>>>>> combine, and is the risk of blowing memory not actually very high
>>>>>>>>>>>> except for trivial inputs?
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <
>>>>>>>>>>>> bhule...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations.
>>>>>>>>>>>>> These are queries like:
>>>>>>>>>>>>>
>>>>>>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>>>>>>
>>>>>>>>>>>>> These are represented in Calcite's logical plan with a
>>>>>>>>>>>>> distinct flag on aggregation calls, but we ignore the flag when
>>>>>>>>>>>>> converting to a pipeline. I see two different ways that we could
>>>>>>>>>>>>> support this:
>>>>>>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on
>>>>>>>>>>>>> the <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr,
>>>>>>>>>>>>> followed by a second GBK on just <GROUP BY key> to perform the
>>>>>>>>>>>>> aggregation.
>>>>>>>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>>>>>>>> combiners used for SUM, COUNT, etc. [1] that maintains some state
>>>>>>>>>>>>> that tracks previously seen elements and uses it to de-dupe new
>>>>>>>>>>>>> elements.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Of course, something like (1) is much more scalable, but it is
>>>>>>>>>>>>> also much more challenging to implement, while (2) is trivial to
>>>>>>>>>>>>> implement but runs the risk of blowing up a worker's memory usage.
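
A rough sketch of the two options for SUM(DISTINCT v) ... GROUP BY k, written
as plain Python over an in-memory list rather than with any Beam API. The
function names and sample rows are hypothetical; the point is only where the
deduplication happens in each approach:

```python
from collections import defaultdict

# Made-up (k, v) rows for illustration.
rows = [("a", 1), ("a", 1), ("a", 2), ("b", 5), ("b", 5)]

def sum_distinct_two_groupings(rows):
    """Approach (1): group on (k, v) to de-dupe, then group on k to sum."""
    # First grouping: duplicate (k, v) pairs collapse into one element.
    deduped = set(rows)
    # Second grouping: key on k only and aggregate the surviving values.
    totals = defaultdict(int)
    for k, v in deduped:
        totals[k] += v
    return dict(totals)

def sum_distinct_with_seen_set(rows):
    """Approach (2): one grouping on k; the accumulator remembers seen values."""
    # The per-key set is the "state"; it grows with the number of distinct
    # values for that key, which is the memory risk discussed in this thread.
    seen = defaultdict(set)
    for k, v in rows:
        seen[k].add(v)
    return {k: sum(values) for k, values in seen.items()}

print(sum_distinct_two_groupings(rows))
print(sum_distinct_with_seen_set(rows))
```

Both functions return the same result; the difference is that the first
never holds more than one element per (k, v) pair in a single group, while
the second keeps every distinct value for a key in one accumulator.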
>>>>>>>>>>>>>
>>>>>>>>>>>>> Personally, I think it could be worthwhile to provide support
>>>>>>>>>>>>> for DISTINCT quickly with approach (2), and implement (1) as an
>>>>>>>>>>>>> optimization later. This combiner's state would be partitioned by
>>>>>>>>>>>>> key and by window, so I think we would be pretty safe from OOM'ing
>>>>>>>>>>>>> a worker except in some extreme cases (long windows, hot keys, very
>>>>>>>>>>>>> large batch pipelines, ...).
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> But I understand this could be controversial, so I wanted to
>>>>>>>>>>>>> open it up for discussion first: Would it be worthwhile to
>>>>>>>>>>>>> provide support for DISTINCT aggregations quickly with approach #2,
>>>>>>>>>>>>> or is it better to just reject these queries until we can implement
>>>>>>>>>>>>> them safely with approach #1?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Brian
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>>>>>
>>>>>>>>>>>>
