To close the loop on this: Rui just added a check that rejects distinct aggregations for now[1]. I wrote up BEAM-7306[2] to track this feature going forward.
[1] https://github.com/apache/beam/pull/8498
[2] https://issues.apache.org/jira/browse/BEAM-7306

*From: *Mingmin Xu <mingm...@gmail.com>
*Date: *Mon, May 6, 2019 at 10:16 AM
*To: *dev@beam.apache.org

> Good call to reject DISTINCT operations for now, since they are not
> handled. There may be more similar cases that need to be revised and
> documented well.
>
> Regarding how to support DISTINCT, I was confused by the stateful
> CombineFn at first. To keep it simple, we can extend support step by
> step, for example by rejecting DISTINCT+nonDISTINCT cases:
> 1. A customized CombineFn looks good and could even support the
> DISTINCT+nonDISTINCT case; I'm just not sure about the performance
> impact, and it looks odd when users write a UDAF.
> 2. For queries with only one DISTINCT, it's not hard to handle with a
> Calcite rule or during the compile step, using two GBKs. To handle the
> DISTINCT+nonDISTINCT case this way, maybe we can try
> JOIN(GBK_for_non_DISTINCT, 2_GBK_for_DISTINCT); this needs more tests
> to confirm.
>
> Mingmin
>
> On Fri, May 3, 2019 at 2:38 PM Rui Wang <ruw...@google.com> wrote:
>
>> A compromise solution would be to use SELECT DISTINCT or GROUP BY to
>> deduplicate before applying aggregations. It takes two shuffles and
>> works on non-floating-point columns. The good thing is that no code
>> change is needed; the downsides are that users need to write more
>> complicated queries and floating point data is not supported.
>>
>>
>> -Rui
>>
>> On Fri, May 3, 2019 at 1:23 PM Rui Wang <ruw...@google.com> wrote:
>>
>>> Fair point. BeamSQL lacks proper benchmarks to test the performance
>>> and scalability of implementations.
>>>
>>>
>>> -Rui
>>>
>>> On Fri, May 3, 2019 at 12:56 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Back to the original point: I'm very skeptical of adding something that
>>>> does not scale at all. In our experience, users get far more upset with an
>>>> advertised feature that doesn't work for them (e.g. their workers OOM) than
>>>> with a missing feature.
>>>>
>>>> Reuven
>>>>
>>>> On Fri, May 3, 2019 at 12:41 PM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>> All good points. My version of the two shuffle approach does not work
>>>>> at all.
>>>>>
>>>>> On Fri, May 3, 2019 at 11:38 AM Brian Hulette <bhule...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Rui's point about FLOAT/DOUBLE columns is interesting as well. We
>>>>>> couldn't support distinct aggregations on floating point columns with the
>>>>>> two-shuffle approach, but we could with the CombineFn approach. I'm not
>>>>>> sure if that's a good thing or not; it seems like an anti-pattern to do a
>>>>>> distinct aggregation on floating point numbers, but I suppose the spec
>>>>>> allows it.
>>>>>>
>>>>>
>>>>> I can't find the Jira, but grouping on doubles has been discussed at
>>>>> some length before. Many DBMSs do not provide this, so it is not generally
>>>>> expected by SQL users. That is good, because mathematically it is
>>>>> questionable - floating point is usually used as a stand-in for real
>>>>> numbers, where computing equality is not generally possible. So any code
>>>>> that actually depends on equality of floating points is likely susceptible
>>>>> to rounding errors, other quirks of floating point, and also is probably
>>>>> misguided because the underlying thing that floats are approximating
>>>>> already cannot be checked for equality.
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>>
>>>>>> Brian
>>>>>>
>>>>>>
>>>>>> On Fri, May 3, 2019 at 10:52 AM Rui Wang <ruw...@google.com> wrote:
>>>>>>
>>>>>>> To clarify what I said "So two shuffle approach will lead to two
>>>>>>> different implementation for tables with and without FLOAT/DOUBLE
>>>>>>> column.":
>>>>>>>
>>>>>>> Basically I wanted to say that the two shuffle approach will be an
>>>>>>> implementation for some cases, and it will co-exist with the CombineFn
>>>>>>> approach. In the future, when we start cost based optimization in
>>>>>>> BeamSQL, the CBO is supposed to compare different plans.
>>>>>>>
>>>>>>> -Rui
>>>>>>>
>>>>>>> On Fri, May 3, 2019 at 10:40 AM Rui Wang <ruw...@google.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>> As to the distinct aggregations: At the least, these queries
>>>>>>>>> should be rejected, not evaluated incorrectly.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Yes. The least we can do is not support it and throw a clear error
>>>>>>>> message (the current implementation ignores DISTINCT and executes all
>>>>>>>> aggregations as ALL).
>>>>>>>>
>>>>>>>>
>>>>>>>>> The term "stateful CombineFn" is not one I would use, as the
>>>>>>>>> nature of state is linearity and the nature of CombineFn is
>>>>>>>>> parallelism. So I don't totally understand this proposal. If I
>>>>>>>>> replace stateful CombineFn with stateful DoFn with one combining
>>>>>>>>> state per column, then I think I understand. FWIW on a runner with
>>>>>>>>> scalable SetState or MapState there will not be any risk at all.
>>>>>>>>>
>>>>>>>> I see. "Stateful" is indeed misleading. In this thread, it was all
>>>>>>>> about using a simple CombineFn to achieve DISTINCT aggregation with
>>>>>>>> massive parallelism.
>>>>>>>>
>>>>>>>>> But if you go the two shuffle route, you don't have to separate the
>>>>>>>>> aggregations and re-join them. You just have to incur the cost of the
>>>>>>>>> GBK + DISTINCT for all columns, and just drop the secondary key for
>>>>>>>>> the second shuffle, no?
>>>>>>>>>
>>>>>>>> The two shuffle approach cannot be the unified approach because it
>>>>>>>> requires building a key of group_by_key + table_row to deduplicate, but
>>>>>>>> table_row might contain floating point numbers, which cannot be used
>>>>>>>> as a key in GBK. So two shuffle approach will lead to two different
>>>>>>>> implementation for tables with and without FLOAT/DOUBLE column.
>>>>>>>>
>>>>>>>> CombineFn is the unified approach for distinct and non-distinct
>>>>>>>> aggregation: each aggregation call will be a CombineFn.
>>>>>>>>
>>>>>>>>
>>>>>>>> -Rui
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Brian's first proposal is also challenging partly because BeamSQL
>>>>>>>>>>> has no good practice for dealing with complex SQL plans. Ideally
>>>>>>>>>>> we need enough rules and SQL plan nodes in Beam to construct
>>>>>>>>>>> easy-to-transform plans for different cases. I had a similar
>>>>>>>>>>> situation before when I needed to separate the logical plans of
>>>>>>>>>>> "JOIN ON a OR b" and "JOIN ON a AND b", because their
>>>>>>>>>>> implementations are too different to fit into the same
>>>>>>>>>>> JoinRelNode. The situation seems similar here: when distinct and
>>>>>>>>>>> non-distinct aggregations are mixed, it is hard for one single
>>>>>>>>>>> AggregationRelNode to encapsulate the complex logic.
>>>>>>>>>>>
>>>>>>>>>>> We will need a detailed plan to rethink RelNodes and Rules in
>>>>>>>>>>> BeamSQL, which is out of scope for supporting DISTINCT.
>>>>>>>>>>>
>>>>>>>>>>> I would favor the second proposal because BeamSQL uses Beam
>>>>>>>>>>> schema and row. Schema itself uses Java primitives for most of its
>>>>>>>>>>> types (int, long, float, etc.), which limits the size of each
>>>>>>>>>>> element. Considering that the combine is per key and per window,
>>>>>>>>>>> there is a good chance that stateful combine works for some (if
>>>>>>>>>>> not most) cases.
>>>>>>>>>>>
>>>>>>>>>>> Could we use @Experimental to tag the stateful combine that
>>>>>>>>>>> supports DISTINCT in aggregations, so that users have a chance to
>>>>>>>>>>> test it?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Rui
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ahmet -
>>>>>>>>>>>> I think it would only require observing each key's partition of
>>>>>>>>>>>> the input independently, and the size of the state would only be
>>>>>>>>>>>> proportional to the number of distinct elements, not the entire
>>>>>>>>>>>> input. Note the pipeline would be a GBK with a key based on the
>>>>>>>>>>>> GROUP BY, followed by a Combine.GroupedValues with a (possibly
>>>>>>>>>>>> very stateful) CombineFn.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> Got it. Distinct elements could be proportional to the entire
>>>>>>>>>> input; however, if this is a reasonable requirement from a product
>>>>>>>>>> perspective, that is fine. Rui's suggestion of using the
>>>>>>>>>> experimental tag is also a good idea. I suppose that will give us
>>>>>>>>>> the ability to change the implementation if it becomes necessary.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Luke -
>>>>>>>>>>>> Here's a little background on why I think (1) is harder (it may
>>>>>>>>>>>> also just be that it looks daunting to me as someone who's not that
>>>>>>>>>>>> familiar with the code).
>>>>>>>>>>>>
>>>>>>>>>>>> An aggregation node can have multiple aggregations. So, for
>>>>>>>>>>>> example, the query `SELECT k, SUM(x), COUNT(DISTINCT y),
>>>>>>>>>>>> AVG(DISTINCT z) FROM ...` would yield a logical plan that has a
>>>>>>>>>>>> single aggregation node with three different aggregations. We then
>>>>>>>>>>>> take that node and build up a CombineFn that is a composite of all
>>>>>>>>>>>> of the aggregations we need to make a combining PTransform [1].
>>>>>>>>>>>> To implement (1) we would need to distinguish between all the
>>>>>>>>>>>> DISTINCT and non-DISTINCT aggregations, and come up with a way to
>>>>>>>>>>>> unify the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT
>>>>>>>>>>>> pipeline.
>>>>>>>>>>>>
>>>>>>>>>>>> That's certainly not unsolvable, but approach (2) is much
>>>>>>>>>>>> simpler - it just requires implementing some variations on the
>>>>>>>>>>>> CombineFns that already exist [2] and re-using the existing logic
>>>>>>>>>>>> for converting an aggregation node to a combining PTransform.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Hopefully that makes sense, let me know if I need to clarify
>>>>>>>>>>>> further :)
>>>>>>>>>>>> Brian
>>>>>>>>>>>>
>>>>>>>>>>>> [1]
>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>>>>>>>>>> [2]
>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Can you also go into more detail on why you think 1) is more
>>>>>>>>>>>>> challenging to implement?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> From my limited understanding, would not the stateful
>>>>>>>>>>>>>> CombineFn option require observing the whole input before
>>>>>>>>>>>>>> being able to combine, so that the risk of blowing up memory
>>>>>>>>>>>>>> is actually very high except for trivial inputs?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations.
>>>>>>>>>>>>>>> These are queries like:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> These are represented in Calcite's logical plan with a
>>>>>>>>>>>>>>> distinct flag on aggregation calls, but we ignore the flag when
>>>>>>>>>>>>>>> converting to a pipeline. I see two different ways that we could
>>>>>>>>>>>>>>> support this:
>>>>>>>>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on
>>>>>>>>>>>>>>> the <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr,
>>>>>>>>>>>>>>> followed by a second GBK on just <GROUP BY key> to perform the
>>>>>>>>>>>>>>> aggregation.
>>>>>>>>>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>>>>>>>>>> combiners used for SUM, COUNT, etc. [1] that maintains some
>>>>>>>>>>>>>>> state that tracks previously seen elements and uses it to
>>>>>>>>>>>>>>> de-dupe new elements.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Of course, something like (1) is much more scalable, but it
>>>>>>>>>>>>>>> is also much more challenging to implement. (2), by contrast, is
>>>>>>>>>>>>>>> trivial to implement but runs the risk of blowing up a worker's
>>>>>>>>>>>>>>> memory usage.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Personally, I think it could be worthwhile to provide
>>>>>>>>>>>>>>> support for DISTINCT quickly with approach (2), and implement
>>>>>>>>>>>>>>> (1) as an optimization later.
>>>>>>>>>>>>>>> This combiner's state would be partitioned by key and by
>>>>>>>>>>>>>>> window, so I think we would be pretty safe from OOM'ing a
>>>>>>>>>>>>>>> worker except in some extreme cases (long windows, hot keys,
>>>>>>>>>>>>>>> very large batch pipelines, ...).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> But I understand this could be controversial, so I wanted to
>>>>>>>>>>>>>>> open it up for discussion first: Would it be worthwhile to
>>>>>>>>>>>>>>> provide support for DISTINCT aggregations quickly with approach
>>>>>>>>>>>>>>> #2, or is it better to just reject these queries until we can
>>>>>>>>>>>>>>> implement them safely with approach #1?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Brian
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>
> --
> ----
> Mingmin
>
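
For readers following the thread, the two approaches from Brian's original message can be sketched in plain Java for `SELECT k, SUM(DISTINCT v) FROM t GROUP BY k`. This is only an illustration under stated assumptions: it uses no Beam APIs, it runs in memory on a single machine, and the names `DistinctAggSketch`, `DistinctSumAcc`, `sumDistinctTwoStage` and `sumDistinctWithAccumulator` are hypothetical. The accumulator methods merely echo the general shape of a combiner (add an input, merge, extract the output); they are not the code from the pull request.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DistinctAggSketch {

  // Approach (1): de-dupe on (key, value) first, then aggregate per key.
  public static Map<String, Long> sumDistinctTwoStage(List<Map.Entry<String, Integer>> rows) {
    // Stage 1 stands in for the GBK on <GROUP BY key> + <DISTINCT expr>.
    Set<Map.Entry<String, Integer>> deduped = new HashSet<>(rows);
    // Stage 2 stands in for the GBK on <GROUP BY key> alone.
    Map<String, Long> sums = new HashMap<>();
    for (Map.Entry<String, Integer> row : deduped) {
      sums.merge(row.getKey(), row.getValue().longValue(), Long::sum);
    }
    return sums;
  }

  // Approach (2): an accumulator that remembers every distinct value it has seen.
  // Its size grows with the number of distinct values per key, which is the
  // memory risk raised in the thread.
  public static final class DistinctSumAcc {
    private final Set<Integer> seen = new HashSet<>();

    public void addInput(int value) {
      seen.add(value);
    }

    // Merging must union the sets; adding partial sums would double count
    // values that both accumulators have seen.
    public DistinctSumAcc merge(DistinctSumAcc other) {
      seen.addAll(other.seen);
      return this;
    }

    public long extractOutput() {
      long sum = 0;
      for (int v : seen) {
        sum += v;
      }
      return sum;
    }
  }

  public static Map<String, Long> sumDistinctWithAccumulator(
      List<Map.Entry<String, Integer>> rows) {
    // One accumulator per GROUP BY key.
    Map<String, DistinctSumAcc> accs = new HashMap<>();
    for (Map.Entry<String, Integer> row : rows) {
      accs.computeIfAbsent(row.getKey(), k -> new DistinctSumAcc()).addInput(row.getValue());
    }
    Map<String, Long> sums = new HashMap<>();
    accs.forEach((key, acc) -> sums.put(key, acc.extractOutput()));
    return sums;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> rows = List.of(
        Map.entry("a", 1), Map.entry("a", 1), Map.entry("a", 2),
        Map.entry("b", 5), Map.entry("b", 5));
    System.out.println(sumDistinctTwoStage(rows));
    System.out.println(sumDistinctWithAccumulator(rows));
  }
}
```

For the sample rows both methods map `a` to 3 and `b` to 5. The sketch also makes the trade-off visible: the two-stage version never holds more than one value per (key, value) pair in the aggregation step, while the accumulator version keeps a full set of distinct values per key.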