This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 33fb37aac5f [FLINK-34344] Pass JobID to CheckpointStatsTracker
33fb37aac5f is described below
commit 33fb37aac5fbc709a62d35445879c75a6ba48086
Author: Roman Khachatryan <[email protected]>
AuthorDate: Fri Feb 2 16:02:14 2024 +0100
[FLINK-34344] Pass JobID to CheckpointStatsTracker
---
.../runtime/checkpoint/CheckpointStatsTracker.java | 11 +----------
.../scheduler/DefaultExecutionGraphFactory.java | 3 ++-
.../checkpoint/CheckpointCoordinatorFailureTest.java | 4 +++-
.../CheckpointCoordinatorMasterHooksTest.java | 2 +-
.../runtime/checkpoint/CheckpointCoordinatorTest.java | 18 ++++++++++++------
.../checkpoint/CheckpointCoordinatorTestingUtils.java | 2 +-
.../runtime/checkpoint/CheckpointStatsTrackerTest.java | 11 ++++++-----
.../flink/runtime/dispatcher/DispatcherTest.java | 4 +++-
.../TestingDefaultExecutionGraphBuilder.java | 3 ++-
.../AbstractCheckpointStatsHandlerTest.java | 4 +++-
10 files changed, 34 insertions(+), 28 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index f868f3fb4ba..5a8f72f0805 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -25,7 +25,6 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
@@ -106,17 +105,9 @@ public class CheckpointStatsTracker {
* @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in
* progress ones.
* @param metricGroup Metric group for exposed metrics
+ * @param jobID ID of the job being checkpointed
*/
public CheckpointStatsTracker(
- int numRememberedCheckpoints, JobManagerJobMetricGroup metricGroup) {
- this(numRememberedCheckpoints, metricGroup, metricGroup.jobId());
- }
-
- public CheckpointStatsTracker(int numRememberedCheckpoints, MetricGroup metricGroup) {
- this(numRememberedCheckpoints, metricGroup, new JobID());
- }
-
- private CheckpointStatsTracker(
int numRememberedCheckpoints, MetricGroup metricGroup, JobID jobID) {
checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index 29eb7222d95..0c5279f11fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -129,7 +129,8 @@ public class DefaultExecutionGraphFactory implements ExecutionGraphFactory {
new CheckpointStatsTracker(
configuration.getInteger(
WebOptions.CHECKPOINTS_HISTORY_SIZE),
- jobManagerJobMetricGroup));
+ jobManagerJobMetricGroup,
+ jobManagerJobMetricGroup.jobId()));
this.isDynamicGraph = isDynamicGraph;
this.executionJobVertexFactory = checkNotNull(executionJobVertexFactory);
this.nonFinishedHybridPartitionShouldBeUnknown = nonFinishedHybridPartitionShouldBeUnknown;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 2538072c516..54e06728fe8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
@@ -214,7 +215,8 @@ class CheckpointCoordinatorFailureTest extends TestLogger {
new FailingCompletedCheckpointStore(failure);
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID());
final AtomicInteger cleanupCallCount = new AtomicInteger(0);
final CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index e31d7811561..75e637dd078 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -500,7 +500,7 @@ class CheckpointCoordinatorMasterHooksTest {
new ExecutionGraphCheckpointPlanCalculatorContext(graph),
graph.getVerticesTopologically(),
false),
- new CheckpointStatsTracker(1, new DummyMetricGroup()));
+ new CheckpointStatsTracker(1, new DummyMetricGroup(), new JobID()));
}
private static <T> T mockGeneric(Class<?> clazz) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 79752468045..8e207ab80d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -247,7 +247,8 @@ class CheckpointCoordinatorTest extends TestLogger {
ExecutionVertex lateReportVertex =
executionGraph.getJobVertex(lateReportVertexID).getTaskVertices()[0];
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID());
CheckpointCoordinator coordinator =
new CheckpointCoordinatorBuilder()
.setTimer(manuallyTriggeredScheduledExecutor)
@@ -497,7 +498,8 @@ class CheckpointCoordinatorTest extends TestLogger {
jobVertex2.getTaskVertices()[1].getCurrentExecutionAttempt().markFinished();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID());
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setTimer(manuallyTriggeredScheduledExecutor)
@@ -739,7 +741,8 @@ class CheckpointCoordinatorTest extends TestLogger {
// given: Checkpoint coordinator which fails on initializeCheckpointLocation.
TestFailJobCallback failureCallback = new TestFailJobCallback();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID());
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setCheckpointStatsTracker(statsTracker)
@@ -2007,7 +2010,8 @@ class CheckpointCoordinatorTest extends TestLogger {
ExecutionAttemptID attemptID1 =
vertex1.getCurrentExecutionAttempt().getAttemptId();
ExecutionAttemptID attemptID2 =
vertex2.getCurrentExecutionAttempt().getAttemptId();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID());
// set up the coordinator and validate the initial state
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
@@ -3151,7 +3155,8 @@ class CheckpointCoordinatorTest extends TestLogger {
TestFailJobCallback failureCallback = new TestFailJobCallback();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID());
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
@@ -3201,7 +3206,8 @@ class CheckpointCoordinatorTest extends TestLogger {
TestFailJobCallback failureCallback = new TestFailJobCallback();
CheckpointStatsTracker statsTracker =
- new CheckpointStatsTracker(Integer.MAX_VALUE, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(
+ Integer.MAX_VALUE, new UnregisteredMetricsGroup(), new JobID());
final String exceptionMsg = "Test store exception.";
try (SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index ad07d342117..1a094667334 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -726,7 +726,7 @@ public class CheckpointCoordinatorTestingUtils {
private boolean allowCheckpointsAfterTasksFinished;
private CheckpointStatsTracker checkpointStatsTracker =
- new CheckpointStatsTracker(1, new DummyMetricGroup());
+ new CheckpointStatsTracker(1, new DummyMetricGroup(), new JobID());
private BiFunction<
Set<ExecutionJobVertex>,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index d3ec7e3e248..5e95e1fcbbc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
@@ -63,7 +64,7 @@ public class CheckpointStatsTrackerTest {
ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(0, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(0, new UnregisteredMetricsGroup(), new JobID());
PendingCheckpointStats pending =
tracker.reportPendingCheckpoint(
@@ -111,7 +112,7 @@ public class CheckpointStatsTrackerTest {
singletonMap(jobVertexID, jobVertex.getParallelism());
CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(), new JobID());
// Completed checkpoint
PendingCheckpointStats completed1 =
@@ -238,7 +239,7 @@ public class CheckpointStatsTrackerTest {
public void testCreateSnapshot() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
CheckpointStatsTracker tracker =
- new CheckpointStatsTracker(10, new UnregisteredMetricsGroup());
+ new CheckpointStatsTracker(10, new UnregisteredMetricsGroup(), new JobID());
CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
@@ -294,7 +295,7 @@ public class CheckpointStatsTrackerTest {
}
};
- new CheckpointStatsTracker(0, metricGroup);
+ new CheckpointStatsTracker(0, metricGroup, new JobID());
// Make sure this test is adjusted when further metrics are added
assertTrue(
@@ -343,7 +344,7 @@ public class CheckpointStatsTrackerTest {
.build(EXECUTOR_RESOURCE.getExecutor());
ExecutionJobVertex jobVertex = graph.getJobVertex(jobVertexID);
- CheckpointStatsTracker stats = new CheckpointStatsTracker(0, metricGroup);
+ CheckpointStatsTracker stats = new CheckpointStatsTracker(0, metricGroup, new JobID());
// Make sure to adjust this test if metrics are added/removed
assertEquals(12, registeredGauges.size());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index f705ec082d5..dbe33d79d4b 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -737,7 +737,9 @@ public class DispatcherTest extends AbstractDispatcherTest {
private CheckpointStatsSnapshot getTestCheckpointStatsSnapshotWithTwoFailedCheckpoints() {
CheckpointStatsTracker checkpointStatsTracker =
new CheckpointStatsTracker(
- 10, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup());
+ 10,
+ UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+ new JobID());
checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress();
checkpointStatsTracker.reportFailedCheckpointsWithoutInProgress();
return checkpointStatsTracker.createSnapshot();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
index 4d2156701d6..49b95b8657b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
@@ -193,7 +194,7 @@ public class TestingDefaultExecutionGraphBuilder {
new DefaultVertexAttemptNumberStore(),
Optional.ofNullable(vertexParallelismStore)
.orElseGet(() -> SchedulerBase.computeVertexParallelismStore(jobGraph)),
- () -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup()),
+ () -> new CheckpointStatsTracker(0, new UnregisteredMetricsGroup(), new JobID()),
isDynamicGraph,
executionJobVertexFactory,
markPartitionFinishedStrategy,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
index f18cf985c5c..4bf60af56b0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointStatsHandlerTest.java
@@ -64,7 +64,9 @@ public class AbstractCheckpointStatsHandlerTest extends TestLogger {
private static final CheckpointStatsTracker checkpointStatsTracker =
new CheckpointStatsTracker(
- 10, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup());
+ 10,
+ UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+ new JobID());
@Test
public void testRetrieveSnapshotFromCache() throws Exception {