Yes, with_hot_key_fanout only performs a single level of fanout. I don't think fanning out more than this has been explored, but I would imagine that in most cases the increased IO would negate most, if not all, of the benefits.

In particular, note that we already do "combiner lifting" to do as much combining as we can on the mapper side. For example, suppose we have M elements and N workers. Each worker will (to a first-order approximation) combine M/N elements down to a single element, leaving N elements in total to be combined by one worker in the subsequent stage. If N is large (or the combining computation is expensive), one can use with_hot_key_fanout to add an intermediate step: the N workers each combine M/N elements into sqrt(N) partial aggregates, and the subsequent worker only needs to combine the sqrt(N) partial aggregates. Generally N (the number of workers, not the number of elements) is small enough that multiple levels are not needed.

On Fri, May 26, 2023 at 1:57 PM Stephan Hoyer via dev <dev@beam.apache.org> wrote:

> We have some use-cases where we are combining over very large sets (e.g.,
> computing the average of 1e5 to 1e6 elements, corresponding to hourly
> weather observations over the past 50 years).
>
> "with_hot_key_fanout" seems to be rather essential for performing these
> calculations, but as far as I can tell it only performs a single level of
> fanout, i.e., instead of summing up 1e6 elements on a single node, you sum
> 1e3 elements 1e3 times, and then sum those 1e3 results together.
>
> My guess is that such combiners could be much more efficient if this was
> performed in a hierarchical/multi-stage fashion proportional to
> log(element_count), e.g., summing 100 elements with 3 stages, or maybe
> summing 10 elements with 6 stages. Dask uses such a "tree reduction"
> strategy as controlled by the "split_every" parameter:
> https://github.com/dask/dask/blob/453bd7031828f72e4602498c1a1f776280794bea/dask/array/reductions.py#L109
>
> I understand that the number of fanout stages could not be computed
> automatically in the Beam data model, but it would still be nice to be able
> to specify this manually. Has there been any thought to introducing this
> feature?
>
> Thanks,
> Stephan
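
P.S. To make the arithmetic in this thread concrete, here is a rough back-of-the-envelope sketch in plain Python. It is not Beam code and does not model what any runner actually does. The function names (tree_stages, lifted_fan_in, fanout_final_inputs) are made up for illustration, and the example values for M and N are assumptions. It only counts how many inputs each stage has to combine under the simplified model described above.

```python
import math


def tree_stages(num_elements: int, split_every: int) -> int:
    """Number of reduction stages needed if every stage combines groups of
    at most `split_every` inputs (the tree-reduction idea from the question)."""
    if split_every < 2:
        raise ValueError("split_every must be at least 2")
    stages = 0
    remaining = num_elements
    while remaining > 1:
        # Each stage turns groups of `split_every` inputs into one output.
        remaining = -(-remaining // split_every)  # ceiling division
        stages += 1
    return stages


def lifted_fan_in(num_elements: int, num_workers: int) -> tuple[int, int]:
    """Simplified model of combiner lifting: each of the N workers
    pre-combines about M/N elements into one partial result, and a single
    downstream worker then combines the N partial results."""
    per_worker = -(-num_elements // num_workers)  # ceiling division
    final_inputs = num_workers
    return per_worker, final_inputs


def fanout_final_inputs(num_workers: int) -> int:
    """Simplified model of one intermediate fanout level: the final worker
    only combines about sqrt(N) partial aggregates."""
    root = math.isqrt(num_workers)
    return root if root * root == num_workers else root + 1
```

With M = 1e6 elements, tree_stages(10**6, 100) gives 3 and tree_stages(10**6, 10) gives 6, matching the stage counts in the question, while tree_stages(10**6, 1000) gives 2, which is the single level of fanout. With an assumed N = 400 workers, lifted_fan_in(10**6, 400) gives (2500, 400) and fanout_final_inputs(400) gives 20, so the last stage is already small without further levels.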