Broadly, yes: This and other semantics-preserving transformations are (by
definition) runner-independent and we have a home for them in mind. The
place we would put them is in the nascent runners-core module, which is
generally the place for utilities for helping to implement runners.

An optimization library has not been a priority since our existing runners
already have optimized execution strategies, and many optimizations are a
better fit for a runner-specific lower level "intermediate language" anyhow.

But if you implemented the transformation and contributed it, I think it
would be welcome.
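To make the idea concrete, here is a minimal standalone sketch of pushing a combine upstream of a GroupByKey (often called combiner lifting). It does not use Beam. The CombineFn interface below is a hypothetical stand-in that mirrors the four methods of Beam's Combine.CombineFn. The "bundles" stand in for the inputs that separate upstream workers process. The sketch ignores windowing and triggering entirely, and those are the complications raised further down this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombinerLiftingSketch {

  /** Hypothetical stand-in mirroring the shape of Beam's Combine.CombineFn. */
  interface CombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();
    AccumT addInput(AccumT accumulator, InputT input);
    AccumT mergeAccumulators(Iterable<AccumT> accumulators);
    OutputT extractOutput(AccumT accumulator);
  }

  /** Counting, as in Count.perElement(): each input adds one. */
  static final CombineFn<String, Long, Long> COUNT = new CombineFn<String, Long, Long>() {
    public Long createAccumulator() { return 0L; }
    public Long addInput(Long accumulator, String input) { return accumulator + 1; }
    public Long mergeAccumulators(Iterable<Long> accumulators) {
      long sum = 0;
      for (long a : accumulators) { sum += a; }
      return sum;
    }
    public Long extractOutput(Long accumulator) { return accumulator; }
  };

  /** Unlifted: GroupByKey first (every element is shuffled), then combine each group. */
  static <A, O> Map<String, O> groupThenCombine(List<List<String>> bundles, CombineFn<String, A, O> fn) {
    Map<String, List<String>> grouped = new HashMap<>();
    for (List<String> bundle : bundles) {
      for (String element : bundle) {
        grouped.computeIfAbsent(element, k -> new ArrayList<>()).add(element);
      }
    }
    Map<String, O> out = new HashMap<>();
    for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
      A acc = fn.createAccumulator();
      for (String v : e.getValue()) { acc = fn.addInput(acc, v); }
      out.put(e.getKey(), fn.extractOutput(acc));
    }
    return out;
  }

  /** Lifted: partial combine per bundle, shuffle only accumulators, merge them afterwards. */
  static <A, O> Map<String, O> liftedCombine(List<List<String>> bundles, CombineFn<String, A, O> fn) {
    Map<String, List<A>> shuffled = new HashMap<>();
    for (List<String> bundle : bundles) {
      // Upstream of the shuffle: one accumulator per key per bundle.
      Map<String, A> partial = new HashMap<>();
      for (String element : bundle) {
        A acc = partial.containsKey(element) ? partial.get(element) : fn.createAccumulator();
        partial.put(element, fn.addInput(acc, element));
      }
      for (Map.Entry<String, A> e : partial.entrySet()) {
        shuffled.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
      }
    }
    // Downstream of the shuffle: merge the partial accumulators for each key.
    Map<String, O> out = new HashMap<>();
    for (Map.Entry<String, List<A>> e : shuffled.entrySet()) {
      out.put(e.getKey(), fn.extractOutput(fn.mergeAccumulators(e.getValue())));
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<String>> bundles = Arrays.asList(
        Arrays.asList("a", "b", "a"), Arrays.asList("a", "c"));
    System.out.println(groupThenCombine(bundles, COUNT));
    System.out.println(liftedCombine(bundles, COUNT));
  }
}
```

Both methods give a=3, b=1, c=1 for the bundles in main. The lifted version only moves one accumulator per key per bundle across the shuffle, and that is the whole point of the transformation. It is correct only because the accumulators can be merged in any grouping.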

On Tue, Jun 21, 2016 at 10:08 AM, Lukasz Cwik <[email protected]>
wrote:

> Maybe. The issue is that pushing the combine function upstream impacts the
> windowing and triggering behavior of the GBK. I don't believe it's as simple
> as always being able to push the combiner upstream; it depends on how a
> runner has decided to implement GBK.
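Below is a sketch of one way windowing interacts with lifting, under simplifying assumptions. With merging windows such as sessions, an upstream partial combine can only merge the windows it sees in its own bundle. The accumulators it emits therefore carry windows that may still grow, and the GroupByKey side must merge both the windows and their accumulators. The classes below are hypothetical and count elements per session for a single key. Triggering is not modeled at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SessionLiftingSketch {

  /** Half-open event-time interval [start, end), in milliseconds. */
  static final class Window {
    final long start;
    final long end;
    Window(long start, long end) { this.start = start; this.end = end; }
    boolean intersects(Window other) { return start < other.end && other.start < end; }
    Window span(Window other) {
      return new Window(Math.min(start, other.start), Math.max(end, other.end));
    }
  }

  /** A partial count for one key, tagged with the window it currently covers. */
  static final class WindowedCount {
    Window window;
    long count;
    WindowedCount(Window window, long count) { this.window = window; this.count = count; }
  }

  /** Each element starts as a proto-session [t, t + gap) holding a count of one. */
  static List<WindowedCount> protoSessions(long[] timestamps, long gapMillis) {
    List<WindowedCount> out = new ArrayList<>();
    for (long t : timestamps) { out.add(new WindowedCount(new Window(t, t + gapMillis), 1)); }
    return out;
  }

  /** Merges intersecting windows; merging two windows also merges their counts. */
  static List<WindowedCount> mergeSessions(List<WindowedCount> counts) {
    List<WindowedCount> sorted = new ArrayList<>(counts);
    sorted.sort((x, y) -> Long.compare(x.window.start, y.window.start));
    List<WindowedCount> merged = new ArrayList<>();
    for (WindowedCount c : sorted) {
      WindowedCount last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
      if (last != null && last.window.intersects(c.window)) {
        last.window = last.window.span(c.window);
        last.count += c.count; // the "mergeAccumulators" step for a count
      } else {
        merged.add(new WindowedCount(c.window, c.count));
      }
    }
    return merged;
  }

  /** Lifted session count for a single key, with one upstream bundle per array. */
  static List<WindowedCount> liftedSessionCount(List<long[]> bundles, long gapMillis) {
    List<WindowedCount> shuffled = new ArrayList<>();
    for (long[] bundle : bundles) {
      // Upstream: only windows visible inside this bundle can be merged here.
      shuffled.addAll(mergeSessions(protoSessions(bundle, gapMillis)));
    }
    // Downstream: windows from different bundles may still intersect, so both
    // the windows and their partial counts have to be merged again.
    return mergeSessions(shuffled);
  }

  public static void main(String[] args) {
    List<long[]> bundles = Arrays.asList(new long[] {0, 5}, new long[] {12, 40});
    for (WindowedCount c : liftedSessionCount(bundles, 10)) {
      System.out.println("[" + c.window.start + ", " + c.window.end + ") count=" + c.count);
    }
  }
}
```

With a gap of 10 and the bundles in main, the result is a session [0, 22) with count 3, built from elements in both bundles, plus [40, 50) with count 1. Neither upstream bundle could have produced the first session on its own.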
>
>
> On Tue, Jun 21, 2016 at 9:58 AM, Thomas Weise <[email protected]>
> wrote:
>
> > Hi Thomas,
> >
> > Thanks for the info. When the pipeline contains:
> >
> > .apply(Count.<String>perElement())
> >
> > The translation looks as follows:
> >
> > 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - entering composite transform Count.PerElement
> > 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting transform Init [AnonymousParDo]
> > 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting value Count.PerElement/Init.out [PCollection]
> > 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - entering composite transform Count.PerKey [Combine.PerKey]
> > 58   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting transform GroupByKey
> > 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting value Count.PerElement/Count.PerKey/GroupByKey.out [PCollection]
> > 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - entering composite transform Combine.GroupedValues
> > 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting transform AnonymousParDo
> > 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - visiting value Count.PerElement/Count.PerKey/Combine.GroupedValues/AnonymousParDo.out [PCollection]
> > 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - leaving composite transform Combine.GroupedValues
> > 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - leaving composite transform Count.PerKey [Combine.PerKey]
> > 93   [main] DEBUG org.apache.beam.runners.apex.ApexPipelineTranslator  - leaving composite transform Count.PerElement
> >
> > So the runner's translator needs to take care of pushing the combine
> > function upstream when that is possible. I was wondering whether this is
> > something that could be handled in a runner-independent way?
> >
> > Thanks,
> > Thomas
> >
> >
> >
> >
> > On Fri, Jun 17, 2016 at 10:19 AM, Thomas Groh <[email protected]>
> > wrote:
> >
> > > Generally, the above code snippet will work, producing (after trigger
> > > firing) an output Iterable<V> containing all of the input elements. It may
> > > be notable that timers (and TimerInternals) are also per-key, so that
> > > interface must also be updated per element.
> > >
> > > By specifying the ReduceFn of the ReduceFnRunner, you can change how the
> > > ReduceFnRunner adds and merges state. The combining ReduceFn is suitable
> > > for use with upstream CombineFns, while buffering is suitable for general
> > > use.
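Here is a hypothetical sketch of the difference between the two strategies, written as plain Java rather than against the ReduceFn API. Buffering keeps every element and hands back an Iterable<V>. Combining folds each arriving element into a single accumulator, and that is what avoids materializing a collection for something like a count. The WindowState, Buffering and Combining names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

public class ReduceStateSketch {

  /** Hypothetical per-(key, window) state that each arriving element is added to. */
  interface WindowState<V, OutputT> {
    void add(V value);
    OutputT extract();
  }

  /** Buffering: keep every element; emit them all as an Iterable when the window fires. */
  static final class Buffering<V> implements WindowState<V, Iterable<V>> {
    private final List<V> buffer = new ArrayList<>();
    public void add(V value) { buffer.add(value); }
    public Iterable<V> extract() { return buffer; }
  }

  /** Combining: keep only an accumulator, updated incrementally as each element arrives. */
  static final class Combining<V, A, OutputT> implements WindowState<V, OutputT> {
    private A accumulator;
    private final BiFunction<A, V, A> addInput;
    private final Function<A, OutputT> extractOutput;

    Combining(A initial, BiFunction<A, V, A> addInput, Function<A, OutputT> extractOutput) {
      this.accumulator = initial;
      this.addInput = addInput;
      this.extractOutput = extractOutput;
    }

    public void add(V value) { accumulator = addInput.apply(accumulator, value); }
    public OutputT extract() { return extractOutput.apply(accumulator); }
  }

  /** A count as combining state: a single Long, no intermediate collection. */
  static <V> Combining<V, Long, Long> counting() {
    return new Combining<>(0L, (count, value) -> count + 1, count -> count);
  }

  public static void main(String[] args) {
    Buffering<String> buffering = new Buffering<>();
    Combining<String, Long, Long> combining = counting();
    for (String element : new String[] {"x", "y", "z"}) {
      // Elements arrive one at a time, as in the streaming case in this thread.
      buffering.add(element);
      combining.add(element);
    }
    System.out.println("buffered: " + buffering.extract());
    System.out.println("count: " + combining.extract());
  }
}
```

Both states see the same three elements. The buffering state holds all three until extraction, while the combining state only ever holds one Long.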
> > >
> > > On Fri, Jun 17, 2016 at 9:52 AM, Thomas Weise <[email protected]>
> > > wrote:
> > >
> > > > The source for my windowed groupByKey experiment is here:
> > > >
> > > > https://github.com/tweise/incubator-beam/blob/master/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
> > > >
> > > > The result is Iterable<V>. In cases such as counting, what is the
> > > > recommended way to perform the incremental aggregation, without building
> > > > an intermediate collection?
> > > >
> > > > Thomas
> > > >
> > > > On Fri, Jun 17, 2016 at 8:27 AM, Thomas Weise <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I'm looking into using the GroupAlsoByWindowViaWindowSetDoFn to accumulate
> > > > > the windowed state with the elements arriving one by one (stream).
> > > > >
> > > > > Once the window is complete, I would like to emit an Iterable<V> or
> > > > > another form of aggregation of the elements. Is the following supposed to
> > > > > lead to merging of the current element with previously received elements
> > > > > for the same window:
> > > > >
> > > > >     KeyedWorkItem<K, V> kwi = KeyedWorkItems.elementsWorkItem(
> > > > >             kv.getKey(),
> > > > >             Collections.singletonList(updatedWindowedValue));
> > > > >
> > > > >     context.setElement(kwi, getStateInternalsForKey(kwi.key()));
> > > > >     fn.processElement(context);
> > > > >
> > > > > The input here is always a single element.
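Here is a minimal sketch of this one-element-at-a-time streaming pattern. It assumes fixed event-time windows and ignores late data, allowed lateness and triggers. State and the end-of-window timer are both namespaced by key and window, so each arriving element looks up and updates both for its own key. The class and method names are hypothetical and are not part of Beam or Apex.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class WindowedCountOperatorSketch {

  /** Namespace for state and timers: one key in one fixed window. */
  static final class KeyWindow {
    final String key;
    final long windowStart;
    KeyWindow(String key, long windowStart) { this.key = key; this.windowStart = windowStart; }
    @Override public boolean equals(Object o) {
      if (!(o instanceof KeyWindow)) { return false; }
      KeyWindow other = (KeyWindow) o;
      return key.equals(other.key) && windowStart == other.windowStart;
    }
    @Override public int hashCode() { return Objects.hash(key, windowStart); }
  }

  private final long windowSizeMillis;
  // Combining state: a running count per key and window, not an Iterable<V>.
  private final Map<KeyWindow, Long> counts = new HashMap<>();
  // Event-time timers per key and window, set to fire at the window's end.
  private final Map<KeyWindow, Long> timers = new HashMap<>();

  WindowedCountOperatorSketch(long windowSizeMillis) { this.windowSizeMillis = windowSizeMillis; }

  /** Called once per element; state and timer are both looked up for this key. */
  void processElement(String key, long timestamp) {
    long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMillis);
    KeyWindow kw = new KeyWindow(key, windowStart);
    counts.merge(kw, 1L, Long::sum);
    timers.put(kw, windowStart + windowSizeMillis);
  }

  /** Fires every timer at or before the watermark, emitting "key@windowStart=count". */
  List<String> advanceWatermark(long watermark) {
    List<String> emitted = new ArrayList<>();
    Iterator<Map.Entry<KeyWindow, Long>> it = timers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<KeyWindow, Long> timer = it.next();
      if (timer.getValue() <= watermark) {
        KeyWindow kw = timer.getKey();
        // Emit the final count and clear the state for this key and window.
        emitted.add(kw.key + "@" + kw.windowStart + "=" + counts.remove(kw));
        it.remove();
      }
    }
    Collections.sort(emitted); // deterministic order for printing
    return emitted;
  }

  public static void main(String[] args) {
    WindowedCountOperatorSketch op = new WindowedCountOperatorSketch(10);
    op.processElement("a", 1);
    op.processElement("a", 3);
    op.processElement("b", 4);
    op.processElement("a", 12);
    System.out.println(op.advanceWatermark(10));
    System.out.println(op.advanceWatermark(20));
  }
}
```

With 10 ms windows, advancing the watermark to 10 closes window [0, 10) and emits a@0=2 and b@0=1. Advancing to 20 then emits a@10=1. No per-window collection of elements is ever built.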
> > > > >
> > > > > Thanks,
> > > > > Thomas
> > > > >
