Fair point. BeamSQL lacks proper benchmarks to test the performance and scalability of implementations.

-Rui

On Fri, May 3, 2019 at 12:56 PM Reuven Lax <re...@google.com> wrote:

> Back to the original point: I'm very skeptical of adding something that does not scale at all. In our experience, users get far more upset with an advertised feature that doesn't work for them (e.g. their workers OOM) than with a missing feature.
>
> Reuven
>
> On Fri, May 3, 2019 at 12:41 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> All good points. My version of the two shuffle approach does not work at all.
>>
>> On Fri, May 3, 2019 at 11:38 AM Brian Hulette <bhule...@google.com> wrote:
>>
>>> Rui's point about FLOAT/DOUBLE columns is interesting as well. We couldn't support distinct aggregations on floating point columns with the two-shuffle approach, but we could with the CombineFn approach. I'm not sure if that's a good thing or not, it seems like an anti-pattern to do a distinct aggregation on floating point numbers but I suppose the spec allows it.
>>
>> I can't find the Jira, but grouping on doubles has been discussed at some length before. Many DBMSs do not provide this, so it is not generally expected by SQL users. That is good, because mathematically it is questionable - floating point is usually used as a stand-in for real numbers, where computing equality is not generally possible. So any code that actually depends on equality of floating points is likely susceptible to rounding errors, other quirks of floating point, and also is probably misguided because the underlying thing that floats are approximating already cannot be checked for equality.
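
A minimal illustration of the floating-point concern above, in plain Python rather than Beam code. The `distinct_sum` helper is hypothetical and only models SUM(DISTINCT v) by exact equality; it shows that two values which are equal as real numbers can still count as distinct.

```python
import math


def distinct_sum(values):
    """Model SUM(DISTINCT v): de-duplicate by exact equality, then sum."""
    return sum(set(values))


a = 0.1 + 0.2  # mathematically 0.3, but not bit-identical to the literal 0.3
b = 0.3

# Exact equality fails, so a DISTINCT step keeps both values.
print(a == b)       # False
print(len({a, b}))  # 2

# Both "copies" of 0.3 are summed, giving roughly 0.6 instead of 0.3.
print(math.isclose(distinct_sum([a, b]), 0.6))  # True
```
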
>>
>> Kenn
>>
>>> Brian
>>>
>>> On Fri, May 3, 2019 at 10:52 AM Rui Wang <ruw...@google.com> wrote:
>>>
>>>> To clarify what I said "So two shuffle approach will lead to two different implementation for tables with and without FLOAT/DOUBLE column.":
>>>>
>>>> Basically I wanted to say that the two-shuffle approach would be the implementation for some cases, and it would coexist with the CombineFn approach. In the future, when we start cost-based optimization in BeamSQL, CBO is supposed to compare different plans.
>>>>
>>>> -Rui
>>>>
>>>> On Fri, May 3, 2019 at 10:40 AM Rui Wang <ruw...@google.com> wrote:
>>>>
>>>>>> As to the distinct aggregations: At the least, these queries should be rejected, not evaluated incorrectly.
>>>>>
>>>>> Yes. At the least we should not support it and should throw a clear error message. (The current implementation ignores DISTINCT and executes all aggregations as ALL.)
>>>>>
>>>>>> The term "stateful CombineFn" is not one I would use, as the nature of state is linearity and the nature of CombineFn is parallelism. So I don't totally understand this proposal. If I replace stateful CombineFn with stateful DoFn with one combining state per column, then I think I understand. FWIW on a runner with scalable SetState or MapState it will not be any risk at all.
>>>>>
>>>>> I see. "Stateful" is indeed misleading. In this thread, it was all about using a simple CombineFn to achieve DISTINCT aggregation with massive parallelism.
>>>>>
>>>>>> But if you go the two shuffle route, you don't have to separate the aggregations and re-join them. You just have to incur the cost of the GBK + DISTINCT for all columns, and just drop the secondary key for the second shuffle, no?
>>>>>
>>>>> The two-shuffle approach cannot be the unified approach because it requires building a key of group_by_key + table_row to deduplicate, but table_row might contain floating point numbers, which cannot be used as a key in GBK. So two shuffle approach will lead to two different implementation for tables with and without FLOAT/DOUBLE column.
>>>>>
>>>>> CombineFn is the unified approach for distinct and non-distinct aggregation: each aggregation call will be a CombineFn.
>>>>>
>>>>> -Rui
>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Thu, May 2, 2019 at 5:11 PM Ahmet Altay <al...@google.com> wrote:
>>>>>>
>>>>>>> On Thu, May 2, 2019 at 2:18 PM Rui Wang <ruw...@google.com> wrote:
>>>>>>>
>>>>>>>> Brian's first proposal is also challenging partly because BeamSQL has no good practice for dealing with complex SQL plans. Ideally we need enough rules and SQL plan nodes in Beam to construct easy-to-transform plans for different cases. I had a similar situation before when I needed to separate the logical plans of "JOIN ON a OR b" and "JOIN ON a AND b", because their implementations are too different to fit into the same JoinRelNode. Similarly, when distinct and non-distinct aggregations are mixed, it is hard for one single AggregationRelNode to encapsulate the complex logic.
>>>>>>>>
>>>>>>>> We will need a detailed plan to rethink RelNodes and Rules in BeamSQL, which is out of scope for supporting DISTINCT.
>>>>>>>>
>>>>>>>> I would favor the second proposal because BeamSQL uses Beam schema and row. Schema itself uses Java primitives for most of its types (int, long, float, etc.). That limits the size of each element.
>>>>>>>> Considering per-key and per-window combine, there is a good chance that stateful combine works for some (if not most) cases.
>>>>>>>>
>>>>>>>> Could we use @Experimental to tag stateful combine for supporting DISTINCT in aggregation, so that users have a chance to test it?
>>>>>>>>
>>>>>>>> -Rui
>>>>>>>>
>>>>>>>> On Thu, May 2, 2019 at 1:16 PM Brian Hulette <bhule...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Ahmet -
>>>>>>>>> I think it would only require observing each key's partition of the input independently, and the size of the state would only be proportional to the number of distinct elements, not the entire input. Note the pipeline would be a GBK with a key based on the GROUP BY, followed by a Combine.GroupedValues with a (possibly very stateful) CombineFn.
>>>>>>>
>>>>>>> Got it. Distinct elements could be proportional to the entire input; however, if this is a reasonable requirement from a product perspective, that is fine. Rui's suggestion of using an experimental tag is also a good idea. I suppose that will give us the ability to change the implementation if it becomes necessary.
>>>>>>>
>>>>>>>>> Luke -
>>>>>>>>> Here's a little background on why I think (1) is harder (it may also just be that it looks daunting to me as someone who's not that familiar with the code).
>>>>>>>>>
>>>>>>>>> An aggregation node can have multiple aggregations. So, for example, the query `SELECT k, SUM(x), COUNT(DISTINCT y), AVG(DISTINCT z) FROM ...` would yield a logical plan that has a single aggregation node with three different aggregations.
>>>>>>>>> We then take that node and build up a CombineFn that is a composite of all of the aggregations we need to make a combining PTransform [1]. To implement (1) we would need to distinguish between all the DISTINCT and non-DISTINCT aggregations, and come up with a way to unify the 2-GBK DISTINCT pipeline and the 1-GBK non-DISTINCT pipeline.
>>>>>>>>>
>>>>>>>>> That's certainly not unsolvable, but approach (2) is much simpler - it just requires implementing some variations on the CombineFns that already exist [2] and re-using the existing logic for converting an aggregation node to a combining PTransform.
>>>>>>>>>
>>>>>>>>> Hopefully that makes sense, let me know if I need to clarify further :)
>>>>>>>>> Brian
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java#L178
>>>>>>>>> [2] https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
>>>>>>>>>
>>>>>>>>> On Thu, May 2, 2019 at 12:18 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Can you also go into more detail why you think 1) is more challenging to implement?
>>>>>>>>>>
>>>>>>>>>> On Thu, May 2, 2019 at 11:58 AM Ahmet Altay <al...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> From my limited understanding, would not the stateful CombineFn option require observing the whole input before being able to combine, and isn't the risk of blowing up memory actually very high except for trivial inputs?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 2, 2019 at 11:50 AM Brian Hulette <bhule...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>> Currently BeamSQL does not support DISTINCT aggregations. These are queries like:
>>>>>>>>>>>>
>>>>>>>>>>>> > SELECT k, SUM(DISTINCT v) FROM t GROUP BY k
>>>>>>>>>>>> > SELECT k, k2, COUNT(DISTINCT k2) FROM t GROUP BY k, k2
>>>>>>>>>>>>
>>>>>>>>>>>> These are represented in Calcite's logical plan with a distinct flag on aggregation calls, but we ignore the flag when converting to a pipeline. I see two different ways that we could support this:
>>>>>>>>>>>> 1. Two GBKs - For any DISTINCT aggregation we do one GBK on the <GROUP BY key> + <DISTINCT expr> to de-dupe values of expr, followed by a second GBK on just <GROUP BY key> to perform the aggregation.
>>>>>>>>>>>> 2. Stateful CombineFn - We could implement a version of the combiners used for SUM, COUNT, etc. [1] that maintains some state that tracks previously seen elements and uses it to de-dupe new elements.
>>>>>>>>>>>>
>>>>>>>>>>>> Of course, something like (1) is much more scalable, but it is also much more challenging to implement. (2), by contrast, is trivial to implement but runs the risk of blowing up a worker's memory usage.
>>>>>>>>>>>>
>>>>>>>>>>>> Personally, I think it could be worthwhile to provide support for DISTINCT quickly with approach (2), and implement (1) as an optimization later. This combiner's state would be partitioned by key and by window, so I think we would be pretty safe from OOM'ing a worker except in some extreme cases (long windows, hot keys, very large batch pipelines, ...).
>>>>>>>>>>>>
>>>>>>>>>>>> But I understand this could be controversial, so I wanted to open it up for discussion first: Would it be worthwhile to provide support for DISTINCT aggregations quickly with approach #2, or is it better to just reject these queries until we can implement them safely with approach #1?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Brian
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java#L48
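
For reference, the two approaches from the original message can be sketched in plain Python. This is only a model under stated assumptions, not Beam code: `two_shuffle_distinct_sum`, `DistinctSumCombiner` and `combiner_distinct_sum` are hypothetical names, the "shuffles" are ordinary in-memory groupings, and the combiner's method names merely mirror the CombineFn lifecycle.

```python
from collections import defaultdict


def two_shuffle_distinct_sum(rows):
    """Approach (1): de-dupe on (key, value), then aggregate per key."""
    # First "GBK": grouping on the composite (key, value) removes duplicates.
    deduped = {(k, v) for k, v in rows}
    # Second "GBK": drop the secondary key and aggregate on the GROUP BY key.
    totals = defaultdict(int)
    for k, v in deduped:
        totals[k] += v
    return dict(totals)


class DistinctSumCombiner:
    """Approach (2): the accumulator is the set of values seen so far."""

    def create_accumulator(self):
        return set()

    def add_input(self, acc, value):
        acc.add(value)
        return acc

    def merge_accumulators(self, accs):
        merged = set()
        for acc in accs:
            merged |= acc
        return merged

    def extract_output(self, acc):
        return sum(acc)


def combiner_distinct_sum(rows, combiner):
    """Single grouping by key, with one accumulator per key."""
    accs = {}
    for k, v in rows:
        acc = accs.get(k)
        if acc is None:
            acc = combiner.create_accumulator()
        accs[k] = combiner.add_input(acc, v)
    return {k: combiner.extract_output(acc) for k, acc in accs.items()}


rows = [("a", 1), ("a", 1), ("a", 2), ("b", 5), ("b", 5)]
print(two_shuffle_distinct_sum(rows))                      # {'a': 3, 'b': 5} (key order may vary)
print(combiner_distinct_sum(rows, DistinctSumCombiner()))  # {'a': 3, 'b': 5}
```

In this model the accumulator in approach (2) grows with the number of distinct values per key, which is the memory risk raised in the thread, while approach (1) never holds more than one running total per key at aggregation time.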