This is an automated email from the ASF dual-hosted git repository.

tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b2f4c0e  [FLINK-26464][metrics] Make the meaning of lastCheckpointSize stay as before
b2f4c0e is described below

commit b2f4c0ea259027ed3f6a04595202cf2589e2495a
Author: Yun Tang <[email protected]>
AuthorDate: Thu Mar 3 19:30:03 2022 +0800

    [FLINK-26464][metrics] Make the meaning of lastCheckpointSize stay as before
---
 docs/content.zh/docs/ops/metrics.md                 |  7 ++++++-
 docs/content/docs/ops/metrics.md                    |  7 ++++++-
 .../runtime/checkpoint/CheckpointStatsTracker.java  | 18 ++++++++++++++++++
 .../checkpoint/CheckpointStatsTrackerTest.java      | 21 +++++++++++++++------
 4 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index 848f4a6..05b706e 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1207,7 +1207,12 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
     </tr>
     <tr>
       <td>lastCheckpointSize</td>
-      <td>The total size of the last checkpoint (in bytes).</td>
+      <td>The checkpointed size of the last checkpoint (in bytes), this metric could be different from lastCheckpointFullSize if incremental checkpoint or changelog is enabled.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>lastCheckpointFullSize</td>
+      <td>The full size of the last checkpoint (in bytes).</td>
       <td>Gauge</td>
     </tr>
     <tr>
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 84c616d..4371020 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1200,7 +1200,12 @@ Note that for failed checkpoints, metrics are updated on a best efforts basis an
     </tr>
     <tr>
       <td>lastCheckpointSize</td>
-      <td>The total size of the last checkpoint (in bytes).</td>
+      <td>The checkpointed size of the last checkpoint (in bytes), this metric could be different from lastCheckpointFullSize if incremental checkpoint or changelog is enabled.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>lastCheckpointFullSize</td>
+      <td>The full size of the last checkpoint (in bytes).</td>
       <td>Gauge</td>
     </tr>
     <tr>
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 7971493..99cac10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -310,6 +310,9 @@ public class CheckpointStatsTracker {
     static final String LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC = "lastCheckpointSize";
 
     @VisibleForTesting
+    static final String LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC = "lastCheckpointFullSize";
+
+    @VisibleForTesting
     static final String LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC = "lastCheckpointDuration";
 
     @VisibleForTesting
@@ -342,6 +345,9 @@ public class CheckpointStatsTracker {
         metricGroup.gauge(
                 LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC, new LatestCompletedCheckpointSizeGauge());
         metricGroup.gauge(
+                LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC,
+                new LatestCompletedCheckpointFullSizeGauge());
+        metricGroup.gauge(
                 LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC,
                 new LatestCompletedCheckpointDurationGauge());
         metricGroup.gauge(
@@ -400,6 +406,18 @@ public class CheckpointStatsTracker {
         public Long getValue() {
             CompletedCheckpointStats completed = latestCompletedCheckpoint;
             if (completed != null) {
+                return completed.getCheckpointedSize();
+            } else {
+                return -1L;
+            }
+        }
+    }
+
+    private class LatestCompletedCheckpointFullSizeGauge implements Gauge<Long> {
+        @Override
+        public Long getValue() {
+            CompletedCheckpointStats completed = latestCompletedCheckpoint;
+            if (completed != null) {
                 return completed.getStateSize();
             } else {
                 return -1L;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 14b3a94..84ebe3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -298,6 +298,7 @@ public class CheckpointStatsTrackerTest {
                                 CheckpointStatsTracker.NUMBER_OF_FAILED_CHECKPOINTS_METRIC,
                                 CheckpointStatsTracker.LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC,
                                 CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC,
+                                CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC,
                                 CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC,
                                 CheckpointStatsTracker
                                         .LATEST_COMPLETED_CHECKPOINT_PROCESSED_DATA_METRIC,
@@ -305,7 +306,7 @@ public class CheckpointStatsTrackerTest {
                                         .LATEST_COMPLETED_CHECKPOINT_PERSISTED_DATA_METRIC,
                                 CheckpointStatsTracker
                                         .LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC)));
-        assertEquals(10, registeredGaugeNames.size());
+        assertEquals(11, registeredGaugeNames.size());
     }
 
     /**
@@ -336,7 +337,7 @@ public class CheckpointStatsTrackerTest {
         CheckpointStatsTracker stats = new CheckpointStatsTracker(0, metricGroup);
 
         // Make sure to adjust this test if metrics are added/removed
-        assertEquals(10, registeredGauges.size());
+        assertEquals(11, registeredGauges.size());
 
         // Check initial values
         Gauge<Long> numCheckpoints =
@@ -362,6 +363,11 @@ public class CheckpointStatsTrackerTest {
                 (Gauge<Long>)
                         registeredGauges.get(
                                 CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC);
+        Gauge<Long> latestCompletedFullSize =
+                (Gauge<Long>)
+                        registeredGauges.get(
+                                CheckpointStatsTracker
+                                        .LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC);
         Gauge<Long> latestCompletedDuration =
                 (Gauge<Long>)
                         registeredGauges.get(
@@ -388,6 +394,7 @@ public class CheckpointStatsTrackerTest {
         assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
         assertEquals(Long.valueOf(-1), latestRestoreTimestamp.getValue());
         assertEquals(Long.valueOf(-1), latestCompletedSize.getValue());
+        assertEquals(Long.valueOf(-1), latestCompletedFullSize.getValue());
         assertEquals(Long.valueOf(-1), latestCompletedDuration.getValue());
         assertEquals(Long.valueOf(-1), latestProcessedData.getValue());
         assertEquals(Long.valueOf(-1), latestPersistedData.getValue());
@@ -408,7 +415,8 @@ public class CheckpointStatsTrackerTest {
         assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
 
         long ackTimestamp = 11231230L;
-        long stateSize = 12381238L;
+        long checkpointedSize = 123L;
+        long fullCheckpointSize = 12381238L;
         long processedData = 4242L;
         long persistedData = 4444L;
         long ignored = 0;
@@ -418,8 +426,8 @@ public class CheckpointStatsTrackerTest {
                 new SubtaskStateStats(
                         0,
                         ackTimestamp,
-                        stateSize,
-                        stateSize,
+                        checkpointedSize,
+                        fullCheckpointSize,
                         ignored,
                         ignored,
                         processedData,
@@ -439,7 +447,8 @@ public class CheckpointStatsTrackerTest {
         assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue());
         assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
         assertEquals(Long.valueOf(-1), latestRestoreTimestamp.getValue());
-        assertEquals(Long.valueOf(stateSize), latestCompletedSize.getValue());
+        assertEquals(Long.valueOf(checkpointedSize), latestCompletedSize.getValue());
+        assertEquals(Long.valueOf(fullCheckpointSize), latestCompletedFullSize.getValue());
         assertEquals(Long.valueOf(processedData), latestProcessedData.getValue());
         assertEquals(Long.valueOf(persistedData), latestPersistedData.getValue());
         assertEquals(Long.valueOf(ackTimestamp), latestCompletedDuration.getValue());

Reply via email to