On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:

> Brian's first proposal is challenging partly because BeamSQL has no
> good practice for dealing with complex SQL plans. Ideally we need
> enough rules and SQL plan nodes in Beam to construct easy-to-transform plans
> for different cases. I had a similar situation before when I needed to
> separate the logical plans of "JOIN ON a OR b" and "JOIN ON a AND b",
> because their implementations are too different to fit into the same
> JoinRelNode. It seems to be a similar situation here: when distinct and
> non-distinct aggregations are mixed, it is hard for one single
> AggregationRelNode to encapsulate the complex logic.
>
> We will need a detailed plan to rethink RelNodes and Rules in
> BeamSQL, which is out of scope for supporting DISTINCT.
>
> I would favor the second proposal because BeamSQL uses Beam schema and row.
> Schema itself uses Java primitives for most of its types (int, long, float,
> etc.), which limits the size of each element. Since the combine is per key
> and per window, there is a good chance that a stateful combine works for some
> (if not most) cases.
>
> Could we use @Experimental to tag the stateful combine for supporting DISTINCT
> in aggregation, so that users get a chance to test it?
>
> -Rui
>
> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com> wrote:
>
>> Ahmet -
>> I think it would only require observing each key's partition of the input
>> independently, and the size of the state would only be proportional to the
>> number of distinct elements, not the entire input. Note the pipeline would
>> be a GBK with a key based on the GROUP BY, followed by a
>> Combine.GroupedValues with a (possibly very stateful) CombineFn.
>>
>
Got it. Distinct elements could be proportional to the entire input; however, if this is a reasonable requirement from a product perspective, that is fine.
Rui's suggestion of using the @Experimental tag is also a good idea. I suppose that will give us the ability to change the implementation if it becomes necessary.
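
To make the state-size point concrete, here is a minimal sketch of a distinct-tracking combiner in plain Java, with no Beam dependency. The class DistinctSumSketch and its method names are hypothetical: they only mirror the create/add/merge/extract shape of a CombineFn and are not Beam's actual API. The accumulator is simply the set of values seen so far for one key and window, so its size follows the number of distinct values rather than the number of input rows.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch, not part of Beam: SUM(DISTINCT v) for one key and window. */
public class DistinctSumSketch {

  /** The accumulator holds every distinct value seen so far. */
  public static Set<Long> createAccumulator() {
    return new HashSet<>();
  }

  /** Adding a duplicate leaves the accumulator unchanged. */
  public static Set<Long> addInput(Set<Long> acc, long input) {
    acc.add(input);
    return acc;
  }

  /** Merging is a set union, so duplicates across accumulators are dropped too. */
  public static Set<Long> mergeAccumulators(List<Set<Long>> accs) {
    Set<Long> merged = createAccumulator();
    for (Set<Long> acc : accs) {
      merged.addAll(acc);
    }
    return merged;
  }

  /** The aggregate is computed only at the end, over the de-duped values. */
  public static long extractOutput(Set<Long> acc) {
    long sum = 0;
    for (long v : acc) {
      sum += v;
    }
    return sum;
  }

  public static void main(String[] args) {
    Set<Long> a = createAccumulator();
    for (long v : new long[] {1, 2, 2, 3}) {
      addInput(a, v);
    }
    Set<Long> b = createAccumulator();
    for (long v : new long[] {3, 4}) {
      addInput(b, v);
    }
    Set<Long> merged = mergeAccumulators(Arrays.asList(a, b));
    System.out.println(extractOutput(merged)); // prints 10 (1 + 2 + 3 + 4)
    System.out.println(merged.size());         // prints 4: state size = distinct count
  }
}
```

Six inputs collapse to four stored values here, but if every input were unique the set would hold all of them, which is the worst case described above.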
>
>> Luke -
>> Here's a little background on why I think (1) is harder (it may also just
>> be that it looks daunting to me as someone who's not that familiar with the
>> code).
>>
>> An aggregation node can have multiple aggregations. So, for example, the
>> query `SELECT k, SUM(x), COUNT(DISTINCT y), AVG(DISTINCT z) FROM ...` would
>> yield a logical plan that has a single aggregation node with three
>> different aggregations. We then take that node and build up a CombineFn
>> that is a composite of all of the aggregations we need to make a combining
>> PTransform [1]. To implement (1) we would need to distinguish between all
>> the DISTINCT and non-DISTINCT aggregations, and come up with a way to unify
>> the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT pipeline.
>>
>> That's certainly not unsolvable, but approach (2) is much simpler - it
>> just requires implementing some variations on the CombineFns that already
>> exist [2] and re-using the existing logic for converting an aggregation
>> node to a combining PTransform.
>>
>> Hopefully that makes sense, let me know if I need to clarify further :)
>> Brian
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>> [2]
>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>
>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Can you also go into more detail why you think 1) is more challenging to
>>> implement?
>>>
>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com> wrote:
>>>
>>>> From my limited understanding, would not the stateful CombineFn option
>>>> require observing the whole input before being able to combine, and would
>>>> not the risk of blowing up memory be very high except for trivial inputs?
>>>>
>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com>
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>> Currently BeamSQL does not support DISTINCT aggregations. These are
>>>>> queries like:
>>>>>
>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>
>>>>> These are represented in Calcite's logical plan with a distinct flag
>>>>> on aggregation calls, but we ignore the flag when converting to a
>>>>> pipeline.
>>>>> I see two different ways that we could support this:
>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the <GROUP
>>>>> BY key> + <DISTINCT expr> to de-dupe values of expr, followed by a second
>>>>> GBK on just <GROUP BY key> to perform the aggregation.
>>>>> 2. Stateful CombineFn - We could implement a version of the combiners
>>>>> used for SUM, COUNT, etc. [1] that maintains some state that tracks
>>>>> previously seen elements and uses it to de-dupe new elements.
>>>>>
>>>>> Of course, something like (1) is much more scalable, but it is also
>>>>> much more challenging to implement. (2), on the other hand, is trivial
>>>>> to implement but runs the risk of blowing up a worker's memory usage.
>>>>>
>>>>> Personally, I think it could be worthwhile to provide support for
>>>>> DISTINCT quickly with approach (2), and implement (1) as an optimization
>>>>> later. This combiner's state would be partitioned by key and by window, so
>>>>> I think we would be pretty safe from OOM'ing a worker except in some
>>>>> extreme cases (long windows, hot keys, very large batch pipelines, ...).
>>>>>
>>>>> But I understand this could be controversial, so I wanted to open it
>>>>> up for discussion first: Would it be worthwhile to provide support for
>>>>> DISTINCT aggregations quickly with approach #2, or is it better to just
>>>>> reject these queries until we can implement them safely with approach #1?
>>>>>
>>>>> Thanks,
>>>>> Brian
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>
>>>>
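
For comparison, the two-GBK approach (1) from the original proposal can be sketched in plain Java as two regrouping steps. This is only an in-memory illustration under stated assumptions: the class TwoStageDistinctSketch and the method sumDistinct are hypothetical, a LinkedHashSet stands in for the first GBK on <GROUP BY key> + <DISTINCT expr>, and a TreeMap stands in for the second GBK on <GROUP BY key> alone. It does not show how a real pipeline would distribute the work.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical sketch, not part of Beam: SUM(DISTINCT v) ... GROUP BY k in two stages. */
public class TwoStageDistinctSketch {

  public static Map<String, Long> sumDistinct(List<Map.Entry<String, Long>> rows) {
    // Stage 1: group on (key, value) so that each pair survives exactly once.
    Set<Map.Entry<String, Long>> deduped = new LinkedHashSet<>(rows);

    // Stage 2: regroup on the key alone and run the ordinary, stateless SUM.
    Map<String, Long> sums = new TreeMap<>();
    for (Map.Entry<String, Long> row : deduped) {
      sums.merge(row.getKey(), row.getValue(), Long::sum);
    }
    return sums;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Long>> rows = Arrays.asList(
        new SimpleEntry<String, Long>("a", 1L),
        new SimpleEntry<String, Long>("a", 1L),
        new SimpleEntry<String, Long>("a", 2L),
        new SimpleEntry<String, Long>("b", 5L),
        new SimpleEntry<String, Long>("b", 5L));
    System.out.println(sumDistinct(rows)); // prints {a=3, b=5}
  }
}
```

The second stage never needs to remember earlier values, which is why this shape scales better than a set-backed combiner. The cost is the extra grouping step and, as discussed in the thread, the work of unifying it with non-DISTINCT aggregations in the same node.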