On Mon, Sep 30, 2019 at 2:02 AM Jan Lukavský <je...@seznam.cz> wrote:

> > The fact that the annotation on the ParDo "changes" the GroupByKey
> implementation is very specific to the Spark runner implementation.
>
> I don't quite agree. It is not very specific to Spark, it is specific to
> generally all runners, that produce grouped elements in a way that is not
> reiterable. That is the key property. The example you gave with HDFS does
> not satisfy this condition (files on HDFS are certainly reiterable), and
> that's why no change to the GBK is needed (it actually already has the
> required property). A quick look at what FlinkRunner (at least non portable
> does) is that it implements GBK using reducing elements into List. That is
> going to crash on big PCollection, which is even nicely documented:
>
>    * <p>For internal use to translate {@link GroupByKey}. For a large {@link 
> PCollection} this is
>    * expected to crash!
>
>
> If this is fixed, then it is likely to start behave the same as Spark. So
> actually I think the opposite is true - Dataflow is a special case, because
> of how its internal shuffle service works.
>

I think you misunderstood - I was not trying to dish on the Spark runner.
Rather my point is that whether the GroupByKey implementation is affected
or not is runner dependent. In some runners it is and in others it isn't.
However in all cases the *semantics* of the ParDo is affected. Since Beam
tries as much as possible to be runner agnostic, we should default to
making the change where there is an obvious semantic difference.

> In general I sympathize with the worry about non-local effects. Beam is
> already full of them (e.g. a Window.into statement effects downstream
> GroupByKeys). In each case where they were added there was extensive debate
> and discussion (Windowing semantics were debated for many months), exactly
> because there was concern over adding these non-local effects. In every
> case, no other good solution could be found. For the case of windowing for
> example, it was often easy to propose simple local APIs (e.g. just pass the
> window fn as a parameter to GroupByKey), however all of these local
> solutions ended up not working for important use cases when we analyzed
> them more deeply.
>
> That is very interesting. Could you elaborate more about some examples of
> the use cases which didn't work? I'd like to try to match it against how
> Euphoria is structured, it should be more resistant to this non-local
> effects, because it very often bundles together multiple Beam's primitives
> to single transform - ReduceByKey is one example of this, if is actually
> mix of Window.into() + GBK + ParDo, Although it might look like if this
> transform can be broken down to something else, then it is not primitive
> (euphoria has no native equivalent of GBK itself), but it has several other
> nice implications - that is that Combine now becomes a special case of RBK.
> It now becomes only a question of where and how you can "run" the reduce
> function. The logic is absolutely equal. This can be worked in more detail
> and actually show, that even Combine and RBK can be decribed by a more
> general stateful operation (ReduceStateByKey), and so finally Euphoria
> actually has only two really "primitive" operations - these are FlatMap
> (basically stateless ParDo) and RSBK. As I already mentioned on some other
> thread, when stateful ParDo would support merging windows, it can be shown
> that both Combine and GBK become special cases of this.
>
> > As you mentioned below, I do think it's perfectly reasonable for a DSL
> to impose its own semantics. Scio already does this - the raw Beam API is
> used by a DSL as a substrate, but the DSL does not need to blindly mirror
> the semantics of the raw Beam API - at least in my opinion!
>
> Sure, but currently, there is no way for DSL to "hook" into runner, so it
> has to use raw Beam SDK, and so this will fail in cases like this - where
> Beam actually has stronger guarantees than it is required by the DSL. It
> would be cool if we could find a way to do that - this pretty much aligns
> with another question raised on ML, about the possibility to override a
> default implementation of a PTransform for specific pipeline.
>
> Jan
>
>
> On 9/29/19 7:46 PM, Reuven Lax wrote:
>
> Jan,
>
> The fact that the annotation on the ParDo "changes" the GroupByKey
> implementation is very specific to the Spark runner implementation. You can
> imagine another runner that simply writes out files in HDFS to implement a
> GroupByKey - this GroupByKey implementation is agnostic whether the result
> will be reiterated or not; in this case it is very much the ParDo
> implementation that changes to implement a reiterable. vI think you don't
> like the fact that an annotation on the ParDo will have a non-local effect
> on the implementation of the GroupByKey upstream. However arguably the
> non-local effect is just a quirk of how the Spark runner is implemented -
> other runners might have a local effect.
>
> In general I sympathize with the worry about non-local effects. Beam is
> already full of them (e.g. a Window.into statement effects downstream
> GroupByKeys). In each case where they were added there was extensive debate
> and discussion (Windowing semantics were debated for many months), exactly
> because there was concern over adding these non-local effects. In every
> case, no other good solution could be found. For the case of windowing for
> example, it was often easy to propose simple local APIs (e.g. just pass the
> window fn as a parameter to GroupByKey), however all of these local
> solutions ended up not working for important use cases when we analyzed
> them more deeply.
>
> As you mentioned below, I do think it's perfectly reasonable for a DSL to
> impose its own semantics. Scio already does this - the raw Beam API is used
> by a DSL as a substrate, but the DSL does not need to blindly mirror the
> semantics of the raw Beam API - at least in my opinion!
>
> Reuven
>
> On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> I understand the concerns. Still, it looks a little like we want to be
>> able to modify behavior of an object from inside a submodule - quite like
>> if my subprogram would accept a Map interface, but internally I would say
>> "hey, this is supposed to be a HashMap, please change it so". Because of
>> how pipeline is constructed, we can do that, the question is if there
>> really isn't a better solution.
>>
>> What I do not like about the proposed solution:
>>
>>  1) to specify that the grouped elements are supposed to be iterated only
>> once can be done only on ParDo, although there are other (higher level)
>> PTransforms, that can consume output of GBK
>>
>>  2) the annontation on ParDo is by definition generic - i.e. can be used
>> on input which is not output of GBK, which makes no sense
>>
>>  3) we modify the behavior to unlock some optimizations (or change of
>> behavior of the GBK itself), users will not understand that
>>
>>  4) the annotation somewhat arbitrarily modifies data types passed, that
>> is counter-intuitive and will be source of confusion
>>
>> I think that a solution that solves the above problems (but brings somoe
>> new, as always :-)), could be to change the output of GBK from
>> PCollection<K, Iterable<V>> to GroupedPCollection<K, V>. That way we can
>> control which operators (and how) consume the grouping, and we can enable
>> these transforms to specify additional parameters (like how they want to
>> consume the grouping). It is obviously a breaking change (although can be
>> probably made backwards compatible) and it would very much likely involve a
>> substantial work. But maybe there are some other not yet discussed options.
>>
>> Jan
>> On 9/28/19 6:46 AM, Reuven Lax wrote:
>>
>> In many cases, the writer of the ParDo has no access to the GBK (e.g. the
>> GBK is hidden inside an upstream PTransform that they cannot modify). This
>> is the same reason why RequiresStableInput was made a property of the
>> ParDo, because the GroupByKey is quite often inaccessible.
>>
>> The car analogy doesn't quite apply here, because the runner does have a
>> full view of the pipeline so can satisfy all constraints. The car
>> dealership generally cannot read your mind (thankfully!), so you have to
>> specify what you want. Or to put it another way, the various transforms in
>> a Beam pipeline do not live in isolation. The full pipeline graph is what
>> is executed, and the runner already has to analyze the full graph to run
>> the pipeline (as well as to optimize the pipeline).
>>
>> Reuven
>>
>> On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> I'd suggest Stream instead of Iterator, it has the same semantics and
>>> much better API.
>>>
>>> Still not sure, what is wrong on letting the GBK to decide this. I have
>>> an analogy - if I decide to buy a car, I have to decide *what* car I'm
>>> going to buy (by think about how I'm going to use it) *before* I buy it. I
>>> cannot just buy "a car" and then change it from minivan to sport car based
>>> on my actual need. Same with the GBK - if I want to be able to reiterate
>>> the result, then I should tell it in advance.
>>>
>>> Jan
>>> On 9/27/19 10:50 PM, Kenneth Knowles wrote:
>>>
>>> Good point about sibling fusion requiring this.
>>>
>>> The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already does imply
>>> that the output iterable can be iterated arbitrarily many times.
>>>
>>> I think this should remain the default for all the reasons mentioned.
>>>
>>> We could have opt-in to the weaker KV<K, Iterator<V>> version. Agree
>>> that this is a property of the ParDo. A particular use of a GBK has no idea
>>> what is downstream. If you owned the whole pipeline, a special
>>> ParDo<Iterator<V>, Foo> would work. But to make the types line up, this
>>> would require changes upstream, which is not good.
>>>
>>> Maybe something like this:
>>>
>>> ParDo<Iterable<V>, Foo> {
>>>   @ProcessElement
>>>   void process(@OneShotIterator Iterator<V> iter) {
>>>     ...
>>>   }
>>> }
>>>
>>> I've described all of this in terms of Java SDK. So we would need a
>>> portable representation for all this metadata.
>>>
>>> Kenn
>>>
>>> On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I think the behavior to make explicit is the need to reiterate, not the
>>>> need to handle large results. How large of a result can be handled will
>>>> always be dependent on the runner, and each runner will probably have a
>>>> different definition of large keys. Reiteration however is a logical
>>>> difference in the programming API. Therefore I think it makes sense to
>>>> specify the latter. The need to reiterate is a property of the downstream
>>>> ParDo, so it should be specified there - not on the GBK.
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>>> Ok, I think I understand there might be some benefits of this. Then
>>>>> I'd propose we make this clear on the GBK. If we would support somehing
>>>>> like this:
>>>>>
>>>>>  PCollection<?> input = ....;
>>>>>
>>>>>  input.apply(GroupByKey.withLargeKeys());
>>>>>
>>>>> then SparkRunner could expand this to
>>>>> repartitionAndSortWithinPartitions only on this PTransform, and fallback 
>>>>> to
>>>>> the default (in memory) in other situations. The default expansion of
>>>>> LargeGroupByKey (let's say) would be classic GBK, so that only runners 
>>>>> that
>>>>> need to make sure that they don't break reiterations can expand this.
>>>>>
>>>>> WDYT?
>>>>>
>>>>> Jan
>>>>> On 9/27/19 8:56 PM, Reuven Lax wrote:
>>>>>
>>>>> As I mentioned above, CoGroupByKey already takes advantage of this.
>>>>> Reiterating is not the most common use case, but it's definitely one that
>>>>> comes up. Also keep in mind that this API has supported reiterating for 
>>>>> the
>>>>> past five years (since even before the SDK was donated to Apache).
>>>>> Therefore you should assume that users are relying on it, in ways we might
>>>>> not expect.
>>>>>
>>>>> Historically, Google's Flume system had collections that did not
>>>>> support reiterating (they were even called OneShotCollections to make it
>>>>> clear). This was the source of problems and user frustration, which was 
>>>>> one
>>>>> reason that in the original Dataflow SDK we made sure that these iterables
>>>>> could be reiterated. Another reason why it's advantageous for a runner to
>>>>> support this is allowing for better fusion. If two ParDos A and B both 
>>>>> read
>>>>> from the same GroupByKey, it is nice to be able to fuse them into one
>>>>> logical operator. For this, you'll probably need a shuffle implementation
>>>>> that allows two independent readers from the same shuffle session.
>>>>>
>>>>> How easy it is to implement reiterables that don't have to fit in
>>>>> memory will depend on the runner.  For Dataflow it's possible because the
>>>>> shuffle session is logically persistent, so the runner can simply reread
>>>>> the shuffle session. For other runners with different shuffle
>>>>> implementations, it might be harder to support both properties. Maybe we
>>>>> should introduce a new @RequiresReiteration annotation on ParDo? That way
>>>>> the Spark runner can see this and switch to the in-memory version just for
>>>>> groupings consumed by those ParDos. Runners that already support
>>>>> reiteration can ignore this annotation, so it should be backwards
>>>>> compatible.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>>
>>>>>> I'd like to know the use-case. Why would you *need* to actually
>>>>>> iterate the grouped elements twice? By definition the first iteration 
>>>>>> would
>>>>>> have to extract some statistic (or subset of elements that must fit into
>>>>>> memory). This statistic can then be used as another input for the second
>>>>>> iteration. Why not then calculate the statistic in a separate branch in 
>>>>>> the
>>>>>> pipeline and feed it then into the ParDo as side input? That would be
>>>>>> definitely more efficient, because the calculation of the statistic would
>>>>>> be probably combinable (not sure if it is absolutely required to be
>>>>>> combinable, but it seems probable). Even if the calculation would not be
>>>>>> combinable, it is not less efficient than reiterating twice. Why then
>>>>>> support multiple iterations (besides the fact that output of GBK is
>>>>>> Iterable). Am I missing something?
>>>>>>
>>>>>> Jan
>>>>>> On 9/27/19 6:36 PM, Reuven Lax wrote:
>>>>>>
>>>>>> This should be an element in the compatibility matrix as well.
>>>>>>
>>>>>> On Fri, Sep 27, 2019 at 9:26 AM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> I am pretty surprised that we do not have
>>>>>>> a @Category(ValidatesRunner) test in GroupByKeyTest that iterates 
>>>>>>> multiple
>>>>>>> times. That is a major oversight. We should have this test, and it can 
>>>>>>> be
>>>>>>> disabled by the SparkRunner's configuration.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> The Dataflow version does not spill to disk. However Spark's design
>>>>>>>> might require spilling to disk if you want that to be implemented 
>>>>>>>> properly.
>>>>>>>>
>>>>>>>> On Fri, Sep 27, 2019 at 9:08 AM David Morávek <d...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Spark's GBK is currently implemented using `sortBy(key and
>>>>>>>>> value).mapPartition(...)` for non-merging windowing in order to 
>>>>>>>>> support
>>>>>>>>> large keys and large scale shuffles. Merging windowing is implemented 
>>>>>>>>> using
>>>>>>>>> standard GBK (underlying spark impl. uses ListCombiner + Hash 
>>>>>>>>> Grouping),
>>>>>>>>> which is by design unable to support large keys.
>>>>>>>>>
>>>>>>>>> As Jan noted, problem with mapPartition is, that its UDF receives
>>>>>>>>> an Iterator. Only option here is to wrap this iterator to one that 
>>>>>>>>> spills
>>>>>>>>> to disk once an internal buffer is exceeded (the approach suggested by
>>>>>>>>> Reuven). This unfortunately comes with a cost in some cases. The best
>>>>>>>>> approach would be to somehow determine, that user wants multiple 
>>>>>>>>> iterations
>>>>>>>>> and than wrap it in "re-iterator" if necessary. Does anyone have any 
>>>>>>>>> ideas
>>>>>>>>> how to approach this?
>>>>>>>>>
>>>>>>>>> D.
>>>>>>>>>
>>>>>>>>> On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> The Beam API was written to support multiple iterations, and
>>>>>>>>>> there are definitely transforms that do so. I believe that 
>>>>>>>>>> CoGroupByKey may
>>>>>>>>>> do this as well with the resulting iterator.
>>>>>>>>>>
>>>>>>>>>> I know that the Dataflow runner is able to handles iterators
>>>>>>>>>> larger than available memory by paging them in from shuffle, which 
>>>>>>>>>> still
>>>>>>>>>> allows for reiterating. It sounds like Spark is less flexible here?
>>>>>>>>>>
>>>>>>>>>> Reuven
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> +dev <dev@beam.apache.org> <dev@beam.apache.org>
>>>>>>>>>>>
>>>>>>>>>>> Lukasz, why do you think that users expect to be able to iterate
>>>>>>>>>>> multiple times grouped elements? Besides that it obviously suggests 
>>>>>>>>>>> the
>>>>>>>>>>> 'Iterable'? The way that spark behaves is pretty much analogous to 
>>>>>>>>>>> how
>>>>>>>>>>> MapReduce used to work - in certain cases it calles
>>>>>>>>>>> repartitionAndSortWithinPartitions and then does mapPartition, which
>>>>>>>>>>> accepts Iterator - that is because internally it merge sorts pre 
>>>>>>>>>>> sorted
>>>>>>>>>>> segments. This approach enables to GroupByKey data sets that are 
>>>>>>>>>>> too big to
>>>>>>>>>>> fit into memory (per key).
>>>>>>>>>>>
>>>>>>>>>>> If multiple iterations should be expected by users, we probably
>>>>>>>>>>> should:
>>>>>>>>>>>
>>>>>>>>>>>  a) include that in @ValidatesRunner tests
>>>>>>>>>>>
>>>>>>>>>>>  b) store values in memory on spark, which will break for
>>>>>>>>>>> certain pipelines
>>>>>>>>>>>
>>>>>>>>>>> Because of (b) I think that it would be much better to remove
>>>>>>>>>>> this "expectation" and clearly document that the Iterable is not 
>>>>>>>>>>> supposed
>>>>>>>>>>> to be iterated multiple times.
>>>>>>>>>>>
>>>>>>>>>>> Jan
>>>>>>>>>>> On 9/27/19 9:27 AM, Jan Lukavský wrote:
>>>>>>>>>>>
>>>>>>>>>>> I pretty much think so, because that is how Spark works. The
>>>>>>>>>>> Iterable inside is really an Iterator, which cannot be iterated 
>>>>>>>>>>> multiple
>>>>>>>>>>> times.
>>>>>>>>>>>
>>>>>>>>>>> Jan
>>>>>>>>>>> On 9/27/19 2:00 AM, Lukasz Cwik wrote:
>>>>>>>>>>>
>>>>>>>>>>> Jan, in Beam users expect to be able to iterate the GBK output
>>>>>>>>>>> multiple times even from within the same ParDo.
>>>>>>>>>>> Is this something that Beam on Spark Runner never supported?
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <je...@seznam.cz>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Gershi,
>>>>>>>>>>>>
>>>>>>>>>>>> could you please outline the pipeline you are trying to
>>>>>>>>>>>> execute? Basically, you cannot iterate the Iterable multiple times 
>>>>>>>>>>>> in
>>>>>>>>>>>> single ParDo. It should be possible, though, to apply multiple 
>>>>>>>>>>>> ParDos to
>>>>>>>>>>>> output from GroupByKey.
>>>>>>>>>>>>
>>>>>>>>>>>> Jan
>>>>>>>>>>>> On 9/26/19 3:32 PM, Gershi, Noam wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I want to iterate multiple times on the Iterable<V> (the output
>>>>>>>>>>>> of GroupByKey transformation)
>>>>>>>>>>>>
>>>>>>>>>>>> When my Runner is SparkRunner, I get an exception:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by: java.lang.IllegalStateException: ValueIterator can't
>>>>>>>>>>>> be iterated more than once,otherwise there could be data lost
>>>>>>>>>>>>
>>>>>>>>>>>>                 at
>>>>>>>>>>>> org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
>>>>>>>>>>>>
>>>>>>>>>>>>                 at
>>>>>>>>>>>> java.lang.Iterable.spliterator(Iterable.java:101)
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I understood I can branch the pipeline after GroupByKey into
>>>>>>>>>>>> multiple transformation and iterate in each of them once on the 
>>>>>>>>>>>> Iterable<V>.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a better way for that?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> [image: citi_logo_mail][image: citi_logo_mail]*Noam Gershi*
>>>>>>>>>>>>
>>>>>>>>>>>> Software Developer
>>>>>>>>>>>>
>>>>>>>>>>>> *T*: +972 (3) 7405718 <+972%203-740-5718>
>>>>>>>>>>>>
>>>>>>>>>>>> [image: Mail_signature_blue]
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>

Reply via email to