This is an automated email from the ASF dual-hosted git repository.
leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1112582fd13 [FLINK-32072][checkpoint] Create and wire
FileMergingSnapshotManager with TaskManagerServices (#22890)
1112582fd13 is described below
commit 1112582fd136df47c6d356d6f6ad3946ad1e56d5
Author: Yanfei Lei <[email protected]>
AuthorDate: Tue Sep 19 17:31:04 2023 +0800
[FLINK-32072][checkpoint] Create and wire FileMergingSnapshotManager with
TaskManagerServices (#22890)
---
.../api/runtime/SavepointTaskStateManager.java | 7 +
.../filemerging/FileMergingSnapshotManager.java | 6 +-
.../FileMergingSnapshotManagerBuilder.java | 2 -
.../runtime/state/CheckpointStorageWorkerView.java | 13 ++
.../state/TaskExecutorFileMergingManager.java | 143 +++++++++++++++++++++
.../flink/runtime/state/TaskStateManager.java | 3 +
.../flink/runtime/state/TaskStateManagerImpl.java | 14 ++
.../filesystem/FsCheckpointStorageAccess.java | 24 +++-
.../FsMergingCheckpointStorageAccess.java | 73 +++++++++++
.../flink/runtime/taskexecutor/TaskExecutor.java | 14 ++
.../runtime/taskexecutor/TaskManagerServices.java | 13 ++
.../state/TaskExecutorFileMergingManagerTest.java | 87 +++++++++++++
.../runtime/state/TaskStateManagerImplTest.java | 4 +
.../flink/runtime/state/TestTaskStateManager.java | 7 +
.../taskexecutor/TaskManagerServicesBuilder.java | 10 ++
.../runtime/util/JvmExitOnFatalErrorTest.java | 1 +
.../flink/streaming/runtime/tasks/StreamTask.java | 29 ++++-
.../StateInitializationContextImplTest.java | 1 +
.../StreamTaskStateInitializerImplTest.java | 1 +
.../runtime/tasks/LocalStateForwardingTest.java | 1 +
.../streaming/runtime/tasks/StreamTaskTest.java | 2 +
21 files changed, 446 insertions(+), 9 deletions(-)
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
index cb6193e7724..a1d649d4d8c 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java
@@ -25,6 +25,7 @@ import
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskStateManager;
@@ -114,6 +115,12 @@ final class SavepointTaskStateManager implements
TaskStateManager {
return null;
}
+ @Nullable
+ @Override
+ public FileMergingSnapshotManager getFileMergingSnapshotManager() {
+ return null;
+ }
+
@Override
public void notifyCheckpointComplete(long checkpointId) {
throw new UnsupportedOperationException(MSG);
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index 4954d8548f3..968e86f91c5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.checkpoint.filemerging;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -31,7 +32,7 @@ import java.io.Closeable;
* FileMergingSnapshotManager provides an interface to manage files and meta
information for
* checkpoint files with merging checkpoint files enabled. It manages the
files for ONE single task
* in TM, including all subtasks of this single task that running in this TM.
There is one
- * FileMergingSnapshotManager for each task per task manager.
+ * FileMergingSnapshotManager for each job per task manager.
*
* <p>TODO (FLINK-32073): create output stream.
*
@@ -125,7 +126,8 @@ public interface FileMergingSnapshotManager extends
Closeable {
taskInfo.getNumberOfParallelSubtasks());
}
- SubtaskKey(String operatorIDString, int subtaskIndex, int parallelism)
{
+ @VisibleForTesting
+ public SubtaskKey(String operatorIDString, int subtaskIndex, int
parallelism) {
this.operatorIDString = operatorIDString;
this.subtaskIndex = subtaskIndex;
this.parallelism = parallelism;
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
index f07669b094b..faa9023abca 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
@@ -50,8 +50,6 @@ public class FileMergingSnapshotManagerBuilder {
/**
* Create file-merging snapshot manager based on configuration.
*
- * <p>TODO (FLINK-32072): Create manager during the initialization of task
manager services.
- *
* <p>TODO (FLINK-32074): Support another type of
FileMergingSnapshotManager that merges files
* across different checkpoints.
*
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
index 3e7e659e0e9..8817d71fce5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageWorkerView.java
@@ -19,6 +19,8 @@
package org.apache.flink.runtime.state;
import org.apache.flink.annotation.Internal;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import org.apache.flink.runtime.execution.Environment;
import java.io.IOException;
@@ -75,4 +77,15 @@ public interface CheckpointStorageWorkerView {
* @return A toolset for additional operations for state owned by tasks.
*/
CheckpointStateToolset createTaskOwnedCheckpointStateToolset();
+
+ /**
+ * Return {@link
org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageAccess} if
+ * file merging is enabled. Otherwise, return itself. File merging is
supported by subclasses of
+ * {@link
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess}.
+ */
+ default CheckpointStorageWorkerView toFileMergingStorage(
+ FileMergingSnapshotManager mergingSnapshotManager, Environment
environment)
+ throws IOException {
+ return this;
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
new file mode 100644
index 00000000000..42d8a4a5469
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManager.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
+import org.apache.flink.util.ShutdownHookUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * There is one {@link FileMergingSnapshotManager} for each job per task
manager. This class holds
+ * all {@link FileMergingSnapshotManager} objects for a task executor
(manager).
+ */
+public class TaskExecutorFileMergingManager {
+ /** Logger for this class. */
+ private static final Logger LOG =
LoggerFactory.getLogger(TaskExecutorFileMergingManager.class);
+
+ /**
+ * This map holds the FileMergingSnapshotManager of each job that has registered one on this
+ * task manager (executor).
+ */
+ @GuardedBy("lock")
+ private final Map<JobID, FileMergingSnapshotManager>
fileMergingSnapshotManagerByJobId;
+
+ @GuardedBy("lock")
+ private boolean closed;
+
+ private final Object lock = new Object();
+
+ /** Shutdown hook for this manager. */
+ private final Thread shutdownHook;
+
+ public TaskExecutorFileMergingManager() {
+ this.fileMergingSnapshotManagerByJobId = new HashMap<>();
+ this.closed = false;
+ this.shutdownHook =
+ ShutdownHookUtil.addShutdownHook(this::shutdown,
getClass().getSimpleName(), LOG);
+ }
+
+ /**
+ * Initialize file merging snapshot manager for each job according to
configurations when {@link
+ * org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask} is called.
+ */
+ public @Nullable FileMergingSnapshotManager
fileMergingSnapshotManagerForJob(
+ @Nonnull JobID jobId) {
+ synchronized (lock) {
+ if (closed) {
+ throw new IllegalStateException(
+ "TaskExecutorFileMergingManager is already closed and
cannot "
+ + "register a new
FileMergingSnapshotManager.");
+ }
+ FileMergingSnapshotManager fileMergingSnapshotManager =
+ fileMergingSnapshotManagerByJobId.get(jobId);
+ if (fileMergingSnapshotManager == null) {
+ // TODO FLINK-32440: choose different
FileMergingSnapshotManager by configuration
+ fileMergingSnapshotManager =
+ new
FileMergingSnapshotManagerBuilder(jobId.toString()).build();
+ fileMergingSnapshotManagerByJobId.put(jobId,
fileMergingSnapshotManager);
+ LOG.info("Registered new file merging snapshot manager for job
{}.", jobId);
+ }
+ return fileMergingSnapshotManager;
+ }
+ }
+
+ /**
+ * Release file merging snapshot manager of one job when {@link
+ * org.apache.flink.runtime.taskexecutor.TaskExecutor#releaseJobResources}
is called.
+ */
+ public void releaseMergingSnapshotManagerForJob(@Nonnull JobID jobId) {
+ LOG.debug("Releasing file merging snapshot manager under job id {}.",
jobId);
+ FileMergingSnapshotManager toRelease = null;
+ synchronized (lock) {
+ if (closed) {
+ return;
+ }
+ toRelease = fileMergingSnapshotManagerByJobId.remove(jobId);
+ }
+
+ if (toRelease != null) {
+ try {
+ toRelease.close();
+ } catch (Exception e) {
+ LOG.warn(
+ "Exception while closing
TaskExecutorFileMergingManager for job {}.",
+ jobId,
+ e);
+ }
+ }
+ }
+
+ /**
+ * Closes every registered {@link FileMergingSnapshotManager}, marks this manager as closed
+ * and removes its shutdown hook. Calls made after the first one return immediately.
+ */
+ public void shutdown() {
+ final Map<JobID, FileMergingSnapshotManager> toRelease;
+ synchronized (lock) {
+ if (closed) {
+ return;
+ }
+ closed = true;
+ // Take the snapshot while holding the lock: the map is a plain HashMap guarded by
+ // "lock", so copying it outside could race with a concurrent register/release and
+ // miss a manager that is then cleared without ever being closed.
+ toRelease = new HashMap<>(fileMergingSnapshotManagerByJobId);
+ fileMergingSnapshotManagerByJobId.clear();
+ }
+
+ LOG.info("Shutting down TaskExecutorFileMergingManager.");
+
+ ShutdownHookUtil.removeShutdownHook(shutdownHook,
getClass().getSimpleName(), LOG);
+
+ // Close outside the lock so a slow close() does not block other callers.
+ for (Map.Entry<JobID, FileMergingSnapshotManager> entry :
toRelease.entrySet()) {
+ if (entry.getValue() != null) {
+ try {
+ entry.getValue().close();
+ } catch (Exception e) {
+ LOG.warn(
+ "Exception while closing
TaskExecutorFileMergingManager for job {}.",
+ entry.getKey(),
+ e);
+ }
+ }
+ }
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
index 7c979b6b0d8..44f0cdc1e5e 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
@@ -26,6 +26,7 @@ import
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -111,4 +112,6 @@ public interface TaskStateManager extends
CheckpointListener, AutoCloseable {
@Nullable
StateChangelogStorageView<?> getStateChangelogStorageView(
Configuration configuration, ChangelogStateHandle
changelogStateHandle);
+
+ /**
+ * Returns the {@link FileMergingSnapshotManager} held by this task state manager, or {@code
+ * null} if it has none.
+ */
+ @Nullable
+ FileMergingSnapshotManager getFileMergingSnapshotManager();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
index a82b55b155b..a2ce6c8f7a3 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java
@@ -29,6 +29,7 @@ import
org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
@@ -80,6 +81,9 @@ public class TaskStateManagerImpl implements TaskStateManager
{
/** The changelog storage where the manager reads and writes the changelog
*/
@Nullable private final StateChangelogStorage<?> stateChangelogStorage;
+ /** The file merging snapshot manager of this task; may be null. */
+ @Nullable private final FileMergingSnapshotManager
fileMergingSnapshotManager;
+
private final TaskExecutorStateChangelogStoragesManager
changelogStoragesManager;
/** The checkpoint responder through which this manager can report to the
job manager. */
@@ -91,6 +95,7 @@ public class TaskStateManagerImpl implements TaskStateManager
{
@Nonnull JobID jobId,
@Nonnull ExecutionAttemptID executionAttemptID,
@Nonnull TaskLocalStateStore localStateStore,
+ @Nullable FileMergingSnapshotManager fileMergingSnapshotManager,
@Nullable StateChangelogStorage<?> stateChangelogStorage,
@Nonnull TaskExecutorStateChangelogStoragesManager
changelogStoragesManager,
@Nullable JobManagerTaskRestore jobManagerTaskRestore,
@@ -99,6 +104,7 @@ public class TaskStateManagerImpl implements
TaskStateManager {
jobId,
executionAttemptID,
localStateStore,
+ fileMergingSnapshotManager,
stateChangelogStorage,
changelogStoragesManager,
jobManagerTaskRestore,
@@ -113,6 +119,7 @@ public class TaskStateManagerImpl implements
TaskStateManager {
@Nonnull JobID jobId,
@Nonnull ExecutionAttemptID executionAttemptID,
@Nonnull TaskLocalStateStore localStateStore,
+ @Nullable FileMergingSnapshotManager fileMergingSnapshotManager,
@Nullable StateChangelogStorage<?> stateChangelogStorage,
@Nonnull TaskExecutorStateChangelogStoragesManager
changelogStoragesManager,
@Nullable JobManagerTaskRestore jobManagerTaskRestore,
@@ -120,6 +127,7 @@ public class TaskStateManagerImpl implements
TaskStateManager {
@Nonnull SequentialChannelStateReaderImpl
sequentialChannelStateReader) {
this.jobId = jobId;
this.localStateStore = localStateStore;
+ this.fileMergingSnapshotManager = fileMergingSnapshotManager;
this.stateChangelogStorage = stateChangelogStorage;
this.changelogStoragesManager = changelogStoragesManager;
this.jobManagerTaskRestore = jobManagerTaskRestore;
@@ -270,6 +278,12 @@ public class TaskStateManagerImpl implements
TaskStateManager {
return storageView;
}
+ @Nullable
+ @Override
+ public FileMergingSnapshotManager getFileMergingSnapshotManager() {
+ return fileMergingSnapshotManager;
+ }
+
/** Tracking when local state can be confirmed and disposed. */
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
index 11807c3de85..92ba08e3dc5 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.DuplicatingFileSystem;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStateToolset;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
@@ -41,13 +43,13 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
/** An implementation of durable checkpoint storage to file systems. */
public class FsCheckpointStorageAccess extends
AbstractFsCheckpointStorageAccess {
- private final FileSystem fileSystem;
+ protected final FileSystem fileSystem;
- private final Path checkpointsDirectory;
+ protected final Path checkpointsDirectory;
- private final Path sharedStateDirectory;
+ protected final Path sharedStateDirectory;
- private final Path taskOwnedStateDirectory;
+ protected final Path taskOwnedStateDirectory;
private final int fileSizeThreshold;
@@ -201,4 +203,18 @@ public class FsCheckpointStorageAccess extends
AbstractFsCheckpointStorageAccess
return new FsCheckpointStorageLocation(
fs, location, location, location, reference,
fileSizeThreshold, writeBufferSize);
}
+
+ /**
+ * Creates a {@link FsMergingCheckpointStorageAccess} that reuses this storage's file system,
+ * default savepoint directory, file size threshold and write buffer size. The parent of
+ * {@code checkpointsDirectory} is passed as the checkpoint base directory, and the job id is
+ * taken from the given environment.
+ */
+ @Override
+ public FsMergingCheckpointStorageAccess toFileMergingStorage(
+ FileMergingSnapshotManager mergingSnapshotManager, Environment
environment)
+ throws IOException {
+ return new FsMergingCheckpointStorageAccess(
+ fileSystem,
+ checkpointsDirectory.getParent(),
+ getDefaultSavepointDirectory(),
+ environment.getJobID(),
+ fileSizeThreshold,
+ writeBufferSize,
+ mergingSnapshotManager,
+ environment);
+ }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
new file mode 100644
index 00000000000..a471243ab7a
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** An implementation of file merging checkpoint storage to file systems. */
+public class FsMergingCheckpointStorageAccess extends
FsCheckpointStorageAccess {
+
+ /** FileMergingSnapshotManager manages files and meta information for
checkpoints. */
+ private final FileMergingSnapshotManager fileMergingSnapshotManager;
+
+ /** The identity of subtask. */
+ private final FileMergingSnapshotManager.SubtaskKey subtaskKey;
+
+ public FsMergingCheckpointStorageAccess(
+ FileSystem fs,
+ Path checkpointBaseDirectory,
+ @Nullable Path defaultSavepointDirectory,
+ JobID jobId,
+ int fileSizeThreshold,
+ int writeBufferSize,
+ FileMergingSnapshotManager fileMergingSnapshotManager,
+ Environment environment)
+ throws IOException {
+ super(
+ fs,
+ checkpointBaseDirectory,
+ defaultSavepointDirectory,
+ jobId,
+ fileSizeThreshold,
+ writeBufferSize);
+ this.fileMergingSnapshotManager = fileMergingSnapshotManager;
+ this.subtaskKey =
+ new SubtaskKey(
+
OperatorID.fromJobVertexID(environment.getJobVertexId()),
+ environment.getTaskInfo());
+ }
+
+ @Override
+ public void initializeBaseLocationsForCheckpoint() throws IOException {
+ super.initializeBaseLocationsForCheckpoint();
+ fileMergingSnapshotManager.initFileSystem(
+ fileSystem, checkpointsDirectory, sharedStateDirectory,
taskOwnedStateDirectory);
+ fileMergingSnapshotManager.registerSubtaskForSharedStates(subtaskKey);
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index fc37a7f2f26..d42a5658598 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -32,6 +32,7 @@ import
org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
@@ -98,6 +99,7 @@ import
org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import
org.apache.flink.runtime.state.TaskExecutorChannelStateExecutorFactoryManager;
+import org.apache.flink.runtime.state.TaskExecutorFileMergingManager;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
@@ -217,6 +219,12 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
/** The state manager for this task, providing state managers per slot. */
private final TaskExecutorLocalStateStoresManager localStateStoresManager;
+ /**
+ * The file merging manager for this task executor, providing a file merging snapshot
manager per job, see
+ * {@link FileMergingSnapshotManager} for details.
+ */
+ private final TaskExecutorFileMergingManager fileMergingManager;
+
/** The changelog manager for this task, providing changelog storage per
job. */
private final TaskExecutorStateChangelogStoragesManager
changelogStoragesManager;
@@ -329,6 +337,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
this.unresolvedTaskManagerLocation =
taskExecutorServices.getUnresolvedTaskManagerLocation();
this.localStateStoresManager =
taskExecutorServices.getTaskManagerStateStore();
+ this.fileMergingManager =
taskExecutorServices.getTaskManagerFileMergingManager();
this.changelogStoragesManager =
taskExecutorServices.getTaskManagerChangelogManager();
this.channelStateExecutorFactoryManager =
taskExecutorServices.getTaskManagerChannelStateManager();
@@ -730,6 +739,9 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
taskManagerConfiguration.getConfiguration(),
jobInformation.getJobConfiguration());
+ final FileMergingSnapshotManager fileMergingSnapshotManager =
+ fileMergingManager.fileMergingSnapshotManagerForJob(jobId);
+
// TODO: Pass config value from user program and do overriding
here.
final StateChangelogStorage<?> changelogStorage;
try {
@@ -750,6 +762,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
jobId,
tdd.getExecutionAttemptId(),
localStateStore,
+ fileMergingSnapshotManager,
changelogStorage,
changelogStoragesManager,
taskRestore,
@@ -1891,6 +1904,7 @@ public class TaskExecutor extends RpcEndpoint implements
TaskExecutorGateway {
currentSlotOfferPerJob.remove(jobId);
channelStateExecutorFactoryManager.releaseResourcesForJob(jobId);
shuffleDescriptorsCache.clearCacheForJob(jobId);
+ fileMergingManager.releaseMergingSnapshotManagerForJob(jobId);
}
private void scheduleResultPartitionCleanup(JobID jobId) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index ff14b12da15..da8a0636189 100755
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.shuffle.ShuffleServiceLoader;
import
org.apache.flink.runtime.state.TaskExecutorChannelStateExecutorFactoryManager;
+import org.apache.flink.runtime.state.TaskExecutorFileMergingManager;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
@@ -80,6 +81,7 @@ public class TaskManagerServices {
private final JobTable jobTable;
private final JobLeaderService jobLeaderService;
private final TaskExecutorLocalStateStoresManager taskManagerStateStore;
+ private final TaskExecutorFileMergingManager taskManagerFileMergingManager;
private final TaskExecutorStateChangelogStoragesManager
taskManagerChangelogManager;
private final TaskExecutorChannelStateExecutorFactoryManager
taskManagerChannelStateManager;
private final TaskEventDispatcher taskEventDispatcher;
@@ -100,6 +102,7 @@ public class TaskManagerServices {
JobTable jobTable,
JobLeaderService jobLeaderService,
TaskExecutorLocalStateStoresManager taskManagerStateStore,
+ TaskExecutorFileMergingManager taskManagerFileMergingManager,
TaskExecutorStateChangelogStoragesManager
taskManagerChangelogManager,
TaskExecutorChannelStateExecutorFactoryManager
taskManagerChannelStateManager,
TaskEventDispatcher taskEventDispatcher,
@@ -120,6 +123,8 @@ public class TaskManagerServices {
this.jobTable = Preconditions.checkNotNull(jobTable);
this.jobLeaderService = Preconditions.checkNotNull(jobLeaderService);
this.taskManagerStateStore =
Preconditions.checkNotNull(taskManagerStateStore);
+ this.taskManagerFileMergingManager =
+ Preconditions.checkNotNull(taskManagerFileMergingManager);
this.taskManagerChangelogManager =
Preconditions.checkNotNull(taskManagerChangelogManager);
this.taskManagerChannelStateManager = taskManagerChannelStateManager;
this.taskEventDispatcher =
Preconditions.checkNotNull(taskEventDispatcher);
@@ -174,6 +179,10 @@ public class TaskManagerServices {
return taskManagerStateStore;
}
+ public TaskExecutorFileMergingManager getTaskManagerFileMergingManager() {
+ return taskManagerFileMergingManager;
+ }
+
public TaskExecutorStateChangelogStoragesManager
getTaskManagerChangelogManager() {
return taskManagerChangelogManager;
}
@@ -363,6 +372,9 @@ public class TaskManagerServices {
final TaskExecutorChannelStateExecutorFactoryManager
channelStateExecutorFactoryManager =
new TaskExecutorChannelStateExecutorFactoryManager();
+ final TaskExecutorFileMergingManager fileMergingManager =
+ new TaskExecutorFileMergingManager();
+
final boolean failOnJvmMetaspaceOomError =
taskManagerServicesConfiguration
.getConfiguration()
@@ -407,6 +419,7 @@ public class TaskManagerServices {
jobTable,
jobLeaderService,
taskStateManager,
+ fileMergingManager,
changelogStoragesManager,
channelStateExecutorFactoryManager,
taskEventDispatcher,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java
new file mode 100644
index 00000000000..944fbe6e072
--- /dev/null
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TaskExecutorFileMergingManager}. */
+public class TaskExecutorFileMergingManagerTest {
+
+    /**
+     * Checks how snapshot managers handed out per job relate to each other, and how the
+     * directories they manage differ between jobs and between subtasks.
+     */
+    @Test
+    public void testCheckpointScope(@TempDir java.nio.file.Path testBaseDir) throws IOException {
+        final TaskExecutorFileMergingManager fileMergingManager =
+                new TaskExecutorFileMergingManager();
+
+        final JobID firstJob = new JobID(1234L, 4321L);
+        final JobID secondJob = new JobID(1234L, 5678L);
+        final SubtaskKey subtaskA = new SubtaskKey("test-op1", 0, 128);
+        final SubtaskKey subtaskB = new SubtaskKey("test-op2", 1, 128);
+
+        final String baseDir = testBaseDir.toString();
+        final Path firstJobDir = new Path(baseDir, "job1");
+        final Path secondJobDir = new Path(baseDir, "job2");
+
+        // Ask twice for the first job and once for the second job, initializing after each request.
+        final FileMergingSnapshotManager firstJobManager =
+                obtainInitializedManager(fileMergingManager, firstJob, firstJobDir);
+        final FileMergingSnapshotManager firstJobManagerAgain =
+                obtainInitializedManager(fileMergingManager, firstJob, firstJobDir);
+        final FileMergingSnapshotManager secondJobManager =
+                obtainInitializedManager(fileMergingManager, secondJob, secondJobDir);
+
+        assertThat(firstJobManager).isEqualTo(firstJobManagerAgain);
+        assertThat(firstJobManager).isNotEqualTo(secondJobManager);
+
+        final CheckpointedStateScope exclusive = CheckpointedStateScope.EXCLUSIVE;
+        final CheckpointedStateScope shared = CheckpointedStateScope.SHARED;
+
+        // Within one job, every subtask resolves to the same private directory.
+        assertThat(firstJobManager.getManagedDir(subtaskA, exclusive))
+                .isEqualTo(firstJobManagerAgain.getManagedDir(subtaskB, exclusive));
+        // Across jobs, the private directories differ.
+        assertThat(firstJobManager.getManagedDir(subtaskA, exclusive))
+                .isNotEqualTo(secondJobManager.getManagedDir(subtaskB, exclusive));
+
+        firstJobManager.registerSubtaskForSharedStates(subtaskA);
+        firstJobManager.registerSubtaskForSharedStates(subtaskB);
+        secondJobManager.registerSubtaskForSharedStates(subtaskA);
+        secondJobManager.registerSubtaskForSharedStates(subtaskB);
+
+        // Within one job, distinct subtasks get distinct shared directories.
+        assertThat(firstJobManager.getManagedDir(subtaskA, shared))
+                .isNotEqualTo(firstJobManager.getManagedDir(subtaskB, shared));
+        // The same SubtaskKey in different jobs also gets distinct shared directories.
+        assertThat(firstJobManager.getManagedDir(subtaskA, shared))
+                .isNotEqualTo(secondJobManager.getManagedDir(subtaskA, shared));
+    }
+
+    /**
+     * Requests the snapshot manager for {@code jobId} and initializes its file system layout
+     * below {@code checkpointDir}, using the "shared" and "taskowned" sub-directories.
+     */
+    private static FileMergingSnapshotManager obtainInitializedManager(
+            TaskExecutorFileMergingManager fileMergingManager, JobID jobId, Path checkpointDir)
+            throws IOException {
+        final FileMergingSnapshotManager snapshotManager =
+                fileMergingManager.fileMergingSnapshotManagerForJob(jobId);
+        snapshotManager.initFileSystem(
+                checkpointDir.getFileSystem(),
+                checkpointDir,
+                new Path(checkpointDir, "shared"),
+                new Path(checkpointDir, "taskowned"));
+        return snapshotManager;
+    }
+}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index e51ac6c3824..76f78e26fda 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -271,6 +271,7 @@ class TaskStateManagerImplTest {
createExecutionAttemptId(),
new TestTaskLocalStateStore(),
null,
+ null,
new TaskExecutorStateChangelogStoragesManager(),
jobManagerTaskRestore,
new TestCheckpointResponder());
@@ -284,6 +285,7 @@ class TaskStateManagerImplTest {
createExecutionAttemptId(),
new TestTaskLocalStateStore(),
null,
+ null,
new TaskExecutorStateChangelogStoragesManager(),
null,
new TestCheckpointResponder());
@@ -295,6 +297,7 @@ class TaskStateManagerImplTest {
createExecutionAttemptId(),
new TestTaskLocalStateStore(),
null,
+ null,
new TaskExecutorStateChangelogStoragesManager(),
new JobManagerTaskRestore(2, new TaskStateSnapshot()),
new TestCheckpointResponder());
@@ -313,6 +316,7 @@ class TaskStateManagerImplTest {
jobID,
executionAttemptID,
localStateStore,
+ null,
stateChangelogStorage,
new TaskExecutorStateChangelogStoragesManager(),
jobManagerTaskRestore,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
index 6e25d4336e0..2cb492277c8 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
@@ -28,6 +28,7 @@ import
org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
@@ -244,6 +245,12 @@ public class TestTaskStateManager implements
TaskStateManager {
return storageView;
}
+    /** This test implementation provides no file merging snapshot manager; it returns null. */
+    @Override
+    @Nullable
+    public FileMergingSnapshotManager getFileMergingSnapshotManager() {
+        return null;
+    }
+
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
this.notifiedCompletedCheckpointId = checkpointId;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 7e2a53fd63f..2bd18f543a1 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import
org.apache.flink.runtime.state.TaskExecutorChannelStateExecutorFactoryManager;
+import org.apache.flink.runtime.state.TaskExecutorFileMergingManager;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import
org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import
org.apache.flink.runtime.taskexecutor.slot.NoOpSlotAllocationSnapshotPersistenceService;
@@ -58,6 +59,7 @@ public class TaskManagerServicesBuilder {
private JobTable jobTable;
private JobLeaderService jobLeaderService;
private TaskExecutorLocalStateStoresManager taskStateManager;
+ private TaskExecutorFileMergingManager taskFileMergingManager;
private TaskExecutorStateChangelogStoragesManager
taskChangelogStoragesManager;
private TaskExecutorChannelStateExecutorFactoryManager
taskChannelStateExecutorFactoryManager;
private TaskEventDispatcher taskEventDispatcher;
@@ -83,6 +85,7 @@ public class TaskManagerServicesBuilder {
unresolvedTaskManagerLocation,
RetryingRegistrationConfiguration.defaultConfiguration());
taskStateManager = mock(TaskExecutorLocalStateStoresManager.class);
+ taskFileMergingManager = new TaskExecutorFileMergingManager();
taskChangelogStoragesManager =
mock(TaskExecutorStateChangelogStoragesManager.class);
taskChannelStateExecutorFactoryManager =
new TaskExecutorChannelStateExecutorFactoryManager();
@@ -142,6 +145,12 @@ public class TaskManagerServicesBuilder {
return this;
}
+    /**
+     * Replaces the {@link TaskExecutorFileMergingManager} stored in this builder.
+     *
+     * @param taskFileMergingManager the manager to store
+     * @return this builder, so that calls can be chained
+     */
+    public TaskManagerServicesBuilder setTaskFileMergingManager(
+            TaskExecutorFileMergingManager taskFileMergingManager) {
+        this.taskFileMergingManager = taskFileMergingManager;
+        return this;
+    }
+
public TaskManagerServicesBuilder setTaskChangelogStoragesManager(
TaskExecutorStateChangelogStoragesManager
taskChangelogStoragesManager) {
this.taskChangelogStoragesManager = taskChangelogStoragesManager;
@@ -183,6 +192,7 @@ public class TaskManagerServicesBuilder {
jobTable,
jobLeaderService,
taskStateManager,
+ taskFileMergingManager,
taskChangelogStoragesManager,
taskChannelStateExecutorFactoryManager,
taskEventDispatcher,
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index b5966bed100..7f0dd21d6b5 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -218,6 +218,7 @@ class JvmExitOnFatalErrorTest {
jid,
executionAttemptID,
localStateStore,
+ null,
changelogStorage,
new
TaskExecutorStateChangelogStoragesManager(),
null,
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index cff785b28e3..39b460337dd 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
+import
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.AvailabilityProvider;
@@ -456,7 +457,10 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
CheckpointStorageAccess checkpointStorageAccess =
checkpointStorage.createCheckpointStorage(getEnvironment().getJobID());
-
+ checkpointStorageAccess =
+ applyFileMergingCheckpoint(
+ checkpointStorageAccess,
+
environment.getTaskStateManager().getFileMergingSnapshotManager());
environment.setCheckpointStorageAccess(checkpointStorageAccess);
// if the clock is not already set, then assign a default
TimeServiceProvider
@@ -510,6 +514,29 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
}
}
+    /**
+     * Switches the given checkpoint storage access to its file-merging variant when a {@link
+     * FileMergingSnapshotManager} is available.
+     *
+     * <p>This is best effort: if the conversion fails with an {@link IOException}, a warning is
+     * logged and the original storage access is used instead.
+     *
+     * @param checkpointStorageAccess the storage access created by the checkpoint storage
+     * @param fileMergingSnapshotManager the manager to merge files with; may be {@code null}, in
+     *     which case the original storage access is returned untouched
+     * @return the file-merging storage access, or {@code checkpointStorageAccess} itself when no
+     *     manager is given or the conversion failed
+     */
+    private CheckpointStorageAccess applyFileMergingCheckpoint(
+            CheckpointStorageAccess checkpointStorageAccess,
+            FileMergingSnapshotManager fileMergingSnapshotManager) {
+        if (fileMergingSnapshotManager == null) {
+            return checkpointStorageAccess;
+        }
+
+        try {
+            CheckpointStorageWorkerView mergingCheckpointStorageAccess =
+                    checkpointStorageAccess.toFileMergingStorage(
+                            fileMergingSnapshotManager, environment);
+            return (CheckpointStorageAccess) mergingCheckpointStorageAccess;
+        } catch (IOException e) {
+            // The trailing space after "failed" matters: the two literals are concatenated,
+            // and without it the log line reads "failedwith exception".
+            LOG.warn(
+                    "Initiating FsMergingCheckpointStorageAccess failed "
+                            + "with exception: {}, falling back to original "
+                            + "checkpoint storage access {}.",
+                    e.getMessage(),
+                    checkpointStorageAccess.getClass(),
+                    e);
+            return checkpointStorageAccess;
+        }
+    }
+
private TimerService createTimerService(String timerThreadName) {
ThreadFactory timerThreadFactory =
new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
timerThreadName);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
index f8e0b0ccfc2..0df5253d753 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java
@@ -170,6 +170,7 @@ public class StateInitializationContextImplTest {
new JobID(),
createExecutionAttemptId(),
new TestTaskLocalStateStore(),
+ null,
new InMemoryStateChangelogStorage(),
new TaskExecutorStateChangelogStoragesManager(),
jobManagerTaskRestore,
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index a3f155b61d5..bb75f6f5dda 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -294,6 +294,7 @@ public class StreamTaskStateInitializerImplTest {
jobID,
executionAttemptID,
taskLocalStateStore,
+ null,
changelogStorage,
new TaskExecutorStateChangelogStoragesManager(),
jobManagerTaskRestore,
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index c7f13aad326..929cc3d2109 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -251,6 +251,7 @@ public class LocalStateForwardingTest extends TestLogger {
jobID,
executionAttemptID,
taskLocalStateStore,
+ null,
stateChangelogStorage,
new TaskExecutorStateChangelogStoragesManager(),
null,
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index bf847b0c6b7..379afccc458 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -821,6 +821,7 @@ public class StreamTaskTest {
new JobID(1L, 2L),
createExecutionAttemptId(),
mock(TaskLocalStateStoreImpl.class),
+ null,
new InMemoryStateChangelogStorage(),
new TaskExecutorStateChangelogStoragesManager(),
null,
@@ -1019,6 +1020,7 @@ public class StreamTaskTest {
new JobID(1L, 2L),
createExecutionAttemptId(),
mock(TaskLocalStateStoreImpl.class),
+ null,
new InMemoryStateChangelogStorage(),
new TaskExecutorStateChangelogStoragesManager(),
null,