This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6fffd7d33f617acc69f405deecf050ef710d439
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Feb 12 10:07:13 2025 +0100

    [hotfix][runtime] Log watermark alignment duration (and all other stages)
---
 .../streaming/api/operators/SourceOperator.java    | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 4b3691db8a1..f153617b2c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -171,6 +171,9 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
     /** A mode to control the behaviour of the {@link #emitNext(DataOutput)} 
method. */
     private OperatingMode operatingMode;
 
+    /** The timestamp when {#operatingMode} was last changed. */
+    private long operatingModeChangeTs;
+
     private final CompletableFuture<Void> finished = new CompletableFuture<>();
     private final SourceOperatorAvailabilityHelper availabilityHelper =
             new SourceOperatorAvailabilityHelper();
@@ -256,7 +259,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         this.configuration = checkNotNull(configuration);
         this.localHostname = checkNotNull(localHostname);
         this.emitProgressiveWatermarks = emitProgressiveWatermarks;
-        this.operatingMode = OperatingMode.OUTPUT_NOT_INITIALIZED;
+        setOperatingMode(OperatingMode.OUTPUT_NOT_INITIALIZED);
         this.watermarkAlignmentParams = 
watermarkStrategy.getAlignmentParameters();
         this.allowUnalignedSourceSplits = 
configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS);
         this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords);
@@ -480,10 +483,10 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             case WAITING_FOR_ALIGNMENT:
             case OUTPUT_NOT_INITIALIZED:
             case READING:
-                this.operatingMode =
+                setOperatingMode(
                         mode == StopMode.DRAIN
                                 ? OperatingMode.SOURCE_DRAINED
-                                : OperatingMode.SOURCE_STOPPED;
+                                : OperatingMode.SOURCE_STOPPED);
                 availabilityHelper.forceStop();
                 if (this.operatingMode == OperatingMode.SOURCE_STOPPED) {
                     stopInternalServices();
@@ -554,11 +557,11 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                 initializeMainOutput(output);
                 return 
convertToInternalStatus(sourceReader.pollNext(currentMainOutput));
             case SOURCE_STOPPED:
-                this.operatingMode = OperatingMode.DATA_FINISHED;
+                setOperatingMode(OperatingMode.DATA_FINISHED);
                 sourceMetricGroup.idlingStarted();
                 return DataInputStatus.STOPPED;
             case SOURCE_DRAINED:
-                this.operatingMode = OperatingMode.DATA_FINISHED;
+                setOperatingMode(OperatingMode.DATA_FINISHED);
                 sourceMetricGroup.idlingStarted();
                 return DataInputStatus.END_OF_DATA;
             case DATA_FINISHED:
@@ -589,7 +592,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         lastInvokedOutput = output;
         // Create per-split output for pending splits added before main output 
is initialized
         createOutputForSplits(splitsToInitializeOutput);
-        this.operatingMode = OperatingMode.READING;
+        setOperatingMode(OperatingMode.READING);
     }
 
     private void initializeLatencyMarkerEmitter(DataOutput<OUT> output) {
@@ -621,7 +624,7 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
                 sourceMetricGroup.idlingStarted();
                 return DataInputStatus.NOTHING_AVAILABLE;
             case END_OF_INPUT:
-                this.operatingMode = OperatingMode.DATA_FINISHED;
+                setOperatingMode(OperatingMode.DATA_FINISHED);
                 sourceMetricGroup.idlingStarted();
                 return DataInputStatus.END_OF_DATA;
             default:
@@ -898,14 +901,14 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
         if (operatingMode == OperatingMode.READING) {
             checkState(waitingForAlignmentFuture.isDone());
             if (shouldWaitForAlignment()) {
-                operatingMode = OperatingMode.WAITING_FOR_ALIGNMENT;
+                setOperatingMode(OperatingMode.WAITING_FOR_ALIGNMENT);
                 waitingForAlignmentFuture = new CompletableFuture<>();
                 mainInputActivityClock.pause();
             }
         } else if (operatingMode == OperatingMode.WAITING_FOR_ALIGNMENT) {
             checkState(!waitingForAlignmentFuture.isDone());
             if (!shouldWaitForAlignment()) {
-                operatingMode = OperatingMode.READING;
+                setOperatingMode(OperatingMode.READING);
                 waitingForAlignmentFuture.complete(null);
                 mainInputActivityClock.unPause();
             }
@@ -961,4 +964,18 @@ public class SourceOperator<OUT, SplitT extends 
SourceSplit> extends AbstractStr
             forcedStopFuture.complete(null);
         }
     }
+
+    private void setOperatingMode(OperatingMode newMode) {
+        final long now = System.currentTimeMillis();
+        LOG.info(
+                "Switch mode from {} to {} after {} ms, 
currentMaxDesiredWatermark={}, latestWatermark={}, oldestWatermark={}",
+                operatingMode,
+                newMode,
+                now - operatingModeChangeTs,
+                currentMaxDesiredWatermark,
+                sampledLatestWatermark.getLatest(),
+                sampledLatestWatermark.getOldestSample());
+        operatingMode = newMode;
+        operatingModeChangeTs = now;
+    }
 }

Reply via email to