This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b2f4c0e [FLINK-26464][metrics] Make the meaning of lastCheckpointSize
stay as before
b2f4c0e is described below
commit b2f4c0ea259027ed3f6a04595202cf2589e2495a
Author: Yun Tang <[email protected]>
AuthorDate: Thu Mar 3 19:30:03 2022 +0800
[FLINK-26464][metrics] Make the meaning of lastCheckpointSize stay as before
---
docs/content.zh/docs/ops/metrics.md | 7 ++++++-
docs/content/docs/ops/metrics.md | 7 ++++++-
.../runtime/checkpoint/CheckpointStatsTracker.java | 18 ++++++++++++++++++
.../checkpoint/CheckpointStatsTrackerTest.java | 21 +++++++++++++++------
4 files changed, 45 insertions(+), 8 deletions(-)
diff --git a/docs/content.zh/docs/ops/metrics.md
b/docs/content.zh/docs/ops/metrics.md
index 848f4a6..05b706e 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1207,7 +1207,12 @@ Note that for failed checkpoints, metrics are updated on
a best efforts basis an
</tr>
<tr>
<td>lastCheckpointSize</td>
- <td>The total size of the last checkpoint (in bytes).</td>
+ <td>The checkpointed size of the last checkpoint (in bytes). This metric could differ from lastCheckpointFullSize if incremental checkpointing or changelog is enabled.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>lastCheckpointFullSize</td>
+ <td>The full size of the last checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 84c616d..4371020 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1200,7 +1200,12 @@ Note that for failed checkpoints, metrics are updated on
a best efforts basis an
</tr>
<tr>
<td>lastCheckpointSize</td>
- <td>The total size of the last checkpoint (in bytes).</td>
+ <td>The checkpointed size of the last checkpoint (in bytes). This metric could differ from lastCheckpointFullSize if incremental checkpointing or changelog is enabled.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>lastCheckpointFullSize</td>
+ <td>The full size of the last checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 7971493..99cac10 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -310,6 +310,9 @@ public class CheckpointStatsTracker {
static final String LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC =
"lastCheckpointSize";
@VisibleForTesting
+ static final String LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC =
"lastCheckpointFullSize";
+
+ @VisibleForTesting
static final String LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC =
"lastCheckpointDuration";
@VisibleForTesting
@@ -342,6 +345,9 @@ public class CheckpointStatsTracker {
metricGroup.gauge(
LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC, new
LatestCompletedCheckpointSizeGauge());
metricGroup.gauge(
+ LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC,
+ new LatestCompletedCheckpointFullSizeGauge());
+ metricGroup.gauge(
LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC,
new LatestCompletedCheckpointDurationGauge());
metricGroup.gauge(
@@ -400,6 +406,18 @@ public class CheckpointStatsTracker {
public Long getValue() {
CompletedCheckpointStats completed = latestCompletedCheckpoint;
if (completed != null) {
+ return completed.getCheckpointedSize();
+ } else {
+ return -1L;
+ }
+ }
+ }
+
+ private class LatestCompletedCheckpointFullSizeGauge implements
Gauge<Long> {
+ @Override
+ public Long getValue() {
+ CompletedCheckpointStats completed = latestCompletedCheckpoint;
+ if (completed != null) {
return completed.getStateSize();
} else {
return -1L;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index 14b3a94..84ebe3a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -298,6 +298,7 @@ public class CheckpointStatsTrackerTest {
CheckpointStatsTracker.NUMBER_OF_FAILED_CHECKPOINTS_METRIC,
CheckpointStatsTracker.LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC,
CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC,
+
CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC,
CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC,
CheckpointStatsTracker
.LATEST_COMPLETED_CHECKPOINT_PROCESSED_DATA_METRIC,
@@ -305,7 +306,7 @@ public class CheckpointStatsTrackerTest {
.LATEST_COMPLETED_CHECKPOINT_PERSISTED_DATA_METRIC,
CheckpointStatsTracker
.LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC)));
- assertEquals(10, registeredGaugeNames.size());
+ assertEquals(11, registeredGaugeNames.size());
}
/**
@@ -336,7 +337,7 @@ public class CheckpointStatsTrackerTest {
CheckpointStatsTracker stats = new CheckpointStatsTracker(0,
metricGroup);
// Make sure to adjust this test if metrics are added/removed
- assertEquals(10, registeredGauges.size());
+ assertEquals(11, registeredGauges.size());
// Check initial values
Gauge<Long> numCheckpoints =
@@ -362,6 +363,11 @@ public class CheckpointStatsTrackerTest {
(Gauge<Long>)
registeredGauges.get(
CheckpointStatsTracker.LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC);
+ Gauge<Long> latestCompletedFullSize =
+ (Gauge<Long>)
+ registeredGauges.get(
+ CheckpointStatsTracker
+
.LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC);
Gauge<Long> latestCompletedDuration =
(Gauge<Long>)
registeredGauges.get(
@@ -388,6 +394,7 @@ public class CheckpointStatsTrackerTest {
assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
assertEquals(Long.valueOf(-1), latestRestoreTimestamp.getValue());
assertEquals(Long.valueOf(-1), latestCompletedSize.getValue());
+ assertEquals(Long.valueOf(-1), latestCompletedFullSize.getValue());
assertEquals(Long.valueOf(-1), latestCompletedDuration.getValue());
assertEquals(Long.valueOf(-1), latestProcessedData.getValue());
assertEquals(Long.valueOf(-1), latestPersistedData.getValue());
@@ -408,7 +415,8 @@ public class CheckpointStatsTrackerTest {
assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
long ackTimestamp = 11231230L;
- long stateSize = 12381238L;
+ long checkpointedSize = 123L;
+ long fullCheckpointSize = 12381238L;
long processedData = 4242L;
long persistedData = 4444L;
long ignored = 0;
@@ -418,8 +426,8 @@ public class CheckpointStatsTrackerTest {
new SubtaskStateStats(
0,
ackTimestamp,
- stateSize,
- stateSize,
+ checkpointedSize,
+ fullCheckpointSize,
ignored,
ignored,
processedData,
@@ -439,7 +447,8 @@ public class CheckpointStatsTrackerTest {
assertEquals(Long.valueOf(1), numCompletedCheckpoints.getValue());
assertEquals(Long.valueOf(0), numFailedCheckpoints.getValue());
assertEquals(Long.valueOf(-1), latestRestoreTimestamp.getValue());
- assertEquals(Long.valueOf(stateSize), latestCompletedSize.getValue());
+ assertEquals(Long.valueOf(checkpointedSize),
latestCompletedSize.getValue());
+ assertEquals(Long.valueOf(fullCheckpointSize),
latestCompletedFullSize.getValue());
assertEquals(Long.valueOf(processedData),
latestProcessedData.getValue());
assertEquals(Long.valueOf(persistedData),
latestPersistedData.getValue());
assertEquals(Long.valueOf(ackTimestamp),
latestCompletedDuration.getValue());