This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1112582fd13 [FLINK-32072][checkpoint] Create and wire 
FileMergingSnapshotManager with TaskManagerServices (#22890)
1112582fd13 is described below

commit 1112582fd136df47c6d356d6f6ad3946ad1e56d5
Author: Yanfei Lei <[email protected]>
AuthorDate: Tue Sep 19 17:31:04 2023 +0800

    [FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with 
TaskManagerServices (#22890)
---
 .../api/runtime/SavepointTaskStateManager.java     |   7 +
 .../filemerging/FileMergingSnapshotManager.java    |   6 +-
 .../FileMergingSnapshotManagerBuilder.java         |   2 -
 .../runtime/state/CheckpointStorageWorkerView.java |  13 ++
 .../state/TaskExecutorFileMergingManager.java      | 143 +++++++++++++++++++++
 .../flink/runtime/state/TaskStateManager.java      |   3 +
 .../flink/runtime/state/TaskStateManagerImpl.java  |  14 ++
 .../filesystem/FsCheckpointStorageAccess.java      |  24 +++-
 .../FsMergingCheckpointStorageAccess.java          |  73 +++++++++++
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  14 ++
 .../runtime/taskexecutor/TaskManagerServices.java  |  13 ++
 .../state/TaskExecutorFileMergingManagerTest.java  |  87 +++++++++++++
 .../runtime/state/TaskStateManagerImplTest.java    |   4 +
 .../flink/runtime/state/TestTaskStateManager.java  |   7 +
 .../taskexecutor/TaskManagerServicesBuilder.java   |  10 ++
 .../runtime/util/JvmExitOnFatalErrorTest.java      |   1 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |  29 ++++-
 .../StateInitializationContextImplTest.java        |   1 +
 .../StreamTaskStateInitializerImplTest.java        |   1 +
 .../runtime/tasks/LocalStateForwardingTest.java    |   1 +
 .../streaming/runtime/tasks/StreamTaskTest.java    |   2 +
 21 files changed, 446 insertions(+), 9 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
index cb6193e7724..a1d649d4d8c 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
 import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -114,6 +115,12 @@ final class SavepointTaskStateManager implements 
TaskStateManager {
         return null;
     }
 
+    @Nullable
+    @Override
+    public FileMergingSnapshotManager getFileMergingSnapshotManager() {
+        return null;
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) {
         throw new UnsupportedOperationException(MSG);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index 4954d8548f3..968e86f91c5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint.filemerging;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -31,7 +32,7 @@ import java.io.Closeable;
  * FileMergingSnapshotManager provides an interface to manage files and meta 
information for
  * checkpoint files with merging checkpoint files enabled. It manages the 
files for ONE single task
  * in TM, including all subtasks of this single task that running in this TM. 
There is one
- * FileMergingSnapshotManager for each task per task manager.
+ * FileMergingSnapshotManager for each job per task manager.
  *
  * <p>TODO (FLINK-32073): create output stream.
  *
@@ -125,7 +126,8 @@ public interface FileMergingSnapshotManager extends 
Closeable {
                     taskInfo.getNumberOfParallelSubtasks());
         }
 
-        SubtaskKey(String operatorIDString, int subtaskIndex, int parallelism) 
{
+        @VisibleForTesting
+        public SubtaskKey(String operatorIDString, int subtaskIndex, int 
parallelism) {
             this.operatorIDString = operatorIDString;
             this.subtaskIndex = subtaskIndex;
             this.parallelism = parallelism;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
index f07669b094b..faa9023abca 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
@@ -50,8 +50,6 @@ public class FileMergingSnapshotManagerBuilder {
     /**
      * Create file-merging snapshot manager based on configuration.
      *
-     * <p>TODO (FLINK-32072): Create manager during the initialization of task 
manager services.
-     *
      * <p>TODO (FLINK-32074): Support another type of 
FileMergingSnapshotManager that merges files
      * across different checkpoints.
      *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
index 3e7e659e0e9..8817d71fce5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import org.apache.flink.runtime.execution.Environment;
 
 import java.io.IOException;
 
@@ -75,4 +77,15 @@ public interface CheckpointStorageWorkerView {
      * @return A toolset for additional operations for state owned by tasks.
      */
     CheckpointStateToolset createTaskOwnedCheckpointStateToolset();
+
+    /**
+     * Return {@link 
org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageAccess} if
+     * file merging is enabled. Otherwise, return itself. File merging is 
supported by subclasses of
+     * {@link 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess}.
+     */
+    default CheckpointStorageWorkerView toFileMergingStorage(
+            FileMergingSnapshotManager mergingSnapshotManager, Environment 
environment)
+            throws IOException {
+        return this;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
new file mode 100644
index 00000000000..42d8a4a5469
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
+import org.apache.flink.util.ShutdownHookUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * There is one {@link FileMergingSnapshotManager} for each job per task 
manager. This class holds
+ * all {@link FileMergingSnapshotManager} objects for a task executor 
(manager).
+ */
+public class TaskExecutorFileMergingManager {
+    /** Logger for this class. */
+    private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorFileMergingManager.class);
+
+    /**
+     * This map holds all FileMergingSnapshotManager for tasks running on this 
task
+     * manager(executor).
+     */
+    @GuardedBy("lock")
+    private final Map<JobID, FileMergingSnapshotManager> 
fileMergingSnapshotManagerByJobId;
+
+    @GuardedBy("lock")
+    private boolean closed;
+
+    private final Object lock = new Object();
+
+    /** Shutdown hook for this manager. */
+    private final Thread shutdownHook;
+
+    public TaskExecutorFileMergingManager() {
+        this.fileMergingSnapshotManagerByJobId = new HashMap<>();
+        this.closed = false;
+        this.shutdownHook =
+                ShutdownHookUtil.addShutdownHook(this::shutdown, 
getClass().getSimpleName(), LOG);
+    }
+
+    /**
+     * Initialize the file merging snapshot manager for each job according to the configuration
+     * when {@link org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask} is called.
+     */
+    public @Nullable FileMergingSnapshotManager 
fileMergingSnapshotManagerForJob(
+            @Nonnull JobID jobId) {
+        synchronized (lock) {
+            if (closed) {
+                throw new IllegalStateException(
+                        "TaskExecutorFileMergingManager is already closed and 
cannot "
+                                + "register a new 
FileMergingSnapshotManager.");
+            }
+            FileMergingSnapshotManager fileMergingSnapshotManager =
+                    fileMergingSnapshotManagerByJobId.get(jobId);
+            if (fileMergingSnapshotManager == null) {
+                // TODO FLINK-32440: choose different 
FileMergingSnapshotManager by configuration
+                fileMergingSnapshotManager =
+                        new 
FileMergingSnapshotManagerBuilder(jobId.toString()).build();
+                fileMergingSnapshotManagerByJobId.put(jobId, 
fileMergingSnapshotManager);
+                LOG.info("Registered new file merging snapshot manager for job 
{}.", jobId);
+            }
+            return fileMergingSnapshotManager;
+        }
+    }
+
+    /**
+     * Release the file merging snapshot manager of one job when {@link
+     * org.apache.flink.runtime.taskexecutor.TaskExecutor#releaseJobResources} is called.
+     *
+     * <p>Does nothing if this manager is already closed or if no snapshot manager is registered
+     * for the given job. Exceptions thrown while closing are logged and not propagated.
+     *
+     * @param jobId the job whose file merging snapshot manager should be released.
+     */
+    public void releaseMergingSnapshotManagerForJob(@Nonnull JobID jobId) {
+        LOG.debug("Releasing file merging snapshot manager under job id {}.", jobId);
+        final FileMergingSnapshotManager toRelease;
+        synchronized (lock) {
+            if (closed) {
+                return;
+            }
+            toRelease = fileMergingSnapshotManagerByJobId.remove(jobId);
+        }
+
+        // The manager is closed after the lock has been released.
+        if (toRelease != null) {
+            try {
+                toRelease.close();
+            } catch (Exception e) {
+                LOG.warn(
+                        "Exception while closing FileMergingSnapshotManager for job {}.",
+                        jobId,
+                        e);
+            }
+        }
+    }
+
+    /**
+     * Marks this manager as closed and closes every registered {@link
+     * FileMergingSnapshotManager}. Calling this method again after it has completed is a no-op.
+     * Exceptions thrown while closing individual managers are logged and not propagated.
+     */
+    public void shutdown() {
+        final Map<JobID, FileMergingSnapshotManager> toRelease;
+        synchronized (lock) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            // Take the snapshot while holding the lock: the map is @GuardedBy("lock"), and
+            // copying it before acquiring the lock could race with a concurrent registration
+            // or release and miss managers that must be closed.
+            toRelease = new HashMap<>(fileMergingSnapshotManagerByJobId);
+            fileMergingSnapshotManagerByJobId.clear();
+        }
+
+        LOG.info("Shutting down TaskExecutorFileMergingManager.");
+
+        ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
+
+        // The managers are closed after the lock has been released.
+        for (Map.Entry<JobID, FileMergingSnapshotManager> entry : toRelease.entrySet()) {
+            if (entry.getValue() != null) {
+                try {
+                    entry.getValue().close();
+                } catch (Exception e) {
+                    LOG.warn(
+                            "Exception while closing FileMergingSnapshotManager for job {}.",
+                            entry.getKey(),
+                            e);
+                }
+            }
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
index 7c979b6b0d8..44f0cdc1e5e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
 import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -111,4 +112,6 @@ public interface TaskStateManager extends 
CheckpointListener, AutoCloseable {
     @Nullable
     StateChangelogStorageView<?> getStateChangelogStorageView(
             Configuration configuration, ChangelogStateHandle 
changelogStateHandle);
+
+    /**
+     * Returns the {@link FileMergingSnapshotManager} held by this task state manager.
+     *
+     * @return the file merging snapshot manager, or {@code null} if this task state manager has
+     *     none.
+     */
+    @Nullable
+    FileMergingSnapshotManager getFileMergingSnapshotManager();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index a82b55b155b..a2ce6c8f7a3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
@@ -80,6 +81,9 @@ public class TaskStateManagerImpl implements TaskStateManager 
{
     /** The changelog storage where the manager reads and writes the changelog 
*/
     @Nullable private final StateChangelogStorage<?> stateChangelogStorage;
 
+    /** The file merging snapshot manager. */
+    @Nullable private final FileMergingSnapshotManager 
fileMergingSnapshotManager;
+
     private final TaskExecutorStateChangelogStoragesManager 
changelogStoragesManager;
 
     /** The checkpoint responder through which this manager can report to the 
job manager. */
@@ -91,6 +95,7 @@ public class TaskStateManagerImpl implements TaskStateManager 
{
             @Nonnull JobID jobId,
             @Nonnull ExecutionAttemptID executionAttemptID,
             @Nonnull TaskLocalStateStore localStateStore,
+            @Nullable FileMergingSnapshotManager fileMergingSnapshotManager,
             @Nullable StateChangelogStorage<?> stateChangelogStorage,
             @Nonnull TaskExecutorStateChangelogStoragesManager 
changelogStoragesManager,
             @Nullable JobManagerTaskRestore jobManagerTaskRestore,
@@ -99,6 +104,7 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
                 jobId,
                 executionAttemptID,
                 localStateStore,
+                fileMergingSnapshotManager,
                 stateChangelogStorage,
                 changelogStoragesManager,
                 jobManagerTaskRestore,
@@ -113,6 +119,7 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
             @Nonnull JobID jobId,
             @Nonnull ExecutionAttemptID executionAttemptID,
             @Nonnull TaskLocalStateStore localStateStore,
+            @Nullable FileMergingSnapshotManager fileMergingSnapshotManager,
             @Nullable StateChangelogStorage<?> stateChangelogStorage,
             @Nonnull TaskExecutorStateChangelogStoragesManager 
changelogStoragesManager,
             @Nullable JobManagerTaskRestore jobManagerTaskRestore,
@@ -120,6 +127,7 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
             @Nonnull SequentialChannelStateReaderImpl 
sequentialChannelStateReader) {
         this.jobId = jobId;
         this.localStateStore = localStateStore;
+        this.fileMergingSnapshotManager = fileMergingSnapshotManager;
         this.stateChangelogStorage = stateChangelogStorage;
         this.changelogStoragesManager = changelogStoragesManager;
         this.jobManagerTaskRestore = jobManagerTaskRestore;
@@ -270,6 +278,12 @@ public class TaskStateManagerImpl implements 
TaskStateManager {
         return storageView;
     }
 
+    @Nullable
+    @Override
+    public FileMergingSnapshotManager getFileMergingSnapshotManager() {
+        return fileMergingSnapshotManager;
+    }
+
     /** Tracking when local state can be confirmed and disposed. */
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index 11807c3de85..92ba08e3dc5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.DuplicatingFileSystem;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStateToolset;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -41,13 +43,13 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /** An implementation of durable checkpoint storage to file systems. */
 public class FsCheckpointStorageAccess extends 
AbstractFsCheckpointStorageAccess {
 
-    private final FileSystem fileSystem;
+    protected final FileSystem fileSystem;
 
-    private final Path checkpointsDirectory;
+    protected final Path checkpointsDirectory;
 
-    private final Path sharedStateDirectory;
+    protected final Path sharedStateDirectory;
 
-    private final Path taskOwnedStateDirectory;
+    protected final Path taskOwnedStateDirectory;
 
     private final int fileSizeThreshold;
 
@@ -201,4 +203,18 @@ public class FsCheckpointStorageAccess extends 
AbstractFsCheckpointStorageAccess
         return new FsCheckpointStorageLocation(
                 fs, location, location, location, reference, 
fileSizeThreshold, writeBufferSize);
     }
+
+    /**
+     * Creates a {@link FsMergingCheckpointStorageAccess} built from this storage's file system,
+     * the parent of its checkpoints directory, its default savepoint directory, file size
+     * threshold and write buffer size, together with the job id of the given environment.
+     *
+     * @param mergingSnapshotManager the manager handed to the new storage access.
+     * @param environment the task environment; supplies the job id and is passed on to the new
+     *     storage access.
+     * @return a new file merging checkpoint storage access.
+     * @throws IOException if the storage access cannot be created.
+     */
+    @Override
+    public FsMergingCheckpointStorageAccess toFileMergingStorage(
+            FileMergingSnapshotManager mergingSnapshotManager, Environment environment)
+            throws IOException {
+        return new FsMergingCheckpointStorageAccess(
+                fileSystem,
+                checkpointsDirectory.getParent(),
+                getDefaultSavepointDirectory(),
+                environment.getJobID(),
+                fileSizeThreshold,
+                writeBufferSize,
+                mergingSnapshotManager,
+                environment);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
new file mode 100644
index 00000000000..a471243ab7a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** An implementation of file merging checkpoint storage to file systems. */
+public class FsMergingCheckpointStorageAccess extends 
FsCheckpointStorageAccess {
+
+    /** FileMergingSnapshotManager manages files and meta information for 
checkpoints. */
+    private final FileMergingSnapshotManager fileMergingSnapshotManager;
+
+    /** The identity of subtask. */
+    private final FileMergingSnapshotManager.SubtaskKey subtaskKey;
+
+    public FsMergingCheckpointStorageAccess(
+            FileSystem fs,
+            Path checkpointBaseDirectory,
+            @Nullable Path defaultSavepointDirectory,
+            JobID jobId,
+            int fileSizeThreshold,
+            int writeBufferSize,
+            FileMergingSnapshotManager fileMergingSnapshotManager,
+            Environment environment)
+            throws IOException {
+        super(
+                fs,
+                checkpointBaseDirectory,
+                defaultSavepointDirectory,
+                jobId,
+                fileSizeThreshold,
+                writeBufferSize);
+        this.fileMergingSnapshotManager = fileMergingSnapshotManager;
+        this.subtaskKey =
+                new SubtaskKey(
+                        
OperatorID.fromJobVertexID(environment.getJobVertexId()),
+                        environment.getTaskInfo());
+    }
+
+    @Override
+    public void initializeBaseLocationsForCheckpoint() throws IOException {
+        super.initializeBaseLocationsForCheckpoint();
+        fileMergingSnapshotManager.initFileSystem(
+                fileSystem, checkpointsDirectory, sharedStateDirectory, 
taskOwnedStateDirectory);
+        fileMergingSnapshotManager.registerSubtaskForSharedStates(subtaskKey);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index fc37a7f2f26..d42a5658598 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -98,6 +99,7 @@ import 
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import 
org.apache.flink.runtime.state.TaskExecutorChannelStateExecutorFactoryManager;
+import org.apache.flink.runtime.state.TaskExecutorFileMergingManager;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import 
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
 import org.apache.flink.runtime.state.TaskLocalStateStore;
@@ -217,6 +219,12 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
     /** The state manager for this task, providing state managers per slot. */
     private final TaskExecutorLocalStateStoresManager localStateStoresManager;
 
+    /**
+     * The file merging manager for this task, providing file merging snapshot 
manager per job, see
+     * {@link FileMergingSnapshotManager} for details.
+     */
+    private final TaskExecutorFileMergingManager fileMergingManager;
+
     /** The changelog manager for this task, providing changelog storage per 
job. */
     private final TaskExecutorStateChangelogStoragesManager 
changelogStoragesManager;
 
@@ -329,6 +337,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         this.unresolvedTaskManagerLocation =
                 taskExecutorServices.getUnresolvedTaskManagerLocation();
         this.localStateStoresManager = 
taskExecutorServices.getTaskManagerStateStore();
+        this.fileMergingManager = 
taskExecutorServices.getTaskManagerFileMergingManager();
         this.changelogStoragesManager = 
taskExecutorServices.getTaskManagerChangelogManager();
         this.channelStateExecutorFactoryManager =
                 taskExecutorServices.getTaskManagerChannelStateManager();
@@ -730,6 +739,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                             taskManagerConfiguration.getConfiguration(),
                             jobInformation.getJobConfiguration());
 
+            final FileMergingSnapshotManager fileMergingSnapshotManager =
+                    fileMergingManager.fileMergingSnapshotManagerForJob(jobId);
+
             // TODO: Pass config value from user program and do overriding 
here.
             final StateChangelogStorage<?> changelogStorage;
             try {
@@ -750,6 +762,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                             jobId,
                             tdd.getExecutionAttemptId(),
                             localStateStore,
+                            fileMergingSnapshotManager,
                             changelogStorage,
                             changelogStoragesManager,
                             taskRestore,
@@ -1891,6 +1904,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
         currentSlotOfferPerJob.remove(jobId);
         channelStateExecutorFactoryManager.releaseResourcesForJob(jobId);
         shuffleDescriptorsCache.clearCacheForJob(jobId);
+        fileMergingManager.releaseMergingSnapshotManagerForJob(jobId);
     }
 
     private void scheduleResultPartitionCleanup(JobID jobId) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ff14b12da15..da8a0636189 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
 import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
 import 
org.apache.flink.runtime.state.TaskExecutorChannelStateExecutorFactoryManager;
+import org.apache.flink.runtime.state.TaskExecutorFileMergingManager;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import 
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
 import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
@@ -80,6 +81,7 @@ public class TaskManagerServices {
     private final JobTable jobTable;
     private final JobLeaderService jobLeaderService;
     private final TaskExecutorLocalStateStoresManager taskManagerStateStore;
+    private final TaskExecutorFileMergingManager taskManagerFileMergingManager;
     private final TaskExecutorStateChangelogStoragesManager 
taskManagerChangelogManager;
     private final TaskExecutorChannelStateExecutorFactoryManager 
taskManagerChannelStateManager;
     private final TaskEventDispatcher taskEventDispatcher;
@@ -100,6 +102,7 @@ public class TaskManagerServices {
             JobTable jobTable,
             JobLeaderService jobLeaderService,
             TaskExecutorLocalStateStoresManager taskManagerStateStore,
+            TaskExecutorFileMergingManager taskManagerFileMergingManager,
             TaskExecutorStateChangelogStoragesManager 
taskManagerChangelogManager,
             TaskExecutorChannelStateExecutorFactoryManager 
taskManagerChannelStateManager,
             TaskEventDispatcher taskEventDispatcher,
@@ -120,6 +123,8 @@ public class TaskManagerServices {
         this.jobTable = Preconditions.checkNotNull(jobTable);
         this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService);
         this.taskManagerStateStore = 
Preconditions.checkNotNull(taskManagerStateStore);
+        this.taskManagerFileMergingManager =
+                Preconditions.checkNotNull(taskManagerFileMergingManager);
         this.taskManagerChangelogManager = 
Preconditions.checkNotNull(taskManagerChangelogManager);
         this.taskManagerChannelStateManager = taskManagerChannelStateManager;
         this.taskEventDispatcher = 
Preconditions.checkNotNull(taskEventDispatcher);
@@ -174,6 +179,10 @@ public class TaskManagerServices {
         return taskManagerStateStore;
     }
 
+    /**
+     * Returns the {@link TaskExecutorFileMergingManager} that was passed to this instance's
+     * constructor.
+     */
+    public TaskExecutorFileMergingManager getTaskManagerFileMergingManager() {
+        return taskManagerFileMergingManager;
+    }
+
     public TaskExecutorStateChangelogStoragesManager 
getTaskManagerChangelogManager() {
         return taskManagerChangelogManager;
     }
@@ -363,6 +372,9 @@ public class TaskManagerServices {
         final TaskExecutorChannelStateExecutorFactoryManager 
channelStateExecutorFactoryManager =
                 new TaskExecutorChannelStateExecutorFactoryManager();
 
+        final TaskExecutorFileMergingManager fileMergingManager =
+                new TaskExecutorFileMergingManager();
+
         final boolean failOnJvmMetaspaceOomError =
                 taskManagerServicesConfiguration
                         .getConfiguration()
@@ -407,6 +419,7 @@ public class TaskManagerServices {
                 jobTable,
                 jobLeaderService,
                 taskStateManager,
+                fileMergingManager,
                 changelogStoragesManager,
                 channelStateExecutorFactoryManager,
                 taskEventDispatcher,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java
new file mode 100644
index 00000000000..944fbe6e072
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests for {@link TaskExecutorFileMergingManager}: checks which {@link
+ * FileMergingSnapshotManager} instance is handed out per job and how the managed directories
+ * differ between jobs, subtasks and checkpoint scopes.
+ */
+public class TaskExecutorFileMergingManagerTest {
+    /**
+     * Requests managers for two jobs (job1 twice, job2 once) and asserts that repeated requests
+     * for the same job yield equal managers, that different jobs yield different managers, and
+     * that the EXCLUSIVE / SHARED managed directories are equal or distinct as stated by the
+     * inline comments next to each assertion.
+     *
+     * @param testBaseDir temporary directory injected by JUnit; the per-job checkpoint
+     *     directories are created as children of it
+     */
+    @Test
+    public void testCheckpointScope(@TempDir java.nio.file.Path testBaseDir) 
throws IOException {
+        TaskExecutorFileMergingManager taskExecutorFileMergingManager =
+                new TaskExecutorFileMergingManager();
+        JobID job1 = new JobID(1234L, 4321L);
+        JobID job2 = new JobID(1234L, 5678L);
+        SubtaskKey key1 = new SubtaskKey("test-op1", 0, 128);
+        SubtaskKey key2 = new SubtaskKey("test-op2", 1, 128);
+        Path checkpointDir1 = new Path(testBaseDir.toString(), "job1");
+        Path checkpointDir2 = new Path(testBaseDir.toString(), "job2");
+        FileMergingSnapshotManager manager1 =
+                
taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob(job1);
+        manager1.initFileSystem(
+                checkpointDir1.getFileSystem(),
+                checkpointDir1,
+                new Path(checkpointDir1, "shared"),
+                new Path(checkpointDir1, "taskowned"));
+        // Deliberately asks for job1 again (with job1's directories): the result is asserted
+        // below to be equal to manager1.
+        FileMergingSnapshotManager manager2 =
+                
taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob(job1);
+        manager2.initFileSystem(
+                checkpointDir1.getFileSystem(),
+                checkpointDir1,
+                new Path(checkpointDir1, "shared"),
+                new Path(checkpointDir1, "taskowned"));
+        FileMergingSnapshotManager manager3 =
+                
taskExecutorFileMergingManager.fileMergingSnapshotManagerForJob(job2);
+        manager3.initFileSystem(
+                checkpointDir2.getFileSystem(),
+                checkpointDir2,
+                new Path(checkpointDir2, "shared"),
+                new Path(checkpointDir2, "taskowned"));
+
+        assertThat(manager1).isEqualTo(manager2);
+        assertThat(manager1).isNotEqualTo(manager3);
+
+        // tasks of same job should have same private dir.
+        assertThat(manager1.getManagedDir(key1, 
CheckpointedStateScope.EXCLUSIVE))
+                .isEqualTo(manager2.getManagedDir(key2, 
CheckpointedStateScope.EXCLUSIVE));
+        // tasks of different jobs should have different private dirs.
+        assertThat(manager1.getManagedDir(key1, 
CheckpointedStateScope.EXCLUSIVE))
+                .isNotEqualTo(manager3.getManagedDir(key2, 
CheckpointedStateScope.EXCLUSIVE));
+
+        // Both subtask keys are registered on both jobs' managers before the SHARED-scope
+        // directories are queried (presumably a prerequisite for SHARED lookups - TODO confirm).
+        manager1.registerSubtaskForSharedStates(key1);
+        manager1.registerSubtaskForSharedStates(key2);
+        manager3.registerSubtaskForSharedStates(key1);
+        manager3.registerSubtaskForSharedStates(key2);
+        // tasks of same job should have different shared dirs for different 
subtasks.
+        assertThat(manager1.getManagedDir(key1, CheckpointedStateScope.SHARED))
+                .isNotEqualTo(manager1.getManagedDir(key2, 
CheckpointedStateScope.SHARED));
+        // tasks with same SubtaskKey of different jobs should have different 
shared dirs.
+        assertThat(manager1.getManagedDir(key1, CheckpointedStateScope.SHARED))
+                .isNotEqualTo(manager3.getManagedDir(key1, 
CheckpointedStateScope.SHARED));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index e51ac6c3824..76f78e26fda 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -271,6 +271,7 @@ class TaskStateManagerImplTest {
                         createExecutionAttemptId(),
                         new TestTaskLocalStateStore(),
                         null,
+                        null,
                         new TaskExecutorStateChangelogStoragesManager(),
                         jobManagerTaskRestore,
                         new TestCheckpointResponder());
@@ -284,6 +285,7 @@ class TaskStateManagerImplTest {
                         createExecutionAttemptId(),
                         new TestTaskLocalStateStore(),
                         null,
+                        null,
                         new TaskExecutorStateChangelogStoragesManager(),
                         null,
                         new TestCheckpointResponder());
@@ -295,6 +297,7 @@ class TaskStateManagerImplTest {
                         createExecutionAttemptId(),
                         new TestTaskLocalStateStore(),
                         null,
+                        null,
                         new TaskExecutorStateChangelogStoragesManager(),
                         new JobManagerTaskRestore(2, new TaskStateSnapshot()),
                         new TestCheckpointResponder());
@@ -313,6 +316,7 @@ class TaskStateManagerImplTest {
                 jobID,
                 executionAttemptID,
                 localStateStore,
+                null,
                 stateChangelogStorage,
                 new TaskExecutorStateChangelogStoragesManager(),
                 jobManagerTaskRestore,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index 6e25d4336e0..2cb492277c8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
@@ -244,6 +245,12 @@ public class TestTaskStateManager implements 
TaskStateManager {
         return storageView;
     }
 
+    /**
+     * This test implementation does not provide a file-merging snapshot manager.
+     *
+     * @return always {@code null}
+     */
+    @Nullable
+    @Override
+    public FileMergingSnapshotManager getFileMergingSnapshotManager() {
+        return null;
+    }
+
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {
         this.notifiedCompletedCheckpointId = checkpointId;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 7e2a53fd63f..2bd18f543a1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
 import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import 
org.apache.flink.runtime.state.TaskExecutorChannelStateExecutorFactoryManager;
+import org.apache.flink.runtime.state.TaskExecutorFileMergingManager;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import 
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
 import 
org.apache.flink.runtime.taskexecutor.slot.NoOpSlotAllocationSnapshotPersistenceService;
@@ -58,6 +59,7 @@ public class TaskManagerServicesBuilder {
     private JobTable jobTable;
     private JobLeaderService jobLeaderService;
     private TaskExecutorLocalStateStoresManager taskStateManager;
+    private TaskExecutorFileMergingManager taskFileMergingManager;
     private TaskExecutorStateChangelogStoragesManager 
taskChangelogStoragesManager;
     private TaskExecutorChannelStateExecutorFactoryManager 
taskChannelStateExecutorFactoryManager;
     private TaskEventDispatcher taskEventDispatcher;
@@ -83,6 +85,7 @@ public class TaskManagerServicesBuilder {
                         unresolvedTaskManagerLocation,
                         
RetryingRegistrationConfiguration.defaultConfiguration());
         taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);
+        taskFileMergingManager = new TaskExecutorFileMergingManager();
         taskChangelogStoragesManager = 
mock(TaskExecutorStateChangelogStoragesManager.class);
         taskChannelStateExecutorFactoryManager =
                 new TaskExecutorChannelStateExecutorFactoryManager();
@@ -142,6 +145,12 @@ public class TaskManagerServicesBuilder {
         return this;
     }
 
+    /**
+     * Replaces the {@link TaskExecutorFileMergingManager} held by this builder.
+     *
+     * @param taskFileMergingManager the manager to store in this builder
+     * @return this builder, to allow call chaining
+     */
+    public TaskManagerServicesBuilder setTaskFileMergingManager(
+            TaskExecutorFileMergingManager taskFileMergingManager) {
+        this.taskFileMergingManager = taskFileMergingManager;
+        return this;
+    }
+
     public TaskManagerServicesBuilder setTaskChangelogStoragesManager(
             TaskExecutorStateChangelogStoragesManager 
taskChangelogStoragesManager) {
         this.taskChangelogStoragesManager = taskChangelogStoragesManager;
@@ -183,6 +192,7 @@ public class TaskManagerServicesBuilder {
                 jobTable,
                 jobLeaderService,
                 taskStateManager,
+                taskFileMergingManager,
                 taskChangelogStoragesManager,
                 taskChannelStateExecutorFactoryManager,
                 taskEventDispatcher,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index b5966bed100..7f0dd21d6b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -218,6 +218,7 @@ class JvmExitOnFatalErrorTest {
                                 jid,
                                 executionAttemptID,
                                 localStateStore,
+                                null,
                                 changelogStorage,
                                 new 
TaskExecutorStateChangelogStoragesManager(),
                                 null,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index cff785b28e3..39b460337dd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.checkpoint.SnapshotType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.AvailabilityProvider;
@@ -456,7 +457,10 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
 
             CheckpointStorageAccess checkpointStorageAccess =
                     
checkpointStorage.createCheckpointStorage(getEnvironment().getJobID());
-
+            checkpointStorageAccess =
+                    applyFileMergingCheckpoint(
+                            checkpointStorageAccess,
+                            
environment.getTaskStateManager().getFileMergingSnapshotManager());
             environment.setCheckpointStorageAccess(checkpointStorageAccess);
 
             // if the clock is not already set, then assign a default 
TimeServiceProvider
@@ -510,6 +514,29 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         }
     }
 
+    /**
+     * Converts the given checkpoint storage access into its file-merging variant when a {@link
+     * FileMergingSnapshotManager} is available.
+     *
+     * @param checkpointStorageAccess the original checkpoint storage access
+     * @param fileMergingSnapshotManager the manager handed to the file-merging storage; when
+     *     {@code null} the original access is returned unchanged
+     * @return the file-merging storage access, or {@code checkpointStorageAccess} itself when no
+     *     manager is given or the conversion fails with an {@link IOException}
+     */
+    private CheckpointStorageAccess applyFileMergingCheckpoint(
+            CheckpointStorageAccess checkpointStorageAccess,
+            FileMergingSnapshotManager fileMergingSnapshotManager) {
+        if (fileMergingSnapshotManager == null) {
+            return checkpointStorageAccess;
+        }
+
+        try {
+            CheckpointStorageWorkerView mergingCheckpointStorageAccess =
+                    checkpointStorageAccess.toFileMergingStorage(
+                            fileMergingSnapshotManager, environment);
+            // NOTE(review): assumes the returned worker view is also a CheckpointStorageAccess;
+            // otherwise this cast throws ClassCastException - confirm against implementations.
+            return (CheckpointStorageAccess) mergingCheckpointStorageAccess;
+        } catch (IOException e) {
+            // Best effort: a failed conversion must not fail the task, so log and keep using the
+            // original storage access. The trailing throwable is logged with its stack trace.
+            LOG.warn(
+                    "Initiating FsMergingCheckpointStorageAccess failed "
+                            + "with exception: {}, falling back to original checkpoint storage access {}.",
+                    e.getMessage(),
+                    checkpointStorageAccess.getClass(),
+                    e);
+            return checkpointStorageAccess;
+        }
+    }
+
     private TimerService createTimerService(String timerThreadName) {
         ThreadFactory timerThreadFactory =
                 new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, 
timerThreadName);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index f8e0b0ccfc2..0df5253d753 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -170,6 +170,7 @@ public class StateInitializationContextImplTest {
                         new JobID(),
                         createExecutionAttemptId(),
                         new TestTaskLocalStateStore(),
+                        null,
                         new InMemoryStateChangelogStorage(),
                         new TaskExecutorStateChangelogStoragesManager(),
                         jobManagerTaskRestore,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index a3f155b61d5..bb75f6f5dda 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -294,6 +294,7 @@ public class StreamTaskStateInitializerImplTest {
                         jobID,
                         executionAttemptID,
                         taskLocalStateStore,
+                        null,
                         changelogStorage,
                         new TaskExecutorStateChangelogStoragesManager(),
                         jobManagerTaskRestore,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index c7f13aad326..929cc3d2109 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -251,6 +251,7 @@ public class LocalStateForwardingTest extends TestLogger {
                         jobID,
                         executionAttemptID,
                         taskLocalStateStore,
+                        null,
                         stateChangelogStorage,
                         new TaskExecutorStateChangelogStoragesManager(),
                         null,
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index bf847b0c6b7..379afccc458 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -821,6 +821,7 @@ public class StreamTaskTest {
                         new JobID(1L, 2L),
                         createExecutionAttemptId(),
                         mock(TaskLocalStateStoreImpl.class),
+                        null,
                         new InMemoryStateChangelogStorage(),
                         new TaskExecutorStateChangelogStoragesManager(),
                         null,
@@ -1019,6 +1020,7 @@ public class StreamTaskTest {
                         new JobID(1L, 2L),
                         createExecutionAttemptId(),
                         mock(TaskLocalStateStoreImpl.class),
+                        null,
                         new InMemoryStateChangelogStorage(),
                         new TaskExecutorStateChangelogStoragesManager(),
                         null,

Reply via email to