Hi Thomas,

Thanks for the info. When the pipeline contains:

.apply(Count.<String>perElement())

The translation looks as follows:

58 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - entering composite transform Count.PerElement
58 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting transform Init [AnonymousParDo]
58 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting value Count.PerElement/Init.out [PCollection]
58 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - entering composite transform Count.PerKey [Combine.PerKey]
58 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting transform GroupByKey
93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting value Count.PerElement/Count.PerKey/GroupByKey.out [PCollection]
93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - entering composite transform Combine.GroupedValues
93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting transform AnonymousParDo
93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - visiting value Count.PerElement/Count.PerKey/Combine.GroupedValues/AnonymousParDo.out [PCollection]
93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - leaving composite transform Combine.GroupedValues
93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - leaving composite transform Count.PerKey [Combine.PerKey]
93 [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator - leaving composite transform Count.PerElement

So the runner's translator needs to take care of pushing the combine function upstream, when it is possible. I was wondering whether this is something that could be handled in a runner independent way?

Thanks,
Thomas

On Fri, Jun 17, 2016 at 10:19 AM, Thomas Groh <[email protected]> wrote:

> Generally, the above code snippet will work, producing (after trigger
> firing) an output Iterable<V> containing all of the input elements. It may
> be notable that timers (and TimerInternals) are also per-key, so that
> interface must also be updated per element.
>
> By specifying the ReduceFn of the ReduceFnRunner, you can change how the
> ReduceFnRunner adds and merges state. The combining ReduceFn is suitable
> for use with upstream CombineFns, while buffering is suitable for general
> use.
>
> On Fri, Jun 17, 2016 at 9:52 AM, Thomas Weise <[email protected]>
> wrote:
>
> > The source for my windowed groupByKey experiment is here:
> >
> > https://github.com/tweise/incubator-beam/blob/master/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
> >
> > The result is Iterable<V>. In cases such as counting, what is the
> > recommended way to perform the incremental aggregation, without building an
> > intermediate collection?
> >
> > Thomas
> >
> > On Fri, Jun 17, 2016 at 8:27 AM, Thomas Weise <[email protected]>
> > wrote:
> >
> > > Hi,
> > >
> > > I'm looking into using the GroupAlsoByWindowViaWindowSetDoFn to accumulate
> > > the windowed state with the elements arriving one by one (stream).
> > >
> > > Once the window is complete, I would like to emit an Iterable<V> or
> > > another form of aggregation of the elements. Is the following supposed to
> > > lead to merging of current element with previously received elements for
> > > the same window:
> > >
> > > KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
> > >     kv.getKey(),
> > >     Collections.singletonList(updatedWindowedValue));
> > >
> > > context.setElement(kwi, getStateInternalsForKey(kwi.key()));
> > > fn.processElement(context);
> > >
> > > The input here are always single elements.
> > >
> > > Thanks,
> > > Thomas
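
For the counting case raised in this thread, the difference between buffering into an Iterable<V> and combining incrementally can be sketched roughly as below. This is only an illustration under assumptions: IncrementalCounter is a hypothetical stand-alone class, not part of Beam or the Apex runner. Its method names loosely mirror the addInput/extractOutput shape of a CombineFn, and it keeps state per key only, where a real runner would scope state per key and window.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; this class is not Beam or Apex runner API.
final class IncrementalCounter {
    // One running count per key, instead of a buffered collection of elements.
    // Assumption: a real runner would keep this state per key AND per window.
    private final Map<String, Long> accumulators = new HashMap<>();

    // Folds one arriving element into the accumulator for its key.
    public void addInput(String key) {
        accumulators.merge(key, 1L, Long::sum);
    }

    // Returns the count for the key and clears its state,
    // roughly what would happen when a trigger fires for that key.
    public long extractOutput(String key) {
        Long count = accumulators.remove(key);
        return count == null ? 0L : count;
    }
}
```

Each element is folded into the running count as it arrives, so the memory held per key stays constant no matter how many elements the window receives.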
