This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 48d15cf45fc7d77da468114ba61e34ca3087c70d Author: Efrat Levitan <[email protected]> AuthorDate: Thu Feb 19 16:09:40 2026 +0200 [FLINK-39073][runtime] Improve logging of invalid split transitions Split transitions directly from paused to idle and from idle to pause don't make any sense, improving the logs to allow further analysis. It is a convention for flink metrics system to not fail the job though. --- .../runtime/metrics/groups/InternalSourceSplitMetricGroup.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java index 706606a1598..f7efb1b8772 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java @@ -48,6 +48,7 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup private static final long SPLIT_NOT_STARTED = -1L; private long splitStartTime = SPLIT_NOT_STARTED; private final MetricGroup splitWatermarkMetricGroup; + private final String splitId; private InternalSourceSplitMetricGroup( MetricGroup parentMetricGroup, @@ -56,6 +57,7 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup Gauge<Long> currentWatermark) { super(parentMetricGroup); this.clock = clock; + this.splitId = splitId; splitWatermarkMetricGroup = parentMetricGroup.addGroup(SPLIT, splitId).addGroup(WATERMARK); pausedTimePerSecond = splitWatermarkMetricGroup.gauge( @@ -118,7 +120,7 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup // If a split got paused it means it emitted records, // hence it shouldn't be considered idle anymore markNotIdle(); - LOG.warn("Split marked paused while still idle"); + LOG.warn("[{}] Split marked paused while still idle", splitId); } this.pausedTimePerSecond.markStart(); } @@ -129,7 +131,7 @@ public class InternalSourceSplitMetricGroup extends ProxyMetricGroup<MetricGroup // If a split is marked idle, it has no records to emit. // hence it shouldn't be considered paused anymore markNotPaused(); - LOG.warn("Split marked idle while still paused"); + LOG.warn("[{}] Split marked idle while still paused", splitId); } this.idleTimePerSecond.markStart(); }
