This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit f6fffd7d33f617acc69f405deecf050ef710d439 Author: Roman Khachatryan <[email protected]> AuthorDate: Wed Feb 12 10:07:13 2025 +0100 [hotfix][runtime] Log watermark alignment duration (and all other stages) --- .../streaming/api/operators/SourceOperator.java | 35 ++++++++++++++++------ 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 4b3691db8a1..f153617b2c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -171,6 +171,9 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr /** A mode to control the behaviour of the {@link #emitNext(DataOutput)} method. */ private OperatingMode operatingMode; + /** The timestamp when {#operatingMode} was last changed. */ + private long operatingModeChangeTs; + private final CompletableFuture<Void> finished = new CompletableFuture<>(); private final SourceOperatorAvailabilityHelper availabilityHelper = new SourceOperatorAvailabilityHelper(); @@ -256,7 +259,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr this.configuration = checkNotNull(configuration); this.localHostname = checkNotNull(localHostname); this.emitProgressiveWatermarks = emitProgressiveWatermarks; - this.operatingMode = OperatingMode.OUTPUT_NOT_INITIALIZED; + setOperatingMode(OperatingMode.OUTPUT_NOT_INITIALIZED); this.watermarkAlignmentParams = watermarkStrategy.getAlignmentParameters(); this.allowUnalignedSourceSplits = configuration.get(ALLOW_UNALIGNED_SOURCE_SPLITS); this.canEmitBatchOfRecords = checkNotNull(canEmitBatchOfRecords); @@ -480,10 +483,10 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr case WAITING_FOR_ALIGNMENT: case OUTPUT_NOT_INITIALIZED: case READING: - this.operatingMode = + setOperatingMode( mode == StopMode.DRAIN ? OperatingMode.SOURCE_DRAINED - : OperatingMode.SOURCE_STOPPED; + : OperatingMode.SOURCE_STOPPED); availabilityHelper.forceStop(); if (this.operatingMode == OperatingMode.SOURCE_STOPPED) { stopInternalServices(); @@ -554,11 +557,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr initializeMainOutput(output); return convertToInternalStatus(sourceReader.pollNext(currentMainOutput)); case SOURCE_STOPPED: - this.operatingMode = OperatingMode.DATA_FINISHED; + setOperatingMode(OperatingMode.DATA_FINISHED); sourceMetricGroup.idlingStarted(); return DataInputStatus.STOPPED; case SOURCE_DRAINED: - this.operatingMode = OperatingMode.DATA_FINISHED; + setOperatingMode(OperatingMode.DATA_FINISHED); sourceMetricGroup.idlingStarted(); return DataInputStatus.END_OF_DATA; case DATA_FINISHED: @@ -589,7 +592,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr lastInvokedOutput = output; // Create per-split output for pending splits added before main output is initialized createOutputForSplits(splitsToInitializeOutput); - this.operatingMode = OperatingMode.READING; + setOperatingMode(OperatingMode.READING); } private void initializeLatencyMarkerEmitter(DataOutput<OUT> output) { @@ -621,7 +624,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr sourceMetricGroup.idlingStarted(); return DataInputStatus.NOTHING_AVAILABLE; case END_OF_INPUT: - this.operatingMode = OperatingMode.DATA_FINISHED; + setOperatingMode(OperatingMode.DATA_FINISHED); sourceMetricGroup.idlingStarted(); return DataInputStatus.END_OF_DATA; default: @@ -898,14 +901,14 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr if (operatingMode == OperatingMode.READING) { checkState(waitingForAlignmentFuture.isDone()); if (shouldWaitForAlignment()) { - operatingMode = OperatingMode.WAITING_FOR_ALIGNMENT; + setOperatingMode(OperatingMode.WAITING_FOR_ALIGNMENT); waitingForAlignmentFuture = new CompletableFuture<>(); mainInputActivityClock.pause(); } } else if (operatingMode == OperatingMode.WAITING_FOR_ALIGNMENT) { checkState(!waitingForAlignmentFuture.isDone()); if (!shouldWaitForAlignment()) { - operatingMode = OperatingMode.READING; + setOperatingMode(OperatingMode.READING); waitingForAlignmentFuture.complete(null); mainInputActivityClock.unPause(); } @@ -961,4 +964,18 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> extends AbstractStr forcedStopFuture.complete(null); } } + + private void setOperatingMode(OperatingMode newMode) { + final long now = System.currentTimeMillis(); + LOG.info( + "Switch mode from {} to {} after {} ms, currentMaxDesiredWatermark={}, latestWatermark={}, oldestWatermark={}", + operatingMode, + newMode, + now - operatingModeChangeTs, + currentMaxDesiredWatermark, + sampledLatestWatermark.getLatest(), + sampledLatestWatermark.getOldestSample()); + operatingMode = newMode; + operatingModeChangeTs = now; + } }
