In the car analogy, you have something this:

    Iterable: car
    Iterator: taxi ride

They are related, but not as variations of a common concept.

In the discussion of Combine vs RSBK, if the reducer is required to be an
associative and commutative operator, then it is the same thing under a
different name. If the reducer can be non-associative or non-commutative,
then it admits fewer transformations/optimizations.

If you introduce a GroupIteratorsByKey and implement GroupByKey as a
transform that combines the iterator by concatenation, I think you do get
an internally consistent system. To execute efficiently, you need to always
identify and replace the GroupByKey operation with a primitive one. It does
make some sense to expose the weakest primitives for the sake of DSLs. But
they are very poorly suited for end-users, and for GBK on most runners you
get the more powerful one for free.

Kenn

On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <je...@seznam.cz> wrote:

> > The fact that the annotation on the ParDo "changes" the GroupByKey
> implementation is very specific to the Spark runner implementation.
>
> I don't quite agree. It is not very specific to Spark, it is specific to
> generally all runners, that produce grouped elements in a way that is not
> reiterable. That is the key property. The example you gave with HDFS does
> not satisfy this condition (files on HDFS are certainly reiterable), and
> that's why no change to the GBK is needed (it actually already has the
> required property). A quick look at what FlinkRunner (at least non portable
> does) is that it implements GBK using reducing elements into List. That is
> going to crash on big PCollection, which is even nicely documented:
>
>    * <p>For internal use to translate {@link GroupByKey}. For a large {@link 
> PCollection} this is
>    * expected to crash!
>
>
> If this is fixed, then it is likely to start behave the same as Spark. So
> actually I think the opposite is true - Dataflow is a special case, because
> of how its internal shuffle service works.
>
> > In general I sympathize with the worry about non-local effects. Beam is
> already full of them (e.g. a Window.into statement effects downstream
> GroupByKeys). In each case where they were added there was extensive debate
> and discussion (Windowing semantics were debated for many months), exactly
> because there was concern over adding these non-local effects. In every
> case, no other good solution could be found. For the case of windowing for
> example, it was often easy to propose simple local APIs (e.g. just pass the
> window fn as a parameter to GroupByKey), however all of these local
> solutions ended up not working for important use cases when we analyzed
> them more deeply.
>
> That is very interesting. Could you elaborate more about some examples of
> the use cases which didn't work? I'd like to try to match it against how
> Euphoria is structured, it should be more resistant to this non-local
> effects, because it very often bundles together multiple Beam's primitives
> to single transform - ReduceByKey is one example of this, if is actually
> mix of Window.into() + GBK + ParDo, Although it might look like if this
> transform can be broken down to something else, then it is not primitive
> (euphoria has no native equivalent of GBK itself), but it has several other
> nice implications - that is that Combine now becomes a special case of RBK.
> It now becomes only a question of where and how you can "run" the reduce
> function. The logic is absolutely equal. This can be worked in more detail
> and actually show, that even Combine and RBK can be decribed by a more
> general stateful operation (ReduceStateByKey), and so finally Euphoria
> actually has only two really "primitive" operations - these are FlatMap
> (basically stateless ParDo) and RSBK. As I already mentioned on some other
> thread, when stateful ParDo would support merging windows, it can be shown
> that both Combine and GBK become special cases of this.
>
> > As you mentioned below, I do think it's perfectly reasonable for a DSL
> to impose its own semantics. Scio already does this - the raw Beam API is
> used by a DSL as a substrate, but the DSL does not need to blindly mirror
> the semantics of the raw Beam API - at least in my opinion!
>
> Sure, but currently, there is no way for DSL to "hook" into runner, so it
> has to use raw Beam SDK, and so this will fail in cases like this - where
> Beam actually has stronger guarantees than it is required by the DSL. It
> would be cool if we could find a way to do that - this pretty much aligns
> with another question raised on ML, about the possibility to override a
> default implementation of a PTransform for specific pipeline.
>
> Jan
>
>
> On 9/29/19 7:46 PM, Reuven Lax wrote:
>
> Jan,
>
> The fact that the annotation on the ParDo "changes" the GroupByKey
> implementation is very specific to the Spark runner implementation. You can
> imagine another runner that simply writes out files in HDFS to implement a
> GroupByKey - this GroupByKey implementation is agnostic whether the result
> will be reiterated or not; in this case it is very much the ParDo
> implementation that changes to implement a reiterable. vI think you don't
> like the fact that an annotation on the ParDo will have a non-local effect
> on the implementation of the GroupByKey upstream. However arguably the
> non-local effect is just a quirk of how the Spark runner is implemented -
> other runners might have a local effect.
>
> In general I sympathize with the worry about non-local effects. Beam is
> already full of them (e.g. a Window.into statement effects downstream
> GroupByKeys). In each case where they were added there was extensive debate
> and discussion (Windowing semantics were debated for many months), exactly
> because there was concern over adding these non-local effects. In every
> case, no other good solution could be found. For the case of windowing for
> example, it was often easy to propose simple local APIs (e.g. just pass the
> window fn as a parameter to GroupByKey), however all of these local
> solutions ended up not working for important use cases when we analyzed
> them more deeply.
>
> As you mentioned below, I do think it's perfectly reasonable for a DSL to
> impose its own semantics. Scio already does this - the raw Beam API is used
> by a DSL as a substrate, but the DSL does not need to blindly mirror the
> semantics of the raw Beam API - at least in my opinion!
>
> Reuven
>
> On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> I understand the concerns. Still, it looks a little like we want to be
>> able to modify behavior of an object from inside a submodule - quite like
>> if my subprogram would accept a Map interface, but internally I would say
>> "hey, this is supposed to be a HashMap, please change it so". Because of
>> how pipeline is constructed, we can do that, the question is if there
>> really isn't a better solution.
>>
>> What I do not like about the proposed solution:
>>
>>  1) to specify that the grouped elements are supposed to be iterated only
>> once can be done only on ParDo, although there are other (higher level)
>> PTransforms, that can consume output of GBK
>>
>>  2) the annontation on ParDo is by definition generic - i.e. can be used
>> on input which is not output of GBK, which makes no sense
>>
>>  3) we modify the behavior to unlock some optimizations (or change of
>> behavior of the GBK itself), users will not understand that
>>
>>  4) the annotation somewhat arbitrarily modifies data types passed, that
>> is counter-intuitive and will be source of confusion
>>
>> I think that a solution that solves the above problems (but brings somoe
>> new, as always :-)), could be to change the output of GBK from
>> PCollection<K, Iterable<V>> to GroupedPCollection<K, V>. That way we can
>> control which operators (and how) consume the grouping, and we can enable
>> these transforms to specify additional parameters (like how they want to
>> consume the grouping). It is obviously a breaking change (although can be
>> probably made backwards compatible) and it would very much likely involve a
>> substantial work. But maybe there are some other not yet discussed options.
>>
>> Jan
>> On 9/28/19 6:46 AM, Reuven Lax wrote:
>>
>> In many cases, the writer of the ParDo has no access to the GBK (e.g. the
>> GBK is hidden inside an upstream PTransform that they cannot modify). This
>> is the same reason why RequiresStableInput was made a property of the
>> ParDo, because the GroupByKey is quite often inaccessible.
>>
>> The car analogy doesn't quite apply here, because the runner does have a
>> full view of the pipeline so can satisfy all constraints. The car
>> dealership generally cannot read your mind (thankfully!), so you have to
>> specify what you want. Or to put it another way, the various transforms in
>> a Beam pipeline do not live in isolation. The full pipeline graph is what
>> is executed, and the runner already has to analyze the full graph to run
>> the pipeline (as well as to optimize the pipeline).
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> I'd suggest Stream instead of Iterator, it has the same semantics and
>>> much better API.
>>>
>>> Still not sure, what is wrong on letting the GBK to decide this. I have
>>> an analogy - if I decide to buy a car, I have to decide *what* car I'm
>>> going to buy (by think about how I'm going to use it) *before* I buy it. I
>>> cannot just buy "a car" and then change it from minivan to sport car based
>>> on my actual need. Same with the GBK - if I want to be able to reiterate
>>> the result, then I should tell it in advance.
>>>
>>> Jan
>>> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>>>
>>> Good point about sibling fusion requiring this.
>>>
>>> The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply
>>> that the output iterable can be iterated arbitrarily many times.
>>>
>>> I think this should remain the default for all the reasons mentioned.
>>>
>>> We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree
>>> that this is a property of the ParDo. A particular use of a GBK has no idea
>>> what is downstream. If you owned the whole pipeline, a special
>>> ParDo<Iterator<V>, Foo> would work. But to make the types line up, this
>>> would require changes upstream, which is not good.
>>>
>>> Maybe something like this:
>>>
>>> ParDo<Iterable<V>, Foo> {
>>>   @ProcessElement
>>>   void process(@OneShotIterator Iterator<V> iter) {
>>>     ...
>>>   }
>>> }
>>>
>>> I've described all of this in terms of Java SDK. So we would need a
>>> portable representation for all this metadata.
>>>
>>> Kenn
>>>
>>> On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I think the behavior to make explicit is the need to reiterate, not the
>>>> need to handle large results. How large of a result can be handled will
>>>> always be dependent on the runner, and each runner will probably have a
>>>> different definition of large keys. Reiteration however is a logical
>>>> difference in the programming API. Therefore I think it makes sense to
>>>> specify the latter. The need to reiterate is a property of the downstream
>>>> ParDo, so it should be specified there - not on the GBK.
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Ok, I think I understand there might be some benefits of this. Then
>>>>> I'd propose we make this clear on the GBK. If we would support somehing
>>>>> like this:
>>>>>
>>>>>  PCollection<?> input = ....;
>>>>>
>>>>>  input.apply(GroupByKey.withLargeKeys());
>>>>>
>>>>> then SparkRunner could expand this to
>>>>> repartitionAndSortWithinPartitions only on this PTransform, and fallback 
>>>>> to
>>>>> the default (in memory) in other situations. The default expansion of
>>>>> LargeGroupByKey (let's say) would be classic GBK, so that only runners 
>>>>> that
>>>>> need to make sure that they don't break reiterations can expand this.
>>>>>
>>>>> WDYT?
>>>>>
>>>>> Jan
>>>>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>>>>
>>>>> As I mentioned above, CoGroupByKey already takes advantage of this.
>>>>> Reiterating is not the most common use case, but it's definitely one that
>>>>> comes up. Also keep in mind that this API has supported reiterating for 
>>>>> the
>>>>> past five years (since even before the SDK was donated to Apache).
>>>>> Therefore you should assume that users are relying on it, in ways we might
>>>>> not expect.
>>>>>
>>>>> Historically, Google's Flume system had collections that did not
>>>>> support reiterating (they were even called OneShotCollections to make it
>>>>> clear). This was the source of problems and user frustration, which was 
>>>>> one
>>>>> reason that in the original Dataflow SDK we made sure that these iterables
>>>>> could be reiterated. Another reason why it's advantageous for a runner to
>>>>> support this is allowing for better fusion. If two ParDos A and B both 
>>>>> read
>>>>> from the same GroupByKey, it is nice to be able to fuse them into one
>>>>> logical operator. For this, you'll probably need a shuffle implementation
>>>>> that allows two independent readers from the same shuffle session.
>>>>>
>>>>> How easy it is to implement reiterables that don't have to fit in
>>>>> memory will depend on the runner.  For Dataflow it's possible because the
>>>>> shuffle session is logically persistent, so the runner can simply reread
>>>>> the shuffle session. For other runners with different shuffle
>>>>> implementations, it might be harder to support both properties. Maybe we
>>>>> should introduce a new @RequiresReiteration annotation on ParDo? That way
>>>>> the Spark runner can see this and switch to the in-memory version just for
>>>>> groupings consumed by those ParDos. Runners that already support
>>>>> reiteration can ignore this annotation, so it should be backwards
>>>>> compatible.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> I'd like to know the use-case. Why would you *need* to actually
>>>>>> iterate the grouped elements twice? By definition the first iteration 
>>>>>> would
>>>>>> have to extract some statistic (or subset of elements that must fit into
>>>>>> memory). This statistic can then be used as another input for the second
>>>>>> iteration. Why not then calculate the statistic in a separate branch in 
>>>>>> the
>>>>>> pipeline and feed it then into the ParDo as side input? That would be
>>>>>> definitely more efficient, because the calculation of the statistic would
>>>>>> be probably combinable (not sure if it is absolutely required to be
>>>>>> combinable, but it seems probable). Even if the calculation would not be
>>>>>> combinable, it is not less efficient than reiterating twice. Why then
>>>>>> support multiple iterations (besides the fact that output of GBK is
>>>>>> Iterable). Am I missing something?
>>>>>>
>>>>>> Jan
>>>>>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>>>>>
>>>>>> This should be an element in the compatibility matrix as well.
>>>>>>
>>>>>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I am pretty surprised that we do not have
>>>>>>> a @Category(ValidatesRunner) test in GroupByKeyTest that iterates 
>>>>>>> multiple
>>>>>>> times. That is a major oversight. We should have this test, and it can 
>>>>>>> be
>>>>>>> disabled by the SparkRunner's configuration.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> The Dataflow version does not spill to disk. However Spark's design
>>>>>>>> might require spilling to disk if you want that to be implemented 
>>>>>>>> properly.
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>>>>>>> value).mapPartition(...)` for non-merging windowing in order to 
>>>>>>>>> support
>>>>>>>>> large keys and large scale shuffles. Merging windowing is implemented 
>>>>>>>>> using
>>>>>>>>> standard GBK (underlying spark impl. uses ListCombiner + Hash 
>>>>>>>>> Grouping),
>>>>>>>>> which is by design unable to support large keys.
>>>>>>>>>
>>>>>>>>> As Jan noted, problem with mapPartition is, that its UDF receives
>>>>>>>>> an Iterator. Only option here is to wrap this iterator to one that 
>>>>>>>>> spills
>>>>>>>>> to disk once an internal buffer is exceeded (the approach suggested by
>>>>>>>>> Reuven). This unfortunately comes with a cost in some cases. The best
>>>>>>>>> approach would be to somehow determine, that user wants multiple 
>>>>>>>>> iterations
>>>>>>>>> and than wrap it in "re-iterator" if necessary. Does anyone have any 
>>>>>>>>> ideas
>>>>>>>>> how to approach this?
>>>>>>>>>
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The Beam API was written to support multiple iterations, and
>>>>>>>>>> there are definitely transforms that do so. I believe that 
>>>>>>>>>> CoGroupByKey may
>>>>>>>>>> do this as well with the resulting iterator.
>>>>>>>>>>
>>>>>>>>>> I know that the Dataflow runner is able to handles iterators
>>>>>>>>>> larger than available memory by paging them in from shuffle, which 
>>>>>>>>>> still
>>>>>>>>>> allows for reiterating. It sounds like Spark is less flexible here?
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> +dev <dev@beam.apache.org> <dev@beam.apache.org>
>>>>>>>>>>>
>>>>>>>>>>> Lukasz, why do you think that users expect to be able to iterate
>>>>>>>>>>> multiple times grouped elements? Besides that it obviously suggests 
>>>>>>>>>>> the
>>>>>>>>>>> 'Iterable'? The way that spark behaves is pretty much analogous to 
>>>>>>>>>>> how
>>>>>>>>>>> MapReduce used to work - in certain cases it calles
>>>>>>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>>>>>>>> accepts Iterator - that is because internally it merge sorts pre 
>>>>>>>>>>> sorted
>>>>>>>>>>> segments. This approach enables to GroupByKey data sets that are 
>>>>>>>>>>> too big to
>>>>>>>>>>> fit into memory (per key).
>>>>>>>>>>>
>>>>>>>>>>> If multiple iterations should be expected by users, we probably
>>>>>>>>>>> should:
>>>>>>>>>>>
>>>>>>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>>>>>>
>>>>>>>>>>>  b) store values in memory on spark, which will break for
>>>>>>>>>>> certain pipelines
>>>>>>>>>>>
>>>>>>>>>>> Because of (b) I think that it would be much better to remove
>>>>>>>>>>> this "expectation" and clearly document that the Iterable is not 
>>>>>>>>>>> supposed
>>>>>>>>>>> to be iterated multiple times.
>>>>>>>>>>>
>>>>>>>>>>> Jan
>>>>>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>>>>>
>>>>>>>>>>> I pretty much think so, because that is how Spark works. The
>>>>>>>>>>> Iterable inside is really an Iterator, which cannot be iterated 
>>>>>>>>>>> multiple
>>>>>>>>>>> times.
>>>>>>>>>>>
>>>>>>>>>>> Jan
>>>>>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>>>>>
>>>>>>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>>>>>>> multiple times even from within the same ParDo.
>>>>>>>>>>> Is this something that Beam on Spark Runner never supported?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Gershi,
>>>>>>>>>>>>
>>>>>>>>>>>> could you please outline the pipeline you are trying to
>>>>>>>>>>>> execute? Basically, you cannot iterate the Iterable multiple times 
>>>>>>>>>>>> in
>>>>>>>>>>>> single ParDo. It should be possible, though, to apply multiple 
>>>>>>>>>>>> ParDos to
>>>>>>>>>>>> output from GroupByKey.
>>>>>>>>>>>>
>>>>>>>>>>>> Jan
>>>>>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I want to iterate multiple times on the Iterable<V> (the output
>>>>>>>>>>>> of GroupByKey transformation)
>>>>>>>>>>>>
>>>>>>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't
>>>>>>>>>>>> be iterated more than once,otherwise there could be data lost
>>>>>>>>>>>>
>>>>>>>>>>>>                 at
>>>>>>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>>>>>
>>>>>>>>>>>>                 at
>>>>>>>>>>>> java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I understood I can branch the pipeline after GroupByKey into
>>>>>>>>>>>> multiple transformation and iterate in each of them once on the 
>>>>>>>>>>>> Iterable<V>.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a better way for that?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> [image: citi_logo_mail][image: citi_logo_mail]*Noam Gershi*
>>>>>>>>>>>>
>>>>>>>>>>>> Software Developer
>>>>>>>>>>>>
>>>>>>>>>>>> *T*: +972 (3) 7405718 <+972%203-740-5718>
>>>>>>>>>>>>
>>>>>>>>>>>> [image: Mail_signature_blue]
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>

Reply via email to