pnowojski commented on code in PR #24941:
URL: https://github.com/apache/flink/pull/24941#discussion_r1642915046
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/wmassigners/WatermarkAssignerOperator.java:
##########
@@ -136,19 +154,28 @@ private void advanceWatermark() {
@Override
public void onProcessingTime(long timestamp) throws Exception {
- advanceWatermark();
+ // timestamp and now can be off in case TM is heavily overloaded.
+ long now = getProcessingTimeService().getCurrentProcessingTime();
- if (idleTimeout > 0 && currentStatus.equals(WatermarkStatus.ACTIVE)) {
- final long currentTime = getProcessingTimeService().getCurrentProcessingTime();
- if (currentTime - lastRecordTime > idleTimeout) {
- // mark the channel as idle to ignore watermarks from this channel
- emitWatermarkStatus(WatermarkStatus.IDLE);
- }
+ if (watermarkInterval > 0
+ && lastWatermarkPeriodicEmitTime + watermarkInterval <= timestamp) {
Review Comment:
Good point, I think. I will try switching to `now` and see whether any
tests complain.
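
For illustration, here is a minimal sketch of how the two comparisons can diverge when the timer fires late. Only the condition `lastWatermarkPeriodicEmitTime + watermarkInterval <= ...` comes from the diff above; the class `PeriodicEmitCheck`, the method `shouldEmit`, and the sample values are hypothetical and not part of the PR.

```java
// Hypothetical standalone sketch, not the operator code from the PR.
final class PeriodicEmitCheck {

    // Mirrors the condition from the diff: emit only if periodic emission is
    // enabled and at least one full interval has passed since the last emit,
    // measured against the supplied reference time.
    static boolean shouldEmit(
            long lastWatermarkPeriodicEmitTime, long watermarkInterval, long referenceTime) {
        return watermarkInterval > 0
                && lastWatermarkPeriodicEmitTime + watermarkInterval <= referenceTime;
    }

    public static void main(String[] args) {
        // Assumed sample values: the timer was registered for t=1000,
        // but the callback actually runs at t=1150 because of overload.
        long lastEmit = 900L;
        long interval = 200L;
        long timestamp = 1000L; // time the timer was scheduled for
        long now = 1150L;       // time the callback actually runs

        System.out.println("using timestamp: " + shouldEmit(lastEmit, interval, timestamp));
        System.out.println("using now:       " + shouldEmit(lastEmit, interval, now));
    }
}
```

With these assumed values, comparing against `timestamp` skips the emission even though a full interval has already elapsed in wall-clock terms, while comparing against `now` does not.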