This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4702dbb  [BEAM-3863] Ensure correct firing of processing time timers
     new fa2369c  Merge pull request #8366:  [BEAM-3863] Ensure correct firing 
of processing time timers
4702dbb is described below

commit 4702dbb3c210f266da722df1dad1ac45f936a29d
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Fri Apr 19 22:32:50 2019 +0200

    [BEAM-3863] Ensure correct firing of processing time timers
    
    In Beam, a timer with timestamp `T` is only eligible for firing when the time
    has moved past this timestamp, i.e. `T < current_time`. In the case of event
    time, current_time is the watermark; in the case of processing time it is the
    system time.
    
    Flink's TimerService has different semantics because it only ensures `T <=
    current_time`. To make up for this, we previously subtracted one from the
    watermark. However, this does not fix the problem for processing time. We can't
    modify processing time easily like we can for event time via the watermark.
    
    To fix processing timers, we change Flink's internal timer timestamp. We add one
    millisecond to the timestamp to ensure that the time has moved past the original
    timer timestamp in both event and processing time.
    
    Note that we do not modify Beam's timestamp and we do not expose Flink's
    timestamp. For consistency this approach has also been applied to event time
    timers.
---
 .../wrappers/streaming/DoFnOperator.java           | 43 +++++++++++--------
 .../wrappers/streaming/DoFnOperatorTest.java       | 49 ++++++++++++++++++----
 2 files changed, 66 insertions(+), 26 deletions(-)

diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index dfed839..adc9c59 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -616,8 +616,7 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
       // hold back by the pushed back values waiting for side inputs
       long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), 
mark.getTimestamp());
 
-      timeServiceManager.advanceWatermark(
-          new Watermark(toFlinkRuntimeWatermark(pushedBackInputWatermark)));
+      timeServiceManager.advanceWatermark(new 
Watermark(pushedBackInputWatermark));
 
       Instant watermarkHold = keyedStateInternals.watermarkHold();
 
@@ -655,17 +654,6 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
   }
 
   /**
-   * Converts a Beam watermark to a Flink watermark. This is only relevant 
when considering what
-   * event-time timers to fire: in Beam, a watermark {@code T} says there will 
not be any elements
-   * with a timestamp {@code < T} in the future. A Flink watermark {@code T} 
says there will not be
-   * any elements with a timestamp {@code <= T} in the future. We correct this 
by subtracting {@code
-   * 1} from a Beam watermark before passing to any relevant Flink runtime 
components.
-   */
-  private static long toFlinkRuntimeWatermark(long beamWatermark) {
-    return beamWatermark - 1;
-  }
-
-  /**
    * Emits all pushed-back data. This should be used once we know that there 
will not be any future
    * side input, i.e. that there is no point in waiting.
    */
@@ -1030,11 +1018,11 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
       long time = timer.getTimestamp().getMillis();
       switch (timer.getDomain()) {
         case EVENT_TIME:
-          timerService.registerEventTimeTimer(timer, time);
+          timerService.registerEventTimeTimer(timer, 
adjustTimestampForFlink(time));
           break;
         case PROCESSING_TIME:
         case SYNCHRONIZED_PROCESSING_TIME:
-          timerService.registerProcessingTimeTimer(timer, time);
+          timerService.registerProcessingTimeTimer(timer, 
adjustTimestampForFlink(time));
           break;
         default:
           throw new UnsupportedOperationException("Unsupported time domain: " 
+ timer.getDomain());
@@ -1082,11 +1070,11 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
       long time = timerKey.getTimestamp().getMillis();
       switch (timerKey.getDomain()) {
         case EVENT_TIME:
-          timerService.deleteEventTimeTimer(timerKey, time);
+          timerService.deleteEventTimeTimer(timerKey, 
adjustTimestampForFlink(time));
           break;
         case PROCESSING_TIME:
         case SYNCHRONIZED_PROCESSING_TIME:
-          timerService.deleteProcessingTimeTimer(timerKey, time);
+          timerService.deleteProcessingTimeTimer(timerKey, 
adjustTimestampForFlink(time));
           break;
         default:
           throw new UnsupportedOperationException(
@@ -1115,5 +1103,26 @@ public class DoFnOperator<InputT, OutputT> extends 
AbstractStreamOperator<Window
     public Instant currentOutputWatermarkTime() {
       return new Instant(currentOutputWatermark);
     }
+
+    /**
+     * In Beam, a timer with timestamp {@code T} is only eligible for firing 
when the time has
+     * moved past this timestamp, i.e. {@code T < current_time}. In the case 
of event time,
+     * current_time is the watermark, in the case of processing time it is the 
system time.
+     *
+     * <p>Flink's TimerService has different semantics because it only ensures 
{@code T <=
+     * current_time}.
+     *
+     * <p>To make up for this, we need to add one millisecond to Flink's 
internal timer timestamp.
+     * Note that we do not modify Beam's timestamp and we are not exposing 
Flink's timestamp.
+     *
+     * <p>See also https://jira.apache.org/jira/browse/BEAM-3863
+     */
+    private long adjustTimestampForFlink(long beamTimerTimestamp) {
+      if (beamTimerTimestamp == Long.MAX_VALUE) {
+        // We would overflow, do not adjust timestamp
+        return Long.MAX_VALUE;
+      }
+      return beamTimerTimestamp + 1;
+    }
   }
 }
diff --git 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
index 652ff1f..41c0c41 100644
--- 
a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
+++ 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperatorTest.java
@@ -243,7 +243,8 @@ public class DoFnOperatorTest {
   public void testWatermarkContract() throws Exception {
 
     final Instant timerTimestamp = new Instant(1000);
-    final String outputMessage = "Timer fired";
+    final String eventTimeMessage = "Event timer fired";
+    final String processingTimeMessage = "Processing timer fired";
 
     WindowingStrategy<Object, IntervalWindow> windowingStrategy =
         WindowingStrategy.of(FixedWindows.of(new Duration(10_000)));
@@ -251,20 +252,39 @@ public class DoFnOperatorTest {
     DoFn<Integer, String> fn =
         new DoFn<Integer, String>() {
           private static final String EVENT_TIMER_ID = "eventTimer";
+          private static final String PROCESSING_TIMER_ID = "processingTimer";
 
           @TimerId(EVENT_TIMER_ID)
           private final TimerSpec eventTimer = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
 
+          @TimerId(PROCESSING_TIMER_ID)
+          private final TimerSpec processingTimer = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
           @ProcessElement
-          public void processElement(ProcessContext context, 
@TimerId(EVENT_TIMER_ID) Timer timer) {
-            timer.set(timerTimestamp);
+          public void processElement(
+              ProcessContext context,
+              @TimerId(EVENT_TIMER_ID) Timer eventTimer,
+              @TimerId(PROCESSING_TIMER_ID) Timer processingTimer) {
+            eventTimer.set(timerTimestamp);
+            
processingTimer.offset(Duration.millis(timerTimestamp.getMillis())).setRelative();
           }
 
           @OnTimer(EVENT_TIMER_ID)
           public void onEventTime(OnTimerContext context) {
             assertEquals(
                 "Timer timestamp must match set timestamp.", timerTimestamp, 
context.timestamp());
-            context.outputWithTimestamp(outputMessage, context.timestamp());
+            context.outputWithTimestamp(eventTimeMessage, context.timestamp());
+          }
+
+          @OnTimer(PROCESSING_TIMER_ID)
+          public void onProcessingTime(OnTimerContext context) {
+            assertEquals(
+                // Timestamps in processing timer context are defined to be 
the input watermark
+                // See SimpleDoFnRunner#onTimer
+                "Timer timestamp must match current input watermark",
+                timerTimestamp.plus(1),
+                context.timestamp());
+            context.outputWithTimestamp(processingTimeMessage, 
context.timestamp());
           }
         };
 
@@ -304,30 +324,41 @@ public class DoFnOperatorTest {
     testHarness.open();
 
     testHarness.processWatermark(0);
+    testHarness.setProcessingTime(0);
 
     IntervalWindow window1 = new IntervalWindow(new Instant(0), 
Duration.millis(10_000));
 
-    // this should register a timer
+    // this should register the two timers above
     testHarness.processElement(
         new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, 
PaneInfo.NO_FIRING)));
 
     assertThat(stripStreamRecordFromWindowedValue(testHarness.getOutput()), 
emptyIterable());
 
-    // this does not yet fire the timer (in vanilla Flink it would)
+    // this does not yet fire the timers (in vanilla Flink it would)
     testHarness.processWatermark(timerTimestamp.getMillis());
+    testHarness.setProcessingTime(timerTimestamp.getMillis());
 
     assertThat(stripStreamRecordFromWindowedValue(testHarness.getOutput()), 
emptyIterable());
 
+    // this must fire the event timer
+    testHarness.processWatermark(timerTimestamp.getMillis() + 1);
+
+    assertThat(
+        stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(WindowedValue.of(eventTimeMessage, timerTimestamp, window1, 
PaneInfo.NO_FIRING)));
+
     testHarness.getOutput().clear();
 
-    // this must fire the timer
-    testHarness.processWatermark(timerTimestamp.getMillis() + 1);
+    // this must fire the processing timer
+    testHarness.setProcessingTime(timerTimestamp.getMillis() + 1);
 
     assertThat(
         stripStreamRecordFromWindowedValue(testHarness.getOutput()),
         contains(
             WindowedValue.of(
-                outputMessage, new Instant(timerTimestamp), window1, 
PaneInfo.NO_FIRING)));
+                // Timestamps in processing timer context are defined to be 
the input watermark
+                // See SimpleDoFnRunner#onTimer
+                processingTimeMessage, timerTimestamp.plus(1), window1, 
PaneInfo.NO_FIRING)));
 
     testHarness.close();
   }

Reply via email to