This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c84f42c1a7e [FLINK-34344] Pass JobID to CheckpointStatsTracker
c84f42c1a7e is described below
commit c84f42c1a7e752eaf8b9c3beb23fb9b01d39443d
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Feb 2 16:02:14 2024 +0100
[FLINK-34344] Pass JobID to CheckpointStatsTracker
---
.../runtime/checkpoint/CheckpointStatsTracker.java | 6 ++++--
.../scheduler/DefaultExecutionGraphFactory.java | 3 ++-
.../checkpoint/CheckpointCoordinatorFailureTest.java | 4 +++-
.../CheckpointCoordinatorMasterHooksTest.java | 2 +-
.../checkpoint/CheckpointCoordinatorTest.java | 20 +++++++++++++-------
.../CheckpointCoordinatorTestingUtils.java | 2 +-
.../checkpoint/CheckpointStatsTrackerTest.java | 12 ++++++------
.../flink/runtime/dispatcher/DispatcherTest.java | 4 +++-
.../TestingDefaultExecutionGraphBuilder.java | 3 ++-
.../AbstractCheckpointStatsHandlerTest.java | 4 +++-
10 files changed, 38 insertions(+), 22 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index cf66341fc06..ea04211d6f0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -111,9 +111,11 @@ public class CheckpointStatsTracker {
* @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in
* progress ones.
* @param metricGroup Metric group for exposed metrics
+ * @param jobID ID of the job being checkpointed
*/
- public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) {
- this(numRememberedCheckpoints, metricGroup, new JobID(), Integer.MAX_VALUE);
+ public CheckpointStatsTracker(
+ int numRememberedCheckpoints, MetricGroup metricGroup, JobID jobID) {
+ this(numRememberedCheckpoints, metricGroup, jobID, Integer.MAX_VALUE);
}
CheckpointStatsTracker(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index aaeb8b6d4c7..67e91a887a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -129,7 +129,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
() ->
new CheckpointStatsTracker(
configuration.get(WebOptions.CHECKPOINTS_HISTORY_SIZE),
- jobManagerJobMetricGroup));
+ jobManagerJobMetricGroup,
+ jobManagerJobMetricGroup.jobId()));
this.isDynamicGraph = isDynamicGraph;
this.executionJobVertexFactory =
checkNotNull(executionJobVertexFactory);
this.nonFinishedHybridPartitionShouldBeUnknown =
nonFinishedHybridPartitionShouldBeUnknown;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 8873b938f1a..6e6bcffe762 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
@@ -212,7 +213,8 @@ class CheckpointCoordinatorFailureTest {
new FailingCompletedCheckpointStore(failure);
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new
UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new
JobID());
final AtomicInteger cleanupCallCount = new AtomicInteger(0);
final CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index e31d7811561..75e637dd078 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -500,7 +500,7 @@ class CheckpointCoordinatorMasterHooksTest {
new
ExecutionGraphCheckpointPlanCalculatorContext(graph),
graph.getVerticesTopologically(),
false),
- new CheckpointStatsTracker(1, new DummyMetricGroup()));
+ new CheckpointStatsTracker(1, new DummyMetricGroup(), new
JobID()));
}
private static <T> T mockGeneric(Class<?> clazz) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 5c63552be7d..222b5dd1d8d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -251,7 +251,8 @@ class CheckpointCoordinatorTest {
ExecutionVertex lateReportVertex =
executionGraph.getJobVertex(lateReportVertexID).getTaskVertices()[0];
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new
UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new
JobID());
CheckpointCoordinator coordinator =
new CheckpointCoordinatorBuilder()
.setTimer(manuallyTriggeredScheduledExecutor)
@@ -501,7 +502,8 @@ class CheckpointCoordinatorTest {
jobVertex2.getTaskVertices()[1].getCurrentExecutionAttempt().markFinished();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new
UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new
JobID());
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setTimer(manuallyTriggeredScheduledExecutor)
@@ -743,7 +745,8 @@ class CheckpointCoordinatorTest {
// given: Checkpoint coordinator which fails on
initializeCheckpointLocation.
TestFailJobCallback failureCallback = new TestFailJobCallback();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new
UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new
JobID());
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setCheckpointStatsTracker(statsTracker)
@@ -2017,7 +2020,8 @@ class CheckpointCoordinatorTest {
ExecutionAttemptID attemptID1 =
vertex1.getCurrentExecutionAttempt().getAttemptId();
ExecutionAttemptID attemptID2 =
vertex2.getCurrentExecutionAttempt().getAttemptId();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new
UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new
JobID());
// set up the coordinator and validate the initial state
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
@@ -2868,7 +2872,7 @@ class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(),
new JobID());
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setCompletedCheckpointStore(store)
@@ -3186,7 +3190,8 @@ class CheckpointCoordinatorTest {
TestFailJobCallback failureCallback = new TestFailJobCallback();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new
UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new
JobID());
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
@@ -3236,7 +3241,8 @@ class CheckpointCoordinatorTest {
TestFailJobCallback failureCallback = new TestFailJobCallback();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new
UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new
JobID());
final String exceptionMsg = "Test store exception.";
try (SharedStateRegistry sharedStateRegistry = new
SharedStateRegistryImpl()) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 03d8d5ced59..aac4ab51b36 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -781,7 +781,7 @@ public class CheckpointCoordinatorTestingUtils {
private boolean allowCheckpointsAfterTasksFinished;
private CheckpointStatsTracker checkpointStatsTracker =
- new CheckpointStatsTracker(1, new DummyMetricGroup());
+ new CheckpointStatsTracker(1, new DummyMetricGroup(), new
JobID());
private BiFunction<
Set<ExecutionJobVertex>,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index a4dc88d5197..83ce318baea 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -66,7 +66,7 @@ class CheckpointStatsTrackerTest {
ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(0, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(0, new UnregisteredMetricsGroup(),
new JobID());
PendingCheckpointStats pending =
tracker.reportPendingCheckpoint(
@@ -114,7 +114,7 @@ class CheckpointStatsTrackerTest {
singletonMap(jobVertexID, jobVertex.getParallelism());
CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(),
new JobID());
// Completed checkpoint
PendingCheckpointStats completed1 =
@@ -240,7 +240,7 @@ class CheckpointStatsTrackerTest {
void testCreateSnapshot() {
JobVertexID jobVertexID = new JobVertexID();
CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(),
new JobID());
CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
@@ -295,7 +295,7 @@ class CheckpointStatsTrackerTest {
}
};
- CheckpointStatsTracker tracker = new CheckpointStatsTracker(10,
metricGroup);
+ CheckpointStatsTracker tracker = new CheckpointStatsTracker(10,
metricGroup, new JobID());
PendingCheckpointStats pending =
tracker.reportPendingCheckpoint(
@@ -422,7 +422,7 @@ class CheckpointStatsTrackerTest {
}
};
- new CheckpointStatsTracker(0, metricGroup);
+ new CheckpointStatsTracker(0, metricGroup, new JobID());
// Make sure this test is adjusted when further metrics are added
assertThat(registeredGaugeNames)
@@ -471,7 +471,7 @@ class CheckpointStatsTrackerTest {
.build(EXECUTOR_RESOURCE.getExecutor());
ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
- CheckpointStatsTracker stats = new CheckpointStatsTracker(0,
metricGroup);
+ CheckpointStatsTracker stats = new CheckpointStatsTracker(0,
metricGroup, new JobID());
// Make sure to adjust this test if metrics are added/removed
assertThat(registeredGauges).hasSize(12);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index 463a737ac3e..e7bfc30ab17 100755
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -737,7 +737,9 @@ public class DispatcherTest extends AbstractDispatcherTest {
private CheckpointStatsSnapshot
getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints() {
CheckpointStatsTracker checkpointStatsTracker =
new CheckpointStatsTracker(
- 10,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup());
+ 10,
+
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+ new JobID());
checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress();
checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress();
return checkpointStatsTracker.createSnapshot();
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index 58e38d62d45..1bbcbe43388 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RpcOptions;
@@ -193,7 +194,7 @@ public class TestingDefaultExecutionGraphBuilder {
new DefaultVertexAttemptNumberStore(),
Optional.ofNullable(vertexParallelismStore)
.orElseGet(() ->
SchedulerBase.computeVertexParallelismStore(jobGraph)),
- () -> new CheckpointStatsTracker(0, new
UnregisteredMetricsGroup()),
+ () -> new CheckpointStatsTracker(0, new
UnregisteredMetricsGroup(), new JobID()),
isDynamicGraph,
executionJobVertexFactory,
markPartitionFinishedStrategy,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
index 9f629a84916..c1ad110491c 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
@@ -63,7 +63,9 @@ class AbstractCheckpointStatsHandlerTest {
private static final CheckpointStatsTracker checkpointStatsTracker =
new CheckpointStatsTracker(
- 10,
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup());
+ 10,
+
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+ new JobID());
@Test
void testRetrieveSnapshotFromCache() throws Exception {