This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 6e13c60e2ac [FLINK-33071][metrics,checkpointing] Log a json dump of
checkpoint statistics when checkpoint completes or fails
6e13c60e2ac is described below
commit 6e13c60e2ac5ea737e8dfd47899415291b393813
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Sep 11 18:30:10 2023 +0200
[FLINK-33071][metrics,checkpointing] Log a json dump of checkpoint
statistics when checkpoint completes or fails
---
.../runtime/checkpoint/CheckpointStatsTracker.java | 46 ++++++++++++++++++++++
1 file changed, 46 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 0542ae4e01b..f868f3fb4ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -19,14 +19,24 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
+import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.StringWriter;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
@@ -51,6 +61,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class CheckpointStatsTracker {
+ private static final Logger LOG =
LoggerFactory.getLogger(CheckpointStatsTracker.class);
+ private static final ObjectMapper MAPPER =
RestMapperUtils.getStrictObjectMapper();
+
/**
* Lock used to update stats and creating snapshots. Updates always happen from a single Thread
* at a time and there can be multiple concurrent read accesses to the latest stats snapshot.
@@ -70,6 +83,8 @@ public class CheckpointStatsTracker {
/** History of checkpoints. */
private final CheckpointStatsHistory history;
+ private final JobID jobID;
+
/** The latest restored checkpoint. */
@Nullable private RestoredCheckpointStats latestRestoredCheckpoint;
@@ -92,10 +107,20 @@ public class CheckpointStatsTracker {
* progress ones.
* @param metricGroup Metric group for exposed metrics
*/
+ public CheckpointStatsTracker(
+ int numRememberedCheckpoints, JobManagerJobMetricGroup
metricGroup) {
+ this(numRememberedCheckpoints, metricGroup, metricGroup.jobId());
+ }
+
public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup
metricGroup) {
+ this(numRememberedCheckpoints, metricGroup, new JobID());
+ }
+ private CheckpointStatsTracker(
+ int numRememberedCheckpoints, MetricGroup metricGroup, JobID
jobID) {
checkArgument(numRememberedCheckpoints >= 0, "Negative number of
remembered checkpoints");
this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
+ this.jobID = jobID;
// Latest snapshot is empty
latestSnapshot =
@@ -210,6 +235,7 @@ public class CheckpointStatsTracker {
summary.updateSummary(completed);
dirty = true;
+ logCheckpointStatistics(completed);
} finally {
statsReadWriteLock.unlock();
}
@@ -227,11 +253,31 @@ public class CheckpointStatsTracker {
history.replacePendingCheckpointById(failed);
dirty = true;
+ logCheckpointStatistics(failed);
} finally {
statsReadWriteLock.unlock();
}
}
+ private void logCheckpointStatistics(AbstractCheckpointStats
checkpointStats) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ StringWriter sw = new StringWriter();
+ MAPPER.writeValue(
+ sw,
+
CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true));
+ String jsonDump = sw.toString();
+ LOG.debug(
+ "CheckpointStatistics (for jobID={}, checkpointId={})
dump = {} ",
+ jobID,
+ checkpointStats.checkpointId,
+ jsonDump);
+ }
+ } catch (Exception ex) {
+ LOG.warn("Fail to log CheckpointStatistics", ex);
+ }
+ }
+
/**
* Callback when a checkpoint failure without in progress checkpoint. For example, it should be
* callback when triggering checkpoint failure before creating PendingCheckpoint.