This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.18 by this push:
     new 33fb37aac5f [FLINK-34344] Pass JobID to CheckpointStatsTracker
33fb37aac5f is described below

commit 33fb37aac5fbc709a62d35445879c75a6ba48086
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Feb 2 16:02:14 2024 +0100

    [FLINK-34344] Pass JobID to CheckpointStatsTracker
---
 .../runtime/checkpoint/CheckpointStatsTracker.java     | 11 +----------
 .../scheduler/DefaultExecutionGraphFactory.java        |  3 ++-
 .../checkpoint/CheckpointCoordinatorFailureTest.java   |  4 +++-
 .../CheckpointCoordinatorMasterHooksTest.java          |  2 +-
 .../runtime/checkpoint/CheckpointCoordinatorTest.java  | 18 ++++++++++++------
 .../checkpoint/CheckpointCoordinatorTestingUtils.java  |  2 +-
 .../runtime/checkpoint/CheckpointStatsTrackerTest.java | 11 ++++++-----
 .../flink/runtime/dispatcher/DispatcherTest.java       |  4 +++-
 .../TestingDefaultExecutionGraphBuilder.java           |  3 ++-
 .../AbstractCheckpointStatsHandlerTest.java            |  4 +++-
 10 files changed, 34 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index f868f3fb4ba..5a8f72f0805 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -25,7 +25,6 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 
@@ -106,17 +105,9 @@ public class CheckpointStatsTracker {
      * @param numRememberedCheckpoints Maximum number of checkpoints to 
remember, including in
      *     progress ones.
      * @param metricGroup Metric group for exposed metrics
+     * @param jobID ID of the job being checkpointed
      */
     public CheckpointStatsTracker(
-            int numRememberedCheckpoints, JobManagerJobMetricGroup 
metricGroup) {
-        this(numRememberedCheckpoints, metricGroup, metricGroup.jobId());
-    }
-
-    public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup 
metricGroup) {
-        this(numRememberedCheckpoints, metricGroup, new JobID());
-    }
-
-    private CheckpointStatsTracker(
             int numRememberedCheckpoints, MetricGroup metricGroup, JobID 
jobID) {
         checkArgument(numRememberedCheckpoints >= 0, "Negative number of 
remembered checkpoints");
         this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index 29eb7222d95..0c5279f11fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -129,7 +129,8 @@ public class DefaultExecutionGraphFactory implements 
ExecutionGraphFactory {
                                 new CheckpointStatsTracker(
                                         configuration.getInteger(
                                                 
WebOptions.CHECKPOINTS_HISTORY_SIZE),
-                                        jobManagerJobMetricGroup));
+                                        jobManagerJobMetricGroup,
+                                        jobManagerJobMetricGroup.jobId()));
         this.isDynamicGraph = isDynamicGraph;
         this.executionJobVertexFactory = 
checkNotNull(executionJobVertexFactory);
         this.nonFinishedHybridPartitionShouldBeUnknown = 
nonFinishedHybridPartitionShouldBeUnknown;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 2538072c516..54e06728fe8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
@@ -214,7 +215,8 @@ class CheckpointCoordinatorFailureTest extends TestLogger {
                 new FailingCompletedCheckpointStore(failure);
 
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
         final AtomicInteger cleanupCallCount = new AtomicInteger(0);
         final CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index e31d7811561..75e637dd078 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -500,7 +500,7 @@ class CheckpointCoordinatorMasterHooksTest {
                         new 
ExecutionGraphCheckpointPlanCalculatorContext(graph),
                         graph.getVerticesTopologically(),
                         false),
-                new CheckpointStatsTracker(1, new DummyMetricGroup()));
+                new CheckpointStatsTracker(1, new DummyMetricGroup(), new 
JobID()));
     }
 
     private static <T> T mockGeneric(Class<?> clazz) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 79752468045..8e207ab80d6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -247,7 +247,8 @@ class CheckpointCoordinatorTest extends TestLogger {
         ExecutionVertex lateReportVertex =
                 
executionGraph.getJobVertex(lateReportVertexID).getTaskVertices()[0];
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
         CheckpointCoordinator coordinator =
                 new CheckpointCoordinatorBuilder()
                         .setTimer(manuallyTriggeredScheduledExecutor)
@@ -497,7 +498,8 @@ class CheckpointCoordinatorTest extends TestLogger {
         
jobVertex2.getTaskVertices()[1].getCurrentExecutionAttempt().markFinished();
 
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
                         .setTimer(manuallyTriggeredScheduledExecutor)
@@ -739,7 +741,8 @@ class CheckpointCoordinatorTest extends TestLogger {
         // given: Checkpoint coordinator which fails on 
initializeCheckpointLocation.
         TestFailJobCallback failureCallback = new TestFailJobCallback();
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
                         .setCheckpointStatsTracker(statsTracker)
@@ -2007,7 +2010,8 @@ class CheckpointCoordinatorTest extends TestLogger {
         ExecutionAttemptID attemptID1 = 
vertex1.getCurrentExecutionAttempt().getAttemptId();
         ExecutionAttemptID attemptID2 = 
vertex2.getCurrentExecutionAttempt().getAttemptId();
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
         // set up the coordinator and validate the initial state
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
@@ -3151,7 +3155,8 @@ class CheckpointCoordinatorTest extends TestLogger {
         TestFailJobCallback failureCallback = new TestFailJobCallback();
 
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
 
         CheckpointCoordinator checkpointCoordinator =
                 new CheckpointCoordinatorBuilder()
@@ -3201,7 +3206,8 @@ class CheckpointCoordinatorTest extends TestLogger {
         TestFailJobCallback failureCallback = new TestFailJobCallback();
 
         CheckpointStatsTracker statsTracker =
-                new CheckpointStatsTracker(Integer.MAX_VALUE, new 
UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(
+                        Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new 
JobID());
 
         final String exceptionMsg = "Test store exception.";
         try (SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl()) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index ad07d342117..1a094667334 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -726,7 +726,7 @@ public class CheckpointCoordinatorTestingUtils {
         private boolean allowCheckpointsAfterTasksFinished;
 
         private CheckpointStatsTracker checkpointStatsTracker =
-                new CheckpointStatsTracker(1, new DummyMetricGroup());
+                new CheckpointStatsTracker(1, new DummyMetricGroup(), new 
JobID());
 
         private BiFunction<
                         Set<ExecutionJobVertex>,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index d3ec7e3e248..5e95e1fcbbc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
@@ -63,7 +64,7 @@ public class CheckpointStatsTrackerTest {
         ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
 
         CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(0, new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(0, new UnregisteredMetricsGroup(), 
new JobID());
 
         PendingCheckpointStats pending =
                 tracker.reportPendingCheckpoint(
@@ -111,7 +112,7 @@ public class CheckpointStatsTrackerTest {
                 singletonMap(jobVertexID, jobVertex.getParallelism());
 
         CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(), 
new JobID());
 
         // Completed checkpoint
         PendingCheckpointStats completed1 =
@@ -238,7 +239,7 @@ public class CheckpointStatsTrackerTest {
     public void testCreateSnapshot() throws Exception {
         JobVertexID jobVertexID = new JobVertexID();
         CheckpointStatsTracker tracker =
-                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
+                new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(), 
new JobID());
 
         CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
 
@@ -294,7 +295,7 @@ public class CheckpointStatsTrackerTest {
                     }
                 };
 
-        new CheckpointStatsTracker(0, metricGroup);
+        new CheckpointStatsTracker(0, metricGroup, new JobID());
 
         // Make sure this test is adjusted when further metrics are added
         assertTrue(
@@ -343,7 +344,7 @@ public class CheckpointStatsTrackerTest {
                         .build(EXECUTOR_RESOURCE.getExecutor());
         ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
 
-        CheckpointStatsTracker stats = new CheckpointStatsTracker(0, 
metricGroup);
+        CheckpointStatsTracker stats = new CheckpointStatsTracker(0, 
metricGroup, new JobID());
 
         // Make sure to adjust this test if metrics are added/removed
         assertEquals(12, registeredGauges.size());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index f705ec082d5..dbe33d79d4b 100755
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -737,7 +737,9 @@ public class DispatcherTest extends AbstractDispatcherTest {
     private CheckpointStatsSnapshot 
getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints() {
         CheckpointStatsTracker checkpointStatsTracker =
                 new CheckpointStatsTracker(
-                        10, 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup());
+                        10,
+                        
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                        new JobID());
         checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress();
         checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress();
         return checkpointStatsTracker.createSnapshot();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index 4d2156701d6..49b95b8657b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
@@ -193,7 +194,7 @@ public class TestingDefaultExecutionGraphBuilder {
                 new DefaultVertexAttemptNumberStore(),
                 Optional.ofNullable(vertexParallelismStore)
                         .orElseGet(() -> 
SchedulerBase.computeVertexParallelismStore(jobGraph)),
-                () -> new CheckpointStatsTracker(0, new 
UnregisteredMetricsGroup()),
+                () -> new CheckpointStatsTracker(0, new 
UnregisteredMetricsGroup(), new JobID()),
                 isDynamicGraph,
                 executionJobVertexFactory,
                 markPartitionFinishedStrategy,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
index f18cf985c5c..4bf60af56b0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
@@ -64,7 +64,9 @@ public class AbstractCheckpointStatsHandlerTest extends 
TestLogger {
 
     private static final CheckpointStatsTracker checkpointStatsTracker =
             new CheckpointStatsTracker(
-                    10, 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup());
+                    10,
+                    
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+                    new JobID());
 
     @Test
     public void testRetrieveSnapshotFromCache() throws Exception {

Reply via email to