This is an automated email from the ASF dual-hosted git repository.
tangyun pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 969967c160b [FLINK-29244][state/changelog] Add metric
lastMaterializationDuration (registered as lastDurationOfMaterialization) to
ChangelogMaterializationMetricGroup
969967c160b is described below
commit 969967c160bf13a3948df941c4bd0a53005aac5d
Author: wangfeifan <[email protected]>
AuthorDate: Fri Sep 9 15:52:07 2022 +0800
[FLINK-29244][state/changelog] Add metric lastMaterializationDuration
(registered as lastDurationOfMaterialization) to
ChangelogMaterializationMetricGroup
---
docs/content.zh/docs/ops/metrics.md | 7 ++++++-
docs/content/docs/ops/metrics.md | 7 ++++++-
.../flink/state/changelog/ChangelogMetricGroupTest.java | 10 ++++++++++
.../state/common/ChangelogMaterializationMetricGroup.java | 11 ++++++++++-
.../flink/state/common/PeriodicMaterializationManager.java | 9 +++++++--
5 files changed, 39 insertions(+), 5 deletions(-)
diff --git a/docs/content.zh/docs/ops/metrics.md
b/docs/content.zh/docs/ops/metrics.md
index b0e168f97dd..010b0a2db17 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1566,7 +1566,7 @@ Note that the metrics are only available via reporters.
<td>Gauge</td>
</tr>
<tr>
- <th rowspan="7"><strong>Task/Operator</strong></th>
+ <th rowspan="8"><strong>Task/Operator</strong></th>
<td>startedMaterialization</td>
<td>The number of started materializations.</td>
<td>Counter</td>
@@ -1581,6 +1581,11 @@ Note that the metrics are only available via reporters.
<td>The number of failed materializations.</td>
<td>Counter</td>
</tr>
+ <tr>
+ <td>lastDurationOfMaterialization</td>
+ <td>The duration of the last materialization (in milliseconds).</td>
+ <td>Gauge</td>
+ </tr>
<tr>
<td>lastFullSizeOfMaterialization</td>
<td>The full size of the materialization part of the last reported
checkpoint (in bytes).</td>
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index 8a4e1927e12..088bb168c52 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1551,7 +1551,7 @@ Note that the metrics are only available via reporters.
<td>Gauge</td>
</tr>
<tr>
- <th rowspan="7"><strong>Task/Operator</strong></th>
+ <th rowspan="8"><strong>Task/Operator</strong></th>
<td>startedMaterialization</td>
<td>The number of started materializations.</td>
<td>Counter</td>
@@ -1566,6 +1566,11 @@ Note that the metrics are only available via reporters.
<td>The number of failed materializations.</td>
<td>Counter</td>
</tr>
+ <tr>
+ <td>lastDurationOfMaterialization</td>
+ <td>The duration of the last materialization (in milliseconds).</td>
+ <td>Gauge</td>
+ </tr>
<tr>
<td>lastFullSizeOfMaterialization</td>
<td>The full size of the materialization part of the last reported
checkpoint (in bytes).</td>
diff --git
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java
index f032ea862d1..6bfe146d701 100644
---
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java
+++
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogMetricGroupTest.java
@@ -56,6 +56,7 @@ import static
org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup.
import static
org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup.LATEST_INC_SIZE_OF_NON_MATERIALIZATION;
import static
org.apache.flink.state.common.ChangelogMaterializationMetricGroup.COMPLETED_MATERIALIZATION;
import static
org.apache.flink.state.common.ChangelogMaterializationMetricGroup.FAILED_MATERIALIZATION;
+import static
org.apache.flink.state.common.ChangelogMaterializationMetricGroup.LAST_DURATION_OF_MATERIALIZATION;
import static
org.apache.flink.state.common.ChangelogMaterializationMetricGroup.STARTED_MATERIALIZATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
@@ -73,6 +74,7 @@ public class ChangelogMetricGroupTest {
private Counter startedMaterializationCounter;
private Counter completedMaterializationCounter;
private Counter failedMaterializationCounter;
+ private Gauge<Long> lastDurationOfMaterializationGauge;
private Gauge<Long> lastFullSizeOfMaterializationGauge;
private Gauge<Long> lastIncSizeOfMaterializationGauge;
private Gauge<Long> lastFullSizeOfNonMaterializationGauge;
@@ -83,10 +85,12 @@ public class ChangelogMetricGroupTest {
setup(snapshotResult -> snapshotResult);
// The materialization will be skipped if no data updated.
+ assertEquals(-1L,
lastDurationOfMaterializationGauge.getValue().longValue());
periodicMaterializationManager.triggerMaterialization();
runSnapshot(1L);
assertEquals(1L, startedMaterializationCounter.getCount());
assertEquals(1L, completedMaterializationCounter.getCount());
+ assertNotEquals(-1L,
lastDurationOfMaterializationGauge.getValue().longValue());
assertEquals(0L,
lastFullSizeOfMaterializationGauge.getValue().longValue());
assertEquals(0L,
lastIncSizeOfMaterializationGauge.getValue().longValue());
assertEquals(0L,
lastFullSizeOfNonMaterializationGauge.getValue().longValue());
@@ -104,6 +108,7 @@ public class ChangelogMetricGroupTest {
Long lastIncSizeOfNonMaterialization =
lastIncSizeOfNonMaterializationGauge.getValue();
assertNotEquals(0L, lastFullSizeOfMaterialization.longValue());
assertNotEquals(0L, lastIncSizeOfMaterialization.longValue());
+ assertNotEquals(-1L,
lastDurationOfMaterializationGauge.getValue().longValue());
// The non-materialization size will be zero if no data updated
between completed
// materialization and checkpoint.
assertEquals(0L, lastFullSizeOfNonMaterialization.longValue());
@@ -128,11 +133,13 @@ public class ChangelogMetricGroupTest {
setup(snapshotResult -> ExceptionallyDoneFuture.of(new
RuntimeException()));
changelogKeyedStateBackend.setCurrentKey(1);
state.update(1);
+ assertEquals(-1L,
lastDurationOfMaterializationGauge.getValue().longValue());
periodicMaterializationManager.triggerMaterialization();
runSnapshot(1L);
assertEquals(0L, completedMaterializationCounter.getCount());
assertEquals(1L, failedMaterializationCounter.getCount());
assertEquals(1L, startedMaterializationCounter.getCount());
+ assertEquals(-1L,
lastDurationOfMaterializationGauge.getValue().longValue());
assertEquals(0L,
lastFullSizeOfMaterializationGauge.getValue().longValue());
assertEquals(0L,
lastIncSizeOfMaterializationGauge.getValue().longValue());
assertNotEquals(0L,
lastFullSizeOfNonMaterializationGauge.getValue().longValue());
@@ -175,6 +182,9 @@ public class ChangelogMetricGroupTest {
Preconditions.checkNotNull(counterMap.get(COMPLETED_MATERIALIZATION));
failedMaterializationCounter =
Preconditions.checkNotNull(counterMap.get(FAILED_MATERIALIZATION));
+ lastDurationOfMaterializationGauge =
+ Preconditions.checkNotNull(
+ (Gauge<Long>)
gaugeMap.get(LAST_DURATION_OF_MATERIALIZATION));
lastFullSizeOfMaterializationGauge =
Preconditions.checkNotNull(
(Gauge<Long>)
gaugeMap.get(LATEST_FULL_SIZE_OF_MATERIALIZATION));
diff --git
a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java
b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java
index 1a2115d5c1c..d1c1b5c1d92 100644
---
a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java
+++
b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/ChangelogMaterializationMetricGroup.java
@@ -39,10 +39,16 @@ public class ChangelogMaterializationMetricGroup extends
ProxyMetricGroup<Metric
@VisibleForTesting
public static final String FAILED_MATERIALIZATION = PREFIX +
".failedMaterialization";
+ @VisibleForTesting
+ public static final String LAST_DURATION_OF_MATERIALIZATION =
+ PREFIX + ".lastDurationOfMaterialization";
+
private final Counter startedMaterializationCounter;
private final Counter completedMaterializationCounter;
private final Counter failedMaterializationCounter;
+ private volatile long lastDuration = -1;
+
public ChangelogMaterializationMetricGroup(MetricGroup parentMetricGroup) {
super(parentMetricGroup);
this.startedMaterializationCounter =
@@ -51,14 +57,17 @@ public class ChangelogMaterializationMetricGroup extends
ProxyMetricGroup<Metric
counter(COMPLETED_MATERIALIZATION, new
ThreadSafeSimpleCounter());
this.failedMaterializationCounter =
counter(FAILED_MATERIALIZATION, new ThreadSafeSimpleCounter());
+
+ gauge(LAST_DURATION_OF_MATERIALIZATION, () -> lastDuration);
}
void reportStartedMaterialization() {
startedMaterializationCounter.inc();
}
- void reportCompletedMaterialization() {
+ void reportCompletedMaterialization(long duration) {
completedMaterializationCounter.inc();
+ lastDuration = duration;
}
void reportFailedMaterialization() {
diff --git
a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
index 96926e98841..d7af80c1752 100644
---
a/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
+++
b/flink-state-backends/flink-statebackend-common/src/main/java/org/apache/flink/state/common/PeriodicMaterializationManager.java
@@ -202,6 +202,7 @@ public class PeriodicMaterializationManager implements
Closeable {
public void triggerMaterialization() {
mailboxExecutor.execute(
() -> {
+ long triggerTime = System.currentTimeMillis();
metrics.reportStartedMaterialization();
Optional<MaterializationRunnable>
materializationRunnableOptional;
try {
@@ -216,11 +217,13 @@ public class PeriodicMaterializationManager implements
Closeable {
asyncOperationsThreadPool.execute(
() ->
asyncMaterializationPhase(
+ triggerTime,
runnable.getMaterializationRunnable(),
runnable.getMaterializationID(),
runnable.getMaterializedTo()));
} else {
- metrics.reportCompletedMaterialization();
+ metrics.reportCompletedMaterialization(
+ System.currentTimeMillis() - triggerTime);
scheduleNextMaterialization();
LOG.info(
@@ -234,6 +237,7 @@ public class PeriodicMaterializationManager implements
Closeable {
}
private void asyncMaterializationPhase(
+ long triggerTime,
RunnableFuture<SnapshotResult<KeyedStateHandle>>
materializedRunnableFuture,
long materializationID,
SequenceNumber upTo) {
@@ -250,7 +254,8 @@ public class PeriodicMaterializationManager implements
Closeable {
try {
target.handleMaterializationResult(
snapshotResult,
materializationID, upTo);
-
metrics.reportCompletedMaterialization();
+
metrics.reportCompletedMaterialization(
+
System.currentTimeMillis() - triggerTime);
} catch (Exception ex) {
metrics.reportFailedMaterialization();
}