curcur commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1217425463


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
+
+import java.io.Closeable;
+
+/**
+ * FileMergingSnapshotManager provides an interface to manage files and meta 
information for
+ * checkpoint files with merging checkpoint files enabled. 
FileMergingSnapshotManager resides on the
+ * TM side.
+ *
+ * <p>TODO (FLINK-32073): create output stream.
+ *
+ * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logical 
files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+    /**
+     * Initialize the file system, recording the checkpoint path the manager 
should work with.
+     *
+     * <pre>
+     * The layout of checkpoint directory:
+     * /user-defined-checkpoint-dir
+     *     /{job-id} (checkpointBaseDir)
+     *         |
+     *         + --shared/
+     *             |
+     *             + --subtask-1/
+     *                 + -- merged shared state files
+     *             + --subtask-2/
+     *                 + -- merged shared state files
+     *         + --taskowned/
+     *             + -- merged private state files
+     *         + --chk-1/
+     *         + --chk-2/
+     *         + --chk-3/
+     * </pre>
+     *
+     * <p>The reason why initializing directories in this method instead of 
the constructor is that
+     * the FileMergingSnapshotManager itself belongs to the {@link 
TaskStateManager}, which is
+     * initialized when receiving a task, while the base directories for 
checkpoint are created by
+     * {@link FsCheckpointStorageAccess} when the state backend initializing. 
After the checkpoint
+     * directories are initialized, the managed subdirectories are initialized 
here.
+     *
+     * <p>Note: This method may be called several times, the implementation 
should ensure
+     * idempotency, and throw {@link IllegalArgumentException} when any of the 
path in params change
+     * across function calls.
+     *
+     * @param fileSystem The filesystem to write to.
+     * @param checkpointBaseDir The base directory for checkpoints.
+     * @param sharedStateDir The directory for shared checkpoint data.
+     * @param taskOwnedStateDir The name of the directory for state not 
owned/released by the
+     *     master, but by the TaskManagers.
+     * @throws IllegalArgumentException thrown if these three paths are not 
deterministic across
+     *     calls.
+     */
+    void initFileSystem(
+            FileSystem fileSystem,
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir);

Review Comment:
   Missing `throws IllegalArgumentException`? 
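   
   For illustration only, a sketch of the declaration with the documented exception made explicit (declaring an unchecked exception is a style choice, so take this as a suggestion rather than a requirement):
   
   ```java
   void initFileSystem(
           FileSystem fileSystem,
           Path checkpointBaseDir,
           Path sharedStateDir,
           Path taskOwnedStateDir)
           throws IllegalArgumentException;
   ```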



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+    private final String id;
+
+    protected final Executor ioExecutor;
+
+    // file system and directories
+    protected FileSystem fs;
+    protected Path checkpointDir;
+    protected Path sharedStateDir;
+    protected Path taskOwnedStateDir;
+
+    protected int writeBufferSize;
+    private boolean fileSystemInitiated = false;
+
+    protected boolean syncAfterClosingLogicalFile;

Review Comment:
   1. `syncAfterClosingLogicalFile`: is this file-system dependent?
   
   Is it true that some file systems, such as S3, do not or cannot support syncAfterClosingLogicalFile, and that is why this value is introduced?
   
   What is used to decide this value? If it is decided by the file system, I think it should be part of the interface, e.g. `ableToSyncAfterClosingLogicalFile`.
   
   2. Also, especially for a base class, flags like this need some comments.
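   
   To illustrate both points, a rough sketch (the accessor name is a placeholder I made up, not a concrete proposal):
   
   ```java
   import java.io.Closeable;
   
   /** Sketch only: surfacing the FS-dependent sync decision on the interface. */
   public interface FileMergingSnapshotManager extends Closeable {
   
       /**
        * Hypothetical accessor: whether the checkpoint file system supports
        * syncing a physical file when a logical file is closed. Implementations
        * backed by object stores (e.g. S3) might return false here.
        */
       boolean ableToSyncAfterClosingLogicalFile();
   }
   ```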
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+    private final String id;
+
+    protected final Executor ioExecutor;
+
+    // file system and directories
+    protected FileSystem fs;
+    protected Path checkpointDir;
+    protected Path sharedStateDir;
+    protected Path taskOwnedStateDir;
+
+    protected int writeBufferSize;
+    private boolean fileSystemInitiated = false;
+
+    protected boolean syncAfterClosingLogicalFile;
+
+    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = 
this::deletePhysicalFile;
+
+    private final Map<SubtaskKey, Path> managedSharedStateDir = new 
ConcurrentHashMap<>();
+
+    protected Path managedExclusiveStateDir;
+
+    public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+        this.id = id;
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public void initFileSystem(
+            FileSystem fileSystem,
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir) {
+        if (fileSystemInitiated) {
+            Preconditions.checkArgument(
+                    checkpointBaseDir.equals(this.checkpointDir),
+                    "The checkpoint base dir is not deterministic across 
subtasks.");
+            Preconditions.checkArgument(
+                    sharedStateDir.equals(this.sharedStateDir),
+                    "The shared checkpoint dir is not deterministic across 
subtasks.");
+            Preconditions.checkArgument(
+                    taskOwnedStateDir.equals(this.taskOwnedStateDir),
+                    "The task-owned checkpoint dir is not deterministic across 
subtasks.");
+            return;
+        }
+        this.fs = fileSystem;
+        this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir);
+        this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir);
+        this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir);
+        this.fileSystemInitiated = true;
+        this.syncAfterClosingLogicalFile = 
shouldSyncAfterClosingLogicalFile(checkpointBaseDir);
+        // Initialize the managed exclusive path using id as the child path 
name.
+        // Currently, we use the task-owned directory to place the merged 
private state. According
+        // to the FLIP-306, we later consider move these files to the new 
introduced
+        // task-manager-owned directory.
+        Path managedExclusivePath = new Path(taskOwnedStateDir, id);
+        createManagedDirectory(managedExclusivePath);
+        this.managedExclusiveStateDir = managedExclusivePath;
+    }
+
+    @Override
+    public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
+        String managedDirName = subtaskKey.getManagedDirName();
+        Path managedPath = new Path(sharedStateDir, managedDirName);
+        if (!managedSharedStateDir.containsKey(subtaskKey)) {
+            createManagedDirectory(managedPath);
+            managedSharedStateDir.put(subtaskKey, managedPath);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  logical & physical file
+    // ------------------------------------------------------------------------
+
+    protected LogicalFile createLogicalFile(
+            @Nonnull PhysicalFile physicalFile, @Nonnull SubtaskKey 
subtaskKey) {
+        LogicalFileId fileID = LogicalFileId.generateRandomId();
+        return new LogicalFile(fileID, physicalFile, subtaskKey);
+    }
+
+    @Nonnull
+    protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, 
CheckpointedStateScope scope)
+            throws IOException {
+        PhysicalFile result;
+        Exception latestException = null;
+
+        Path dirPath = getManagedDir(subtaskKey, scope);
+
+        if (dirPath == null) {
+            throw new IOException(
+                    "Could not get "
+                            + scope
+                            + " path for subtask "
+                            + subtaskKey
+                            + ", the directory may have not been created.");
+        }
+
+        for (int attempt = 0; attempt < 10; attempt++) {
+            try {
+                OutputStreamAndPath streamAndPath =
+                        EntropyInjector.createEntropyAware(
+                                fs,
+                                createPhysicalFilePath(dirPath),
+                                FileSystem.WriteMode.NO_OVERWRITE);
+                FSDataOutputStream outputStream = streamAndPath.stream();
+                Path filePath = streamAndPath.path();
+                result = new PhysicalFile(outputStream, filePath, 
this.physicalFileDeleter, scope);
+                updateFileCreationMetrics(filePath);
+                return result;
+            } catch (Exception e) {
+                latestException = e;
+            }
+        }
+
+        throw new IOException(
+                "Could not open output stream for state file merging.", 
latestException);
+    }
+
+    private void updateFileCreationMetrics(Path path) {
+        // TODO: FLINK- add io metrics
+        LOG.debug("Create a new physical file {} for checkpoint file 
merging.", path);
+    }
+
+    protected Path createPhysicalFilePath(Path dirPath) {
+        // this must be called after initFileSystem() is called
+        // so the checkpoint directories must be not null if we reach here
+        final String fileName = UUID.randomUUID().toString();
+        return new Path(dirPath, fileName);
+    }
+
+    protected final void deletePhysicalFile(FSDataOutputStream outputStream, 
Path filePath) {
+
+        if (outputStream != null) {
+            try {
+                outputStream.close();
+            } catch (IOException e) {
+                LOG.warn("Fail to close output stream when deleting file: {}", 
filePath);
+            }
+        }
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        fs.delete(filePath, false);
+                        LOG.debug("Physical file deleted: {}.", filePath);
+                    } catch (IOException e) {
+                        LOG.warn("Fail to delete file: {}", filePath);
+                    }
+                });
+    }
+
+    // ------------------------------------------------------------------------
+    //  abstract methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Get a reused physical file or create one. This will be called in stream 
creation logic.

Review Comment:
   What does "stream" mean here?
   
   Do you mean an OutputStream for XXX?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+    private final String id;
+
+    protected final Executor ioExecutor;
+
+    // file system and directories
+    protected FileSystem fs;
+    protected Path checkpointDir;
+    protected Path sharedStateDir;
+    protected Path taskOwnedStateDir;
+
+    protected int writeBufferSize;
+    private boolean fileSystemInitiated = false;
+
+    protected boolean syncAfterClosingLogicalFile;
+
+    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = 
this::deletePhysicalFile;
+
+    private final Map<SubtaskKey, Path> managedSharedStateDir = new 
ConcurrentHashMap<>();
+
+    protected Path managedExclusiveStateDir;
+
+    public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+        this.id = id;
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public void initFileSystem(
+            FileSystem fileSystem,
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir) {
+        if (fileSystemInitiated) {
+            Preconditions.checkArgument(
+                    checkpointBaseDir.equals(this.checkpointDir),
+                    "The checkpoint base dir is not deterministic across 
subtasks.");
+            Preconditions.checkArgument(
+                    sharedStateDir.equals(this.sharedStateDir),
+                    "The shared checkpoint dir is not deterministic across 
subtasks.");
+            Preconditions.checkArgument(
+                    taskOwnedStateDir.equals(this.taskOwnedStateDir),
+                    "The task-owned checkpoint dir is not deterministic across 
subtasks.");
+            return;
+        }
+        this.fs = fileSystem;
+        this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir);
+        this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir);
+        this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir);
+        this.fileSystemInitiated = true;
+        this.syncAfterClosingLogicalFile = 
shouldSyncAfterClosingLogicalFile(checkpointBaseDir);
+        // Initialize the managed exclusive path using id as the child path 
name.
+        // Currently, we use the task-owned directory to place the merged 
private state. According
+        // to the FLIP-306, we later consider move these files to the new 
introduced
+        // task-manager-owned directory.
+        Path managedExclusivePath = new Path(taskOwnedStateDir, id);
+        createManagedDirectory(managedExclusivePath);
+        this.managedExclusiveStateDir = managedExclusivePath;
+    }
+
+    @Override
+    public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
+        String managedDirName = subtaskKey.getManagedDirName();
+        Path managedPath = new Path(sharedStateDir, managedDirName);
+        if (!managedSharedStateDir.containsKey(subtaskKey)) {
+            createManagedDirectory(managedPath);
+            managedSharedStateDir.put(subtaskKey, managedPath);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  logical & physical file
+    // ------------------------------------------------------------------------
+
+    protected LogicalFile createLogicalFile(
+            @Nonnull PhysicalFile physicalFile, @Nonnull SubtaskKey 
subtaskKey) {
+        LogicalFileId fileID = LogicalFileId.generateRandomId();
+        return new LogicalFile(fileID, physicalFile, subtaskKey);
+    }
+
+    @Nonnull
+    protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, 
CheckpointedStateScope scope)
+            throws IOException {
+        PhysicalFile result;
+        Exception latestException = null;
+
+        Path dirPath = getManagedDir(subtaskKey, scope);
+
+        if (dirPath == null) {
+            throw new IOException(
+                    "Could not get "
+                            + scope
+                            + " path for subtask "
+                            + subtaskKey
+                            + ", the directory may have not been created.");
+        }
+
+        for (int attempt = 0; attempt < 10; attempt++) {
+            try {
+                OutputStreamAndPath streamAndPath =
+                        EntropyInjector.createEntropyAware(
+                                fs,
+                                createPhysicalFilePath(dirPath),
+                                FileSystem.WriteMode.NO_OVERWRITE);
+                FSDataOutputStream outputStream = streamAndPath.stream();
+                Path filePath = streamAndPath.path();
+                result = new PhysicalFile(outputStream, filePath, 
this.physicalFileDeleter, scope);
+                updateFileCreationMetrics(filePath);
+                return result;
+            } catch (Exception e) {
+                latestException = e;
+            }
+        }
+
+        throw new IOException(
+                "Could not open output stream for state file merging.", 
latestException);
+    }
+
+    private void updateFileCreationMetrics(Path path) {
+        // TODO: FLINK- add io metrics
+        LOG.debug("Create a new physical file {} for checkpoint file 
merging.", path);
+    }
+
+    protected Path createPhysicalFilePath(Path dirPath) {
+        // this must be called after initFileSystem() is called
+        // so the checkpoint directories must be not null if we reach here
+        final String fileName = UUID.randomUUID().toString();
+        return new Path(dirPath, fileName);
+    }
+
+    protected final void deletePhysicalFile(FSDataOutputStream outputStream, 
Path filePath) {
+
+        if (outputStream != null) {
+            try {
+                outputStream.close();
+            } catch (IOException e) {
+                LOG.warn("Fail to close output stream when deleting file: {}", 
filePath);
+            }
+        }
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        fs.delete(filePath, false);
+                        LOG.debug("Physical file deleted: {}.", filePath);
+                    } catch (IOException e) {
+                        LOG.warn("Fail to delete file: {}", filePath);
+                    }
+                });
+    }
+
+    // ------------------------------------------------------------------------
+    //  abstract methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Get a reused physical file or create one. This will be called in stream 
creation logic.
+     *
+     * <p>Basic logic of file reusing: whenever a physical is needed, this 
method is called with
+     * necessary information provided for acquiring a file. The file will not 
be reused until it is
+     * written and returned to the reused pool by calling {@link 
#returnPhysicalFileForNextReuse}.
+     *
+     * @param subtaskKey the subtask key for the caller
+     * @param checkpointId the checkpoint id
+     * @param scope checkpoint scope
+     * @return the requested physical file.
+     * @throws IOException thrown if anything goes wrong with file system.
+     */
+    @Nonnull
+    protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(
+            SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope 
scope)
+            throws IOException;
+
+    /**
+     * Try to return an existing physical file to the manager for next reuse. 
Delete if needed.
+     *
+     * <p>Basic logic of file reusing, see {@link 
#getOrCreatePhysicalFileForCheckpoint}.
+     *
+     * @param subtaskKey the subtask key for the caller
+     * @param checkpointId in which checkpoint this physical is requested.

Review Comment:
   this physical => this physical file



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringBasedID;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+
+/** An abstraction of logical files in file-merging checkpoints. */
+public class LogicalFile {

Review Comment:
   Does a logical file stand for a segment?
   
   Does it need to record its offset within the physical file?
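   
   In case it helps the discussion, a minimal sketch of the segment-style bookkeeping I am asking about (class and field names here are made up, not from this PR):
   
   ```java
   import org.apache.flink.core.fs.Path;
   
   /** Sketch: a logical file viewed as a segment [startPos, startPos + length) of a physical file. */
   public final class LogicalSegmentSketch {
       private final String logicalFileId;  // any unique id
       private final Path physicalFilePath; // the physical file this segment lives in
       private final long startPos;         // offset of the segment within the physical file
       private final long length;           // size of the segment in bytes
   
       public LogicalSegmentSketch(
               String logicalFileId, Path physicalFilePath, long startPos, long length) {
           this.logicalFileId = logicalFileId;
           this.physicalFilePath = physicalFilePath;
           this.startPos = startPos;
           this.length = length;
       }
   }
   ```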



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+    private final String id;
+
+    protected final Executor ioExecutor;
+
+    // file system and directories
+    protected FileSystem fs;
+    protected Path checkpointDir;
+    protected Path sharedStateDir;
+    protected Path taskOwnedStateDir;
+
+    protected int writeBufferSize;
+    private boolean fileSystemInitiated = false;
+
+    protected boolean syncAfterClosingLogicalFile;
+
+    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = 
this::deletePhysicalFile;
+
+    private final Map<SubtaskKey, Path> managedSharedStateDir = new 
ConcurrentHashMap<>();
+
+    protected Path managedExclusiveStateDir;
+
+    public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+        this.id = id;
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public void initFileSystem(
+            FileSystem fileSystem,
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir) {
+        if (fileSystemInitiated) {
+            Preconditions.checkArgument(
+                    checkpointBaseDir.equals(this.checkpointDir),
+                    "The checkpoint base dir is not deterministic across 
subtasks.");
+            Preconditions.checkArgument(
+                    sharedStateDir.equals(this.sharedStateDir),
+                    "The shared checkpoint dir is not deterministic across 
subtasks.");
+            Preconditions.checkArgument(
+                    taskOwnedStateDir.equals(this.taskOwnedStateDir),
+                    "The task-owned checkpoint dir is not deterministic across 
subtasks.");
+            return;
+        }
+        this.fs = fileSystem;
+        this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir);
+        this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir);
+        this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir);
+        this.fileSystemInitiated = true;
+        this.syncAfterClosingLogicalFile = 
shouldSyncAfterClosingLogicalFile(checkpointBaseDir);
+        // Initialize the managed exclusive path using id as the child path 
name.
+        // Currently, we use the task-owned directory to place the merged 
private state. According
+        // to the FLIP-306, we later consider move these files to the new 
introduced
+        // task-manager-owned directory.
+        Path managedExclusivePath = new Path(taskOwnedStateDir, id);
+        createManagedDirectory(managedExclusivePath);
+        this.managedExclusiveStateDir = managedExclusivePath;
+    }
+
+    @Override
+    public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
+        String managedDirName = subtaskKey.getManagedDirName();
+        Path managedPath = new Path(sharedStateDir, managedDirName);
+        if (!managedSharedStateDir.containsKey(subtaskKey)) {
+            createManagedDirectory(managedPath);
+            managedSharedStateDir.put(subtaskKey, managedPath);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  logical & physical file
+    // ------------------------------------------------------------------------
+
+    protected LogicalFile createLogicalFile(
+            @Nonnull PhysicalFile physicalFile, @Nonnull SubtaskKey 
subtaskKey) {
+        LogicalFileId fileID = LogicalFileId.generateRandomId();
+        return new LogicalFile(fileID, physicalFile, subtaskKey);
+    }
+
+    @Nonnull
+    protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, 
CheckpointedStateScope scope)
+            throws IOException {
+        PhysicalFile result;
+        Exception latestException = null;
+
+        Path dirPath = getManagedDir(subtaskKey, scope);
+
+        if (dirPath == null) {
+            throw new IOException(
+                    "Could not get "
+                            + scope
+                            + " path for subtask "
+                            + subtaskKey
+                            + ", the directory may have not been created.");
+        }
+
+        for (int attempt = 0; attempt < 10; attempt++) {
+            try {
+                OutputStreamAndPath streamAndPath =
+                        EntropyInjector.createEntropyAware(
+                                fs,
+                                createPhysicalFilePath(dirPath),
+                                FileSystem.WriteMode.NO_OVERWRITE);
+                FSDataOutputStream outputStream = streamAndPath.stream();
+                Path filePath = streamAndPath.path();
+                result = new PhysicalFile(outputStream, filePath, 
this.physicalFileDeleter, scope);
+                updateFileCreationMetrics(filePath);
+                return result;
+            } catch (Exception e) {
+                latestException = e;
+            }
+        }
+
+        throw new IOException(
+                "Could not open output stream for state file merging.", 
latestException);
+    }
+
+    private void updateFileCreationMetrics(Path path) {
+        // TODO: FLINK- add io metrics
+        LOG.debug("Create a new physical file {} for checkpoint file 
merging.", path);
+    }
+
+    protected Path createPhysicalFilePath(Path dirPath) {
+        // this must be called after initFileSystem() is called
+        // so the checkpoint directories must be not null if we reach here
+        final String fileName = UUID.randomUUID().toString();
+        return new Path(dirPath, fileName);
+    }
+
+    protected final void deletePhysicalFile(FSDataOutputStream outputStream, 
Path filePath) {
+
+        if (outputStream != null) {
+            try {
+                outputStream.close();
+            } catch (IOException e) {
+                LOG.warn("Fail to close output stream when deleting file: {}", 
filePath);
+            }
+        }
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        fs.delete(filePath, false);
+                        LOG.debug("Physical file deleted: {}.", filePath);
+                    } catch (IOException e) {
+                        LOG.warn("Fail to delete file: {}", filePath);
+                    }
+                });
+    }
+
+    // ------------------------------------------------------------------------
+    //  abstract methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Get a reused physical file or create one. This will be called in stream 
creation logic.
+     *
+     * <p>Basic logic of file reusing: whenever a physical is needed, this 
method is called with
+     * necessary information provided for acquiring a file. The file will not 
be reused until it is
+     * written and returned to the reused pool by calling {@link 
#returnPhysicalFileForNextReuse}.
+     *
+     * @param subtaskKey the subtask key for the caller
+     * @param checkpointId the checkpoint id
+     * @param scope checkpoint scope
+     * @return the requested physical file.
+     * @throws IOException thrown if anything goes wrong with file system.
+     */
+    @Nonnull
+    protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(
+            SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope 
scope)
+            throws IOException;
+
+    /**
+     * Try to return an existing physical file to the manager for next reuse. 
Delete if needed.

Review Comment:
   "Delete if needed" What does it mean? Delete what, the physical file? Or the 
ref in the re-use pool?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** A {@link FileMergingSnapshotManager} that merging files within a 
checkpoint. */
+public class WithinCheckpointFileMergingSnapshotManager extends 
FileMergingSnapshotManagerBase {
+
+    /** A dummy subtask key to reuse files among subtasks for private states. 
*/
+    private static final SubtaskKey dummySubtaskKey = new SubtaskKey("dummy", 
-1, -1);
+
+    /**
+     * OutputStreams to be reused when writing checkpoint files. For 
WITHIN_BOUNDARY mode, physical
+     * files are NOT shared among multiple checkpoints. This map of file 
contains all files that are
+     * not being used and ready to be used.
+     */
+    private final Map<Tuple3<Long, SubtaskKey, CheckpointedStateScope>, 
PhysicalFile>
+            availableFiles;
+
+    public WithinCheckpointFileMergingSnapshotManager(String id, Executor 
ioExecutor) {
+        // currently there is no file size limit For WITHIN_BOUNDARY mode
+        super(id, ioExecutor);
+        availableFiles = new HashMap<>();
+    }
+
+    @Override
+    @Nonnull
+    protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
+            SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope 
scope)
+            throws IOException {
+        // TODO: FLINK-32076 will add a file pool for each subtask key.
+        Tuple3<Long, SubtaskKey, CheckpointedStateScope> fileKey =
+                Tuple3.of(
+                        checkpointId,
+                        scope == CheckpointedStateScope.SHARED ? subtaskKey : 
dummySubtaskKey,
+                        scope);
+        PhysicalFile file;
+        synchronized (availableFiles) {
+            file = availableFiles.remove(fileKey);

Review Comment:
   Is it possible that the same key maps to different physical files? (It might not be possible at the task level.)
   
   Let's discuss this offline.
   
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** A {@link FileMergingSnapshotManager} that merging files within a 
checkpoint. */
+public class WithinCheckpointFileMergingSnapshotManager extends 
FileMergingSnapshotManagerBase {
+
+    /** A dummy subtask key to reuse files among subtasks for private states. 
*/
+    private static final SubtaskKey dummySubtaskKey = new SubtaskKey("dummy", 
-1, -1);
+
+    /**
+     * OutputStreams to be reused when writing checkpoint files. For 
WITHIN_BOUNDARY mode, physical
+     * files are NOT shared among multiple checkpoints. This map of file 
contains all files that are
+     * not being used and ready to be used.
+     */
+    private final Map<Tuple3<Long, SubtaskKey, CheckpointedStateScope>, 
PhysicalFile>
+            availableFiles;
+
+    public WithinCheckpointFileMergingSnapshotManager(String id, Executor 
ioExecutor) {
+        // currently there is no file size limit For WITHIN_BOUNDARY mode
+        super(id, ioExecutor);
+        availableFiles = new HashMap<>();
+    }
+
+    @Override
+    @Nonnull
+    protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
+            SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope 
scope)
+            throws IOException {
+        // TODO: FLINK-32076 will add a file pool for each subtask key.
+        Tuple3<Long, SubtaskKey, CheckpointedStateScope> fileKey =
+                Tuple3.of(
+                        checkpointId,
+                        scope == CheckpointedStateScope.SHARED ? subtaskKey : 
dummySubtaskKey,
+                        scope);
+        PhysicalFile file;
+        synchronized (availableFiles) {
+            file = availableFiles.remove(fileKey);

Review Comment:
   I think `if (current != physicalFile) { physicalFile.close(); }` handles this problem; just double-confirming.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+    private final String id;
+
+    protected final Executor ioExecutor;
+
+    // file system and directories
+    protected FileSystem fs;
+    protected Path checkpointDir;
+    protected Path sharedStateDir;
+    protected Path taskOwnedStateDir;
+
+    protected int writeBufferSize;
+    private boolean fileSystemInitiated = false;
+
+    protected boolean syncAfterClosingLogicalFile;
+
+    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = 
this::deletePhysicalFile;
+
+    private final Map<SubtaskKey, Path> managedSharedStateDir = new 
ConcurrentHashMap<>();
+
+    protected Path managedExclusiveStateDir;
+
+    public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+        this.id = id;
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public void initFileSystem(
+            FileSystem fileSystem,
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir) {
+        if (fileSystemInitiated) {
+            Preconditions.checkArgument(
+                    checkpointBaseDir.equals(this.checkpointDir),
+                    "The checkpoint base dir is not deterministic across 
subtasks.");
+            Preconditions.checkArgument(
+                    sharedStateDir.equals(this.sharedStateDir),
+                    "The shared checkpoint dir is not deterministic across 
subtasks.");
+            Preconditions.checkArgument(
+                    taskOwnedStateDir.equals(this.taskOwnedStateDir),
+                    "The task-owned checkpoint dir is not deterministic across 
subtasks.");
+            return;
+        }
+        this.fs = fileSystem;
+        this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir);
+        this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir);
+        this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir);
+        this.fileSystemInitiated = true;
+        this.syncAfterClosingLogicalFile = 
shouldSyncAfterClosingLogicalFile(checkpointBaseDir);
+        // Initialize the managed exclusive path using id as the child path 
name.
+        // Currently, we use the task-owned directory to place the merged 
private state. According
+        // to the FLIP-306, we later consider move these files to the new 
introduced
+        // task-manager-owned directory.
+        Path managedExclusivePath = new Path(taskOwnedStateDir, id);
+        createManagedDirectory(managedExclusivePath);
+        this.managedExclusiveStateDir = managedExclusivePath;
+    }
+
+    @Override
+    public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
+        String managedDirName = subtaskKey.getManagedDirName();
+        Path managedPath = new Path(sharedStateDir, managedDirName);
+        if (!managedSharedStateDir.containsKey(subtaskKey)) {
+            createManagedDirectory(managedPath);
+            managedSharedStateDir.put(subtaskKey, managedPath);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  logical & physical file
+    // ------------------------------------------------------------------------
+
+    protected LogicalFile createLogicalFile(
+            @Nonnull PhysicalFile physicalFile, @Nonnull SubtaskKey 
subtaskKey) {
+        LogicalFileId fileID = LogicalFileId.generateRandomId();
+        return new LogicalFile(fileID, physicalFile, subtaskKey);
+    }
+
+    @Nonnull
+    protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, 
CheckpointedStateScope scope)
+            throws IOException {
+        PhysicalFile result;
+        Exception latestException = null;
+
+        Path dirPath = getManagedDir(subtaskKey, scope);
+
+        if (dirPath == null) {
+            throw new IOException(
+                    "Could not get "
+                            + scope
+                            + " path for subtask "
+                            + subtaskKey
+                            + ", the directory may have not been created.");
+        }
+
+        for (int attempt = 0; attempt < 10; attempt++) {
+            try {
+                OutputStreamAndPath streamAndPath =
+                        EntropyInjector.createEntropyAware(
+                                fs,
+                                createPhysicalFilePath(dirPath),
+                                FileSystem.WriteMode.NO_OVERWRITE);
+                FSDataOutputStream outputStream = streamAndPath.stream();
+                Path filePath = streamAndPath.path();
+                result = new PhysicalFile(outputStream, filePath, 
this.physicalFileDeleter, scope);
+                updateFileCreationMetrics(filePath);
+                return result;
+            } catch (Exception e) {
+                latestException = e;
+            }
+        }
+
+        throw new IOException(
+                "Could not open output stream for state file merging.", 
latestException);
+    }
+
+    private void updateFileCreationMetrics(Path path) {
+        // TODO: FLINK- add io metrics
+        LOG.debug("Create a new physical file {} for checkpoint file 
merging.", path);
+    }
+
+    protected Path createPhysicalFilePath(Path dirPath) {
+        // this must be called after initFileSystem() is called
+        // so the checkpoint directories must be not null if we reach here
+        final String fileName = UUID.randomUUID().toString();
+        return new Path(dirPath, fileName);
+    }
+
+    protected final void deletePhysicalFile(FSDataOutputStream outputStream, 
Path filePath) {
+
+        if (outputStream != null) {
+            try {
+                outputStream.close();
+            } catch (IOException e) {
+                LOG.warn("Fail to close output stream when deleting file: {}", 
filePath);
+            }
+        }
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        fs.delete(filePath, false);
+                        LOG.debug("Physical file deleted: {}.", filePath);
+                    } catch (IOException e) {
+                        LOG.warn("Fail to delete file: {}", filePath);
+                    }
+                });
+    }
+
+    // ------------------------------------------------------------------------
+    //  abstract methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Get a reused physical file or create one. This will be called in stream 
creation logic.
+     *
+     * <p>Basic logic of file reusing: whenever a physical is needed, this 
method is called with

Review Comment:
   a physical => a physical file



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+/** A builder that builds the {@link FileMergingSnapshotManager}. */
+public class FileMergingSnapshotManagerBuilder {
+
+    private final String id;
+
+    @Nullable private Executor ioExecutor = null;
+
+    /**
+     * Initialize the builder.
+     *
+     * @param id the id of the manager.
+     */
+    public FileMergingSnapshotManagerBuilder(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Set the executor for io operation in manager. If null(default), all io 
operation will be
+     * executed synchronously.
+     */
+    public FileMergingSnapshotManagerBuilder setIoExecutor(@Nullable Executor 
ioExecutor) {

Review Comment:
   => setIOExecutor



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements 
FileMergingSnapshotManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+    private final String id;
+
+    protected final Executor ioExecutor;
+
+    // file system and directories
+    protected FileSystem fs;
+    protected Path checkpointDir;
+    protected Path sharedStateDir;
+    protected Path taskOwnedStateDir;
+
+    protected int writeBufferSize;
+    private boolean fileSystemInitiated = false;
+
+    protected boolean syncAfterClosingLogicalFile;
+
+    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = 
this::deletePhysicalFile;
+
+    private final Map<SubtaskKey, Path> managedSharedStateDir = new 
ConcurrentHashMap<>();
+
+    protected Path managedExclusiveStateDir;
+
+    public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+        this.id = id;
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public void initFileSystem(
+            FileSystem fileSystem,
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir) {
+        if (fileSystemInitiated) {
+            Preconditions.checkArgument(
+                    checkpointBaseDir.equals(this.checkpointDir),
+                    "The checkpoint base dir is not deterministic across subtasks.");
+            Preconditions.checkArgument(
+                    sharedStateDir.equals(this.sharedStateDir),
+                    "The shared checkpoint dir is not deterministic across subtasks.");
+            Preconditions.checkArgument(
+                    taskOwnedStateDir.equals(this.taskOwnedStateDir),
+                    "The task-owned checkpoint dir is not deterministic across subtasks.");
+            return;
+        }

Review Comment:
   `FileMergingSnapshotManager` is created per TM, so `initFileSystem()` is only called once, right?
   
   So,
   1. In what case will `initFileSystem()` be called several times?
   2. What if a TM contains different jobs (the base CP directory may be different)?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
+
+import java.io.Closeable;
+
+/**
+ * FileMergingSnapshotManager provides an interface to manage files and meta information for
+ * checkpoint files with merging checkpoint files enabled. FileMergingSnapshotManager resides on the
+ * TM side.
+ *
+ * <p>TODO (FLINK-32073): create output stream.
+ *
+ * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logical files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+    /**
+     * Initialize the file system, recording the checkpoint path the manager should work with.
+     *
+     * <pre>
+     * The layout of checkpoint directory:
+     * /user-defined-checkpoint-dir
+     *     /{job-id} (checkpointBaseDir)
+     *         |
+     *         + --shared/
+     *             |
+     *             + --subtask-1/
+     *                 + -- merged shared state files
+     *             + --subtask-2/
+     *                 + -- merged shared state files
+     *         + --taskowned/
+     *             + -- merged private state files
+     *         + --chk-1/
+     *         + --chk-2/
+     *         + --chk-3/
+     * </pre>
+     *
+     * <p>The reason why initializing directories in this method instead of the constructor is that
+     * the FileMergingSnapshotManager itself belongs to the {@link TaskStateManager}, which is
+     * initialized when receiving a task, while the base directories for checkpoint are created by
+     * {@link FsCheckpointStorageAccess} when the state backend initializing. After the checkpoint
+     * directories are initialized, the managed subdirectories are initialized here.
+     *
+     * <p>Note: This method may be called several times, the implementation should ensure
+     * idempotency, and throw {@link IllegalArgumentException} when any of the path in params change
+     * across function calls.
+     *
+     * @param fileSystem The filesystem to write to.
+     * @param checkpointBaseDir The base directory for checkpoints.
+     * @param sharedStateDir The directory for shared checkpoint data.
+     * @param taskOwnedStateDir The name of the directory for state not owned/released by the
+     *     master, but by the TaskManagers.
+     * @throws IllegalArgumentException thrown if these three paths are not deterministic across
+     *     calls.
+     */
+    void initFileSystem(
+            FileSystem fileSystem,
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir);
+
+    /**
+     * Register a subtask and create the managed directory for shared states.
+     *
+     * @param subtaskKey the subtask key identifying a subtask.
+     * @see #initFileSystem for layout information.
+     */
+    void registerSubtaskForSharedStates(SubtaskKey subtaskKey);
+
+    /**
+     * Get the managed directory of the file-merging snapshot manager, created in {@link
+     * #initFileSystem} or {@link #registerSubtaskForSharedStates}.
+     *
+     * @param subtaskKey the subtask key identifying the subtask.
+     * @param scope the checkpoint scope.
+     * @return the managed directory for one subtask in specified checkpoint scope.
+     */
+    Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);
+
+    /** A key identifies a subtask. */
+    final class SubtaskKey {
+        final String taskName;
+        final int subtaskIndex;
+        final int parallelism;
+
+        final int hashCode;
+
+        SubtaskKey(TaskInfo taskInfo) {
+            this.taskName = taskInfo.getTaskName();
+            this.subtaskIndex = taskInfo.getIndexOfThisSubtask();
+            this.parallelism = taskInfo.getNumberOfParallelSubtasks();
+            int hash = taskName.hashCode();
+            hash = 31 * hash + subtaskIndex;
+            hash = 31 * hash + parallelism;
+            this.hashCode = hash;
+        }
+
+        SubtaskKey(String taskName, int subtaskIndex, int parallelism) {
+            this.taskName = taskName;
+            this.subtaskIndex = subtaskIndex;
+            this.parallelism = parallelism;
+            int hash = taskName.hashCode();
+            hash = 31 * hash + subtaskIndex;
+            hash = 31 * hash + parallelism;
+            this.hashCode = hash;
+        }

Review Comment:
   1. This constructor does not seem to be used.
   2. It duplicates a lot of code from the constructor above.
   3. If only `of(TaskInfo taskInfo)` is public, keeping one of the two constructors is enough (to avoid unnecessary function calls).
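
   For example, something along these lines (only a sketch of the suggestion, assuming `of(TaskInfo)` stays the public entry point; untested):

   ```java
   SubtaskKey(String taskName, int subtaskIndex, int parallelism) {
       this.taskName = taskName;
       this.subtaskIndex = subtaskIndex;
       this.parallelism = parallelism;
       int hash = taskName.hashCode();
       hash = 31 * hash + subtaskIndex;
       hash = 31 * hash + parallelism;
       this.hashCode = hash;
   }

   // Public factory that unpacks TaskInfo and delegates to the single constructor.
   public static SubtaskKey of(TaskInfo taskInfo) {
       return new SubtaskKey(
               taskInfo.getTaskName(),
               taskInfo.getIndexOfThisSubtask(),
               taskInfo.getNumberOfParallelSubtasks());
   }
   ```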
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements FileMergingSnapshotManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+    private final String id;
+
+    protected final Executor ioExecutor;
+
+    // file system and directories
+    protected FileSystem fs;
+    protected Path checkpointDir;
+    protected Path sharedStateDir;
+    protected Path taskOwnedStateDir;
+
+    protected int writeBufferSize;
+    private boolean fileSystemInitiated = false;
+
+    protected boolean syncAfterClosingLogicalFile;
+
+    protected PhysicalFile.PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;
+
+    private final Map<SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<>();
+
+    protected Path managedExclusiveStateDir;
+
+    public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+        this.id = id;
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public void initFileSystem(
+            FileSystem fileSystem,
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir) {
+        if (fileSystemInitiated) {
+            Preconditions.checkArgument(
+                    checkpointBaseDir.equals(this.checkpointDir),
+                    "The checkpoint base dir is not deterministic across subtasks.");
+            Preconditions.checkArgument(
+                    sharedStateDir.equals(this.sharedStateDir),
+                    "The shared checkpoint dir is not deterministic across subtasks.");
+            Preconditions.checkArgument(
+                    taskOwnedStateDir.equals(this.taskOwnedStateDir),
+                    "The task-owned checkpoint dir is not deterministic across subtasks.");
+            return;
+        }
+        this.fs = fileSystem;
+        this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir);
+        this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir);
+        this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir);
+        this.fileSystemInitiated = true;
+        this.syncAfterClosingLogicalFile = shouldSyncAfterClosingLogicalFile(checkpointBaseDir);
+        // Initialize the managed exclusive path using id as the child path name.
+        // Currently, we use the task-owned directory to place the merged private state. According
+        // to FLIP-306, we may later consider moving these files to the newly introduced
+        // task-manager-owned directory.
+        Path managedExclusivePath = new Path(taskOwnedStateDir, id);
+        createManagedDirectory(managedExclusivePath);
+        this.managedExclusiveStateDir = managedExclusivePath;
+    }
+
+    @Override
+    public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
+        String managedDirName = subtaskKey.getManagedDirName();
+        Path managedPath = new Path(sharedStateDir, managedDirName);
+        if (!managedSharedStateDir.containsKey(subtaskKey)) {
+            createManagedDirectory(managedPath);
+            managedSharedStateDir.put(subtaskKey, managedPath);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  logical & physical file
+    // ------------------------------------------------------------------------
+
+    protected LogicalFile createLogicalFile(
+            @Nonnull PhysicalFile physicalFile, @Nonnull SubtaskKey subtaskKey) {
+        LogicalFileId fileID = LogicalFileId.generateRandomId();
+        return new LogicalFile(fileID, physicalFile, subtaskKey);
+    }
+
+    @Nonnull
+    protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, CheckpointedStateScope scope)
+            throws IOException {
+        PhysicalFile result;
+        Exception latestException = null;
+
+        Path dirPath = getManagedDir(subtaskKey, scope);
+
+        if (dirPath == null) {
+            throw new IOException(
+                    "Could not get "
+                            + scope
+                            + " path for subtask "
+                            + subtaskKey
+                            + ", the directory may have not been created.");
+        }
+
+        for (int attempt = 0; attempt < 10; attempt++) {
+            try {
+                OutputStreamAndPath streamAndPath =
+                        EntropyInjector.createEntropyAware(
+                                fs,
+                                createPhysicalFilePath(dirPath),
+                                FileSystem.WriteMode.NO_OVERWRITE);
+                FSDataOutputStream outputStream = streamAndPath.stream();
+                Path filePath = streamAndPath.path();
+                result = new PhysicalFile(outputStream, filePath, this.physicalFileDeleter, scope);
+                updateFileCreationMetrics(filePath);
+                return result;
+            } catch (Exception e) {
+                latestException = e;
+            }
+        }
+
+        throw new IOException(
+                "Could not open output stream for state file merging.", 
latestException);
+    }
+
+    private void updateFileCreationMetrics(Path path) {
+        // TODO: FLINK- add io metrics
+        LOG.debug("Create a new physical file {} for checkpoint file 
merging.", path);
+    }
+
+    protected Path createPhysicalFilePath(Path dirPath) {
+        // this must be called after initFileSystem() is called
+        // so the checkpoint directories must be not null if we reach here
+        final String fileName = UUID.randomUUID().toString();
+        return new Path(dirPath, fileName);
+    }
+
+    protected final void deletePhysicalFile(FSDataOutputStream outputStream, Path filePath) {
+
+        if (outputStream != null) {
+            try {
+                outputStream.close();
+            } catch (IOException e) {
+                LOG.warn("Fail to close output stream when deleting file: {}", 
filePath);
+            }
+        }
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        fs.delete(filePath, false);
+                        LOG.debug("Physical file deleted: {}.", filePath);
+                    } catch (IOException e) {
+                        LOG.warn("Fail to delete file: {}", filePath);
+                    }
+                });
+    }
+
+    // ------------------------------------------------------------------------
+    //  abstract methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Get a reused physical file or create one. This will be called in stream creation logic.
+     *
+     * <p>Basic logic of file reusing: whenever a physical file is needed, this method is called
+     * with the necessary information for acquiring a file. The file will not be reused until it is
+     * written and returned to the reuse pool by calling {@link #returnPhysicalFileForNextReuse}.
+     *
+     * @param subtaskKey the subtask key for the caller
+     * @param checkpointId the checkpoint id
+     * @param scope checkpoint scope
+     * @return the requested physical file.
+     * @throws IOException thrown if anything goes wrong with the file system.
+     */
+    @Nonnull
+    protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(
+            SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope)
+            throws IOException;
+
+    /**
+     * Try to return an existing physical file to the manager for the next reuse. Delete it if
+     * needed.
+     *
+     * <p>For the basic logic of file reusing, see {@link #getOrCreatePhysicalFileForCheckpoint}.
+     *
+     * @param subtaskKey the subtask key for the caller
+     * @param checkpointId the checkpoint in which this physical file is requested.
+     * @param physicalFile the physical file being returned
+     * @throws IOException thrown if anything goes wrong with the file system.
+     * @see #getOrCreatePhysicalFileForCheckpoint(SubtaskKey, long, CheckpointedStateScope)
+     */
+    protected abstract void returnPhysicalFileForNextReuse(

Review Comment:
   this method is not used



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.Executor;
+
+/** A builder that builds the {@link FileMergingSnapshotManager}. */
+public class FileMergingSnapshotManagerBuilder {
+
+    private final String id;
+
+    @Nullable private Executor ioExecutor = null;
+
+    /**
+     * Initialize the builder.
+     *
+     * @param id the id of the manager.
+     */
+    public FileMergingSnapshotManagerBuilder(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Set the executor for IO operations in the manager. If null (default), all IO operations
+     * will be executed synchronously.
+     */
+    public FileMergingSnapshotManagerBuilder setIoExecutor(@Nullable Executor ioExecutor) {
+        this.ioExecutor = ioExecutor;
+        return this;
+    }
+
+    /**
+     * Create file-merging snapshot manager based on configuration.
+     *
+     * <p>TODO (FLINK-32072): Create manager during the initialization of task manager services.
+     *
+     * <p>TODO (FLINK-32074): Support another type of FileMergingSnapshotManager that merges files
+     * across different checkpoints.
+     *
+     * @return the created manager.
+     */
+    public FileMergingSnapshotManager build() throws IOException {
+        return new WithinCheckpointFileMergingSnapshotManager(

Review Comment:
   This is actually a `WithinCheckpointFileMergingSnapshotManagerBuilder`, right?
   
   Usually it would be better to bind it to `WithinCheckpointFileMergingSnapshotManager`.
   
   What if you have other types of `FileMergingSnapshotManager`?
   
   Also, does it have to be a builder? Construction seems simple enough to do without this builder class.
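
   For example (a sketch only; the `FileMergingType` enum and the constructor arguments are assumptions, not part of this PR), the builder could select the concrete manager from an explicit type:

   ```java
   /** Hypothetical selector for the concrete manager implementation. */
   public enum FileMergingType {
       MERGE_WITHIN_CHECKPOINT,
       MERGE_ACROSS_CHECKPOINT
   }

   public FileMergingSnapshotManager build(FileMergingType type) throws IOException {
       switch (type) {
           case MERGE_WITHIN_CHECKPOINT:
               // Assuming the constructor mirrors FileMergingSnapshotManagerBase(String, Executor).
               return new WithinCheckpointFileMergingSnapshotManager(id, ioExecutor);
           case MERGE_ACROSS_CHECKPOINT:
               // TODO (FLINK-32074): manager that merges files across checkpoints.
               throw new UnsupportedOperationException("Not implemented yet");
           default:
               throw new IllegalArgumentException("Unknown file-merging type: " + type);
       }
   }
   ```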



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** A {@link FileMergingSnapshotManager} that merges files within a checkpoint. */
+public class WithinCheckpointFileMergingSnapshotManager extends FileMergingSnapshotManagerBase {
+
+    /** A dummy subtask key to reuse files among subtasks for private states. */
+    private static final SubtaskKey dummySubtaskKey = new SubtaskKey("dummy", -1, -1);
+
+    /**
+     * OutputStreams to be reused when writing checkpoint files. For WITHIN_BOUNDARY mode, physical
+     * files are NOT shared among multiple checkpoints. This map of file contains all files that are
+     * not being used and ready to be used.
+     */
+    private final Map<Tuple3<Long, SubtaskKey, CheckpointedStateScope>, PhysicalFile>
+            availableFiles;

Review Comment:
   `writablePhysicalFilePool`?
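
   i.e., just the field rename (sketch):

   ```java
   private final Map<Tuple3<Long, SubtaskKey, CheckpointedStateScope>, PhysicalFile>
           writablePhysicalFilePool;
   ```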



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** A {@link FileMergingSnapshotManager} that merges files within a checkpoint. */
+public class WithinCheckpointFileMergingSnapshotManager extends FileMergingSnapshotManagerBase {
+
+    /** A dummy subtask key to reuse files among subtasks for private states. */
+    private static final SubtaskKey dummySubtaskKey = new SubtaskKey("dummy", -1, -1);

Review Comment:
   A `static final` constant is usually CAPITALIZED
   => DUMMY_SUBTASK_KEY
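
   i.e. (sketch of the rename only):

   ```java
   private static final SubtaskKey DUMMY_SUBTASK_KEY = new SubtaskKey("dummy", -1, -1);
   ```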



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringBasedID;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+
+/** An abstraction of logical files in file-merging checkpoints. */
+public class LogicalFile {
+
+    /** ID for {@link LogicalFile}. It should be unique for each file. */
+    public static class LogicalFileId extends StringBasedID {
+
+        public LogicalFileId(String keyString) {
+            super(keyString);
+        }
+
+        public Path getFilePath() {
+            return new Path(getKeyString());
+        }
+
+        public static LogicalFileId generateRandomId() {
+            return new LogicalFileId(UUID.randomUUID().toString());
+        }
+    }
+
+    /** ID for this file. */
+    LogicalFileId fileId;
+
+    private long lastCheckpointId = -1L;
+
+    boolean removed = false;
+
+    /** The physical file where this logical file is stored. This should not be null. */
+    @Nonnull private final PhysicalFile physicalFile;
+
+    @Nonnull private final SubtaskKey subtaskKey;
+
+    public LogicalFile(
+            LogicalFileId fileId,
+            @Nonnull PhysicalFile physicalFile,
+            @Nonnull SubtaskKey subtaskKey) {
+        this.fileId = fileId;
+        this.physicalFile = physicalFile;
+        this.subtaskKey = subtaskKey;
+        physicalFile.incRefCount();
+    }
+
+    public LogicalFileId getFileId() {
+        return fileId;
+    }
+
+    public void discardWithCheckpointId(long checkpointId) throws IOException {
+        if (!removed && checkpointId >= lastCheckpointId) {
+            physicalFile.decRefCount();
+            removed = true;
+        }
+    }
+
+    public void advanceLastCheckpointId(long checkpointId) {

Review Comment:
   Does a logical file map to a single checkpoint?
   When is this method (`advanceLastCheckpointId`) needed?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** A {@link FileMergingSnapshotManager} that merges files within a checkpoint. */
+public class WithinCheckpointFileMergingSnapshotManager extends FileMergingSnapshotManagerBase {
+
+    /** A dummy subtask key to reuse files among subtasks for private states. */
+    private static final SubtaskKey dummySubtaskKey = new SubtaskKey("dummy", -1, -1);
+
+    /**
+     * OutputStreams to be reused when writing checkpoint files. For WITHIN_BOUNDARY mode, physical
+     * files are NOT shared among multiple checkpoints. This map of file contains all files that are
+     * not being used and ready to be used.

Review Comment:
   For WITHIN_BOUNDARY mode, physical files are NOT shared among multiple checkpoints.
   That means XXX files can be shared. This map contains all physical files that are still writable.


