Hi Thomas,

Thanks for the info. When the pipeline contains:

.apply(Count.<String>perElement())

The translation looks as follows:

58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - entering composite transform Count.PerElement
58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting transform Init [AnonymousParDo]
58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting value Count.PerElement/Init.out [PCollection]
58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - entering composite transform Count.PerKey [Combine.PerKey]
58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting transform GroupByKey
93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting value Count.PerElement/Count.PerKey/GroupByKey.out [PCollection]
93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - entering composite transform Combine.GroupedValues
93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting transform AnonymousParDo
93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting value Count.PerElement/Count.PerKey/Combine.GroupedValues/AnonymousParDo.out [PCollection]
93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - leaving composite transform Combine.GroupedValues
93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - leaving composite transform Count.PerKey [Combine.PerKey]
93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - leaving composite transform Count.PerElement

The combine only appears after the GroupByKey (as Combine.GroupedValues), so
the runner's translator needs to take care of pushing the combine function
upstream when possible. I was wondering whether this could instead be
handled in a runner-independent way.
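For illustration only: a runner-agnostic rewrite of this kind, often called combiner lifting, splits the combine into two steps. A partial step runs before the GroupByKey and a merge step runs after it. The sketch below is plain Java, not Beam's API. It assumes a hypothetical SimpleCombineFn interface that only mirrors the create/add/merge/extract shape of a CombineFn, and it ignores windowing and the actual shuffle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in that mirrors the shape of a CombineFn; not Beam's API.
interface SimpleCombineFn<InputT, AccumT, OutputT> {
  AccumT createAccumulator();
  AccumT addInput(AccumT accum, InputT input);
  AccumT mergeAccumulators(List<AccumT> accums);
  OutputT extractOutput(AccumT accum);
}

// Counting: the accumulator is a running Long, so no element list is built.
class CountFn implements SimpleCombineFn<String, Long, Long> {
  public Long createAccumulator() { return 0L; }
  public Long addInput(Long accum, String input) { return accum + 1; }
  public Long mergeAccumulators(List<Long> accums) {
    long sum = 0;
    for (long a : accums) sum += a;
    return sum;
  }
  public Long extractOutput(Long accum) { return accum; }
}

class LiftedCount {
  // Before the shuffle: fold one bundle into a single accumulator per key.
  // For Count.perElement the element itself is the key.
  static <A> Map<String, A> partialCombine(List<String> bundle, SimpleCombineFn<String, A, ?> fn) {
    Map<String, A> partial = new HashMap<>();
    for (String element : bundle) {
      A accum = partial.containsKey(element) ? partial.get(element) : fn.createAccumulator();
      partial.put(element, fn.addInput(accum, element));
    }
    return partial;
  }

  // After the shuffle: merge the partial accumulators for each key, then extract.
  static <A, O> Map<String, O> mergeAndExtract(List<Map<String, A>> partials,
                                               SimpleCombineFn<String, A, O> fn) {
    Map<String, List<A>> grouped = new HashMap<>();
    for (Map<String, A> partial : partials) {
      for (Map.Entry<String, A> e : partial.entrySet()) {
        grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
      }
    }
    Map<String, O> result = new HashMap<>();
    for (Map.Entry<String, List<A>> e : grouped.entrySet()) {
      result.put(e.getKey(), fn.extractOutput(fn.mergeAccumulators(e.getValue())));
    }
    return result;
  }

  public static void main(String[] args) {
    CountFn fn = new CountFn();
    Map<String, Long> p1 = partialCombine(List.of("a", "b", "a"), fn);
    Map<String, Long> p2 = partialCombine(List.of("a", "c"), fn);
    System.out.println(mergeAndExtract(List.of(p1, p2), fn));
  }
}
```

Only the per-key accumulators (a single Long each here) would cross the shuffle. That is why the lifted form avoids grouping every element first.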

Thanks,
Thomas




On Fri, Jun 17, 2016 at 10:19 AM, Thomas Groh wrote:

> Generally, the code snippet above will work, producing (after the trigger
> fires) an output Iterable<V> containing all of the input elements. Note
> that timers (and TimerInternals) are also per key, so that interface must
> also be updated for each element.
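Here is a simplified sketch of the per-key point. It shows a runner-side holder that keeps both state and event-time timers per key. The holder has to be rebound to each element's key before that element is processed. KeyedInternals and its methods are hypothetical names for this example; they are not Beam's StateInternals or TimerInternals.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical, simplified holder; Beam's StateInternals/TimerInternals look different.
class KeyedInternals<K> {
  private final Map<K, List<Object>> stateByKey = new HashMap<>();
  private final Map<K, TreeSet<Long>> timersByKey = new HashMap<>();
  private K currentKey;

  // Must be called before every element so that BOTH state and timers refer to its key.
  void setCurrentKey(K key) { currentKey = key; }

  // State visible to the current key only.
  List<Object> state() {
    return stateByKey.computeIfAbsent(currentKey, k -> new ArrayList<>());
  }

  // Registers an event-time timer (e.g. the end of a window) for the current key.
  void setEventTimeTimer(long timestamp) {
    timersByKey.computeIfAbsent(currentKey, k -> new TreeSet<>()).add(timestamp);
  }

  // Removes and returns, per key, every timer at or before the watermark.
  Map<K, List<Long>> fireTimers(long watermark) {
    Map<K, List<Long>> fired = new HashMap<>();
    for (Map.Entry<K, TreeSet<Long>> e : timersByKey.entrySet()) {
      NavigableSet<Long> due = e.getValue().headSet(watermark, true);
      if (!due.isEmpty()) {
        fired.put(e.getKey(), new ArrayList<>(due));
        due.clear(); // headSet is a view, so this also removes them from the key's set
      }
    }
    return fired;
  }

  public static void main(String[] args) {
    KeyedInternals<String> internals = new KeyedInternals<>();
    String[][] input = {{"a", "1"}, {"b", "2"}, {"a", "3"}};
    for (String[] kv : input) {
      internals.setCurrentKey(kv[0]); // rebind state and timers for this element
      internals.state().add(kv[1]);
      internals.setEventTimeTimer(10L);
    }
    System.out.println(internals.fireTimers(10L));
  }
}
```

The key step is calling setCurrentKey for every element. If only the state were rebound, the timers would be registered under the previous element's key.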
>
> By specifying the ReduceFn of the ReduceFnRunner, you can change how the
> ReduceFnRunner adds and merges state. The combining ReduceFn is suitable
> for use with upstream CombineFns, while the buffering ReduceFn is suitable
> for general use.
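The difference between the two strategies can be sketched in plain Java as follows. ReduceStrategy, Buffering, CombiningCount and WindowState are hypothetical names that only illustrate the idea of a pluggable ReduceFn. The real ReduceFn/ReduceFnRunner API is richer (triggers, merging windows, timers).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical strategy interface, loosely modeled on the idea of a ReduceFn; not Beam's API.
interface ReduceStrategy<V, S, O> {
  S initial();
  S add(S state, V value);
  O onWindowComplete(S state);
}

// Buffering: keeps every element, so memory grows with the window's contents.
class Buffering<V> implements ReduceStrategy<V, List<V>, Iterable<V>> {
  public List<V> initial() { return new ArrayList<>(); }
  public List<V> add(List<V> state, V value) { state.add(value); return state; }
  public Iterable<V> onWindowComplete(List<V> state) { return state; }
}

// Combining (here: counting): keeps only a running accumulator per window.
class CombiningCount<V> implements ReduceStrategy<V, Long, Long> {
  public Long initial() { return 0L; }
  public Long add(Long state, V value) { return state + 1; }
  public Long onWindowComplete(Long state) { return state; }
}

// Holds one state value per window id and delegates add/output to the chosen strategy.
class WindowState<V, S, O> {
  private final ReduceStrategy<V, S, O> strategy;
  private final Map<String, S> stateByWindow = new HashMap<>();

  WindowState(ReduceStrategy<V, S, O> strategy) { this.strategy = strategy; }

  void processElement(String windowId, V value) {
    S current = stateByWindow.containsKey(windowId) ? stateByWindow.get(windowId) : strategy.initial();
    stateByWindow.put(windowId, strategy.add(current, value));
  }

  // Called once the window is complete; clears that window's state.
  O fire(String windowId) {
    S current = stateByWindow.remove(windowId);
    return strategy.onWindowComplete(current != null ? current : strategy.initial());
  }

  public static void main(String[] args) {
    WindowState<String, Long, Long> counts = new WindowState<>(new CombiningCount<String>());
    counts.processElement("w1", "x");
    counts.processElement("w1", "y");
    System.out.println(counts.fire("w1")); // prints 2
  }
}
```

With the buffering strategy the output is the full Iterable<V>. With the counting strategy only one Long per window is retained. That is the incremental aggregation asked about in the earlier message.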
>
> On Fri, Jun 17, 2016 at 9:52 AM, Thomas Weise wrote:
>
> > The source for my windowed groupByKey experiment is here:
> >
> > https://github.com/tweise/incubator-beam/blob/master/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
> >
> > The result is an Iterable<V>. In cases such as counting, what is the
> > recommended way to perform the aggregation incrementally, without
> > building an intermediate collection?
> >
> > Thomas
> >
> > On Fri, Jun 17, 2016 at 8:27 AM, Thomas Weise wrote:
> >
> > > Hi,
> > >
> > > I'm looking into using the GroupAlsoByWindowViaWindowSetDoFn to
> > > accumulate the windowed state as elements arrive one by one (streaming).
> > >
> > > Once the window is complete, I would like to emit an Iterable<V> or
> > > another form of aggregation of the elements. Is the following supposed
> > > to merge the current element with previously received elements for
> > > the same window?
> > >
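> > >     // Wrap the single incoming element in a work item for its key.
> > >     KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
> > >             kv.getKey(),
> > >             Collections.singletonList(updatedWindowedValue));
> > >
> > >     // Bind the state for this key, then run the GroupAlsoByWindow DoFn.
> > >     context.setElement(kwi, getStateInternalsForKey(kwi.key()));
> > >     fn.processElement(context);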
> > >
> > > The input here is always a single element.
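This sketch assumes fixed (tumbling) windows and shows the expected merging behavior for single-element inputs. Each call carries one element, and state is namespaced by key and window. Elements with the same key and window therefore accumulate together, and a window is emitted once the watermark reaches its end. StreamingWindowedGroup and its methods are hypothetical and not part of Beam. The runner's real path goes through the GroupAlsoByWindow DoFn and its state and timer internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: not Beam's GroupAlsoByWindow implementation.
class StreamingWindowedGroup {
  private final long windowSizeMillis;
  // key -> (window start -> values buffered for that window), both kept sorted.
  private final Map<String, TreeMap<Long, List<String>>> state = new TreeMap<>();

  StreamingWindowedGroup(long windowSizeMillis) {
    this.windowSizeMillis = windowSizeMillis;
  }

  // Fixed-window assignment: the window containing ts starts at ts - (ts mod size).
  long windowStart(long timestampMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
  }

  // Each call carries exactly one element, like a single-element work item.
  void processElement(String key, String value, long timestampMillis) {
    state.computeIfAbsent(key, k -> new TreeMap<>())
        .computeIfAbsent(windowStart(timestampMillis), w -> new ArrayList<>())
        .add(value);
  }

  // Emits (and clears) every window whose end is at or before the watermark.
  List<String> advanceWatermark(long watermarkMillis) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, TreeMap<Long, List<String>>> byKey : state.entrySet()) {
      TreeMap<Long, List<String>> windows = byKey.getValue();
      while (!windows.isEmpty() && windows.firstKey() + windowSizeMillis <= watermarkMillis) {
        Map.Entry<Long, List<String>> w = windows.pollFirstEntry();
        out.add(byKey.getKey() + "@" + w.getKey() + "=" + w.getValue());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    StreamingWindowedGroup g = new StreamingWindowedGroup(10);
    g.processElement("a", "x", 3);
    g.processElement("a", "y", 7);  // same key and window [0, 10) as "x": merged
    g.processElement("a", "z", 12); // same key, next window [10, 20)
    System.out.println(g.advanceWatermark(10)); // prints [a@0=[x, y]]
  }
}
```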
> > >
> > > Thanks,
> > > Thomas
