> The fact that the annotation on the ParDo "changes" the GroupByKey implementation is very specific to the Spark runner implementation.

I don't quite agree. It is not specific to Spark; it applies to all runners that produce grouped elements in a way that is not reiterable. That is the key property. The example you gave with HDFS does not satisfy this condition (files on HDFS are certainly reiterable), and that is why no change to the GBK is needed there (it already has the required property). A quick look at the FlinkRunner (at least the non-portable one) shows that it implements GBK by reducing elements into a List. That is going to crash on a big PCollection, which is even nicely documented:

    * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this is
    * expected to crash!

If this is fixed, then it is likely to start behaving the same as Spark. So I actually think the opposite is true - Dataflow is the special case, because of how its internal shuffle service works.
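
To make the "not reiterable" property concrete, here is a minimal plain-Java sketch with no Beam or Spark dependencies. The names `ReiterationSketch`, `oneShot` and `survivesSecondPass` are made up for illustration. Only the general behaviour is taken from this thread: an Iterable backed by a single Iterator refuses a second pass, much like the ValueIterator exception quoted at the bottom, while a List-backed grouping can be re-read.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class ReiterationSketch {

  /** Wraps a single Iterator as an Iterable that refuses to be iterated twice. */
  static <V> Iterable<V> oneShot(Iterator<V> source) {
    return new Iterable<V>() {
      private boolean consumed = false;

      @Override
      public Iterator<V> iterator() {
        if (consumed) {
          throw new IllegalStateException("values can't be iterated more than once");
        }
        consumed = true;
        return source;
      }
    };
  }

  /** Sums the values, iterating the given Iterable exactly once. */
  static int sum(Iterable<Integer> values) {
    int total = 0;
    for (int v : values) {
      total += v;
    }
    return total;
  }

  /** Returns true if a second pass over the same Iterable succeeds. */
  static boolean survivesSecondPass(Iterable<Integer> values) {
    sum(values);
    try {
      sum(values);
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    List<Integer> inMemory = Arrays.asList(1, 2, 3);
    System.out.println(survivesSecondPass(inMemory));                     // true
    System.out.println(survivesSecondPass(oneShot(inMemory.iterator()))); // false
  }
}
```

The List-backed variant is reiterable but must fit in memory; the one-shot variant can stream arbitrarily many values but breaks any consumer that makes a second pass.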

> In general I sympathize with the worry about non-local effects. Beam is already full of them (e.g. a Window.into statement affects downstream GroupByKeys). In each case where they were added there was extensive debate and discussion (Windowing semantics were debated for many months), exactly because there was concern over adding these non-local effects. In every case, no other good solution could be found. For the case of windowing for example, it was often easy to propose simple local APIs (e.g. just pass the window fn as a parameter to GroupByKey), however all of these local solutions ended up not working for important use cases when we analyzed them more deeply.

That is very interesting. Could you elaborate on some examples of the use cases that didn't work? I'd like to match them against how Euphoria is structured. It should be more resistant to these non-local effects, because it very often bundles multiple Beam primitives into a single transform. ReduceByKey is one example of this: it is actually a mix of Window.into() + GBK + ParDo. It might look as if a transform that can be broken down into something else is not primitive (Euphoria has no native equivalent of GBK itself), but this has several other nice implications, one being that Combine becomes a special case of RBK. It then becomes only a question of where and how you can "run" the reduce function; the logic is exactly the same. This can be worked out in more detail to show that even Combine and RBK can be described by a more general stateful operation (ReduceStateByKey), so in the end Euphoria has only two really "primitive" operations: FlatMap (basically a stateless ParDo) and RSBK. As I already mentioned in another thread, once stateful ParDo supports merging windows, it can be shown that both Combine and GBK become special cases of it.
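
A rough plain-Java sketch of the "Combine is a special case of RBK" idea follows. This is not Euphoria's API; `ReduceByKeySketch`, `reduceByKey` and `combineByKey` are hypothetical names, windowing is left out entirely, and the grouping is done in memory purely to keep the example short. The point is only that the reduce function sees the values of one key as a single-pass Stream, and that a combiner is one particular choice of that function.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Stream;

class ReduceByKeySketch {

  /** Groups values by key, then hands each key's values to the reducer as a one-pass Stream. */
  static <K, V, O> Map<K, O> reduceByKey(
      List<Map.Entry<K, V>> input, Function<Stream<V>, O> reducer) {
    Map<K, List<V>> grouped = new LinkedHashMap<>();
    for (Map.Entry<K, V> e : input) {
      grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    Map<K, O> result = new LinkedHashMap<>();
    for (Map.Entry<K, List<V>> e : grouped.entrySet()) {
      result.put(e.getKey(), reducer.apply(e.getValue().stream()));
    }
    return result;
  }

  /** Combine expressed through reduceByKey: the reducer folds with an associative operator. */
  static <K, V> Map<K, V> combineByKey(List<Map.Entry<K, V>> input, BinaryOperator<V> op) {
    // Every group has at least one value, so the Optional is never empty here.
    return reduceByKey(input, values -> values.reduce(op).get());
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> input =
        List.of(Map.entry("a", 1), Map.entry("b", 10), Map.entry("a", 2));
    System.out.println(combineByKey(input, Integer::sum)); // {a=3, b=10}
  }
}
```

Because the reducer only ever receives a Stream, nothing in this shape promises reiteration, so the operator is free to decide where and how the reduce function runs.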

> As you mentioned below, I do think it's perfectly reasonable for a DSL to impose its own semantics. Scio already does this - the raw Beam API is used by a DSL as a substrate, but the DSL does not need to blindly mirror the semantics of the raw Beam API - at least in my opinion!

Sure, but currently there is no way for a DSL to "hook" into a runner, so it has to use the raw Beam SDK, and this will fail in cases like this one, where Beam has stronger guarantees than the DSL requires. It would be cool if we could find a way to do that. This pretty much aligns with another question raised on the ML, about the possibility of overriding the default implementation of a PTransform for a specific pipeline.

Jan


On 9/29/19 7:46 PM, Reuven Lax wrote:
Jan,

The fact that the annotation on the ParDo "changes" the GroupByKey implementation is very specific to the Spark runner implementation. You can imagine another runner that simply writes out files in HDFS to implement a GroupByKey - this GroupByKey implementation is agnostic to whether the result will be reiterated or not; in this case it is very much the ParDo implementation that changes to implement a reiterable. I think you don't like the fact that an annotation on the ParDo will have a non-local effect on the implementation of the GroupByKey upstream. However arguably the non-local effect is just a quirk of how the Spark runner is implemented - other runners might have a local effect.

In general I sympathize with the worry about non-local effects. Beam is already full of them (e.g. a Window.into statement affects downstream GroupByKeys). In each case where they were added there was extensive debate and discussion (Windowing semantics were debated for many months), exactly because there was concern over adding these non-local effects. In every case, no other good solution could be found. For the case of windowing for example, it was often easy to propose simple local APIs (e.g. just pass the window fn as a parameter to GroupByKey), however all of these local solutions ended up not working for important use cases when we analyzed them more deeply.

As you mentioned below, I do think it's perfectly reasonable for a DSL to impose its own semantics. Scio already does this - the raw Beam API is used by a DSL as a substrate, but the DSL does not need to blindly mirror the semantics of the raw Beam API - at least in my opinion!

Reuven

On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:

    I understand the concerns. Still, it looks a little like we want
    to be able to modify behavior of an object from inside a submodule
    - quite like if my subprogram would accept a Map interface, but
    internally I would say "hey, this is supposed to be a HashMap,
    please change it so". Because of how pipeline is constructed, we
    can do that, the question is if there really isn't a better solution.

    What I do not like about the proposed solution:

     1) to specify that the grouped elements are supposed to be
    iterated only once can be done only on ParDo, although there are
    other (higher level) PTransforms, that can consume output of GBK

     2) the annotation on ParDo is by definition generic - i.e. can
    be used on input which is not output of GBK, which makes no sense

     3) we modify the behavior to unlock some optimizations (or change
    of behavior of the GBK itself), users will not understand that

     4) the annotation somewhat arbitrarily modifies data types
    passed, that is counter-intuitive and will be source of confusion

    I think that a solution that solves the above problems (but brings
    some new ones, as always :-)), could be to change the output of GBK
    from PCollection<K, Iterable<V>> to GroupedPCollection<K, V>. That
    way we can control which operators (and how) consume the grouping,
    and we can enable these transforms to specify additional
    parameters (like how they want to consume the grouping). It is
    obviously a breaking change (although it can probably be made
    backwards compatible) and it would very likely involve
    substantial work. But maybe there are some other not yet discussed
    options.

    Jan

    On 9/28/19 6:46 AM, Reuven Lax wrote:
    In many cases, the writer of the ParDo has no access to the GBK
    (e.g. the GBK is hidden inside an upstream PTransform that they
    cannot modify). This is the same reason why RequiresStableInput
    was made a property of the ParDo, because the GroupByKey is quite
    often inaccessible.

    The car analogy doesn't quite apply here, because the runner does
    have a full view of the pipeline so can satisfy all constraints.
    The car dealership generally cannot read your mind (thankfully!),
    so you have to specify what you want. Or to put it another way,
    the various transforms in a Beam pipeline do not live in
    isolation. The full pipeline graph is what is executed, and the
    runner already has to analyze the full graph to run the pipeline
    (as well as to optimize the pipeline).

    Reuven

    On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        I'd suggest Stream instead of Iterator, it has the same
        semantics and much better API.

        Still not sure what is wrong with letting the GBK decide
        this. I have an analogy - if I decide to buy a car, I have to
        decide *what* car I'm going to buy (by thinking about how I'm
        going to use it) *before* I buy it. I cannot just buy "a car"
        and then change it from minivan to sport car based on my
        actual need. Same with the GBK - if I want to be able to
        reiterate the result, then I should tell it in advance.

        Jan

        On 9/27/19 10:50 PM, Kenneth Knowles wrote:
        Good point about sibling fusion requiring this.

        The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already
        does imply that the output iterable can be iterated
        arbitrarily many times.

        I think this should remain the default for all the reasons
        mentioned.

        We could have opt-in to the weaker KV<K, Iterator<V>>
        version. Agree that this is a property of the ParDo. A
        particular use of a GBK has no idea what is downstream. If
        you owned the whole pipeline, a special ParDo<Iterator<V>,
        Foo> would work. But to make the types line up, this would
        require changes upstream, which is not good.

        Maybe something like this:

        ParDo<Iterable<V>, Foo> {
          @ProcessElement
          void process(@OneShotIterator Iterator<V> iter) {
            ...
          }
        }

        I've described all of this in terms of Java SDK. So we would
        need a portable representation for all this metadata.

        Kenn

        On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax
        <[email protected] <mailto:[email protected]>> wrote:

            I think the behavior to make explicit is the need to
            reiterate, not the need to handle large results. How
            large of a result can be handled will always be
            dependent on the runner, and each runner will probably
            have a different definition of large keys. Reiteration
            however is a logical difference in the programming API.
            Therefore I think it makes sense to specify the latter.
            The need to reiterate is a property of the downstream
            ParDo, so it should be specified there - not on the GBK.

            Reuven

            On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:

                Ok, I think I understand there might be some
                benefits of this. Then I'd propose we make this
                clear on the GBK. If we would support something like
                this:

                 PCollection<?> input = ....;

                 input.apply(GroupByKey.withLargeKeys());

                then SparkRunner could expand this to
                repartitionAndSortWithinPartitions only on this
                PTransform, and fallback to the default (in memory)
                in other situations. The default expansion of
                LargeGroupByKey (let's say) would be classic GBK, so
                that only runners that need to make sure that they
                don't break reiterations can expand this.

                WDYT?

                Jan

                On 9/27/19 8:56 PM, Reuven Lax wrote:
                As I mentioned above, CoGroupByKey already takes
                advantage of this. Reiterating is not the most
                common use case, but it's definitely one that comes
                up. Also keep in mind that this API has supported
                reiterating for the past five years (since even
                before the SDK was donated to Apache). Therefore
                you should assume that users are relying on it, in
                ways we might not expect.

                Historically, Google's Flume system had collections
                that did not support reiterating (they were even
                called OneShotCollections to make it clear). This
                was the source of problems and user frustration,
                which was one reason that in the original Dataflow
                SDK we made sure that these iterables could be
                reiterated. Another reason why it's advantageous
                for a runner to support this is allowing for better
                fusion. If two ParDos A and B both read from the
                same GroupByKey, it is nice to be able to fuse them
                into one logical operator. For this, you'll
                probably need a shuffle implementation that allows
                two independent readers from the same shuffle session.

                How easy it is to implement reiterables that don't
                have to fit in memory will depend on the runner. 
                For Dataflow it's possible because the shuffle
                session is logically persistent, so the runner can
                simply reread the shuffle session. For other
                runners with different shuffle implementations, it
                might be harder to support both properties. Maybe
                we should introduce a new @RequiresReiteration
                annotation on ParDo? That way the Spark runner can
                see this and switch to the in-memory version just
                for groupings consumed by those ParDos. Runners
                that already support reiteration can ignore this
                annotation, so it should be backwards compatible.
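
As a rough illustration of how such an opt-in could be read by a runner, here is a self-contained sketch. Everything in it is hypothetical: `@RequiresReiteration` is only a proposal in this thread and does not exist in Beam, `GroupedFn` is a stand-in for a DoFn, and the annotation is placed on the class only to keep the reflection code short.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class AnnotationSketch {

  /** Hypothetical marker from this thread; not an existing Beam annotation. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface RequiresReiteration {}

  /** Stand-in for a DoFn that consumes the output of a grouping. */
  interface GroupedFn {}

  static class SinglePassFn implements GroupedFn {}

  @RequiresReiteration
  static class TwoPassFn implements GroupedFn {}

  enum Grouping { STREAMING_ONE_SHOT, IN_MEMORY_REITERABLE }

  /** What a runner could do at translation time, when it sees the consumer of the grouping. */
  static Grouping chooseGrouping(GroupedFn consumer) {
    return consumer.getClass().isAnnotationPresent(RequiresReiteration.class)
        ? Grouping.IN_MEMORY_REITERABLE
        : Grouping.STREAMING_ONE_SHOT;
  }

  public static void main(String[] args) {
    System.out.println(chooseGrouping(new SinglePassFn())); // STREAMING_ONE_SHOT
    System.out.println(chooseGrouping(new TwoPassFn()));    // IN_MEMORY_REITERABLE
  }
}
```

A runner whose groupings are always reiterable would simply never call anything like `chooseGrouping`, which is what makes the proposal backwards compatible.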

                Reuven

                On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský
                <[email protected] <mailto:[email protected]>> wrote:

                    I'd like to know the use-case. Why would you
                    *need* to actually iterate the grouped elements
                    twice? By definition the first iteration would
                    have to extract some statistic (or subset of
                    elements that must fit into memory). This
                    statistic can then be used as another input for
                    the second iteration. Why not then calculate
                    the statistic in a separate branch in the
                    pipeline and feed it then into the ParDo as
                    side input? That would be definitely more
                    efficient, because the calculation of the
                    statistic would be probably combinable (not
                    sure if it is absolutely required to be
                    combinable, but it seems probable). Even if the
                    calculation would not be combinable, it is not
                    less efficient than reiterating twice. Why then
                    support multiple iterations (besides the fact
                    that the output of GBK is Iterable)? Am I missing
                    something?
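
The argument above can be sketched in plain Java, using "deviation from the mean" as an assumed example of a two-pass computation. The names `SideInputSketch`, `MeanAcc` and `deviations` are invented for illustration. The statistic is computed by a combinable accumulator (partial results merge associatively, as they would in a separate branch of the pipeline) and is then passed in next to a one-shot Iterator, so the grouped values are read only once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class SideInputSketch {

  /** Combinable accumulator: partial (sum, count) pairs can be merged in any grouping. */
  static final class MeanAcc {
    double sum;
    long count;

    MeanAcc add(double v) {
      sum += v;
      count++;
      return this;
    }

    MeanAcc merge(MeanAcc other) {
      sum += other.sum;
      count += other.count;
      return this;
    }

    double mean() {
      return sum / count;
    }
  }

  /** Consumes the values exactly once, using the precomputed mean as extra input. */
  static List<Double> deviations(Iterator<Double> values, double mean) {
    List<Double> out = new ArrayList<>();
    while (values.hasNext()) {
      out.add(values.next() - mean);
    }
    return out;
  }

  public static void main(String[] args) {
    // Two partial accumulators, as if they had been computed on different workers.
    MeanAcc left = new MeanAcc().add(1.0).add(2.0);
    MeanAcc right = new MeanAcc().add(6.0);
    double mean = left.merge(right).mean();

    System.out.println(deviations(Arrays.asList(1.0, 2.0, 6.0).iterator(), mean));
    // [-2.0, -1.0, 3.0]
  }
}
```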

                    Jan

                    On 9/27/19 6:36 PM, Reuven Lax wrote:
                    This should be an element in the compatibility
                    matrix as well.

                    On Fri, Sep 27, 2019 at 9:26 AM Kenneth
                    Knowles <[email protected]
                    <mailto:[email protected]>> wrote:

                        I am pretty surprised that we do not have
                        a @Category(ValidatesRunner) test
                        in GroupByKeyTest that iterates multiple
                        times. That is a major oversight. We
                        should have this test, and it can be
                        disabled by the SparkRunner's configuration.

                        Kenn

                        On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax
                        <[email protected]
                        <mailto:[email protected]>> wrote:

                            The Dataflow version does not spill to
                            disk. However Spark's design might
                            require spilling to disk if you want
                            that to be implemented properly.

                            On Fri, Sep 27, 2019 at 9:08 AM David
                            Morávek <[email protected]
                            <mailto:[email protected]>> wrote:

                                Hi,

                                Spark's GBK is currently
                                implemented using `sortBy(key and
                                value).mapPartition(...)` for
                                non-merging windowing in order to
                                support large keys and large scale
                                shuffles. Merging windowing is
                                implemented using standard GBK
                                (underlying spark impl. uses
                                ListCombiner + Hash Grouping),
                                which is by design unable to
                                support large keys.

                                As Jan noted, the problem with
                                mapPartition is that its UDF
                                receives an Iterator. The only option
                                here is to wrap this iterator in
                                one that spills to disk once an
                                internal buffer is exceeded (the
                                approach suggested by Reuven).
                                This unfortunately comes with a
                                cost in some cases. The best
                                approach would be to somehow
                                determine that the user wants
                                multiple iterations and then wrap
                                it in a "re-iterator" if necessary.
                                Does anyone have any ideas how to
                                approach this?
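
One possible shape of such a spilling wrapper, as a simplified sketch only: `SpillingReiterable` is a hypothetical class, not Spark runner code. It handles String values, drains the source eagerly in its constructor (a real implementation would more likely spill lazily during the first pass), and keeps the first `maxInMemory` values in a buffer while writing the rest to a temp file.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

class SpillingReiterable implements Iterable<String> {
  private final List<String> buffer = new ArrayList<>();
  private Path spillFile; // stays null when everything fit into the buffer
  private int spilled;

  SpillingReiterable(Iterator<String> source, int maxInMemory) {
    while (source.hasNext() && buffer.size() < maxInMemory) {
      buffer.add(source.next());
    }
    if (!source.hasNext()) {
      return; // nothing left to spill
    }
    try {
      spillFile = Files.createTempFile("gbk-spill", ".bin");
      spillFile.toFile().deleteOnExit();
      try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(spillFile))) {
        while (source.hasNext()) {
          out.writeUTF(source.next()); // writeUTF limits each value to 65535 encoded bytes
          spilled++;
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  int spilledCount() {
    return spilled;
  }

  private DataInputStream openSpill() {
    if (spillFile == null) {
      return null;
    }
    try {
      return new DataInputStream(Files.newInputStream(spillFile));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Every call returns an independent pass: buffered values first, then the spilled ones. */
  @Override
  public Iterator<String> iterator() {
    final Iterator<String> mem = buffer.iterator();
    final DataInputStream in = openSpill();
    return new Iterator<String>() {
      private int remainingOnDisk = spilled;

      @Override
      public boolean hasNext() {
        return mem.hasNext() || remainingOnDisk > 0;
      }

      @Override
      public String next() {
        if (mem.hasNext()) {
          return mem.next();
        }
        if (remainingOnDisk == 0) {
          throw new NoSuchElementException();
        }
        try {
          String v = in.readUTF();
          if (--remainingOnDisk == 0) {
            in.close(); // closed only when a pass is fully consumed
          }
          return v;
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    };
  }

  public static void main(String[] args) {
    SpillingReiterable values =
        new SpillingReiterable(List.of("a", "b", "c", "d").iterator(), 2);
    System.out.println(String.join("", values)); // abcd
    System.out.println(String.join("", values)); // abcd
  }
}
```

The cost mentioned above is visible here: every value beyond the buffer is written to and read back from disk even when the consumer iterates only once, which is why knowing the consumer's intent up front would help.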

                                D.

                                On Fri, Sep 27, 2019 at 5:46 PM
                                Reuven Lax <[email protected]
                                <mailto:[email protected]>> wrote:

                                    The Beam API was written to
                                    support multiple iterations,
                                    and there are definitely
                                    transforms that do so. I
                                    believe that CoGroupByKey may
                                    do this as well with the
                                    resulting iterator.

                                    I know that the Dataflow
                                    runner is able to handle
                                    iterators larger than
                                    available memory by paging
                                    them in from shuffle, which
                                    still allows for reiterating.
                                    It sounds like Spark is less
                                    flexible here?

                                    Reuven

                                    On Fri, Sep 27, 2019 at 3:04
                                    AM Jan Lukavský
                                    <[email protected]
                                    <mailto:[email protected]>> wrote:

                                        +dev <[email protected]>
                                        <mailto:[email protected]>

                                        Lukasz, why do you think
                                        that users expect to be
                                        able to iterate the grouped
                                        elements multiple times,
                                        besides the fact that
                                        'Iterable' obviously
                                        suggests it? The way that
                                        Spark behaves is pretty much
                                        analogous to how MapReduce
                                        used to work - in certain
                                        cases it calls
                                        repartitionAndSortWithinPartitions
                                        and then does
                                        mapPartition, which
                                        accepts an Iterator - that is
                                        because internally it
                                        merge-sorts pre-sorted
                                        segments. This approach
                                        makes it possible to
                                        GroupByKey data sets that
                                        are too big to fit into
                                        memory (per key).

                                        If multiple iterations
                                        should be expected by
                                        users, we probably should:

                                         a) include that in
                                        @ValidatesRunner tests

                                         b) store values in memory
                                        on spark, which will break
                                        for certain pipelines

                                        Because of (b) I think
                                        that it would be much
                                        better to remove this
                                        "expectation" and clearly
                                        document that the Iterable
                                        is not supposed to be
                                        iterated multiple times.

                                        Jan

                                        On 9/27/19 9:27 AM, Jan
                                        Lukavský wrote:

                                        I pretty much think so,
                                        because that is how Spark
                                        works. The Iterable
                                        inside is really an
                                        Iterator, which cannot be
                                        iterated multiple times.

                                        Jan

                                        On 9/27/19 2:00 AM,
                                        Lukasz Cwik wrote:
                                        Jan, in Beam users
                                        expect to be able to
                                        iterate the GBK output
                                        multiple times even from
                                        within the same ParDo.
                                        Is this something that
                                        Beam on Spark Runner
                                        never supported?

                                        On Thu, Sep 26, 2019 at
                                        6:50 AM Jan Lukavský
                                        <[email protected]
                                        <mailto:[email protected]>>
                                        wrote:

                                            Hi Gershi,

                                            could you please
                                            outline the pipeline
                                            you are trying to
                                            execute? Basically,
                                            you cannot iterate
                                            the Iterable
                                            multiple times in
                                            single ParDo. It
                                            should be possible,
                                            though, to apply
                                            multiple ParDos to
                                            output from GroupByKey.

                                            Jan

                                            On 9/26/19 3:32 PM,
                                            Gershi, Noam wrote:

                                            Hi,

                                            I want to iterate
                                            multiple times on
                                            the Iterable<V>
                                            (the output of
                                            GroupByKey
                                            transformation)

                                            When my Runner is
                                            SparkRunner, I get
                                            an exception:

                                            Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
                                                at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
                                                at java.lang.Iterable.spliterator(Iterable.java:101)

                                            I understood I can
                                            branch the pipeline
                                            after GroupByKey
                                            into multiple
                                            transformation and
                                            iterate in each of
                                            them once on the
                                            Iterable<V>.

                                            Is there a better
                                            way for that?

                                            *Noam Gershi*

                                            Software Developer

                                            *T*:+972 (3)
                                            7405718
                                            <tel:+972%203-740-5718>
