Hmm, I think I might still be missing something. CombinePerKey is made up of "GBK() | CombineValues". Expanding that inside Distinct, it looks like:
def Distinct(pcoll):  # pylint: disable=invalid-name
  """Produces a PCollection containing distinct elements of a PCollection."""
  return (
      pcoll
      | 'ToPairs' >> Map(lambda v: (v, None))
      | 'Group' >> GroupByKey()
      | 'CombineValues' >> CombineValues(lambda vs: None)
      | 'Distinct' >> Keys())

Does the combiner lifting somehow make the GroupByKey operation more efficient despite coming after it? My intuition would suggest that we could just remove the `CombineValues` altogether.

On Fri, Jan 26, 2024 at 11:33 AM Robert Bradshaw via dev <dev@beam.apache.org> wrote:

> This is because it allows us to do some of the deduplication before
> shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on one
> worker and [B, B, B, B, C, C] on another. Rather than passing all that
> data through the GroupByKey (which involves (relatively) expensive
> materialization and cross-machine traffic), with this form the first
> worker will only emit [A, B] and the second [B, C], and only the B
> needs to be deduplicated post-shuffle.
>
> Wouldn't hurt to have a comment to that effect there.
>
> https://beam.apache.org/documentation/programming-guide/#combine
>
> On Fri, Jan 26, 2024 at 8:22 AM Joey Tran <joey.t...@schrodinger.com> wrote:
> >
> > Hey all,
> >
> > I was poking around and looking at `Distinct` and was confused about
> > why it was implemented the way it was.
> >
> > Reproduced here:
> >
> > @ptransform_fn
> > @typehints.with_input_types(T)
> > @typehints.with_output_types(T)
> > def Distinct(pcoll):  # pylint: disable=invalid-name
> >   """Produces a PCollection containing distinct elements of a PCollection."""
> >   return (
> >       pcoll
> >       | 'ToPairs' >> Map(lambda v: (v, None))
> >       | 'Group' >> CombinePerKey(lambda vs: None)
> >       | 'Distinct' >> Keys())
> >
> > Could anyone clarify why we'd use a `CombinePerKey` instead of just
> > using `GroupByKey`?
> >
> > Cheers,
> > Joey
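[Editor's note: the shuffle savings Robert describes can be sketched in plain Python, outside of Beam. This is only an illustration of the combiner-lifting idea using his [A, A, A, B, B] / [B, B, B, B, C, C] example; the shard data and record counts are taken from the quoted email, and `set` stands in for the lifted pre-combine step each worker runs locally.]

```python
# Sketch (plain Python, not Beam) of why combiner lifting helps Distinct:
# each worker pre-deduplicates its own shard before the shuffle, so only
# one record per locally-distinct value crosses the network.

worker_shards = [
    ["A", "A", "A", "B", "B"],       # worker 1
    ["B", "B", "B", "B", "C", "C"],  # worker 2
]

# Without lifting: every element is materialized and shuffled.
records_without_lifting = sum(len(shard) for shard in worker_shards)  # 11

# With lifting: each worker emits each distinct value once
# (worker 1 emits {A, B}, worker 2 emits {B, C}).
pre_combined = [set(shard) for shard in worker_shards]
records_with_lifting = sum(len(s) for s in pre_combined)  # 4

# Post-shuffle, only the overlap ("B") still needs deduplicating.
distinct = sorted(set().union(*pre_combined))

print(records_without_lifting, records_with_lifting)  # 11 4
print(distinct)                                       # ['A', 'B', 'C']
```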