Good point about rejecting DISTINCT operations for now, since they are not handled yet. There may be more similar cases that need to be revisited and documented well.
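A minimal sketch of such a rejection, written in Python for brevity. `AggCall` and `reject_distinct` are hypothetical names for illustration only, not Calcite or Beam classes; the only assumption taken from the thread is that each aggregation call carries a distinct flag:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AggCall:
    """Hypothetical stand-in for one aggregation call in a logical plan."""
    function: str           # e.g. "SUM", "COUNT"
    argument: str           # column the aggregation is applied to
    distinct: bool = False  # the flag that is currently ignored


def reject_distinct(calls):
    """Fail fast instead of silently evaluating DISTINCT calls as ALL."""
    for call in calls:
        if call.distinct:
            raise NotImplementedError(
                f"{call.function}(DISTINCT {call.argument}) is not supported yet"
            )
    return calls
```

The point is only that the query fails with a clear message at planning time rather than returning a wrong result.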
Regarding how to support DISTINCT, I was confused by "stateful CombineFn" at first. To keep it simple, we can extend support step by step, for example by rejecting DISTINCT+nonDISTINCT cases at first:
1. A customized CombineFn looks good enough to support even the DISTINCT+nonDISTINCT case; I'm just not sure about the performance impact, and it looks odd when users write a UDAF;
2. For queries with only one DISTINCT, it's not hard to handle it with a Calcite rule or during the compile step, with two GBKs. To handle the DISTINCT+nonDISTINCT case this way, maybe we can try JOIN(GBK_for_non_DISTINCT, 2_GBK_for_DISTINCT); this needs more tests to confirm.

Mingmin

On Fri, May 3, 2019 at 2:38 PM Rui Wang <ruw...@google.com> wrote:

> A compromise solution would be using SELECT DISTINCT or GROUP BY to
> deduplicate before applying aggregations. It takes two shuffles and works
> on non floating point columns. The good thing is that no code change is
> needed, but the downsides are that users need to write more complicated
> queries and floating point data is not supported.
>
> -Rui
>
> On Fri, May 3, 2019 at 1:23 PM Rui Wang <ruw...@google.com> wrote:
>
>> Fair point. BeamSQL lacks proper benchmarks to test the performance and
>> scalability of implementations.
>>
>> -Rui
>>
>> On Fri, May 3, 2019 at 12:56 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Back to the original point: I'm very skeptical of adding something that
>>> does not scale at all. In our experience, users get far more upset with
>>> an advertised feature that doesn't work for them (e.g. their workers
>>> OOM) than with a missing feature.
>>>
>>> Reuven
>>>
>>> On Fri, May 3, 2019 at 12:41 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>> All good points. My version of the two shuffle approach does not work
>>>> at all.
>>>>
>>>> On Fri, May 3, 2019 at 11:38 AM Brian Hulette <bhule...@google.com>
>>>> wrote:
>>>>
>>>>> Rui's point about FLOAT/DOUBLE columns is interesting as well.
>>>>> We couldn't support distinct aggregations on floating point columns
>>>>> with the two-shuffle approach, but we could with the CombineFn
>>>>> approach. I'm not sure if that's a good thing or not; it seems like an
>>>>> anti-pattern to do a distinct aggregation on floating point numbers,
>>>>> but I suppose the spec allows it.
>>>>>
>>>>
>>>> I can't find the Jira, but grouping on doubles has been discussed at
>>>> some length before. Many DBMSs do not provide this, so it is not
>>>> generally expected by SQL users. That is good, because mathematically it
>>>> is questionable: floating point is usually used as a stand-in for real
>>>> numbers, where computing equality is not generally possible. So any code
>>>> that actually depends on equality of floating point values is likely
>>>> susceptible to rounding errors and other quirks of floating point, and
>>>> is also probably misguided because the underlying thing that floats are
>>>> approximating already cannot be checked for equality.
>>>>
>>>> Kenn
>>>>
>>>>>
>>>>> Brian
>>>>>
>>>>> On Fri, May 3, 2019 at 10:52 AM Rui Wang <ruw...@google.com> wrote:
>>>>>
>>>>>> To clarify what I said, "So the two shuffle approach will lead to two
>>>>>> different implementations for tables with and without FLOAT/DOUBLE
>>>>>> columns.":
>>>>>>
>>>>>> Basically I wanted to say that the two shuffle approach will be an
>>>>>> implementation for some cases, and it will co-exist with the CombineFn
>>>>>> approach. In the future, when we start cost based optimization in
>>>>>> BeamSQL, CBO is supposed to compare different plans.
>>>>>>
>>>>>> -Rui
>>>>>>
>>>>>> On Fri, May 3, 2019 at 10:40 AM Rui Wang <ruw...@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>> As to the distinct aggregations: At the least, these queries should
>>>>>>>> be rejected, not evaluated incorrectly.
>>>>>>>>
>>>>>>>
>>>>>>> Yes. The least we can do is not support it, and throw a clear message
>>>>>>> to say no.
>>>>>>> (The current implementation ignores DISTINCT and executes all
>>>>>>> aggregations as ALL.)
>>>>>>>
>>>>>>>> The term "stateful CombineFn" is not one I would use, as the nature
>>>>>>>> of state is linearity and the nature of CombineFn is parallelism. So
>>>>>>>> I don't totally understand this proposal. If I replace stateful
>>>>>>>> CombineFn with stateful DoFn with one combining state per column,
>>>>>>>> then I think I understand. FWIW on a runner with scalable SetState
>>>>>>>> or MapState it will not be any risk at all.
>>>>>>>>
>>>>>>> I see. "Stateful" is indeed misleading. In this thread, it was all
>>>>>>> about using a simple CombineFn to achieve DISTINCT aggregation with
>>>>>>> massive parallelism.
>>>>>>>
>>>>>>>> But if you go the two shuffle route, you don't have to separate the
>>>>>>>> aggregations and re-join them. You just have to incur the cost of
>>>>>>>> the GBK + DISTINCT for all columns, and just drop the secondary key
>>>>>>>> for the second shuffle, no?
>>>>>>>>
>>>>>>> The two shuffle approach cannot be the unified approach because it
>>>>>>> requires building a key of group_by_key + table_row to deduplicate,
>>>>>>> but table_row might contain floating point numbers, which cannot be
>>>>>>> used as a key in GBK. So the two shuffle approach will lead to two
>>>>>>> different implementations for tables with and without FLOAT/DOUBLE
>>>>>>> columns.
>>>>>>>
>>>>>>> CombineFn is the unified approach for distinct and non-distinct
>>>>>>> aggregation: each aggregation call will be a CombineFn.
>>>>>>>
>>>>>>> -Rui
>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Brian's first proposal is challenging, partially because in
>>>>>>>>>> BeamSQL there is no good practice for dealing with complex SQL
>>>>>>>>>> plans. Ideally we need enough rules and SQL plan nodes in Beam to
>>>>>>>>>> construct easy-to-transform plans for different cases. I had a
>>>>>>>>>> similar situation before when I needed to separate the logical
>>>>>>>>>> plans of "JOIN ON a OR b" and "JOIN ON a AND b", because their
>>>>>>>>>> implementations are too different to fit into the same
>>>>>>>>>> JoinRelNode. This seems to be a similar situation: when distinct
>>>>>>>>>> and non-distinct aggregations are mixed, it is hard for one single
>>>>>>>>>> AggregationRelNode to encapsulate the complex logic.
>>>>>>>>>>
>>>>>>>>>> We will need a detailed plan to rethink RelNodes and Rules in
>>>>>>>>>> BeamSQL, which is out of scope for supporting DISTINCT.
>>>>>>>>>>
>>>>>>>>>> I would favor the second proposal because BeamSQL uses Beam schema
>>>>>>>>>> and row. Schema itself uses Java primitives for most of its types
>>>>>>>>>> (int, long, float, etc.), which limits the size of each element.
>>>>>>>>>> Considering that the combine is per key and per window, there is a
>>>>>>>>>> good chance that stateful combine works for some (if not most)
>>>>>>>>>> cases.
>>>>>>>>>>
>>>>>>>>>> Could we use @Experimental to tag stateful combine for supporting
>>>>>>>>>> DISTINCT in aggregation, so that we have a chance to get it tested
>>>>>>>>>> by users?
>>>>>>>>>>
>>>>>>>>>> -Rui
>>>>>>>>>>
>>>>>>>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ahmet -
>>>>>>>>>>> I think it would only require observing each key's partition of
>>>>>>>>>>> the input independently, and the size of the state would only be
>>>>>>>>>>> proportional to the number of distinct elements, not the entire
>>>>>>>>>>> input. Note the pipeline would be a GBK with a key based on the
>>>>>>>>>>> GROUP BY, followed by a Combine.GroupedValues with a (possibly
>>>>>>>>>>> very stateful) CombineFn.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> Got it. Distinct elements could be proportional to the entire
>>>>>>>>> input; however, if this is a reasonable requirement from a product
>>>>>>>>> perspective, that is fine. Rui's suggestion of using the
>>>>>>>>> experimental tag is also a good idea. I suppose that will give us
>>>>>>>>> the ability to change the implementation if it becomes necessary.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Luke -
>>>>>>>>>>> Here's a little background on why I think (1) is harder (it may
>>>>>>>>>>> also just be that it looks daunting to me as someone who's not
>>>>>>>>>>> that familiar with the code).
>>>>>>>>>>>
>>>>>>>>>>> An aggregation node can have multiple aggregations. So, for
>>>>>>>>>>> example, the query `SELECT k, SUM(x), COUNT(DISTINCT y),
>>>>>>>>>>> AVG(DISTINCT z) FROM ...` would yield a logical plan that has a
>>>>>>>>>>> single aggregation node with three different aggregations. We
>>>>>>>>>>> then take that node and build up a CombineFn that is a composite
>>>>>>>>>>> of all of the aggregations we need to make a combining
>>>>>>>>>>> PTransform [1].
>>>>>>>>>>> To implement (1) we would need to distinguish between all the
>>>>>>>>>>> DISTINCT and non-DISTINCT aggregations, and come up with a way to
>>>>>>>>>>> unify the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT
>>>>>>>>>>> pipeline.
>>>>>>>>>>>
>>>>>>>>>>> That's certainly not unsolvable, but approach (2) is much
>>>>>>>>>>> simpler - it just requires implementing some variations on the
>>>>>>>>>>> CombineFns that already exist [2] and re-using the existing logic
>>>>>>>>>>> for converting an aggregation node to a combining PTransform.
>>>>>>>>>>>
>>>>>>>>>>> Hopefully that makes sense, let me know if I need to clarify
>>>>>>>>>>> further :)
>>>>>>>>>>> Brian
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>>>>>>>>> [2]
>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Can you also go into more detail on why you think 1) is more
>>>>>>>>>>>> challenging to implement?
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> From my limited understanding, would the stateful CombineFn
>>>>>>>>>>>>> option not require observing the whole input before being able
>>>>>>>>>>>>> to combine, and is the risk of blowing memory not actually very
>>>>>>>>>>>>> high except for trivial inputs?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <
>>>>>>>>>>>>> bhule...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations.
>>>>>>>>>>>>>> These are queries like:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> These are represented in Calcite's logical plan with a
>>>>>>>>>>>>>> distinct flag on aggregation calls, but we ignore the flag
>>>>>>>>>>>>>> when converting to a pipeline. I see two different ways that
>>>>>>>>>>>>>> we could support this:
>>>>>>>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on
>>>>>>>>>>>>>> the <GROUP BY key> + <DISTINCT expr> to de-dupe values of
>>>>>>>>>>>>>> expr, followed by a second GBK on just <GROUP BY key> to
>>>>>>>>>>>>>> perform the aggregation.
>>>>>>>>>>>>>> 2. Stateful CombineFn - We could implement a version of the
>>>>>>>>>>>>>> combiners used for SUM, COUNT, etc. [1] that maintains some
>>>>>>>>>>>>>> state that tracks previously seen elements and uses it to
>>>>>>>>>>>>>> de-dupe new elements.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Of course, something like (1) is much more scalable, but it
>>>>>>>>>>>>>> is also much more challenging to implement, while (2) is
>>>>>>>>>>>>>> trivial to implement but runs the risk of blowing up a
>>>>>>>>>>>>>> worker's memory usage.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Personally, I think it could be worthwhile to provide support
>>>>>>>>>>>>>> for DISTINCT quickly with approach (2), and implement (1) as
>>>>>>>>>>>>>> an optimization later.
>>>>>>>>>>>>>> This combiner's state would be partitioned by key and by
>>>>>>>>>>>>>> window, so I think we would be pretty safe from OOM'ing a
>>>>>>>>>>>>>> worker except in some extreme cases (long windows, hot keys,
>>>>>>>>>>>>>> very large batch pipelines, ...).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But I understand this could be controversial, so I wanted to
>>>>>>>>>>>>>> open it up for discussion first: Would it be worthwhile to
>>>>>>>>>>>>>> provide support for DISTINCT aggregations quickly with
>>>>>>>>>>>>>> approach #2, or is it better to just reject these queries
>>>>>>>>>>>>>> until we can implement them safely with approach #1?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Brian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>>>>>>
>>>>>>>>>>>>>

--
----
Mingmin
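
For reference, the two approaches from the original proposal can be sketched in plain Python without any Beam dependency. This is only an illustration under stated assumptions: the sample `rows`, the function names, and the `DistinctSumCombiner` class are hypothetical, the "shuffles" are simulated with in-memory collections, and the combiner's method names merely mirror the usual create/add/merge/extract lifecycle of a combiner. It shows `SUM(DISTINCT v) ... GROUP BY k` computed both ways:

```python
from collections import defaultdict

# Hypothetical (k, v) input rows for: SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
rows = [("a", 1), ("a", 1), ("a", 2), ("b", 3), ("b", 3)]


def sum_distinct_two_gbk(rows):
    """Approach (1): de-dupe on (key, value), then aggregate per key."""
    # First "shuffle": group on <GROUP BY key> + <DISTINCT expr> to de-dupe.
    deduped = {(k, v) for k, v in rows}
    # Second "shuffle": group on <GROUP BY key> alone and aggregate.
    totals = defaultdict(int)
    for k, v in deduped:
        totals[k] += v
    return dict(totals)


class DistinctSumCombiner:
    """Approach (2): the accumulator is the set of values seen so far."""

    def create_accumulator(self):
        return set()

    def add_input(self, acc, value):
        acc.add(value)
        return acc

    def merge_accumulators(self, accs):
        # Accumulators built in parallel are merged with a set union.
        merged = set()
        for acc in accs:
            merged |= acc
        return merged

    def extract_output(self, acc):
        return sum(acc)


def sum_distinct_combiner(rows):
    """Run the combiner once per key, as a per-key combine would."""
    fn = DistinctSumCombiner()
    accs = {}
    for k, v in rows:
        accs[k] = fn.add_input(accs.get(k, fn.create_accumulator()), v)
    return {k: fn.extract_output(acc) for k, acc in accs.items()}
```

Both functions return the same per-key sums. The difference discussed in the thread is where the de-duplication state lives: in approach (1) it is spread across the first shuffle, while in approach (2) each accumulator holds every distinct value for its key and window, which is the source of the memory concern.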