Ah! I understand now. Both the GroupByKey _and_ the CombineValues are
lifted together, and both run (partially) locally on the worker first. I
forgot that the GroupByKey is also 'lifted'. So we group by key locally
and drop the extraneous Nones locally, so we don't have to communicate
them unnecessarily across the shuffle to the full GroupByKey.
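
Concretely, here's a rough plain-Python sketch of the per-worker step as
I now understand it (names and structure are just illustrative, not real
Beam internals):

    # Sketch of the lifted 'Group' step of Distinct, pre-shuffle.
    def local_partial_group_and_combine(elements):
        # Per worker: pair each element with None, group by key locally,
        # and combine each group's values down to a single None, so each
        # distinct element crosses the shuffle at most once per worker.
        buffer = {}
        for v in elements:
            buffer[v] = None  # combine(vs) == None, one entry per key
        return list(buffer.items())

    print(local_partial_group_and_combine(["A", "A", "B"]))
    # [('A', None), ('B', None)]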

Thanks for the clarification!

On Fri, Jan 26, 2024 at 12:03 PM Robert Bradshaw <rober...@google.com>
wrote:

> On Fri, Jan 26, 2024 at 8:43 AM Joey Tran <joey.t...@schrodinger.com>
> wrote:
> >
> > Hmm, I think I might still be missing something. CombinePerKey is made
> > up of "GBK() | CombineValues". Expanding that inside Distinct, the
> > transform looks like:
> >
> > def Distinct(pcoll):  # pylint: disable=invalid-name
> >   """Produces a PCollection containing distinct elements of a
> >   PCollection."""
> >   return (
> >       pcoll
> >       | 'ToPairs' >> Map(lambda v: (v, None))
> >       | 'Group' >> GroupByKey()
> >       | 'CombineValues' >> CombineValues(lambda vs: None)
> >       | 'Distinct' >> Keys())
> >
> > Does the combiner lifting somehow make the GroupByKey operation more
> > efficient despite coming after it? My intuition would suggest that we
> > could just remove the `CombineValues` altogether.
>
> The key property of CombineFns is that they are commutative and
> associative, which permits an optimization called combiner lifting.
> Specifically, the operation
>
>     GroupByKey() | CombineValues(C)
>
> is re-written into
>
>     PartialCombineUsingLocalBufferMap(C) | GroupByKey() | FinalCombine(C)
>
> a rewrite that pretty much every runner supports (going back to the
> days of the original MapReduce), which is what can make this so much
> more efficient.
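>
> Here's a toy illustration in plain Python (not Beam); C below is just
> addition, standing in for any commutative, associative combiner:
>
>     from functools import reduce
>
>     C = lambda a, b: a + b
>
>     worker1, worker2 = [1, 2, 3], [4, 5]
>
>     # Lifted form: partial combine on each worker, final combine after.
>     lifted = reduce(C, [reduce(C, worker1), reduce(C, worker2)])
>
>     # Unlifted form: bring all values to one place, then combine.
>     unlifted = reduce(C, worker1 + worker2)
>
>     assert lifted == unlifted == 15
>
> Commutativity and associativity are what guarantee the two forms agree
> no matter how the elements are split across workers.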
>
>
> https://github.com/apache/beam/blob/release-2.21.0/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py#L669
>
> I am, unfortunately, coming up short in finding good documentation on
> this (Apache Beam specific or otherwise).
>
>
> > On Fri, Jan 26, 2024 at 11:33 AM Robert Bradshaw via dev <
> > dev@beam.apache.org> wrote:
> >>
> >> This is because it allows us to do some of the deduplication before
> >> the shuffle via combiner lifting. E.g., say we have [A, A, A, B, B] on
> >> one worker and [B, B, B, B, C, C] on another. Rather than passing all
> >> that data through the GroupByKey (which involves (relatively)
> >> expensive materialization and cross-machine traffic), with this form
> >> the first worker will only emit [A, B] and the second [B, C], and only
> >> the B needs to be deduplicated post-shuffle.
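> >>
> >> A quick plain-Python simulation of those numbers (not Beam, just to
> >> make the savings concrete):
> >>
> >>     worker1 = ["A", "A", "A", "B", "B"]
> >>     worker2 = ["B", "B", "B", "B", "C", "C"]
> >>
> >>     # With lifting, each worker pre-deduplicates before the shuffle,
> >>     # so only 4 elements cross the network instead of all 11.
> >>     shuffled = sorted(set(worker1)) + sorted(set(worker2))
> >>     print(shuffled)  # ['A', 'B', 'B', 'C']
> >>
> >>     # Post-shuffle, only the duplicate B is left to collapse.
> >>     print(sorted(set(shuffled)))  # ['A', 'B', 'C']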
> >>
> >> Wouldn't hurt to have a comment to that effect there.
> >>
> >> https://beam.apache.org/documentation/programming-guide/#combine
> >>
> >> On Fri, Jan 26, 2024 at 8:22 AM Joey Tran <joey.t...@schrodinger.com>
> >> > wrote:
> >> >
> >> > Hey all,
> >> >
> >> > I was poking around and looking at `Distinct` and was confused about
> >> > why it was implemented the way it was.
> >> >
> >> > Reproduced here:
> >> > @ptransform_fn
> >> > @typehints.with_input_types(T)
> >> > @typehints.with_output_types(T)
> >> > def Distinct(pcoll):  # pylint: disable=invalid-name
> >> >   """Produces a PCollection containing distinct elements of a
> >> >   PCollection."""
> >> >   return (
> >> >       pcoll
> >> >       | 'ToPairs' >> Map(lambda v: (v, None))
> >> >       | 'Group' >> CombinePerKey(lambda vs: None)
> >> >       | 'Distinct' >> Keys())
> >> >
> >> > Could anyone clarify why we'd use a `CombinePerKey` instead of just
> >> > using `GroupByKey`?
> >> >
> >> > Cheers,
> >> > Joey
>
