davidradl commented on code in PR #27083:
URL: https://github.com/apache/flink/pull/27083#discussion_r2414349835


##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/FinishedOnRestoreInput.java:
##########
@@ -58,7 +59,22 @@ public void processWatermark(Watermark watermark) {
 
     @Override
     public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws 
Exception {
-        throw new IllegalStateException();
+        // Tasks that finished on restore may receive FINISHED watermark 
status from upstream.
+        // Aggregate FINISHED status from all inputs and emit once all are 
received, similar to
+        // how MAX_WATERMARK is handled. This ensures proper watermark status 
propagation.
+        if (watermarkStatus.isFinished()) {
+            if (++finishedStatusSeen == inputCount) {
+                for (RecordWriterOutput<?> streamOutput : streamOutputs) {
+                    streamOutput.emitWatermarkStatus(watermarkStatus);
+                }
+            }
+            return;
+        }
+        // Other watermark statuses (ACTIVE, IDLE) should not occur for 
finished tasks

Review Comment:
   nits:
   - you could could fail fast an issue the Exception  if 
(!watermarkStatus.isFinished()) {.throw  }
   then you would not need the `return`.
   - can we put out an identifier of the task in the Exception to help with 
diagnostics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to