[ 
https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=230283&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-230283
 ]

ASF GitHub Bot logged work on BEAM-3863:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Apr/19 22:20
            Start Date: 19/Apr/19 22:20
    Worklog Time Spent: 10m 
      Work Description: mxm commented on pull request #8366:  [BEAM-3863] 
Ensure correct firing of processing time timers
URL: https://github.com/apache/beam/pull/8366
 
 
   In Beam, a timer with timestamp `T` is only illegible for firing when the 
time
   has moved past this time stamp, i.e. `T < current_time`. In the case of event
   time, current_time is the watermark, in the case of processing time it is the
   system time.
   
   Flink's TimerService has different semantics because it only ensures 
   `T <= current_time`. To make up for this, we previously subtracted one from 
the
   Watermark. However, this does not fix the problem for processing time. We 
can't
   modify processing time easily like we can for event time via the watermark.
   
   To fix processing timers, we change Flink's internal timer timestamp. We add 
one
   millisecond to the timestamp to ensure that the time has moved past the 
original
   timer timestamp in both event and processing time.
   
   Note that we do not modify Beam's timestamp and we do not expose Flink's
   timestamp. For consistency this approach has also been applied to event time
   timers.
   
   CC @aljoscha @kennknowles @pbartoszek
   
   Post-Commit Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | --- | --- | --- | --- | ---
   Java | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)
   Python | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_Verify/lastCompletedBuild/)<br>[![Build
 
Status](https://builds.apache.org/job/beam_PostCommit_Python3_Verify/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python3_Verify/lastCompletedBuild/)
 | --- | [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)
 <br> [![Build 
Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/)
 | --- | --- | ---
   
   Pre-Commit Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
 
   Portable | --- | [![Build 
Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/)
 | --- | ---
   
   See 
[.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md)
 for trigger phrase, status and link of all Jenkins jobs.
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 230283)
    Time Spent: 2h 10m  (was: 2h)

> AfterProcessingTime trigger doesn't fire reliably
> -------------------------------------------------
>
>                 Key: BEAM-3863
>                 URL: https://issues.apache.org/jira/browse/BEAM-3863
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.1.0, 2.2.0, 2.3.0
>            Reporter: Pawel Bartoszek
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: triaged
>          Time Spent: 2h 10m
>  Remaining Estimate: 0h
>
> *Issue*
> Beam AfterProcessingTime trigger doesn't fire always reliably after a 
> configured delay.
> The following job triggers should fire after watermark passes the end of the 
> window and then every 5 seconds for late data and the finally at the end of 
> allowed lateness.
> *Expected behaviour*
> Late firing after processing time trigger should fire after 5 seconds since 
> first late records arrive in the pane.
> *Actual behaviour*
> From my testings late triggers works for some keys but not for the other - 
> it's pretty random which keys are affected. The DummySource generates 15 
> distinct keys AA,BB,..., PP. For each key it sends 5 on time records and one 
> late record. In case late trigger firing is missed it won't fire until the 
> allowed lateness period. 
> *Job code*
> {code:java}
> String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};
> FlinkPipelineOptions options = 
> PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
> Pipeline pipeline = Pipeline.create(options);
> PCollection<String> apply = pipeline.apply(Read.from(new DummySource()))
>         
> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>                 .triggering(AfterWatermark.pastEndOfWindow()
>                         .withLateFirings(
>                                 AfterProcessingTime
>                                         
> .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                 .accumulatingFiredPanes()
>                 .withAllowedLateness(Duration.standardMinutes(2), 
> Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>         );
> apply.apply(Count.perElement())
>         .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
>             @ProcessElement
>             public void process(ProcessContext context, BoundedWindow window) 
> {
>                 LOG.info("Count: {}. For window {}, Pane {}", 
> context.element(), window, context.pane());
>             }
>         }));
> pipeline.run().waitUntilFinish();{code}
>  
> *How can you replicate the issue?*
>  I've created a github repo 
> [https://github.com/pbartoszek/BEAM-3863_late_trigger] with the code shown 
> above. Please check out the README file for details how to replicate the 
> issue.
> *What's is causing the issue?*
> I explained the cause in PR.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to