[ https://issues.apache.org/jira/browse/BEAM-3863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pawel Bartoszek updated BEAM-3863:
----------------------------------
    Description: 
*Issue*

Beam's AfterProcessingTime trigger doesn't always fire reliably after the 
configured delay.

The triggers in the following job should fire once the watermark passes the 
end of the window, then every 5 seconds for late data, and finally at the end 
of the allowed lateness.

*Expected behaviour*

The late-firing AfterProcessingTime trigger should fire 5 seconds after the 
first late record arrives in the pane.
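
To make the expected timing concrete, here is a minimal sketch of the 
arithmetic, using only java.time rather than Beam itself. The class name 
ExpectedFirings, its helper methods and the sample timestamps are hypothetical 
and not part of the job or of Beam; the durations are the ones configured in 
the job. The sketch ignores the 1 ms offset of a window's maximum timestamp.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Beam: models when firings are expected.
public class ExpectedFirings {
    // Durations copied from the job in this report.
    static final Duration WINDOW_SIZE = Duration.ofSeconds(10);
    static final Duration LATE_FIRING_DELAY = Duration.ofSeconds(5);
    static final Duration ALLOWED_LATENESS = Duration.ofMinutes(2);

    /** Processing time at which the late pane is expected: first late element + delay. */
    static Instant expectedLateFiring(Instant firstLateElementProcessingTime) {
        return firstLateElementProcessingTime.plus(LATE_FIRING_DELAY);
    }

    /** Event time the watermark must pass for the final firing: window end + allowed lateness. */
    static Instant windowExpiry(Instant windowStart) {
        return windowStart.plus(WINDOW_SIZE).plus(ALLOWED_LATENESS);
    }

    public static void main(String[] args) {
        // Sample timestamps are made up for illustration.
        Instant windowStart = Instant.parse("2018-03-15T12:00:00Z");
        Instant firstLateSeenAt = Instant.parse("2018-03-15T12:00:20Z");
        System.out.println("late firing expected at " + expectedLateFiring(firstLateSeenAt));
        System.out.println("final firing once watermark passes " + windowExpiry(windowStart));
    }
}
```

With these sample values, a late record first seen at 12:00:20 should produce 
a late pane at 12:00:25. When the bug occurs, nothing is emitted for that key 
until the watermark passes 12:02:10.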

*Actual behaviour*

In my testing, late triggers fire for some keys but not for others, and which 
keys are affected appears random. The DummySource generates 15 distinct keys 
AA, BB, ..., PP. For each key it sends 5 on-time records and one late record. 
When a late trigger firing is missed, the pane does not fire until the end of 
the allowed lateness period.

*Job code*
{code:java}
String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};

FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> apply = pipeline.apply(Read.from(new DummySource()))
        .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withLateFirings(
                                AfterProcessingTime
                                        .pastFirstElementInPane()
                                        .plusDelayOf(Duration.standardSeconds(5))))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.standardMinutes(2),
                        Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
        );
apply.apply(Count.perElement())
        .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
            @ProcessElement
            public void process(ProcessContext context, BoundedWindow window) {
                LOG.info("Count: {}. For window {}, Pane {}",
                        context.element(), window, context.pane());
            }
        }));

pipeline.run().waitUntilFinish();
{code}
 

*How can you replicate the issue?*

I've created a GitHub repo 
[https://github.com/pbartoszek/BEAM-3863_late_trigger] with the code shown 
above. Please see the README file for details on how to replicate the issue.

*What is causing the issue?*

I explained the cause in the PR.

 


> AfterProcessingTime trigger doesn't fire reliably
> -------------------------------------------------
>
>                 Key: BEAM-3863
>                 URL: https://issues.apache.org/jira/browse/BEAM-3863
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.1.0, 2.2.0, 2.3.0
>            Reporter: Pawel Bartoszek
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
