pnowojski commented on a change in pull request #17102:
URL: https://github.com/apache/flink/pull/17102#discussion_r701062301
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceImpl.java
##########
@@ -61,7 +61,7 @@ public long getCurrentProcessingTime() {
public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
if (isQuiesced()) {
return new NeverCompleteFuture(
- ProcessingTimeServiceUtil.getProcessingTimeDelay(
+ ProcessingTimeServiceUtil.getRecordProcessingTimeDelay(
Review comment:
I don't think this rename is a good idea. As I mentioned in the JIRA
ticket, this is not necessarily about processing records; it applies to any
abstract events in the system that use processing time.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceUtil.java
##########
@@ -26,21 +26,22 @@
/**
* Returns the remaining delay of the processing time specified by {@code processingTimestamp}.
+ * This delay should guarantee that the timer will be fired after all records with timestamp <=
+ * {@code processingTimestamp} have already been processed.
*
* @param processingTimestamp the processing time in milliseconds
* @param currentTimestamp the current processing timestamp; it usually uses {@link
* ProcessingTimeService#getCurrentProcessingTime()} to get
* @return the remaining delay of the processing time
*/
- public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
+ public static long getRecordProcessingTimeDelay(
+ long processingTimestamp, long currentTimestamp) {
- // Two cases of timers here:
- // (1) future/now timers(processingTimestamp >= currentTimestamp): delay the firing of the
- // timer by 1 ms to align the semantics with watermark. A watermark T says we won't see
- // elements in the future with a timestamp smaller or equal to T. With processing time, we
- // therefore need to delay firing the timer by one ms.
- // (2) past timers(processingTimestamp < currentTimestamp): do not need to delay the firing
- // because currentTimestamp is larger than processingTimestamp plus the 1ms offset.
+ // future/now timers(processingTimestamp >= currentTimestamp): delay the firing of the
+ // timer by 1 ms to align the semantics with watermark. A watermark T says we won't see
+ // elements in the future with a timestamp smaller or equal to T. Without this 1ms delay,
+ // if we had fired the timer for T at the timestamp T, it would be possible that we would
+ // process another record for timestamp == T in the same millisecond.
Review comment:
> it would be possible that we would process another record for timestamp == T in the same millisecond.

->

> it would be possible that we would process another record for timestamp == T in the same millisecond, but after the timer for the timestamp T has already been fired.
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
##########
@@ -107,43 +109,42 @@ public void testErrorReporting() throws Exception {
@Test
public void checkScheduledTimestamps() throws Exception {
- final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
- final long t1 = System.currentTimeMillis();
- final long t2 = System.currentTimeMillis() - 200;
- final long t3 = System.currentTimeMillis() + 100;
- final long t4 = System.currentTimeMillis() + 200;
-
- timeService.registerTimer(t1, new ValidatingProcessingTimeCallback(errorRef, t1, 0));
- timeService.registerTimer(t2, new ValidatingProcessingTimeCallback(errorRef, t2, 1));
- timeService.registerTimer(t3, new ValidatingProcessingTimeCallback(errorRef, t3, 2));
- timeService.registerTimer(t4, new ValidatingProcessingTimeCallback(errorRef, t4, 3));
-
- long deadline = System.currentTimeMillis() + 20000;
- while (errorRef.get() == null
- && ValidatingProcessingTimeCallback.numInSequence < 4
- && System.currentTimeMillis() < deadline) {
- Thread.sleep(100);
+ ValidatingProcessingTimeCallback.numInSequence = 0;
+ long currentTimeMillis = System.currentTimeMillis();
+ ArrayList<ValidatingProcessingTimeCallback> timeCallbacks = new ArrayList<>();
+
+ /*
+ It is not possible to register timer for currentTimeMillis or value slightly greater than
+ currentTimeMillis because if, during registerTimer, the internal currentTime is equal
+ to this value then, according to the current logic, the delay will be increased by 1ms, while
+ `currentTimeMillis - 200` is always transformed to a delay of 0, so it can lead to reordering. See
+ comment in {@link ProcessingTimeServiceUtil#getRecordProcessingTimeDelay(long, long)}.
+ */
+ timeCallbacks.add(new ValidatingProcessingTimeCallback(currentTimeMillis - 1, 0));
+ timeCallbacks.add(new ValidatingProcessingTimeCallback(currentTimeMillis - 200, 1));
+ timeCallbacks.add(new ValidatingProcessingTimeCallback(currentTimeMillis + 100, 2));
+ timeCallbacks.add(new ValidatingProcessingTimeCallback(currentTimeMillis + 200, 3));
+
+ for (ValidatingProcessingTimeCallback timeCallback : timeCallbacks) {
+ timeService.registerTimer(timeCallback.expectedTimestamp, timeCallback);
}
- verifyNoException(errorRef.get());
+ for (ValidatingProcessingTimeCallback timeCallback : timeCallbacks) {
+ timeCallback.assertExpectedValues();
+ }
assertEquals(4, ValidatingProcessingTimeCallback.numInSequence);
}
private static class ValidatingProcessingTimeCallback implements ProcessingTimeCallback {
static int numInSequence;
- private final AtomicReference<Throwable> errorRef;
+ private final CompletableFuture<Void> finished = new CompletableFuture<>();
Review comment:
I think the bug fix in the test and the refactoring of this test are
independent; should they be in separate commits?
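The refactoring replaces the polling loop (sleep until a deadline) with one `CompletableFuture` per callback. Below is a minimal stand-alone sketch of that pattern, assuming a plain `ScheduledExecutorService` in place of Flink's `ProcessingTimeService`; `FutureBasedTimerTestSketch`, `OrderedCallback` and `runScenario` are hypothetical names, not the PR's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FutureBasedTimerTestSketch {

    // Hypothetical stand-in for ValidatingProcessingTimeCallback: checks the order in
    // which it runs and completes a future instead of writing into an AtomicReference.
    static final class OrderedCallback implements Runnable {
        static int numInSequence; // only mutated from the single scheduler thread
        final int expectedInSequence;
        final CompletableFuture<Void> finished = new CompletableFuture<>();

        OrderedCallback(int expectedInSequence) {
            this.expectedInSequence = expectedInSequence;
        }

        @Override
        public void run() {
            if (numInSequence == expectedInSequence) {
                numInSequence++;
                finished.complete(null);
            } else {
                finished.completeExceptionally(new AssertionError(
                        "expected position " + expectedInSequence + " but was " + numInSequence));
            }
        }

        // Blocks until the callback has run; a failure surfaces as a CompletionException.
        void assertExpectedValues() {
            finished.join();
        }
    }

    static int runScenario(long[] delaysMs) {
        OrderedCallback.numInSequence = 0;
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<OrderedCallback> callbacks = new ArrayList<>();
        try {
            for (int i = 0; i < delaysMs.length; i++) {
                OrderedCallback callback = new OrderedCallback(i);
                callbacks.add(callback);
                scheduler.schedule(callback, delaysMs[i], TimeUnit.MILLISECONDS);
            }
            // No polling loop or deadline: each join() waits for exactly that callback.
            for (OrderedCallback callback : callbacks) {
                callback.assertExpectedValues();
            }
        } finally {
            scheduler.shutdownNow();
        }
        return OrderedCallback.numInSequence;
    }

    public static void main(String[] args) {
        System.out.println(runScenario(new long[] {0, 0, 100, 200})); // 4
    }
}
```

Because the scheduler is single-threaded, the static counter needs no extra synchronization, and completing a future before `join()` returns makes the counter update visible to the test thread.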
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
##########
@@ -107,43 +109,42 @@ public void testErrorReporting() throws Exception {
@Test
public void checkScheduledTimestamps() throws Exception {
- final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-
- final long t1 = System.currentTimeMillis();
- final long t2 = System.currentTimeMillis() - 200;
- final long t3 = System.currentTimeMillis() + 100;
- final long t4 = System.currentTimeMillis() + 200;
-
- timeService.registerTimer(t1, new ValidatingProcessingTimeCallback(errorRef, t1, 0));
- timeService.registerTimer(t2, new ValidatingProcessingTimeCallback(errorRef, t2, 1));
- timeService.registerTimer(t3, new ValidatingProcessingTimeCallback(errorRef, t3, 2));
- timeService.registerTimer(t4, new ValidatingProcessingTimeCallback(errorRef, t4, 3));
-
- long deadline = System.currentTimeMillis() + 20000;
- while (errorRef.get() == null
- && ValidatingProcessingTimeCallback.numInSequence < 4
- && System.currentTimeMillis() < deadline) {
- Thread.sleep(100);
+ ValidatingProcessingTimeCallback.numInSequence = 0;
+ long currentTimeMillis = System.currentTimeMillis();
+ ArrayList<ValidatingProcessingTimeCallback> timeCallbacks = new ArrayList<>();
+
+ /*
+ It is not possible to register timer for currentTimeMillis or value slightly greater than
Review comment:
I believe that instead of:

> It is not possible to register timer for currentTimeMillis

you had this in mind?

> It is not possible to test registering timer for currentTimeMillis
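A small worked example of why the test cannot reliably register a timer exactly at `currentTimeMillis`. It assumes the two-case delay rule from `ProcessingTimeServiceUtil` (future/now timers get `processingTimestamp - currentTimestamp + 1`, past timers get 0) and a scheduler that runs zero-delay tasks in registration order; `TimerOrderingExample` and `delay` are hypothetical names.

```java
// Hypothetical sketch: delays computed when the service clock still equals the
// value the test captured with System.currentTimeMillis().
final class TimerOrderingExample {

    // Assumed two-case rule: future/now timers are delayed by an extra 1 ms, past timers are not.
    static long delay(long processingTimestamp, long currentTimestamp) {
        return processingTimestamp >= currentTimestamp
                ? processingTimestamp - currentTimestamp + 1
                : 0;
    }

    public static void main(String[] args) {
        long base = 10_000L; // value captured by System.currentTimeMillis() in the test
        long clock = base;   // the service clock has not advanced yet

        // Registered first but delayed by 1 ms, so "base - 200" (delay 0) overtakes it.
        System.out.println("base:       delay " + delay(base, clock));       // 1
        System.out.println("base - 200: delay " + delay(base - 200, clock)); // 0
        // "base - 1" is always in the past, so it keeps its registration order.
        System.out.println("base - 1:   delay " + delay(base - 1, clock));   // 0
    }
}
```

If the clock has already advanced past `base` when the first timer is registered, both delays are 0 and the test passes, which is what made the original version flaky rather than consistently failing.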
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceUtil.java
##########
@@ -26,21 +26,22 @@
/**
* Returns the remaining delay of the processing time specified by {@code processingTimestamp}.
+ * This delay should guarantee that the timer will be fired after all records with timestamp <=
+ * {@code processingTimestamp} have already been processed.
*
* @param processingTimestamp the processing time in milliseconds
* @param currentTimestamp the current processing timestamp; it usually uses {@link
* ProcessingTimeService#getCurrentProcessingTime()} to get
* @return the remaining delay of the processing time
*/
- public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
+ public static long getRecordProcessingTimeDelay(
+ long processingTimestamp, long currentTimestamp) {
- // Two cases of timers here:
- // (1) future/now timers(processingTimestamp >= currentTimestamp): delay the firing of the
- // timer by 1 ms to align the semantics with watermark. A watermark T says we won't see
- // elements in the future with a timestamp smaller or equal to T. With processing time, we
- // therefore need to delay firing the timer by one ms.
- // (2) past timers(processingTimestamp < currentTimestamp): do not need to delay the firing
- // because currentTimestamp is larger than processingTimestamp plus the 1ms offset.
+ // future/now timers(processingTimestamp >= currentTimestamp): delay the firing of the
+ // timer by 1 ms to align the semantics with watermark. A watermark T says we won't see
+ // elements in the future with a timestamp smaller or equal to T. Without this 1ms delay,
+ // if we had fired the timer for T at the timestamp T, it would be possible that we would
+ // process another record for timestamp == T in the same millisecond.
Review comment:
Plus I would keep the distinction between those two cases (the previous
(1) and (2)).
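For reference, the two cases kept as separate, commented branches. This is a stand-alone sketch that mirrors the behavior the removed comment describes; the class name `ProcessingTimeDelaySketch` is hypothetical and the code is not copied from the Flink source.

```java
// Hypothetical sketch of the two-case delay rule described in the original comment.
final class ProcessingTimeDelaySketch {

    static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
        if (processingTimestamp >= currentTimestamp) {
            // (1) future/now timer: wait until processingTimestamp, plus 1 ms, so the timer
            //     fires only after every event stamped <= processingTimestamp has been seen.
            return processingTimestamp - currentTimestamp + 1;
        } else {
            // (2) past timer: currentTimestamp >= processingTimestamp + 1 already holds,
            //     so the 1 ms offset is implicitly satisfied; fire immediately.
            return 0;
        }
    }

    public static void main(String[] args) {
        System.out.println(getProcessingTimeDelay(105, 100)); // 6 (future)
        System.out.println(getProcessingTimeDelay(100, 100)); // 1 (now)
        System.out.println(getProcessingTimeDelay(95, 100));  // 0 (past)
    }
}
```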
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceUtil.java
##########
@@ -26,21 +26,22 @@
/**
* Returns the remaining delay of the processing time specified by {@code processingTimestamp}.
+ * This delay should guarantee that the timer will be fired after all records with timestamp <=
+ * {@code processingTimestamp} have already been processed.
Review comment:
```
This delay guarantees that the timer will be fired at least 1ms after the time it's registered for.
```
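The suggested sentence holds in both cases: for future/now timers the firing time is exactly `processingTimestamp + 1`, and for past timers `currentTimestamp` is already at least `processingTimestamp + 1`. A small stand-alone check, assuming the same two-case rule (`DelayGuaranteeCheck` and its methods are hypothetical names):

```java
// Hypothetical check of the proposed Javadoc guarantee:
// currentTimestamp + delay >= processingTimestamp + 1 for every registration time.
final class DelayGuaranteeCheck {

    // Assumed two-case rule: future/now -> ts - now + 1, past -> 0.
    static long delay(long processingTimestamp, long currentTimestamp) {
        return processingTimestamp >= currentTimestamp
                ? processingTimestamp - currentTimestamp + 1
                : 0;
    }

    /** Returns true if the timer fires at least 1 ms after processingTimestamp. */
    static boolean firesAtLeastOneMsLater(long processingTimestamp, long currentTimestamp) {
        long firingTime = currentTimestamp + delay(processingTimestamp, currentTimestamp);
        return firingTime >= processingTimestamp + 1;
    }

    public static void main(String[] args) {
        long ts = 1_000L;
        boolean holds = true;
        // Sweep registration times from well before to well after the timer's timestamp.
        for (long now = ts - 500; now <= ts + 500; now++) {
            holds &= firesAtLeastOneMsLater(ts, now);
        }
        System.out.println("guarantee holds: " + holds); // guarantee holds: true
    }
}
```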