On Mon, Oct 31, 2016 at 8:39 PM, Kenneth Knowles <k...@google.com.invalid> wrote:
> Manu, I think your critique about user interface clarity is valid.
> CombineFn conflates a few operations and is not that clear about what it is
> doing or why. You seem to be concerned about CombineFn versus
> SerializableFunction constructors for the Combine family of transforms. I
> thought I'd respond from my own perspective, in case it is helpful. It is
> mostly the same things that Luke has said. Let's ignore keys. I don't think
> they change things much.
>
> As you seem to already understand, a CombineFn is a convenient collapsed
> representation of three functions:
>
> init : InputT -> AccumT
> combiner: (AccumT, AccumT) -> AccumT
> extract: AccumT -> OutputT
>
> And the real semantics:
>
> MapElements.via(init)
> Combine.via(combiner)
> MapElements.via(extract)
>
> For starters, "associative" is not even a well-typed word to use unless
> input type and output type are the same. So it is `combiner` that needs to
> be associative and commutative. Sometimes `combiner` also has an identity
> element. I'm afraid `createAccumulator()` and `defaultValue()` confuse
> things here (the latter is never meaningfully used). When we say a
> CombineFn has to be "associative" and "commutative" we just mean that it
> can be factored into these methods.
>
> So the SerializableFunction just needs to be factorable into these methods,
> too, like Luke said. Pragmatically, if we only have a
> SerializableFunction<Iterable<InputT>, OutputT> then we don't have a way to
> do hierarchical combines (can't feed the output of one layer into the next
> layer), so associativity is irrelevant and it might as well be a
> MapElements. So it only makes sense to allow
> SerializableFunction<Iterable<AccumT>, AccumT>. Some variant that is a
> binary function would make sense for lambdas, etc.
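Kenn's factoring into init / combiner / extract can be made concrete with a small plain-Java sketch. It is an illustration under assumptions, not Beam code: java.util.function types stand in for Beam transforms, and the names FactoredMean, SumCount, INIT, COMBINER, EXTRACT and combineShard are hypothetical. A mean is a useful example because the output (a double) cannot be combined associatively, while the (sum, count) accumulator can, which is why associativity is a property of `combiner` and not of the whole CombineFn.

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class FactoredMean {
    // Hypothetical accumulator for a mean: a running (sum, count) pair.
    record SumCount(long sum, long count) {}

    // init : InputT -> AccumT
    static final Function<Integer, SumCount> INIT = x -> new SumCount(x, 1);

    // combiner : (AccumT, AccumT) -> AccumT; associative and commutative,
    // with SumCount(0, 0) as its identity element.
    static final BinaryOperator<SumCount> COMBINER =
            (a, b) -> new SumCount(a.sum() + b.sum(), a.count() + b.count());

    // extract : AccumT -> OutputT
    static final Function<SumCount, Double> EXTRACT =
            acc -> acc.count() == 0 ? Double.NaN : (double) acc.sum() / acc.count();

    // First layer: combine one shard of inputs into a single accumulator.
    static SumCount combineShard(List<Integer> shard) {
        return shard.stream().map(INIT).reduce(new SumCount(0, 0), COMBINER);
    }

    public static void main(String[] args) {
        // Two shards, as if processed on two different workers.
        SumCount left = combineShard(List.of(1, 2, 3));
        SumCount right = combineShard(List.of(4, 5, 6, 7));
        // Second layer: the partial accumulators go through the same combiner.
        double mean = EXTRACT.apply(COMBINER.apply(left, right));
        System.out.println(mean); // 4.0
    }
}
```

Because COMBINER is associative and commutative, the inputs could be split into different shards, or the partial accumulators regrouped, without changing the result; that freedom is what lets a runner combine in layers.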
>
> Here are some reasons for the particular design of CombineFn that actually
> should be called out:
>
> - It is a major efficiency gain to mutate the accumulator.
> - Usually `init` is trivial and best to inline, hence addInput(AccumT,
> InputT)

I would add that often the map InputT -> AccumT is *non-trivial*, as is
(AccumT, AccumT) -> AccumT, so AccumT + InputT -> AccumT is preferable (both
for efficiency and code simplicity) for anything beyond trivial combiners.
FlumeJava, a predecessor to Beam that we took many lessons from, had an
explicit init rather than addInput, and that turned out to be a drawback when
implementing CombineFns.

> - With `compact` we allow multiple physical representations of the same
> semantic accumulator, and a hook to switch between them
> - And it is hard to take the user through the journey from the real
> reasons behind it to the particular Java interface
>
> Note also that CombineWithContext allows side inputs, which complicates the
> formalities somewhat but doesn't change the intuition.
>
> Kenn
>
> On Mon, Oct 31, 2016 at 6:37 PM Manu Zhang <owenzhang1...@gmail.com> wrote:
>
>> I'm a bit confused here because neither of them requires the same type of
>> input and output. Also, the Javadoc of Globally says "It is common for
>> {@code InputT == OutputT}, but not required." If associative and
>> commutative are expected, why don't they have restrictions like
>> Combine.perKey(SerializableFunction)?
>>
>> I understand the motive and requirement behind Combine functions. I'm more
>> asking about the user interface consistency.
>> By the way, it's hard to know what Combine.Globally does from the name, but
>> that discussion should be put in another thread.
>>
>> Thanks for your patience here.
>>
>> Manu
>>
>> On Tue, Nov 1, 2016 at 12:04 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>> GlobalCombineFn and PerKeyCombineFn still expect an associative and
>> commutative function when accumulating.
>> GlobalCombineFn is shorthand for assigning everything to a single key,
>> doing the combine, and then discarding the key and extracting the single
>> output.
>> PerKeyCombineFn is shorthand for doing accumulation where the key doesn't
>> modify the accumulation in any way.
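Two points from the messages above can be illustrated together: feeding inputs into a mutable accumulator (AccumT + InputT -> AccumT) instead of mapping each input through init, and Luke's description of a global combine as a per-key combine over one shared key. The sketch below is plain Java under stated assumptions: AccumulatingFn is a hypothetical interface whose method names mirror Beam's CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput), but it is not the Beam class, and AccumulatingCombine, MeanFn, perKey and globally are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AccumulatingCombine {
    // Hypothetical interface mirroring the method names of Beam's CombineFn;
    // a plain-Java illustration only, not the Beam API.
    interface AccumulatingFn<InputT, AccumT, OutputT> {
        AccumT createAccumulator();
        AccumT addInput(AccumT accumulator, InputT input); // may mutate and return accumulator
        AccumT mergeAccumulators(Iterable<AccumT> accumulators);
        OutputT extractOutput(AccumT accumulator);
    }

    // Mean with a mutable long[]{sum, count} accumulator updated in place.
    static final class MeanFn implements AccumulatingFn<Integer, long[], Double> {
        public long[] createAccumulator() {
            return new long[] {0, 0};
        }

        public long[] addInput(long[] acc, Integer x) {
            acc[0] += x; // no per-input accumulator object is allocated
            acc[1]++;
            return acc;
        }

        public long[] mergeAccumulators(Iterable<long[]> accs) {
            long[] merged = createAccumulator();
            for (long[] a : accs) {
                merged[0] += a[0];
                merged[1] += a[1];
            }
            return merged;
        }

        public Double extractOutput(long[] acc) {
            return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
        }
    }

    // Per-key combine on one machine: one accumulator per key, fed via addInput.
    static <K, I, A, O> Map<K, O> perKey(List<Map.Entry<K, I>> input, AccumulatingFn<I, A, O> fn) {
        Map<K, A> accs = new HashMap<>();
        for (Map.Entry<K, I> e : input) {
            A acc = accs.computeIfAbsent(e.getKey(), k -> fn.createAccumulator());
            accs.put(e.getKey(), fn.addInput(acc, e.getValue()));
        }
        Map<K, O> out = new HashMap<>();
        accs.forEach((k, a) -> out.put(k, fn.extractOutput(a)));
        return out;
    }

    // "Global" combine as shorthand: give every element the same key,
    // combine per key, then drop the key and return the single output.
    static <I, A, O> O globally(List<I> input, AccumulatingFn<I, A, O> fn) {
        List<Map.Entry<String, I>> keyed = new ArrayList<>();
        for (I x : input) {
            keyed.add(Map.entry("", x));
        }
        Map<String, O> perKeyResult = perKey(keyed, fn);
        // With no input there is no key, so fall back to an empty accumulator.
        return perKeyResult.containsKey("")
                ? perKeyResult.get("")
                : fn.extractOutput(fn.createAccumulator());
    }

    public static void main(String[] args) {
        MeanFn fn = new MeanFn();
        System.out.println(globally(List.of(1, 2, 3, 4), fn)); // 2.5
        // Accumulators built on separate "workers" are merged before extraction.
        long[] a = fn.addInput(fn.addInput(fn.createAccumulator(), 1), 2);
        long[] b = fn.addInput(fn.createAccumulator(), 6);
        System.out.println(fn.extractOutput(fn.mergeAccumulators(List.of(a, b)))); // 3.0
    }
}
```

Note that the key plays no role inside MeanFn, which matches the point that a per-key combine is the same accumulation applied independently per key.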
>>
>> On Fri, Oct 28, 2016 at 6:09 PM, Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>>
>> Then what about the other interfaces, like Combine.perKey(GlobalCombineFn)
>> and Combine.perKey(PerKeyCombineFn)? Why don't all of them have the same
>> requirements?
>>
>> On Sat, Oct 29, 2016 at 12:06 AM Lukasz Cwik <lc...@google.com> wrote:
>>
>> For it to be considered a combiner, the function needs to be associative
>> and commutative.
>>
>> The issue is that from an API perspective it would be easy to have a
>> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). But many
>> people in the data processing world expect that this
>> parallelization/optimization is performed, so exposing such a method
>> would be dangerous because it would break users' expectations. From the
>> design perspective, it is a hard requirement. If PCollections ever become
>> ordered or gain other properties, these requirements may loosen, but that
>> seems unlikely in the short term.
>>
>> At this point, I think you're looking for a MapElements to which you pass a
>> SerializableFunction<KV<K, Iterable<InputT>>, KV<K, OutputT>>.
>> Creating a wrapper SerializableFunction<KV<K, Iterable<InputT>>, KV<K,
>> OutputT>> which delegates to a SerializableFunction<Iterable<InputT>,
>> OutputT> should be trivial.
>>
>>
>> On Thu, Oct 27, 2016 at 6:17 PM, Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>>
>> Thanks for the thorough explanation. I see the benefits of such a
>> function.
>> My follow-up question is whether this is a hard requirement.
>> There are computations that don't satisfy this (I think it's the monoid
>> rule) but are possible and easier to write with
>> Combine.perKey(SerializableFunction<Iterable<InputT>, OutputT>). It's not
>> difficult to provide an underlying CombineFn.
>>
>>
>> On Thu, Oct 27, 2016 at 9:47 AM Lukasz Cwik <lc...@google.com.invalid>
>> wrote:
>>
>> Combine.perKey takes a single SerializableFunction which knows how to
>> convert from Iterable<V> to V.
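The wrapper Luke suggests in his Oct 28 message above, a MapElements step over the grouped values instead of a Combine, has roughly the shape sketched below. This is an assumption-laden plain-Java illustration: java.util.function.Function stands in for Beam's SerializableFunction, Map.Entry stands in for KV, and KeyedWrapper, keepKey and median are hypothetical names. In a pipeline such a function would run after a GroupByKey, whose output is KV<K, Iterable<V>>, so every value for a key is processed on one worker; that is exactly the loss of parallelism that keeps it out of the Combine family. The exact median is used as an example of the kind of computation Manu mentions, one that needs the whole group at once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class KeyedWrapper {
    // Stand-ins (assumptions): Function plays the role of Beam's
    // SerializableFunction, and Map.Entry plays the role of Beam's KV.

    // Lifts a per-group function Iterable<InputT> -> OutputT to a function over
    // (key, grouped values) pairs that keeps the key, as a MapElements step would.
    static <K, InputT, OutputT> Function<Map.Entry<K, Iterable<InputT>>, Map.Entry<K, OutputT>>
            keepKey(Function<Iterable<InputT>, OutputT> fn) {
        return kv -> Map.entry(kv.getKey(), fn.apply(kv.getValue()));
    }

    // A per-group computation that is not a combiner: the exact median
    // has to see every value in the group at once.
    static Double median(Iterable<Integer> values) {
        List<Integer> sorted = new ArrayList<>();
        values.forEach(sorted::add);
        Collections.sort(sorted);
        int n = sorted.size();
        if (n == 0) {
            return Double.NaN;
        }
        return n % 2 == 1
                ? sorted.get(n / 2).doubleValue()
                : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
    }

    public static void main(String[] args) {
        Function<Map.Entry<String, Iterable<Integer>>, Map.Entry<String, Double>> perGroup =
                keepKey(KeyedWrapper::median);
        Map.Entry<String, Iterable<Integer>> group = Map.entry("a", List.of(5, 1, 3, 2));
        Map.Entry<String, Double> result = perGroup.apply(group);
        System.out.println(result.getKey() + " -> " + result.getValue()); // a -> 2.5
    }
}
```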
>>
>> It turns out that many runners implement optimizations which allow them to
>> run the combine operation across several machines to parallelize the work
>> and potentially reduce the amount of data they store during a GBK.
>> Such an optimization requires you to actually have three functions:
>> - InputT -> AccumulatorT: Creates the intermediate representation which
>>   allows for associative combining
>> - Iterable<AccumulatorT> -> AccumulatorT: Performs the actual combining
>> - AccumulatorT -> OutputT: Extracts the output
>>
>> In the case of Combine.perKey with a SerializableFunction, you're providing
>> Iterable<AccumulatorT> -> AccumulatorT, and the other two functions are the
>> identity functions.
>>
>> Supporting a Combine.perKey that goes directly from Iterable<InputT>
>> -> OutputT would require the whole computation to happen on a single
>> machine, removing the parallelization benefits that runners provide; in
>> almost all cases that is not a good idea.
>>
>> On Wed, Oct 26, 2016 at 6:23 PM, Manu Zhang <owenzhang1...@gmail.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I'm wondering why `Combine.perKey(SerializableFunction)` requires input
>> > and output to be of the same type while `Combine.PerKey` doesn't have
>> > this restriction.
>> >
>> > Thanks,
>> > Manu
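Luke's three-function description, and the special case where a SerializableFunction supplies only the middle function, can be simulated in a single process as below. This is a sketch under stated assumptions: the "workers" are just nested lists, java.util.function.Function stands in for SerializableFunction, and LayeredCombine, combineInLayers, combineThreeStep, sum and naiveMean are hypothetical names. The second example shows why the associativity requirement matters: averaging averages is not associative, so applying it in layers silently gives a wrong answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class LayeredCombine {
    // Applies an Iterable<A> -> A function hierarchically: first within each
    // "worker" shard, then once more over the partial results. This is only
    // correct if the function is associative and commutative.
    static <A> A combineInLayers(List<List<A>> shards, Function<Iterable<A>, A> combine) {
        List<A> partials = new ArrayList<>();
        for (List<A> shard : shards) {
            partials.add(combine.apply(shard)); // layer 1: per shard
        }
        return combine.apply(partials);         // layer 2: over partial results
    }

    // Full form: InputT -> AccumT, Iterable<AccumT> -> AccumT, AccumT -> OutputT.
    static <I, A, O> O combineThreeStep(List<List<I>> shards, Function<I, A> init,
            Function<Iterable<A>, A> combine, Function<A, O> extract) {
        List<List<A>> accShards = new ArrayList<>();
        for (List<I> shard : shards) {
            List<A> accs = new ArrayList<>();
            for (I x : shard) {
                accs.add(init.apply(x));
            }
            accShards.add(accs);
        }
        return extract.apply(combineInLayers(accShards, combine));
    }

    // Associative and commutative: safe to apply in layers.
    static Integer sum(Iterable<Integer> xs) {
        int total = 0;
        for (int x : xs) {
            total += x;
        }
        return total;
    }

    // Not associative: a mean of means is not the overall mean.
    static Double naiveMean(Iterable<Double> xs) {
        double s = 0;
        int n = 0;
        for (double x : xs) {
            s += x;
            n++;
        }
        return s / n;
    }

    public static void main(String[] args) {
        List<List<Integer>> shards = List.of(List.of(1, 2, 3), List.of(4, 5));
        // SerializableFunction-style case: init and extract are identities.
        Integer total = combineThreeStep(shards, Function.identity(), LayeredCombine::sum, Function.identity());
        System.out.println(total); // 15

        List<List<Double>> dshards = List.of(List.of(1.0, 2.0, 3.0), List.of(4.0, 5.0));
        System.out.println(combineInLayers(dshards, LayeredCombine::naiveMean)); // 3.25, not 3.0
    }
}
```

The mean example is the same computation as a proper CombineFn with a (sum, count) accumulator; only the choice of accumulator type decides whether the layered evaluation is correct.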