Zakelly commented on code in PR #24933:
URL: https://github.com/apache/flink/pull/24933#discussion_r1639554722
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -153,6 +152,22 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
/** The current space statistic, updated on file creation/deletion. */
protected SpaceStat spaceStat;
+ /**
+ * This map records shared state dirs which need be clean up when the FileMergingSnapshotManager
+ * close. The key is SubtaskKey the shared state dir belong to, and the value is the count of
+ * the ongoing checkpoint which reference the dir. If a checkpoint which reference the shared
+ * dir complete, the corresponding shared dir will be removed from this map, because the
+ * ownership is transferred to JobManager.
+ */
+ private final Map<SubtaskKey, Long> sharedDirToCleanRef = new ConcurrentHashMap<>();
Review Comment:
I'd suggest a new class that bundles `DirectoryStreamStateHandle` and its reference count.
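A minimal sketch of what such a bundling class could look like, assuming the count tracks ongoing checkpoints that reference the directory. The class name `RefCountedDirHandle`, its methods, and the generic parameter `H` (standing in for `DirectoryStreamStateHandle`) are hypothetical, not part of the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch: pairs a directory handle with the number of ongoing
 * checkpoints that still reference it. H stands in for DirectoryStreamStateHandle.
 */
final class RefCountedDirHandle<H> {
    private final H handle;
    private final AtomicLong refCount = new AtomicLong();

    RefCountedDirHandle(H handle) {
        this.handle = handle;
    }

    H handle() {
        return handle;
    }

    /** Called when an ongoing checkpoint starts referencing the directory. */
    long retain() {
        return refCount.incrementAndGet();
    }

    /** Called when a referencing checkpoint goes away; the count never drops below zero. */
    long release() {
        return refCount.updateAndGet(c -> Math.max(0L, c - 1));
    }

    long refCount() {
        return refCount.get();
    }
}
```

Keeping the handle and its counter in one object means the map can hold a single value per `SubtaskKey`, instead of a separate `Long` map next to the handles.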
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java:
##########
@@ -19,23 +19,31 @@
package org.apache.flink.runtime.state.filemerging;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StreamStateHandle;
import javax.annotation.Nonnull;
-import java.nio.file.Path;
import java.util.Optional;
/** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */
-public class DirectoryStreamStateHandle extends DirectoryStateHandle implements StreamStateHandle {
+public class DirectoryStreamStateHandle implements StreamStateHandle {
private static final long serialVersionUID = 1L;
- public DirectoryStreamStateHandle(@Nonnull Path directory, long directorySize) {
- super(directory, directorySize);
+ /** The path that describes the directory, as a string, to be serializable. */
+ private final String directoryString;
+
+ /** Transient path cache, to avoid re-parsing the string. */
+ private transient Path directory;
Review Comment:
This seems unnecessary. `org.apache.flink.core.fs.Path` is already `Serializable`, so couldn't we just hold the `Path` in a `final` field and drop `String directoryString`?
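A minimal sketch of the suggested shape: the directory lives in a plain `final` field and survives Java serialization without a `String` copy or a `transient` cache. To stay self-contained, `java.net.URI` (which is `Serializable`) stands in for Flink's `Path`; the class `DirHandleSketch` and its `roundTrip` helper are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;

/** Hypothetical sketch: a serializable handle that keeps its directory in a final field. */
final class DirHandleSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // URI stands in for org.apache.flink.core.fs.Path; both are Serializable,
    // so default serialization restores this final field directly.
    private final URI directory;
    private final long size;

    DirHandleSketch(URI directory, long size) {
        this.directory = directory;
        this.size = size;
    }

    URI getDirectory() {
        return directory;
    }

    long getSize() {
        return size;
    }

    /** Serializes and deserializes a handle, e.g. as happens when it is sent to another process. */
    static DirHandleSketch roundTrip(DirHandleSketch handle)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(handle);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DirHandleSketch) in.readObject();
        }
    }
}
```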
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java:
##########
@@ -80,6 +81,12 @@ public FsMergingCheckpointStorageLocation(
reference,
fileStateSizeThreshold,
writeBufferSize);
+
+ // Record file-merging managed dir reference when FsMergingCheckpointStorageLocation create.
+ if (fileMergingSnapshotManager instanceof FileMergingSnapshotManagerBase) {
+ ((FileMergingSnapshotManagerBase) fileMergingSnapshotManager)
+ .recordManagedDirReference(subtaskKey, checkpointId);
Review Comment:
How about making this a method of the `FileMergingSnapshotManager` interface? That would also remove the need for the `instanceof` check and cast. And how about renaming it to `retainManagedDirectory`?
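A minimal sketch of the interface-based variant, assuming the caller only needs a subtask key and a checkpoint id. `SnapshotManagerSketch`, `SnapshotManagerSketchImpl`, and `StorageLocationSketch` are hypothetical stand-ins for `FileMergingSnapshotManager`, `FileMergingSnapshotManagerBase`, and `FsMergingCheckpointStorageLocation`; `String` stands in for `SubtaskKey`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, trimmed-down stand-in for FileMergingSnapshotManager. */
interface SnapshotManagerSketch {
    /** Records that checkpoint {@code checkpointId} may reference the subtask's managed directory. */
    void retainManagedDirectory(String subtaskKey, long checkpointId);
}

/** Hypothetical implementation that only counts references per subtask key. */
final class SnapshotManagerSketchImpl implements SnapshotManagerSketch {
    final Map<String, Long> refCounts = new ConcurrentHashMap<>();

    @Override
    public void retainManagedDirectory(String subtaskKey, long checkpointId) {
        // Atomically increments the count for this key, starting at 1 if absent.
        refCounts.merge(subtaskKey, 1L, Long::sum);
    }
}

/** Hypothetical stand-in for the storage location's constructor logic. */
final class StorageLocationSketch {
    StorageLocationSketch(SnapshotManagerSketch manager, String subtaskKey, long checkpointId) {
        // No instanceof check or cast: the method is part of the interface.
        manager.retainManagedDirectory(subtaskKey, checkpointId);
    }
}
```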
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -496,14 +497,34 @@ protected void discardCheckpoint(long checkpointId) throws IOException {
// Checkpoint Listener
// ------------------------------------------------------------------------
+ /**
+ * {@link FsMergingCheckpointStorageLocation} use this method let the file merging manager know
+ * an ongoing checkpoint may reference the managed dirs.
+ */
+ public void notifyCheckpointStart(SubtaskKey subtaskKey, long checkpointId) {
+ managedSharedStateDirHandles
+ .getOrDefault(subtaskKey, NON_HANDLE_INSTANCE)
+ .increaseRefCountWhenCheckpointStart(checkpointId);
Review Comment:
How about using `computeIfPresent` here instead of `getOrDefault` with `NON_HANDLE_INSTANCE`?
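A minimal sketch of the `computeIfPresent` variant: the update runs only when the key is present, so no placeholder instance is needed, and on a `ConcurrentHashMap` the whole call is performed atomically. `DirRef` is a hypothetical stand-in for the managed dir handle, and `String` stands in for `SubtaskKey`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ComputeIfPresentSketch {
    /** Hypothetical mutable holder standing in for the managed shared-state dir handle. */
    static final class DirRef {
        long refCount;

        void increaseRefCountWhenCheckpointStart(long checkpointId) {
            refCount++;
        }
    }

    final Map<String, DirRef> managedSharedStateDirHandles = new ConcurrentHashMap<>();

    void notifyCheckpointStart(String subtaskKey, long checkpointId) {
        // Does nothing if the key is absent; otherwise updates the existing handle.
        managedSharedStateDirHandles.computeIfPresent(
                subtaskKey,
                (key, handle) -> {
                    handle.increaseRefCountWhenCheckpointStart(checkpointId);
                    return handle; // returning the same value keeps the mapping
                });
    }
}
```

Returning `null` from the remapping function would remove the entry, so the lambda must return the handle it was given.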
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java:
##########
@@ -19,23 +19,26 @@
package org.apache.flink.runtime.state.filemerging;
import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.DirectoryStateHandle;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.SharedStateRegistryKey;
import org.apache.flink.runtime.state.StreamStateHandle;
import javax.annotation.Nonnull;
-import java.nio.file.Path;
import java.util.Optional;
/** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */
Review Comment:
Maybe you should update this Javadoc too, since the class no longer extends `DirectoryStateHandle`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java:
##########
@@ -85,6 +92,18 @@ public String toString() {
* @return DirectoryStreamStateHandle with zero size.
*/
public static DirectoryStreamStateHandle forPathWithZeroSize(@Nonnull Path directory) {
Review Comment:
How about naming it `of`?
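A minimal sketch of the suggested naming, which mirrors JDK static factories such as `List.of` and `java.nio.file.Path.of`. `DirHandleOfSketch` is a hypothetical, trimmed-down stand-in for `DirectoryStreamStateHandle`, with `String` in place of Flink's `Path`.

```java
/** Hypothetical, trimmed-down stand-in for DirectoryStreamStateHandle. */
final class DirHandleOfSketch {
    private final String directory;
    private final long size;

    private DirHandleOfSketch(String directory, long size) {
        this.directory = directory;
        this.size = size;
    }

    /** Same behavior as forPathWithZeroSize, under the shorter name suggested in the review. */
    static DirHandleOfSketch of(String directory) {
        return new DirHandleOfSketch(directory, 0L);
    }

    String getDirectory() {
        return directory;
    }

    long getStateSize() {
        return size;
    }
}
```

Call sites then read as `DirectoryStreamStateHandle.of(dir)`, with the zero size documented on the factory rather than spelled out in its name.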
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -496,14 +497,34 @@ protected void discardCheckpoint(long checkpointId) throws IOException {
// Checkpoint Listener
// ------------------------------------------------------------------------
+ /**
+ * {@link FsMergingCheckpointStorageLocation} use this method let the file merging manager know
+ * an ongoing checkpoint may reference the managed dirs.
+ */
+ public void notifyCheckpointStart(SubtaskKey subtaskKey, long checkpointId) {
Review Comment:
I'd suggest making this a method of the `FileMergingSnapshotManager` interface.