For it to be considered a combiner, the function needs to be associative and commutative.
The issue is that, from an API perspective, it would be easy to have a
Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
people in the data processing world expect this parallelization/optimization
to be performed, so exposing such a method would be dangerous: it would break
users' expectations. From a design perspective, then, it is a hard
requirement. If PCollections ever become ordered or gain other properties,
these requirements may loosen, but that seems unlikely in the short term.

At this point, I think you're looking for a MapElements to which you pass a
SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>. Creating a
wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>> that
delegates to a SerializableFunction<Iterable<InputT>, OutputT> should be
trivial.

On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <[email protected]> wrote:

> Thanks for the thorough explanation. I see the benefits for such a
> function.
> My follow-up question is whether this is a hard requirement.
> There are computations that don't satisfy this (I think it's monoid rule)
> but possible and easier to write with Combine.perKey(
> SerializableFunction<Iterable<InputT>, OutputT>). It's not difficult to
> provide an underlying CombineFn.
>
>
> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <[email protected]>
> wrote:
>
>> Combine.perKey takes a single SerializableFunction which knows how to
>> convert from Iterable<V> to V.
>>
>> It turns out that many runners implement optimizations which allow them to
>> run the combine operation across several machines to parallelize the work
>> and potentially reduce the amount of data they store during a GBK.
>>
>> To be able to do such an optimization, it requires you to actually have
>> three functions:
>> InputT -> AccumulatorT: Creates the intermediate representation which
>> allows for associative combining
>> Iterable<AccumulatorT> -> AccumulatorT: Performs the actual combining
>> AccumulatorT -> OutputT: Extracts the output
>>
>> In the case of Combine.perKey with a SerializableFunction, you're providing
>> Iterable<AccumulatorT> -> AccumulatorT and the other two functions are the
>> identity functions.
>>
>> To be able to support a Combine.perKey which can go from Iterable<InputT>
>> -> OutputT would require that this occurred within a single machine
>> removing the parallelization benefits that runners provide and for almost
>> all cases is not a good idea.
>>
>> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <[email protected]>
>> wrote:
>>
>> > Hi all,
>> >
>> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
>> > and
>> > output to be of the same type while `Combine.PerKey` doesn't have this
>> > restriction.
>> >
>> > Thanks,
>> > Manu
>> >
>> >
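To make the three functions from the quoted message concrete, here is a minimal plain-Java sketch that computes a mean. It deliberately does not use the Beam API: the class MeanSketch, the Accum type, and the method names toAccum, merge, extract, and combineLocally are all hypothetical names chosen for illustration. The point is only that, because merge is associative and commutative, partial results from different workers can be merged in any order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only; none of these names are Beam API.
final class MeanSketch {
    // AccumulatorT: a running (sum, count) pair.
    static final class Accum {
        final long sum;
        final long count;

        Accum(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    // InputT -> AccumulatorT: wrap one input as an accumulator.
    static Accum toAccum(int input) {
        return new Accum(input, 1);
    }

    // Iterable<AccumulatorT> -> AccumulatorT: associative and commutative,
    // so the grouping and order of the accumulators do not matter.
    static Accum merge(Iterable<Accum> accums) {
        long sum = 0;
        long count = 0;
        for (Accum a : accums) {
            sum += a.sum;
            count += a.count;
        }
        return new Accum(sum, count);
    }

    // AccumulatorT -> OutputT: extract the final result.
    static double extract(Accum a) {
        return a.count == 0 ? 0.0 : (double) a.sum / a.count;
    }

    // Simulates one worker pre-combining its share of the values for a key.
    static Accum combineLocally(List<Integer> inputs) {
        List<Accum> accums = new ArrayList<>();
        for (int i : inputs) {
            accums.add(toAccum(i));
        }
        return merge(accums);
    }

    public static void main(String[] args) {
        Accum workerA = combineLocally(Arrays.asList(1, 2));
        Accum workerB = combineLocally(Arrays.asList(3, 4, 5));
        // Merge the partial results in "arrival" order; any order works.
        double mean = extract(merge(Arrays.asList(workerB, workerA)));
        System.out.println(mean); // prints 3.0
    }
}
```

Note that applying an Iterable<Integer> -> Double mean function directly to the per-worker results would not work: averaging the two partial means (1.5 and 4.0) gives 2.75, not 3.0. That is the kind of computation that needs the separate accumulator type.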

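The wrapper suggested at the top of this message could look roughly like the sketch below. To keep it self-contained it does not depend on Beam: the nested SerializableFunction interface is a local stand-in with the same shape as Beam's, Map.Entry stands in for KV, and the class name PerKeyWrapperSketch and the method name perKey are hypothetical.

```java
import java.io.Serializable;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Map;

// Plain-Java illustration only; stand-in types replace the Beam ones.
final class PerKeyWrapperSketch {
    // Local stand-in shaped like Beam's SerializableFunction.
    interface SerializableFunction<I, O> extends Serializable {
        O apply(I input);
    }

    // Lifts Iterable<InputT> -> OutputT to a function over (key, values)
    // pairs: the key passes through, the values go to the delegate.
    static <K, InputT, OutputT>
            SerializableFunction<Map.Entry<K, Iterable<InputT>>, Map.Entry<K, OutputT>> perKey(
                    SerializableFunction<Iterable<InputT>, OutputT> fn) {
        return kv -> new SimpleImmutableEntry<>(kv.getKey(), fn.apply(kv.getValue()));
    }

    public static void main(String[] args) {
        // An order-sensitive function that would not qualify as a combiner.
        SerializableFunction<Iterable<Integer>, String> join = values -> {
            StringBuilder sb = new StringBuilder();
            for (int v : values) {
                sb.append(v).append(';');
            }
            return sb.toString();
        };

        SerializableFunction<Map.Entry<String, Iterable<Integer>>, Map.Entry<String, String>>
                wrapped = perKey(join);

        Map.Entry<String, Iterable<Integer>> grouped =
                new SimpleImmutableEntry<>("a", Arrays.asList(1, 2, 3));
        Map.Entry<String, String> out = wrapped.apply(grouped);
        System.out.println(out.getKey() + " -> " + out.getValue()); // prints a -> 1;2;3;
    }
}
```

In a real pipeline, a function of this shape would be handed to MapElements after the grouping step, with KV in place of Map.Entry; the exact MapElements call is not shown here. All values for a key are then processed in one place, which is exactly the trade-off described above.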