[ https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=81216&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-81216 ]

ASF GitHub Bot logged work on BEAM-3863:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Mar/18 16:34
            Start Date: 16/Mar/18 16:34
    Worklog Time Spent: 10m 
      Work Description: pbartoszek opened a new pull request #4875: BEAM-3863: 
AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875
 
 
   The issue with the current logic is that if the `delayedUntil` timestamp is 
equal to the current time when the `shouldFire` method is called, then 
`shouldFire` returns false. The trigger does not fire, and it never fires again 
unless a new element arrives for the same <key, window> or the allowed lateness 
is reached. When I tested with the Flink runner on a cluster, this turned out to 
be quite likely on sufficiently powerful machines: the timer fires at the 
`delayedUntil` timestamp and calls `public void onTimers(Iterable<TimerData> 
timers) throws Exception` in `ReduceFnRunner`, and the current time, probed via 
`getCurrentTime(context)` in `AfterDelayFromFirstElementStateMachine.shouldFire()`, 
is still the same millisecond. The suggested fix is to fire when the current 
timestamp is equal to or greater than the `delayedUntil` timestamp.
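A minimal sketch of the boundary condition described above, using `java.time.Instant` in place of Joda-Time. The class and method names are hypothetical and this is not Beam's actual `AfterDelayFromFirstElementStateMachine` code; it only contrasts a strict "after" comparison with the proposed "equal or later" comparison when the timer reads the clock in the same millisecond as `delayedUntil`.

```java
import java.time.Instant;

// Hypothetical, simplified model of the delay check; not Beam source code.
final class DelayCheckSketch {

  // Behaviour described in the PR: fire only if "now" is strictly after delayedUntil.
  static boolean shouldFireStrict(Instant now, Instant delayedUntil) {
    return delayedUntil != null && now != null && now.isAfter(delayedUntil);
  }

  // Proposed behaviour: fire if "now" is equal to or later than delayedUntil.
  static boolean shouldFireInclusive(Instant now, Instant delayedUntil) {
    return delayedUntil != null && now != null && !now.isBefore(delayedUntil);
  }

  public static void main(String[] args) {
    Instant delayedUntil = Instant.ofEpochMilli(15_000);
    // The timer callback reads the clock in the same millisecond as delayedUntil.
    Instant now = delayedUntil;
    System.out.println("strict:    " + shouldFireStrict(now, delayedUntil));    // false
    System.out.println("inclusive: " + shouldFireInclusive(now, delayedUntil)); // true
  }
}
```

Both checks agree whenever the clock has moved past `delayedUntil`; they differ only at exact equality, which is the case the PR reports hitting on Flink.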
   
   I created a test job that replicates the issue: 
https://github.com/pbartoszek/BEAM-3863_late_trigger 
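To see why a single missed comparison matters, here is a hypothetical timeline model using the parameters of the job in the issue (10-second fixed windows, a 5-second late-firing delay, 2 minutes of allowed lateness). It assumes that no further elements arrive for the <key, window> after the late record, that the timer callback reads the clock in exactly the `delayedUntil` millisecond, and, for simplicity, that processing time and event time share one clock. The names are illustrative, not Beam APIs.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical timeline model, not Beam code. Assumes no further elements arrive for
// the <key, window> after the late record, the timer callback reads the clock in exactly
// the delayedUntil millisecond, and processing and event time share one clock.
final class LateFiringTimeline {

  // Returns when the late pane is emitted under the given comparison rule.
  static Instant latePaneEmittedAt(
      Instant lateArrival, Duration delay, Instant endOfAllowedLateness, boolean inclusive) {
    Instant delayedUntil = lateArrival.plus(delay);
    Instant timerNow = delayedUntil; // worst case reported on Flink: same millisecond
    boolean fires = inclusive ? !timerNow.isBefore(delayedUntil) : timerNow.isAfter(delayedUntil);
    // A missed firing is not retried, so the pane waits for the final
    // FIRE_IF_NON_EMPTY pane at the end of the allowed lateness.
    return fires ? delayedUntil : endOfAllowedLateness;
  }

  public static void main(String[] args) {
    Instant windowEnd = Instant.ofEpochSecond(10);                 // FixedWindows.of(10 s)
    Instant endOfLateness = windowEnd.plus(Duration.ofMinutes(2)); // simplified: window end + 2 min
    Instant lateArrival = Instant.ofEpochSecond(12);               // one late record
    Duration delay = Duration.ofSeconds(5);                        // plusDelayOf(5 s)

    System.out.println("strict:    " + latePaneEmittedAt(lateArrival, delay, endOfLateness, false));
    System.out.println("inclusive: " + latePaneEmittedAt(lateArrival, delay, endOfLateness, true));
  }
}
```

Under these assumptions the strict check prints `1970-01-01T00:02:10Z` (the late pane only appears when the allowed lateness expires), while the inclusive check prints `1970-01-01T00:00:17Z`, 5 seconds after the late record.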
   
   Follow this checklist to help us incorporate your contribution quickly and 
easily:
   
    - [ ] Make sure there is a [JIRA 
issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the 
change (usually before you start working on it).  Trivial changes like typos do 
not require a JIRA issue.  Your pull request should address just this issue, 
without pulling in other changes.
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue.
    - [ ] Write a pull request description that is detailed enough to 
understand:
      - [ ] What the pull request does
      - [ ] Why it does it
      - [ ] How it does it
      - [ ] Why this approach
    - [ ] Each commit in the pull request should have a meaningful subject line 
and body.
    - [ ] Run `mvn clean verify` to make sure basic checks pass. A more 
thorough check will be performed on your pull request automatically.
    - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

            Worklog Id:     (was: 81216)
            Time Spent: 10m
    Remaining Estimate: 0h

> AfterProcessingTime trigger doesn't fire reliably
> -------------------------------------------------
>
>                 Key: BEAM-3863
>                 URL: https://issues.apache.org/jira/browse/BEAM-3863
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.1.0, 2.2.0, 2.3.0
>            Reporter: Pawel Bartoszek
>            Assignee: Kenneth Knowles
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> *Issue*
> Beam's AfterProcessingTime trigger doesn't always fire reliably after the 
> configured delay.
>  
> The triggers in the following job should fire when the watermark passes the 
> end of the window, then every 5 seconds for late data, and finally at the end 
> of the allowed lateness.
>  
> *Expected behaviour*
> The late-firing AfterProcessingTime trigger should fire 5 seconds after the 
> first late record arrives in the pane.
>  
> *Actual behaviour*
> From my testing, late firings work for some keys but not for others, and 
> which keys are affected is fairly random. The DummySource generates 15 
> distinct keys AA, BB, ..., PP. For each key it sends 5 on-time records and one 
> late record. If a late firing is missed, the pane does not fire again until 
> the end of the allowed lateness period.
>  
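> *Job code*
> {code:java}
> import org.apache.beam.runners.flink.FlinkPipelineOptions;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.Read;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.Count;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
> import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
> import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
> import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
> import org.apache.beam.sdk.values.KV;
> import org.apache.beam.sdk.values.PCollection;
> import org.joda.time.Duration;
>
> // DummySource and LOG are defined in the reporter's repository (linked below).
> String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};
> FlinkPipelineOptions options =
>     PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
> Pipeline pipeline = Pipeline.create(options);
> PCollection<String> apply = pipeline.apply(Read.from(new DummySource()))
>     // 10-second fixed windows: fire at the watermark, then 5 s after the first late element.
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>         .triggering(AfterWatermark.pastEndOfWindow()
>             .withLateFirings(AfterProcessingTime
>                 .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>         .accumulatingFiredPanes()
>         .withAllowedLateness(Duration.standardMinutes(2),
>             Window.ClosingBehavior.FIRE_IF_NON_EMPTY));
> apply.apply(Count.perElement())
>     .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
>       @ProcessElement
>       public void process(ProcessContext context, BoundedWindow window) {
>         // Log every emitted pane so that missing late firings are visible.
>         LOG.info("Count: {}. For window {}, Pane {}", context.element(), window, context.pane());
>       }
>     }));
> pipeline.run().waitUntilFinish();{code}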
>  
> *How can you replicate the issue?*
>  I've created a GitHub repo 
> [https://github.com/pbartoszek/BEAM-3863_late_trigger] with the code shown 
> above. Please check the README file for details on how to replicate the 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
