> The fact that the annotation on the ParDo "changes" the GroupByKey
> implementation is very specific to the Spark runner implementation.
I don't quite agree. It is not specific to Spark; it applies generally to
all runners that produce grouped elements in a way that is not reiterable.
That is the key property. The example you gave with HDFS does not satisfy
this condition (files on HDFS are certainly reiterable), and that's why no
change to the GBK is needed - it already has the required property. A quick
look at the FlinkRunner (at least the non-portable one) shows that it
implements GBK by reducing elements into a List. That is going to crash on
a big PCollection, which is even nicely documented:

 * <p>For internal use to translate {@link GroupByKey}. For a large {@link PCollection} this is
 * expected to crash!

If this is fixed, it is likely to start behaving the same as Spark.
So actually I think the opposite is true - Dataflow is a special case,
because of how its internal shuffle service works.
> In general I sympathize with the worry about non-local effects. Beam
> is already full of them (e.g. a Window.into statement affects downstream
> GroupByKeys). In each case where they were added there was extensive
> debate and discussion (Windowing semantics were debated for many
> months), exactly because there was concern over adding these non-local
> effects. In every case, no other good solution could be found. For the
> case of windowing for example, it was often easy to propose simple local
> APIs (e.g. just pass the window fn as a parameter to GroupByKey),
> however all of these local solutions ended up not working for important
> use cases when we analyzed them more deeply.
That is very interesting. Could you elaborate more about some examples of
the use cases which didn't work? I'd like to try to match them against how
Euphoria is structured; it should be more resistant to these non-local
effects, because it very often bundles multiple Beam primitives into a
single transform. ReduceByKey is one example of this - it is actually a
mix of Window.into() + GBK + ParDo. Although it might look like a
transform that can be broken down into other transforms cannot be
primitive (Euphoria has no native equivalent of GBK itself), this has
several other nice implications - namely that Combine now becomes a
special case of RBK. It then becomes only a question of where and how you
can "run" the reduce function; the logic is absolutely equal. This can be
worked out in more detail to actually show that even Combine and RBK can
be described by a more general stateful operation (ReduceStateByKey), and
so finally Euphoria actually has only two really "primitive" operations -
FlatMap (basically a stateless ParDo) and RSBK. As I already mentioned on
some other thread, once stateful ParDo supports merging windows, it can be
shown that both Combine and GBK become special cases of this.
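To illustrate the idea (a minimal sketch only - this is not Euphoria's
actual code, and the class name and the reducer signature are mine),
ReduceByKey can be thought of as a composite roughly like:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class ReduceByKeySketch<K, V, OutT>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, OutT>>> {

  private final Window<KV<K, V>> windowing;
  private final SerializableFunction<Iterable<V>, OutT> reducer;

  ReduceByKeySketch(
      Window<KV<K, V>> windowing, SerializableFunction<Iterable<V>, OutT> reducer) {
    this.windowing = windowing;
    this.reducer = reducer;
  }

  @Override
  public PCollection<KV<K, OutT>> expand(PCollection<KV<K, V>> input) {
    return input
        .apply(windowing)              // Window.into(...)
        .apply(GroupByKey.create())    // GBK
        .apply(ParDo.of(               // ParDo running the reduce function
            new DoFn<KV<K, Iterable<V>>, KV<K, OutT>>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // The reduce function sees the grouped values exactly once.
                c.output(KV.of(
                    c.element().getKey(), reducer.apply(c.element().getValue())));
              }
            }));
  }
}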
> As you mentioned below, I do think it's perfectly reasonable for a DSL
> to impose its own semantics. Scio already does this - the raw Beam API
> is used by a DSL as a substrate, but the DSL does not need to blindly
> mirror the semantics of the raw Beam API - at least in my opinion!
Sure, but currently there is no way for a DSL to "hook" into the runner,
so it has to use the raw Beam SDK, and so it will fail in cases like this -
where Beam actually has stronger guarantees than are required by the DSL.
It would be cool if we could find a way to do that - this pretty much
aligns with another question raised on the ML, about the possibility to
override the default implementation of a PTransform for a specific pipeline.
Jan
On 9/29/19 7:46 PM, Reuven Lax wrote:
Jan,
The fact that the annotation on the ParDo "changes" the GroupByKey
implementation is very specific to the Spark runner implementation.
You can imagine another runner that simply writes out files in HDFS to
implement a GroupByKey - this GroupByKey implementation is agnostic
to whether the result will be reiterated or not; in this case it is very
much the ParDo implementation that changes to implement a reiterable.
I think you don't like the fact that an annotation on the ParDo will
have a non-local effect on the implementation of the GroupByKey
upstream. However arguably the non-local effect is just a quirk of how
the Spark runner is implemented - other runners might have a local effect.
In general I sympathize with the worry about non-local effects. Beam
is already full of them (e.g. a Window.into statement affects
downstream GroupByKeys). In each case where they were added there was
extensive debate and discussion (Windowing semantics were debated for
many months), exactly because there was concern over adding these
non-local effects. In every case, no other good solution could be
found. For the case of windowing for example, it was often easy to
propose simple local APIs (e.g. just pass the window fn as a parameter
to GroupByKey), however all of these local solutions ended up not
working for important use cases when we analyzed them more deeply.
As you mentioned below, I do think it's perfectly reasonable for a DSL
to impose its own semantics. Scio already does this - the raw Beam API
is used by a DSL as a substrate, but the DSL does not need to blindly
mirror the semantics of the raw Beam API - at least in my opinion!
Reuven
On Sat, Sep 28, 2019 at 12:26 AM Jan Lukavský <[email protected]> wrote:
I understand the concerns. Still, it looks a little like we want
to be able to modify the behavior of an object from inside a submodule
- much as if my subprogram accepted a Map interface, but internally
I said "hey, this is supposed to be a HashMap, please change it".
Because of how the pipeline is constructed, we can do that; the
question is whether there really isn't a better solution.
What I do not like about the proposed solution:

1) specifying that the grouped elements are supposed to be iterated
only once can be done only on a ParDo, although there are other
(higher-level) PTransforms that can consume the output of a GBK

2) the annotation on ParDo is by definition generic - i.e. it can
be used on input which is not the output of a GBK, which makes no sense

3) we modify the behavior to unlock some optimizations (or to change
the behavior of the GBK itself), which users will not understand

4) the annotation somewhat arbitrarily modifies the data types
passed, which is counter-intuitive and will be a source of confusion
I think a solution that solves the above problems (but brings
some new ones, as always :-)) could be to change the output of GBK
from PCollection<KV<K, Iterable<V>>> to GroupedPCollection<K, V>. That
way we can control which operators consume the grouping (and how),
and we can enable these transforms to specify additional
parameters (like how they want to consume the grouping). It is
obviously a breaking change (although it can probably be made
backwards compatible) and it would very likely involve
substantial work. But maybe there are some other not yet discussed
options.
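Purely hypothetically, to show the shape of the idea (none of these types
exist; the names are made up):

import java.util.Iterator;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical: a grouping-aware output type for GBK. Each consumer
// declares how it will read the grouping, so the runner can choose a
// matching expansion per consumer instead of one global GBK behavior.
abstract class GroupedPCollection<K, V> {

  // The consumer promises a single pass; the runner may stream the values.
  abstract <O> PCollection<O> applyOneShot(
      PTransform<PCollection<KV<K, Iterator<V>>>, PCollection<O>> consumer);

  // The consumer may iterate repeatedly; the runner must provide reiterables.
  abstract <O> PCollection<O> applyReiterating(
      PTransform<PCollection<KV<K, Iterable<V>>>, PCollection<O>> consumer);
}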
Jan
On 9/28/19 6:46 AM, Reuven Lax wrote:
In many cases, the writer of the ParDo has no access to the GBK
(e.g. the GBK is hidden inside an upstream PTransform that they
cannot modify). This is the same reason why RequiresStableInput
was made a property of the ParDo, because the GroupByKey is quite
often inaccessible.
The car analogy doesn't quite apply here, because the runner does
have a full view of the pipeline so can satisfy all constraints.
The car dealership generally cannot read your mind (thankfully!),
so you have to specify what you want. Or to put it another way,
the various transforms in a Beam pipeline do not live in
isolation. The full pipeline graph is what is executed, and the
runner already has to analyze the full graph to run the pipeline
(as well as to optimize the pipeline).
Reuven
On Fri, Sep 27, 2019 at 2:35 PM Jan Lukavský <[email protected]> wrote:
I'd suggest Stream instead of Iterator; it has the same
semantics and a much better API.

Still not sure what is wrong with letting the GBK decide
this. I have an analogy - if I decide to buy a car, I have to
decide *what* car I'm going to buy (by thinking about how I'm
going to use it) *before* I buy it. I cannot just buy "a car"
and then change it from a minivan to a sports car based on my
actual need. Same with the GBK - if I want to be able to
reiterate the result, then I should say so in advance.
Jan
On 9/27/19 10:50 PM, Kenneth Knowles wrote:
Good point about sibling fusion requiring this.
The type PTransform<KV<K, V>, KV<K, Iterable<V>>> already
does imply that the output iterable can be iterated
arbitrarily many times.
I think this should remain the default for all the reasons
mentioned.
We could have opt-in to the weaker KV<K, Iterator<V>>
version. Agree that this is a property of the ParDo. A
particular use of a GBK has no idea what is downstream. If
you owned the whole pipeline, a special ParDo<Iterator<V>,
Foo> would work. But to make the types line up, this would
require changes upstream, which is not good.
Maybe something like this:
ParDo<Iterable<V>, Foo> {
  @ProcessElement
  void process(@OneShotIterator Iterator<V> iter) {
    ...
  }
}
I've described all of this in terms of the Java SDK. So we would
need a portable representation for all this metadata.
Kenn
On Fri, Sep 27, 2019 at 12:13 PM Reuven Lax <[email protected]> wrote:
I think the behavior to make explicit is the need to
reiterate, not the need to handle large results. How
large of a result can be handled will always be
dependent on the runner, and each runner will probably
have a different definition of large keys. Reiteration
however is a logical difference in the programming API.
Therefore I think it makes sense to specify the latter.
The need to reiterate is a property of the downstream
ParDo, so it should be specified there - not on the GBK.
Reuven
On Fri, Sep 27, 2019 at 12:05 PM Jan Lukavský <[email protected]> wrote:
Ok, I think I understand there might be some benefits to
this. Then I'd propose we make this clear on the GBK. If we
supported something like this:

PCollection<?> input = ....;
input.apply(GroupByKey.withLargeKeys());

then SparkRunner could expand this to
repartitionAndSortWithinPartitions only for this PTransform and
fall back to the default (in-memory) implementation in other
situations. The default expansion of LargeGroupByKey (let's say)
would be the classic GBK, so only runners that need to make sure
they don't break reiterations would expand it differently.
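A minimal sketch of that composite (hypothetical API, names for
illustration only):

import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical: the default expansion is the classic (reiterable) GBK.
// A runner like SparkRunner can recognize this transform and substitute
// e.g. repartitionAndSortWithinPartitions; all other runners keep the
// default behavior.
class LargeGroupByKey<K, V>
    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {

  static <K, V> LargeGroupByKey<K, V> create() {
    return new LargeGroupByKey<>();
  }

  @Override
  public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
    return input.apply(GroupByKey.create());
  }
}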
WDYT?
Jan
On 9/27/19 8:56 PM, Reuven Lax wrote:
As I mentioned above, CoGroupByKey already takes
advantage of this. Reiterating is not the most
common use case, but it's definitely one that comes
up. Also keep in mind that this API has supported
reiterating for the past five years (since even
before the SDK was donated to Apache). Therefore
you should assume that users are relying on it, in
ways we might not expect.
Historically, Google's Flume system had collections
that did not support reiterating (they were even
called OneShotCollections to make it clear). This
was the source of problems and user frustration,
which was one reason that in the original Dataflow
SDK we made sure that these iterables could be
reiterated. Another reason why it's advantageous
for a runner to support this is allowing for better
fusion. If two ParDos A and B both read from the
same GroupByKey, it is nice to be able to fuse them
into one logical operator. For this, you'll
probably need a shuffle implementation that allows
two independent readers from the same shuffle session.
How easy it is to implement reiterables that don't
have to fit in memory will depend on the runner.
For Dataflow it's possible because the shuffle
session is logically persistent, so the runner can
simply reread the shuffle session. For other
runners with different shuffle implementations, it
might be harder to support both properties. Maybe
we should introduce a new @RequiresReiteration
annotation on ParDo? That way the Spark runner can
see this and switch to the in-memory version just
for groupings consumed by those ParDos. Runners
that already support reiteration can ignore this
annotation, so it should be backwards compatible.
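From the user's side it could look something like this (the annotation is
hypothetical - it does not exist today, so a placeholder is defined here
just to make the example complete):

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical annotation, for illustration only.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface RequiresReiteration {}

class CountThenEmitFn extends DoFn<KV<String, Iterable<Integer>>, String> {

  @RequiresReiteration // proposed: the runner must keep the group reiterable
  @ProcessElement
  public void process(ProcessContext c) {
    long count = 0;
    for (int ignored : c.element().getValue()) { // first pass
      count++;
    }
    for (int v : c.element().getValue()) { // second pass - the reiteration
      c.output(c.element().getKey() + ":" + v + "/" + count);
    }
  }
}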
Reuven
On Fri, Sep 27, 2019 at 10:57 AM Jan Lukavský <[email protected]> wrote:
I'd like to know the use-case. Why would you *need* to
actually iterate the grouped elements twice? By definition
the first iteration would have to extract some statistic
(or a subset of elements that must fit into memory). This
statistic can then be used as another input for the second
iteration. Why not then calculate the statistic in a
separate branch of the pipeline and feed it into the ParDo
as a side input? That would definitely be more efficient,
because the calculation of the statistic would probably be
combinable (I'm not sure it is absolutely required to be
combinable, but it seems probable). Even if the calculation
were not combinable, it would be no less efficient than
reiterating twice. Why then support multiple iterations
(besides the fact that the output of GBK is an Iterable)?
Am I missing something?
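A sketch of what I mean (assuming the statistic is a per-key
count; `input` here is a PCollection<KV<String, Integer>> and
all names are illustrative):

import java.util.Map;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;

// Branch 1: the per-key statistic (here a count), computed combinably.
final PCollectionView<Map<String, Long>> countsView =
    input.apply(Count.perKey()).apply(View.asMap());

// Branch 2: a single pass over the grouped values; the statistic
// arrives through the side input instead of a first iteration.
input
    .apply(GroupByKey.create())
    .apply(ParDo.of(new DoFn<KV<String, Iterable<Integer>>, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            long count = c.sideInput(countsView).get(c.element().getKey());
            for (int v : c.element().getValue()) { // one iteration suffices
              c.output(c.element().getKey() + ":" + v + "/" + count);
            }
          }
        })
        .withSideInputs(countsView));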
Jan
On 9/27/19 6:36 PM, Reuven Lax wrote:
This should be an element in the compatibility
matrix as well.
On Fri, Sep 27, 2019 at 9:26 AM Kenneth
Knowles <[email protected]
<mailto:[email protected]>> wrote:
I am pretty surprised that we do not have
a @Category(ValidatesRunner) test
in GroupByKeyTest that iterates multiple
times. That is a major oversight. We
should have this test, and it can be
disabled by the SparkRunner's configuration.
Kenn
On Fri, Sep 27, 2019 at 9:24 AM Reuven Lax <[email protected]> wrote:
The Dataflow version does not spill to
disk. However Spark's design might
require spilling to disk if you want
that to be implemented properly.
On Fri, Sep 27, 2019 at 9:08 AM David Morávek <[email protected]> wrote:
Hi,
Spark's GBK is currently
implemented using `sortBy(key and
value).mapPartition(...)` for
non-merging windowing in order to
support large keys and large scale
shuffles. Merging windowing is
implemented using the standard GBK
(the underlying Spark impl. uses
ListCombiner + Hash Grouping),
which is by design unable to
support large keys.
As Jan noted, the problem with
mapPartition is that its UDF
receives an Iterator. The only
option here is to wrap this
iterator in one that spills to
disk once an internal buffer is
exceeded (the approach suggested
by Reuven). This unfortunately
comes with a cost in some cases.
The best approach would be to
somehow determine that the user
wants multiple iterations and
only then wrap it in a
"re-iterator" if necessary.
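One possible shape of such a wrapper (a simplified sketch: it caches in
memory only and is not thread-safe; a real implementation would spill the
buffer to disk past a size limit):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Wraps a one-shot Iterator as an Iterable that can be iterated again
// by caching every element it has already handed out.
class CachingReiterable<T> implements Iterable<T> {

  private final Iterator<T> source;
  private final List<T> buffer = new ArrayList<>();

  CachingReiterable(Iterator<T> source) {
    this.source = source;
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int pos = 0;

      @Override
      public boolean hasNext() {
        return pos < buffer.size() || source.hasNext();
      }

      @Override
      public T next() {
        if (pos == buffer.size()) {
          buffer.add(source.next()); // pull from the source and cache
        }
        return buffer.get(pos++);
      }
    };
  }
}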
Does anyone have any ideas how to
approach this?
D.
On Fri, Sep 27, 2019 at 5:46 PM Reuven Lax <[email protected]> wrote:
The Beam API was written to
support multiple iterations,
and there are definitely
transforms that do so. I
believe that CoGroupByKey may
do this as well with the
resulting iterator.
I know that the Dataflow
runner is able to handle
iterators larger than
available memory by paging
them in from shuffle, which
still allows for reiterating.
It sounds like Spark is less
flexible here?
Reuven
On Fri, Sep 27, 2019 at 3:04 AM Jan Lukavský <[email protected]> wrote:
+dev <[email protected]>
Lukasz, why do you think that users expect to be
able to iterate grouped elements multiple times?
Besides the fact that the 'Iterable' obviously
suggests it? The way Spark behaves is pretty much
analogous to how MapReduce used to work - in
certain cases it calls
repartitionAndSortWithinPartitions and then does
mapPartition, which accepts an Iterator; that is
because internally it merge-sorts pre-sorted
segments. This approach makes it possible to
GroupByKey data sets that are too big to fit into
memory (per key).
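Roughly like this (an illustrative sketch of the sort-based approach, not
the actual SparkRunner code):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class SortBasedGrouping {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[2]", "sort-based-grouping");
    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", 1), new Tuple2<>("b", 2), new Tuple2<>("a", 3)));

    // After the sort-based shuffle all values of a key are consecutive
    // within a partition, so each group can be streamed (no need to fit
    // in memory) but consumed only once.
    List<String> sums = pairs
        .repartitionAndSortWithinPartitions(new HashPartitioner(2))
        .mapPartitions(it -> {
          List<String> out = new ArrayList<>();
          String key = null;
          long sum = 0;
          while (it.hasNext()) {
            Tuple2<String, Integer> kv = it.next();
            if (!kv._1().equals(key)) {
              if (key != null) {
                out.add(key + "=" + sum);
              }
              key = kv._1();
              sum = 0;
            }
            sum += kv._2();
          }
          if (key != null) {
            out.add(key + "=" + sum);
          }
          return out.iterator();
        })
        .collect();
    System.out.println(sums); // e.g. [a=4, b=2]
    sc.stop();
  }
}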
If multiple iterations should be expected by users, we
probably should:

a) include that in @ValidatesRunner tests

b) store values in memory on Spark, which will break for
certain pipelines

Because of (b) I think it would be much better to remove
this "expectation" and clearly document that the Iterable
is not supposed to be iterated multiple times.
Jan
On 9/27/19 9:27 AM, Jan
Lukavský wrote:
I pretty much think so,
because that is how Spark
works. The Iterable
inside is really an
Iterator, which cannot be
iterated multiple times.
Jan
On 9/27/19 2:00 AM,
Lukasz Cwik wrote:
Jan, in Beam users
expect to be able to
iterate the GBK output
multiple times even from
within the same ParDo.
Is this something that
Beam on Spark Runner
never supported?
On Thu, Sep 26, 2019 at 6:50 AM Jan Lukavský <[email protected]> wrote:
Hi Gershi,

could you please outline the pipeline you are trying to
execute? Basically, you cannot iterate the Iterable
multiple times in a single ParDo. It should be possible,
though, to apply multiple ParDos to the output of
GroupByKey.
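Something like this, for example (a sketch assuming String keys
and Integer values; `input` stands for your keyed PCollection):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

PCollection<KV<String, Iterable<Integer>>> grouped =
    input.apply(GroupByKey.create());

// First branch: one pass that counts the values of each key.
grouped.apply("CountValues",
    ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Long>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        long count = 0;
        for (int ignored : c.element().getValue()) {
          count++;
        }
        c.output(KV.of(c.element().getKey(), count));
      }
    }));

// Second branch: an independent ParDo doing its own single pass.
grouped.apply("SumValues",
    ParDo.of(new DoFn<KV<String, Iterable<Integer>>, KV<String, Long>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        long sum = 0;
        for (int v : c.element().getValue()) {
          sum += v;
        }
        c.output(KV.of(c.element().getKey(), sum));
      }
    }));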
Jan
On 9/26/19 3:32 PM,
Gershi, Noam wrote:
Hi,
I want to iterate multiple times over the Iterable<V>
(the output of the GroupByKey transformation).
When my Runner is
SparkRunner, I get
an exception:
Caused by: java.lang.IllegalStateException: ValueIterator can't be iterated more than once,otherwise there could be data lost
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions$GroupByKeyIterator$ValueIterator.iterator(GroupNonMergingWindowsFunctions.java:221)
    at java.lang.Iterable.spliterator(Iterable.java:101)
I understood I can branch the pipeline after GroupByKey
into multiple transformations and iterate once on the
Iterable<V> in each of them.
Is there a better
way for that?
*Noam Gershi*
Software Developer
*T*: +972 (3) 7405718