The framework for OnWindowExpiration was added in that PR, but as far as I can tell it hasn't been hooked up yet: nothing ever calls it. Adding Kenn, who might know more. Is there a pending PR somewhere that finishes this work?
On Mon, Feb 25, 2019 at 3:41 PM Augustin Lafanechere <[email protected]> wrote:

> Hello dear Beam community,
>
> I have a question about the @OnWindowExpiration annotation on DoFn.
> Does anyone have a working snippet that uses it?
>
> I am trying to write a DoFn that makes a batch RPC when the window closes.
> It is a BigQuery call for a historical metric value that is updated by an
> external process. I want to execute this query and sum the results with my
> events buffered in state. OnWindowExpiration looks very practical for
> this.
>
> It looks like the function annotated with @OnWindowExpiration is never
> called. My pipeline runs on Dataflow; perhaps it's not a supported feature
> on this runner…
>
> Here is a snippet of what I am trying to accomplish. The annotated function
> never seems to be called: the log line never appears. Am I missing
> something?
> I tried to replicate the logic found in this blog post
> <https://beam.apache.org/blog/2017/08/28/timely-processing.html> and
> pieces of information found in this PR
> <https://github.com/apache/beam/pull/4482>.
>
> // The window is defined in a higher-level transform of the pipeline:
> // Window<KV<String, Long>> w =
> //     Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(1L)))
> //         .withAllowedLateness(Duration.ZERO)
> //         .discardingFiredPanes();
>
> public final class Enrich extends DoFn<KV<String, Long>, KV<String, Long>> {
>
>   @StateId("buffer")
>   private final StateSpec<BagState<KV<String, Long>>> bufferedEvents =
>       StateSpecs.bag();
>
>   @ProcessElement
>   public void process(
>       final ProcessContext context,
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState) {
>     bufferState.add(context.element());
>     context.output(context.element());
>   }
>
>   @OnWindowExpiration
>   public void onWindowExpiration(
>       final @StateId("buffer") BagState<KV<String, Long>> bufferState,
>       final OutputReceiver<KV<String, Long>> outputReceiver) {
>     LOG.info("The window expired");
>     for (KV<String, Long> enrichedEvent : enrichWithBigQuery(bufferState.read())) {
>       outputReceiver.output(enrichedEvent);
>     }
>   }
> }
>
> Thanks for your help,
>
> Augustin
