This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch release-2.1
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.1 by this push:
     new e107aaddfae [FLINK-38327] Use vertex id instead of operator id in 
checkpoint file-merging manager (#26973)
e107aaddfae is described below

commit e107aaddfae4beef386fd34e012ffed8cfea46da
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Sep 10 14:08:47 2025 +0800

    [FLINK-38327] Use vertex id instead of operator id in checkpoint 
file-merging manager (#26973)
    
    (cherry picked from commit ee73d0f79841413f3a553de87e6ea23eef0e51ab)
---
 .../filemerging/FileMergingSnapshotManager.java    | 24 +++----
 .../SubtaskFileMergingManagerRestoreOperation.java | 10 +--
 .../operators/StreamTaskStateInitializerImpl.java  |  8 ++-
 .../FileMergingSnapshotManagerTestBase.java        | 73 ++++++++++++++--------
 4 files changed, 70 insertions(+), 45 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index 36eab3c5996..7310e5dd32c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -209,14 +209,14 @@ public interface FileMergingSnapshotManager extends 
Closeable {
             long checkpointId, SubtaskKey subtaskKey, 
Stream<SegmentFileStateHandle> stateHandles);
 
     /**
-     * A key identifies a subtask. A subtask can be identified by the operator 
id, subtask index and
+     * A key identifies a subtask. A subtask can be identified by the vertex 
id, subtask index and
      * the parallelism. Note that this key should be consistent across job 
attempts.
      */
     final class SubtaskKey {
         private static final String MANAGED_DIR_FORMAT = "job_%s_op_%s_%d_%d";
 
         final String jobIDString;
-        final String operatorIDString;
+        final String vertexIDString;
         final int subtaskIndex;
         final int parallelism;
 
@@ -226,23 +226,23 @@ public interface FileMergingSnapshotManager extends 
Closeable {
          */
         final int hashCode;
 
-        public SubtaskKey(JobID jobID, OperatorID operatorID, TaskInfo 
taskInfo) {
+        public SubtaskKey(JobID jobID, JobVertexID vertexID, TaskInfo 
taskInfo) {
             this(
                     jobID.toHexString(),
-                    operatorID.toHexString(),
+                    vertexID.toHexString(),
                     taskInfo.getIndexOfThisSubtask(),
                     taskInfo.getNumberOfParallelSubtasks());
         }
 
         @VisibleForTesting
         public SubtaskKey(
-                String jobIDString, String operatorIDString, int subtaskIndex, 
int parallelism) {
+                String jobIDString, String vertexIDString, int subtaskIndex, 
int parallelism) {
             this.jobIDString = jobIDString;
-            this.operatorIDString = operatorIDString;
+            this.vertexIDString = vertexIDString;
             this.subtaskIndex = subtaskIndex;
             this.parallelism = parallelism;
             int hash = jobIDString.hashCode();
-            hash = 31 * hash + operatorIDString.hashCode();
+            hash = 31 * hash + vertexIDString.hashCode();
             hash = 31 * hash + subtaskIndex;
             hash = 31 * hash + parallelism;
             this.hashCode = hash;
@@ -251,7 +251,7 @@ public interface FileMergingSnapshotManager extends 
Closeable {
         public static SubtaskKey of(Environment environment) {
             return new SubtaskKey(
                     environment.getJobID(),
-                    OperatorID.fromJobVertexID(environment.getJobVertexId()),
+                    environment.getJobVertexId(),
                     environment.getTaskInfo());
         }
 
@@ -269,7 +269,7 @@ public interface FileMergingSnapshotManager extends 
Closeable {
             return String.format(
                             MANAGED_DIR_FORMAT,
                             jobIDString,
-                            operatorIDString,
+                            vertexIDString,
                             subtaskIndex,
                             parallelism)
                     .replaceAll("[^a-zA-Z0-9\\-]", "_");
@@ -289,7 +289,7 @@ public interface FileMergingSnapshotManager extends 
Closeable {
             return hashCode == that.hashCode
                     && subtaskIndex == that.subtaskIndex
                     && parallelism == that.parallelism
-                    && operatorIDString.equals(that.operatorIDString)
+                    && vertexIDString.equals(that.vertexIDString)
                     && jobIDString.equals(that.jobIDString);
         }
 
@@ -301,7 +301,7 @@ public interface FileMergingSnapshotManager extends 
Closeable {
         @Override
         public String toString() {
             return String.format(
-                    "%s-%s(%d/%d)", jobIDString, operatorIDString, 
subtaskIndex, parallelism);
+                    "%s-%s(%d/%d)", jobIDString, vertexIDString, subtaskIndex, 
parallelism);
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java
index ee969e94978..36c064efb84 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint.filemerging;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -51,7 +51,7 @@ public class SubtaskFileMergingManagerRestoreOperation {
     private final TaskInfo taskInfo;
 
     /** The id of the operator to which the subtask belongs. */
-    private final OperatorID operatorID;
+    private final JobVertexID vertexID;
 
     private final FileMergingSnapshotManager fileMergingSnapshotManager;
 
@@ -63,19 +63,19 @@ public class SubtaskFileMergingManagerRestoreOperation {
             FileMergingSnapshotManager fileMergingSnapshotManager,
             JobID jobID,
             TaskInfo taskInfo,
-            OperatorID operatorID,
+            JobVertexID vertexID,
             OperatorSubtaskState subtaskState) {
         this.checkpointId = checkpointId;
         this.fileMergingSnapshotManager = fileMergingSnapshotManager;
         this.jobID = jobID;
         this.taskInfo = Preconditions.checkNotNull(taskInfo);
-        this.operatorID = Preconditions.checkNotNull(operatorID);
+        this.vertexID = Preconditions.checkNotNull(vertexID);
         this.subtaskState = Preconditions.checkNotNull(subtaskState);
     }
 
     public void restore() {
         FileMergingSnapshotManager.SubtaskKey subtaskKey =
-                new FileMergingSnapshotManager.SubtaskKey(jobID, operatorID, 
taskInfo);
+                new FileMergingSnapshotManager.SubtaskKey(jobID, vertexID, 
taskInfo);
 
         Stream<? extends StateObject> keyedStateHandles =
                 Stream.concat(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 58b1a83b819..4490bad722f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import 
org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
@@ -162,7 +163,8 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
             throws Exception {
 
         TaskInfo taskInfo = environment.getTaskInfo();
-        registerRestoredStateToFileMergingManager(environment.getJobID(), 
taskInfo, operatorID);
+        registerRestoredStateToFileMergingManager(
+                environment.getJobID(), taskInfo, 
environment.getJobVertexId(), operatorID);
 
         OperatorSubtaskDescriptionText operatorSubtaskDescription =
                 new OperatorSubtaskDescriptionText(
@@ -361,7 +363,7 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
     }
 
     private void registerRestoredStateToFileMergingManager(
-            JobID jobID, TaskInfo taskInfo, OperatorID operatorID) {
+            JobID jobID, TaskInfo taskInfo, JobVertexID jobVertexID, 
OperatorID operatorID) {
         FileMergingSnapshotManager fileMergingSnapshotManager =
                 taskStateManager.getFileMergingSnapshotManager();
         Optional<Long> restoredCheckpointId = 
taskStateManager.getRestoreCheckpointId();
@@ -377,7 +379,7 @@ public class StreamTaskStateInitializerImpl implements 
StreamTaskStateInitialize
                             fileMergingSnapshotManager,
                             jobID,
                             taskInfo,
-                            operatorID,
+                            jobVertexID,
                             subtaskState.get());
             restoreOperation.restore();
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
index 582a4281b2d..e739e6d4313 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SpaceStat;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -73,7 +74,7 @@ public abstract class FileMergingSnapshotManagerTestBase {
 
     final JobID jobID = new JobID();
 
-    final OperatorID operatorID = new OperatorID(289347923L, 75893479L);
+    final JobVertexID vertexID = new JobVertexID(289347923L, 75893479L);
 
     SubtaskKey subtaskKey1;
     SubtaskKey subtaskKey2;
@@ -89,9 +90,9 @@ public abstract class FileMergingSnapshotManagerTestBase {
     @BeforeEach
     public void setup(@TempDir java.nio.file.Path tempFolder) {
         subtaskKey1 =
-                new SubtaskKey(jobID, operatorID, new 
TaskInfoImpl("TestingTask", 128, 0, 128, 3));
+                new SubtaskKey(jobID, vertexID, new 
TaskInfoImpl("TestingTask", 128, 0, 128, 3));
         subtaskKey2 =
-                new SubtaskKey(jobID, operatorID, new 
TaskInfoImpl("TestingTask", 128, 1, 128, 3));
+                new SubtaskKey(jobID, vertexID, new 
TaskInfoImpl("TestingTask", 128, 1, 128, 3));
 
         checkpointBaseDir = new Path(tempFolder.toString(), 
jobID.toHexString());
         sharedStateDir = new Path(checkpointBaseDir, 
CHECKPOINT_SHARED_STATE_DIR);
@@ -492,12 +493,21 @@ public abstract class FileMergingSnapshotManagerTestBase {
                         (FileMergingSnapshotManagerBase)
                                 
createFileMergingSnapshotManager(checkpointBaseDir);
                 CloseableRegistry closeableRegistry = new CloseableRegistry()) 
{
-
+            fmsm.registerSubtaskForSharedStates(subtaskKey1);
             fmsm.notifyCheckpointStart(subtaskKey1, checkpointId);
 
             Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID = 
new HashMap<>();
+            // Here, we simulate a task with 2 operators, each operator has 
two keyed states and
+            // two operator states. The second operator's id is the same as the 
vertexID.
+            // first operator
+            subtaskStatesByOperatorID.put(
+                    new OperatorID(777L, 75893479L),
+                    buildOperatorSubtaskState(checkpointId, fmsm, 
closeableRegistry));
+
+            // second operator
             subtaskStatesByOperatorID.put(
-                    operatorID, buildOperatorSubtaskState(checkpointId, fmsm, 
closeableRegistry));
+                    OperatorID.fromJobVertexID(vertexID),
+                    buildOperatorSubtaskState(checkpointId, fmsm, 
closeableRegistry));
             taskStateSnapshot = new 
TaskStateSnapshot(subtaskStatesByOperatorID);
             oldSpaceStat = fmsm.spaceStat;
 
@@ -510,6 +520,7 @@ public abstract class FileMergingSnapshotManagerTestBase {
         try (FileMergingSnapshotManagerBase fmsm =
                 (FileMergingSnapshotManagerBase)
                         createFileMergingSnapshotManager(checkpointBaseDir)) {
+            fmsm.registerSubtaskForSharedStates(subtaskKey1);
             TaskInfo taskInfo =
                     new TaskInfoImpl(
                             "test restore",
@@ -521,19 +532,15 @@ public abstract class FileMergingSnapshotManagerTestBase {
                     taskStateSnapshot.getSubtaskStateMappings()) {
                 SubtaskFileMergingManagerRestoreOperation restoreOperation =
                         new SubtaskFileMergingManagerRestoreOperation(
-                                checkpointId,
-                                fmsm,
-                                jobID,
-                                taskInfo,
-                                entry.getKey(),
-                                entry.getValue());
+                                checkpointId, fmsm, jobID, taskInfo, vertexID, 
entry.getValue());
                 restoreOperation.restore();
             }
             TreeMap<Long, Set<LogicalFile>> stateFiles = 
fmsm.getUploadedStates();
             assertThat(stateFiles.size()).isEqualTo(1);
             Set<LogicalFile> restoreFileSet = stateFiles.get(checkpointId);
             assertThat(restoreFileSet).isNotNull();
-            assertThat(restoreFileSet.size()).isEqualTo(4);
+            // 2 operators * (2 keyed state + 2 operator state)
+            assertThat(restoreFileSet.size()).isEqualTo(8);
             assertThat(fmsm.spaceStat).isEqualTo(oldSpaceStat);
             for (LogicalFile file : restoreFileSet) {
                 
assertThat(fmsm.getLogicalFile(file.getFileId())).isEqualTo(file);
@@ -662,7 +669,10 @@ public abstract class FileMergingSnapshotManagerTestBase {
                         Collections.singletonList(
                                 
IncrementalKeyedStateHandle.HandleAndLocalPath.of(
                                         buildOneSegmentFileHandle(
-                                                checkpointId, fmsm, 
closeableRegistry),
+                                                checkpointId,
+                                                fmsm,
+                                                CheckpointedStateScope.SHARED,
+                                                closeableRegistry),
                                         "localPath")),
                         Collections.emptyList(),
                         null);
@@ -670,21 +680,33 @@ public abstract class FileMergingSnapshotManagerTestBase {
         KeyGroupsStateHandle keyedStateHandle2 =
                 new KeyGroupsStateHandle(
                         new KeyGroupRangeOffsets(0, 8),
-                        buildOneSegmentFileHandle(checkpointId, fmsm, 
closeableRegistry));
+                        buildOneSegmentFileHandle(
+                                checkpointId,
+                                fmsm,
+                                CheckpointedStateScope.EXCLUSIVE,
+                                closeableRegistry));
 
         OperatorStateHandle operatorStateHandle1 =
                 new FileMergingOperatorStreamStateHandle(
                         null,
                         null,
                         Collections.emptyMap(),
-                        buildOneSegmentFileHandle(checkpointId, fmsm, 
closeableRegistry));
+                        buildOneSegmentFileHandle(
+                                checkpointId,
+                                fmsm,
+                                CheckpointedStateScope.EXCLUSIVE,
+                                closeableRegistry));
 
         OperatorStateHandle operatorStateHandle2 =
                 new FileMergingOperatorStreamStateHandle(
                         null,
                         null,
                         Collections.emptyMap(),
-                        buildOneSegmentFileHandle(checkpointId, fmsm, 
closeableRegistry));
+                        buildOneSegmentFileHandle(
+                                checkpointId,
+                                fmsm,
+                                CheckpointedStateScope.EXCLUSIVE,
+                                closeableRegistry));
 
         return OperatorSubtaskState.builder()
                 .setManagedKeyedState(keyedStateHandle1)
@@ -695,10 +717,13 @@ public abstract class FileMergingSnapshotManagerTestBase {
     }
 
     private SegmentFileStateHandle buildOneSegmentFileHandle(
-            long checkpointId, FileMergingSnapshotManager fmsm, 
CloseableRegistry closeableRegistry)
+            long checkpointId,
+            FileMergingSnapshotManager fmsm,
+            CheckpointedStateScope scope,
+            CloseableRegistry closeableRegistry)
             throws Exception {
         FileMergingCheckpointStateOutputStream outputStream =
-                writeCheckpointAndGetStream(checkpointId, fmsm, 
closeableRegistry);
+                writeCheckpointAndGetStream(checkpointId, fmsm, scope, 
closeableRegistry);
         return outputStream.closeAndGetHandle();
     }
 
@@ -741,15 +766,13 @@ public abstract class FileMergingSnapshotManagerTestBase {
     }
 
     FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
-            long checkpointId, FileMergingSnapshotManager fmsm, 
CloseableRegistry closeableRegistry)
+            long checkpointId,
+            FileMergingSnapshotManager fmsm,
+            CheckpointedStateScope scope,
+            CloseableRegistry closeableRegistry)
             throws IOException {
         return writeCheckpointAndGetStream(
-                subtaskKey1,
-                checkpointId,
-                CheckpointedStateScope.EXCLUSIVE,
-                fmsm,
-                closeableRegistry,
-                32);
+                subtaskKey1, checkpointId, scope, fmsm, closeableRegistry, 32);
     }
 
     FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(

Reply via email to