[ https://issues.apache.org/jira/browse/BEAM-3863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
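Pawel Bartoszek updated BEAM-3863:
----------------------------------
    Description:
*Issue*
The Beam AfterProcessingTime trigger doesn't always fire reliably after the configured delay.

In the job below, the trigger should fire once when the watermark passes the end of the window, then, for late data, 5 seconds after the first late element arrives in each pane, and finally at the end of the allowed lateness.

*Expected behaviour*
The late-firing AfterProcessingTime trigger should fire 5 seconds after the first late record arrives in the pane.

*Actual behaviour*
In my testing, late firings work for some keys but not for others, and which keys are affected appears random. The DummySource generates 15 distinct keys AA, BB, ..., PP. For each key it sends 5 on-time records and one late record. When a late firing is missed, the pane doesn't fire until the allowed lateness period ends.

*Job code*
{code:java}
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// DummySource (the test source) and LOG (a logger) are defined in the repository linked below.
String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};
FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);

// 10 s fixed windows: fire when the watermark passes the end of the window, then for late
// data 5 s (processing time) after the first element of each late pane. Late data is kept
// for 2 minutes; a final pane is emitted when the window closes only if it is non-empty.
PCollection<String> windowed = pipeline.apply(Read.from(new DummySource()))
    .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime
                .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardMinutes(2),
            Window.ClosingBehavior.FIRE_IF_NON_EMPTY));

// Count records per key and window, then log each count with its window and pane info.
windowed.apply(Count.perElement())
    .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
      @ProcessElement
      public void process(ProcessContext context, BoundedWindow window) {
        LOG.info("Count: {}. For window {}, Pane {}",
            context.element(), window, context.pane());
      }
    }));

pipeline.run().waitUntilFinish();
{code}

*How can you replicate the issue?*
I've created a GitHub repo [https://github.com/pbartoszek/BEAM-3863_late_trigger] with the code shown above.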
See the README file for details on how to replicate the issue.

*What is causing the issue?*
I explained the cause in the PR.

> AfterProcessingTime trigger doesn't fire reliably
> -------------------------------------------------
>
>                 Key: BEAM-3863
>                 URL: https://issues.apache.org/jira/browse/BEAM-3863
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.1.0, 2.2.0, 2.3.0
>            Reporter: Pawel Bartoszek
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
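The *Expected behaviour* in this report can be stated as a small timing model. The sketch below is a hypothetical, simplified model (the class `ExpectedLateFirings` and method `firingTimes` are illustrative names, not part of Beam) that computes, for one key and window, the processing times at which late panes should fire when `AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay)` is the late trigger and the closing behaviour is `FIRE_IF_NON_EMPTY`. It assumes the window's expiry (end of window plus allowed lateness) is already known as a processing-time instant, whereas Beam derives it from the watermark, and it is not Beam's implementation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model (not Beam code) of when late panes should fire for
 * AfterWatermark.pastEndOfWindow().withLateFirings(
 *     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay))
 * with ClosingBehavior.FIRE_IF_NON_EMPTY. All times are processing-time milliseconds.
 */
public final class ExpectedLateFirings {

  private ExpectedLateFirings() {}

  /**
   * Returns the processing times at which late panes are expected to fire for one key and window.
   *
   * @param lateArrivals processing times (ascending) at which late elements arrive
   * @param delayMs the plusDelayOf(...) delay
   * @param expiryMs processing time at which the window expires; assumed known here,
   *     whereas Beam derives it from the watermark (end of window + allowed lateness)
   */
  public static List<Long> firingTimes(List<Long> lateArrivals, long delayMs, long expiryMs) {
    List<Long> firings = new ArrayList<>();
    Long paneDeadline = null; // null: no late pane is currently waiting to fire
    for (long arrival : lateArrivals) {
      if (arrival >= expiryMs) {
        break; // the window has expired; later elements are dropped
      }
      if (paneDeadline != null && arrival >= paneDeadline) {
        // The pending pane's processing-time timer fired before this element arrived.
        firings.add(paneDeadline);
        paneDeadline = null;
      }
      if (paneDeadline == null) {
        // The first element of a new late pane starts the delay; if the delay would
        // outlast the window, the pane is instead emitted when the window expires.
        paneDeadline = Math.min(arrival + delayMs, expiryMs);
      }
    }
    if (paneDeadline != null) {
      firings.add(paneDeadline); // the last pending pane fires on its timer or at expiry
    }
    return firings;
  }

  public static void main(String[] args) {
    // One late element at t = 1 s, a 5 s delay and a window expiring at t = 120 s:
    // a single late firing is expected at t = 6 s.
    System.out.println(firingTimes(List.of(1_000L), 5_000L, 120_000L)); // prints [6000]
  }
}
```

Under this model, with one late record per key as DummySource sends, every key should see exactly one late firing 5 seconds after its late record. A key whose late pane is only emitted at the expiry time matches the missed firing described under *Actual behaviour*.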