This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 48d15cf45fc7d77da468114ba61e34ca3087c70d
Author: Efrat Levitan <[email protected]>
AuthorDate: Thu Feb 19 16:09:40 2026 +0200

    [FLINK-39073][runtime] Improve logging of invalid split transitions
    
    Split transitions directly from paused to idle, or from idle to paused, 
don't make sense. This change adds the split ID to the corresponding warning 
logs to allow further analysis. The transitions are only logged rather than 
rejected, because by convention the Flink metrics system must not fail the job.
---
 .../runtime/metrics/groups/InternalSourceSplitMetricGroup.java      | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
index 706606a1598..f7efb1b8772 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSourceSplitMetricGroup.java
@@ -48,6 +48,7 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
     private static final long SPLIT_NOT_STARTED = -1L;
     private long splitStartTime = SPLIT_NOT_STARTED;
     private final MetricGroup splitWatermarkMetricGroup;
+    private final String splitId;
 
     private InternalSourceSplitMetricGroup(
             MetricGroup parentMetricGroup,
@@ -56,6 +57,7 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
             Gauge<Long> currentWatermark) {
         super(parentMetricGroup);
         this.clock = clock;
+        this.splitId = splitId;
         splitWatermarkMetricGroup = parentMetricGroup.addGroup(SPLIT, 
splitId).addGroup(WATERMARK);
         pausedTimePerSecond =
                 splitWatermarkMetricGroup.gauge(
@@ -118,7 +120,7 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
             // If a split got paused it means it emitted records,
             // hence it shouldn't be considered idle anymore
             markNotIdle();
-            LOG.warn("Split marked paused while still idle");
+            LOG.warn("[{}] Split marked paused while still idle", splitId);
         }
         this.pausedTimePerSecond.markStart();
     }
@@ -129,7 +131,7 @@ public class InternalSourceSplitMetricGroup extends 
ProxyMetricGroup<MetricGroup
             // If a split is marked idle, it has no records to emit.
             // hence it shouldn't be considered paused anymore
             markNotPaused();
-            LOG.warn("Split marked idle while still paused");
+            LOG.warn("[{}] Split marked idle while still paused", splitId);
         }
         this.idleTimePerSecond.markStart();
     }

Reply via email to