davidradl commented on code in PR #27083:
URL: https://github.com/apache/flink/pull/27083#discussion_r2414361434
##########
flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -746,6 +748,41 @@ private boolean isCurrentSyncSavepoint(long checkpointId) {
*/
protected void advanceToEndOfEventTime() throws Exception {}
+ /**
+ * Emits FINISHED watermark status to indicate this task has permanently
completed and should be
+ * excluded from watermark aggregation in downstream operators.
+ *
+ * <p>This method is overridden by source tasks to emit FINISHED status
directly to downstream
+ * via network outputs. Processing tasks (e.g., OneInputStreamTask,
TwoInputStreamTask) do not
+ * override this method because they rely on StatusWatermarkValve to
aggregate FINISHED status
+ * from upstream inputs and propagate it downstream automatically.
+ */
+ protected void emitFinishedStatus() {
+ // Empty by default - only source tasks override this method.
+ }
+
+ /**
+ * Helper method to emit FINISHED watermark status to all stream outputs.
Subclasses can call
+ * this from their emitFinishedStatus() override if needed.
+ */
+ protected void emitFinishedStatusToOutputs() {
+ try {
+ if (operatorChain != null) {
+ RecordWriterOutput<?>[] streamOutputs =
operatorChain.getStreamOutputs();
Review Comment:
Can we have no stream outputs? if so we could issue another
LOG.warn("Cannot emit FINISHED watermark status: operator chain has no stream
outputs");)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]