This is because it allows us to do some of the deduplication before the shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on one worker and [B, B, B, B, C, C] on another. Rather than passing all that data through a GroupByKey (which involves (relatively) expensive materialization and cross-machine traffic), with this form the first worker will only emit [A, B] and the second [B, C], and only the B needs to be deduplicated post-shuffle.
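Roughly, the pre-shuffle stage behaves something like this standalone sketch (purely illustrative, not Beam internals; the helper name is made up):

# Illustrative only: mimics what a lifted CombinePerKey(lambda vs: None)
# does on each worker before the shuffle.
worker1 = ['A', 'A', 'A', 'B', 'B']
worker2 = ['B', 'B', 'B', 'B', 'C', 'C']

def local_precombine(elements):
  # Keying each element and combining its values down to None collapses
  # duplicates locally, so at most one pair per key leaves the worker.
  return sorted({(v, None) for v in elements})

print(local_precombine(worker1))  # [('A', None), ('B', None)]
print(local_precombine(worker2))  # [('B', None), ('C', None)]
# Only 'B', which appears on both workers, still needs deduplication
# after the shuffle.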
Wouldn't hurt to have a comment to that effect there.

https://beam.apache.org/documentation/programming-guide/#combine

On Fri, Jan 26, 2024 at 8:22 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
> Hey all,
>
> I was poking around and looking at `Distinct` and was confused about why it
> was implemented the way it was.
>
> Reproduced here:
>
> @ptransform_fn
> @typehints.with_input_types(T)
> @typehints.with_output_types(T)
> def Distinct(pcoll):  # pylint: disable=invalid-name
>   """Produces a PCollection containing distinct elements of a PCollection."""
>   return (
>       pcoll
>       | 'ToPairs' >> Map(lambda v: (v, None))
>       | 'Group' >> CombinePerKey(lambda vs: None)
>       | 'Distinct' >> Keys())
>
> Could anyone clarify why we'd use a `CombinePerKey` instead of just using
> `GroupByKey`?
>
> Cheers,
> Joey