masteryhx commented on code in PR #22590: URL: https://github.com/apache/flink/pull/22590#discussion_r1226146210
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.TaskStateManager; +import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; + +import java.io.Closeable; + +/** + * FileMergingSnapshotManager provides an interface to manage files and meta information for + * checkpoint files with merging checkpoint files enabled. It manages the files for ONE single task + * in TM, including all subtasks of this single task that running in this TM. There is one + * FileMergingSnapshotManager for each task per task manager. + * + * <p>TODO (FLINK-32073): create output stream. + * + * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logical files. 
+ */ + public interface FileMergingSnapshotManager extends Closeable { + + /** + * Initialize the file system, recording the checkpoint path the manager should work with. + * + * <pre> + * The layout of checkpoint directory: + * /user-defined-checkpoint-dir + * /{job-id} (checkpointBaseDir) + * | + * + --shared/ + * | + * + --subtask-1/ + * + -- merged shared state files + * + --subtask-2/ + * + -- merged shared state files + * + --taskowned/ + * + -- merged private state files + * + --chk-1/ + * + --chk-2/ + * + --chk-3/ + * </pre> + * + * <p>The reason why initializing directories in this method instead of the constructor is that + * the FileMergingSnapshotManager itself belongs to the {@link TaskStateManager}, which is + * initialized when receiving a task, while the base directories for checkpoint are created by + * {@link FsCheckpointStorageAccess} when the state backend initializing per subtask. After the + * checkpoint directories are initialized, the managed subdirectories are initialized here. + * + * <p>Note: This method may be called several times, the implementation should ensure + * idempotency, and throw {@link IllegalArgumentException} when any of the path in params change + * across function calls. + * + * @param fileSystem The filesystem to write to. + * @param checkpointBaseDir The base directory for checkpoints. + * @param sharedStateDir The directory for shared checkpoint data. + * @param taskOwnedStateDir The name of the directory for state not owned/released by the + * master, but by the TaskManagers. + * @throws IllegalArgumentException thrown if these three paths are not deterministic across + * calls. + */ + void initFileSystem( + FileSystem fileSystem, Review Comment: Do we really need `fileSystem` as a parameter? We could also obtain it via `checkpointBaseDir.getFileSystem()`, which is the common way to get a file system wherever one is needed.
########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.core.fs.EntropyInjector; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.OutputStreamAndPath; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +import static org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile.PhysicalFileDeleter; + +/** Base implementation of {@link 
FileMergingSnapshotManager}. */ +public abstract class FileMergingSnapshotManagerBase implements FileMergingSnapshotManager { + + private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class); + + /** The identifier of this manager. */ + private final String id; + + /** The executor for I/O operations in this manager. */ + protected final Executor ioExecutor; + + /** The {@link FileSystem} that this manager works on. */ + protected FileSystem fs; + + // checkpoint directories + protected Path checkpointDir; + protected Path sharedStateDir; + protected Path taskOwnedStateDir; + + /** + * The file system should only be initialized once. + * + * @see FileMergingSnapshotManager#initFileSystem for the reason why a throttle is needed. + */ + private boolean fileSystemInitiated = false; + + /** + * File-system dependent value. Mark whether the file system this manager running on need sync + * for visibility. If true, DO a file sync after writing each segment . + */ + protected boolean shouldSyncAfterClosingLogicalFile; + + protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile; + + /** + * Currently the shared state files are merged within each subtask, files are split by different + * directories. + */ + private final Map<SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<>(); + + /** + * The private state files are merged across subtasks, there is only one directory for + * merged-files within one TM per job. 
+ */ + protected Path managedExclusiveStateDir; + + public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) { + this.id = id; + this.ioExecutor = ioExecutor; + } + + @Override + public void initFileSystem( + FileSystem fileSystem, + Path checkpointBaseDir, + Path sharedStateDir, + Path taskOwnedStateDir) + throws IllegalArgumentException { + if (fileSystemInitiated) { + Preconditions.checkArgument( + checkpointBaseDir.equals(this.checkpointDir), + "The checkpoint base dir is not deterministic across subtasks."); + Preconditions.checkArgument( + sharedStateDir.equals(this.sharedStateDir), + "The shared checkpoint dir is not deterministic across subtasks."); + Preconditions.checkArgument( + taskOwnedStateDir.equals(this.taskOwnedStateDir), + "The task-owned checkpoint dir is not deterministic across subtasks."); + return; + } + this.fs = fileSystem; + this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir); + this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir); + this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir); + this.fileSystemInitiated = true; + this.shouldSyncAfterClosingLogicalFile = shouldSyncAfterClosingLogicalFile(fileSystem); + // Initialize the managed exclusive path using id as the child path name. + // Currently, we use the task-owned directory to place the merged private state. According + // to the FLIP-306, we later consider move these files to the new introduced + // task-manager-owned directory. 
+ Path managedExclusivePath = new Path(taskOwnedStateDir, id); + createManagedDirectory(managedExclusivePath); + this.managedExclusiveStateDir = managedExclusivePath; + } + + @Override + public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) { + String managedDirName = subtaskKey.getManagedDirName(); + Path managedPath = new Path(sharedStateDir, managedDirName); + if (!managedSharedStateDir.containsKey(subtaskKey)) { + createManagedDirectory(managedPath); + managedSharedStateDir.put(subtaskKey, managedPath); + } + } + + // ------------------------------------------------------------------------ + // logical & physical file + // ------------------------------------------------------------------------ + + /** + * Create a logical file on a physical file. + * + * @param physicalFile the underlying physical file. + * @param startOffset the offset of the physical file that the logical file start from. + * @param length the length of the logical file. + * @param subtaskKey the id of the subtask that the logical file belongs to. + * @return the created logical file. + */ + protected LogicalFile createLogicalFile( + @Nonnull PhysicalFile physicalFile, + int startOffset, + int length, + @Nonnull SubtaskKey subtaskKey) { + LogicalFileId fileID = LogicalFileId.generateRandomId(); + return new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey); + } + + /** + * Create a physical file in right location (managed directory), which is specified by scope of + * this checkpoint and current subtask. + * + * @param subtaskKey the {@link SubtaskKey} of current subtask. + * @param scope the scope of the checkpoint. + * @return the created physical file. + * @throws IOException if anything goes wrong with file system. 
+ */ + @Nonnull + protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, CheckpointedStateScope scope) + throws IOException { + PhysicalFile result; + Exception latestException = null; + + Path dirPath = getManagedDir(subtaskKey, scope); + + if (dirPath == null) { + throw new IOException( + "Could not get " + + scope + + " path for subtask " + + subtaskKey + + ", the directory may have not been created."); + } + + for (int attempt = 0; attempt < 10; attempt++) { + try { + OutputStreamAndPath streamAndPath = + EntropyInjector.createEntropyAware( + fs, + generatePhysicalFilePath(dirPath), + FileSystem.WriteMode.NO_OVERWRITE); + FSDataOutputStream outputStream = streamAndPath.stream(); + Path filePath = streamAndPath.path(); + result = new PhysicalFile(outputStream, filePath, this.physicalFileDeleter, scope); + updateFileCreationMetrics(filePath); + return result; + } catch (Exception e) { + latestException = e; + } + } + + throw new IOException( + "Could not open output stream for state file merging.", latestException); + } + + private void updateFileCreationMetrics(Path path) { + // TODO: FLINK-32091 add io metrics + LOG.debug("Create a new physical file {} for checkpoint file merging.", path); + } + + /** + * Generate a file path for a physical file. + * + * @param dirPath the parent directory path for the physical file. + * @return the generated file path for a physical file. + */ + protected Path generatePhysicalFilePath(Path dirPath) { + // this must be called after initFileSystem() is called + // so the checkpoint directories must be not null if we reach here + final String fileName = UUID.randomUUID().toString(); + return new Path(dirPath, fileName); + } + + /** + * Delete a physical file by given file path. Use the io executor to do the deletion. + * + * @param filePath the given file path to delete. 
+ */ + protected final void deletePhysicalFile(Path filePath) { + ioExecutor.execute( + () -> { + try { + fs.delete(filePath, false); + LOG.debug("Physical file deleted: {}.", filePath); + } catch (IOException e) { + LOG.warn("Fail to delete file: {}", filePath); + } + }); + } + + // ------------------------------------------------------------------------ + // abstract methods + // ------------------------------------------------------------------------ + + /** + * Get a reused physical file or create one. This will be called in checkpoint output stream + * creation logic. + * + * <p>TODO (FLINK-32073): Implement a CheckpointStreamFactory for file-merging that uses this + * method to create or reuse physical files. + * + * <p>Basic logic of file reusing: whenever a physical file is needed, this method is called + * with necessary information provided for acquiring a file. The file will not be reused until + * it is written and returned to the reused pool by calling {@link + * #returnPhysicalFileForNextReuse}. + * + * @param subtaskKey the subtask key for the caller + * @param checkpointId the checkpoint id + * @param scope checkpoint scope + * @return the requested physical file. + * @throws IOException thrown if anything goes wrong with file system. + */ + @Nonnull + protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint( + SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) + throws IOException; + + /** + * Try to return an existing physical file to the manager for next reuse. If this physical file + * is no longer needed (for reusing), it will be closed. + * + * <p>Basic logic of file reusing, see {@link #getOrCreatePhysicalFileForCheckpoint}. + * + * @param subtaskKey the subtask key for the caller + * @param checkpointId in which checkpoint this physical file is requested. + * @param physicalFile the returning checkpoint + * @throws IOException thrown if anything goes wrong with file system. 
+ * @see #getOrCreatePhysicalFileForCheckpoint(SubtaskKey, long, CheckpointedStateScope) + */ + protected abstract void returnPhysicalFileForNextReuse( + SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException; + + // ------------------------------------------------------------------------ + // file system + // ------------------------------------------------------------------------ + + @Override + public Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope) { + if (scope.equals(CheckpointedStateScope.SHARED)) { + return managedSharedStateDir.get(subtaskKey); + } else { + return managedExclusiveStateDir; + } + } + + static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) { + // Currently, we do file sync regardless of the file system. + // TODO: Determine whether do file sync more wisely. Add an interface to FileSystem if + // needed. + return true; + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private void createManagedDirectory(Path managedPath) { + try { + FileStatus fileStatus = null; Review Comment: If `fileStatus` is only used to check whether the directory already exists, could we just call `mkdirs` directly instead? It already covers both creating a new directory and reusing an existing one, and it reports failures through exceptions. ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java: ########## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** An abstraction of physical files in file-merging checkpoints. */ +public class PhysicalFile { + + private static final Logger LOG = LoggerFactory.getLogger(PhysicalFile.class); + + /** Functional interface to delete the physical file. */ + @FunctionalInterface + public interface PhysicalFileDeleter { + /** Delete the file. */ + void perform(Path filePath) throws IOException; + } + + /** + * Output stream to the file, which keeps open for writing. It can be null if the file is + * closed. + */ + @Nullable private FSDataOutputStream outputStream; + + /** Reference count from the logical files. */ + private final AtomicInteger logicalFileRefCount; + + /** The size of this physical file. */ + private final AtomicLong size; + + /** + * Deleter that will be called when delete this physical file. If null, do not delete this + * physical file. 
+ */ + @Nullable private final PhysicalFileDeleter deleter; + + private final Path filePath; + + private final CheckpointedStateScope scope; + + /** + * If a physical file is closed, it means no more file segments will be written to the physical + * file, and it can be deleted once its logicalFileRefCount decreases to 0. + */ + private boolean closed; + + /** + * A file can be deleted if: 1. It is closed, and 2. No more {@link LogicalFile}s have reference + * on it. + */ + private boolean deleted = false; + + public PhysicalFile( + @Nullable FSDataOutputStream outputStream, + Path filePath, + PhysicalFileDeleter deleter, Review Comment: nit: @Nullable ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java: ########## @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.core.fs.EntropyInjector; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.OutputStreamAndPath; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +import static org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile.PhysicalFileDeleter; + +/** Base implementation of {@link FileMergingSnapshotManager}. */ +public abstract class FileMergingSnapshotManagerBase implements FileMergingSnapshotManager { + + private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class); + + /** The identifier of this manager. */ + private final String id; + + /** The executor for I/O operations in this manager. */ + protected final Executor ioExecutor; + + /** The {@link FileSystem} that this manager works on. */ + protected FileSystem fs; + + // checkpoint directories + protected Path checkpointDir; + protected Path sharedStateDir; + protected Path taskOwnedStateDir; + + /** + * The file system should only be initialized once. + * + * @see FileMergingSnapshotManager#initFileSystem for the reason why a throttle is needed. + */ + private boolean fileSystemInitiated = false; + + /** + * File-system dependent value. Mark whether the file system this manager running on need sync + * for visibility. 
If true, DO a file sync after writing each segment . + */ + protected boolean shouldSyncAfterClosingLogicalFile; + + protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile; + + /** + * Currently the shared state files are merged within each subtask, files are split by different + * directories. + */ + private final Map<SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<>(); + + /** + * The private state files are merged across subtasks, there is only one directory for + * merged-files within one TM per job. + */ + protected Path managedExclusiveStateDir; + + public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) { + this.id = id; + this.ioExecutor = ioExecutor; + } + + @Override + public void initFileSystem( + FileSystem fileSystem, + Path checkpointBaseDir, + Path sharedStateDir, + Path taskOwnedStateDir) + throws IllegalArgumentException { + if (fileSystemInitiated) { + Preconditions.checkArgument( + checkpointBaseDir.equals(this.checkpointDir), + "The checkpoint base dir is not deterministic across subtasks."); + Preconditions.checkArgument( + sharedStateDir.equals(this.sharedStateDir), + "The shared checkpoint dir is not deterministic across subtasks."); + Preconditions.checkArgument( + taskOwnedStateDir.equals(this.taskOwnedStateDir), + "The task-owned checkpoint dir is not deterministic across subtasks."); + return; + } + this.fs = fileSystem; + this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir); + this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir); + this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir); + this.fileSystemInitiated = true; + this.shouldSyncAfterClosingLogicalFile = shouldSyncAfterClosingLogicalFile(fileSystem); + // Initialize the managed exclusive path using id as the child path name. + // Currently, we use the task-owned directory to place the merged private state. 
According + // to the FLIP-306, we later consider move these files to the new introduced + // task-manager-owned directory. + Path managedExclusivePath = new Path(taskOwnedStateDir, id); + createManagedDirectory(managedExclusivePath); + this.managedExclusiveStateDir = managedExclusivePath; + } + + @Override + public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) { + String managedDirName = subtaskKey.getManagedDirName(); + Path managedPath = new Path(sharedStateDir, managedDirName); + if (!managedSharedStateDir.containsKey(subtaskKey)) { + createManagedDirectory(managedPath); + managedSharedStateDir.put(subtaskKey, managedPath); + } + } + + // ------------------------------------------------------------------------ + // logical & physical file + // ------------------------------------------------------------------------ + + /** + * Create a logical file on a physical file. + * + * @param physicalFile the underlying physical file. + * @param startOffset the offset of the physical file that the logical file start from. + * @param length the length of the logical file. + * @param subtaskKey the id of the subtask that the logical file belongs to. + * @return the created logical file. + */ + protected LogicalFile createLogicalFile( + @Nonnull PhysicalFile physicalFile, + int startOffset, + int length, + @Nonnull SubtaskKey subtaskKey) { + LogicalFileId fileID = LogicalFileId.generateRandomId(); + return new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey); + } + + /** + * Create a physical file in right location (managed directory), which is specified by scope of + * this checkpoint and current subtask. + * + * @param subtaskKey the {@link SubtaskKey} of current subtask. + * @param scope the scope of the checkpoint. + * @return the created physical file. + * @throws IOException if anything goes wrong with file system. 
+ */ + @Nonnull + protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, CheckpointedStateScope scope) + throws IOException { + PhysicalFile result; + Exception latestException = null; + + Path dirPath = getManagedDir(subtaskKey, scope); + + if (dirPath == null) { + throw new IOException( + "Could not get " + + scope + + " path for subtask " + + subtaskKey + + ", the directory may have not been created."); + } + + for (int attempt = 0; attempt < 10; attempt++) { + try { + OutputStreamAndPath streamAndPath = + EntropyInjector.createEntropyAware( + fs, + generatePhysicalFilePath(dirPath), + FileSystem.WriteMode.NO_OVERWRITE); + FSDataOutputStream outputStream = streamAndPath.stream(); + Path filePath = streamAndPath.path(); + result = new PhysicalFile(outputStream, filePath, this.physicalFileDeleter, scope); + updateFileCreationMetrics(filePath); + return result; + } catch (Exception e) { + latestException = e; + } + } + + throw new IOException( + "Could not open output stream for state file merging.", latestException); + } + + private void updateFileCreationMetrics(Path path) { + // TODO: FLINK-32091 add io metrics + LOG.debug("Create a new physical file {} for checkpoint file merging.", path); + } + + /** + * Generate a file path for a physical file. + * + * @param dirPath the parent directory path for the physical file. + * @return the generated file path for a physical file. + */ + protected Path generatePhysicalFilePath(Path dirPath) { + // this must be called after initFileSystem() is called + // so the checkpoint directories must be not null if we reach here + final String fileName = UUID.randomUUID().toString(); + return new Path(dirPath, fileName); + } + + /** + * Delete a physical file by given file path. Use the io executor to do the deletion. + * + * @param filePath the given file path to delete. 
+ */ + protected final void deletePhysicalFile(Path filePath) { + ioExecutor.execute( + () -> { + try { + fs.delete(filePath, false); + LOG.debug("Physical file deleted: {}.", filePath); + } catch (IOException e) { + LOG.warn("Fail to delete file: {}", filePath); + } + }); + } + + // ------------------------------------------------------------------------ + // abstract methods + // ------------------------------------------------------------------------ + + /** + * Get a reused physical file or create one. This will be called in checkpoint output stream + * creation logic. + * + * <p>TODO (FLINK-32073): Implement a CheckpointStreamFactory for file-merging that uses this + * method to create or reuse physical files. + * + * <p>Basic logic of file reusing: whenever a physical file is needed, this method is called + * with necessary information provided for acquiring a file. The file will not be reused until + * it is written and returned to the reused pool by calling {@link + * #returnPhysicalFileForNextReuse}. + * + * @param subtaskKey the subtask key for the caller + * @param checkpointId the checkpoint id + * @param scope checkpoint scope + * @return the requested physical file. + * @throws IOException thrown if anything goes wrong with file system. + */ + @Nonnull + protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint( + SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) + throws IOException; + + /** + * Try to return an existing physical file to the manager for next reuse. If this physical file + * is no longer needed (for reusing), it will be closed. + * + * <p>Basic logic of file reusing, see {@link #getOrCreatePhysicalFileForCheckpoint}. + * + * @param subtaskKey the subtask key for the caller + * @param checkpointId in which checkpoint this physical file is requested. + * @param physicalFile the returning checkpoint + * @throws IOException thrown if anything goes wrong with file system. 
+ * @see #getOrCreatePhysicalFileForCheckpoint(SubtaskKey, long, CheckpointedStateScope) + */ + protected abstract void returnPhysicalFileForNextReuse( Review Comment: IIUC, `returnPhysicalFileForNextReuse` must always be called after a physical file has been used? Otherwise the caller would have to take care of closing the physical file itself, right? ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.TaskStateManager; +import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; + +import java.io.Closeable; + +/** + * FileMergingSnapshotManager provides an interface to manage files and meta information for + * checkpoint files with merging checkpoint files enabled.
It manages the files for ONE single task + * in TM, including all subtasks of this single task that running in this TM. There is one + * FileMergingSnapshotManager for each task per task manager. + * + * <p>TODO (FLINK-32073): create output stream. + * + * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logical files. + */ +public interface FileMergingSnapshotManager extends Closeable { Review Comment: IIUC, this will be used from different threads, right? How about documenting its thread-safety guarantees? ########## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java: ########## @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.concurrent.Executor; + +/** A builder that builds the {@link FileMergingSnapshotManager}. */ +public class FileMergingSnapshotManagerBuilder { + + /** The id for identify a {@link FileMergingSnapshotManager}. */ + private final String id; + + @Nullable private Executor ioExecutor = null; + + /** + * Initialize the builder.
+ * + * @param id the id of the manager. + */ + public FileMergingSnapshotManagerBuilder(String id) { + this.id = id; + } + + /** + * Set the executor for io operation in manager. If null(default), all io operation will be + * executed synchronously. + */ + public FileMergingSnapshotManagerBuilder setIOExecutor(@Nullable Executor ioExecutor) { + this.ioExecutor = ioExecutor; + return this; + } + + /** + * Create file-merging snapshot manager based on configuration. + * + * <p>TODO (FLINK-32072): Create manager during the initialization of task manager services. + * + * <p>TODO (FLINK-32074): Support another type of FileMergingSnapshotManager that merges files + * across different checkpoints. + * + * @return the created manager. + */ + public FileMergingSnapshotManager build() throws IOException { Review Comment: Why is the IOException needed? IIUC, all IO operations happen in initFileSystem, so the constructor should currently be simple. ########## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java: ########## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalFileSystem; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link FileMergingSnapshotManager}. */ +public class FileMergingSnapshotManagerTest { + + private final String tmId = "Testing"; + + private final OperatorID operatorID = new OperatorID(289347923L, 75893479L); + + private SubtaskKey subtaskKey1; + private SubtaskKey subtaskKey2; + + private Path checkpointBaseDir; + + @BeforeEach + public void setup(@TempDir java.nio.file.Path tempFolder) { + // use simplified job ids for the tests + long jobId = 1; + subtaskKey1 = new SubtaskKey(operatorID, new TaskInfo("TestingTask", 128, 0, 128, 3)); + subtaskKey2 = new SubtaskKey(operatorID, new TaskInfo("TestingTask", 128, 1, 128, 3)); + checkpointBaseDir = new Path(tempFolder.toString(), String.valueOf(jobId)); + } + + @Test + public void testCreateFileMergingSnapshotManager() throws IOException { + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir)) { + fmsm.registerSubtaskForSharedStates(subtaskKey1); + assertThat(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)) + .isEqualTo( + new Path( + checkpointBaseDir, + AbstractFsCheckpointStorageAccess + .CHECKPOINT_TASK_OWNED_STATE_DIR + + "/" + + tmId)); + 
assertThat(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)) + .isEqualTo( + new Path( + checkpointBaseDir, + AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR + + "/" + + subtaskKey1.getManagedDirName())); + } + } + + @Test + public void testCreateAndReuseFiles() throws IOException { + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir)) { + fmsm.registerSubtaskForSharedStates(subtaskKey1); + fmsm.registerSubtaskForSharedStates(subtaskKey2); + // firstly, we try shared state. + PhysicalFile file1 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file1.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + // allocate another + PhysicalFile file2 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(file2.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file2).isNotEqualTo(file1); + + // return for reuse + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1); + + // allocate for another subtask + PhysicalFile file3 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey2, 0, CheckpointedStateScope.SHARED); + assertThat(file3.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED)); + assertThat(file3).isNotEqualTo(file1); + + // allocate for another checkpoint + PhysicalFile file4 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 1, CheckpointedStateScope.SHARED); + assertThat(file4.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file4).isNotEqualTo(file1); + + // allocate for this checkpoint + PhysicalFile file5 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, 
CheckpointedStateScope.SHARED); + assertThat(file5.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(file5).isEqualTo(file1); + + // Secondly, we try private state + PhysicalFile file6 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE); + assertThat(file6.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + + // allocate another + PhysicalFile file7 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE); + assertThat(file7.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + assertThat(file7).isNotEqualTo(file6); + + // return for reuse + fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file6); + + // allocate for another checkpoint + PhysicalFile file8 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE); + assertThat(file8.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + assertThat(file8).isNotEqualTo(file6); + + // allocate for this checkpoint but another subtask + PhysicalFile file9 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey2, 0, CheckpointedStateScope.EXCLUSIVE); + assertThat(file9.getFilePath().getParent()) + .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)); + assertThat(file9).isEqualTo(file6); + + assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)) + .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + } + } + + @Test + public void testRefCountBetweenLogicalAndPhysicalFiles() throws IOException { + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir)) { + fmsm.registerSubtaskForSharedStates(subtaskKey1); 
+ fmsm.registerSubtaskForSharedStates(subtaskKey2); + + PhysicalFile physicalFile1 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(physicalFile1.isOpen()).isTrue(); + + LogicalFile logicalFile1 = fmsm.createLogicalFile(physicalFile1, 0, 10, subtaskKey1); + assertThat(logicalFile1.getSubtaskKey()).isEqualTo(subtaskKey1); + assertThat(logicalFile1.getPhysicalFile()).isEqualTo(physicalFile1); + assertThat(logicalFile1.getStartOffset()).isEqualTo(0); + assertThat(logicalFile1.getLength()).isEqualTo(10); + assertThat(physicalFile1.getRefCount()).isEqualTo(1); + + assertThat(logicalFile1.isDiscarded()).isFalse(); + logicalFile1.advanceLastCheckpointId(2); + assertThat(logicalFile1.getLastUsedCheckpointID()).isEqualTo(2); + logicalFile1.advanceLastCheckpointId(1); + assertThat(logicalFile1.getLastUsedCheckpointID()).isEqualTo(2); + logicalFile1.discardWithCheckpointId(1); + assertThat(logicalFile1.isDiscarded()).isFalse(); + logicalFile1.discardWithCheckpointId(2); + assertThat(logicalFile1.isDiscarded()).isTrue(); + + // the stream is still open for reuse + assertThat(physicalFile1.isOpen()).isTrue(); + assertThat(physicalFile1.isDeleted()).isFalse(); + assertThat(physicalFile1.getRefCount()).isEqualTo(0); + + physicalFile1.close(); + assertThat(physicalFile1.isOpen()).isFalse(); + assertThat(physicalFile1.isDeleted()).isTrue(); + + // try close physical file but not deleted + PhysicalFile physicalFile2 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + LogicalFile logicalFile2 = fmsm.createLogicalFile(physicalFile2, 0, 10, subtaskKey1); + assertThat(logicalFile2.getPhysicalFile()).isEqualTo(physicalFile2); + assertThat(logicalFile2.getStartOffset()).isEqualTo(0); + assertThat(logicalFile2.getLength()).isEqualTo(10); + assertThat(physicalFile2.getRefCount()).isEqualTo(1); + logicalFile2.advanceLastCheckpointId(2); + + assertThat(physicalFile2.isOpen()).isTrue(); 
+ assertThat(physicalFile2.isDeleted()).isFalse(); + physicalFile2.close(); + assertThat(physicalFile2.isOpen()).isFalse(); + assertThat(physicalFile2.isDeleted()).isFalse(); + assertThat(physicalFile2.getRefCount()).isEqualTo(1); + + logicalFile2.discardWithCheckpointId(2); + assertThat(logicalFile2.isDiscarded()).isTrue(); + assertThat(physicalFile2.isDeleted()).isTrue(); + assertThat(physicalFile2.getRefCount()).isEqualTo(0); + } + } + + @Test + public void testSizeStatsInPhysicalFile() throws IOException { + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir)) { + fmsm.registerSubtaskForSharedStates(subtaskKey1); + fmsm.registerSubtaskForSharedStates(subtaskKey2); + PhysicalFile physicalFile = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + + assertThat(physicalFile.getSize()).isEqualTo(0); + physicalFile.incSize(123); + assertThat(physicalFile.getSize()).isEqualTo(123); + physicalFile.incSize(456); + assertThat(physicalFile.getSize()).isEqualTo(123 + 456); + } + } + + private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir) + throws IOException { + FileSystem fs = LocalFileSystem.getSharedInstance(); + Path sharedStateDir = new Path(checkpointBaseDir, "shared"); Review Comment: Could we use AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR & AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR here instead of hard-coding the directory names? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
