>
>
> As to the distinct aggregations: At the least, these queries should be
> rejected, not evaluated incorrectly.
>

Yes. At the very least we should not support it and throw a clear message
saying so. (The current implementation ignores DISTINCT and executes all
aggregations as ALL.)


> The term "stateful CombineFn" is not one I would use, as the nature of
> state is linearity and the nature of CombineFn is parallelism. So I don't
> totally understand this proposal. If I replace stateful CombineFn with
> stateful DoFn with one combining state per column, then I think I
> understand. FWIW on a runner with scalable SetState or MapState it will not
> be any risk at all.
>
I see. "Stateful" is indeed misleading. In this thread, it was all about
using a simple CombineFn to achieve DISTINCT aggregation with massive
parallelism.

> But if you go the two shuffle route, you don't have to separate the
> aggregations and re-join them. You just have to incur the cost of the GBK +
> DISTINCT for all columns, and just drop the secondary key for the second
> shuffle, no?
>
The two-shuffle approach cannot be the unified approach because it requires
building a key of group_by_key + table_row to de-duplicate, but table_row
might contain floating-point numbers, which cannot be used as a GBK key. So
the two-shuffle approach would lead to two different implementations for
tables with and without a FLOAT/DOUBLE column.
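
To make that limitation concrete, here is a rough sketch (not BeamSQL code;
it uses plain KV<String, Double> elements instead of Rows, and the helper
name is made up) of the two-shuffle route for SUM(DISTINCT v) GROUP BY k.
The first shuffle has to carry the value inside a GroupByKey key, and with a
DOUBLE value the key coder is non-deterministic, so the pipeline is rejected:

import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class TwoShuffleSketch {
  // Input is assumed to already be keyed by the GROUP BY key.
  static PCollection<KV<String, Double>> sumDistinctPerKey(
      PCollection<KV<String, Double>> keyed) {
    // First shuffle: Distinct de-duplicates whole elements, so the
    // (group key, value) pair becomes a GroupByKey key. Because DoubleCoder
    // is not deterministic, GroupByKey rejects this when the transform is
    // expanded -- the FLOAT/DOUBLE limitation described above.
    PCollection<KV<String, Double>> deduped =
        keyed.apply("DedupShuffle", Distinct.create());

    // Second shuffle: drop the value from the key and aggregate per
    // group key.
    return deduped.apply("AggregateShuffle", Sum.doublesPerKey());
  }
}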

CombineFn is the unified approach for distinct and non-distinct
aggregation: each aggregation call will be a CombineFn.
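
For illustration, a minimal sketch (class name made up, not the actual
BeamSQL code) of what such a CombineFn could look like for SUM(DISTINCT x)
over BIGINT: the accumulator is simply the set of values seen so far, which
is also exactly where the memory concern discussed below comes from.

import java.util.HashSet;
import java.util.Set;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.Combine;

public class DistinctSumLongFn extends Combine.CombineFn<Long, Set<Long>, Long> {
  @Override
  public Set<Long> createAccumulator() {
    return new HashSet<>();
  }

  @Override
  public Set<Long> addInput(Set<Long> accum, Long input) {
    accum.add(input); // duplicates are dropped by the Set
    return accum;
  }

  @Override
  public Set<Long> mergeAccumulators(Iterable<Set<Long>> accums) {
    Set<Long> merged = new HashSet<>();
    for (Set<Long> accum : accums) {
      merged.addAll(accum);
    }
    return merged;
  }

  @Override
  public Long extractOutput(Set<Long> accum) {
    long sum = 0;
    for (long v : accum) {
      sum += v;
    }
    return sum;
  }

  @Override
  public Coder<Set<Long>> getAccumulatorCoder(CoderRegistry registry, Coder<Long> inputCoder) {
    // The accumulator (the whole set of distinct values) is what gets
    // serialized and shuffled, so its size is bounded only by the number of
    // distinct values per key and window.
    return SetCoder.of(VarLongCoder.of());
  }
}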


-Rui



> Kenn
>
> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com> wrote:
>
>>
>>
>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:
>>
>>> Brian's first proposal is also challenging partially because BeamSQL has
>>> no good practice for dealing with complex SQL plans. Ideally we need
>>> enough rules and SQL plan nodes in Beam to construct easy-to-transform
>>> plans for different cases. I ran into a similar situation before when I
>>> needed to separate the logical plans of "JOIN ON a OR b" and "JOIN ON a
>>> AND b", because their implementations are too different to fit into the
>>> same JoinRelNode. We seem to be in a similar situation here: when distinct
>>> and non-distinct aggregations are mixed, a single AggregationRelNode can
>>> hardly encapsulate the complex logic.
>>>
>>> We will need a detailed plan to rethink RelNodes and Rules in BeamSQL,
>>> which is out of scope for supporting DISTINCT.
>>>
>>> I would favor the second proposal because BeamSQL uses Beam Schema and
>>> Row. Schema itself uses Java primitives for most of its types (int, long,
>>> float, etc.), which limits the size of each element. Considering that the
>>> combine is per key and per window, there is a good chance that the
>>> stateful combine works for some (if not most) cases.
>>>
>>> Could we use @Experimental to tag the stateful-combine support for
>>> DISTINCT aggregations so that users have a chance to test it?
>>>
>>>
>>> -Rui
>>>
>>>
>>>
>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com>
>>> wrote:
>>>
>>>> Ahmet -
>>>> I think it would only require observing each key's partition of the
>>>> input independently, and the size of the state would only be proportional
>>>> to the number of distinct elements, not the entire input. Note the pipeline
>>>> would be a GBK with a key based on the GROUP BY, followed by a
>>>> Combine.GroupedValues with a (possibly very stateful) CombineFn.
>>>>
>>>
>> Got it. The number of distinct elements could be proportional to the
>> entire input; however, if this is a reasonable requirement from a product
>> perspective, that is fine. Rui's suggestion of using the experimental tag
>> is also a good idea. I suppose that will give us the ability to change the
>> implementation if it becomes necessary.
>>
>>
>>>
>>>> Luke -
>>>> Here's a little background on why I think (1) is harder (it may also
>>>> just be that it looks daunting to me as someone who's not that familiar
>>>> with the code).
>>>>
>>>> An aggregation node can have multiple aggregations. So, for example,
>>>> the query `SELECT k, SUM(x), COUNT(DISTINCT y), AVG(DISTINCT z) FROM ...`
>>>> would yield a logical plan that has a single aggregation node with three
>>>> different aggregations. We then take that node and build up a CombineFn
>>>> that is a composite of all of the aggregations we need to make a combining
>>>> PTransform [1]. To implement (1) we would need to distinguish between all
>>>> the DISTINCT and non-DISTINCT aggregations, and come up with a way to unify
>>>> the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT pipeline.
>>>>
>>>> That's certainly not unsolvable, but approach (2) is much simpler - it
>>>> just requires implementing some variations on the CombineFns that already
>>>> exist [2] and re-using the existing logic for converting an aggregation
>>>> node to a combining PTransform.
>>>>
>>>>
>>>> Hopefully that makes sense, let me know if I need to clarify further :)
>>>> Brian
>>>>
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>> [2]
>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>
>>>>
>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Can you also go into more detail why you think 1) is more challenging
>>>>> to implement?
>>>>>
>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com> wrote:
>>>>>
>>>>>> From my limited understanding, would not the stateful CombineFn option
>>>>>> require observing the whole input before being able to combine, and
>>>>>> wouldn't the risk of blowing memory be very high except for trivial
>>>>>> inputs?
>>>>>>
>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi everyone,
>>>>>>> Currently BeamSQL does not support DISTINCT aggregations. These are
>>>>>>> queries like:
>>>>>>>
>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>
>>>>>>> These are represented in Calcite's logical plan with a distinct flag
>>>>>>> on aggregation calls, but we ignore the flag when converting to a 
>>>>>>> pipeline.
>>>>>>> I see two different ways that we could support this:
>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the
>>>>>>> <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr followed by a
>>>>>>> second GBK on just <GROUP BY key> to perform the aggregation.
>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>> combiners used for SUM, COUNT, etc. [1] that maintains some state
>>>>>>> tracking previously seen elements and uses it to de-dupe new elements.
>>>>>>>
>>>>>>> Of course, something like (1) is much more scalable, but it is also
>>>>>>> much more challenging to implement, while (2) is trivial to implement
>>>>>>> but runs the risk of blowing up a worker's memory usage.
>>>>>>>
>>>>>>> Personally, I think it could be worthwhile to provide support for
>>>>>>> DISTINCT quickly with approach (2), and implement (1) as an optimization
>>>>>>> later. This combiner's state would be partitioned by key and by window,
>>>>>>> so I think we would be pretty safe from OOM'ing a worker except in some
>>>>>>> extreme cases (long windows, hot keys, very large batch pipelines, ...).
>>>>>>>
>>>>>>>
>>>>>>> But I understand this could be controversial, so I wanted to open it
>>>>>>> up for discussion first: Would it be worthwhile to provide support for
>>>>>>> DISTINCT aggregations quickly with approach #2, or is it better to just
>>>>>>> reject these queries until we can implement them safely with approach #1?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Brian
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>
>>>>>>
