pnowojski commented on a change in pull request #17102:
URL: https://github.com/apache/flink/pull/17102#discussion_r701062301



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
##########
@@ -61,7 +61,7 @@ public long getCurrentProcessingTime() {
     public ScheduledFuture<?> registerTimer(long timestamp, 
ProcessingTimeCallback target) {
         if (isQuiesced()) {
             return new NeverCompleteFuture(
-                    ProcessingTimeServiceUtil.getProcessingTimeDelay(
+                    ProcessingTimeServiceUtil.getRecordProcessingTimeDelay(

Review comment:
       I don't think this rename is a good idea. As I mentioned in the JIRA 
ticket, this is not necessarily about processing records; it can be about any 
abstract events happening in the system that use processing time.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceUtil.java
##########
@@ -26,21 +26,22 @@
 
     /**
      * Returns the remaining delay of the processing time specified by {@code 
processingTimestamp}.
+     * This delay should guarantee that the timer will be fired after all 
record with timestamp <=
+     * {@code processingTimestamp} have been already processed.
      *
      * @param processingTimestamp the processing time in milliseconds
      * @param currentTimestamp the current processing timestamp; it usually 
uses {@link
      *     ProcessingTimeService#getCurrentProcessingTime()} to get
      * @return the remaining delay of the processing time
      */
-    public static long getProcessingTimeDelay(long processingTimestamp, long 
currentTimestamp) {
+    public static long getRecordProcessingTimeDelay(
+            long processingTimestamp, long currentTimestamp) {
 
-        // Two cases of timers here:
-        // (1) future/now timers(processingTimestamp >= currentTimestamp): 
delay the firing of the
-        //   timer by 1 ms to align the semantics with watermark. A watermark 
T says we won't see
-        //   elements in the future with a timestamp smaller or equal to T. 
With processing time, we
-        //   therefore need to delay firing the timer by one ms.
-        // (2) past timers(processingTimestamp < currentTimestamp): do not 
need to delay the firing
-        //   because currentTimestamp is larger than processingTimestamp 
pluses the 1ms offset.
+        // future/now timers(processingTimestamp >= currentTimestamp): delay 
the firing of the
+        // timer by 1 ms to align the semantics with watermark. A watermark T 
says we won't see
+        // elements in the future with a timestamp smaller or equal to T. 
Without this 1ms delay,
+        // if we had fired the timer for T at the timestamp T, it would be 
possible that we would
+        // process another record for timestamp == T in the same millisecond.

Review comment:
       > it would be possible that we would process another record for 
timestamp == T in the same millisecond.
   
   ->
   
   > it would be possible that we would process another record for timestamp == 
T in the same millisecond, but after the timer for the timestamp T has already 
been fired.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
##########
@@ -107,43 +109,42 @@ public void testErrorReporting() throws Exception {
 
     @Test
     public void checkScheduledTimestamps() throws Exception {
-        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-        final long t1 = System.currentTimeMillis();
-        final long t2 = System.currentTimeMillis() - 200;
-        final long t3 = System.currentTimeMillis() + 100;
-        final long t4 = System.currentTimeMillis() + 200;
-
-        timeService.registerTimer(t1, new 
ValidatingProcessingTimeCallback(errorRef, t1, 0));
-        timeService.registerTimer(t2, new 
ValidatingProcessingTimeCallback(errorRef, t2, 1));
-        timeService.registerTimer(t3, new 
ValidatingProcessingTimeCallback(errorRef, t3, 2));
-        timeService.registerTimer(t4, new 
ValidatingProcessingTimeCallback(errorRef, t4, 3));
-
-        long deadline = System.currentTimeMillis() + 20000;
-        while (errorRef.get() == null
-                && ValidatingProcessingTimeCallback.numInSequence < 4
-                && System.currentTimeMillis() < deadline) {
-            Thread.sleep(100);
+        ValidatingProcessingTimeCallback.numInSequence = 0;
+        long currentTimeMillis = System.currentTimeMillis();
+        ArrayList<ValidatingProcessingTimeCallback> timeCallbacks = new 
ArrayList<>();
+
+        /*
+         It is not possible to register timer for currentTimeMillis or value 
slightly greater than
+         currentTimeMillis because if the during registerTimer the internal 
currentTime is equal
+         to this value then according to current logic the time will be 
increased for 1ms while
+         `currentTimeMillis - 200` is always transform to 0, so it can lead to 
reordering. See
+         comment in {@link 
ProcessingTimeServiceUtil#getRecordProcessingTimeDelay(long, long)}.
+        */
+        timeCallbacks.add(new 
ValidatingProcessingTimeCallback(currentTimeMillis - 1, 0));
+        timeCallbacks.add(new 
ValidatingProcessingTimeCallback(currentTimeMillis - 200, 1));
+        timeCallbacks.add(new 
ValidatingProcessingTimeCallback(currentTimeMillis + 100, 2));
+        timeCallbacks.add(new 
ValidatingProcessingTimeCallback(currentTimeMillis + 200, 3));
+
+        for (ValidatingProcessingTimeCallback timeCallback : timeCallbacks) {
+            timeService.registerTimer(timeCallback.expectedTimestamp, 
timeCallback);
         }
 
-        verifyNoException(errorRef.get());
+        for (ValidatingProcessingTimeCallback timeCallback : timeCallbacks) {
+            timeCallback.assertExpectedValues();
+        }
         assertEquals(4, ValidatingProcessingTimeCallback.numInSequence);
     }
 
     private static class ValidatingProcessingTimeCallback implements 
ProcessingTimeCallback {
 
         static int numInSequence;
 
-        private final AtomicReference<Throwable> errorRef;
+        private final CompletableFuture<Void> finished = new 
CompletableFuture<>();

Review comment:
       I think the bug fix in the test and the refactoring of this test are 
independent and should be in separate commits.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
##########
@@ -107,43 +109,42 @@ public void testErrorReporting() throws Exception {
 
     @Test
     public void checkScheduledTimestamps() throws Exception {
-        final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
-        final long t1 = System.currentTimeMillis();
-        final long t2 = System.currentTimeMillis() - 200;
-        final long t3 = System.currentTimeMillis() + 100;
-        final long t4 = System.currentTimeMillis() + 200;
-
-        timeService.registerTimer(t1, new 
ValidatingProcessingTimeCallback(errorRef, t1, 0));
-        timeService.registerTimer(t2, new 
ValidatingProcessingTimeCallback(errorRef, t2, 1));
-        timeService.registerTimer(t3, new 
ValidatingProcessingTimeCallback(errorRef, t3, 2));
-        timeService.registerTimer(t4, new 
ValidatingProcessingTimeCallback(errorRef, t4, 3));
-
-        long deadline = System.currentTimeMillis() + 20000;
-        while (errorRef.get() == null
-                && ValidatingProcessingTimeCallback.numInSequence < 4
-                && System.currentTimeMillis() < deadline) {
-            Thread.sleep(100);
+        ValidatingProcessingTimeCallback.numInSequence = 0;
+        long currentTimeMillis = System.currentTimeMillis();
+        ArrayList<ValidatingProcessingTimeCallback> timeCallbacks = new 
ArrayList<>();
+
+        /*
+         It is not possible to register timer for currentTimeMillis or value 
slightly greater than

Review comment:
       I believe that instead of:
   > It is not possible to register timer for currentTimeMillis
   
   you had this in mind?
   
   > It is not possible to test registering timer for currentTimeMillis
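
To illustrate why the old test could not reliably register a timer for `currentTimeMillis`, here is a minimal sketch of the reordering. The class and method names are hypothetical, the `delay` helper only mirrors the behaviour described in the code comment under review (it is not the actual Flink method), and I'm assuming the clock does not advance during registration and that timers with equal delays fire in registration order.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public final class TimerReorderingSketch {

    // Hypothetical stand-in for the delay computation discussed in this PR:
    // now/future timers get one extra millisecond, past timers get no delay.
    static long delay(long processingTimestamp, long currentTimestamp) {
        return processingTimestamp >= currentTimestamp
                ? processingTimestamp - currentTimestamp + 1
                : 0;
    }

    /** Returns the registration indices in the order the timers would fire. */
    static List<Integer> firingOrder(long now, long... timestamps) {
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            order.add(i);
        }
        // List.sort is stable, so timers with equal delays keep registration order
        // (an assumption about the scheduler, see the note above).
        order.sort(Comparator.comparingLong(i -> delay(timestamps[i], now)));
        return order;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        // Old test: the first timer is registered for "now" and gets a 1 ms delay,
        // while "now - 200" gets 0 ms and overtakes it.
        System.out.println(firingOrder(now, now, now - 200, now + 100, now + 200));
        // New test: "now - 1" is already in the past, so the first two timers both
        // get 0 ms and keep their registration order.
        System.out.println(firingOrder(now, now - 1, now - 200, now + 100, now + 200));
    }
}
```

Under these assumptions the first call yields the order 1, 0, 2, 3 and the second yields 0, 1, 2, 3, which is why the limitation is about what the test can assert rather than about what can be registered.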

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceUtil.java
##########
@@ -26,21 +26,22 @@
 
     /**
      * Returns the remaining delay of the processing time specified by {@code 
processingTimestamp}.
+     * This delay should guarantee that the timer will be fired after all 
record with timestamp <=
+     * {@code processingTimestamp} have been already processed.
      *
      * @param processingTimestamp the processing time in milliseconds
      * @param currentTimestamp the current processing timestamp; it usually 
uses {@link
      *     ProcessingTimeService#getCurrentProcessingTime()} to get
      * @return the remaining delay of the processing time
      */
-    public static long getProcessingTimeDelay(long processingTimestamp, long 
currentTimestamp) {
+    public static long getRecordProcessingTimeDelay(
+            long processingTimestamp, long currentTimestamp) {
 
-        // Two cases of timers here:
-        // (1) future/now timers(processingTimestamp >= currentTimestamp): 
delay the firing of the
-        //   timer by 1 ms to align the semantics with watermark. A watermark 
T says we won't see
-        //   elements in the future with a timestamp smaller or equal to T. 
With processing time, we
-        //   therefore need to delay firing the timer by one ms.
-        // (2) past timers(processingTimestamp < currentTimestamp): do not 
need to delay the firing
-        //   because currentTimestamp is larger than processingTimestamp 
pluses the 1ms offset.
+        // future/now timers(processingTimestamp >= currentTimestamp): delay 
the firing of the
+        // timer by 1 ms to align the semantics with watermark. A watermark T 
says we won't see
+        // elements in the future with a timestamp smaller or equal to T. 
Without this 1ms delay,
+        // if we had fired the timer for T at the timestamp T, it would be 
possible that we would
+        // process another record for timestamp == T in the same millisecond.

Review comment:
       Plus I would keep the distinction between those two cases (the previous 
(1) and (2)).   

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceUtil.java
##########
@@ -26,21 +26,22 @@
 
     /**
      * Returns the remaining delay of the processing time specified by {@code 
processingTimestamp}.
+     * This delay should guarantee that the timer will be fired after all 
record with timestamp <=
+     * {@code processingTimestamp} have been already processed.

Review comment:
       ```
   This delay guarantees that the timer will be fired at least 1ms after the 
time it's registered for.
   ```



