[ 
https://issues.apache.org/jira/browse/BEAM-3863?focusedWorklogId=115174&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-115174
 ]

ASF GitHub Bot logged work on BEAM-3863:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Jun/18 12:33
            Start Date: 24/Jun/18 12:33
    Worklog Time Spent: 10m 
      Work Description: stale[bot] closed pull request #4875: BEAM-3863: 
AfterProcessingTime trigger firing at delayedUntil time
URL: https://github.com/apache/beam/pull/4875
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 45be8b691cd..88bcb7250f2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -230,9 +230,10 @@ public void clear(TriggerContext c) throws Exception {
   @Override
   public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws 
Exception {
     Instant delayedUntil = context.state().access(DELAYED_UNTIL_TAG).read();
+    Instant currentTime = getCurrentTime(context);
     return delayedUntil != null
-        && getCurrentTime(context) != null
-        && getCurrentTime(context).isAfter(delayedUntil);
+            && currentTime != null
+            && (currentTime.isEqual(delayedUntil) || 
currentTime.isAfter(delayedUntil));
   }
 
   @Override
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
index 9fbf801693b..a83d2ab996f 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterProcessingTimeStateMachineTest.java
@@ -72,7 +72,7 @@ public void testAfterProcessingTimeFixedWindows() throws 
Exception {
     tester.injectElements(2, 3);
 
     // Advance past the first timer and fire, finishing the first window
-    tester.advanceProcessingTime(new Instant(16));
+    tester.advanceProcessingTime(new Instant(15));
     assertTrue(tester.shouldFire(firstWindow));
     assertFalse(tester.shouldFire(secondWindow));
     tester.fireIfShouldFire(firstWindow);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 115174)
    Time Spent: 1h 50m  (was: 1h 40m)

> AfterProcessingTime trigger doesn't fire reliably
> -------------------------------------------------
>
>                 Key: BEAM-3863
>                 URL: https://issues.apache.org/jira/browse/BEAM-3863
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.1.0, 2.2.0, 2.3.0
>            Reporter: Pawel Bartoszek
>            Assignee: Aljoscha Krettek
>            Priority: Major
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> *Issue*
> Beam AfterProcessingTime trigger doesn't fire always reliably after a 
> configured delay.
> The following job triggers should fire after watermark passes the end of the 
> window and then every 5 seconds for late data and the finally at the end of 
> allowed lateness.
> *Expected behaviour*
> Late firing after processing time trigger should fire after 5 seconds since 
> first late records arrive in the pane.
> *Actual behaviour*
> From my testings late triggers works for some keys but not for the other - 
> it's pretty random which keys are affected. The DummySource generates 15 
> distinct keys AA,BB,..., PP. For each key it sends 5 on time records and one 
> late record. In case late trigger firing is missed it won't fire until the 
> allowed lateness period. 
> *Job code*
> {code:java}
> String[] runnerArgs = {"--runner=FlinkRunner", "--parallelism=8"};
> FlinkPipelineOptions options = 
> PipelineOptionsFactory.fromArgs(runnerArgs).as(FlinkPipelineOptions.class);
> Pipeline pipeline = Pipeline.create(options);
> PCollection<String> apply = pipeline.apply(Read.from(new DummySource()))
>         
> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>                 .triggering(AfterWatermark.pastEndOfWindow()
>                         .withLateFirings(
>                                 AfterProcessingTime
>                                         
> .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(5))))
>                 .accumulatingFiredPanes()
>                 .withAllowedLateness(Duration.standardMinutes(2), 
> Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
>         );
> apply.apply(Count.perElement())
>         .apply(ParDo.of(new DoFn<KV<String, Long>, Long>() {
>             @ProcessElement
>             public void process(ProcessContext context, BoundedWindow window) 
> {
>                 LOG.info("Count: {}. For window {}, Pane {}", 
> context.element(), window, context.pane());
>             }
>         }));
> pipeline.run().waitUntilFinish();{code}
>  
> *How can you replicate the issue?*
>  I've created a github repo 
> [https://github.com/pbartoszek/BEAM-3863_late_trigger] with the code shown 
> above. Please check out the README file for details how to replicate the 
> issue.
> *What's is causing the issue?*
> I explained the cause in PR.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to