Hmm, I think I might still be missing something. CombinePerKey is made up
of `GroupByKey() | CombineValues`. Inlining that expansion into Distinct, it
looks like:

from apache_beam import CombineValues, GroupByKey, Keys, Map

def Distinct(pcoll):  # pylint: disable=invalid-name
  """Produces a PCollection containing distinct elements of a PCollection."""
  return (
      pcoll
      | 'ToPairs' >> Map(lambda v: (v, None))
      | 'Group' >> GroupByKey()
      | 'CombineValues' >> CombineValues(lambda vs: None)
      | 'Distinct' >> Keys())

Does combiner lifting somehow make the GroupByKey operation more efficient
even though the combine comes after it? My intuition suggests that we could
just remove the `CombineValues` altogether.
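To make the question concrete, here is a toy model in plain Python (not Beam; `group_by_key`, `distinct_with_combine_values` and `distinct_gbk_only` are hypothetical names made up for this sketch). It assumes the runner executes the hand-expanded pipeline exactly as written, so `CombineValues` only runs after every pair has already crossed the shuffle. Under that assumption both versions shuffle the same number of records and return the same keys:

```python
from collections import defaultdict


def group_by_key(pairs):
    """Toy stand-in for GroupByKey: every (key, value) pair crosses the shuffle.

    Returns the grouped values and the number of records that were shuffled.
    """
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return dict(grouped), len(pairs)


def distinct_with_combine_values(elements):
    pairs = [(v, None) for v in elements]          # 'ToPairs'
    grouped, shuffled = group_by_key(pairs)        # 'Group'
    combine_fn = lambda vs: None                   # same fn as CombineValues(lambda vs: None)
    # 'CombineValues' runs per key, but only after the shuffle has happened.
    combined = {key: combine_fn(vs) for key, vs in grouped.items()}
    return sorted(combined), shuffled              # 'Distinct' (Keys)


def distinct_gbk_only(elements):
    pairs = [(v, None) for v in elements]          # 'ToPairs'
    grouped, shuffled = group_by_key(pairs)        # 'Group'
    return sorted(grouped), shuffled               # Keys directly, no CombineValues


data = ['x', 'y', 'x', 'z', 'y', 'x']
print(distinct_with_combine_values(data))  # (['x', 'y', 'z'], 6)
print(distinct_gbk_only(data))             # (['x', 'y', 'z'], 6)
```

If that assumption holds, the post-shuffle combine step in the hand-expanded form changes neither the output nor the amount of data shuffled, which is why it looks removable.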



On Fri, Jan 26, 2024 at 11:33 AM Robert Bradshaw via dev <
dev@beam.apache.org> wrote:

> This is because it allows us to do some of the deduplication before the
> shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on one
> worker and [B, B, B, B, C, C] on another. Rather than passing all that
> data through the GroupByKey (which involves relatively expensive
> materialization and cross-machine traffic), with this form the first
> worker will only emit [A, B] and the second [B, C], and only the B
> needs to be deduplicated post-shuffle.
>
> Wouldn't hurt to have a comment to that effect there.
>
> https://beam.apache.org/documentation/programming-guide/#combine
>
> On Fri, Jan 26, 2024 at 8:22 AM Joey Tran <joey.t...@schrodinger.com>
> wrote:
> >
> > Hey all,
> >
> > I was poking around and looking at `Distinct` and was confused about why
> > it was implemented the way it was.
> >
> > Reproduced here:
> > @ptransform_fn
> > @typehints.with_input_types(T)
> > @typehints.with_output_types(T)
> > def Distinct(pcoll):  # pylint: disable=invalid-name
> >   """Produces a PCollection containing distinct elements of a PCollection."""
> >   return (
> >       pcoll
> >       | 'ToPairs' >> Map(lambda v: (v, None))
> >       | 'Group' >> CombinePerKey(lambda vs: None)
> >       | 'Distinct' >> Keys())
> >
> > Could anyone clarify why we'd use a `CombinePerKey` instead of just
> > using `GroupByKey`?
> >
> > Cheers,
> > Joey
>
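The two-worker example in the quoted reply can be checked with a toy plain-Python model. The helper names (`shuffle`, `distinct_unlifted`, `distinct_lifted`) are hypothetical, and the model assumes lifting means each worker runs a partial combine before the shuffle and the partial results are merged after it; that is a reading of the programming guide's description, not any runner's actual code. It counts how many records cross the shuffle in each case:

```python
from collections import defaultdict


def shuffle(worker_outputs):
    """Toy shuffle: merge every worker's (key, value) pairs, grouped by key.

    Returns the grouped dict and how many records crossed the shuffle.
    """
    grouped = defaultdict(list)
    crossed = 0
    for pairs in worker_outputs:
        for key, value in pairs:
            grouped[key].append(value)
            crossed += 1
    return dict(grouped), crossed


def distinct_unlifted(workers):
    # No lifting: every element is paired and shuffled as-is.
    outputs = [[(v, None) for v in elems] for elems in workers]
    grouped, crossed = shuffle(outputs)
    return sorted(grouped), crossed


def distinct_lifted(workers):
    # Partial combine on each worker before the shuffle: each worker emits
    # one pair per distinct key it saw (dict.fromkeys keeps first occurrences).
    outputs = [[(v, None) for v in dict.fromkeys(elems)] for elems in workers]
    grouped, crossed = shuffle(outputs)
    # Final merge after the shuffle: duplicates across workers collapse here.
    return sorted(grouped), crossed


workers = [['A', 'A', 'A', 'B', 'B'], ['B', 'B', 'B', 'B', 'C', 'C']]
print(distinct_unlifted(workers))  # (['A', 'B', 'C'], 11)
print(distinct_lifted(workers))    # (['A', 'B', 'C'], 4)
```

In the lifted version only B, the key seen on both workers, is left to be deduplicated after the shuffle.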
