This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cd9a9f76cbc [FLINK-32075][FLIP-306][Checkpoint] Delete merged files on 
checkpoint abort or subsumption (#24181)
cd9a9f76cbc is described below

commit cd9a9f76cbc3a288abd49ba12c0edba3ce208ec7
Author: Zakelly <[email protected]>
AuthorDate: Thu Feb 29 18:06:54 2024 +0800

    [FLINK-32075][FLIP-306][Checkpoint] Delete merged files on checkpoint abort 
or subsumption (#24181)
---
 .../filemerging/FileMergingSnapshotManager.java    | 37 ++++++++-
 .../FileMergingSnapshotManagerBase.java            | 92 ++++++++++++++++++++++
 ...WithinCheckpointFileMergingSnapshotManager.java | 46 +++++++++++
 .../flink/runtime/state/TaskStateManager.java      |  1 +
 .../FsMergingCheckpointStorageAccess.java          |  6 +-
 .../FileMergingSnapshotManagerTest.java            | 70 ++++++++++++++++
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 60 +++++++++++++-
 7 files changed, 304 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index ff252f05e78..cc54854a7a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -34,8 +35,6 @@ import java.io.Closeable;
  * checkpoint files with merging checkpoint files enabled. It manages the 
files for ONE single task
  * in TM, including all subtasks of this single task that is running in this 
TM. There is one
  * FileMergingSnapshotManager for each job per task manager.
- *
- * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logical 
files.
  */
 public interface FileMergingSnapshotManager extends Closeable {
 
@@ -118,6 +117,34 @@ public interface FileMergingSnapshotManager extends 
Closeable {
      */
     Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);
 
+    /**
+     * Notifies the manager that the checkpoint with the given {@code 
checkpointId} completed and
+     * was committed.
+     *
+     * @param subtaskKey the subtask key identifying the subtask.
+     * @param checkpointId The ID of the checkpoint that has been completed.
+     * @throws Exception thrown if anything goes wrong with the listener.
+     */
+    void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId) 
throws Exception;
+
+    /**
+     * This method is called as a notification once a distributed checkpoint 
has been aborted.
+     *
+     * @param subtaskKey the subtask key identifying the subtask.
+     * @param checkpointId The ID of the checkpoint that has been completed.
+     * @throws Exception thrown if anything goes wrong with the listener.
+     */
+    void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) 
throws Exception;
+
+    /**
+     * This method is called as a notification once a distributed checkpoint 
has been subsumed.
+     *
+     * @param subtaskKey the subtask key identifying the subtask.
+     * @param checkpointId The ID of the checkpoint that has been completed.
+     * @throws Exception thrown if anything goes wrong with the listener.
+     */
+    void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) 
throws Exception;
+
     /**
      * A key identifies a subtask. A subtask can be identified by the operator 
id, subtask index and
      * the parallelism. Note that this key should be consistent across job 
attempts.
@@ -151,6 +178,12 @@ public interface FileMergingSnapshotManager extends 
Closeable {
             this.hashCode = hash;
         }
 
+        public static SubtaskKey of(Environment environment) {
+            return new SubtaskKey(
+                    OperatorID.fromJobVertexID(environment.getJobVertexId()),
+                    environment.getTaskInfo());
+        }
+
         /**
          * Generate an unique managed directory name for one subtask.
          *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
index 19414292aec..9ad4ff4d664 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint.filemerging;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.EntropyInjector;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -34,10 +35,15 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
@@ -55,6 +61,12 @@ public abstract class FileMergingSnapshotManagerBase 
implements FileMergingSnaps
     /** The executor for I/O operations in this manager. */
     protected final Executor ioExecutor;
 
+    /** Guard for uploadedStates. */
+    protected final Object lock = new Object();
+
+    @GuardedBy("lock")
+    protected TreeMap<Long, Set<LogicalFile>> uploadedStates = new TreeMap<>();
+
     /** The {@link FileSystem} that this manager works on. */
     protected FileSystem fs;
 
@@ -245,6 +257,13 @@ public abstract class FileMergingSnapshotManagerBase 
implements FileMergingSnaps
                                             physicalFile, startPos, stateSize, 
subtaskKey);
                             logicalFile.advanceLastCheckpointId(checkpointId);
 
+                            // track the logical file
+                            synchronized (lock) {
+                                uploadedStates
+                                        .computeIfAbsent(checkpointId, key -> 
new HashSet<>())
+                                        .add(logicalFile);
+                            }
+
                             // deal with physicalFile file
                             physicalFile.incSize(stateSize);
                             returnPhysicalFileForNextReuse(subtaskKey, 
checkpointId, physicalFile);
@@ -289,6 +308,13 @@ public abstract class FileMergingSnapshotManagerBase 
implements FileMergingSnaps
         return new Path(dirPath, fileName);
     }
 
+    @VisibleForTesting
+    boolean isResponsibleForFile(Path filePath) {
+        Path parent = filePath.getParent();
+        return parent.equals(managedExclusiveStateDir)
+                || managedSharedStateDir.containsValue(parent);
+    }
+
     /**
      * Delete a physical file by given file path. Use the io executor to do 
the deletion.
      *
@@ -345,6 +371,72 @@ public abstract class FileMergingSnapshotManagerBase 
implements FileMergingSnaps
     protected abstract void returnPhysicalFileForNextReuse(
             SubtaskKey subtaskKey, long checkpointId, PhysicalFile 
physicalFile) throws IOException;
 
+    /**
+     * The callback which will be triggered when all subtasks discarded 
(aborted or subsumed).
+     *
+     * @param checkpointId the discarded checkpoint id.
+     * @throws IOException if anything goes wrong with file system.
+     */
+    protected abstract void discardCheckpoint(long checkpointId) throws 
IOException;
+
+    // ------------------------------------------------------------------------
+    //  Checkpoint Listener
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void notifyCheckpointComplete(SubtaskKey subtaskKey, long 
checkpointId)
+            throws Exception {
+        // does nothing
+    }
+
+    @Override
+    public void notifyCheckpointAborted(SubtaskKey subtaskKey, long 
checkpointId) throws Exception {
+        synchronized (lock) {
+            Set<LogicalFile> logicalFilesForCurrentCp = 
uploadedStates.get(checkpointId);
+            if (logicalFilesForCurrentCp == null) {
+                return;
+            }
+            if (discardLogicalFiles(subtaskKey, checkpointId, 
logicalFilesForCurrentCp)) {
+                uploadedStates.remove(checkpointId);
+            }
+        }
+    }
+
+    @Override
+    public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long 
checkpointId)
+            throws Exception {
+        synchronized (lock) {
+            Iterator<Map.Entry<Long, Set<LogicalFile>>> uploadedStatesIterator 
=
+                    uploadedStates.headMap(checkpointId, 
true).entrySet().iterator();
+            while (uploadedStatesIterator.hasNext()) {
+                Map.Entry<Long, Set<LogicalFile>> entry = 
uploadedStatesIterator.next();
+                if (discardLogicalFiles(subtaskKey, entry.getKey(), 
entry.getValue())) {
+                    uploadedStatesIterator.remove();
+                }
+            }
+        }
+    }
+
+    private boolean discardLogicalFiles(
+            SubtaskKey subtaskKey, long checkpointId, Set<LogicalFile> 
logicalFiles)
+            throws Exception {
+        Iterator<LogicalFile> logicalFileIterator = logicalFiles.iterator();
+        while (logicalFileIterator.hasNext()) {
+            LogicalFile logicalFile = logicalFileIterator.next();
+            if (logicalFile.getSubtaskKey().equals(subtaskKey)
+                    && logicalFile.getLastUsedCheckpointID() <= checkpointId) {
+                logicalFile.discardWithCheckpointId(checkpointId);
+                logicalFileIterator.remove();
+            }
+        }
+
+        if (logicalFiles.isEmpty()) {
+            discardCheckpoint(checkpointId);
+            return true;
+        }
+        return false;
+    }
+
     // ------------------------------------------------------------------------
     //  file system
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java
index 3a02427547f..c8b3573a13f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java
@@ -50,6 +50,47 @@ public class WithinCheckpointFileMergingSnapshotManager 
extends FileMergingSnaps
         writablePhysicalFilePool = new HashMap<>();
     }
 
+    // ------------------------------------------------------------------------
+    //  CheckpointListener
+    // ------------------------------------------------------------------------
+
+    @Override
+    public void notifyCheckpointComplete(SubtaskKey subtaskKey, long 
checkpointId)
+            throws Exception {
+        super.notifyCheckpointComplete(subtaskKey, checkpointId);
+        removeAndCloseFiles(subtaskKey, checkpointId);
+    }
+
+    @Override
+    public void notifyCheckpointAborted(SubtaskKey subtaskKey, long 
checkpointId) throws Exception {
+        super.notifyCheckpointAborted(subtaskKey, checkpointId);
+        removeAndCloseFiles(subtaskKey, checkpointId);
+    }
+
+    /**
+     * Remove files that belongs to specific subtask and checkpoint from the 
reuse pool. And close
+     * these files. TODO: Refactor this in FLINK-32076.
+     */
+    private void removeAndCloseFiles(SubtaskKey subtaskKey, long checkpointId) 
throws Exception {
+        Tuple3<Long, SubtaskKey, CheckpointedStateScope> fileKey =
+                Tuple3.of(checkpointId, subtaskKey, 
CheckpointedStateScope.SHARED);
+        PhysicalFile file;
+        synchronized (writablePhysicalFilePool) {
+            file = writablePhysicalFilePool.remove(fileKey);
+        }
+        if (file != null) {
+            file.close();
+        }
+
+        fileKey = Tuple3.of(checkpointId, DUMMY_SUBTASK_KEY, 
CheckpointedStateScope.EXCLUSIVE);
+        synchronized (writablePhysicalFilePool) {
+            file = writablePhysicalFilePool.remove(fileKey);
+        }
+        if (file != null) {
+            file.close();
+        }
+    }
+
     @Override
     @Nonnull
     protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
@@ -98,4 +139,9 @@ public class WithinCheckpointFileMergingSnapshotManager 
extends FileMergingSnaps
             physicalFile.close();
         }
     }
+
+    @Override
+    protected void discardCheckpoint(long checkpointId) throws IOException {
+        // TODO: Discard the whole file pool for checkpoint id (When there is 
one after FLINK-32076)
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
index b3c91791228..3fdb05cebde 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
@@ -116,5 +116,6 @@ public interface TaskStateManager extends 
CheckpointListener, AutoCloseable {
     StateChangelogStorageView<?> getStateChangelogStorageView(
             Configuration configuration, ChangelogStateHandle 
changelogStateHandle);
 
+    @Nullable
     FileMergingSnapshotManager getFileMergingSnapshotManager();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
index ccfe5b3bbd3..a2f7560d653 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 
@@ -60,10 +59,7 @@ public class FsMergingCheckpointStorageAccess extends 
FsCheckpointStorageAccess
                 fileSizeThreshold,
                 writeBufferSize);
         this.fileMergingSnapshotManager = fileMergingSnapshotManager;
-        this.subtaskKey =
-                new SubtaskKey(
-                        
OperatorID.fromJobVertexID(environment.getJobVertexId()),
-                        environment.getTaskInfo());
+        this.subtaskKey = SubtaskKey.of(environment);
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
index a6e1fbbbc87..d947a884dbe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
@@ -358,6 +358,38 @@ public class FileMergingSnapshotManagerTest {
         }
     }
 
+    @Test
+    public void testCheckpointNotification() throws Exception {
+        try (FileMergingSnapshotManager fmsm = 
createFileMergingSnapshotManager(checkpointBaseDir);
+                CloseableRegistry closeableRegistry = new CloseableRegistry()) 
{
+            FileMergingCheckpointStateOutputStream cp1Stream =
+                    writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
+            SegmentFileStateHandle cp1StateHandle = 
cp1Stream.closeAndGetHandle();
+            fmsm.notifyCheckpointComplete(subtaskKey1, 1);
+            assertFileInManagedDir(fmsm, cp1StateHandle);
+
+            // complete checkpoint-2
+            FileMergingCheckpointStateOutputStream cp2Stream =
+                    writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
+            SegmentFileStateHandle cp2StateHandle = 
cp2Stream.closeAndGetHandle();
+            fmsm.notifyCheckpointComplete(subtaskKey1, 2);
+            assertFileInManagedDir(fmsm, cp2StateHandle);
+
+            // subsume checkpoint-1
+            assertThat(fileExists(cp1StateHandle)).isTrue();
+            fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
+            assertThat(fileExists(cp1StateHandle)).isFalse();
+
+            // abort checkpoint-3
+            FileMergingCheckpointStateOutputStream cp3Stream =
+                    writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
+            SegmentFileStateHandle cp3StateHandle = 
cp3Stream.closeAndGetHandle();
+            assertFileInManagedDir(fmsm, cp3StateHandle);
+            fmsm.notifyCheckpointAborted(subtaskKey1, 3);
+            assertThat(fileExists(cp3StateHandle)).isFalse();
+        }
+    }
+
     private FileMergingSnapshotManager createFileMergingSnapshotManager(Path 
checkpointBaseDir)
             throws IOException {
         FileSystem fs = LocalFileSystem.getSharedInstance();
@@ -384,4 +416,42 @@ public class FileMergingSnapshotManagerTest {
         assertThat(fmsm).isNotNull();
         return fmsm;
     }
+
+    private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
+            long checkpointId, FileMergingSnapshotManager fmsm, 
CloseableRegistry closeableRegistry)
+            throws IOException {
+        return writeCheckpointAndGetStream(checkpointId, fmsm, 
closeableRegistry, 32);
+    }
+
+    private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
+            long checkpointId,
+            FileMergingSnapshotManager fmsm,
+            CloseableRegistry closeableRegistry,
+            int numBytes)
+            throws IOException {
+        FileMergingCheckpointStateOutputStream stream =
+                fmsm.createCheckpointStateOutputStream(
+                        subtaskKey1, checkpointId, 
CheckpointedStateScope.EXCLUSIVE);
+        closeableRegistry.registerCloseable(stream);
+        for (int i = 0; i < numBytes; i++) {
+            stream.write(i);
+        }
+        return stream;
+    }
+
+    private void assertFileInManagedDir(
+            FileMergingSnapshotManager fmsm, SegmentFileStateHandle 
stateHandle) {
+        assertThat(fmsm instanceof FileMergingSnapshotManagerBase).isTrue();
+        assertThat(stateHandle).isNotNull();
+        Path filePath = stateHandle.getFilePath();
+        assertThat(filePath).isNotNull();
+        assertThat(((FileMergingSnapshotManagerBase) 
fmsm).isResponsibleForFile(filePath)).isTrue();
+    }
+
+    private boolean fileExists(SegmentFileStateHandle stateHandle) throws 
IOException {
+        assertThat(stateHandle).isNotNull();
+        Path filePath = stateHandle.getFilePath();
+        assertThat(filePath).isNotNull();
+        return filePath.getFileSystem().exists(filePath);
+    }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 81616343085..d9603c8993d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -53,6 +54,7 @@ import org.apache.flink.util.function.BiFunctionWithException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
 import java.io.IOException;
@@ -127,6 +129,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
      */
     private long alignmentCheckpointId;
 
+    @Nullable private final FileMergingSnapshotManager 
fileMergingSnapshotManager;
+
     SubtaskCheckpointCoordinatorImpl(
             CheckpointStorage checkpointStorage,
             CheckpointStorageWorkerView checkpointStorageView,
@@ -157,7 +161,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                                 taskName, checkpointStorage, env, 
maxSubtasksPerChannelStateFile)
                         : ChannelStateWriter.NO_OP,
                 enableCheckpointAfterTasksFinished,
-                registerTimer);
+                registerTimer,
+                env.getTaskStateManager().getFileMergingSnapshotManager());
     }
 
     @VisibleForTesting
@@ -175,6 +180,36 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
             ChannelStateWriter channelStateWriter,
             boolean enableCheckpointAfterTasksFinished,
             DelayableTimer registerTimer) {
+        this(
+                checkpointStorage,
+                taskName,
+                actionExecutor,
+                asyncOperationsThreadPool,
+                env,
+                asyncExceptionHandler,
+                prepareInputSnapshot,
+                maxRecordAbortedCheckpoints,
+                channelStateWriter,
+                enableCheckpointAfterTasksFinished,
+                registerTimer,
+                null);
+    }
+
+    SubtaskCheckpointCoordinatorImpl(
+            CheckpointStorageWorkerView checkpointStorage,
+            String taskName,
+            StreamTaskActionExecutor actionExecutor,
+            ExecutorService asyncOperationsThreadPool,
+            Environment env,
+            AsyncExceptionHandler asyncExceptionHandler,
+            BiFunctionWithException<
+                            ChannelStateWriter, Long, CompletableFuture<Void>, 
CheckpointException>
+                    prepareInputSnapshot,
+            int maxRecordAbortedCheckpoints,
+            ChannelStateWriter channelStateWriter,
+            boolean enableCheckpointAfterTasksFinished,
+            DelayableTimer registerTimer,
+            FileMergingSnapshotManager fileMergingSnapshotManager) {
         this.checkpointStorage =
                 new 
CachingCheckpointStorageWorkerView(checkNotNull(checkpointStorage));
         this.taskName = checkNotNull(taskName);
@@ -194,6 +229,7 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
         this.enableCheckpointAfterTasksFinished = 
enableCheckpointAfterTasksFinished;
         this.registerTimer = registerTimer;
         this.clock = SystemClock.getInstance();
+        this.fileMergingSnapshotManager = fileMergingSnapshotManager;
     }
 
     private static ChannelStateWriter openChannelStateWriter(
@@ -484,6 +520,7 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                     case COMPLETE:
                         
env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
                 }
+                notifyFileMergingSnapshotManagerCheckpoint(checkpointId, 
notifyCheckpointOperation);
             } catch (Exception e) {
                 previousException = ExceptionUtils.firstOrSuppressed(e, 
previousException);
             }
@@ -492,6 +529,27 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
         ExceptionUtils.tryRethrowException(previousException);
     }
 
+    private void notifyFileMergingSnapshotManagerCheckpoint(
+            long checkpointId, Task.NotifyCheckpointOperation 
notifyCheckpointOperation)
+            throws Exception {
+        if (fileMergingSnapshotManager != null) {
+            switch (notifyCheckpointOperation) {
+                case ABORT:
+                    fileMergingSnapshotManager.notifyCheckpointAborted(
+                            FileMergingSnapshotManager.SubtaskKey.of(env), 
checkpointId);
+                    break;
+                case COMPLETE:
+                    fileMergingSnapshotManager.notifyCheckpointComplete(
+                            FileMergingSnapshotManager.SubtaskKey.of(env), 
checkpointId);
+                    break;
+                case SUBSUME:
+                    fileMergingSnapshotManager.notifyCheckpointSubsumed(
+                            FileMergingSnapshotManager.SubtaskKey.of(env), 
checkpointId);
+                    break;
+            }
+        }
+    }
+
     @Override
     public void initInputsCheckpoint(long id, CheckpointOptions 
checkpointOptions)
             throws CheckpointException {

Reply via email to