In the car analogy, you have something like this: Iterable: car; Iterator: taxi ride.
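For illustration, a minimal JDK-only sketch of the contract difference the analogy points at (nothing Beam-specific is assumed):

    import java.util.Arrays;
    import java.util.Iterator;

    public class IterableVsIterator {
      public static void main(String[] args) {
        // An Iterable (the "car") hands out a fresh Iterator for every traversal.
        Iterable<Integer> iterable = Arrays.asList(1, 2, 3);
        for (int v : iterable) { /* first pass */ }
        for (int v : iterable) { /* a second pass works just as well */ }

        // An Iterator (the "taxi ride") is a single, consumable pass.
        Iterator<Integer> iterator = iterable.iterator();
        while (iterator.hasNext()) { iterator.next(); }
        System.out.println(iterator.hasNext()); // false forever; the ride is over
      }
    }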
They are related, but not as variations of a common concept.

In the discussion of Combine vs RSBK: if the reducer is required to be an associative and commutative operator, then it is the same thing under a different name. If the reducer can be non-associative or non-commutative, then it admits fewer transformations/optimizations. If you introduce a GroupIteratorsByKey and implement GroupByKey as a transform that combines the iterators by concatenation, I think you do get an internally consistent system. To execute efficiently, you need to always identify and replace the GroupByKey operation with a primitive one.

It does make some sense to expose the weakest primitives for the sake of DSLs. But they are very poorly suited for end users, and for GBK on most runners you get the more powerful one for free.

Kenn

On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <je...@seznam.cz> wrote:

> > The fact that the annotation on the ParDo "changes" the GroupByKey implementation is very specific to the Spark runner implementation.
>
> I don't quite agree. It is not specific to Spark; it is common to all runners that produce grouped elements in a way that is not reiterable. That is the key property. The example you gave with HDFS does not satisfy this condition (files on HDFS are certainly reiterable), and that's why no change to the GBK is needed (it actually already has the required property). A quick look at the FlinkRunner (at least the non-portable one) shows that it implements GBK by reducing elements into a List. That is going to crash on big PCollections, which is even nicely documented:
>
> * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this is
> * expected to crash!
>
> If this is fixed, then it is likely to start behaving the same as Spark. So actually I think the opposite is true - Dataflow is the special case, because of how its internal shuffle service works.
>
> > In general I sympathize with the worry about non-local effects. Beam is already full of them (e.g. a Window.into statement affects downstream GroupByKeys). In each case where they were added there was extensive debate and discussion (windowing semantics were debated for many months), exactly because there was concern over adding these non-local effects. In every case, no other good solution could be found. For the case of windowing, for example, it was often easy to propose simple local APIs (e.g. just pass the window fn as a parameter to GroupByKey), however all of these local solutions ended up not working for important use cases when we analyzed them more deeply.
>
> That is very interesting. Could you elaborate on some examples of the use cases that didn't work? I'd like to try to match them against how Euphoria is structured; it should be more resistant to these non-local effects, because it very often bundles multiple Beam primitives into a single transform. ReduceByKey is one example of this - it is actually a mix of Window.into() + GBK + ParDo. It might look like this transform is not primitive because it can be broken down into something else (Euphoria has no native equivalent of GBK itself), but it has several other nice implications - namely, that Combine now becomes a special case of RBK. It then becomes only a question of where and how you can "run" the reduce function. The logic is exactly the same.
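For illustration, a minimal sketch (Beam Java SDK) of the decomposition described above; the ReduceByKey name and shape here follow Euphoria's idea rather than its actual implementation, and windowing is assumed to be applied by the caller via Window.into():

    import java.util.Iterator;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SerializableBiFunction;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    /** Sketch: ReduceByKey as a composite of the GBK and ParDo primitives. */
    class ReduceByKey<K, V>
        extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
      private final SerializableBiFunction<V, V, V> reducer;

      ReduceByKey(SerializableBiFunction<V, V, V> reducer) {
        this.reducer = reducer;
      }

      @Override
      public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
        final SerializableBiFunction<V, V, V> fn = reducer;
        return input
            .apply(GroupByKey.<K, V>create())
            .apply(ParDo.of(new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // A reduce needs only a single pass over the grouped values,
                // so no reiteration of the Iterable is required here.
                Iterator<V> it = c.element().getValue().iterator();
                V acc = it.next(); // a grouped key always has at least one value
                while (it.hasNext()) {
                  acc = fn.apply(acc, it.next());
                }
                c.output(KV.of(c.element().getKey(), acc));
              }
            }));
      }
    }

Note that the reduce function makes a single pass over the values, which is exactly why the reiteration question discussed below never arises for RBK/Combine-style transforms.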
> This can be worked out in more detail to actually show that even Combine and RBK can be described by a more general stateful operation (ReduceStateByKey), and so finally Euphoria actually has only two really "primitive" operations - FlatMap (basically a stateless ParDo) and RSBK. As I already mentioned on another thread, once stateful ParDo supports merging windows, it can be shown that both Combine and GBK become special cases of it.
>
> > As you mentioned below, I do think it's perfectly reasonable for a DSL to impose its own semantics. Scio already does this - the raw Beam API is used by a DSL as a substrate, but the DSL does not need to blindly mirror the semantics of the raw Beam API - at least in my opinion!
>
> Sure, but currently there is no way for a DSL to "hook" into a runner, so it has to use the raw Beam SDK, and so it will fail in cases like this - where Beam actually has stronger guarantees than the DSL requires. It would be cool if we could find a way to do that - this pretty much aligns with another question raised on the ML, about the possibility to override the default implementation of a PTransform for a specific pipeline.
>
> Jan
>
> On 9/29/19 7:46 PM, Reuven Lax wrote:
>
> Jan,
>
> The fact that the annotation on the ParDo "changes" the GroupByKey implementation is very specific to the Spark runner implementation. You can imagine another runner that simply writes out files in HDFS to implement a GroupByKey - that GroupByKey implementation is agnostic to whether the result will be reiterated or not; in this case it is very much the ParDo implementation that changes to implement a reiterable. I think you don't like the fact that an annotation on the ParDo will have a non-local effect on the implementation of the GroupByKey upstream. However, arguably the non-local effect is just a quirk of how the Spark runner is implemented - other runners might have a local effect.
>
> In general I sympathize with the worry about non-local effects. Beam is already full of them (e.g. a Window.into statement affects downstream GroupByKeys). In each case where they were added there was extensive debate and discussion (windowing semantics were debated for many months), exactly because there was concern over adding these non-local effects. In every case, no other good solution could be found. For the case of windowing, for example, it was often easy to propose simple local APIs (e.g. just pass the window fn as a parameter to GroupByKey), however all of these local solutions ended up not working for important use cases when we analyzed them more deeply.
>
> As you mentioned below, I do think it's perfectly reasonable for a DSL to impose its own semantics. Scio already does this - the raw Beam API is used by a DSL as a substrate, but the DSL does not need to blindly mirror the semantics of the raw Beam API - at least in my opinion!
>
> Reuven
>
> On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> I understand the concerns. Still, it looks a little like we want to be able to modify the behavior of an object from inside a submodule - quite as if my subprogram accepted a Map interface, but internally said "hey, this is supposed to be a HashMap, please change it accordingly". Because of how the pipeline is constructed we can do that; the question is whether there really isn't a better solution.
>>
>> What I do not like about the proposed solution:
>>
>> 1) specifying that the grouped elements are supposed to be iterated only once can be done only on a ParDo, although there are other (higher-level) PTransforms that can consume the output of a GBK
>>
>> 2) the annotation on ParDo is by definition generic - i.e. it can be used on input that is not the output of a GBK, which makes no sense
>>
>> 3) we modify the behavior to unlock some optimizations (or change the behavior of the GBK itself); users will not understand that
>>
>> 4) the annotation somewhat arbitrarily modifies the data types passed, which is counter-intuitive and will be a source of confusion
>>
>> I think that a solution that solves the above problems (but brings some new ones, as always :-)) could be to change the output of GBK from PCollection<KV<K, Iterable<V>>> to GroupedPCollection<K, V>. That way we can control which operators consume the grouping (and how), and we can enable these transforms to specify additional parameters (such as how they want to consume the grouping). It is obviously a breaking change (although it can probably be made backwards compatible) and it would very likely involve substantial work. But maybe there are some other not yet discussed options.
>>
>> Jan
>> On 9/28/19 6:46 AM, Reuven Lax wrote:
>>
>> In many cases, the writer of the ParDo has no access to the GBK (e.g. the GBK is hidden inside an upstream PTransform that they cannot modify). This is the same reason why RequiresStableInput was made a property of the ParDo: the GroupByKey is quite often inaccessible.
>>
>> The car analogy doesn't quite apply here, because the runner does have a full view of the pipeline and so can satisfy all constraints. The car dealership generally cannot read your mind (thankfully!), so you have to specify what you want. Or to put it another way, the various transforms in a Beam pipeline do not live in isolation. The full pipeline graph is what is executed, and the runner already has to analyze the full graph to run the pipeline (as well as to optimize it).
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> I'd suggest Stream instead of Iterator; it has the same semantics and a much better API.
>>>
>>> Still not sure what is wrong with letting the GBK decide this. I have an analogy - if I decide to buy a car, I have to decide *what* car I'm going to buy (by thinking about how I'm going to use it) *before* I buy it. I cannot just buy "a car" and then change it from a minivan to a sports car based on my actual needs. Same with the GBK - if I want to be able to reiterate the result, then I should say so in advance.
>>>
>>> Jan
>>> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>>>
>>> Good point about sibling fusion requiring this.
>>>
>>> The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply that the output iterable can be iterated arbitrarily many times.
>>>
>>> I think this should remain the default for all the reasons mentioned.
>>>
>>> We could have opt-in to the weaker KV<K, Iterator<V>> version. Agreed that this is a property of the ParDo. A particular use of a GBK has no idea what is downstream. If you owned the whole pipeline, a special ParDo<Iterator<V>, Foo> would work. But to make the types line up, this would require changes upstream, which is not good.
>>>
>>> Maybe something like this:
>>>
>>> ParDo<Iterable<V>, Foo> {
>>>   @ProcessElement
>>>   void process(@OneShotIterator Iterator<V> iter) {
>>>     ...
>>>   }
>>> }
>>>
>>> I've described all of this in terms of the Java SDK, so we would need a portable representation for all this metadata.
>>>
>>> Kenn
>>>
>>> On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I think the behavior to make explicit is the need to reiterate, not the need to handle large results. How large a result can be handled will always be dependent on the runner, and each runner will probably have a different definition of large keys. Reiteration, however, is a logical difference in the programming API. Therefore I think it makes sense to specify the latter. The need to reiterate is a property of the downstream ParDo, so it should be specified there - not on the GBK.
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Ok, I think I understand that there might be some benefits to this. Then I'd propose we make this clear on the GBK. If we supported something like this:
>>>>>
>>>>> PCollection<?> input = ...;
>>>>>
>>>>> input.apply(GroupByKey.withLargeKeys());
>>>>>
>>>>> then the SparkRunner could expand this to repartitionAndSortWithinPartitions only for this PTransform, and fall back to the default (in-memory) implementation in other situations. The default expansion of LargeGroupByKey (let's say) would be the classic GBK, so that only runners that need to make sure they don't break reiterations would expand it differently.
>>>>>
>>>>> WDYT?
>>>>>
>>>>> Jan
>>>>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>>>>
>>>>> As I mentioned above, CoGroupByKey already takes advantage of this. Reiterating is not the most common use case, but it's definitely one that comes up. Also keep in mind that this API has supported reiterating for the past five years (since even before the SDK was donated to Apache). Therefore you should assume that users are relying on it, in ways we might not expect.
>>>>>
>>>>> Historically, Google's Flume system had collections that did not support reiterating (they were even called OneShotCollections to make this clear). This was a source of problems and user frustration, which was one reason that in the original Dataflow SDK we made sure these iterables could be reiterated. Another reason why it's advantageous for a runner to support this is that it allows for better fusion. If two ParDos A and B both read from the same GroupByKey, it is nice to be able to fuse them into one logical operator. For this, you'll probably need a shuffle implementation that allows two independent readers on the same shuffle session.
>>>>>
>>>>> How easy it is to implement reiterables that don't have to fit in memory will depend on the runner. For Dataflow it's possible because the shuffle session is logically persistent, so the runner can simply reread the shuffle session. For other runners with different shuffle implementations, it might be harder to support both properties. Maybe we should introduce a new @RequiresReiteration annotation on ParDo? That way the Spark runner can see it and switch to the in-memory version just for groupings consumed by those ParDos.
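For illustration, a sketch of how such an annotation might look at a use site; @RequiresReiteration, its placement, and the surrounding names are all hypothetical (assuming the usual Beam Java imports and a grouped PCollection<KV<String, Iterable<Long>>> named "grouped"):

    grouped.apply(ParDo.of(
        new DoFn<KV<String, Iterable<Long>>, String>() {
          @RequiresReiteration // hypothetical: declares that both passes below must work
          @ProcessElement
          public void process(ProcessContext c) {
            long count = 0;
            for (long v : c.element().getValue()) { count++; } // first pass: a statistic
            long sum = 0;
            for (long v : c.element().getValue()) { sum += v; } // second pass: reiterates
            c.output(c.element().getKey() + " mean=" + (sum / (double) count));
          }
        }));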
>>>>> Runners that already support reiteration can ignore this annotation, so it should be backwards compatible.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> I'd like to know the use case. Why would you *need* to actually iterate the grouped elements twice? By definition, the first iteration would have to extract some statistic (or a subset of the elements that must fit into memory). This statistic can then be used as another input for the second iteration. Why not then calculate the statistic in a separate branch of the pipeline and feed it into the ParDo as a side input? That would definitely be more efficient, because the calculation of the statistic would probably be combinable (I'm not sure it is absolutely required to be combinable, but it seems probable). Even if the calculation were not combinable, it would be no less efficient than reiterating twice. Why then support multiple iterations (besides the fact that the output of GBK is an Iterable)? Am I missing something?
>>>>>>
>>>>>> Jan
>>>>>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>>>>>
>>>>>> This should be an element in the compatibility matrix as well.
>>>>>>
>>>>>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>
>>>>>>> I am pretty surprised that we do not have a @Category(ValidatesRunner) test in GroupByKeyTest that iterates multiple times. That is a major oversight. We should have this test, and it can be disabled by the SparkRunner's configuration.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> The Dataflow version does not spill to disk. However, Spark's design might require spilling to disk if you want this to be implemented properly.
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Spark's GBK is currently implemented using `sortBy(key and value).mapPartition(...)` for non-merging windowing, in order to support large keys and large-scale shuffles. Merging windowing is implemented using the standard GBK (the underlying Spark implementation uses ListCombiner + hash grouping), which is by design unable to support large keys.
>>>>>>>>>
>>>>>>>>> As Jan noted, the problem with mapPartition is that its UDF receives an Iterator. The only option here is to wrap this iterator in one that spills to disk once an internal buffer is exceeded (the approach suggested by Reuven). This unfortunately comes with a cost in some cases. The best approach would be to somehow determine that the user wants multiple iterations and only then wrap it in a "re-iterator" if necessary. Does anyone have any ideas how to approach this?
>>>>>>>>>
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> The Beam API was written to support multiple iterations, and there are definitely transforms that do so. I believe that CoGroupByKey may do this as well with the resulting iterator.
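For illustration, a minimal sketch (not the Spark runner's actual code, and not thread-safe) of the kind of wrapping David describes; a production version would spill the cache to disk past a size threshold instead of keeping everything in memory:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    /** Sketch: makes a one-shot Iterator re-iterable by caching consumed elements. */
    class CachingReiterable<T> implements Iterable<T> {
      private final Iterator<T> source;
      private final List<T> cache = new ArrayList<>();

      CachingReiterable(Iterator<T> source) {
        this.source = source;
      }

      @Override
      public Iterator<T> iterator() {
        return new Iterator<T>() {
          private int pos = 0;

          @Override
          public boolean hasNext() {
            return pos < cache.size() || source.hasNext();
          }

          @Override
          public T next() {
            if (pos == cache.size()) {
              cache.add(source.next()); // pull lazily from the one-shot source
            }
            return cache.get(pos++);
          }
        };
      }
    }

Later iterators replay the cache before pulling new elements, so a second traversal sees the full sequence even though the underlying source can be consumed only once.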
>>>>>>>>>>
>>>>>>>>>> I know that the Dataflow runner is able to handle iterators larger than available memory by paging them in from shuffle, which still allows for reiterating. It sounds like Spark is less flexible here?
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>>>
>>>>>>>>>>> +dev <dev@beam.apache.org>
>>>>>>>>>>>
>>>>>>>>>>> Lukasz, why do you think that users expect to be able to iterate grouped elements multiple times - besides the fact that the 'Iterable' type obviously suggests it? The way Spark behaves is pretty much analogous to how MapReduce used to work: in certain cases it calls repartitionAndSortWithinPartitions and then does mapPartition, which accepts an Iterator - that is because internally it merge-sorts pre-sorted segments. This approach makes it possible to GroupByKey data sets that are too big (per key) to fit into memory.
>>>>>>>>>>>
>>>>>>>>>>> If multiple iterations should be expected by users, we probably should:
>>>>>>>>>>>
>>>>>>>>>>> a) include that in @ValidatesRunner tests
>>>>>>>>>>>
>>>>>>>>>>> b) store values in memory on Spark, which will break for certain pipelines
>>>>>>>>>>>
>>>>>>>>>>> Because of (b) I think it would be much better to remove this "expectation" and clearly document that the Iterable is not supposed to be iterated multiple times.
>>>>>>>>>>>
>>>>>>>>>>> Jan
>>>>>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>>>>>
>>>>>>>>>>> I pretty much think so, because that is how Spark works. The Iterable inside is really an Iterator, which cannot be iterated multiple times.
>>>>>>>>>>>
>>>>>>>>>>> Jan
>>>>>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>>>>>
>>>>>>>>>>> Jan, in Beam users expect to be able to iterate the GBK output multiple times, even from within the same ParDo. Is this something that Beam on the Spark runner never supported?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Gershi,
>>>>>>>>>>>>
>>>>>>>>>>>> could you please outline the pipeline you are trying to execute? Basically, you cannot iterate the Iterable multiple times in a single ParDo. It should be possible, though, to apply multiple ParDos to the output from GroupByKey, as sketched below.
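For illustration, a minimal sketch of that branching, with hypothetical names and element types (assuming the usual Beam Java imports and an input PCollection<KV<String, Long>> named "input"); each ParDo iterates its own view of the grouped values exactly once:

    PCollection<KV<String, Iterable<Long>>> grouped =
        input.apply(GroupByKey.create());

    // Branch 1: a single pass that counts the values per key.
    PCollection<KV<String, Long>> counts = grouped.apply("CountValues",
        ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            long n = 0;
            for (Long v : c.element().getValue()) { n++; }
            c.output(KV.of(c.element().getKey(), n));
          }
        }));

    // Branch 2: an independent single pass that sums the values per key.
    PCollection<KV<String, Long>> sums = grouped.apply("SumValues",
        ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            long sum = 0;
            for (Long v : c.element().getValue()) { sum += v; }
            c.output(KV.of(c.element().getKey(), sum));
          }
        }));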
>>>>>>>>>>>>
>>>>>>>>>>>> Jan
>>>>>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I want to iterate multiple times over the Iterable<V> (the output of the GroupByKey transformation). When my runner is SparkRunner, I get an exception:
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
>>>>>>>>>>>>     at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>>>>>     at java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>>>>>>
>>>>>>>>>>>> I understood that I can branch the pipeline after GroupByKey into multiple transformations and iterate once on the Iterable<V> in each of them.
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a better way to do that?
>>>>>>>>>>>>
>>>>>>>>>>>> Noam Gershi
>>>>>>>>>>>> Software Developer
>>>>>>>>>>>> T: +972 (3) 7405718