This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aef75be34d99c737f5c565703a971027ac44f855
Author: fredia <[email protected]>
AuthorDate: Tue May 24 15:23:38 2022 +0800

    [FLINK-27692][changelog] Support local recovery for materialized part
---
 .../flink/runtime/minicluster/MiniCluster.java     |   3 +-
 .../state/ChangelogTaskLocalStateStore.java        | 209 ++++++++++++++++++++
 .../state/TaskExecutorLocalStateStoresManager.java |  54 ++++--
 .../runtime/state/TaskLocalStateStoreImpl.java     | 146 +++++++-------
 .../ChangelogStateBackendLocalHandle.java          | 116 +++++++++++
 .../flink/runtime/taskexecutor/TaskExecutor.java   |   4 +-
 .../state/ChangelogTaskLocalStateStoreTest.java    | 214 +++++++++++++++++++++
 .../TaskExecutorLocalStateStoresManagerTest.java   |  30 ++-
 .../runtime/state/TaskLocalStateStoreImplTest.java |  18 +-
 .../changelog/ChangelogKeyedStateBackend.java      |  75 +++++---
 .../ChangelogLocalRecoveryITCase.java              | 189 ++++++++++++++++++
 .../ChangelogPeriodicMaterializationTestBase.java  |   2 +-
 12 files changed, 920 insertions(+), 140 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index e07980aa5c0..dc6d09fe4f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -137,6 +137,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.configuration.ClusterOptions.PROCESS_WORKING_DIR_BASE;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -321,7 +322,7 @@ public class MiniCluster implements AutoCloseableAsync {
                         WorkingDirectory.create(
                                 
ClusterEntrypointUtils.generateWorkingDirectoryFile(
                                         configuration,
-                                        Optional.empty(),
+                                        Optional.of(PROCESS_WORKING_DIR_BASE),
                                         "minicluster_" + 
ResourceID.generate()));
 
                 initializeIOFormatClasses(configuration);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
new file mode 100644
index 00000000000..a814589d45f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.LongPredicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Changelog's implementation of a {@link TaskLocalStateStore}. */
+public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ChangelogTaskLocalStateStore.class);
+
+    /**
+     * Prefix of the per-checkpoint directory returned by {@link #getCheckpointDirectory(long)}.
+     * Materialization directories instead use the layout of {@link TaskLocalStateStoreImpl}.
+     */
+    private static final String CHANGE_LOG_CHECKPOINT_PREFIX = "changelog_chk_";
+
+    /**
+     * Maps each stored checkpoint id to the id of the materialization it refers to. For example,
+     * the entry (cp3, materializationId2) means that checkpoint 3 refers to materialization 2 (m2).
+     * Several checkpoints may refer to the same materialization; a materialization's local
+     * directory is only deleted once no remaining entry refers to it. Guarded by {@code lock}.
+     */
+    private final Map<Long, Long> mapToMaterializationId;
+
+    /**
+     * Largest checkpoint id stored so far, used to ignore checkpoints that arrive out of order.
+     * Read and written only while holding {@code lock}.
+     */
+    private long lastCheckpointId = -1L;
+
+    /**
+     * Creates a changelog-aware local state store for one subtask. All arguments are passed
+     * through unchanged to {@link TaskLocalStateStoreImpl}.
+     */
+    public ChangelogTaskLocalStateStore(
+            @Nonnull JobID jobID,
+            @Nonnull AllocationID allocationID,
+            @Nonnull JobVertexID jobVertexID,
+            @Nonnegative int subtaskIndex,
+            @Nonnull LocalRecoveryConfig localRecoveryConfig,
+            @Nonnull Executor discardExecutor) {
+        super(jobID, allocationID, jobVertexID, subtaskIndex, localRecoveryConfig, discardExecutor);
+        this.mapToMaterializationId = new HashMap<>();
+    }
+
+    /**
+     * Records which materialization the given checkpoint refers to. It uses the {@link
+     * ChangelogStateBackendHandle}s found among the managed keyed state of {@code localState}.
+     * Must be called while holding {@code lock}.
+     *
+     * @param checkpointId id of the checkpoint being stored
+     * @param localState local snapshot of the checkpoint; {@code null} is treated like {@code
+     *     NULL_DUMMY}
+     * @throws IllegalStateException if handles of one checkpoint refer to different
+     *     materialization ids
+     */
+    private void updateReference(long checkpointId, @Nullable TaskStateSnapshot localState) {
+        TaskStateSnapshot snapshot = localState == null ? NULL_DUMMY : localState;
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                snapshot.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = changelogStateBackendHandle.getMaterializationID();
+                    if (mapToMaterializationId.containsKey(checkpointId)) {
+                        checkState(
+                                materializationID == mapToMaterializationId.get(checkpointId),
+                                "one checkpoint contains at most one materializationID");
+                    } else {
+                        mapToMaterializationId.put(checkpointId, materializationID);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Stores the local state of a checkpoint and records the materialization it refers to.
+     * Checkpoints with an id smaller than the last stored one are logged and ignored.
+     */
+    @Override
+    public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot localState) {
+        synchronized (lock) {
+            // Check and update under the lock, so two concurrent stores cannot both pass the
+            // ordering check.
+            if (checkpointId < lastCheckpointId) {
+                LOG.info(
+                        "Current checkpoint {} is out of order, smaller than last CheckpointId {}.",
+                        checkpointId,
+                        lastCheckpointId);
+                return;
+            }
+            lastCheckpointId = checkpointId;
+            // A disposed store discards late state in super.storeLocalState(), and dispose()
+            // has already cleared the mapping, so do not track a reference for it.
+            if (!disposed) {
+                updateReference(checkpointId, localState);
+            }
+        }
+        super.storeLocalState(checkpointId, localState);
+    }
+
+    /**
+     * Returns the directory for the given checkpoint. It is located under the subtask base
+     * directory and named with {@link #CHANGE_LOG_CHECKPOINT_PREFIX}.
+     */
+    @Override
+    protected File getCheckpointDirectory(long checkpointId) {
+        return new File(
+                getLocalRecoveryDirectoryProvider().subtaskBaseDirectory(checkpointId),
+                CHANGE_LOG_CHECKPOINT_PREFIX + checkpointId);
+    }
+
+    /**
+     * Removes every checkpoint accepted by {@code pruningChecker} from the materialization
+     * mapping. It then asynchronously deletes the local directories of the materializations that
+     * no remaining checkpoint refers to.
+     */
+    private void deleteMaterialization(LongPredicate pruningChecker) {
+        Set<Long> materializationToRemove;
+        synchronized (lock) {
+            Set<Long> checkpoints =
+                    mapToMaterializationId.keySet().stream()
+                            .filter(pruningChecker::test)
+                            .collect(Collectors.toSet());
+            materializationToRemove =
+                    checkpoints.stream()
+                            .map(mapToMaterializationId::remove)
+                            .collect(Collectors.toSet());
+            // Keep materializations that are still referenced by a retained checkpoint.
+            materializationToRemove.removeAll(mapToMaterializationId.values());
+        }
+
+        if (materializationToRemove.isEmpty()) {
+            // Nothing to delete; avoid scheduling an empty task on the discard executor.
+            return;
+        }
+
+        // Materialization directories use the base class' per-checkpoint directory layout,
+        // keyed by the materialization id.
+        discardExecutor.execute(
+                () ->
+                        syncDiscardDirectoryForCollection(
+                                materializationToRemove.stream()
+                                        .map(super::getCheckpointDirectory)
+                                        .collect(Collectors.toList())));
+    }
+
+    /**
+     * Deletes each existing directory in {@code toDiscard}. Failures are logged and do not stop
+     * the remaining deletions.
+     */
+    private void syncDiscardDirectoryForCollection(Collection<File> toDiscard) {
+        for (File directory : toDiscard) {
+            if (directory.exists()) {
+                try {
+                    // TODO: This is guaranteed by the wrapped backend only using this folder for
+                    // its local state, the materialized handle should be discarded here too.
+                    deleteDirectory(directory);
+                } catch (IOException ex) {
+                    LOG.warn(
+                            "Exception while deleting local state directory of {} in subtask ({} - {} - {}).",
+                            directory,
+                            jobID,
+                            jobVertexID,
+                            subtaskIndex,
+                            ex);
+                }
+            }
+        }
+    }
+
+    /**
+     * Prunes changelog checkpoints and then the materializations that are no longer referenced.
+     * The base class is always invoked with {@code breakOnceCheckerFalse = false}, so every
+     * stored checkpoint is tested against {@code pruningChecker}; the caller's flag is ignored.
+     */
+    @Override
+    public void pruneCheckpoints(LongPredicate pruningChecker, boolean breakOnceCheckerFalse) {
+        // Scenarios:
+        //   c1,m1
+        //   confirm c1, do nothing.
+        //   c2,m1
+        //   confirm c2, delete c1, don't delete m1
+        //   c3,m2
+        //   confirm c3, delete c2, delete m1
+
+        // delete changelog-chk
+        super.pruneCheckpoints(pruningChecker, false);
+        deleteMaterialization(pruningChecker);
+    }
+
+    /**
+     * Schedules deletion of all tracked materialization directories and clears the mapping. It
+     * then disposes the base store.
+     */
+    @Override
+    public CompletableFuture<Void> dispose() {
+        deleteMaterialization(id -> true);
+        synchronized (lock) {
+            mapToMaterializationId.clear();
+        }
+        return super.dispose();
+    }
+
+    @Override
+    public String toString() {
+        return "ChangelogTaskLocalStateStore{"
+                + "jobID="
+                + jobID
+                + ", jobVertexID="
+                + jobVertexID
+                + ", allocationID="
+                + allocationID.toHexString()
+                + ", subtaskIndex="
+                + subtaskIndex
+                + ", localRecoveryConfig="
+                + localRecoveryConfig
+                + ", storedCheckpointIDs="
+                + storedTaskStateByCheckpointID.keySet()
+                + ", mapToMaterializationId="
+                + mapToMaterializationId.entrySet()
+                + '}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
index 22c894614ee..b4dcd6af729 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
@@ -20,6 +20,9 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.configuration.StateChangelogOptionsInternal;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.FileUtils;
@@ -122,7 +125,9 @@ public class TaskExecutorLocalStateStoresManager {
             @Nonnull JobID jobId,
             @Nonnull AllocationID allocationID,
             @Nonnull JobVertexID jobVertexID,
-            @Nonnegative int subtaskIndex) {
+            @Nonnegative int subtaskIndex,
+            Configuration clusterConfiguration,
+            Configuration jobConfiguration) {
 
         synchronized (lock) {
             if (closed) {
@@ -164,22 +169,37 @@ public class TaskExecutorLocalStateStoresManager {
                 LocalRecoveryConfig localRecoveryConfig =
                         new LocalRecoveryConfig(directoryProvider);
 
-                taskLocalStateStore =
-                        localRecoveryConfig.isLocalRecoveryEnabled()
-                                ?
-
-                                // Real store implementation if local recovery 
is enabled
-                                new TaskLocalStateStoreImpl(
-                                        jobId,
-                                        allocationID,
-                                        jobVertexID,
-                                        subtaskIndex,
-                                        localRecoveryConfig,
-                                        discardExecutor)
-                                :
-
-                                // NOP implementation if local recovery is 
disabled
-                                new 
NoOpTaskLocalStateStoreImpl(localRecoveryConfig);
+                boolean changelogEnabled =
+                        jobConfiguration
+                                .getOptional(
+                                        StateChangelogOptionsInternal
+                                                
.ENABLE_CHANGE_LOG_FOR_APPLICATION)
+                                .orElse(
+                                        clusterConfiguration.getBoolean(
+                                                
StateChangelogOptions.ENABLE_STATE_CHANGE_LOG));
+
+                if (localRecoveryConfig.isLocalRecoveryEnabled() && 
changelogEnabled) {
+                    taskLocalStateStore =
+                            new ChangelogTaskLocalStateStore(
+                                    jobId,
+                                    allocationID,
+                                    jobVertexID,
+                                    subtaskIndex,
+                                    localRecoveryConfig,
+                                    discardExecutor);
+                } else if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+                    taskLocalStateStore =
+                            new TaskLocalStateStoreImpl(
+                                    jobId,
+                                    allocationID,
+                                    jobVertexID,
+                                    subtaskIndex,
+                                    localRecoveryConfig,
+                                    discardExecutor);
+                } else {
+                    // NOP implementation if local recovery is disabled
+                    taskLocalStateStore = new 
NoOpTaskLocalStateStoreImpl(localRecoveryConfig);
+                }
 
                 taskStateManagers.put(taskKey, taskLocalStateStore);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
index eef69184a6a..cf6a25545f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
@@ -42,7 +43,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -56,6 +56,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.function.LongPredicate;
+import java.util.stream.Collectors;
 
 /** Main implementation of a {@link TaskLocalStateStore}. */
 public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
@@ -69,34 +70,34 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
     public static final String TASK_STATE_SNAPSHOT_FILENAME = 
"_task_state_snapshot";
 
     /** JobID from the owning subtask. */
-    @Nonnull private final JobID jobID;
+    @Nonnull protected final JobID jobID;
 
     /** AllocationID of the owning slot. */
-    @Nonnull private final AllocationID allocationID;
+    @Nonnull protected final AllocationID allocationID;
 
     /** JobVertexID of the owning subtask. */
-    @Nonnull private final JobVertexID jobVertexID;
+    @Nonnull protected final JobVertexID jobVertexID;
 
     /** Subtask index of the owning subtask. */
-    @Nonnegative private final int subtaskIndex;
+    @Nonnegative protected final int subtaskIndex;
 
     /** The configured mode for local recovery. */
-    @Nonnull private final LocalRecoveryConfig localRecoveryConfig;
+    @Nonnull protected final LocalRecoveryConfig localRecoveryConfig;
 
     /** Executor that runs the discarding of released state objects. */
-    @Nonnull private final Executor discardExecutor;
+    @Nonnull protected final Executor discardExecutor;
 
     /** Lock for synchronisation on the storage map and the discarded status. 
*/
-    @Nonnull private final Object lock = new Object();
+    @Nonnull protected final Object lock = new Object();
 
     /** Status flag if this store was already discarded. */
     @GuardedBy("lock")
-    private boolean disposed;
+    protected boolean disposed;
 
     /** Maps checkpoint ids to local TaskStateSnapshots. */
     @Nonnull
     @GuardedBy("lock")
-    private final SortedMap<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
+    protected final SortedMap<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
 
     public TaskLocalStateStoreImpl(
             @Nonnull JobID jobID,
@@ -141,19 +142,19 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
                     subtaskIndex);
         }
 
-        Map.Entry<Long, TaskStateSnapshot> toDiscard = null;
+        Tuple2<Long, TaskStateSnapshot> toDiscard = null;
 
         synchronized (lock) {
             if (disposed) {
                 // we ignore late stores and simply discard the state.
-                toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, 
localState);
+                toDiscard = Tuple2.of(checkpointId, localState);
             } else {
                 TaskStateSnapshot previous =
                         storedTaskStateByCheckpointID.put(checkpointId, 
localState);
                 persistLocalStateMetadata(checkpointId, localState);
 
                 if (previous != null) {
-                    toDiscard = new AbstractMap.SimpleEntry<>(checkpointId, 
previous);
+                    toDiscard = Tuple2.of(checkpointId, previous);
                 }
             }
         }
@@ -170,6 +171,7 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
      * @param localState task state snapshot that will be persisted
      */
     private void persistLocalStateMetadata(long checkpointId, 
TaskStateSnapshot localState) {
+        createFolderOrFail(getCheckpointDirectory(checkpointId));
         final File taskStateSnapshotFile = 
getTaskStateSnapshotFile(checkpointId);
         try (ObjectOutputStream oos =
                 new ObjectOutputStream(new 
FileOutputStream(taskStateSnapshotFile))) {
@@ -186,20 +188,25 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
 
     @VisibleForTesting
     File getTaskStateSnapshotFile(long checkpointId) {
-        final File checkpointDirectory =
-                localRecoveryConfig
-                        .getLocalStateDirectoryProvider()
-                        .orElseThrow(
-                                () -> new IllegalStateException("Local 
recovery must be enabled."))
-                        .subtaskSpecificCheckpointDirectory(checkpointId);
+        return new File(getCheckpointDirectory(checkpointId), 
TASK_STATE_SNAPSHOT_FILENAME);
+    }
+
+    protected File getCheckpointDirectory(long checkpointId) {
+        return 
getLocalRecoveryDirectoryProvider().subtaskSpecificCheckpointDirectory(checkpointId);
+    }
 
+    private void createFolderOrFail(File checkpointDirectory) {
         if (!checkpointDirectory.exists() && !checkpointDirectory.mkdirs()) {
             throw new FlinkRuntimeException(
                     String.format(
                             "Could not create the checkpoint directory '%s'", 
checkpointDirectory));
         }
+    }
 
-        return new File(checkpointDirectory, TASK_STATE_SNAPSHOT_FILENAME);
+    protected LocalRecoveryDirectoryProvider 
getLocalRecoveryDirectoryProvider() {
+        return localRecoveryConfig
+                .getLocalStateDirectoryProvider()
+                .orElseThrow(() -> new IllegalStateException("Local recovery 
must be enabled."));
     }
 
     @Override
@@ -213,24 +220,15 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
         }
 
         if (snapshot != null) {
-            if (LOG.isTraceEnabled()) {
-                LOG.trace(
-                        "Found registered local state for checkpoint {} in 
subtask ({} - {} - {}) : {}",
-                        checkpointID,
-                        jobID,
-                        jobVertexID,
-                        subtaskIndex,
-                        snapshot);
-            } else if (LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "Found registered local state for checkpoint {} in 
subtask ({} - {} - {})",
-                        checkpointID,
-                        jobID,
-                        jobVertexID,
-                        subtaskIndex);
-            }
+            LOG.info(
+                    "Found registered local state for checkpoint {} in subtask 
({} - {} - {}) : {}",
+                    checkpointID,
+                    jobID,
+                    jobVertexID,
+                    subtaskIndex,
+                    snapshot);
         } else {
-            LOG.debug(
+            LOG.info(
                     "Did not find registered local state for checkpoint {} in 
subtask ({} - {} - {})",
                     checkpointID,
                     jobID,
@@ -321,11 +319,14 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
     @Override
     public CompletableFuture<Void> dispose() {
 
-        Collection<Map.Entry<Long, TaskStateSnapshot>> statesCopy;
+        Collection<Tuple2<Long, TaskStateSnapshot>> statesCopy;
 
         synchronized (lock) {
             disposed = true;
-            statesCopy = new 
ArrayList<>(storedTaskStateByCheckpointID.entrySet());
+            statesCopy =
+                    storedTaskStateByCheckpointID.entrySet().stream()
+                            .map(entry -> Tuple2.of(entry.getKey(), 
entry.getValue()))
+                            .collect(Collectors.toList());
             storedTaskStateByCheckpointID.clear();
         }
 
@@ -335,13 +336,11 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
                     syncDiscardLocalStateForCollection(statesCopy);
 
                     // delete the local state subdirectory that belong to this 
subtask.
-                    LocalRecoveryDirectoryProvider directoryProvider =
-                            localRecoveryConfig
-                                    .getLocalStateDirectoryProvider()
-                                    
.orElseThrow(LocalRecoveryConfig.localRecoveryNotEnabled());
-
-                    for (int i = 0; i < 
directoryProvider.allocationBaseDirsCount(); ++i) {
-                        File subtaskBaseDirectory = 
directoryProvider.selectSubtaskBaseDirectory(i);
+                    for (int i = 0;
+                            i < 
getLocalRecoveryDirectoryProvider().allocationBaseDirsCount();
+                            ++i) {
+                        File subtaskBaseDirectory =
+                                
getLocalRecoveryDirectoryProvider().selectSubtaskBaseDirectory(i);
                         try {
                             deleteDirectory(subtaskBaseDirectory);
                         } catch (IOException e) {
@@ -359,16 +358,16 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
     }
 
     private void asyncDiscardLocalStateForCollection(
-            Collection<Map.Entry<Long, TaskStateSnapshot>> toDiscard) {
+            Collection<Tuple2<Long, TaskStateSnapshot>> toDiscard) {
         if (!toDiscard.isEmpty()) {
             discardExecutor.execute(() -> 
syncDiscardLocalStateForCollection(toDiscard));
         }
     }
 
     private void syncDiscardLocalStateForCollection(
-            Collection<Map.Entry<Long, TaskStateSnapshot>> toDiscard) {
-        for (Map.Entry<Long, TaskStateSnapshot> entry : toDiscard) {
-            discardLocalStateForCheckpoint(entry.getKey(), 
Optional.of(entry.getValue()));
+            Collection<Tuple2<Long, TaskStateSnapshot>> toDiscard) {
+        for (Tuple2<Long, TaskStateSnapshot> entry : toDiscard) {
+            discardLocalStateForCheckpoint(entry.f0, Optional.of(entry.f1));
         }
     }
 
@@ -409,39 +408,31 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
                     }
                 });
 
-        Optional<LocalRecoveryDirectoryProvider> directoryProviderOptional =
-                localRecoveryConfig.getLocalStateDirectoryProvider();
+        File checkpointDir = getCheckpointDirectory(checkpointID);
 
-        if (directoryProviderOptional.isPresent()) {
-            File checkpointDir =
-                    directoryProviderOptional
-                            .get()
-                            .subtaskSpecificCheckpointDirectory(checkpointID);
+        LOG.debug(
+                "Deleting local state directory {} of checkpoint {} for 
subtask ({} - {} - {}).",
+                checkpointDir,
+                checkpointID,
+                jobID,
+                jobVertexID,
+                subtaskIndex);
 
-            LOG.debug(
-                    "Deleting local state directory {} of checkpoint {} for 
subtask ({} - {} - {}).",
-                    checkpointDir,
+        try {
+            deleteDirectory(checkpointDir);
+        } catch (IOException ex) {
+            LOG.warn(
+                    "Exception while deleting local state directory of 
checkpoint {} in subtask ({} - {} - {}).",
                     checkpointID,
                     jobID,
                     jobVertexID,
-                    subtaskIndex);
-
-            try {
-                deleteDirectory(checkpointDir);
-            } catch (IOException ex) {
-                LOG.warn(
-                        "Exception while deleting local state directory of 
checkpoint {} in subtask ({} - {} - {}).",
-                        checkpointID,
-                        jobID,
-                        jobVertexID,
-                        subtaskIndex,
-                        ex);
-            }
+                    subtaskIndex,
+                    ex);
         }
     }
 
     /** Helper method to delete a directory. */
-    private void deleteDirectory(File directory) throws IOException {
+    protected void deleteDirectory(File directory) throws IOException {
         Path path = new Path(directory.toURI());
         FileSystem fileSystem = path.getFileSystem();
         if (fileSystem.exists(path)) {
@@ -450,9 +441,8 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
     }
 
     /** Pruning the useless checkpoints, it should be called only when holding 
the {@link #lock}. */
-    private void pruneCheckpoints(LongPredicate pruningChecker, boolean 
breakOnceCheckerFalse) {
-
-        final List<Map.Entry<Long, TaskStateSnapshot>> toRemove = new 
ArrayList<>();
+    protected void pruneCheckpoints(LongPredicate pruningChecker, boolean 
breakOnceCheckerFalse) {
+        final List<Tuple2<Long, TaskStateSnapshot>> toRemove = new 
ArrayList<>();
 
         synchronized (lock) {
             Iterator<Map.Entry<Long, TaskStateSnapshot>> entryIterator =
@@ -464,7 +454,7 @@ public class TaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
                 long entryCheckpointId = snapshotEntry.getKey();
 
                 if (pruningChecker.test(entryCheckpointId)) {
-                    toRemove.add(snapshotEntry);
+                    toRemove.add(Tuple2.of(entryCheckpointId, 
snapshotEntry.getValue()));
                     entryIterator.remove();
                 } else if (breakOnceCheckerFalse) {
                     break;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java
new file mode 100644
index 00000000000..427b4427e93
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateHandleID;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * State handle for local copies of {@link ChangelogStateHandleStreamImpl}. 
Consists of a
+ * remoteHandle that maintains the mapping of local handle and remote handle, 
like
+ * sharedStateHandleIDs in {@link 
org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle}.
+ */
+public class ChangelogStateBackendLocalHandle implements 
ChangelogStateBackendHandle {
+    private static final long serialVersionUID = 1L;
+    private final List<KeyedStateHandle> localMaterialized;
+    private final List<ChangelogStateHandle> localNonMaterialized;
+    private final ChangelogStateBackendHandleImpl remoteHandle;
+
+    public ChangelogStateBackendLocalHandle(
+            List<KeyedStateHandle> localMaterialized,
+            List<ChangelogStateHandle> localNonMaterialized,
+            ChangelogStateBackendHandleImpl remoteHandle) {
+        this.localMaterialized = localMaterialized;
+        this.localNonMaterialized = localNonMaterialized;
+        this.remoteHandle = remoteHandle;
+    }
+
+    @Override
+    public List<KeyedStateHandle> getMaterializedStateHandles() {
+        return localMaterialized;
+    }
+
+    @Override
+    public List<ChangelogStateHandle> getNonMaterializedStateHandles() {
+        return localNonMaterialized;
+    }
+
+    @Override
+    public long getMaterializationID() {
+        return remoteHandle.getMaterializationID();
+    }
+
+    @Override
+    public ChangelogStateBackendHandle rebound(long checkpointId) {
+        throw new UnsupportedOperationException("Should not call here.");
+    }
+
+    public List<KeyedStateHandle> getRemoteMaterializedStateHandles() {
+        return remoteHandle.getMaterializedStateHandles();
+    }
+
+    public List<ChangelogStateHandle> getRemoteNonMaterializedStateHandles() {
+        return remoteHandle.getNonMaterializedStateHandles();
+    }
+
+    @Override
+    public long getCheckpointId() {
+        return remoteHandle.getCheckpointId();
+    }
+
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry, long 
checkpointID) {
+        remoteHandle.registerSharedStates(stateRegistry, checkpointID);
+    }
+
+    @Override
+    public long getCheckpointedSize() {
+        return remoteHandle.getCheckpointedSize();
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return remoteHandle.getKeyGroupRange();
+    }
+
+    @Nullable
+    @Override
+    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        throw new UnsupportedOperationException(
+                "This is a local state handle for the TM side only.");
+    }
+
+    @Override
+    public StateHandleID getStateHandleId() {
+        return remoteHandle.getStateHandleId();
+    }
+
+    @Override
+    public void discardState() throws Exception {}
+
+    @Override
+    public long getStateSize() {
+        return remoteHandle.getStateSize();
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index a7df344756c..3b04d8a7a40 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -691,7 +691,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                             jobId,
                             tdd.getAllocationId(),
                             taskInformation.getJobVertexId(),
-                            tdd.getSubtaskIndex());
+                            tdd.getSubtaskIndex(),
+                            taskManagerConfiguration.getConfiguration(),
+                            jobInformation.getJobConfiguration());
 
             // TODO: Pass config value from user program and do overriding 
here.
             final StateChangelogStorage<?> changelogStorage;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
new file mode 100644
index 00000000000..8493064f806
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/** Test for {@link ChangelogTaskLocalStateStore}. */
+public class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTest {
+
+    /** Directory provider of the store under test, used to locate materialization directories. */
+    private LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider;
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        // Swap in the changelog variant so that every test in this class, including the
+        // inherited ones, runs against ChangelogTaskLocalStateStore.
+        this.taskLocalStateStore =
+                createChangelogTaskLocalStateStore(
+                        allocationBaseDirs, jobID, allocationID, jobVertexID, subtaskIdx);
+    }
+
+    /**
+     * Creates a {@link ChangelogTaskLocalStateStore} rooted at the given allocation directories
+     * and remembers its directory provider for the materialization-directory assertions.
+     */
+    @Nonnull
+    private ChangelogTaskLocalStateStore createChangelogTaskLocalStateStore(
+            File[] allocationBaseDirs,
+            JobID jobID,
+            AllocationID allocationID,
+            JobVertexID jobVertexID,
+            int subtaskIdx) {
+        LocalRecoveryDirectoryProviderImpl directoryProvider =
+                new LocalRecoveryDirectoryProviderImpl(
+                        allocationBaseDirs, jobID, jobVertexID, subtaskIdx);
+        this.localRecoveryDirectoryProvider = directoryProvider;
+
+        LocalRecoveryConfig localRecoveryConfig = new LocalRecoveryConfig(directoryProvider);
+        return new ChangelogTaskLocalStateStore(
+                jobID,
+                allocationID,
+                jobVertexID,
+                subtaskIdx,
+                localRecoveryConfig,
+                Executors.directExecutor());
+    }
+
+    @Test
+    @Override
+    public void pruneCheckpoints() throws Exception {
+        TestingTaskStateSnapshot stateSnapshot1 = storeChangelogStates(1, 1);
+        TestingTaskStateSnapshot stateSnapshot2 = storeChangelogStates(2, 1);
+        TestingTaskStateSnapshot stateSnapshot3 = storeChangelogStates(3, 1);
+
+        taskLocalStateStore.pruneMatchingCheckpoints(id -> id != 2);
+        assertNull(taskLocalStateStore.retrieveLocalState(3));
+        assertTrue(stateSnapshot3.isDiscarded());
+        assertNull(taskLocalStateStore.retrieveLocalState(1));
+        assertTrue(stateSnapshot1.isDiscarded());
+        // checkpoint 2 still references materialization 1, so its directory must survive
+        assertTrue(checkMaterializedDirExists(1));
+        assertEquals(stateSnapshot2, taskLocalStateStore.retrieveLocalState(2));
+    }
+
+    @Test
+    @Override
+    public void confirmCheckpoint() throws Exception {
+        TestingTaskStateSnapshot stateSnapshot1 = storeChangelogStates(1, 1);
+        TestingTaskStateSnapshot stateSnapshot2 = storeChangelogStates(2, 1);
+        TestingTaskStateSnapshot stateSnapshot3 = storeChangelogStates(3, 1);
+
+        taskLocalStateStore.confirmCheckpoint(3);
+        assertNull(taskLocalStateStore.retrieveLocalState(2));
+        assertTrue(stateSnapshot2.isDiscarded());
+        assertNull(taskLocalStateStore.retrieveLocalState(1));
+        assertTrue(stateSnapshot1.isDiscarded());
+        // the confirmed checkpoint 3 still references materialization 1
+        assertTrue(checkMaterializedDirExists(1));
+        assertEquals(stateSnapshot3, taskLocalStateStore.retrieveLocalState(3));
+
+        TestingTaskStateSnapshot stateSnapshot4 = storeChangelogStates(4, 2);
+        taskLocalStateStore.confirmCheckpoint(4);
+        assertNull(taskLocalStateStore.retrieveLocalState(3));
+        assertTrue(stateSnapshot3.isDiscarded());
+        // materialization 1 is no longer referenced by any retained checkpoint
+        assertFalse(checkMaterializedDirExists(1));
+        assertEquals(stateSnapshot4, taskLocalStateStore.retrieveLocalState(4));
+    }
+
+    @Test
+    @Override
+    public void abortCheckpoint() throws Exception {
+        TestingTaskStateSnapshot stateSnapshot1 = storeChangelogStates(1, 1);
+        TestingTaskStateSnapshot stateSnapshot2 = storeChangelogStates(2, 2);
+        TestingTaskStateSnapshot stateSnapshot3 = storeChangelogStates(3, 2);
+
+        taskLocalStateStore.abortCheckpoint(2);
+        assertNull(taskLocalStateStore.retrieveLocalState(2));
+        assertTrue(stateSnapshot2.isDiscarded());
+        // aborting checkpoint 2 must not touch the unrelated checkpoint 1
+        assertFalse(stateSnapshot1.isDiscarded());
+        assertEquals(stateSnapshot1, taskLocalStateStore.retrieveLocalState(1));
+        // materialization 2 is retained because checkpoint 3 still uses it
+        assertTrue(checkMaterializedDirExists(2));
+        assertTrue(checkMaterializedDirExists(1));
+        assertEquals(stateSnapshot3, taskLocalStateStore.retrieveLocalState(3));
+
+        taskLocalStateStore.abortCheckpoint(3);
+        assertNull(taskLocalStateStore.retrieveLocalState(3));
+        assertTrue(stateSnapshot3.isDiscarded());
+        assertFalse(checkMaterializedDirExists(2));
+    }
+
+    @Test
+    public void retrievePersistedLocalStateFromDisc() {
+        final TaskStateSnapshot taskStateSnapshot = createTaskStateSnapshot();
+        final long checkpointId = 0L;
+        taskLocalStateStore.storeLocalState(checkpointId, taskStateSnapshot);
+        final ChangelogTaskLocalStateStore newTaskLocalStateStore =
+                createChangelogTaskLocalStateStore(
+                        allocationBaseDirs, jobID, allocationID, jobVertexID, subtaskIdx);
+
+        final TaskStateSnapshot retrievedTaskStateSnapshot =
+                newTaskLocalStateStore.retrieveLocalState(checkpointId);
+
+        assertThat(retrievedTaskStateSnapshot).isEqualTo(taskStateSnapshot);
+    }
+
+    @Test
+    public void deletesLocalStateIfRetrievalFails() throws IOException {
+        final TaskStateSnapshot taskStateSnapshot = createTaskStateSnapshot();
+        final long checkpointId = 0L;
+        taskLocalStateStore.storeLocalState(checkpointId, taskStateSnapshot);
+
+        final File taskStateSnapshotFile =
+                taskLocalStateStore.getTaskStateSnapshotFile(checkpointId);
+
+        // corrupt the persisted snapshot so that deserialization fails
+        Files.write(
+                taskStateSnapshotFile.toPath(), new byte[] {1, 2, 3, 4}, StandardOpenOption.WRITE);
+
+        final ChangelogTaskLocalStateStore newTaskLocalStateStore =
+                createChangelogTaskLocalStateStore(
+                        allocationBaseDirs, jobID, allocationID, jobVertexID, subtaskIdx);
+
+        assertThat(newTaskLocalStateStore.retrieveLocalState(checkpointId)).isNull();
+        assertThat(taskStateSnapshotFile.getParentFile()).doesNotExist();
+    }
+
+    /** Returns whether the local directory of the given materialization exists. */
+    private boolean checkMaterializedDirExists(long materializationID) {
+        File materializedDir =
+                localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory(
+                        materializationID);
+        return materializedDir.exists();
+    }
+
+    /** Creates the (empty) local directory of the given materialization if it is missing. */
+    private void createMaterializedDir(long materializationID) {
+        File materializedDir =
+                localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory(
+                        materializationID);
+        if (!materializedDir.exists() && !materializedDir.mkdirs()) {
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "Could not create the materialized directory '%s'", materializedDir));
+        }
+    }
+
+    /**
+     * Stores a snapshot with a single changelog keyed-state handle for the given checkpoint that
+     * references the given materialization, creating the materialization directory first.
+     */
+    private TestingTaskStateSnapshot storeChangelogStates(
+            long checkpointID, long materializationID) {
+        createMaterializedDir(materializationID);
+        OperatorID operatorID = new OperatorID();
+        TestingTaskStateSnapshot taskStateSnapshot = new TestingTaskStateSnapshot();
+        OperatorSubtaskState operatorSubtaskState =
+                OperatorSubtaskState.builder()
+                        .setManagedKeyedState(
+                                new ChangelogStateBackendHandleImpl(
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        new KeyGroupRange(0, 3),
+                                        checkpointID,
+                                        materializationID,
+                                        checkpointID))
+                        .build();
+        taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
+        taskLocalStateStore.storeLocalState(checkpointID, taskStateSnapshot);
+        return taskStateSnapshot;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
index 5fd6ddff67f..ef10986e28e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -166,7 +166,12 @@ public class TaskExecutorLocalStateStoresManagerTest 
extends TestLogger {
 
         TaskLocalStateStore taskLocalStateStore =
                 storesManager.localStateStoreForSubtask(
-                        jobID, allocationID, jobVertexID, subtaskIdx);
+                        jobID,
+                        allocationID,
+                        jobVertexID,
+                        subtaskIdx,
+                        new Configuration(),
+                        new Configuration());
 
         
Assert.assertFalse(taskLocalStateStore.getLocalRecoveryConfig().isLocalRecoveryEnabled());
         Assert.assertNull(
@@ -202,7 +207,12 @@ public class TaskExecutorLocalStateStoresManagerTest 
extends TestLogger {
 
         TaskLocalStateStore taskLocalStateStore =
                 storesManager.localStateStoreForSubtask(
-                        jobID, allocationID, jobVertexID, subtaskIdx);
+                        jobID,
+                        allocationID,
+                        jobVertexID,
+                        subtaskIdx,
+                        new Configuration(),
+                        new Configuration());
 
         LocalRecoveryDirectoryProvider directoryProvider =
                 taskLocalStateStore
@@ -258,7 +268,12 @@ public class TaskExecutorLocalStateStoresManagerTest 
extends TestLogger {
 
         taskLocalStateStore =
                 storesManager.localStateStoreForSubtask(
-                        jobID, otherAllocationID, jobVertexID, subtaskIdx);
+                        jobID,
+                        otherAllocationID,
+                        jobVertexID,
+                        subtaskIdx,
+                        new Configuration(),
+                        new Configuration());
 
         directoryProvider =
                 taskLocalStateStore
@@ -338,9 +353,14 @@ public class TaskExecutorLocalStateStoresManagerTest 
extends TestLogger {
 
         // register local state stores
         taskExecutorLocalStateStoresManager.localStateStoreForSubtask(
-                jobId, retainedAllocationId, jobVertexId, 0);
+                jobId,
+                retainedAllocationId,
+                jobVertexId,
+                0,
+                new Configuration(),
+                new Configuration());
         taskExecutorLocalStateStoresManager.localStateStoreForSubtask(
-                jobId, otherAllocationId, jobVertexId, 1);
+                jobId, otherAllocationId, jobVertexId, 1, new Configuration(), 
new Configuration());
 
         final Collection<Path> allocationDirectories =
                 
TaskExecutorLocalStateStoresManager.listAllocationDirectoriesIn(localStateStore);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
index 8c906b51fa2..661e39d876e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
@@ -51,13 +51,13 @@ import static org.junit.Assert.assertTrue;
 /** Test for the {@link TaskLocalStateStoreImpl}. */
 public class TaskLocalStateStoreImplTest extends TestLogger {
 
-    private TemporaryFolder temporaryFolder;
-    private File[] allocationBaseDirs;
-    private TaskLocalStateStoreImpl taskLocalStateStore;
-    private JobID jobID;
-    private AllocationID allocationID;
-    private JobVertexID jobVertexID;
-    private int subtaskIdx;
+    protected TemporaryFolder temporaryFolder;
+    protected File[] allocationBaseDirs;
+    protected TaskLocalStateStoreImpl taskLocalStateStore;
+    protected JobID jobID;
+    protected AllocationID allocationID;
+    protected JobVertexID jobVertexID;
+    protected int subtaskIdx;
 
     @Before
     public void before() throws Exception {
@@ -216,7 +216,7 @@ public class TaskLocalStateStoreImplTest extends TestLogger 
{
     }
 
     @Nonnull
-    private TaskStateSnapshot createTaskStateSnapshot() {
+    protected TaskStateSnapshot createTaskStateSnapshot() {
         final Map<OperatorID, OperatorSubtaskState> operatorSubtaskStates = 
new HashMap<>();
         operatorSubtaskStates.put(new OperatorID(), 
OperatorSubtaskState.builder().build());
         operatorSubtaskStates.put(new OperatorID(), 
OperatorSubtaskState.builder().build());
@@ -273,7 +273,7 @@ public class TaskLocalStateStoreImplTest extends TestLogger 
{
         return taskStateSnapshots;
     }
 
-    private static final class TestingTaskStateSnapshot extends 
TaskStateSnapshot {
+    protected static final class TestingTaskStateSnapshot extends 
TaskStateSnapshot {
         private static final long serialVersionUID = 2046321877379917040L;
 
         private boolean isDiscarded = false;
diff --git 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
index 1935600fd4e..a89c22f3a58 100644
--- 
a/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
+++ 
b/flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestableKeyedStateBackend;
 import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
 import 
org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
+import 
org.apache.flink.runtime.state.changelog.ChangelogStateBackendLocalHandle;
 import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
 import org.apache.flink.runtime.state.changelog.SequenceNumber;
 import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
@@ -381,11 +382,12 @@ public class ChangelogKeyedStateBackend<K>
         changelogTruncateHelper.checkpoint(checkpointId, lastUploadedTo);
 
         LOG.info(
-                "snapshot of {} for checkpoint {}, change range: {}..{}",
+                "snapshot of {} for checkpoint {}, change range: {}..{}, 
materialization ID {}",
                 subtaskName,
                 checkpointId,
                 lastUploadedFrom,
-                lastUploadedTo);
+                lastUploadedTo,
+                changelogSnapshotState.getMaterializationID());
 
         ChangelogSnapshotState changelogStateBackendStateCopy = 
changelogSnapshotState;
 
@@ -430,25 +432,20 @@ public class ChangelogKeyedStateBackend<K>
                 && 
changelogStateBackendStateCopy.getMaterializedSnapshot().isEmpty()) {
             return SnapshotResult.empty();
         } else if 
(!changelogStateBackendStateCopy.getLocalMaterializedSnapshot().isEmpty()) {
-            return SnapshotResult.withLocalState(
+            ChangelogStateBackendHandleImpl jmHandle =
                     new ChangelogStateBackendHandleImpl(
                             
changelogStateBackendStateCopy.getMaterializedSnapshot(),
                             prevDeltaCopy,
                             getKeyGroupRange(),
                             checkpointId,
                             changelogStateBackendStateCopy.materializationID,
-                            persistedSizeOfThisCheckpoint),
-                    new ChangelogStateBackendHandleImpl(
+                            persistedSizeOfThisCheckpoint);
+            return SnapshotResult.withLocalState(
+                    jmHandle,
+                    new ChangelogStateBackendLocalHandle(
                             
changelogStateBackendStateCopy.getLocalMaterializedSnapshot(),
-                            // TODO: Restore ChangelogStateHandles from remote 
temporarily, because
-                            // ChangelogStateHandles are small(about 10MB).
-                            //  In the future, the double-stream option may be 
implemented according
-                            // to the test results.
                             prevDeltaCopy,
-                            getKeyGroupRange(),
-                            checkpointId,
-                            changelogStateBackendStateCopy.materializationID,
-                            persistedSizeOfThisCheckpoint));
+                            jmHandle));
         } else {
             return SnapshotResult.of(
                     new ChangelogStateBackendHandleImpl(
@@ -630,16 +627,39 @@ public class ChangelogKeyedStateBackend<K>
         List<KeyedStateHandle> materialized = new ArrayList<>();
         List<ChangelogStateHandle> restoredNonMaterialized = new ArrayList<>();
 
+        List<KeyedStateHandle> localMaterialized = new ArrayList<>();
+        List<ChangelogStateHandle> localRestoredNonMaterialized = new 
ArrayList<>();
+
         for (ChangelogStateBackendHandle h : stateHandles) {
             if (h != null) {
-                materialized.addAll(h.getMaterializedStateHandles());
-                
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
+                if (h instanceof ChangelogStateBackendLocalHandle) {
+                    ChangelogStateBackendLocalHandle localHandle =
+                            (ChangelogStateBackendLocalHandle) h;
+                    
materialized.addAll(localHandle.getRemoteMaterializedStateHandles());
+                    restoredNonMaterialized.addAll(
+                            
localHandle.getRemoteNonMaterializedStateHandles());
+                    
localMaterialized.addAll(localHandle.getMaterializedStateHandles());
+                    localRestoredNonMaterialized.addAll(
+                            localHandle.getNonMaterializedStateHandles());
+                } else {
+                    materialized.addAll(h.getMaterializedStateHandles());
+                    
restoredNonMaterialized.addAll(h.getNonMaterializedStateHandles());
+                }
                 // choose max materializationID to handle rescaling
                 materializationId = Math.max(materializationId, 
h.getMaterializationID());
             }
         }
         this.materializedId = materializationId + 1;
-        // Todo: distinguish whether the handle is local or remote
+
+        if (!localMaterialized.isEmpty() || 
!localRestoredNonMaterialized.isEmpty()) {
+            return new ChangelogSnapshotState(
+                    materialized,
+                    localMaterialized,
+                    restoredNonMaterialized,
+                    localRestoredNonMaterialized,
+                    stateChangelogWriter.initialSequenceNumber(),
+                    materializationId);
+        }
         return new ChangelogSnapshotState(
                 materialized,
                 restoredNonMaterialized,
@@ -872,22 +892,21 @@ public class ChangelogKeyedStateBackend<K>
                 List<ChangelogStateHandle> localRestoredNonMaterialized,
                 SequenceNumber materializedTo,
                 long materializationID) {
+            ChangelogStateBackendHandleImpl jmHandle =
+                    new ChangelogStateBackendHandleImpl(
+                            materializedSnapshot,
+                            restoredNonMaterialized,
+                            getKeyGroupRange(),
+                            lastCheckpointId,
+                            materializationID,
+                            0L);
             this.changelogSnapshot =
                     SnapshotResult.withLocalState(
-                            new ChangelogStateBackendHandleImpl(
-                                    materializedSnapshot,
-                                    restoredNonMaterialized,
-                                    getKeyGroupRange(),
-                                    lastCheckpointId,
-                                    materializationID,
-                                    0L),
-                            new ChangelogStateBackendHandleImpl(
+                            jmHandle,
+                            new ChangelogStateBackendLocalHandle(
                                     localMaterializedSnapshot,
                                     localRestoredNonMaterialized,
-                                    getKeyGroupRange(),
-                                    lastCheckpointId,
-                                    materializationID,
-                                    0L));
+                                    jmHandle));
             this.materializedTo = materializedTo;
             this.materializationID = materializationID;
         }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
new file mode 100644
index 00000000000..1f770de6066
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CollectionSink;
+import 
org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CountFunction;
+import org.apache.flink.test.util.InfiniteIntegerSource;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
+import static 
org.apache.flink.configuration.ClusterOptions.JOB_MANAGER_PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.configuration.ClusterOptions.PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.configuration.ClusterOptions.TASK_MANAGER_PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;
+import static 
org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.getAllStateHandleId;
+
+/**
+ * Local recovery IT case for changelog. It never fails because local recovery is nice but not
+ * necessary.
+ */
+@RunWith(Parameterized.class)
+public class ChangelogLocalRecoveryITCase extends TestLogger {
+
+    private static final int NUM_TASK_MANAGERS = 2;
+    private static final int NUM_TASK_SLOTS = 1;
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @Parameterized.Parameter public AbstractStateBackend delegatedStateBackend;
+
+    @Parameterized.Parameters(name = "delegated state backend type = {0}")
+    public static Collection<AbstractStateBackend> parameter() {
+        return Arrays.asList(
+                new HashMapStateBackend(),
+                new EmbeddedRocksDBStateBackend(false),
+                new EmbeddedRocksDBStateBackend(true));
+    }
+
+    private MiniClusterWithClientResource cluster;
+    private static String workingDir;
+
+    @BeforeClass
+    public static void setWorkingDir() throws IOException {
+        workingDir = TEMPORARY_FOLDER.newFolder("work").getAbsolutePath();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        Configuration configuration = new Configuration();
+        
configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
+
+        configuration.setString(PROCESS_WORKING_DIR_BASE, workingDir);
+        configuration.setString(JOB_MANAGER_PROCESS_WORKING_DIR_BASE, 
workingDir);
+        configuration.setString(TASK_MANAGER_PROCESS_WORKING_DIR_BASE, 
workingDir);
+        configuration.setBoolean(LOCAL_RECOVERY, true);
+        FsStateChangelogStorageFactory.configure(
+                configuration, TEMPORARY_FOLDER.newFolder(), 
Duration.ofMillis(1000), 1);
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configuration)
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+                                .build());
+        cluster.before();
+        cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
+    }
+
+    @After
+    public void teardown() {
+        cluster.after();
+    }
+
+    private JobGraph buildJobGraph(StreamExecutionEnvironment env) {
+        env.addSource(new InfiniteIntegerSource())
+                .setParallelism(1)
+                .keyBy(element -> element)
+                .process(new CountFunction())
+                .addSink(new CollectionSink())
+                .setParallelism(1);
+        return env.getStreamGraph().getJobGraph();
+    }
+
+    @Test
+    public void testRestartTM() throws Exception {
+        File checkpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, checkpointFolder, true, 200, 
800);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), false);
+        // wait job for doing materialization.
+        waitUntilCondition(
+                () -> !getAllStateHandleId(firstJobGraph.getJobID(), 
miniCluster).isEmpty());
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+        CompletableFuture<Void> terminationFuture = 
miniCluster.terminateTaskManager(1);
+        terminationFuture.get();
+        miniCluster.startTaskManager();
+        waitForAllTaskRunning(
+                () ->
+                        miniCluster
+                                .getExecutionGraph(firstJobGraph.getJobID())
+                                .get(10000, TimeUnit.SECONDS),
+                false);
+
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), false);
+        miniCluster.triggerCheckpoint(firstJobGraph.getJobID()).get();
+    }
+
+    private StreamExecutionEnvironment getEnv(
+            StateBackend stateBackend,
+            File checkpointFile,
+            boolean changelogEnabled,
+            long checkpointInterval,
+            long materializationInterval) {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(checkpointInterval);
+        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
+        env.setStateBackend(stateBackend)
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 
10));
+        env.configure(new Configuration().set(LOCAL_RECOVERY, true));
+
+        env.getCheckpointConfig().setCheckpointStorage(checkpointFile.toURI());
+        env.enableChangelogStateBackend(changelogEnabled);
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(materializationInterval))
+                        
.set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, 1));
+        env.getCheckpointConfig()
+                .setExternalizedCheckpointCleanup(
+                        
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        Configuration configuration = new Configuration();
+        
configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
+        env.configure(configuration);
+        return env;
+    }
+}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java
index 97fb8cecda9..6220e29bb0d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java
@@ -219,7 +219,7 @@ public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogge
         return JobID.fromByteArray(randomBytes);
     }
 
-    protected static Set<StateHandleID> getAllStateHandleId(JobID jobID, 
MiniCluster miniCluster)
+    public static Set<StateHandleID> getAllStateHandleId(JobID jobID, 
MiniCluster miniCluster)
             throws IOException, FlinkJobNotFoundException, ExecutionException,
                     InterruptedException {
         Optional<String> mostRecentCompletedCheckpointPath =

Reply via email to