This is an automated email from the ASF dual-hosted git repository. zakelly pushed a commit to branch release-2.1 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.1 by this push: new e107aaddfae [FLINK-38327] Use vertex id instead of operator id in checkpoint file-merging manager (#26973) e107aaddfae is described below commit e107aaddfae4beef386fd34e012ffed8cfea46da Author: Zakelly <zakelly....@gmail.com> AuthorDate: Wed Sep 10 14:08:47 2025 +0800 [FLINK-38327] Use vertex id instead of operator id in checkpoint file-merging manager (#26973) (cherry picked from commit ee73d0f79841413f3a553de87e6ea23eef0e51ab) --- .../filemerging/FileMergingSnapshotManager.java | 24 +++---- .../SubtaskFileMergingManagerRestoreOperation.java | 10 +-- .../operators/StreamTaskStateInitializerImpl.java | 8 ++- .../FileMergingSnapshotManagerTestBase.java | 73 ++++++++++++++-------- 4 files changed, 70 insertions(+), 45 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index 36eab3c5996..7310e5dd32c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.TaskInfo; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; @@ -209,14 +209,14 @@ public interface FileMergingSnapshotManager extends Closeable { long checkpointId, SubtaskKey subtaskKey, Stream<SegmentFileStateHandle> stateHandles); /** - * A key identifies a subtask. A subtask can be identified by the operator id, subtask index and + * A key identifies a subtask. A subtask can be identified by the vertex id, subtask index and * the parallelism. Note that this key should be consistent across job attempts. */ final class SubtaskKey { private static final String MANAGED_DIR_FORMAT = "job_%s_op_%s_%d_%d"; final String jobIDString; - final String operatorIDString; + final String vertexIDString; final int subtaskIndex; final int parallelism; @@ -226,23 +226,23 @@ public interface FileMergingSnapshotManager extends Closeable { */ final int hashCode; - public SubtaskKey(JobID jobID, OperatorID operatorID, TaskInfo taskInfo) { + public SubtaskKey(JobID jobID, JobVertexID vertexID, TaskInfo taskInfo) { this( jobID.toHexString(), - operatorID.toHexString(), + vertexID.toHexString(), taskInfo.getIndexOfThisSubtask(), taskInfo.getNumberOfParallelSubtasks()); } @VisibleForTesting public SubtaskKey( - String jobIDString, String operatorIDString, int subtaskIndex, int parallelism) { + String jobIDString, String vertexIDString, int subtaskIndex, int parallelism) { this.jobIDString = jobIDString; - this.operatorIDString = operatorIDString; + this.vertexIDString = vertexIDString; this.subtaskIndex = subtaskIndex; this.parallelism = parallelism; int hash = jobIDString.hashCode(); - hash = 31 * hash + operatorIDString.hashCode(); + hash = 31 * hash + vertexIDString.hashCode(); hash = 31 * hash + subtaskIndex; hash = 31 * hash + parallelism; this.hashCode = hash; @@ -251,7 +251,7 @@ public interface FileMergingSnapshotManager extends Closeable { public static SubtaskKey of(Environment environment) { return new SubtaskKey( environment.getJobID(), - OperatorID.fromJobVertexID(environment.getJobVertexId()), + environment.getJobVertexId(), environment.getTaskInfo()); } @@ -269,7 +269,7 @@ public interface FileMergingSnapshotManager extends Closeable { return String.format( MANAGED_DIR_FORMAT, jobIDString, - operatorIDString, + vertexIDString, subtaskIndex, parallelism) .replaceAll("[^a-zA-Z0-9\\-]", "_"); @@ -289,7 +289,7 @@ public interface FileMergingSnapshotManager extends Closeable { return hashCode == that.hashCode && subtaskIndex == that.subtaskIndex && parallelism == that.parallelism - && operatorIDString.equals(that.operatorIDString) + && vertexIDString.equals(that.vertexIDString) && jobIDString.equals(that.jobIDString); } @@ -301,7 +301,7 @@ public interface FileMergingSnapshotManager extends Closeable { @Override public String toString() { return String.format( - "%s-%s(%d/%d)", jobIDString, operatorIDString, subtaskIndex, parallelism); + "%s-%s(%d/%d)", jobIDString, vertexIDString, subtaskIndex, parallelism); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java index ee969e94978..36c064efb84 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java @@ -21,7 +21,7 @@ package org.apache.flink.runtime.checkpoint.filemerging; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -51,7 +51,7 @@ public class SubtaskFileMergingManagerRestoreOperation { private final TaskInfo taskInfo; /** The id of the operator to which the subtask belongs. */ - private final OperatorID operatorID; + private final JobVertexID vertexID; private final FileMergingSnapshotManager fileMergingSnapshotManager; @@ -63,19 +63,19 @@ public class SubtaskFileMergingManagerRestoreOperation { FileMergingSnapshotManager fileMergingSnapshotManager, JobID jobID, TaskInfo taskInfo, - OperatorID operatorID, + JobVertexID vertexID, OperatorSubtaskState subtaskState) { this.checkpointId = checkpointId; this.fileMergingSnapshotManager = fileMergingSnapshotManager; this.jobID = jobID; this.taskInfo = Preconditions.checkNotNull(taskInfo); - this.operatorID = Preconditions.checkNotNull(operatorID); + this.vertexID = Preconditions.checkNotNull(vertexID); this.subtaskState = Preconditions.checkNotNull(subtaskState); } public void restore() { FileMergingSnapshotManager.SubtaskKey subtaskKey = - new FileMergingSnapshotManager.SubtaskKey(jobID, operatorID, taskInfo); + new FileMergingSnapshotManager.SubtaskKey(jobID, vertexID, taskInfo); Stream<? extends StateObject> keyedStateHandles = Stream.concat( diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 58b1a83b819..4490bad722f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; import org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.state.AsyncKeyedStateBackend; @@ -162,7 +163,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize throws Exception { TaskInfo taskInfo = environment.getTaskInfo(); - registerRestoredStateToFileMergingManager(environment.getJobID(), taskInfo, operatorID); + registerRestoredStateToFileMergingManager( + environment.getJobID(), taskInfo, environment.getJobVertexId(), operatorID); OperatorSubtaskDescriptionText operatorSubtaskDescription = new OperatorSubtaskDescriptionText( @@ -361,7 +363,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize } private void registerRestoredStateToFileMergingManager( - JobID jobID, TaskInfo taskInfo, OperatorID operatorID) { + JobID jobID, TaskInfo taskInfo, JobVertexID jobVertexID, OperatorID operatorID) { FileMergingSnapshotManager fileMergingSnapshotManager = taskStateManager.getFileMergingSnapshotManager(); Optional<Long> restoredCheckpointId = taskStateManager.getRestoreCheckpointId(); @@ -377,7 +379,7 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize fileMergingSnapshotManager, jobID, taskInfo, - operatorID, + jobVertexID, subtaskState.get()); restoreOperation.restore(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java index 582a4281b2d..e739e6d4313 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SpaceStat; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey; import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.state.CheckpointedStateScope; @@ -73,7 +74,7 @@ public abstract class FileMergingSnapshotManagerTestBase { final JobID jobID = new JobID(); - final OperatorID operatorID = new OperatorID(289347923L, 75893479L); + final JobVertexID vertexID = new JobVertexID(289347923L, 75893479L); SubtaskKey subtaskKey1; SubtaskKey subtaskKey2; @@ -89,9 +90,9 @@ public abstract class FileMergingSnapshotManagerTestBase { @BeforeEach public void setup(@TempDir java.nio.file.Path tempFolder) { subtaskKey1 = - new SubtaskKey(jobID, operatorID, new TaskInfoImpl("TestingTask", 128, 0, 128, 3)); + new SubtaskKey(jobID, vertexID, new TaskInfoImpl("TestingTask", 128, 0, 128, 3)); subtaskKey2 = - new SubtaskKey(jobID, operatorID, new TaskInfoImpl("TestingTask", 128, 1, 128, 3)); + new SubtaskKey(jobID, vertexID, new TaskInfoImpl("TestingTask", 128, 1, 128, 3)); checkpointBaseDir = new Path(tempFolder.toString(), jobID.toHexString()); sharedStateDir = new Path(checkpointBaseDir, CHECKPOINT_SHARED_STATE_DIR); @@ -492,12 +493,21 @@ public abstract class FileMergingSnapshotManagerTestBase { (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager(checkpointBaseDir); CloseableRegistry closeableRegistry = new CloseableRegistry()) { - + fmsm.registerSubtaskForSharedStates(subtaskKey1); fmsm.notifyCheckpointStart(subtaskKey1, checkpointId); Map<OperatorID, OperatorSubtaskState> subtaskStatesByOperatorID = new HashMap<>(); + // Here, we simulate a task with 2 operators, each operator has one keyed state and one + // operator state. The second operator's id is the same as the vertexID. + // first operator + subtaskStatesByOperatorID.put( + new OperatorID(777L, 75893479L), + buildOperatorSubtaskState(checkpointId, fmsm, closeableRegistry)); + + // second operator subtaskStatesByOperatorID.put( - operatorID, buildOperatorSubtaskState(checkpointId, fmsm, closeableRegistry)); + OperatorID.fromJobVertexID(vertexID), + buildOperatorSubtaskState(checkpointId, fmsm, closeableRegistry)); taskStateSnapshot = new TaskStateSnapshot(subtaskStatesByOperatorID); oldSpaceStat = fmsm.spaceStat; @@ -510,6 +520,7 @@ public abstract class FileMergingSnapshotManagerTestBase { try (FileMergingSnapshotManagerBase fmsm = (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager(checkpointBaseDir)) { + fmsm.registerSubtaskForSharedStates(subtaskKey1); TaskInfo taskInfo = new TaskInfoImpl( "test restore", @@ -521,19 +532,15 @@ public abstract class FileMergingSnapshotManagerTestBase { taskStateSnapshot.getSubtaskStateMappings()) { SubtaskFileMergingManagerRestoreOperation restoreOperation = new SubtaskFileMergingManagerRestoreOperation( - checkpointId, - fmsm, - jobID, - taskInfo, - entry.getKey(), - entry.getValue()); + checkpointId, fmsm, jobID, taskInfo, vertexID, entry.getValue()); restoreOperation.restore(); } TreeMap<Long, Set<LogicalFile>> stateFiles = fmsm.getUploadedStates(); assertThat(stateFiles.size()).isEqualTo(1); Set<LogicalFile> restoreFileSet = stateFiles.get(checkpointId); assertThat(restoreFileSet).isNotNull(); - assertThat(restoreFileSet.size()).isEqualTo(4); + // 2 operators * (2 keyed state + 2 operator state) + assertThat(restoreFileSet.size()).isEqualTo(8); assertThat(fmsm.spaceStat).isEqualTo(oldSpaceStat); for (LogicalFile file : restoreFileSet) { assertThat(fmsm.getLogicalFile(file.getFileId())).isEqualTo(file); @@ -662,7 +669,10 @@ public abstract class FileMergingSnapshotManagerTestBase { Collections.singletonList( IncrementalKeyedStateHandle.HandleAndLocalPath.of( buildOneSegmentFileHandle( - checkpointId, fmsm, closeableRegistry), + checkpointId, + fmsm, + CheckpointedStateScope.SHARED, + closeableRegistry), "localPath")), Collections.emptyList(), null); @@ -670,21 +680,33 @@ public abstract class FileMergingSnapshotManagerTestBase { KeyGroupsStateHandle keyedStateHandle2 = new KeyGroupsStateHandle( new KeyGroupRangeOffsets(0, 8), - buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry)); + buildOneSegmentFileHandle( + checkpointId, + fmsm, + CheckpointedStateScope.EXCLUSIVE, + closeableRegistry)); OperatorStateHandle operatorStateHandle1 = new FileMergingOperatorStreamStateHandle( null, null, Collections.emptyMap(), - buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry)); + buildOneSegmentFileHandle( + checkpointId, + fmsm, + CheckpointedStateScope.EXCLUSIVE, + closeableRegistry)); OperatorStateHandle operatorStateHandle2 = new FileMergingOperatorStreamStateHandle( null, null, Collections.emptyMap(), - buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry)); + buildOneSegmentFileHandle( + checkpointId, + fmsm, + CheckpointedStateScope.EXCLUSIVE, + closeableRegistry)); return OperatorSubtaskState.builder() .setManagedKeyedState(keyedStateHandle1) @@ -695,10 +717,13 @@ public abstract class FileMergingSnapshotManagerTestBase { } private SegmentFileStateHandle buildOneSegmentFileHandle( - long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry) + long checkpointId, + FileMergingSnapshotManager fmsm, + CheckpointedStateScope scope, + CloseableRegistry closeableRegistry) throws Exception { FileMergingCheckpointStateOutputStream outputStream = - writeCheckpointAndGetStream(checkpointId, fmsm, closeableRegistry); + writeCheckpointAndGetStream(checkpointId, fmsm, scope, closeableRegistry); return outputStream.closeAndGetHandle(); } @@ -741,15 +766,13 @@ public abstract class FileMergingSnapshotManagerTestBase { } FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream( - long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry) + long checkpointId, + FileMergingSnapshotManager fmsm, + CheckpointedStateScope scope, + CloseableRegistry closeableRegistry) throws IOException { return writeCheckpointAndGetStream( - subtaskKey1, - checkpointId, - CheckpointedStateScope.EXCLUSIVE, - fmsm, - closeableRegistry, - 32); + subtaskKey1, checkpointId, scope, fmsm, closeableRegistry, 32); } FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(