This is because it allows us to do some of the deduplication before
the shuffle via combiner lifting. E.g. say we have [A, A, A, B, B] on
one worker and [B, B, B, B, C, C] on another. Rather than passing all
of that data through the GroupByKey (which involves (relatively)
expensive materialization and cross-machine traffic), with this form
the first worker will only emit [A, B] and the second [B, C], and
only the B needs to be deduplicated post-shuffle.
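
In toy form (plain Python rather than Beam; the lists are just the
worker contents from above), the lifted pre-combine step amounts to:

def local_dedupe(elements):
  # Each worker deduplicates its own elements before the shuffle.
  return set(elements)

print(sorted(local_dedupe(['A', 'A', 'A', 'B', 'B'])))       # ['A', 'B']
print(sorted(local_dedupe(['B', 'B', 'B', 'B', 'C', 'C'])))  # ['B', 'C']
# Only the B that appears on both workers still needs deduplicating
# post-shuffle.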

Wouldn't hurt to have a comment to that effect there.
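
Something like the following, perhaps (imports added so the snippet
stands alone; the comment wording is just my sketch, the rest mirrors
apache_beam/transforms/util.py):

from apache_beam import typehints
from apache_beam.transforms.core import CombinePerKey, Map
from apache_beam.transforms.ptransform import ptransform_fn
from apache_beam.transforms.util import Keys

T = typehints.TypeVariable('T')

@ptransform_fn
@typehints.with_input_types(T)
@typehints.with_output_types(T)
def Distinct(pcoll):  # pylint: disable=invalid-name
  """Produces a PCollection containing distinct elements of a PCollection."""
  # CombinePerKey (rather than GroupByKey) lets runners lift the combine
  # to before the shuffle: each worker emits each distinct element at
  # most once, so only those per-worker representatives are materialized
  # and sent across machines, not every duplicate.
  return (
      pcoll
      | 'ToPairs' >> Map(lambda v: (v, None))
      | 'Group' >> CombinePerKey(lambda vs: None)
      | 'Distinct' >> Keys())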

https://beam.apache.org/documentation/programming-guide/#combine

On Fri, Jan 26, 2024 at 8:22 AM Joey Tran <joey.t...@schrodinger.com> wrote:
>
> Hey all,
>
> I was poking around and looking at `Distinct` and was confused about why it 
> was implemented the way it was.
>
> Reproduced here:
> @ptransform_fn
> @typehints.with_input_types(T)
> @typehints.with_output_types(T)
> def Distinct(pcoll):  # pylint: disable=invalid-name
>   """Produces a PCollection containing distinct elements of a PCollection."""
>   return (
>       pcoll
>       | 'ToPairs' >> Map(lambda v: (v, None))
>       | 'Group' >> CombinePerKey(lambda vs: None)
>       | 'Distinct' >> Keys())
>
> Could anyone clarify why we'd use a `CombinePerKey` instead of just using 
> `GroupByKey`?
>
> Cheers,
> Joey
