> > > > My team has an internal implementation of a CartesianProduct
> > > > transform, based on using hashing to split a pcollection into a
> > > > finite number of groups and CoGroupByKey.
> > > >
> > > > Could this be contributed to Beam?
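(Aside: a rough, hypothetical sketch of a hashing + CoGroupByKey cross product
in the Python SDK. The bucket count N, the labels, and the helper name below
are invented for illustration; this is not the internal implementation
referred to above.)

import apache_beam as beam

N = 10  # number of hash buckets; purely illustrative

def cartesian_product_via_cogbk(pcoll_a, pcoll_b):
    # Replicate every element of A under each of the N bucket keys.
    keyed_a = pcoll_a | 'ReplicateA' >> beam.FlatMap(
        lambda a: [(k, a) for k in range(N)])
    # Assign each element of B to a single bucket by hash.
    keyed_b = pcoll_b | 'BucketB' >> beam.Map(lambda b: (hash(b) % N, b))

    def emit_pairs(group):
        _, (a_values, b_values) = group
        b_values = list(b_values)  # allow repeated iteration
        for a in a_values:
            for b in b_values:
                yield (a, b)

    # Each bucket's group holds all of A plus that bucket's slice of B.
    return ((keyed_a, keyed_b)
            | 'CoGroup' >> beam.CoGroupByKey()
            | 'EmitPairs' >> beam.FlatMap(emit_pairs))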
If it would be of broader interest, I would be happy to work on this for
the Python SDK. I can share a link to the code with Googlers.

On Mon, Sep 19, 2022 at 10:47 AM Robert Bradshaw <rober...@google.com> wrote:

> If one of your inputs fits into memory, using side inputs is
> definitely the way to go. If neither side fits into memory, the cross
> product may be prohibitively large to compute even on a distributed
> computing platform (a billion times a billion is big, though I suppose
> one may hit memory limits with fewer elements if the elements
> themselves are large)

I agree, in practice the side input solution will usually suffice.

For CartesianProduct in particular, it is pretty common for one or more of
the inputs to have a statically known size, because it was created from an
in-memory sequence (i.e., with beam.Create). Otherwise we could look at
user-supplied hints, falling back to CoGroupByKey only if required.

There is also the (not uncommon) special case where _every_ input has
statically known size, e.g., CreateCartesianProduct().

> one can still do the partitioning hack. E.g.
>
> partitions = pcoll_B | beam.Partition(hash, N)
> cross_product = tuple([
>     pcoll_A | beam.FlatMap(lambda a, bs: [(a, b) for b in bs],
>                            beam.pvalue.AsList(part))
>     for part in partitions
> ]) | beam.Flatten()

Interesting! I imagine this would break at some scale. Do you have an
intuition for what is a "reasonable" number of partitions -- 10s, 100s,
1000s?
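(For completeness, here is roughly what the side input solution mentioned
above could look like when one side -- pcoll_B below -- fits into memory.
The pipeline, labels, and element values are made up for illustration, not
taken from any existing transform.)

import apache_beam as beam

with beam.Pipeline() as p:
    pcoll_A = p | 'CreateA' >> beam.Create(['x', 'y', 'z'])
    pcoll_B = p | 'CreateB' >> beam.Create([1, 2, 3])

    # Broadcast all of pcoll_B to every worker as a side input and pair it
    # with each element of pcoll_A.
    cross_product = pcoll_A | 'Cross' >> beam.FlatMap(
        lambda a, bs: [(a, b) for b in bs],
        bs=beam.pvalue.AsList(pcoll_B))

    cross_product | 'Print' >> beam.Map(print)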