To clarify what I meant by "So the two-shuffle approach will lead to two
different implementations for tables with and without FLOAT/DOUBLE columns.":

Basically, I wanted to say that the two-shuffle approach will be the
implementation for some cases, and it will co-exist with the CombineFn
approach. In the future, when we start cost-based optimization (CBO) in
BeamSQL, the CBO is supposed to compare the different plans.

-Rui

On Fri, May 3, 2019 at 10:40 AM Rui Wang <ruw...@google.com> wrote:

>
>> As to the distinct aggregations: At the least, these queries should be
>> rejected, not evaluated incorrectly.
>>
>
> Yes. The least we can do is not support it and throw a clear error message
> saying so. (The current implementation ignores DISTINCT and executes all
> aggregations as ALL.)
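A minimal sketch of the "reject, don't evaluate incorrectly" behavior, in plain Java. AggCall and validate are hypothetical names used only for illustration; they are not the BeamSQL or Calcite types, and the real check would live wherever the aggregation node is converted to a pipeline.

```java
class RejectDistinctSketch {
    // Hypothetical stand-in for one aggregation call in a logical plan.
    static final class AggCall {
        final String function;
        final boolean distinct;

        AggCall(String function, boolean distinct) {
            this.function = function;
            this.distinct = distinct;
        }
    }

    // Fail loudly instead of silently treating DISTINCT as ALL.
    static void validate(AggCall call) {
        if (call.distinct) {
            throw new UnsupportedOperationException(
                "DISTINCT aggregation is not supported: " + call.function);
        }
    }

    public static void main(String[] args) {
        validate(new AggCall("SUM", false)); // accepted
        try {
            validate(new AggCall("COUNT", true));
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```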
>
>
>> The term "stateful CombineFn" is not one I would use, as the nature of
>> state is linearity and the nature of CombineFn is parallelism. So I don't
>> totally understand this proposal. If I replace stateful CombineFn with
>> stateful DoFn with one combining state per column, then I think I
>> understand. FWIW on a runner with scalable SetState or MapState it will not
>> be any risk at all.
>>
> I see. "Stateful" is indeed misleading. In this thread, it was all about
> using a simple CombineFn to achieve DISTINCT aggregation with massive
> parallelism.
>
>> But if you go the two shuffle route, you don't have to separate the
>> aggregations and re-join them. You just have to incur the cost of the GBK +
>> DISTINCT for all columns, and just drop the secondary key for the second
>> shuffle, no?
>>
> The two-shuffle approach cannot be the unified approach because it requires
> building a key of group_by_key + table_row to deduplicate, but table_row
> might contain floating-point numbers, which cannot be used as a key in GBK.
> So the two-shuffle approach will lead to two different implementations for
> tables with and without FLOAT/DOUBLE columns.
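The thread does not spell out why floating-point fields are a problem as grouping keys. One plausible reason, assumed here, is that grouping compares encoded key bytes, and two doubles that are numerically equal can have different encodings. The encode helper below is hypothetical and only stands in for whatever key encoding a runner would use.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class FloatKeySketch {
    // Hypothetical key encoding: the 8 raw big-endian bytes of the double.
    static byte[] encode(double d) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(d).array();
    }

    public static void main(String[] args) {
        double pos = 0.0;
        double neg = -0.0;
        // Numerically equal under ==
        System.out.println(pos == neg); // true
        // But the encoded bytes differ (sign bit), so byte-wise grouping
        // would treat them as two different keys.
        System.out.println(Arrays.equals(encode(pos), encode(neg))); // false
    }
}
```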
>
> CombineFn is the unified approach for distinct and non-distinct
> aggregations: each aggregation call will be a CombineFn.
>
>
> -Rui
>
>
>
>> Kenn
>>
>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com> wrote:
>>
>>>
>>>
>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:
>>>
>>>> Brian's first proposal is challenging partly because BeamSQL has no
>>>> good practice for dealing with complex SQL plans. Ideally we need
>>>> enough rules and SQL plan nodes in Beam to construct easy-to-transform plans
>>>> for different cases. I had a similar situation before when I needed to
>>>> separate the logical plans of "JOIN ON a OR b" and "JOIN ON a AND b", because
>>>> their implementations are too different to fit into the same
>>>> JoinRelNode. The situation seems similar here: when distinct and
>>>> non-distinct aggregations are mixed, it is hard for one single
>>>> AggregationRelNode to encapsulate the complex logic.
>>>>
>>>> We will need a detailed plan to rethink RelNodes and Rules in
>>>> BeamSQL, which is out of scope for supporting DISTINCT.
>>>>
>>>> I would favor the second proposal because BeamSQL uses Beam schema and
>>>> row. Schema itself uses Java primitives for most of its types (int, long,
>>>> float, etc.), which limits the size of each element. Considering that the
>>>> combine is per key and per window, there is a good chance that stateful
>>>> combine works for some (if not most) cases.
>>>>
>>>> Could we use @Experimental to tag stateful combine for supporting
>>>> DISTINCT in aggregation, so that users have a chance to test it?
>>>>
>>>>
>>>> -Rui
>>>>
>>>>
>>>>
>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com>
>>>> wrote:
>>>>
>>>>> Ahmet -
>>>>> I think it would only require observing each key's partition of the
>>>>> input independently, and the size of the state would only be proportional
>>>>> to the number of distinct elements, not the entire input. Note the pipeline
>>>>> would be a GBK with a key based on the GROUP BY, followed by a
>>>>> Combine.GroupedValues with a (possibly very stateful) CombineFn.
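A rough sketch of what such a de-duplicating combiner could look like for SUM(DISTINCT v), assuming the accumulator is simply the set of values seen so far. This is plain Java whose method names mirror the CombineFn lifecycle; it does not extend any Beam class and is not the implementation discussed in this thread. It also shows why the state grows with the number of distinct values per key and window.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java stand-in for a CombineFn; names are illustrative only.
class DistinctSumSketch {
    // The accumulator is the set of distinct values seen so far.
    static Set<Long> createAccumulator() {
        return new HashSet<>();
    }

    static Set<Long> addInput(Set<Long> acc, long value) {
        acc.add(value); // duplicates are dropped by the set
        return acc;
    }

    // Partial accumulators from parallel workers are merged by set union.
    static Set<Long> mergeAccumulators(List<Set<Long>> accs) {
        Set<Long> merged = createAccumulator();
        for (Set<Long> acc : accs) {
            merged.addAll(acc);
        }
        return merged;
    }

    static long extractOutput(Set<Long> acc) {
        long sum = 0;
        for (long v : acc) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        Set<Long> a = createAccumulator();
        for (long v : new long[] {1, 2, 2}) addInput(a, v);
        Set<Long> b = createAccumulator();
        for (long v : new long[] {2, 3}) addInput(b, v);
        // Distinct values are {1, 2, 3}
        System.out.println(extractOutput(mergeAccumulators(Arrays.asList(a, b)))); // 6
    }
}
```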
>>>>>
>>>>
>>> Got it. The number of distinct elements could be proportional to the entire
>>> input; however, if this is a reasonable requirement from a product
>>> perspective, that is fine. Rui's suggestion of using the experimental tag is
>>> also a good idea. I suppose that will give us the ability to change the
>>> implementation if it becomes necessary.
>>>
>>>
>>>>
>>>>> Luke -
>>>>> Here's a little background on why I think (1) is harder (it may also
>>>>> just be that it looks daunting to me as someone who's not that familiar
>>>>> with the code).
>>>>>
>>>>> An aggregation node can have multiple aggregations. So, for example,
>>>>> the query `SELECT k, SUM(x), COUNT(DISTINCT y), AVG(DISTINCT z) FROM ...`
>>>>> would yield a logical plan that has a single aggregation node with three
>>>>> different aggregations. We then take that node and build up a CombineFn
>>>>> that is a composite of all of the aggregations we need to make a combining
>>>>> PTransform [1]. To implement (1) we would need to distinguish between all
>>>>> the DISTINCT and non-DISTINCT aggregations, and come up with a way to unify
>>>>> the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT pipeline.
>>>>>
>>>>> That's certainly not unsolvable, but approach (2) is much simpler - it
>>>>> just requires implementing some variations on the CombineFns that already
>>>>> exist [2] and re-using the existing logic for converting an aggregation
>>>>> node to a combining PTransform.
>>>>>
>>>>>
>>>>> Hopefully that makes sense, let me know if I need to clarify further :)
>>>>> Brian
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>>> [2]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>
>>>>>
>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>
>>>>>> Can you also go into more detail why you think 1) is more challenging
>>>>>> to implement?
>>>>>>
>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com> wrote:
>>>>>>
>>>>>>> From my limited understanding, would the stateful CombineFn option not
>>>>>>> require observing the whole input before being able to combine, and is
>>>>>>> the risk of blowing up memory not very high except for trivial inputs?
>>>>>>>
>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi everyone,
>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations. These are
>>>>>>>> queries like:
>>>>>>>>
>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>
>>>>>>>> These are represented in Calcite's logical plan with a distinct
>>>>>>>> flag on aggregation calls, but we ignore the flag when converting to a
>>>>>>>> pipeline. I see two different ways that we could support this:
>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the
>>>>>>>> <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr, followed by a
>>>>>>>> second GBK on just <GROUP BY key> to perform the aggregation.
>>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>>> combiners used for SUM, COUNT, etc. [1] that maintains some state that
>>>>>>>> tracks previously seen elements and uses it to de-dupe new elements.
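For comparison, approach (1) above can be sketched with plain Java collections for the query SELECT k, SUM(DISTINCT v) FROM t GROUP BY k. The two "shuffles" here are only in-memory groupings standing in for GBKs, and TwoShuffleSketch and sumDistinct are hypothetical names, not BeamSQL code.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class TwoShuffleSketch {
    // Each row is a (k, v) pair; the result is SUM(DISTINCT v) per k.
    static Map<String, Long> sumDistinct(List<Map.Entry<String, Long>> rows) {
        // First "shuffle": key on (k, v) so duplicate pairs collapse to one.
        Set<Map.Entry<String, Long>> deduped = new HashSet<>(rows);
        // Second "shuffle": key on k alone and aggregate the surviving values.
        return deduped.stream().collect(
            Collectors.groupingBy(
                Map.Entry::getKey,
                Collectors.summingLong(Map.Entry::getValue)));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> rows = new ArrayList<>();
        rows.add(new SimpleEntry<>("a", 1L));
        rows.add(new SimpleEntry<>("a", 1L)); // duplicate, removed in step one
        rows.add(new SimpleEntry<>("a", 2L));
        rows.add(new SimpleEntry<>("b", 5L));
        rows.add(new SimpleEntry<>("b", 5L)); // duplicate, removed in step one
        Map<String, Long> result = sumDistinct(rows);
        System.out.println(result.get("a")); // 3
        System.out.println(result.get("b")); // 5
    }
}
```

Unlike the combiner with per-key state, no single accumulator has to hold all distinct values here, which is the scalability argument for (1).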
>>>>>>>>
>>>>>>>> Of course, something like (1) is much more scalable, but it is also
>>>>>>>> much more challenging to implement. (2) is trivial to implement, but
>>>>>>>> runs the risk of blowing up a worker's memory usage.
>>>>>>>>
>>>>>>>> Personally, I think it could be worthwhile to provide support for
>>>>>>>> DISTINCT quickly with approach (2), and implement (1) as an optimization
>>>>>>>> later. This combiner's state would be partitioned by key and by window, so
>>>>>>>> I think we would be pretty safe from OOM'ing a worker except in some
>>>>>>>> extreme cases (long windows, hot keys, very large batch pipelines, ...).
>>>>>>>>
>>>>>>>>
>>>>>>>> But I understand this could be controversial, so I wanted to open
>>>>>>>> it up for discussion first: Would it be worthwhile to provide support for
>>>>>>>> DISTINCT aggregations quickly with approach #2, or is it better to just
>>>>>>>> reject these queries until we can implement them safely with approach #1?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Brian
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>
>>>>>>>
