Generally, your code snippet will work: once the trigger fires, it
produces an output Iterable<V> containing all of the input elements. Note
that timers (and TimerInternals) are also per-key, so that interface must
also be updated per element, just like the state internals.

By specifying the ReduceFn of the ReduceFnRunner, you can change how the
ReduceFnRunner adds and merges state. The combining ReduceFn is suitable
for use with upstream CombineFns, while the buffering ReduceFn is suitable
for general use.
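
To illustrate the difference, here is a rough sketch in plain Java. It is
not Beam's ReduceFn API: WindowStateSketch, BufferingState, CombiningState
and the string used to identify a key and window are all made-up names for
illustration only. Buffering keeps every element until the trigger fires,
while combining keeps a single accumulator per key and window, which is
what you would want for counting.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for per-key, per-window state. Not Beam's API.
final class WindowStateSketch {

  /** Buffering: every element is retained; the output is the whole collection. */
  static final class BufferingState<V> {
    private final Map<String, List<V>> buffers = new HashMap<>();

    void add(String keyAndWindow, V value) {
      buffers.computeIfAbsent(keyAndWindow, k -> new ArrayList<>()).add(value);
    }

    List<V> onTrigger(String keyAndWindow) {
      return buffers.getOrDefault(keyAndWindow, new ArrayList<>());
    }
  }

  /** Combining: only one accumulator is retained per key and window. */
  static final class CombiningState<A> {
    private final Map<String, A> accumulators = new HashMap<>();
    private final BinaryOperator<A> merge;

    CombiningState(BinaryOperator<A> merge) {
      this.merge = merge;
    }

    void add(String keyAndWindow, A value) {
      // Folds the new value into the existing accumulator, if there is one.
      accumulators.merge(keyAndWindow, value, merge);
    }

    A onTrigger(String keyAndWindow) {
      return accumulators.get(keyAndWindow);
    }
  }
}
```

For a count, you would construct CombiningState<Long> with Long::sum and add
1L for each arriving element, so no intermediate collection is ever built.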

On Fri, Jun 17, 2016 at 9:52 AM, Thomas Weise <[email protected]>
wrote:

> The source for my windowed groupByKey experiment is here:
>
>
> https://github.com/tweise/incubator-beam/blob/master/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
>
> The result is Iterable<V>. In cases such as counting, what is the
> recommended way to perform the incremental aggregation, without building an
> intermediate collection?
>
> Thomas
>
> On Fri, Jun 17, 2016 at 8:27 AM, Thomas Weise <[email protected]>
> wrote:
>
> > Hi,
> >
> > I'm looking into using the GroupAlsoByWindowViaWindowSetDoFn to
> > accumulate the windowed state with the elements arriving one by one
> > (stream).
> >
> > Once the window is complete, I would like to emit an Iterable<V> or
> > another form of aggregation of the elements. Is the following supposed to
> > lead to merging of current element with previously received elements for
> > the same window:
> >
> >     KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
> >             kv.getKey(),
> >             Collections.singletonList(updatedWindowedValue));
> >
> >     context.setElement(kwi, getStateInternalsForKey(kwi.key()));
> >     fn.processElement(context);
> >
> > The inputs here are always single elements.
> >
> > Thanks,
> > Thomas
> >
> >
> >
>
