Fair point. BeamSQL lacks proper benchmarks for testing the performance
and scalability of implementations.


-Rui

On Fri, May 3, 2019 at 12:56 PM Reuven Lax <re...@google.com> wrote:

> Back to the original point: I'm very skeptical of adding something that
> does not scale at all. In our experience, users get far more upset with an
> advertised feature that doesn't work for them (e.g. their workers OOM) than
> with a missing feature.
>
> Reuven
>
> On Fri, May 3, 2019 at 12:41 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> All good points. My version of the two shuffle approach does not work at
>> all.
>>
>> On Fri, May 3, 2019 at 11:38 AM Brian Hulette <bhule...@google.com>
>> wrote:
>>
>>> Rui's point about FLOAT/DOUBLE columns is interesting as well. We
>>> couldn't support distinct aggregations on floating-point columns with the
>>> two-shuffle approach, but we could with the CombineFn approach. I'm not
>>> sure if that's a good thing or not; it seems like an anti-pattern to do a
>>> distinct aggregation on floating-point numbers, but I suppose the spec
>>> allows it.
>>>
>>
>> I can't find the Jira, but grouping on doubles has been discussed at some
>> length before. Many DBMSs do not provide this, so it is not generally
>> expected by SQL users. That is good, because mathematically it is
>> questionable - floating point is usually used as a stand-in for real
>> numbers, where computing equality is not generally possible. So any code
>> that actually depends on equality of floating points is likely susceptible
>> to rounding errors, other quirks of floating point, and also is probably
>> misguided because the underlying thing that floats are approximating
>> already cannot be checked for equality.
>>
>> Kenn
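Kenn's point about equality of floating-point values can be seen in a few lines of plain Java (nothing here is Beam-specific; the class and method names are illustrative only):

```java
// A small plain-Java illustration: floating-point values that are
// mathematically "equal" may not compare equal bit-for-bit, so using a
// double as part of a grouping key is fragile.
public class FloatKeyPitfall {
    // Would these two doubles land in the same group if used as a key?
    static boolean sameKey(double a, double b) {
        return Double.compare(a, b) == 0;
    }

    public static void main(String[] args) {
        // 0.1 + 0.2 is not bit-identical to 0.3 in IEEE-754 arithmetic.
        System.out.println(sameKey(0.1 + 0.2, 0.3)); // false
        System.out.println(0.1 + 0.2);               // 0.30000000000000004
    }
}
```

Two rows a user would consider duplicates can therefore land in different groups when the double participates in the grouping key, which is exactly the rounding-error susceptibility described above.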
>>
>>
>>>
>>> Brian
>>>
>>>
>>> On Fri, May 3, 2019 at 10:52 AM Rui Wang <ruw...@google.com> wrote:
>>>
>>>> To clarify what I said ("So the two-shuffle approach will lead to two
>>>> different implementations for tables with and without FLOAT/DOUBLE columns."):
>>>>
>>>> Basically, I wanted to say that the two-shuffle approach would be the
>>>> implementation for some cases, and it would co-exist with the CombineFn
>>>> approach. In the future, when we start cost-based optimization (CBO) in
>>>> BeamSQL, the CBO is supposed to compare the different plans.
>>>>
>>>> -Rui
>>>>
>>>> On Fri, May 3, 2019 at 10:40 AM Rui Wang <ruw...@google.com> wrote:
>>>>
>>>>>
>>>>>> As to the distinct aggregations: At the least, these queries should
>>>>>> be rejected, not evaluated incorrectly.
>>>>>>
>>>>>
>>>>> Yes. At the least we should not support it, and should throw a clear
>>>>> message saying so. (The current implementation ignores DISTINCT and
>>>>> executes all aggregations as ALL.)
>>>>>
>>>>>
>>>>>> The term "stateful CombineFn" is not one I would use, as the nature
>>>>>> of state is linearity and the nature of CombineFn is parallelism. So I
>>>>>> don't totally understand this proposal. If I replace stateful CombineFn
>>>>>> with stateful DoFn with one combining state per column, then I think I
>>>>>> understand. FWIW, on a runner with scalable SetState or MapState there
>>>>>> will be no risk at all.
>>>>>>
>>>>> I see. "Stateful" is indeed misleading. In this thread, it was all
>>>>> about using a simple CombineFn to achieve DISTINCT aggregations with
>>>>> massive parallelism.
>>>>>
>>>>>> But if you go the two-shuffle route, you don't have to separate the
>>>>>> aggregations and re-join them. You just have to incur the cost of the 
>>>>>> GBK +
>>>>>> DISTINCT for all columns, and just drop the secondary key for the second
>>>>>> shuffle, no?
>>>>>>
>>>>> The two-shuffle approach cannot be the unified approach because it
>>>>> requires building a key of group_by_key + table_row to deduplicate, but
>>>>> table_row might contain floating-point numbers, which cannot be used as a
>>>>> key in GBK. So the two-shuffle approach will lead to two different
>>>>> implementations for tables with and without FLOAT/DOUBLE columns.
>>>>>
>>>>> CombineFn is the unified approach for distinct and non-distinct
>>>>> aggregations: each aggregation call will be a CombineFn.
>>>>>
>>>>>
>>>>> -Rui
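The accumulator semantics behind this CombineFn approach can be sketched in plain Java. This is a hedged sketch, not the actual Beam API: in Beam it would be a `Combine.CombineFn<Long, Set<Long>, Long>`, and all names below are illustrative.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of a SUM(DISTINCT v) combiner: the accumulator is the set of
// values seen so far, and the aggregation runs over that set at extraction
// time. Memory grows with the number of distinct elements per key and
// window, which is the OOM risk discussed in this thread.
public class DistinctSumSketch {
    // addInput: de-dupe by inserting into the set.
    static Set<Long> addInput(Set<Long> acc, long input) {
        acc.add(input);
        return acc;
    }

    // mergeAccumulators: union of the sets from parallel workers.
    static Set<Long> mergeAccumulators(Set<Long> a, Set<Long> b) {
        a.addAll(b);
        return a;
    }

    // extractOutput: apply the underlying aggregation to the distinct values.
    static long extractOutput(Set<Long> acc) {
        long sum = 0;
        for (long v : acc) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        Set<Long> acc = new HashSet<>();
        for (long v : new long[] {3, 5, 3, 5, 7}) {
            acc = addInput(acc, v);
        }
        System.out.println(extractOutput(acc)); // 15: SUM over distinct {3, 5, 7}
    }
}
```

The non-distinct variant of the same aggregation simply replaces the Set accumulator with a running total, which is why each aggregation call, distinct or not, maps uniformly onto one CombineFn.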
>>>>>
>>>>>
>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:
>>>>>>>
>>>>>>>> Brian's first proposal is also challenging partly because BeamSQL
>>>>>>>> has no good practice for dealing with complex SQL plans. Ideally we
>>>>>>>> need enough rules and SQL plan nodes in Beam to construct
>>>>>>>> easy-to-transform plans for the different cases. I was in a similar
>>>>>>>> situation before, when I needed to separate the logical plans of
>>>>>>>> "JOIN ON a OR b" and "JOIN ON a AND b" because their implementations
>>>>>>>> were too different to fit into the same JoinRelNode. It seems we are
>>>>>>>> in a similar situation here: when distinct and non-distinct
>>>>>>>> aggregations are mixed, a single AggregationRelNode can hardly
>>>>>>>> encapsulate such complex logic.
>>>>>>>>
>>>>>>>> We will need a detailed plan to rethink the RelNodes and Rules in
>>>>>>>> BeamSQL, which is out of scope for supporting DISTINCT.
>>>>>>>>
>>>>>>>> I would favor the second proposal because BeamSQL uses Beam Schema
>>>>>>>> and Row. Schema itself uses Java primitives for most of its types
>>>>>>>> (int, long, float, etc.), which limits the size of each element.
>>>>>>>> Considering that the combine is per key and per window, there is a
>>>>>>>> good chance that the stateful combine works for some (if not most)
>>>>>>>> cases.
>>>>>>>>
>>>>>>>> Could we use @Experimental to tag the stateful combine for supporting
>>>>>>>> DISTINCT in aggregations, so that users could have a chance to test
>>>>>>>> it?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Rui
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Ahmet -
>>>>>>>>> I think it would only require observing each key's partition of
>>>>>>>>> the input independently, and the size of the state would only be
>>>>>>>>> proportional to the number of distinct elements, not the entire
>>>>>>>>> input. Note the pipeline would be a GBK with a key based on the
>>>>>>>>> GROUP BY, followed by a Combine.GroupedValues with a (possibly very
>>>>>>>>> stateful) CombineFn.
>>>>>>>>>
>>>>>>>>
Got it. The number of distinct elements could be proportional to the
>>>>>>> entire input; however, if this is a reasonable requirement from a
>>>>>>> product perspective, that is fine. Rui's suggestion of using the
>>>>>>> experimental tag is also a good idea. I suppose that will give us the
>>>>>>> ability to change the implementation if it becomes necessary.
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>> Luke -
>>>>>>>>> Here's a little background on why I think (1) is harder (it may
>>>>>>>>> also just be that it looks daunting to me as someone who's not that
>>>>>>>>> familiar with the code).
>>>>>>>>>
>>>>>>>>> An aggregation node can have multiple aggregations. So, for
>>>>>>>>> example, the query `SELECT k, SUM(x), COUNT(DISTINCT y), AVG(DISTINCT 
>>>>>>>>> z)
>>>>>>>>> FROM ...` would yield a logical plan that has a single aggregation 
>>>>>>>>> node
>>>>>>>>> with three different aggregations. We then take that node and build 
>>>>>>>>> up a
>>>>>>>>> CombineFn that is a composite of all of the aggregations we need to 
>>>>>>>>> make a
>>>>>>>>> combining PTransform [1]. To implement (1) we would need to 
>>>>>>>>> distinguish
>>>>>>>>> between all the DISTINCT and non-DISTINCT aggregations, and come up 
>>>>>>>>> with a
>>>>>>>>> way to unify the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT
>>>>>>>>> pipeline.
>>>>>>>>>
>>>>>>>>> That's certainly not unsolvable, but approach (2) is much simpler:
>>>>>>>>> it just requires implementing some variations on the CombineFns that
>>>>>>>>> already exist [2] and re-using the existing logic for converting an
>>>>>>>>> aggregation node to a combining PTransform.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hopefully that makes sense, let me know if I need to clarify
>>>>>>>>> further :)
>>>>>>>>> Brian
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>>>>>>> [2]
>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Can you also go into more detail why you think 1) is more
>>>>>>>>>> challenging to implement?
>>>>>>>>>>
>>>>>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> From my limited understanding, wouldn't the stateful CombineFn
>>>>>>>>>>> option require observing the whole input before being able to
>>>>>>>>>>> combine, with a very high risk of blowing up memory except for
>>>>>>>>>>> trivial inputs?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <
>>>>>>>>>>> bhule...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations. These
>>>>>>>>>>>> are queries like:
>>>>>>>>>>>>
>>>>>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>>>>>
>>>>>>>>>>>> These are represented in Calcite's logical plan with a distinct
>>>>>>>>>>>> flag on aggregation calls, but we ignore the flag when converting 
>>>>>>>>>>>> to a
>>>>>>>>>>>> pipeline. I see two different ways that we could support this:
>>>>>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the
>>>>>>>>>>>> <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr 
>>>>>>>>>>>> followed by a
>>>>>>>>>>>> second GBK on just <GROUP BY key> to perform the aggregation.
>>>>>>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>>>>>>> combiners used for SUM, COUNT, etc. [1] that maintains some state
>>>>>>>>>>>> tracking previously seen elements and uses it to de-dupe new
>>>>>>>>>>>> elements.
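The data flow of option (1) can be sketched in plain Java, with collections standing in for the two shuffles (this is a hedged sketch of the semantics only; the pair types and the `sumDistinct` name are illustrative, not Beam transforms):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch of the two-GBK plan for SELECT k, SUM(DISTINCT v) ... GROUP BY k:
// the first grouping keys on (k, v) to collapse duplicates; the second
// drops the secondary key and aggregates per k.
public class TwoShuffleSketch {
    static Map<String, Long> sumDistinct(List<Map.Entry<String, Long>> rows) {
        // First "shuffle": key by (k, v), which de-dupes values per key.
        Set<Map.Entry<String, Long>> deduped = new HashSet<>(rows);
        // Second "shuffle": key by k alone and sum the surviving values.
        Map<String, Long> sums = new HashMap<>();
        for (Map.Entry<String, Long> e : deduped) {
            sums.merge(e.getKey(), e.getValue(), Long::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> rows = List.of(
            Map.entry("a", 3L), Map.entry("a", 3L),
            Map.entry("a", 5L), Map.entry("b", 7L));
        Map<String, Long> sums = sumDistinct(rows);
        System.out.println(sums.get("a")); // 8: SUM(DISTINCT v) for a is 3 + 5
        System.out.println(sums.get("b")); // 7
    }
}
```

Note that the first shuffle is exactly where the FLOAT/DOUBLE concern raised later in the thread bites: the de-dupe step uses the value itself as part of a grouping key.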
>>>>>>>>>>>>
>>>>>>>>>>>> Of course, something like (1) is much more scalable, but it is
>>>>>>>>>>>> also much more challenging to implement, while (2) is trivial to
>>>>>>>>>>>> implement but runs the risk of blowing up a worker's memory usage.
>>>>>>>>>>>>
>>>>>>>>>>>> Personally, I think it could be worthwhile to provide support
>>>>>>>>>>>> for DISTINCT quickly with approach (2), and implement (1) as an
>>>>>>>>>>>> optimization later. This combiner's state would be partitioned by 
>>>>>>>>>>>> key and
>>>>>>>>>>>> by window, so I think we would be pretty safe from OOM'ing a 
>>>>>>>>>>>> worker except
>>>>>>>>>>>> in some extreme cases (long windows, hot keys, very large batch 
>>>>>>>>>>>> pipelines,
>>>>>>>>>>>> ...).
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> But I understand this could be controversial, so I wanted to
>>>>>>>>>>>> open it up for discussion first: Would it be worthwhile to provide 
>>>>>>>>>>>> support
>>>>>>>>>>>> for DISTINCT aggregations quickly with approach #2, or is it 
>>>>>>>>>>>> better to just
>>>>>>>>>>>> reject these queries until we can implement them safely with 
>>>>>>>>>>>> approach #1?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Brian
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>>>>
>>>>>>>>>>>
