Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
and Combine.perKey(PerkeyCombineFn) ? Why not all of them have the
requirements ?

On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik <lc...@google.com> wrote:

> For it to be considered a combiner, the function needs to be associative
> and commutative.
>
> The issue is that from an API perspective it would be easy to have a
> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
> people in the data processing world expect that this
> parallelization/optimization is performed and thus exposing such a method
> would be dangerous as it would be breaking users expectations so from the
> design perspective it is a hard requirement. If PCollections ever become
> ordered or gain other properties, these requirements may loosen but it
> seems unlikely in the short term.
>
> At this point, I think your looking for a MapElements which you pass in a
> SerializableFunction<KV<K, Iterable<InputT>, KV<K, OutputT>>.
> Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>, KV<K,
> OutputT>> which can delegate to a SerializableFunction<Iterable<InputT>,
> OutputT> should be trivial.
>
>
> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> Thanks for the thorough explanation. I see the benefits for such a
> function.
> My follow-up question is whether this is a hard requirement.
> There are computations that don't satisfy this (I think it's monoid rule)
> but possible and easier to write with
> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). It's not
> difficult to provide an underlying CombineFn.
>
>
> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
> wrote:
>
> Combine.perKey takes a single SerializableFunction which knows how to
> convert from Iterable<V> to V.
>
> It turns out that many runners implement optimizations which allow them to
> run the combine operation across several machines to parallelize the work
> and potentially reduce the amount of data they store during a GBK.
> To be able to do such an optimization, it requires you to actually have
> three functions:
> InputT -> AccumulatorT : Creates the intermediate representation which
> allows for associative combining
> Iterable<AccumulatorT> -> AccumulatorT: Performs the actual combining
> AccumT -> OutputT: Extracts the output
>
> In the case of Combine.perKey with a SerializableFunction, your providing
> Iterable<AccumulatorT> -> AccumulatorT and the other two functions are the
> identity functions.
>
> To be able to support a Combine.perKey which can go from Iterable<InputT>
> -> OutputT would require that this occurred within a single machine
> removing the parallelization benefits that runners provide and for almost
> all cases is not a good idea.
>
> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <owenzhang1...@gmail.com>
> wrote:
>
> > Hi all,
> >
> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
> > and
> > output to be of the same type while `Combine.PerKey` doesn't have this
> > restriction.
> >
> > Thanks,
> > Manu
> >
>
>
>

Reply via email to