On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:

> Brian's first proposal is challenging also partially because in BeamSQL
> there is no good practice to deal with complex SQL plans. Ideally we need
> enough rules and SQL plan node in Beam to construct easy-to-transform plans
> for different cases. I had a similar situation before when I needed to
> separate logical plans of  "JOIN ON a OR b"and "JOIN ON a AND b", which was
> because their implementation are so different to fit into the same
> JoinRelNode. It seems in similar situation when both distinct aggregations
> and non-distinct aggregations are mixed, one single AggregationRelNode is
> hard to encapsulate complex logical.
>
> We will need a detailed plan to re-think about RelNodes and Rules in
> BeamSQL, which is out of scope for supporting DISTINCT.
>
> I would favor of second proposal because BeamSQL uses Beam schema and row.
> Schema itself uses Java primitives for most of its types (int, long, float,
> etc.). It limits the size of each element. Considering per key and per
> window combine, there is a good chance that stateful combine works for some
> (if not most) cases.
>
> Could we use @Experimental to tag stateful combine for supporting DISTINCT
> in aggregation so that we could have a chance to test it by users?
>
>
> -Rui
>
>
>
> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Ahmet -
>> I think it would only require observing each key's partition of the input
>> independently, and the size of the state would only be proportional to the
>> number of distinct elements, not the entire input. Note the pipeline would
>> be a GBK with a key based on the GROUP BY, followed by a
>> Combined.GroupedValue with a (possibly very stateful) CombineFn.
>>
>
Got it. Distinct elements could be proportional to the entire input,
however if this is a reasonable requirement from a product perspective that
is fine. Rui's suggestion of using experimental tag is also a good idea. I
supposed that will give us the ability to change the implementation if it
becomes necessary.


>
>> Luke -
>> Here's A little background on why I think (1) is harder (It may also just
>> be that it looks daunting to me as someone who's not that familiar with the
>> code).
>>
>> An aggregation node can have multiple aggregations. So, for example, the
>> query `SELECT k, SUM(x), COUNT(DISTINCT y), AVG(DISTINCT z) FROM ...` would
>> yield a logical plan that has a single aggregation node with three
>> different aggregations. We then take that node and build up a CombineFn
>> that is a composite of all of the aggregations we need to make a combining
>> PTransform [1]. To implement (1) we would need to distinguish between all
>> the DISTINCT and non-DISTINCT aggregations, and come up with a way to unify
>> the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT pipeline.
>>
>> That's certainly not unsolvable, but approach (2) is much simpler - it
>> just requires implementing some variations on the CombineFn's that already
>> exist [2] and re-using the existing logic for converting an aggregation
>> node to a combining PTransform.
>>
>>
>> Hopefully that makes sense, let me know if I need to clarify further :)
>> Brian
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>
>>
>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Can you also go into more detail why you think 1) is more challenging to
>>> implement?
>>>
>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com> wrote:
>>>
>>>> From my limited understanding, would not the stateful combinefn option
>>>> require observing the whole input before being able combine and the risk of
>>>> blowing memory is actually very high except for trivial inputs?
>>>>
>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com>
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>> Currently BeamSQL does not support DISTINCT aggregations. These are
>>>>> queries like:
>>>>>
>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>
>>>>> These are represented in Calcite's logical plan with a distinct flag
>>>>> on aggregation calls, but we ignore the flag when converting to a 
>>>>> pipeline.
>>>>> I see two different ways that we could support this:
>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the <GROUP
>>>>> BY key> + <DISTINCT expr> to de-dupe values of expr followed by a second
>>>>> GBK on just <GROUP BY key> to perform the aggregation.
>>>>> 2. Stateful CombineFn - We could implement a version of the combiners
>>>>> used for SUM, COUNT, etc [1] that maintain some state that tracks
>>>>> previously seen elements and uses it de-deupe new elements.
>>>>>
>>>>> Of course, something like (1) is much more scalable, but it is also
>>>>> much more challenging to implement. While (2) is trivial to implement, but
>>>>> runs the risk of blowing up a worker's memory usage.
>>>>>
>>>>> Personally, I think it could be worthwhile to provide support for
>>>>> DISTINCT quickly with approach (2), and implement (1) as an optimization
>>>>> later. This combiner's state would be partitioned by key and by window, so
>>>>> I think we would be pretty safe from OOM'ing a worker except in some
>>>>> extreme cases (long windows, hot keys, very large batch pipelines, ...).
>>>>>
>>>>>
>>>>> But I understand this could be controversial, so I wanted to open it
>>>>> up for discussion first: Would it be worthwhile to provide support for
>>>>> DISTINCT aggregations quickly with approach #2, or is it better to just
>>>>> reject these queries until we can implement them safely with approach #1?
>>>>>
>>>>> Thanks,
>>>>> Brian
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>
>>>>

Reply via email to