curcur commented on code in PR #22590:
URL: https://github.com/apache/flink/pull/22590#discussion_r1208271576


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Manages meta information in TM side when taking file-merging checkpoints. 
It provides interfaces

Review Comment:
   => `FileMergingSnapshotManager` provides an interface to manage files and 
meta information for checkpoint files when merging of checkpoint files is 
enabled. `FileMergingSnapshotManager` resides on the TM side.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Manages meta information in TM side when taking file-merging checkpoints. 
It provides interfaces
+ * to manipulate files.
+ *
+ * <p>TODO (FLINK-32073): create output stream.
+ *
+ * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logic 
files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+    /**
+     * Init the file system if needed. This includes creating the managed 
directory for file-merging

Review Comment:
   Init the file system if needed -> Initialize the file system, including 
creating the managed directory for merged files.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingCheckpointUtils.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/** Utilities for file-merging checkpoints. */
+public class FileMergingCheckpointUtils {
+
+    // ------------------------------------------------------------------------
+    //  FileMergingSnapshotManager initialization related
+    // ------------------------------------------------------------------------
+
+    /** A class that packs the file system info for snapshot. */
+    public static class SnapshotFileSystemInfo {
+        FileSystem fs;
+        Path checkpointBaseDirectory;
+        Path sharedStateDirectory;
+        Path taskOwnedStateDirectory;
+
+        public SnapshotFileSystemInfo(
+                FileSystem fs,
+                Path checkpointBaseDirectory,
+                @Nullable Path sharedStateDirectory,
+                Path taskOwnedStateDirectory) {
+            this.fs = fs;
+            this.checkpointBaseDirectory = checkpointBaseDirectory;
+            this.sharedStateDirectory = sharedStateDirectory;
+            this.taskOwnedStateDirectory = taskOwnedStateDirectory;
+        }
+    }
+
+    public static SnapshotFileSystemInfo packFileSystemInfo(

Review Comment:
   packFileSystemInfo => createSnapshotFileSystem



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Manages meta information in TM side when taking file-merging checkpoints. 
It provides interfaces
+ * to manipulate files.
+ *
+ * <p>TODO (FLINK-32073): create output stream.
+ *
+ * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logic 
files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+    /**
+     * Init the file system if needed. This includes creating the managed 
directory for file-merging
+     *
+     * @param fileSystemInfo the packed file system information.
+     * @throws IOException thrown if anything goes wrong with file system.
+     */
+    void initFileSystem(FileMergingCheckpointUtils.SnapshotFileSystemInfo 
fileSystemInfo)
+            throws IOException;
+
+    /**
+     * Register a subtask and create the managed directory for shared states.
+     *
+     * @param subtaskKey the subtask key identifying a subtask.
+     */
+    void addSubtask(SubtaskKey subtaskKey);
+
+    /** Return the working directory of the segment snapshot manager. */

Review Comment:
   What does "segment" mean here? Does it refer to anything other than the 
current snapshot manager?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Manages meta information in TM side when taking file-merging checkpoints. 
It provides interfaces
+ * to manipulate files.
+ *
+ * <p>TODO (FLINK-32073): create output stream.
+ *
+ * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logic 
files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+    /**
+     * Init the file system if needed. This includes creating the managed 
directory for file-merging
+     *
+     * @param fileSystemInfo the packed file system information.
+     * @throws IOException thrown if anything goes wrong with file system.
+     */
+    void initFileSystem(FileMergingCheckpointUtils.SnapshotFileSystemInfo 
fileSystemInfo)
+            throws IOException;
+
+    /**
+     * Register a subtask and create the managed directory for shared states.
+     *
+     * @param subtaskKey the subtask key identifying a subtask.
+     */
+    void addSubtask(SubtaskKey subtaskKey);

Review Comment:
   `addSubtask` -> `registerSubtaskForSharedStates`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingCheckpointUtils.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/** Utilities for file-merging checkpoints. */
+public class FileMergingCheckpointUtils {
+
+    // ------------------------------------------------------------------------
+    //  FileMergingSnapshotManager initialization related
+    // ------------------------------------------------------------------------
+
+    /** A class that packs the file system info for snapshot. */
+    public static class SnapshotFileSystemInfo {

Review Comment:
   Why is this a static nested class inside a utility class rather than a 
separate top-level class?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Manages meta information in TM side when taking file-merging checkpoints. 
It provides interfaces
+ * to manipulate files.
+ *
+ * <p>TODO (FLINK-32073): create output stream.
+ *
+ * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logic 
files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+    /**
+     * Init the file system if needed. This includes creating the managed 
directory for file-merging
+     *
+     * @param fileSystemInfo the packed file system information.
+     * @throws IOException thrown if anything goes wrong with file system.
+     */
+    void initFileSystem(FileMergingCheckpointUtils.SnapshotFileSystemInfo 
fileSystemInfo)

Review Comment:
   nit: import `FileMergingCheckpointUtils.SnapshotFileSystemInfo` and drop 
the `FileMergingCheckpointUtils.` qualifier here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Manages meta information in TM side when taking file-merging checkpoints. 
It provides interfaces
+ * to manipulate files.
+ *
+ * <p>TODO (FLINK-32073): create output stream.
+ *
+ * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logic 
files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+    /**
+     * Init the file system if needed. This includes creating the managed 
directory for file-merging
+     *
+     * @param fileSystemInfo the packed file system information.

Review Comment:
   What does "packed" mean here? I think you mean the file system information 
for the file-merging snapshot?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Manages meta information in TM side when taking file-merging checkpoints. 
It provides interfaces
+ * to manipulate files.
+ *
+ * <p>TODO (FLINK-32073): create output stream.
+ *
+ * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logic 
files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+    /**
+     * Init the file system if needed. This includes creating the managed 
directory for file-merging
+     *
+     * @param fileSystemInfo the packed file system information.
+     * @throws IOException thrown if anything goes wrong with file system.
+     */
+    void initFileSystem(FileMergingCheckpointUtils.SnapshotFileSystemInfo 
fileSystemInfo)
+            throws IOException;
+
+    /**
+     * Register a subtask and create the managed directory for shared states.
+     *
+     * @param subtaskKey the subtask key identifying a subtask.
+     */
+    void addSubtask(SubtaskKey subtaskKey);
+
+    /** Return the working directory of the segment snapshot manager. */
+    Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);
+
+    /** A key identifies a subtask. */
+    final class SubtaskKey {
+        final String taskName;
+        final int subtaskIndex;
+        final int parallelism;
+
+        final int hashCode;
+
+        SubtaskKey(TaskInfo taskInfo) {
+            this.taskName = taskInfo.getTaskName();
+            this.subtaskIndex = taskInfo.getIndexOfThisSubtask();
+            this.parallelism = taskInfo.getNumberOfParallelSubtasks();
+            int hash = taskName.hashCode();
+            hash = 31 * hash + subtaskIndex;
+            hash = 31 * hash + parallelism;

Review Comment:
   ```
   hash = 31 * hash + subtaskIndex;
   hash = 31 * hash + parallelism;
   ```
   
   Is it possible to overflow here? Theoretically, `taskName.hashCode()` can be 
any integer, so `31 * hash` may exceed the `int` range.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Manages meta information in TM side when taking file-merging checkpoints. 
It provides interfaces
+ * to manipulate files.
+ *
+ * <p>TODO (FLINK-32073): create output stream.
+ *
+ * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logic 
files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+    /**
+     * Init the file system if needed. This includes creating the managed 
directory for file-merging
+     *
+     * @param fileSystemInfo the packed file system information.
+     * @throws IOException thrown if anything goes wrong with file system.
+     */
+    void initFileSystem(FileMergingCheckpointUtils.SnapshotFileSystemInfo 
fileSystemInfo)
+            throws IOException;
+
+    /**
+     * Register a subtask and create the managed directory for shared states.
+     *
+     * @param subtaskKey the subtask key identifying a subtask.
+     */
+    void addSubtask(SubtaskKey subtaskKey);
+
+    /** Return the working directory of the segment snapshot manager. */
+    Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);
+
+    /** A key identifies a subtask. */
+    final class SubtaskKey {
+        final String taskName;
+        final int subtaskIndex;
+        final int parallelism;
+
+        final int hashCode;
+
+        SubtaskKey(TaskInfo taskInfo) {
+            this.taskName = taskInfo.getTaskName();
+            this.subtaskIndex = taskInfo.getIndexOfThisSubtask();
+            this.parallelism = taskInfo.getNumberOfParallelSubtasks();
+            int hash = taskName.hashCode();
+            hash = 31 * hash + subtaskIndex;
+            hash = 31 * hash + parallelism;
+            this.hashCode = hash;
+        }
+
+        public static SubtaskKey of(TaskInfo taskInfo) {
+            return new SubtaskKey(taskInfo);
+        }

Review Comment:
   Move the static factory method to the bottom, and keep the constructors 
together.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingCheckpointUtils.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/** Utilities for file-merging checkpoints. */
+public class FileMergingCheckpointUtils {
+
+    // ------------------------------------------------------------------------
+    //  FileMergingSnapshotManager initialization related
+    // ------------------------------------------------------------------------
+
+    /** A class that packs the file system info for snapshot. */

Review Comment:
   => File system information for a checkpoint when file merging is enabled.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** An abstraction of physical files in file-merging checkpoints. */
+public class PhysicalFile {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PhysicalFile.class);
+
+    /** Functional interface to delete the physical file. */
+    @FunctionalInterface
+    public interface PhysicalFileDeleter {
+        /** Close the outputStream if presented, and delete the file. */
+        void perform(@Nullable FSDataOutputStream outputStream, Path filePath);
+    }
+
+    /** Output stream to the file. It can be null if the file is closed. */
+    @Nullable private FSDataOutputStream outputStream;
+
+    private final AtomicInteger logicalFileRefCount;
+
+    private final AtomicLong size;
+
+    @Nullable private final PhysicalFileDeleter deleter;
+
+    private final Path filePath;
+
+    private final CheckpointedStateScope scope;
+
+    /**
+     * If a physical file is closed, it means no more file segments will be 
written to the physical
+     * file, and it can be deleted once its logicalFileRefCount decreases to 0.
+     */
+    private boolean closed;
+
+    /**
+     * A file can be deleted if: 1. It is closed, and 2. No more {@link 
LogicalFile}s have reference
+     * on it.
+     */
+    private boolean deleted = false;
+
+    public PhysicalFile(
+            @Nullable FSDataOutputStream outputStream,
+            Path filePath,
+            PhysicalFileDeleter deleter,
+            CheckpointedStateScope scope) {
+        this.filePath = filePath;
+        this.outputStream = outputStream;
+        this.closed = outputStream == null;
+        this.deleter = deleter;
+        this.scope = scope;
+        this.size = new AtomicLong(0);
+        this.logicalFileRefCount = new AtomicInteger(0);
+    }
+
+    @Nullable
+    public FSDataOutputStream getOutputStream() {
+        return outputStream;
+    }
+
+    void incRefCount() {
+        int newValue = this.logicalFileRefCount.incrementAndGet();
+        LOG.trace(
+                "Increase the reference count of physical file: {} by 1. New 
value is: {}.",
+                this.filePath,
+                newValue);
+    }
+
+    void decRefCount() throws IOException {
+        Preconditions.checkArgument(logicalFileRefCount.get() > 0);
+        int newValue = this.logicalFileRefCount.decrementAndGet();
+        LOG.trace(
+                "Decrease the reference count of physical file: {} by 1. New 
value is: {}. ",
+                this.filePath,
+                newValue);
+        deleteIfNecessary(false);
+    }
+
+    public void deleteIfNecessary(boolean forceClose) throws IOException {
+        if (forceClose) {
+            this.close();
+        }
+
+        synchronized (this) {
+            if (!isOpen() && !deleted && this.logicalFileRefCount.get() <= 0) {
+                if (deleter != null) {
+                    deleter.perform(outputStream, filePath);
+                }
+                this.deleted = true;
+            }
+        }

Review Comment:
   When `forceClose == true`, is this part of the code executed twice? Is that 
intended?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingCheckpointUtils.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+
+/** Utilities for file-merging checkpoints. */
+public class FileMergingCheckpointUtils {
+
+    // ------------------------------------------------------------------------
+    //  FileMergingSnapshotManager initialization related
+    // ------------------------------------------------------------------------
+
+    /** A class that packs the file system info for snapshot. */
+    public static class SnapshotFileSystemInfo {
+        FileSystem fs;
+        Path checkpointBaseDirectory;
+        Path sharedStateDirectory;
+        Path taskOwnedStateDirectory;
+
+        public SnapshotFileSystemInfo(
+                FileSystem fs,
+                Path checkpointBaseDirectory,
+                @Nullable Path sharedStateDirectory,
+                Path taskOwnedStateDirectory) {
+            this.fs = fs;
+            this.checkpointBaseDirectory = checkpointBaseDirectory;
+            this.sharedStateDirectory = sharedStateDirectory;
+            this.taskOwnedStateDirectory = taskOwnedStateDirectory;
+        }
+    }
+
+    public static SnapshotFileSystemInfo packFileSystemInfo(
+            FileSystem fs,
+            Path checkpointBaseDirectory,
+            Path sharedStateDirectory,
+            Path taskOwnedStateDirectory) {
+        return new SnapshotFileSystemInfo(
+                fs, checkpointBaseDirectory, sharedStateDirectory, 
taskOwnedStateDirectory);
+    }
+
+    /**
+     * Create file-merging snapshot manager.
+     *
+     * <p>TODO (FLINK-32072): Create manager during the initialization of task 
manager services.
+     *
+     * @param id The id of this manager. Should be unique within job and be 
consistent after
+     *     failover.
+     * @param asyncIO use async io to perform file system operation
+     * @return the created manager.
+     */
+    public static FileMergingSnapshotManager createFileMergingSnapshotManager(
+            String id, boolean asyncIO) {
+        Executor ioExecutor = asyncIO ? Executors.newCachedThreadPool() : 
Runnable::run;

Review Comment:
   Who is responsible for managing this thread pool (shutting it down, etc.)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to