Perhaps the issue is that pushing the combine function upstream affects the windowing and triggering behavior of the GBK. I don't believe it's as simple as always being able to push the combiner upstream; it depends on how a runner has decided to implement GBK.
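Here is a minimal sketch of why lifting depends on windowing. It uses plain Java, not Beam's API: `CountFn` is a hypothetical class with the createAccumulator/addInput/mergeAccumulators/extractOutput shape of a CombineFn, and the fixed 10-unit windows are an assumption. Each upstream bundle keeps one accumulator per (key, window), and those accumulators are merged after the shuffle. Keying the partial results by key alone would add counts from different windows together. Triggering is not modeled. With merging windows such as sessions, the final window is only known once windows are merged at the GBK, so lifting needs more care there.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombinerLiftingSketch {
    // Hypothetical counter with the four-method shape of a Beam CombineFn.
    // This is NOT Beam's CombineFn class, only an illustration of the contract.
    static final class CountFn {
        long createAccumulator() { return 0L; }
        long addInput(long acc, String input) { return acc + 1; }
        long mergeAccumulators(List<Long> accs) {
            long sum = 0;
            for (long a : accs) sum += a;
            return sum;
        }
        long extractOutput(long acc) { return acc; }
    }

    record Element(String key, long timestamp) {}

    // Partial results are scoped to (key, window), never to the key alone.
    record KeyWindow(String key, long windowStart) {}

    static final long WINDOW_SIZE = 10; // assumed fixed, non-merging windows

    static long windowStart(long ts) { return ts - Math.floorMod(ts, WINDOW_SIZE); }

    // Upstream of the GBK: partially combine one bundle.
    static Map<KeyWindow, Long> partialCombine(List<Element> bundle, CountFn fn) {
        Map<KeyWindow, Long> accs = new HashMap<>();
        for (Element e : bundle) {
            KeyWindow kw = new KeyWindow(e.key(), windowStart(e.timestamp()));
            long acc = accs.getOrDefault(kw, fn.createAccumulator());
            accs.put(kw, fn.addInput(acc, e.key()));
        }
        return accs;
    }

    // Downstream of the GBK: merge partial accumulators per (key, window).
    static Map<KeyWindow, Long> mergeAfterShuffle(List<Map<KeyWindow, Long>> partials, CountFn fn) {
        Map<KeyWindow, List<Long>> grouped = new HashMap<>();
        for (Map<KeyWindow, Long> p : partials) {
            p.forEach((kw, acc) -> grouped.computeIfAbsent(kw, k -> new ArrayList<>()).add(acc));
        }
        Map<KeyWindow, Long> out = new HashMap<>();
        grouped.forEach((kw, accs) -> out.put(kw, fn.extractOutput(fn.mergeAccumulators(accs))));
        return out;
    }

    // Reference result without lifting: buffer every element per (key, window), then count.
    static Map<KeyWindow, Long> bufferThenCount(List<Element> all) {
        Map<KeyWindow, List<Element>> buffers = new HashMap<>();
        for (Element e : all) {
            KeyWindow kw = new KeyWindow(e.key(), windowStart(e.timestamp()));
            buffers.computeIfAbsent(kw, k -> new ArrayList<>()).add(e);
        }
        Map<KeyWindow, Long> out = new HashMap<>();
        buffers.forEach((kw, buf) -> out.put(kw, (long) buf.size()));
        return out;
    }

    static final List<Element> BUNDLE_1 =
        List.of(new Element("a", 1), new Element("a", 12), new Element("b", 3));
    static final List<Element> BUNDLE_2 = List.of(new Element("a", 5), new Element("a", 15));

    static Map<KeyWindow, Long> lifted() {
        CountFn fn = new CountFn();
        return mergeAfterShuffle(List.of(partialCombine(BUNDLE_1, fn), partialCombine(BUNDLE_2, fn)), fn);
    }

    static Map<KeyWindow, Long> buffered() {
        List<Element> all = new ArrayList<>(BUNDLE_1);
        all.addAll(BUNDLE_2);
        return bufferThenCount(all);
    }

    public static void main(String[] args) {
        System.out.println(lifted().equals(buffered())); // true
        System.out.println(lifted().get(new KeyWindow("a", 0))); // 2
    }
}
```

Both paths agree here only because the partial accumulators never cross a window boundary.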

On Tue, Jun 21, 2016 at 9:58 AM, Thomas Weise wrote:
> Hi Thomas,
>
> Thanks for the info. When the pipeline contains:
>
> .apply(Count.<String>perElement())
>
> The translation looks as follows:
>
> 58 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - entering composite transform Count.PerElement
> 58 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting transform Init [AnonymousParDo]
> 58 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting value Count.PerElement/Init.out [PCollection]
> 58 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - entering composite transform Count.PerKey [Combine.PerKey]
> 58 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting transform GroupByKey
> 93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting value Count.PerElement/Count.PerKey/GroupByKey.out [PCollection]
> 93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - entering composite transform Combine.GroupedValues
> 93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting transform AnonymousParDo
> 93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting value Count.PerElement/Count.PerKey/Combine.GroupedValues/AnonymousParDo.out [PCollection]
> 93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - leaving composite transform Combine.GroupedValues
> 93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - leaving composite transform Count.PerKey [Combine.PerKey]
> 93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - leaving composite transform Count.PerElement
>
> So the runner's translator needs to take care of pushing the combine function upstream, when it is possible. I was wondering whether this is something that could be handled in a runner independent way?
>
> Thanks,
> Thomas
>
> On Fri, Jun 17, 2016 at 10:19 AM, Thomas Groh wrote:
>
> > Generally, the above code snippet will work, producing (after trigger firing) an output Iterable<V> containing all of the input elements. It may be notable that timers (and TimerInternals) are also per-key, so that interface must also be updated per element.
> >
> > By specifying the ReduceFn of the ReduceFnRunner, you can change how the ReduceFnRunner adds and merges state. The combining ReduceFn is suitable for use with upstream CombineFns, while buffering is suitable for general use.
> >
> > On Fri, Jun 17, 2016 at 9:52 AM, Thomas Weise wrote:
> >
> > > The source for my windowed groupByKey experiment is here:
> > >
> > > https://github.com/tweise/incubator-beam/blob/master/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
> > >
> > > The result is Iterable<V>. In cases such as counting, what is the recommended way to perform the incremental aggregation, without building an intermediate collection?
> > >
> > > Thomas
> > >
> > > On Fri, Jun 17, 2016 at 8:27 AM, Thomas Weise wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm looking into using the GroupAlsoByWindowViaWindowSetDoFn to accumulate the windowed state with the elements arriving one by one (stream).
> > > >
> > > > Once the window is complete, I would like to emit an Iterable<V> or another form of aggregation of the elements. Is the following supposed to lead to merging of current element with previously received elements for the same window:
> > > >
> > > >     KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
> > > >         kv.getKey(),
> > > >         Collections.singletonList(updatedWindowedValue));
> > > >
> > > >     context.setElement(kwi, getStateInternalsForKey(kwi.key()));
> > > >     fn.processElement(context);
> > > >
> > > > The input here are always single elements.
> > > >
> > > > Thanks,
> > > > Thomas
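The buffering-versus-combining choice Thomas Groh describes can be illustrated with a small, self-contained sketch. It is not Beam's ReduceFnRunner or ReduceFn API: `Strategy`, `WindowedAggregator`, `Fired`, and `advanceWatermark` are hypothetical names. Windows are assumed fixed and non-merging, and firing when the watermark passes the end of a window stands in for per-key end-of-window timers. Elements arrive one at a time. The buffering strategy keeps every element and emits a list (the Iterable<V> case). The counting strategy keeps only a running accumulator, so no intermediate collection is built.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

public class BufferingVsCombiningSketch {
    // How per-window state is created, updated with one element, and turned into output.
    record Strategy<V, A, O>(Supplier<A> empty, BiFunction<A, V, A> add, Function<A, O> output) {}

    // Buffering: keeps every element, so memory grows with the number of inputs.
    static <V> Strategy<V, List<V>, List<V>> buffering() {
        return new Strategy<V, List<V>, List<V>>(
            ArrayList::new, (acc, v) -> { acc.add(v); return acc; }, acc -> acc);
    }

    // Combining: keeps only a running count, no intermediate collection.
    static <V> Strategy<V, Long, Long> counting() {
        return new Strategy<V, Long, Long>(() -> 0L, (acc, v) -> acc + 1, acc -> acc);
    }

    record Fired<K, O>(K key, long windowStart, O output) {}

    static final class WindowedAggregator<K, V, A, O> {
        private final Strategy<V, A, O> strategy;
        private final long windowSize; // assumed fixed, non-merging windows
        // State is scoped per key, then per window (ordered by window start).
        private final Map<K, TreeMap<Long, A>> state = new HashMap<>();

        WindowedAggregator(Strategy<V, A, O> strategy, long windowSize) {
            this.strategy = strategy;
            this.windowSize = windowSize;
        }

        // Elements arrive one at a time, as in the streaming case in the thread.
        void processElement(K key, long timestamp, V value) {
            long start = timestamp - Math.floorMod(timestamp, windowSize);
            TreeMap<Long, A> perKey = state.computeIfAbsent(key, k -> new TreeMap<>());
            A acc = perKey.computeIfAbsent(start, s -> strategy.empty().get());
            perKey.put(start, strategy.add().apply(acc, value));
        }

        // Stand-in for per-key end-of-window timers: emit and clear every window
        // whose end is at or before the watermark.
        List<Fired<K, O>> advanceWatermark(long watermark) {
            List<Fired<K, O>> fired = new ArrayList<>();
            for (Map.Entry<K, TreeMap<Long, A>> e : state.entrySet()) {
                Iterator<Map.Entry<Long, A>> it = e.getValue().entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Long, A> w = it.next();
                    if (w.getKey() + windowSize > watermark) break; // later windows end later
                    fired.add(new Fired<>(e.getKey(), w.getKey(), strategy.output().apply(w.getValue())));
                    it.remove();
                }
            }
            return fired;
        }
    }

    // Feeds three single elements for key "a" into 10-unit windows, then advances the watermark.
    static <A, O> List<Fired<String, O>> run(Strategy<String, A, O> strategy, long watermark) {
        WindowedAggregator<String, String, A, O> agg = new WindowedAggregator<>(strategy, 10);
        agg.processElement("a", 1, "x");
        agg.processElement("a", 4, "y");
        agg.processElement("a", 12, "z");
        return agg.advanceWatermark(watermark);
    }

    public static void main(String[] args) {
        System.out.println(run(counting(), 10));  // window [0, 10) of key a fires with count 2
        System.out.println(run(buffering(), 10)); // same window fires with the list [x, y]
    }
}
```

Both strategies fire the same windows at the same watermark; they differ only in what is stored between elements.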

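On the question of handling this in a runner-independent way, one possible shape is a rewrite over the transform tree before translation. The sketch below uses a toy `Node` type, not Beam's transform hierarchy or visitor API. It matches the expansion visible in the translator log (Combine.PerKey containing GroupByKey followed by Combine.GroupedValues) and collapses it into a single hypothetical `CombiningGroupByKey` node. The `liftingAllowed` flag is an assumption standing in for the windowing and triggering checks discussed in this thread, which are not modeled.

```java
import java.util.ArrayList;
import java.util.List;

public class CombineLiftingRewriteSketch {
    // Toy transform node: user-visible name, transform kind, and child transforms.
    record Node(String name, String kind, List<Node> children) {
        static Node leaf(String name, String kind) { return new Node(name, kind, List.of()); }
    }

    // Matches the expansion seen in the translator log:
    // Combine.PerKey = GroupByKey followed by Combine.GroupedValues.
    static boolean isLiftableCombinePerKey(Node n) {
        return n.kind().equals("Combine.PerKey")
            && n.children().size() == 2
            && n.children().get(0).kind().equals("GroupByKey")
            && n.children().get(1).kind().equals("Combine.GroupedValues");
    }

    // Hypothetical rewrite: collapse the composite into one combining GBK node.
    // liftingAllowed stands in for a windowing/triggering check that is NOT modeled.
    static Node rewrite(Node n, boolean liftingAllowed) {
        if (liftingAllowed && isLiftableCombinePerKey(n)) {
            return Node.leaf(n.name(), "CombiningGroupByKey");
        }
        List<Node> children = new ArrayList<>();
        for (Node c : n.children()) children.add(rewrite(c, liftingAllowed));
        return new Node(n.name(), n.kind(), children);
    }

    // Kinds of the leaf transforms a runner would actually have to translate.
    static List<String> primitives(Node n) {
        List<String> out = new ArrayList<>();
        if (n.children().isEmpty()) out.add(n.kind());
        for (Node c : n.children()) out.addAll(primitives(c));
        return out;
    }

    // Mirrors the Count.perElement() structure printed by the translator log.
    static Node countPerElement() {
        Node groupedValues = new Node("Combine.GroupedValues", "Combine.GroupedValues",
            List.of(Node.leaf("AnonymousParDo", "AnonymousParDo")));
        Node perKey = new Node("Count.PerKey", "Combine.PerKey",
            List.of(Node.leaf("GroupByKey", "GroupByKey"), groupedValues));
        return new Node("Count.PerElement", "Count.PerElement",
            List.of(Node.leaf("Init", "AnonymousParDo"), perKey));
    }

    public static void main(String[] args) {
        Node tree = countPerElement();
        System.out.println(primitives(tree));               // [AnonymousParDo, GroupByKey, AnonymousParDo]
        System.out.println(primitives(rewrite(tree, true))); // [AnonymousParDo, CombiningGroupByKey]
    }
}
```

Without the rewrite, a runner translates the GroupByKey and the combining ParDo separately, as in the log; with it, the runner sees one node it could map to a combining operator, while still being free to refuse lifting when its GBK implementation cannot support it.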