This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1f9632a07199854c0225bd7f416c038fbf59abe0
Author: fredia <[email protected]>
AuthorDate: Tue May 31 15:22:59 2022 +0800

    [FLINK-27693][changelog] Support local recovery for non-materialized part
---
 .../fs/AbstractStateChangeFsUploader.java          | 120 ++++++++++++++
 .../fs/DuplicatingOutputStreamWithPos.java         | 174 +++++++++++++++++++++
 .../fs/DuplicatingStateChangeFsUploader.java       | 111 +++++++++++++
 .../changelog/fs/FsStateChangelogStorage.java      |  50 ++++--
 .../fs/FsStateChangelogStorageFactory.java         |   8 +-
 .../flink/changelog/fs/FsStateChangelogWriter.java | 111 ++++++++++---
 .../flink/changelog/fs/OutputStreamWithPos.java    |  54 ++++++-
 .../flink/changelog/fs/StateChangeFormat.java      |  10 +-
 .../flink/changelog/fs/StateChangeFsUploader.java  | 104 ++----------
 .../changelog/fs/StateChangeUploadScheduler.java   |  34 ++--
 .../flink/changelog/fs/StateChangeUploader.java    |  33 +++-
 .../apache/flink/changelog/fs/UploadResult.java    |  41 ++++-
 .../fs/BatchingStateChangeUploadSchedulerTest.java |   6 +-
 .../changelog/fs/ChangelogStorageMetricsTest.java  |  45 ++++--
 .../changelog/fs/FsStateChangelogStorageTest.java  |   7 +-
 .../fs/FsStateChangelogWriterSqnTest.java          |   6 +-
 .../changelog/fs/FsStateChangelogWriterTest.java   |  25 ++-
 .../state/ChangelogTaskLocalStateStore.java        |  33 +++-
 .../TaskExecutorStateChangelogStoragesManager.java |   6 +-
 .../state/changelog/LocalChangelogRegistry.java    |  64 ++++++++
 .../changelog/LocalChangelogRegistryImpl.java      | 118 ++++++++++++++
 .../changelog/StateChangelogStorageFactory.java    |   6 +-
 .../changelog/StateChangelogStorageLoader.java     |   8 +-
 .../state/changelog/StateChangelogWriter.java      |  21 ++-
 .../InMemoryStateChangelogStorageFactory.java      |   6 +-
 .../inmemory/InMemoryStateChangelogWriter.java     |  15 +-
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   5 +-
 .../state/ChangelogTaskLocalStateStoreTest.java    |   2 +
 ...kExecutorStateChangelogStoragesManagerTest.java |  60 +++++--
 .../changelog/LocalChangelogRegistryTest.java      |  56 +++++++
 .../inmemory/StateChangelogStorageLoaderTest.java  |  16 +-
 .../inmemory/StateChangelogStorageTest.java        |   7 +-
 ...cutorExecutionDeploymentReconciliationTest.java |  14 +-
 .../taskexecutor/TaskExecutorSlotLifetimeTest.java |  14 ++
 .../changelog/ChangelogKeyedStateBackend.java      |  28 +++-
 .../state/changelog/ChangelogTruncateHelper.java   |   1 +
 .../changelog/ChangelogStateBackendTestUtils.java  |   4 +-
 .../state/changelog/ChangelogStateDiscardTest.java |  20 ++-
 .../state/changelog/StateChangeLoggerTestBase.java |   7 +-
 .../ChangelogLocalRecoveryITCase.java              |   6 +-
 40 files changed, 1219 insertions(+), 237 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
new file mode 100644
index 00000000000..3dd3dfcc70f
--- /dev/null
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/AbstractStateChangeFsUploader.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.clock.Clock;
+import org.apache.flink.util.clock.SystemClock;
+
+import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/** Base implementation of StateChangeUploader. */
+public abstract class AbstractStateChangeFsUploader implements 
StateChangeUploader {
+
+    private final StateChangeFormat format;
+    private final Clock clock;
+    private final TaskChangelogRegistry changelogRegistry;
+    private final BiFunction<Path, Long, StreamStateHandle> handleFactory;
+    protected final ChangelogStorageMetricGroup metrics;
+    protected final boolean compression;
+    protected final int bufferSize;
+
+    public AbstractStateChangeFsUploader(
+            boolean compression,
+            int bufferSize,
+            ChangelogStorageMetricGroup metrics,
+            TaskChangelogRegistry changelogRegistry,
+            BiFunction<Path, Long, StreamStateHandle> handleFactory) {
+        this.format = new StateChangeFormat();
+        this.compression = compression;
+        this.bufferSize = bufferSize;
+        this.metrics = metrics;
+        this.clock = SystemClock.getInstance();
+        this.changelogRegistry = changelogRegistry;
+        this.handleFactory = handleFactory;
+    }
+
+    abstract OutputStreamWithPos prepareStream() throws IOException;
+
+    @Override
+    public UploadTasksResult upload(Collection<UploadTask> tasks) throws 
IOException {
+        metrics.getUploadsCounter().inc();
+        long start = clock.relativeTimeNanos();
+        UploadTasksResult result = uploadInternal(tasks);
+        metrics.getUploadLatenciesNanos().update(clock.relativeTimeNanos() - 
start);
+        metrics.getUploadSizes().update(result.getStateSize());
+        return result;
+    }
+
+    private UploadTasksResult uploadInternal(Collection<UploadTask> tasks) 
throws IOException {
+        try (OutputStreamWithPos stream = prepareStream()) {
+            final Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> 
tasksOffsets =
+                    new HashMap<>();
+            for (UploadTask task : tasks) {
+                tasksOffsets.put(task, format.write(stream, task.changeSets));
+            }
+            StreamStateHandle handle = stream.getHandle(handleFactory);
+            changelogRegistry.startTracking(
+                    handle,
+                    tasks.stream()
+                            .flatMap(t -> t.getChangeSets().stream())
+                            .map(StateChangeSet::getLogId)
+                            .collect(Collectors.toSet()));
+            if (stream instanceof DuplicatingOutputStreamWithPos) {
+                StreamStateHandle localHandle =
+                        ((DuplicatingOutputStreamWithPos) 
stream).getSecondaryHandle(handleFactory);
+                changelogRegistry.startTracking(
+                        localHandle,
+                        tasks.stream()
+                                .flatMap(t -> t.getChangeSets().stream())
+                                .map(StateChangeSet::getLogId)
+                                .collect(Collectors.toSet()));
+                return new UploadTasksResult(tasksOffsets, handle, 
localHandle);
+            }
+            // WARN: streams have to be closed before returning the results
+            // otherwise JM may receive invalid handles
+            return new UploadTasksResult(tasksOffsets, handle);
+        } catch (IOException e) {
+            metrics.getUploadFailuresCounter().inc();
+            try (Closer closer = Closer.create()) {
+                closer.register(
+                        () -> {
+                            throw e;
+                        });
+                tasks.forEach(cs -> closer.register(() -> cs.fail(e)));
+            }
+        }
+        return null; // closer above throws an exception
+    }
+
+    protected String generateFileName() {
+        return UUID.randomUUID().toString();
+    }
+}
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java
new file mode 100644
index 00000000000..b977bd3de0d
--- /dev/null
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingOutputStreamWithPos.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.function.BiFunction;
+
+/**
+ * A DuplicatingOutputStreamWithPos is similar to {@link DuplicatingCheckpointOutputStream}: it
+ * wraps a primary and a secondary OutputStream and duplicates all writes into both streams. The
+ * difference is that this stream does not delete the primary file on {@link #close()}.
+ *
+ * <p>Only failures of the primary stream are propagated to the caller. The first failure of the
+ * secondary stream is logged and remembered; afterwards the secondary stream is closed and no
+ * longer written to, {@link #getSecondaryHandle} throws, and {@link #close()} deletes the
+ * incomplete secondary file.
+ */
+class DuplicatingOutputStreamWithPos extends OutputStreamWithPos {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DuplicatingOutputStreamWithPos.class);
+
+    /** The secondary stream; replaced by its wrapped form in {@link #wrap}. */
+    private OutputStream secondaryStream;
+
+    /** The secondary stream exactly as passed to the constructor. */
+    private final OutputStream originalSecondaryStream;
+
+    private final Path secondaryPath;
+
+    /**
+     * Stores a potential exception that occurred while interacting with {@link #secondaryStream}.
+     * Once set, the secondary stream is no longer used.
+     */
+    private Exception secondaryStreamException;
+
+    public DuplicatingOutputStreamWithPos(
+            OutputStream primaryStream,
+            Path primaryPath,
+            OutputStream secondaryStream,
+            Path secondaryPath) {
+        super(primaryStream, primaryPath);
+        this.secondaryStream = Preconditions.checkNotNull(secondaryStream);
+        this.originalSecondaryStream = Preconditions.checkNotNull(secondaryStream);
+        this.secondaryPath = Preconditions.checkNotNull(secondaryPath);
+    }
+
+    /**
+     * Wraps the primary stream (failures propagate) and then the secondary stream (failures only
+     * disable the secondary stream).
+     */
+    @Override
+    public void wrap(boolean compression, int bufferSize) throws IOException {
+        super.wrap(compression, bufferSize);
+        if (secondaryStreamException == null) {
+            try {
+                this.secondaryStream =
+                        wrapInternal(compression, bufferSize, this.originalSecondaryStream);
+            } catch (Exception ex) {
+                LOG.warn("Exception encountered during wrapping secondary stream", ex);
+                handleSecondaryStreamOnException(ex);
+            }
+        }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        outputStream.write(b);
+        if (secondaryStreamException == null) {
+            try {
+                secondaryStream.write(b);
+            } catch (Exception ex) {
+                LOG.warn("Exception encountered during write to secondary stream", ex);
+                handleSecondaryStreamOnException(ex);
+            }
+        }
+        pos++;
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        outputStream.write(b);
+        if (secondaryStreamException == null) {
+            try {
+                secondaryStream.write(b);
+            } catch (Exception ex) {
+                LOG.warn("Exception encountered during write to secondary stream", ex);
+                handleSecondaryStreamOnException(ex);
+            }
+        }
+        pos += b.length;
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        outputStream.write(b, off, len);
+        if (secondaryStreamException == null) {
+            try {
+                secondaryStream.write(b, off, len);
+            } catch (Exception ex) {
+                LOG.warn("Exception encountered during writing to secondary stream", ex);
+                handleSecondaryStreamOnException(ex);
+            }
+        }
+        pos += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        outputStream.flush();
+        if (secondaryStreamException == null) {
+            try {
+                secondaryStream.flush();
+            } catch (Exception ex) {
+                LOG.warn("Exception encountered during flushing secondary stream", ex);
+                handleSecondaryStreamOnException(ex);
+            }
+        }
+    }
+
+    /**
+     * Closes both streams. Only a failure of the primary stream is thrown; if the secondary stream
+     * failed at any point (including now), its incomplete file is deleted on a best-effort basis.
+     */
+    @Override
+    public void close() throws IOException {
+        Exception exCollector = null;
+
+        try {
+            super.close();
+        } catch (Exception closeEx) {
+            exCollector = ExceptionUtils.firstOrSuppressed(closeEx, exCollector);
+        }
+
+        if (secondaryStreamException == null) {
+            try {
+                secondaryStream.close();
+                originalSecondaryStream.close();
+            } catch (Exception closeEx) {
+                LOG.warn("Exception encountered during closing secondary stream", closeEx);
+                handleSecondaryStreamOnException(closeEx);
+            }
+        }
+
+        if (secondaryStreamException != null) {
+            // the secondary file is incomplete; getSecondaryHandle refuses to expose it from now on
+            deleteSecondaryFileQuietly();
+        }
+
+        if (exCollector != null) {
+            throw new IOException("Exception while closing duplicating stream.", exCollector);
+        }
+    }
+
+    /**
+     * Records the first failure of the secondary stream and closes it (both the possibly wrapped
+     * stream and the original one), so that later operations only touch the primary stream.
+     * Failures while closing are attached to {@code ex} as suppressed exceptions.
+     */
+    private void handleSecondaryStreamOnException(Exception ex) {
+        Preconditions.checkState(
+                secondaryStreamException == null,
+                "Secondary stream already failed from previous exception!");
+        try {
+            secondaryStream.close();
+        } catch (Exception closeEx) {
+            ex = ExceptionUtils.firstOrSuppressed(closeEx, ex);
+        }
+        if (originalSecondaryStream != secondaryStream) {
+            // closing the wrapper may have failed before it reached the original stream
+            try {
+                originalSecondaryStream.close();
+            } catch (Exception closeEx) {
+                ex = ExceptionUtils.firstOrSuppressed(closeEx, ex);
+            }
+        }
+
+        secondaryStreamException = Preconditions.checkNotNull(ex);
+    }
+
+    /**
+     * Deletes the secondary file. Failures are only logged, because the secondary copy is not
+     * required for the primary write to succeed.
+     */
+    private void deleteSecondaryFileQuietly() {
+        try {
+            secondaryPath.getFileSystem().delete(secondaryPath, true);
+        } catch (Exception deleteEx) {
+            LOG.warn("Failed to delete secondary file {}.", secondaryPath, deleteEx);
+        }
+    }
+
+    public Path getSecondaryPath() {
+        return secondaryPath;
+    }
+
+    /**
+     * Creates a handle for the secondary file, sized by the number of bytes written to this
+     * stream.
+     *
+     * @throws IOException if the secondary stream failed at any point
+     */
+    public StreamStateHandle getSecondaryHandle(
+            BiFunction<Path, Long, StreamStateHandle> handleFactory) throws IOException {
+        if (secondaryStreamException == null) {
+            return handleFactory.apply(secondaryPath, this.pos);
+        } else {
+            throw new IOException(
+                    "Secondary stream previously failed exceptionally", secondaryStreamException);
+        }
+    }
+}
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
new file mode 100644
index 00000000000..cf99ed17683
--- /dev/null
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.changelog.fs;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore;
+import org.apache.flink.runtime.state.LocalRecoveryDirectoryProvider;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.flink.changelog.fs.StateChangeFsUploader.PATH_SUB_DIR;
+import static 
org.apache.flink.runtime.state.ChangelogTaskLocalStateStore.getLocalTaskOwnedDirectory;
+
+/**
+ * A StateChangeFsUploader implementation that writes the changes to remote and local.
+ *
+ * <ul>
+ *   <li>Local dstl files are only managed by TM side, {@link
+ *       org.apache.flink.runtime.state.changelog.LocalChangelogRegistry}, {@link
+ *       TaskChangelogRegistry} and {@link ChangelogTaskLocalStateStore} are responsible for
+ *       managing them.
+ *   <li>Remote dstl files are managed by TM side and JM side, {@link TaskChangelogRegistry} is
+ *       responsible for TM side, and {@link SharedStateRegistry} is responsible for JM side.
+ * </ul>
+ *
+ * <p>The discard logic of local dstl files is:
+ *
+ * <ol>
+ *   <li>Register files to {@link TaskChangelogRegistry#startTracking} on {@link #upload}.
+ *   <li>Store the meta of files into {@link ChangelogTaskLocalStateStore} by
+ *       AsyncCheckpointRunnable#reportCompletedSnapshotStates().
+ *   <li>Pass control of the file to {@link
+ *       org.apache.flink.runtime.state.changelog.LocalChangelogRegistry#register} when
+ *       ChangelogKeyedStateBackend#notifyCheckpointComplete() is called; files of the previous
+ *       checkpoint will be deleted by {@link
+ *       org.apache.flink.runtime.state.changelog.LocalChangelogRegistry#discardUpToCheckpoint} at
+ *       the same time.
+ *   <li>When ChangelogTruncateHelper#materialized() or
+ *       ChangelogTruncateHelper#checkpointSubsumed() is called, {@link
+ *       TaskChangelogRegistry#notUsed} is responsible for deleting local files.
+ *   <li>When one checkpoint is aborted, the dstl files of this checkpoint will be deleted by
+ *       {@link org.apache.flink.runtime.state.changelog.LocalChangelogRegistry#prune} in {@link
+ *       FsStateChangelogWriter#reset}.
+ * </ol>
+ *
+ * <p>The local copy is best-effort: if the local file cannot be created, the changes are only
+ * written to the remote file and no local handle is produced for them.
+ */
+public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploader {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DuplicatingStateChangeFsUploader.class);
+
+    private final Path basePath;
+    private final FileSystem fileSystem;
+    private final LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider;
+    private final JobID jobID;
+
+    /**
+     * Creates the uploader.
+     *
+     * @param jobID job the uploaded changes belong to; used in both remote and local paths
+     * @param basePath remote base directory; files go to {@code <basePath>/<jobID>/PATH_SUB_DIR}
+     * @param fileSystem file system of the remote files
+     * @param compression whether to compress the written changes
+     * @param bufferSize buffer size used when wrapping the streams
+     * @param metrics metric group that upload statistics are reported to
+     * @param changelogRegistry registry tracking both remote and local handles
+     * @param localRecoveryDirectoryProvider provides the directory for local copies
+     */
+    public DuplicatingStateChangeFsUploader(
+            JobID jobID,
+            Path basePath,
+            FileSystem fileSystem,
+            boolean compression,
+            int bufferSize,
+            ChangelogStorageMetricGroup metrics,
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider) {
+        super(compression, bufferSize, metrics, changelogRegistry, FileStateHandle::new);
+        this.basePath =
+                new Path(basePath, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR));
+        this.fileSystem = fileSystem;
+        this.localRecoveryDirectoryProvider = localRecoveryDirectoryProvider;
+        this.jobID = jobID;
+    }
+
+    /**
+     * Opens the remote file and, if possible, a local copy of it for one upload. If opening or
+     * wrapping fails, everything opened so far is closed before the exception is rethrown, since
+     * the caller never receives the stream.
+     */
+    @Override
+    public OutputStreamWithPos prepareStream() throws IOException {
+        final String fileName = generateFileName();
+        LOG.debug("upload tasks to {}", fileName);
+        final Path path = new Path(basePath, fileName);
+        final FSDataOutputStream primaryStream = fileSystem.create(path, WriteMode.NO_OVERWRITE);
+        OutputStreamWithPos outputStream = null;
+        try {
+            outputStream = createOutputStream(primaryStream, path, fileName);
+            outputStream.wrap(this.compression, this.bufferSize);
+            return outputStream;
+        } catch (IOException | RuntimeException e) {
+            try {
+                if (outputStream != null) {
+                    outputStream.close();
+                } else {
+                    primaryStream.close();
+                }
+            } catch (Exception closeEx) {
+                e.addSuppressed(closeEx);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Creates a stream that duplicates writes to {@code primaryStream} into a local file named
+     * {@code fileName}. Falls back to a remote-only stream if the local file cannot be created,
+     * because the local copy is only needed for local recovery.
+     */
+    private OutputStreamWithPos createOutputStream(
+            FSDataOutputStream primaryStream, Path path, String fileName) {
+        try {
+            final Path localPath =
+                    new Path(
+                            getLocalTaskOwnedDirectory(localRecoveryDirectoryProvider, jobID),
+                            fileName);
+            final FSDataOutputStream secondaryStream =
+                    localPath.getFileSystem().create(localPath, WriteMode.NO_OVERWRITE);
+            return new DuplicatingOutputStreamWithPos(
+                    primaryStream, path, secondaryStream, localPath);
+        } catch (IOException e) {
+            LOG.warn(
+                    "Failed to create local copy of changelog file {}, "
+                            + "it will not be available for local recovery.",
+                    path,
+                    e);
+            return new OutputStreamWithPos(primaryStream, path);
+        }
+    }
+
+    /** Nothing to release: every stream is closed by the upload that opened it. */
+    @Override
+    public void close() throws Exception {}
+}
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
index de7718521a0..5ba18f701ea 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorage.java
@@ -26,16 +26,22 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
 import java.util.UUID;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.flink.changelog.fs.FsStateChangelogOptions.NUM_DISCARD_THREADS;
@@ -62,26 +68,42 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
 
     private final TaskChangelogRegistry changelogRegistry;
 
+    @Nullable private LocalChangelogRegistry localChangelogRegistry = 
LocalChangelogRegistry.NO_OP;
+
+    /** The configuration for local recovery. */
+    @Nonnull private final LocalRecoveryConfig localRecoveryConfig;
+
     public FsStateChangelogStorage(
-            JobID jobID, Configuration config, TaskManagerJobMetricGroup 
metricGroup)
+            JobID jobID,
+            Configuration config,
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
-        this(jobID, config, metricGroup, 
defaultChangelogRegistry(config.get(NUM_DISCARD_THREADS)));
+        this(
+                jobID,
+                config,
+                metricGroup,
+                defaultChangelogRegistry(config.get(NUM_DISCARD_THREADS)),
+                localRecoveryConfig);
     }
 
     public FsStateChangelogStorage(
             JobID jobID,
             Configuration config,
             TaskManagerJobMetricGroup metricGroup,
-            TaskChangelogRegistry changelogRegistry)
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
         this(
                 fromConfig(
                         jobID,
                         config,
                         new ChangelogStorageMetricGroup(metricGroup),
-                        changelogRegistry),
+                        changelogRegistry,
+                        localRecoveryConfig),
                 config.get(PREEMPTIVE_PERSIST_THRESHOLD).getBytes(),
-                changelogRegistry);
+                changelogRegistry,
+                localRecoveryConfig);
     }
 
     @VisibleForTesting
@@ -91,7 +113,8 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
             boolean compression,
             int bufferSize,
             ChangelogStorageMetricGroup metricGroup,
-            TaskChangelogRegistry changelogRegistry)
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
         this(
                 directScheduler(
@@ -104,18 +127,25 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
                                 metricGroup,
                                 changelogRegistry)),
                 PREEMPTIVE_PERSIST_THRESHOLD.defaultValue().getBytes(),
-                changelogRegistry);
+                changelogRegistry,
+                localRecoveryConfig);
     }
 
     @VisibleForTesting
     public FsStateChangelogStorage(
             StateChangeUploadScheduler uploader,
             long preEmptivePersistThresholdInBytes,
-            TaskChangelogRegistry changelogRegistry) {
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig) {
         super(ChangelogStreamHandleReader.DIRECT_READER);
         this.preEmptivePersistThresholdInBytes = 
preEmptivePersistThresholdInBytes;
         this.changelogRegistry = changelogRegistry;
         this.uploader = uploader;
+        this.localRecoveryConfig = localRecoveryConfig;
+        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+            this.localChangelogRegistry =
+                    new 
LocalChangelogRegistryImpl(Executors.newSingleThreadExecutor());
+        }
     }
 
     @Override
@@ -129,7 +159,9 @@ public class FsStateChangelogStorage extends FsStateChangelogStorageForRecovery
                 uploader,
                 preEmptivePersistThresholdInBytes,
                 mailboxExecutor,
-                changelogRegistry);
+                changelogRegistry,
+                localRecoveryConfig,
+                localChangelogRegistry);
     }
 
     @Override
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
index c81931241a5..edb412e3ea4 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogStorageFactory.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
@@ -47,9 +48,12 @@ public class FsStateChangelogStorageFactory implements StateChangelogStorageFact
 
     @Override
     public StateChangelogStorage<?> createStorage(
-            JobID jobID, Configuration configuration, 
TaskManagerJobMetricGroup metricGroup)
+            JobID jobID,
+            Configuration configuration,
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
-        return new FsStateChangelogStorage(jobID, configuration, metricGroup);
+        return new FsStateChangelogStorage(jobID, configuration, metricGroup, 
localRecoveryConfig);
     }
 
     @Override
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
index e6b5b583885..2481fdcc2e4 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogWriter.java
@@ -22,8 +22,11 @@ import org.apache.flink.api.common.operators.MailboxExecutor;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.SequenceNumberRange;
 import org.apache.flink.runtime.state.changelog.StateChange;
@@ -32,6 +35,7 @@ import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.NotThreadSafe;
 
@@ -139,19 +143,28 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
 
     private final TaskChangelogRegistry changelogRegistry;
 
+    /** The configuration for local recovery. */
+    @Nonnull private final LocalRecoveryConfig localRecoveryConfig;
+
+    private final LocalChangelogRegistry localChangelogRegistry;
+
     FsStateChangelogWriter(
             UUID logId,
             KeyGroupRange keyGroupRange,
             StateChangeUploadScheduler uploader,
             long preEmptivePersistThresholdInBytes,
             MailboxExecutor mailboxExecutor,
-            TaskChangelogRegistry changelogRegistry) {
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig,
+            LocalChangelogRegistry localChangelogRegistry) {
         this.logId = logId;
         this.keyGroupRange = keyGroupRange;
         this.uploader = uploader;
         this.preEmptivePersistThresholdInBytes = 
preEmptivePersistThresholdInBytes;
         this.mailboxExecutor = mailboxExecutor;
         this.changelogRegistry = changelogRegistry;
+        this.localRecoveryConfig = localRecoveryConfig;
+        this.localChangelogRegistry = localChangelogRegistry;
     }
 
     @Override
@@ -185,8 +198,8 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
     }
 
     @Override
-    public CompletableFuture<ChangelogStateHandleStreamImpl> 
persist(SequenceNumber from)
-            throws IOException {
+    public CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> 
persist(
+            SequenceNumber from) throws IOException {
         LOG.debug(
                 "persist {} starting from sqn {} (incl.), active sqn: {}",
                 logId,
@@ -195,8 +208,8 @@ class FsStateChangelogWriter implements StateChangelogWriter<ChangelogStateHandl
         return persistInternal(from);
     }
 
-    private CompletableFuture<ChangelogStateHandleStreamImpl> 
persistInternal(SequenceNumber from)
-            throws IOException {
+    private CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> 
persistInternal(
+            SequenceNumber from) throws IOException {
         ensureCanPersist(from);
         rollover();
         Map<SequenceNumber, StateChangeSet> toUpload = 
drainTailMap(notUploaded, from);
@@ -206,9 +219,11 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
         SequenceNumberRange range = SequenceNumberRange.generic(from, 
activeSequenceNumber);
         if (range.size() == readyToReturn.size()) {
             checkState(toUpload.isEmpty());
-            return 
CompletableFuture.completedFuture(buildHandle(keyGroupRange, readyToReturn, 
0L));
+            return CompletableFuture.completedFuture(
+                    buildSnapshotResult(keyGroupRange, readyToReturn, 0L));
         } else {
-            CompletableFuture<ChangelogStateHandleStreamImpl> future = new 
CompletableFuture<>();
+            CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> 
future =
+                    new CompletableFuture<>();
             uploadCompletionListeners.add(
                     new UploadCompletionListener(keyGroupRange, range, 
readyToReturn, future));
             if (!toUpload.isEmpty()) {
@@ -262,6 +277,9 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
                                 // uploaded already truncated, i.e. 
materialized state changes,
                                 // or closed
                                 
changelogRegistry.notUsed(result.streamStateHandle, logId);
+                                if (result.localStreamHandle != null) {
+                                    
changelogRegistry.notUsed(result.localStreamHandle, logId);
+                                }
                             }
                         }
                     }
@@ -314,7 +332,7 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
     }
 
     @Override
-    public void confirm(SequenceNumber from, SequenceNumber to) {
+    public void confirm(SequenceNumber from, SequenceNumber to, long 
checkpointId) {
         checkState(from.compareTo(to) <= 0, "Invalid confirm range: [%s,%s)", 
from, to);
         checkState(
                 from.compareTo(activeSequenceNumber) <= 0
@@ -329,14 +347,30 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
         uploaded.subMap(from, to).values().stream()
                 .map(UploadResult::getStreamStateHandle)
                 .forEach(changelogRegistry::stopTracking);
+
+        // transfer the control of localHandle to localStateRegistry.
+        uploaded.subMap(from, to).values().stream()
+                .map(UploadResult::getLocalStreamHandleStateHandle)
+                .filter(localHandle -> localHandle != null)
+                .forEach(
+                        localHandle -> {
+                            changelogRegistry.stopTracking(localHandle);
+                            localChangelogRegistry.register(localHandle, 
checkpointId);
+                        });
+        localChangelogRegistry.discardUpToCheckpoint(checkpointId);
+    }
+
+    @Override
+    public void subsume(long checkpointId) {
+        localChangelogRegistry.discardUpToCheckpoint(checkpointId);
     }
 
     @Override
-    public void reset(SequenceNumber from, SequenceNumber to) {
-        // do nothing
+    public void reset(SequenceNumber from, SequenceNumber to, long 
checkpointId) {
+        localChangelogRegistry.prune(checkpointId);
     }
 
-    private static ChangelogStateHandleStreamImpl buildHandle(
+    private SnapshotResult<ChangelogStateHandleStreamImpl> buildSnapshotResult(
             KeyGroupRange keyGroupRange,
             NavigableMap<SequenceNumber, UploadResult> results,
             long incrementalSize) {
@@ -346,12 +380,42 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
             tuples.add(Tuple2.of(uploadResult.getStreamStateHandle(), 
uploadResult.getOffset()));
             size += uploadResult.getSize();
         }
-        return new ChangelogStateHandleStreamImpl(
-                tuples,
-                keyGroupRange,
-                size,
-                incrementalSize,
-                FsStateChangelogStorageFactory.IDENTIFIER);
+        ChangelogStateHandleStreamImpl jmChangelogStateHandle =
+                new ChangelogStateHandleStreamImpl(
+                        tuples,
+                        keyGroupRange,
+                        size,
+                        incrementalSize,
+                        FsStateChangelogStorageFactory.IDENTIFIER);
+        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+            size = 0;
+            List<Tuple2<StreamStateHandle, Long>> localTuples = new 
ArrayList<>();
+            for (UploadResult uploadResult : results.values()) {
+                if (uploadResult.getLocalStreamHandleStateHandle() != null) {
+                    localTuples.add(
+                            Tuple2.of(
+                                    
uploadResult.getLocalStreamHandleStateHandle(),
+                                    uploadResult.getLocalOffset()));
+                    size += uploadResult.getSize();
+                }
+            }
+            ChangelogStateHandleStreamImpl localChangelogStateHandle = null;
+            if (localTuples.size() == tuples.size()) {
+                localChangelogStateHandle =
+                        new ChangelogStateHandleStreamImpl(
+                                localTuples,
+                                keyGroupRange,
+                                size,
+                                0L,
+                                FsStateChangelogStorageFactory.IDENTIFIER);
+                return SnapshotResult.withLocalState(
+                        jmChangelogStateHandle, localChangelogStateHandle);
+
+            } else {
+                LOG.warn("local handles are different from remote");
+            }
+        }
+        return SnapshotResult.of(jmChangelogStateHandle);
     }
 
     @VisibleForTesting
@@ -378,9 +442,10 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
         }
     }
 
-    private static final class UploadCompletionListener {
+    private final class UploadCompletionListener {
         private final NavigableMap<SequenceNumber, UploadResult> uploaded;
-        private final CompletableFuture<ChangelogStateHandleStreamImpl> 
completionFuture;
+        private final 
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
+                completionFuture;
         private final KeyGroupRange keyGroupRange;
         private final SequenceNumberRange changeRange;
 
@@ -388,7 +453,8 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
                 KeyGroupRange keyGroupRange,
                 SequenceNumberRange changeRange,
                 Map<SequenceNumber, UploadResult> uploaded,
-                CompletableFuture<ChangelogStateHandleStreamImpl> 
completionFuture) {
+                
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>>
+                        completionFuture) {
             checkArgument(
                     !changeRange.isEmpty(), "Empty change range not allowed: 
%s", changeRange);
             this.uploaded = new TreeMap<>(uploaded);
@@ -405,7 +471,7 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
                     incrementalSize += uploadResult.getSize();
                     if (uploaded.size() == changeRange.size()) {
                         completionFuture.complete(
-                                buildHandle(keyGroupRange, uploaded, 
incrementalSize));
+                                buildSnapshotResult(keyGroupRange, uploaded, 
incrementalSize));
                         return true;
                     }
                 }
@@ -440,6 +506,9 @@ class FsStateChangelogWriter implements 
StateChangelogWriter<ChangelogStateHandl
         LOG.trace("Uploaded state to discard: {}", notUsedState);
         for (UploadResult result : notUsedState.values()) {
             changelogRegistry.notUsed(result.streamStateHandle, logId);
+            if (result.localStreamHandle != null) {
+                changelogRegistry.notUsed(result.localStreamHandle, logId);
+            }
         }
     }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java
index b6be3a766a3..89d71f6dde2 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/OutputStreamWithPos.java
@@ -17,15 +17,46 @@
 
 package org.apache.flink.changelog.fs;
 
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
+import org.apache.flink.runtime.state.StreamCompressionDecorator;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.util.Preconditions;
+
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.function.BiFunction;
 
 class OutputStreamWithPos extends OutputStream {
-    private final OutputStream outputStream;
-    private long pos;
+    protected final Path path;
+    protected OutputStream outputStream;
+    protected long pos;
+    protected boolean compression;
+    protected final OutputStream originalStream;
+
+    public OutputStreamWithPos(OutputStream outputStream, Path path) {
+        this.outputStream = Preconditions.checkNotNull(outputStream);
+        this.originalStream = Preconditions.checkNotNull(outputStream);
+        this.path = Preconditions.checkNotNull(path);
+        this.pos = 0;
+        this.compression = false;
+    }
+
+    protected OutputStream wrapInternal(boolean compression, int bufferSize, 
OutputStream fsStream)
+            throws IOException {
+        fsStream.write(compression ? 1 : 0);
+        StreamCompressionDecorator instance =
+                compression
+                        ? SnappyStreamCompressionDecorator.INSTANCE
+                        : UncompressedStreamCompressionDecorator.INSTANCE;
+        return new 
BufferedOutputStream(instance.decorateWithCompression(fsStream), bufferSize);
+    }
 
-    public OutputStreamWithPos(OutputStream outputStream) {
-        this.outputStream = outputStream;
+    public void wrap(boolean compression, int bufferSize) throws IOException {
+        this.compression = compression;
+        this.outputStream = wrapInternal(compression, bufferSize, 
this.originalStream);
     }
 
     @Override
@@ -53,10 +84,38 @@ class OutputStreamWithPos extends OutputStream {
 
+    /**
+     * Closes the wrapping stream and then the underlying stream. If closing fails, the file at
+     * {@code path} is deleted and the failure is rethrown instead of being swallowed, since a
+     * handle created for that file afterwards would point to incomplete data.
+     */
     @Override
     public void close() throws IOException {
-        outputStream.close();
+        try {
+            try {
+                outputStream.close();
+            } finally {
+                // close the underlying stream even if closing the wrapper failed; the wrapper
+                // may not propagate close() to it
+                originalStream.close();
+            }
+        } catch (IOException e) {
+            // the file may be incomplete: remove it and let the caller see the failure
+            try {
+                getPath().getFileSystem().delete(getPath(), true);
+            } catch (IOException deleteFailure) {
+                e.addSuppressed(deleteFailure);
+            }
+            throw e;
+        }
     }
 
     public long getPos() {
         return pos;
     }
+
+    public Path getPath() {
+        return path;
+    }
+
+    public StreamStateHandle getHandle(BiFunction<Path, Long, StreamStateHandle> handleFactory) {
+        return handleFactory.apply(path, this.pos);
+    }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
index 7a672da4ba4..58d142305cd 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFormat.java
@@ -18,6 +18,7 @@
 package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.util.CloseableIterator;
@@ -45,17 +46,18 @@ import static java.util.Comparator.comparing;
 public class StateChangeFormat {
     private static final Logger LOG = 
LoggerFactory.getLogger(StateChangeFormat.class);
 
-    Map<StateChangeSet, Long> write(OutputStreamWithPos os, 
Collection<StateChangeSet> changeSets)
-            throws IOException {
+    Map<StateChangeSet, Tuple2<Long, Long>> write(
+            OutputStreamWithPos os, Collection<StateChangeSet> changeSets) 
throws IOException {
         List<StateChangeSet> sorted = new ArrayList<>(changeSets);
         // using sorting instead of bucketing for simplicity
         sorted.sort(
                 comparing(StateChangeSet::getLogId)
                         .thenComparing(StateChangeSet::getSequenceNumber));
         DataOutputViewStreamWrapper dataOutput = new 
DataOutputViewStreamWrapper(os);
-        Map<StateChangeSet, Long> pendingResults = new HashMap<>();
+        Map<StateChangeSet, Tuple2<Long, Long>> pendingResults = new 
HashMap<>();
         for (StateChangeSet changeSet : sorted) {
-            pendingResults.put(changeSet, os.getPos());
+            long pos = os.getPos();
+            pendingResults.put(changeSet, Tuple2.of(pos, pos));
             writeChangeSet(dataOutput, changeSet.getChanges());
         }
         return pendingResults;
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
index 8a37a07f460..e9e9662c65f 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java
@@ -19,52 +19,29 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
-import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
-import org.apache.flink.util.clock.Clock;
-import org.apache.flink.util.clock.SystemClock;
-
-import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
 import java.util.function.BiFunction;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE;
 
 /**
  * A synchronous {@link StateChangeUploadScheduler} implementation that 
uploads the changes using
  * {@link FileSystem}.
  */
-public class StateChangeFsUploader implements StateChangeUploader {
+public class StateChangeFsUploader extends AbstractStateChangeFsUploader {
     private static final Logger LOG = 
LoggerFactory.getLogger(StateChangeFsUploader.class);
 
     @VisibleForTesting public static final String PATH_SUB_DIR = "dstl";
 
     private final Path basePath;
     private final FileSystem fileSystem;
-    private final StateChangeFormat format;
-    private final boolean compression;
-    private final int bufferSize;
-    private final ChangelogStorageMetricGroup metrics;
-    private final Clock clock;
-    private final TaskChangelogRegistry changelogRegistry;
-    private final BiFunction<Path, Long, StreamStateHandle> handleFactory;
 
     @VisibleForTesting
     public StateChangeFsUploader(
@@ -95,16 +72,10 @@ public class StateChangeFsUploader implements 
StateChangeUploader {
             ChangelogStorageMetricGroup metrics,
             TaskChangelogRegistry changelogRegistry,
             BiFunction<Path, Long, StreamStateHandle> handleFactory) {
+        super(compression, bufferSize, metrics, changelogRegistry, 
handleFactory);
         this.basePath =
                 new Path(basePath, String.format("%s/%s", jobID.toHexString(), 
PATH_SUB_DIR));
         this.fileSystem = fileSystem;
-        this.format = new StateChangeFormat();
-        this.compression = compression;
-        this.bufferSize = bufferSize;
-        this.metrics = metrics;
-        this.clock = SystemClock.getInstance();
-        this.changelogRegistry = changelogRegistry;
-        this.handleFactory = handleFactory;
     }
 
     @VisibleForTesting
@@ -112,70 +83,32 @@ public class StateChangeFsUploader implements StateChangeUploader {
         return this.basePath;
     }
 
-    public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException {
+    /**
+     * Creates a new file under the base path and returns a stream to it, already wrapped according
+     * to the configured compression and buffer size.
+     *
+     * <p>If wrapping fails, the stream is closed and the file deleted before the exception is
+     * rethrown: the caller never receives the stream, so it could not clean them up itself.
+     */
+    @Override
+    public OutputStreamWithPos prepareStream() throws IOException {
         final String fileName = generateFileName();
-        LOG.debug("upload {} tasks to {}", tasks.size(), fileName);
+        LOG.debug("upload tasks to {}", fileName);
         Path path = new Path(basePath, fileName);
-
-        try {
-            return uploadWithMetrics(path, tasks);
-        } catch (IOException e) {
-            metrics.getUploadFailuresCounter().inc();
-            try (Closer closer = Closer.create()) {
-                closer.register(
-                        () -> {
-                            throw e;
-                        });
-                tasks.forEach(cs -> closer.register(() -> cs.fail(e)));
-                closer.register(() -> fileSystem.delete(path, true));
-            }
-        }
-        return null; // closer above throws an exception
-    }
-
-    private UploadTasksResult uploadWithMetrics(Path path, Collection<UploadTask> tasks)
-            throws IOException {
-        metrics.getUploadsCounter().inc();
-        long start = clock.relativeTimeNanos();
-        UploadTasksResult result = upload(path, tasks);
-        metrics.getUploadLatenciesNanos().update(clock.relativeTimeNanos() - start);
-        metrics.getUploadSizes().update(result.getStateSize());
-        return result;
-    }
-
-    private UploadTasksResult upload(Path path, Collection<UploadTask> tasks) throws IOException {
-        try (FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE)) {
-            fsStream.write(compression ? 1 : 0);
-            try (OutputStreamWithPos stream = wrap(fsStream)) {
-                final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets = new HashMap<>();
-                for (UploadTask task : tasks) {
-                    tasksOffsets.put(task, format.write(stream, task.changeSets));
-                }
-                StreamStateHandle handle = handleFactory.apply(path, stream.getPos());
-                changelogRegistry.startTracking(
-                        handle,
-                        tasks.stream()
-                                .flatMap(t -> t.getChangeSets().stream())
-                                .map(StateChangeSet::getLogId)
-                                .collect(Collectors.toSet()));
-                // WARN: streams have to be closed before returning the results
-                // otherwise JM may receive invalid handles
-                return new UploadTasksResult(tasksOffsets, handle);
-            }
-        }
-    }
-
-    private OutputStreamWithPos wrap(FSDataOutputStream fsStream) throws IOException {
-        StreamCompressionDecorator instance =
-                compression
-                        ? SnappyStreamCompressionDecorator.INSTANCE
-                        : UncompressedStreamCompressionDecorator.INSTANCE;
-        return new OutputStreamWithPos(
-                new BufferedOutputStream(instance.decorateWithCompression(fsStream), bufferSize));
-    }
-
-    private String generateFileName() {
-        return UUID.randomUUID().toString();
+        OutputStreamWithPos outputStream =
+                new OutputStreamWithPos(fileSystem.create(path, WriteMode.NO_OVERWRITE), path);
+        try {
+            outputStream.wrap(this.compression, this.bufferSize);
+        } catch (IOException e) {
+            try {
+                outputStream.close();
+                fileSystem.delete(path, true);
+            } catch (IOException cleanupFailure) {
+                e.addSuppressed(cleanupFailure);
+            }
+            throw e;
+        }
+        return outputStream;
     }
 
     @Override
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
index 6e3b31a8ab3..19f09a151e6 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.io.AvailabilityProvider;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -88,21 +89,34 @@ public interface StateChangeUploadScheduler extends 
AutoCloseable {
             JobID jobID,
             ReadableConfig config,
             ChangelogStorageMetricGroup metricGroup,
-            TaskChangelogRegistry changelogRegistry)
+            TaskChangelogRegistry changelogRegistry,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
         Path basePath = new Path(config.get(BASE_PATH));
         long bytes = config.get(UPLOAD_BUFFER_SIZE).getBytes();
         checkArgument(bytes <= Integer.MAX_VALUE);
         int bufferSize = (int) bytes;
-        StateChangeFsUploader store =
-                new StateChangeFsUploader(
-                        jobID,
-                        basePath,
-                        basePath.getFileSystem(),
-                        config.get(COMPRESSION_ENABLED),
-                        bufferSize,
-                        metricGroup,
-                        changelogRegistry);
+        StateChangeUploader store =
+                localRecoveryConfig.isLocalRecoveryEnabled()
+                        ? new DuplicatingStateChangeFsUploader(
+                                jobID,
+                                basePath,
+                                basePath.getFileSystem(),
+                                config.get(COMPRESSION_ENABLED),
+                                bufferSize,
+                                metricGroup,
+                                changelogRegistry,
+                                localRecoveryConfig
+                                        .getLocalStateDirectoryProvider()
+                                        
.orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled()))
+                        : new StateChangeFsUploader(
+                                jobID,
+                                basePath,
+                                basePath.getFileSystem(),
+                                config.get(COMPRESSION_ENABLED),
+                                bufferSize,
+                                metricGroup,
+                                changelogRegistry);
         BatchingStateChangeUploadScheduler batchingStore =
                 new BatchingStateChangeUploadScheduler(
                         config.get(PERSIST_DELAY).toMillis(),
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
index 1f18fe25d8b..537aa7947e5 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
@@ -19,10 +19,13 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -46,27 +49,45 @@ public interface StateChangeUploader extends AutoCloseable {
 
     /** Result of executing one or more {@link UploadTask upload tasks}. */
     final class UploadTasksResult {
-        private final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets;
+        private final Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> 
tasksOffsets;
         private final StreamStateHandle handle;
+        private final StreamStateHandle localHandle;
+
+        public UploadTasksResult(
+                Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> 
tasksOffsets,
+                StreamStateHandle handle) {
+            this(tasksOffsets, handle, null);
+        }
 
         public UploadTasksResult(
-                Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets, 
StreamStateHandle handle) {
+                Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> 
tasksOffsets,
+                StreamStateHandle handle,
+                @Nullable StreamStateHandle localHandle) {
             this.tasksOffsets = unmodifiableMap(tasksOffsets);
             this.handle = Preconditions.checkNotNull(handle);
+            this.localHandle = localHandle;
         }
 
         public void complete() {
-            for (Map.Entry<UploadTask, Map<StateChangeSet, Long>> entry : 
tasksOffsets.entrySet()) {
+            for (Map.Entry<UploadTask, Map<StateChangeSet, Tuple2<Long, 
Long>>> entry :
+                    tasksOffsets.entrySet()) {
                 UploadTask task = entry.getKey();
-                Map<StateChangeSet, Long> offsets = entry.getValue();
+                Map<StateChangeSet, Tuple2<Long, Long>> offsets = 
entry.getValue();
                 task.complete(buildResults(handle, offsets));
             }
         }
 
         private List<UploadResult> buildResults(
-                StreamStateHandle handle, Map<StateChangeSet, Long> offsets) {
+                StreamStateHandle handle, Map<StateChangeSet, Tuple2<Long, 
Long>> offsets) {
             return offsets.entrySet().stream()
-                    .map(e -> UploadResult.of(handle, e.getKey(), 
e.getValue()))
+                    .map(
+                            e ->
+                                    UploadResult.of(
+                                            handle,
+                                            localHandle,
+                                            e.getKey(),
+                                            e.getValue().f0,
+                                            e.getValue().f1))
                     .collect(toList());
         }
 
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java
index b2caa5c3bd9..385c502b076 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/UploadResult.java
@@ -21,13 +21,17 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 
+import javax.annotation.Nullable;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Result of uploading state changes. */
 @Internal
 public final class UploadResult {
     public final StreamStateHandle streamStateHandle;
+    @Nullable public final StreamStateHandle localStreamHandle;
     public final long offset;
+    public final long localOffset;
     public final SequenceNumber sequenceNumber;
     public final long size;
 
@@ -36,24 +40,55 @@ public final class UploadResult {
             long offset,
             SequenceNumber sequenceNumber,
             long size) {
+        this(streamStateHandle, null, offset, offset, sequenceNumber, size);
+    }
+
+    public UploadResult(
+            StreamStateHandle streamStateHandle,
+            @Nullable StreamStateHandle localStreamHandle,
+            long offset,
+            long localOffset,
+            SequenceNumber sequenceNumber,
+            long size) {
         this.streamStateHandle = checkNotNull(streamStateHandle);
+        this.localStreamHandle = localStreamHandle;
         this.offset = offset;
+        this.localOffset = localOffset;
         this.sequenceNumber = checkNotNull(sequenceNumber);
         this.size = size;
     }
 
-    public static UploadResult of(StreamStateHandle handle, StateChangeSet 
changeSet, long offset) {
-        return new UploadResult(handle, offset, changeSet.getSequenceNumber(), 
changeSet.getSize());
+    public static UploadResult of(
+            StreamStateHandle handle,
+            StreamStateHandle localHandle,
+            StateChangeSet changeSet,
+            long offset,
+            long localOffset) {
+        return new UploadResult(
+                handle,
+                localHandle,
+                offset,
+                localOffset,
+                changeSet.getSequenceNumber(),
+                changeSet.getSize());
     }
 
     public StreamStateHandle getStreamStateHandle() {
         return streamStateHandle;
     }
 
+    public StreamStateHandle getLocalStreamHandleStateHandle() {
+        return localStreamHandle;
+    }
+
     public long getOffset() {
         return offset;
     }
 
+    public long getLocalOffset() {
+        return localOffset;
+    }
+
     public SequenceNumber getSequenceNumber() {
         return sequenceNumber;
     }
@@ -66,6 +101,10 @@ public final class UploadResult {
     public String toString() {
         return "streamStateHandle="
                 + streamStateHandle
+                + ", localStreamHandle="
+                + localStreamHandle
+                + ", localOffset="
+                + localOffset
                 + ", size="
                 + size
                 + ", offset="
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index 21e6e932f8c..d70c3f63082 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -462,13 +462,13 @@ class BatchingStateChangeUploadSchedulerTest {
             return uploadsCounter.get();
         }
 
-        private Map<UploadTask, Map<StateChangeSet, Long>> withOffsets(
+        private Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> 
withOffsets(
                 Collection<UploadTask> tasks) {
             return tasks.stream().collect(toMap(identity(), 
this::withOffsets));
         }
 
-        private Map<StateChangeSet, Long> withOffsets(UploadTask task) {
-            return task.changeSets.stream().collect(toMap(identity(), ign -> 
0L));
+        private Map<StateChangeSet, Tuple2<Long, Long>> withOffsets(UploadTask 
task) {
+            return task.changeSets.stream().collect(toMap(identity(), ign -> 
Tuple2.of(0L, 0L)));
         }
     }
 
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
index 25acff4f606..c86276d0ad7 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java
@@ -18,6 +18,7 @@ package org.apache.flink.changelog.fs;
  */
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
@@ -26,6 +27,7 @@ import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
 
@@ -69,7 +71,8 @@ public class ChangelogStorageMetricsTest {
                         false,
                         100,
                         metrics,
-                        TaskChangelogRegistry.NO_OP)) {
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled())) {
             FsStateChangelogWriter writer = createWriter(storage);
             int numUploads = 5;
             for (int i = 0; i < numUploads; i++) {
@@ -94,7 +97,8 @@ public class ChangelogStorageMetricsTest {
                         false,
                         100,
                         metrics,
-                        TaskChangelogRegistry.NO_OP)) {
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled())) {
             FsStateChangelogWriter writer = createWriter(storage);
 
             // upload single byte to infer header size
@@ -127,7 +131,8 @@ public class ChangelogStorageMetricsTest {
                         false,
                         100,
                         metrics,
-                        TaskChangelogRegistry.NO_OP)) {
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled())) {
             FsStateChangelogWriter writer = createWriter(storage);
 
             int numUploads = 5;
@@ -178,7 +183,10 @@ public class ChangelogStorageMetricsTest {
 
         FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
-                        batcher, Integer.MAX_VALUE, 
TaskChangelogRegistry.NO_OP);
+                        batcher,
+                        Integer.MAX_VALUE,
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled());
         FsStateChangelogWriter[] writers = new 
FsStateChangelogWriter[numWriters];
         for (int i = 0; i < numWriters; i++) {
             writers[i] =
@@ -230,7 +238,10 @@ public class ChangelogStorageMetricsTest {
 
         FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
-                        batcher, Integer.MAX_VALUE, 
TaskChangelogRegistry.NO_OP);
+                        batcher,
+                        Integer.MAX_VALUE,
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled());
         FsStateChangelogWriter writer = createWriter(storage);
 
         try {
@@ -272,7 +283,10 @@ public class ChangelogStorageMetricsTest {
 
         FsStateChangelogStorage storage =
                 new FsStateChangelogStorage(
-                        batcher, Integer.MAX_VALUE, 
TaskChangelogRegistry.NO_OP);
+                        batcher,
+                        Integer.MAX_VALUE,
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled());
         FsStateChangelogWriter writer = createWriter(storage);
 
         try {
@@ -335,7 +349,11 @@ public class ChangelogStorageMetricsTest {
                                 metrics.getTotalAttemptsPerUpload()),
                         metrics);
         try (FsStateChangelogStorage storage =
-                new FsStateChangelogStorage(batcher, Long.MAX_VALUE, 
TaskChangelogRegistry.NO_OP)) {
+                new FsStateChangelogStorage(
+                        batcher,
+                        Long.MAX_VALUE,
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled())) {
             FsStateChangelogWriter writer = createWriter(storage);
             int numUploads = 11;
             for (int i = 0; i < numUploads; i++) {
@@ -360,7 +378,7 @@ public class ChangelogStorageMetricsTest {
 
         @Override
         public UploadTasksResult upload(Collection<UploadTask> tasks) throws 
IOException {
-            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+            Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> map = new 
HashMap<>();
             for (UploadTask uploadTask : tasks) {
                 int currentAttempt = 1 + 
attemptsPerTask.getOrDefault(uploadTask, 0);
                 if (currentAttempt == maxAttempts) {
@@ -368,7 +386,10 @@ public class ChangelogStorageMetricsTest {
                     map.put(
                             uploadTask,
                             uploadTask.changeSets.stream()
-                                    
.collect(Collectors.toMap(Function.identity(), ign -> 0L)));
+                                    .collect(
+                                            Collectors.toMap(
+                                                    Function.identity(),
+                                                    ign -> Tuple2.of(0L, 
0L))));
                 } else {
                     attemptsPerTask.put(uploadTask, currentAttempt);
                     throw new IOException();
@@ -420,12 +441,14 @@ public class ChangelogStorageMetricsTest {
                 }
             }
 
-            Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>();
+            Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> map = new 
HashMap<>();
             for (UploadTask uploadTask : tasks) {
                 map.put(
                         uploadTask,
                         uploadTask.changeSets.stream()
-                                .collect(Collectors.toMap(Function.identity(), 
ign -> 0L)));
+                                .collect(
+                                        Collectors.toMap(
+                                                Function.identity(), ign -> 
Tuple2.of(0L, 0L))));
             }
             return new UploadTasksResult(map, new EmptyStreamStateHandle());
         }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
index 01b45762f5c..03fc8a9b3e4 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogStorageTest.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.changelog.fs.BatchingStateChangeUploadSchedulerTest.Bloc
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -57,7 +58,8 @@ public class FsStateChangelogStorageTest
                 compression,
                 1024 * 1024 * 10,
                 createUnregisteredChangelogStorageMetricGroup(),
-                TaskChangelogRegistry.NO_OP);
+                TaskChangelogRegistry.NO_OP,
+                TestLocalRecoveryConfig.disabled());
     }
 
     /**
@@ -103,7 +105,8 @@ public class FsStateChangelogStorageTest
                         new FsStateChangelogStorage(
                                         scheduler,
                                         0,
-                                        TaskChangelogRegistry.NO_OP /* persist 
immediately */)
+                                        TaskChangelogRegistry.NO_OP, /* 
persist immediately */
+                                        TestLocalRecoveryConfig.disabled())
                                 .createWriter(
                                         new OperatorID().toString(),
                                         KeyGroupRange.of(0, 0),
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
index 926f188bb5c..4ea8f665002 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterSqnTest.java
@@ -19,6 +19,8 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
 import org.apache.flink.util.function.ThrowingConsumer;
@@ -80,7 +82,9 @@ public class FsStateChangelogWriterSqnTest {
                                 new TestingStateChangeUploader()),
                         Long.MAX_VALUE,
                         new SyncMailboxExecutor(),
-                        TaskChangelogRegistry.NO_OP)) {
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled(),
+                        LocalChangelogRegistry.NO_OP)) {
             if (writerSqnTestSettings.withAppend) {
                 append(writer);
             }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
index b9e6227cf1d..d2a3aa61f79 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/FsStateChangelogWriterTest.java
@@ -19,7 +19,10 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
+import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.util.function.BiConsumerWithException;
 
@@ -72,12 +75,15 @@ class FsStateChangelogWriterTest {
         withWriter(
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
-                    CompletableFuture<ChangelogStateHandleStreamImpl> future =
+                    
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
                             writer.persist(append(writer, bytes));
                     assertSubmittedOnly(uploader, bytes);
                     uploader.completeUpload();
                     assertThat(
-                                    
getOnlyElement(future.get().getHandlesAndOffsets())
+                                    getOnlyElement(
+                                                    future.get()
+                                                            
.getJobManagerOwnedSnapshot()
+                                                            
.getHandlesAndOffsets())
                                             .f0
                                             .asBytesIfInMemory()
                                             .get())
@@ -94,7 +100,7 @@ class FsStateChangelogWriterTest {
                     writer.persist(sqn);
                     uploader.completeUpload();
                     uploader.reset();
-                    writer.confirm(sqn, writer.nextSequenceNumber());
+                    writer.confirm(sqn, writer.nextSequenceNumber(), 1L);
                     writer.persist(sqn);
                     assertNoUpload(uploader, "confirmed changes shouldn't be 
re-uploaded");
                 });
@@ -134,7 +140,7 @@ class FsStateChangelogWriterTest {
                 (writer, uploader) -> {
                     byte[] bytes = getBytes();
                     SequenceNumber sqn = append(writer, bytes);
-                    writer.reset(sqn, SequenceNumber.of(Long.MAX_VALUE));
+                    writer.reset(sqn, SequenceNumber.of(Long.MAX_VALUE), 
Long.MAX_VALUE);
                     uploader.reset();
                     writer.persist(sqn);
                     assertSubmittedOnly(uploader, bytes);
@@ -149,7 +155,9 @@ class FsStateChangelogWriterTest {
                                         (writer, uploader) -> {
                                             byte[] bytes = getBytes();
                                             SequenceNumber sqn = 
append(writer, bytes);
-                                            
CompletableFuture<ChangelogStateHandleStreamImpl>
+                                            CompletableFuture<
+                                                            SnapshotResult<
+                                                                    
ChangelogStateHandleStreamImpl>>
                                                     future = 
writer.persist(sqn);
                                             uploader.failUpload(new 
RuntimeException("test"));
                                             try {
@@ -186,7 +194,8 @@ class FsStateChangelogWriterTest {
                     uploader.failUpload(new RuntimeException("test"));
                     uploader.reset();
                     SequenceNumber sqn2 = append(writer, bytes);
-                    CompletableFuture<ChangelogStateHandleStreamImpl> future = 
writer.persist(sqn2);
+                    
CompletableFuture<SnapshotResult<ChangelogStateHandleStreamImpl>> future =
+                            writer.persist(sqn2);
                     uploader.completeUpload();
                     future.get();
                 });
@@ -225,7 +234,9 @@ class FsStateChangelogWriterTest {
                         StateChangeUploadScheduler.directScheduler(uploader),
                         appendPersistThreshold,
                         new SyncMailboxExecutor(),
-                        TaskChangelogRegistry.NO_OP)) {
+                        TaskChangelogRegistry.NO_OP,
+                        TestLocalRecoveryConfig.disabled(),
+                        LocalChangelogRegistry.NO_OP)) {
             test.accept(writer, uploader);
         }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
index a814589d45f..23b2b6fd42a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -36,6 +37,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -44,6 +46,7 @@ import java.util.concurrent.Executor;
 import java.util.function.LongPredicate;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** Changelog's implementation of a {@link TaskLocalStateStore}. */
@@ -97,6 +100,21 @@ public class ChangelogTaskLocalStateStore extends 
TaskLocalStateStoreImpl {
         }
     }
 
+    /** Returns the job's local task-owned state directory; creates the base dir if missing. */
+    public static Path getLocalTaskOwnedDirectory(
+            LocalRecoveryDirectoryProvider provider, JobID jobID) {
+        // clear the sign bit so that the base directory index is never negative
+        int index = (jobID.hashCode() & Integer.MAX_VALUE) % provider.allocationBaseDirsCount();
+        File outDir = provider.selectAllocationBaseDirectory(index);
+        if (!outDir.exists() && !outDir.mkdirs()) {
+            LOG.error(
+                    "Local state base directory does not exist and could not be created: {}",
+                    outDir);
+        }
+        return new Path(
+                String.format("%s/jid_%s", outDir.toURI(), jobID), CHECKPOINT_TASK_OWNED_STATE_DIR);
+    }
+
     @Override
     public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot 
localState) {
         if (checkpointId < lastCheckpointId) {
@@ -137,13 +155,13 @@ public class ChangelogTaskLocalStateStore extends 
TaskLocalStateStoreImpl {
 
         discardExecutor.execute(
                 () ->
-                        syncDiscardDirectoryForCollection(
+                        syncDiscardFileForCollection(
                                 materializationToRemove.stream()
                                         .map(super::getCheckpointDirectory)
                                         .collect(Collectors.toList())));
     }
 
-    private void syncDiscardDirectoryForCollection(Collection<File> toDiscard) 
{
+    private void syncDiscardFileForCollection(Collection<File> toDiscard) {
         for (File directory : toDiscard) {
             if (directory.exists()) {
                 try {
@@ -181,6 +199,17 @@ public class ChangelogTaskLocalStateStore extends 
TaskLocalStateStoreImpl {
     @Override
     public CompletableFuture<Void> dispose() {
         deleteMaterialization(id -> true);
+        // delete all ChangelogStateHandle in taskowned directory.
+        discardExecutor.execute(
+                () ->
+                        syncDiscardFileForCollection(
+                                Collections.singleton(
+                                        new File(
+                                                getLocalTaskOwnedDirectory(
+                                                                
getLocalRecoveryDirectoryProvider(),
+                                                                jobID)
+                                                        .toUri()))));
+
         synchronized (lock) {
             mapToMaterializationId.clear();
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
index 59c3c090bf3..b0d928f7751 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManager.java
@@ -89,7 +89,8 @@ public class TaskExecutorStateChangelogStoragesManager {
     public StateChangelogStorage<?> stateChangelogStorageForJob(
             @Nonnull JobID jobId,
             Configuration configuration,
-            TaskManagerJobMetricGroup metricGroup)
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
         synchronized (lock) {
             if (closed) {
@@ -103,7 +104,8 @@ public class TaskExecutorStateChangelogStoragesManager {
 
             if (stateChangelogStorage == null) {
                 StateChangelogStorage<?> loaded =
-                        StateChangelogStorageLoader.load(jobId, configuration, 
metricGroup);
+                        StateChangelogStorageLoader.load(
+                                jobId, configuration, metricGroup, 
localRecoveryConfig);
                 stateChangelogStorage = Optional.ofNullable(loaded);
                 changelogStoragesByJobId.put(jobId, stateChangelogStorage);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
new file mode 100644
index 00000000000..a2ffb66b84d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistry.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+/** This registry is responsible for deleting changelog's local handles which are not in use. */
+@Internal
+public interface LocalChangelogRegistry {
+    /** No-op registry: accepts all calls and never discards anything. */
+    LocalChangelogRegistry NO_OP =
+            new LocalChangelogRegistry() {
+                @Override
+                public void register(StreamStateHandle handle, long checkpointID) {}
+
+                @Override
+                public void discardUpToCheckpoint(long upTo) {}
+
+                @Override
+                public void prune(long checkpointID) {}
+            };
+
+    /**
+     * Called upon ChangelogKeyedStateBackend#notifyCheckpointComplete.
+     *
+     * @param handle handle to register.
+     * @param checkpointID latest used checkpointID.
+     */
+    void register(StreamStateHandle handle, long checkpointID);
+
+    /**
+     * Called upon ChangelogKeyedStateBackend#notifyCheckpointComplete and
+     * ChangelogKeyedStateBackend#notifyCheckpointSubsumed. Remote dstl handles are unregistered
+     * when {@link CompletedCheckpointStore#addCheckpointAndSubsumeOldestOne}, local dstl handles
+     * are unregistered when the checkpoint completes, because only one checkpoint is kept for local
+     * recovery.
+     *
+     * @param upTo lowest CheckpointID which is still valid.
+     */
+    void discardUpToCheckpoint(long upTo);
+
+    /**
+     * Called upon ChangelogKeyedStateBackend#notifyCheckpointAborted.
+     *
+     * @param checkpointID the aborted checkpoint's ID.
+     */
+    void prune(long checkpointID);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
new file mode 100644
index 00000000000..acb8ae318ca
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryImpl.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.LongPredicate;
+
+/**
+ * Default {@link LocalChangelogRegistry}: tracks, per physical handle, the latest checkpoint that
+ * uses it, and discards handles via the given executor once they are no longer needed.
+ */
+@Internal
+public class LocalChangelogRegistryImpl implements LocalChangelogRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalChangelogRegistryImpl.class);
+
+    /**
+     * All registered handles. (PhysicalStateHandleID, (handle, checkpointID)) represents a handle
+     * and the latest checkpoint that refers to this handle.
+     */
+    private final Map<PhysicalStateHandleID, Tuple2<StreamStateHandle, Long>>
+            handleToLastUsedCheckpointID = new ConcurrentHashMap<>();
+
+    /** Executor for async state deletion. */
+    private final Executor asyncDisposalExecutor;
+
+    public LocalChangelogRegistryImpl(Executor ioExecutor) {
+        this.asyncDisposalExecutor = ioExecutor;
+    }
+
+    @Override
+    public void register(StreamStateHandle handle, long checkpointID) {
+        handleToLastUsedCheckpointID.compute(
+                handle.getStreamStateHandleID(),
+                (k, v) -> {
+                    if (v == null) {
+                        return Tuple2.of(handle, checkpointID);
+                    } else {
+                        // the same handle ID must always refer to the same handle
+                        Preconditions.checkState(handle.equals(v.f0));
+                        return Tuple2.of(handle, Math.max(v.f1, checkpointID));
+                    }
+                });
+    }
+
+    @Override
+    public void discardUpToCheckpoint(long upTo) {
+        for (StreamStateHandle handle : unregisterIf(lastUsed -> lastUsed < upTo)) {
+            scheduleAsyncDelete(handle);
+        }
+    }
+
+    @Override
+    public void prune(long checkpointID) {
+        // Unregister the handles as well, so that a later discardUpToCheckpoint call does not
+        // discard them a second time and the registry does not retain already deleted state.
+        for (StreamStateHandle handle : unregisterIf(lastUsed -> lastUsed == checkpointID)) {
+            scheduleAsyncDelete(handle);
+        }
+    }
+
+    /**
+     * Unregisters every handle whose last-used checkpoint ID matches the given predicate.
+     *
+     * <p>Entries are removed via {@link Map#remove(Object, Object)}, so an entry concurrently
+     * updated by {@link #register} is kept instead of being dropped.
+     *
+     * @return the unregistered handles
+     */
+    private List<StreamStateHandle> unregisterIf(LongPredicate lastUsedCheckpointMatches) {
+        List<StreamStateHandle> unregistered = new ArrayList<>();
+        for (Map.Entry<PhysicalStateHandleID, Tuple2<StreamStateHandle, Long>> entry :
+                handleToLastUsedCheckpointID.entrySet()) {
+            Tuple2<StreamStateHandle, Long> value = entry.getValue();
+            if (lastUsedCheckpointMatches.test(value.f1)
+                    && handleToLastUsedCheckpointID.remove(entry.getKey(), value)) {
+                unregistered.add(value.f0);
+            }
+        }
+        return unregistered;
+    }
+
+    private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
+        if (streamStateHandle != null) {
+            LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
+            Runnable discardRunner =
+                    () -> {
+                        try {
+                            streamStateHandle.discardState();
+                        } catch (Exception exception) {
+                            LOG.warn(
+                                    "A problem occurred during asynchronous disposal of a stream handle {}.",
+                                    streamStateHandle,
+                                    exception);
+                        }
+                    };
+            try {
+                asyncDisposalExecutor.execute(discardRunner);
+            } catch (RejectedExecutionException ex) {
+                // the executor did not accept the task: discard synchronously instead
+                discardRunner.run();
+            }
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
index be86a416e6c..37d9091ea30 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 
 import java.io.IOException;
 
@@ -36,7 +37,10 @@ public interface StateChangelogStorageFactory {
 
     /** Create the storage based on a configuration. */
     StateChangelogStorage<?> createStorage(
-            JobID jobID, Configuration configuration, 
TaskManagerJobMetricGroup metricGroup)
+            JobID jobID,
+            Configuration configuration,
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException;
 
     /** Create the storage for recovery. */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
index 622c09f9956..ee7d39f8a34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogStorageLoader.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
@@ -87,7 +88,10 @@ public class StateChangelogStorageLoader {
 
     @Nullable
     public static StateChangelogStorage<?> load(
-            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup)
+            JobID jobID,
+            Configuration configuration,
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig)
             throws IOException {
         final String identifier =
                 configuration
@@ -100,7 +104,7 @@ public class StateChangelogStorageLoader {
             return null;
         } else {
             LOG.info("Creating a changelog storage with name '{}'.", identifier);
-            return factory.createStorage(jobID, configuration, metricGroup);
+            return factory.createStorage(jobID, configuration, metricGroup, localRecoveryConfig);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
index 0ee4f615a31..756528139d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/StateChangelogWriter.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.state.changelog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.SnapshotResult;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -41,13 +42,13 @@ public interface StateChangelogWriter<Handle extends ChangelogStateHandle> exten
     /**
      * Durably persist previously {@link #append(int, byte[]) appended} data starting from the
      * provided {@link SequenceNumber} and up to the latest change added. After this call, one of
-     * {@link #confirm(SequenceNumber, SequenceNumber) confirm}, {@link #reset(SequenceNumber,
-     * SequenceNumber) reset}, or {@link #truncate(SequenceNumber) truncate} eventually must be
-     * called for the corresponding change set. with reset/truncate/confirm methods?
+     * {@link #confirm(SequenceNumber, SequenceNumber, long) confirm}, {@link #reset(SequenceNumber,
+     * SequenceNumber, long) reset}, or {@link #truncate(SequenceNumber) truncate} eventually must
+     * be called for the corresponding change set.
      *
      * @param from inclusive
      */
-    CompletableFuture<Handle> persist(SequenceNumber from) throws IOException;
+    CompletableFuture<SnapshotResult<Handle>> persist(SequenceNumber from) throws IOException;
 
     /**
      * Truncate this state changelog to free up the resources and collect any garbage. That means:
@@ -71,14 +72,24 @@ public interface StateChangelogWriter<Handle extends ChangelogStateHandle> exten
      *
      * @param from inclusive
      * @param to exclusive
+     * @param checkpointId id of the checkpoint being confirmed
      */
-    void confirm(SequenceNumber from, SequenceNumber to);
+    void confirm(SequenceNumber from, SequenceNumber to, long checkpointId);
+
+    /**
+     * Mark the state changes of the given checkpoint as subsumed.
+     *
+     * @param checkpointId id of the subsumed checkpoint
+     */
+    void subsume(long checkpointId);
 
     /**
      * Reset the state the given state changes. Called upon abortion so that if requested later then
      * these changes will be re-uploaded.
+     *
+     * @param checkpointId id of the aborted checkpoint
      */
-    void reset(SequenceNumber from, SequenceNumber to);
+    void reset(SequenceNumber from, SequenceNumber to, long checkpointId);
 
     /**
      * Truncate the tail of log and close it. No new appends will be possible. Any appended but not
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
index 786a6a7be29..39ca25fde79 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogStorageFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.changelog.inmemory;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
@@ -36,7 +37,10 @@ public class InMemoryStateChangelogStorageFactory implements StateChangelogStora
 
     @Override
     public StateChangelogStorage<?> createStorage(
-            JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+            JobID jobID,
+            Configuration configuration,
+            TaskManagerJobMetricGroup metricGroup,
+            LocalRecoveryConfig localRecoveryConfig) {
         return new InMemoryStateChangelogStorage();
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
index 3bd0a969d83..eb07ba47716 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.changelog.inmemory;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -75,11 +76,14 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
     }
 
     @Override
-    public CompletableFuture<InMemoryChangelogStateHandle> persist(SequenceNumber from) {
+    public CompletableFuture<SnapshotResult<InMemoryChangelogStateHandle>> persist(
+            SequenceNumber from) {
         LOG.debug("Persist after {}", from);
         Preconditions.checkNotNull(from);
         return completedFuture(
-                new InMemoryChangelogStateHandle(collectChanges(from), from, sqn, keyGroupRange));
+                SnapshotResult.of(
+                        new InMemoryChangelogStateHandle(
+                                collectChanges(from), from, sqn, keyGroupRange)));
     }
 
     private List<StateChange> collectChanges(SequenceNumber after) {
@@ -113,8 +117,11 @@ class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChang
     }
 
     @Override
-    public void confirm(SequenceNumber from, SequenceNumber to) {}
+    public void confirm(SequenceNumber from, SequenceNumber to, long checkpointId) {}
 
     @Override
-    public void reset(SequenceNumber from, SequenceNumber to) {}
+    public void subsume(long checkpointId) {}
+
+    @Override
+    public void reset(SequenceNumber from, SequenceNumber to, long checkpointId) {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 26b274f5a5c..f4481ea9d80 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -700,7 +700,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
             try {
                 changelogStorage =
                         changelogStoragesManager.stateChangelogStorageForJob(
-                                jobId, taskManagerConfiguration.getConfiguration(), jobGroup);
+                                jobId,
+                                taskManagerConfiguration.getConfiguration(),
+                                jobGroup,
+                                localStateStore.getLocalRecoveryConfig());
             } catch (IOException e) {
                 throw new TaskSubmissionException(e);
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
index 8493064f806..abf058986a1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
@@ -131,6 +131,8 @@ public class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTes
         assertTrue(stateSnapshot2.isDiscarded());
         // the materialized part of checkpoint 2 retain, because it still used by checkpoint 3
         assertTrue(checkMaterializedDirExists(2));
+        // checkpoint 1 is retained
+        assertEquals(stateSnapshot1, taskLocalStateStore.retrieveLocalState(1));
         assertTrue(checkMaterializedDirExists(1));
         assertEquals(stateSnapshot3, taskLocalStateStore.retrieveLocalState(3));
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
index b0efaa9c427..1ad0c7b8231 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.java
@@ -53,16 +53,25 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
         JobID jobId1 = new JobID(1L, 1L);
         StateChangelogStorage<?> storage1 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         StateChangelogStorage<?> storage2 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertEquals(storage1, storage2);
 
         JobID jobId2 = new JobID(1L, 2L);
         StateChangelogStorage<?> storage3 =
                 manager.stateChangelogStorageForJob(
-                        jobId2, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId2,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNotEquals(storage1, storage3);
         manager.shutdown();
     }
@@ -79,7 +88,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
         JobID jobId1 = new JobID(1L, 1L);
         StateChangelogStorage<?> storage1 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertTrue(storage1 instanceof TestStateChangelogStorage);
         Assert.assertFalse(((TestStateChangelogStorage) storage1).closed);
         manager.releaseResourcesForJob(jobId1);
@@ -87,7 +99,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
 
         StateChangelogStorage<?> storage2 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNotEquals(storage1, storage2);
 
         manager.shutdown();
@@ -104,7 +119,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
         JobID jobId1 = new JobID(1L, 1L);
         StateChangelogStorage<?> storage1 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNull(storage1);
 
         // change configuration, assert the result not change.
@@ -113,19 +131,28 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
                 StateChangelogOptions.STATE_CHANGE_LOG_STORAGE.defaultValue());
         StateChangelogStorage<?> storage2 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNull(storage2);
 
         JobID jobId2 = new JobID(1L, 2L);
         StateChangelogStorage<?> storage3 =
                 manager.stateChangelogStorageForJob(
-                        jobId2, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId2,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNotNull(storage3);
 
         configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "invalid");
         StateChangelogStorage<?> storage4 =
                 manager.stateChangelogStorageForJob(
-                        jobId2, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId2,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertNotNull(storage4);
         Assert.assertEquals(storage3, storage4);
 
@@ -144,14 +171,20 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
         JobID jobId1 = new JobID(1L, 1L);
         StateChangelogStorage<?> storage1 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId1,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertTrue(storage1 instanceof TestStateChangelogStorage);
         Assert.assertFalse(((TestStateChangelogStorage) storage1).closed);
 
         JobID jobId2 = new JobID(1L, 2L);
         StateChangelogStorage<?> storage2 =
                 manager.stateChangelogStorageForJob(
-                        jobId1, configuration, createUnregisteredTaskManagerJobMetricGroup());
+                        jobId2,
+                        configuration,
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         Assert.assertTrue(storage2 instanceof TestStateChangelogStorage);
         Assert.assertFalse(((TestStateChangelogStorage) storage2).closed);
 
@@ -207,7 +240,10 @@ public class TaskExecutorStateChangelogStoragesManagerTest {
 
         @Override
         public StateChangelogStorage<?> createStorage(
-                JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+                JobID jobID,
+                Configuration configuration,
+                TaskManagerJobMetricGroup metricGroup,
+                LocalRecoveryConfig localRecoveryConfig) {
             return new TestStateChangelogStorage();
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
new file mode 100644
index 00000000000..d28c33e603a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/LocalChangelogRegistryTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.runtime.state.TestingStreamStateHandle;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** {@link LocalChangelogRegistryImpl}'s test. */
+public class LocalChangelogRegistryTest extends TestLogger {
+
+    @Test
+    public void testRegistryNormal() {
+        LocalChangelogRegistry localStateRegistry =
+                new LocalChangelogRegistryImpl(Executors.directExecutor());
+        TestingStreamStateHandle handle1 = new TestingStreamStateHandle();
+        TestingStreamStateHandle handle2 = new TestingStreamStateHandle();
+        // checkpoint 1: handle1, handle2
+        localStateRegistry.register(handle1, 1);
+        localStateRegistry.register(handle2, 1);
+
+        // checkpoint 2: handle2, handle3
+        TestingStreamStateHandle handle3 = new TestingStreamStateHandle();
+        localStateRegistry.register(handle2, 2);
+        localStateRegistry.register(handle3, 2);
+
+        // handle1 is only used by checkpoint 1, the others are still used by checkpoint 2
+        localStateRegistry.discardUpToCheckpoint(2);
+        assertTrue(handle1.isDisposed());
+        assertFalse(handle2.isDisposed());
+        assertFalse(handle3.isDisposed());
+
+        localStateRegistry.discardUpToCheckpoint(3);
+        assertTrue(handle2.isDisposed());
+        assertTrue(handle3.isDisposed());
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
index 6c091eeb546..94782f9dda8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageLoaderTest.java
@@ -24,6 +24,8 @@ import org.apache.flink.configuration.StateChangelogOptions;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.LocalRecoveryConfig;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -55,7 +57,8 @@ public class StateChangelogStorageLoaderTest {
                 StateChangelogStorageLoader.load(
                         JobID.generate(),
                         new Configuration(),
-                        createUnregisteredTaskManagerJobMetricGroup()));
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled()));
     }
 
     @Test
@@ -66,7 +69,8 @@ public class StateChangelogStorageLoaderTest {
                         JobID.generate(),
                         new Configuration()
                                 .set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "not_exist"),
-                        createUnregisteredTaskManagerJobMetricGroup()));
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled()));
     }
 
     @Test
@@ -79,7 +83,8 @@ public class StateChangelogStorageLoaderTest {
                 StateChangelogStorageLoader.load(
                         JobID.generate(),
                         new Configuration(),
-                        createUnregisteredTaskManagerJobMetricGroup());
+                        createUnregisteredTaskManagerJobMetricGroup(),
+                        TestLocalRecoveryConfig.disabled());
         assertTrue(loaded instanceof TestStateChangelogStorage);
     }
 
@@ -120,7 +125,10 @@ public class StateChangelogStorageLoaderTest {
 
         @Override
         public StateChangelogStorage<?> createStorage(
-                JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup) {
+                JobID jobID,
+                Configuration configuration,
+                TaskManagerJobMetricGroup metricGroup,
+                LocalRecoveryConfig localRecoveryConfig) {
             return new TestStateChangelogStorage();
         }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
index a130ba79723..f0c92ab55ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/changelog/inmemory/StateChangelogStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.changelog.inmemory;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
 import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChange;
@@ -97,10 +98,10 @@ public class StateChangelogStorageTest<T extends ChangelogStateHandle> {
                 writer.nextSequenceNumber();
             }
 
-            T handle = writer.persist(prev).get();
+            SnapshotResult<T> res = writer.persist(prev).get();
+            T jmHandle = res.getJobManagerOwnedSnapshot();
             StateChangelogHandleReader<T> reader = client.createReader();
-
-            assertByteMapsEqual(appendsByKeyGroup, extract(handle, reader));
+            assertByteMapsEqual(appendsByKeyGroup, extract(jmHandle, reader));
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
index 0739c96e271..60d282da8f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorExecutionDeploymentReconciliationTest.java
@@ -52,12 +52,15 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.Reference;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.junit.After;
@@ -65,7 +68,9 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Collections;
@@ -108,6 +113,8 @@ public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge
     public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource =
             new TestingFatalErrorHandlerResource();
 
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
     @Before
     public void setup() {
         haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
@@ -136,13 +143,18 @@ public class TaskExecutorExecutionDeploymentReconciliationTest extends TestLogge
         final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();
         final TestingResourceManagerGateway testingResourceManagerGateway =
                 setupResourceManagerGateway(initialSlotReportFuture);
-
+        final TaskExecutorLocalStateStoresManager localStateStoresManager =
+                new TaskExecutorLocalStateStoresManager(
+                        false,
+                        Reference.owned(new File[] {tmp.newFolder()}),
+                        Executors.directExecutor());
         final TaskManagerServices taskManagerServices =
                 new TaskManagerServicesBuilder()
                         .setTaskSlotTable(
                                 TaskSlotUtils.createTaskSlotTable(
                                         1, timeout, EXECUTOR_RESOURCE.getExecutor()))
                         .setShuffleEnvironment(new NettyShuffleEnvironmentBuilder().build())
+                        .setTaskStateManager(localStateStoresManager)
                         .build();
 
         final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
index f91f9e72b37..35f099b4215 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSlotLifetimeTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGate
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.rpc.TestingRpcServiceResource;
+import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -51,14 +52,18 @@ import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
 import org.apache.flink.testutils.TestFileUtils;
 import org.apache.flink.testutils.TestingUtils;
 import org.apache.flink.testutils.executor.TestExecutorResource;
+import org.apache.flink.util.Reference;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.function.FunctionUtils;
 
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -84,6 +89,8 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger {
     public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource =
             new TestingFatalErrorHandlerResource();
 
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
     @Before
     public void setup() {
         UserClassLoaderExtractingInvokable.clearQueue();
@@ -221,6 +228,7 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger {
                                 TaskSlotUtils.createTaskSlotTable(
                                         1, EXECUTOR_RESOURCE.getExecutor()))
                         .setUnresolvedTaskManagerLocation(unresolvedTaskManagerLocation)
+                        .setTaskStateManager(createTaskExecutorLocalStateStoresManager())
                         .build(),
                 ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                 new TestingHeartbeatServices(),
@@ -231,6 +239,12 @@ public class TaskExecutorSlotLifetimeTest extends TestLogger {
                 new TestingTaskExecutorPartitionTracker());
     }
 
+    private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager()
+            throws IOException {
+        return new TaskExecutorLocalStateStoresManager(
+                false, Reference.owned(new File[] {tmp.newFolder()}), Executors.directExecutor());
+    }
+
     public static final class UserClassLoaderExtractingInvokable extends AbstractInvokable {
 
         private static BlockingQueue<ClassLoader> userCodeClassLoaders =
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index eb5792dc23a..9c7fe9d57bb 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -417,22 +417,34 @@ public class ChangelogKeyedStateBackend<K>
 
     private SnapshotResult<ChangelogStateBackendHandle> buildSnapshotResult(
             long checkpointId,
-            ChangelogStateHandle delta,
+            SnapshotResult<? extends ChangelogStateHandle> delta,
             ChangelogSnapshotState changelogStateBackendStateCopy) {
 
         // collections don't change once started and handles are immutable
         List<ChangelogStateHandle> prevDeltaCopy =
                 new 
ArrayList<>(changelogStateBackendStateCopy.getRestoredNonMaterialized());
         long persistedSizeOfThisCheckpoint = 0L;
-        if (delta != null && delta.getStateSize() > 0) {
-            prevDeltaCopy.add(delta);
-            persistedSizeOfThisCheckpoint += delta.getCheckpointedSize();
+        if (delta != null
+                && delta.getJobManagerOwnedSnapshot() != null
+                && delta.getJobManagerOwnedSnapshot().getStateSize() > 0) {
+            prevDeltaCopy.add(delta.getJobManagerOwnedSnapshot());
+            persistedSizeOfThisCheckpoint +=
+                    delta.getJobManagerOwnedSnapshot().getCheckpointedSize();
         }
 
         if (prevDeltaCopy.isEmpty()
                 && 
changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
             return SnapshotResult.empty();
-        } else if 
(!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty()) {
+        } else if 
(!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty()
+                || delta.getTaskLocalSnapshot() != null) {
+            List<ChangelogStateHandle> localDeltaCopy =
+                    new ArrayList<>(
+                            
changelogStateBackendStateCopy.getLocalRestoredNonMaterialized());
+            if (delta != null
+                    && delta.getTaskLocalSnapshot() != null
+                    && delta.getTaskLocalSnapshot().getStateSize() > 0) {
+                localDeltaCopy.add(delta.getTaskLocalSnapshot());
+            }
             ChangelogStateBackendHandleImpl jmHandle =
                     new ChangelogStateBackendHandleImpl(
                             
changelogStateBackendStateCopy.getMaterializedSnapshot(),
@@ -445,7 +457,7 @@ public class ChangelogKeyedStateBackend<K>
                     jmHandle,
                     new ChangelogStateBackendLocalHandle(
                             
changelogStateBackendStateCopy.getLocalMaterializedSnapshot(),
-                            prevDeltaCopy,
+                            localDeltaCopy,
                             jmHandle));
         } else {
             return SnapshotResult.of(
@@ -540,7 +552,7 @@ public class ChangelogKeyedStateBackend<K>
             // newer upload instead of the previous one. This newer upload 
could then be re-used
             // while in fact JM has discarded its results.
             // This might change if the log ownership changes (the method 
won't likely be needed).
-            stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo);
+            stateChangelogWriter.confirm(lastUploadedFrom, lastUploadedTo, 
checkpointId);
         }
         Long materializationID = 
materializationIdByCheckpointId.remove(checkpointId);
         if (materializationID != null) {
@@ -559,7 +571,7 @@ public class ChangelogKeyedStateBackend<K>
             // change if it is not relevant anymore. Otherwise, it could 
DISCARD a newer upload
             // instead of the previous one. Rely on truncation for the cleanup 
in this case.
             // This might change if the log ownership changes (the method 
won't likely be needed).
-            stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo);
+            stateChangelogWriter.reset(lastUploadedFrom, lastUploadedTo, 
checkpointId);
         }
         // TODO: Consider notifying nested state backend about checkpoint 
abortion (FLINK-25850)
     }
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
index 6a6b71868b5..12b5d3388f2 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogTruncateHelper.java
@@ -70,6 +70,7 @@ class ChangelogTruncateHelper {
             subsumedUpTo = sqn;
             checkpointedUpTo.headMap(checkpointId, true).clear();
             truncate();
+            stateChangelogWriter.subsume(checkpointId);
         }
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
index 6a3a41bccd1..c26a736ddb9 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateBackendTestUtils.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryImpl;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
@@ -190,7 +191,8 @@ public class ChangelogStateBackendTestUtils {
                                 new ChangelogStorageMetricGroup(
                                         UnregisteredMetricGroups
                                                 
.createUnregisteredTaskManagerJobMetricGroup()),
-                                TaskChangelogRegistry.NO_OP))
+                                TaskChangelogRegistry.NO_OP,
+                                TestLocalRecoveryConfig.disabled()))
                 .build();
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
index 26855819f1e..1e69afe2f9b 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/ChangelogStateDiscardTest.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.FsStateChangelogStorage;
 import org.apache.flink.changelog.fs.StateChangeSet;
 import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
@@ -39,6 +40,7 @@ import 
org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
 import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
 import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
@@ -128,7 +130,8 @@ public class ChangelogStateDiscardTest {
                 
TaskChangelogRegistry.defaultChangelogRegistry(directExecutor());
         final TestingUploadScheduler scheduler = new 
TestingUploadScheduler(registry);
         singleBackendTest(
-                new FsStateChangelogStorage(scheduler, 0L, registry),
+                new FsStateChangelogStorage(
+                        scheduler, 0L, registry, 
TestLocalRecoveryConfig.disabled()),
                 (backend, writer) -> {
                     changeAndLogRandomState(backend, scheduler.uploads::size);
                     truncate(writer, backend);
@@ -185,7 +188,8 @@ public class ChangelogStateDiscardTest {
                 
TaskChangelogRegistry.defaultChangelogRegistry(directExecutor());
         final TestingUploadScheduler scheduler = new 
TestingUploadScheduler(registry);
         final StateChangelogStorage<?> storage =
-                new FsStateChangelogStorage(scheduler, 0, registry);
+                new FsStateChangelogStorage(
+                        scheduler, 0, registry, 
TestLocalRecoveryConfig.disabled());
         final StateChangelogWriter<?>
                 w1 = storage.createWriter("test-operator-1", kgRange, new 
SyncMailboxExecutor()),
                 w2 = storage.createWriter("test-operator-2", kgRange, new 
SyncMailboxExecutor());
@@ -222,7 +226,10 @@ public class ChangelogStateDiscardTest {
         long preEmptivePersistThresholdInBytes = 0L; // flush ASAP
         singleBackendTest(
                 new FsStateChangelogStorage(
-                        directScheduler(uploader), 
preEmptivePersistThresholdInBytes, registry),
+                        directScheduler(uploader),
+                        preEmptivePersistThresholdInBytes,
+                        registry,
+                        TestLocalRecoveryConfig.disabled()),
                 (backend, writer) -> testCase.accept(backend, writer, 
uploader));
     }
 
@@ -367,14 +374,15 @@ public class ChangelogStateDiscardTest {
             results.add(handle);
             // todo: avoid making StateChangeSet and its internals public?
             // todo: make the contract more explicit or extract common code
-            Map<UploadTask, Map<StateChangeSet, Long>> taskOffsets =
+            Map<UploadTask, Map<StateChangeSet, Tuple2<Long, Long>>> 
taskOffsets =
                     tasks.stream().collect(toMap(identity(), 
this::mapOffsets));
             tasks.forEach(task -> startTracking(registry, handle, task));
             return new UploadTasksResult(taskOffsets, handle);
         }
 
-        private Map<StateChangeSet, Long> mapOffsets(UploadTask task) {
-            return 
task.getChangeSets().stream().collect(Collectors.toMap(identity(), ign -> 0L));
+        private Map<StateChangeSet, Tuple2<Long, Long>> mapOffsets(UploadTask 
task) {
+            return task.getChangeSets().stream()
+                    .collect(Collectors.toMap(identity(), ign -> Tuple2.of(0L, 
0L)));
         }
 
         @Override
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
index 70604262496..9813409475f 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/test/java/org/apache/flink/state/changelog/StateChangeLoggerTestBase.java
@@ -128,10 +128,13 @@ abstract class StateChangeLoggerTestBase<Namespace> {
         public void truncate(SequenceNumber to) {}
 
         @Override
-        public void confirm(SequenceNumber from, SequenceNumber to) {}
+        public void confirm(SequenceNumber from, SequenceNumber to, long 
checkpointId) {}
 
         @Override
-        public void reset(SequenceNumber from, SequenceNumber to) {}
+        public void subsume(long checkpointId) {}
+
+        @Override
+        public void reset(SequenceNumber from, SequenceNumber to, long 
checkpointId) {}
 
         @Override
         public void truncateAndClose(SequenceNumber from) {}
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
index 1f770de6066..3b0693cd3ea 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
@@ -150,11 +150,9 @@ public class ChangelogLocalRecoveryITCase extends 
TestLogger {
                 () ->
                         miniCluster
                                 .getExecutionGraph(firstJobGraph.getJobID())
-                                .get(10000, TimeUnit.SECONDS),
+                                .get(500, TimeUnit.SECONDS),
                 false);
-
-        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), false);
-        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID());
     }
 
     private StreamExecutionEnvironment getEnv(

Reply via email to