Maybe. The issue is that pushing the combine function upstream affects the
windowing and triggering behavior of the GBK. I don't believe it's as simple
as always being able to push the combiner upstream; whether that is possible
depends on how a runner has decided to implement GBK.
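
As a rough illustration of what pushing the combiner upstream means for
counting, here is a minimal sketch in plain Java. It uses no Beam APIs, and
the class and method names are hypothetical, made up for this example only.
It compares grouping all values and then counting them with pre-counting
each input bundle and merging the partial counts after the shuffle. It
assumes a single global window and a single firing, so it ignores the
windowing and triggering concerns above, which are the part that depends on
the runner.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Beam code: one global window, one firing.
final class CombinerLiftingSketch {

    // Unlifted: buffer every value per key (the Iterable<V>), then count.
    static Map<String, Long> groupThenCount(List<List<String>> bundles) {
        Map<String, List<String>> grouped = new HashMap<>();
        for (List<String> bundle : bundles) {
            for (String element : bundle) {
                grouped.computeIfAbsent(element, k -> new ArrayList<>()).add(element);
            }
        }
        Map<String, Long> counts = new HashMap<>();
        grouped.forEach((key, values) -> counts.put(key, (long) values.size()));
        return counts;
    }

    // Lifted: count within each bundle first, so only one accumulator per
    // key and bundle crosses the shuffle, then merge the accumulators.
    static Map<String, Long> partialCountThenMerge(List<List<String>> bundles) {
        List<Map<String, Long>> partials = new ArrayList<>();
        for (List<String> bundle : bundles) {
            Map<String, Long> partial = new HashMap<>();
            for (String element : bundle) {
                partial.merge(element, 1L, Long::sum);
            }
            partials.add(partial);
        }
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((key, count) -> merged.merge(key, count, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> bundles = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("b", "a"));
        // Both strategies produce the same per-key counts for this input.
        System.out.println(groupThenCount(bundles).equals(partialCountThenMerge(bundles)));
    }
}
```

The two methods agree here only because the count is associative and
commutative and everything lands in one window. Once elements are assigned
to windows that can merge, or a trigger fires more than once, the partial
accumulators have to follow the same window and pane semantics as the GBK.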


On Tue, Jun 21, 2016 at 9:58 AM, Thomas Weise <[email protected]>
wrote:

> Hi Thomas,
>
> Thanks for the info. When the pipeline contains:
>
> .apply(Count.<String>perElement())
>
> The translation looks as follows:
>
> 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> entering composite transform Count.PerElement
> 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting transform Init [AnonymousParDo]
> 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting value Count.PerElement/Init.out [PCollection]
> 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> entering composite transform Count.PerKey [Combine.PerKey]
> 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting transform GroupByKey
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting value Count.PerElement/Count.PerKey/GroupByKey.out [PCollection]
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> entering composite transform Combine.GroupedValues
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting transform AnonymousParDo
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> visiting value
> Count.PerElement/Count.PerKey/Combine.GroupedValues/AnonymousParDo.out
> [PCollection]
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> leaving composite transform Combine.GroupedValues
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> leaving composite transform Count.PerKey [Combine.PerKey]
> 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  -
> leaving composite transform Count.PerElement
>
> So the runner's translator needs to take care of pushing the combine
> function upstream when that is possible. I was wondering whether this is
> something that could be handled in a runner-independent way?
>
> Thanks,
> Thomas
>
>
>
>
> On Fri, Jun 17, 2016 at 10:19 AM, Thomas Groh <[email protected]>
> wrote:
>
> > Generally, the above code snippet will work, producing (after trigger
> > firing) an output Iterable<V> containing all of the input elements. It
> > may be notable that timers (and TimerInternals) are also per-key, so
> > that interface must also be updated per element.
> >
> > By specifying the ReduceFn of the ReduceFnRunner, you can change how the
> > ReduceFnRunner adds and merges state. The combining ReduceFn is suitable
> > for use with upstream CombineFns, while buffering is suitable for general
> > use.
> >
> > On Fri, Jun 17, 2016 at 9:52 AM, Thomas Weise <[email protected]>
> > wrote:
> >
> > > The source for my windowed groupByKey experiment is here:
> > >
> > > https://github.com/tweise/incubator-beam/blob/master/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
> > >
> > > The result is Iterable<V>. In cases such as counting, what is the
> > > recommended way to perform the incremental aggregation, without
> > > building an intermediate collection?
> > >
> > > Thomas
> > >
> > > On Fri, Jun 17, 2016 at 8:27 AM, Thomas Weise <[email protected]>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I'm looking into using the GroupAlsoByWindowViaWindowSetDoFn to
> > > > accumulate the windowed state with the elements arriving one by one
> > > > (stream).
> > > >
> > > > Once the window is complete, I would like to emit an Iterable<V> or
> > > > another form of aggregation of the elements. Is the following
> > > > supposed to lead to merging of the current element with previously
> > > > received elements for the same window?
> > > >
> > > >     KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
> > > >             kv.getKey(),
> > > >             Collections.singletonList(updatedWindowedValue));
> > > >
> > > >     context.setElement(kwi, getStateInternalsForKey(kwi.key()));
> > > >     fn.processElement(context);
> > > >
> > > > The input here is always a single element.
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > >
> > >
> >
>
