This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ece5fad353a [FLINK-33071][metrics,checkpointing] Log a json dump of checkpoint statistics when checkpoint completes or fails
ece5fad353a is described below

commit ece5fad353a44cb666da05342fe92e5306a934a4
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Sep 11 18:30:10 2023 +0200

    [FLINK-33071][metrics,checkpointing] Log a json dump of checkpoint statistics when checkpoint completes or fails
---
 .../runtime/checkpoint/CheckpointStatsTracker.java | 46 ++++++++++++++++++++++
 1 file changed, 46 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 0542ae4e01b..f868f3fb4ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -19,14 +19,24 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -51,6 +61,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CheckpointStatsTracker {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CheckpointStatsTracker.class);
+    private static final ObjectMapper MAPPER = RestMapperUtils.getStrictObjectMapper();
+
     /**
      * Lock used to update stats and creating snapshots. Updates always happen from a single Thread
      * at a time and there can be multiple concurrent read accesses to the latest stats snapshot.
@@ -70,6 +83,8 @@ public class CheckpointStatsTracker {
     /** History of checkpoints. */
     private final CheckpointStatsHistory history;
 
+    private final JobID jobID;
+
     /** The latest restored checkpoint. */
     @Nullable private RestoredCheckpointStats latestRestoredCheckpoint;
 
@@ -92,10 +107,20 @@ public class CheckpointStatsTracker {
      *     progress ones.
      * @param metricGroup Metric group for exposed metrics
      */
+    public CheckpointStatsTracker(
+            int numRememberedCheckpoints, JobManagerJobMetricGroup metricGroup) {
+        this(numRememberedCheckpoints, metricGroup, metricGroup.jobId());
+    }
+
     public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) {
+        this(numRememberedCheckpoints, metricGroup, new JobID());
+    }
 
+    private CheckpointStatsTracker(
+            int numRememberedCheckpoints, MetricGroup metricGroup, JobID jobID) {
         checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
         this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
+        this.jobID = jobID;
 
         // Latest snapshot is empty
         latestSnapshot =
@@ -210,6 +235,7 @@ public class CheckpointStatsTracker {
             summary.updateSummary(completed);
 
             dirty = true;
+            logCheckpointStatistics(completed);
         } finally {
             statsReadWriteLock.unlock();
         }
@@ -227,11 +253,31 @@ public class CheckpointStatsTracker {
             history.replacePendingCheckpointById(failed);
 
             dirty = true;
+            logCheckpointStatistics(failed);
         } finally {
             statsReadWriteLock.unlock();
         }
     }
 
+    private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
+        try {
+            if (LOG.isDebugEnabled()) {
+                StringWriter sw = new StringWriter();
+                MAPPER.writeValue(
+                        sw,
+                        CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true));
+                String jsonDump = sw.toString();
+                LOG.debug(
+                        "CheckpointStatistics (for jobID={}, checkpointId={}) dump = {} ",
+                        jobID,
+                        checkpointStats.checkpointId,
+                        jsonDump);
+            }
+        } catch (Exception ex) {
+            LOG.warn("Fail to log CheckpointStatistics", ex);
+        }
+    }
+
     /**
      * Callback when a checkpoint failure without in progress checkpoint. For example, it should be
      * callback when triggering checkpoint failure before creating PendingCheckpoint.

Reply via email to