rkhachatryan commented on code in PR #19907:
URL: https://github.com/apache/flink/pull/19907#discussion_r926124747


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.LongPredicate;
+
+/** Changelog's implementation of a {@link TaskLocalStateStore}. */
+public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogTaskLocalStateStore.class);
+
+    private static final String CHANGE_LOG_CHECKPOINT_PREFIX = 
"changelog_chk_";
+
+    /**
+     * The mapper of checkpointId and materializationId. (cp3, 
materializationId2) means cp3 refer
+     * to m1.
+     */
+    private final Map<Long, Long> mapToMaterializationId;
+
+    /** Last checkpointId, to check whether checkpoint is out of order. */
+    private long lastCheckpointId = -1L;
+
+    public ChangelogTaskLocalStateStore(
+            @Nonnull JobID jobID,
+            @Nonnull AllocationID allocationID,
+            @Nonnull JobVertexID jobVertexID,
+            @Nonnegative int subtaskIndex,
+            @Nonnull LocalRecoveryConfig localRecoveryConfig,
+            @Nonnull Executor discardExecutor) {
+        super(jobID, allocationID, jobVertexID, subtaskIndex, 
localRecoveryConfig, discardExecutor);
+        this.mapToMaterializationId = new HashMap<>();
+    }
+
+    private void updateReference(long checkpointId, TaskStateSnapshot 
localState) {
+        if (localState == null) {
+            localState = NULL_DUMMY;
+        }
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = 
changelogStateBackendHandle.getMaterializationID();
+                    if (mapToMaterializationId.getOrDefault(checkpointId, 
Long.MAX_VALUE)
+                            < materializationID) {
+                        LOG.info(
+                                "Update checkpoint {}, old materializationID 
{}, new materializationID {}.",
+                                checkpointId,
+                                mapToMaterializationId.get(checkpointId),
+                                materializationID);
+                    }
+                    mapToMaterializationId.put(checkpointId, 
materializationID);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot 
localState) {
+        if (checkpointId < lastCheckpointId) {
+            LOG.info(
+                    "Current checkpoint {} is out of order, smaller than last 
CheckpointId {}.",
+                    lastCheckpointId,
+                    checkpointId);
+            return;
+        } else {
+            lastCheckpointId = checkpointId;
+        }
+        synchronized (lock) {
+            updateReference(checkpointId, localState);
+        }
+        super.storeLocalState(checkpointId, localState);
+    }
+
+    @Override
+    protected File getCheckpointDirectory(long checkpointId) {
+        final File checkpointDirectory =
+                localRecoveryConfig
+                        .getLocalStateDirectoryProvider()
+                        .orElseThrow(
+                                () -> new IllegalStateException("Local 
recovery must be enabled."))

Review Comment:
   Should we extract this into a field or function already? :)
   
   (also duplicated in parent)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.LongPredicate;
+
+/** Changelog's implementation of a {@link TaskLocalStateStore}. */
+public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogTaskLocalStateStore.class);
+
+    private static final String CHANGE_LOG_CHECKPOINT_PREFIX = 
"changelog_chk_";
+
+    /**
+     * The mapper of checkpointId and materializationId. (cp3, 
materializationId2) means cp3 refer
+     * to m1.
+     */
+    private final Map<Long, Long> mapToMaterializationId;
+
+    /** Last checkpointId, to check whether checkpoint is out of order. */
+    private long lastCheckpointId = -1L;
+
+    public ChangelogTaskLocalStateStore(
+            @Nonnull JobID jobID,
+            @Nonnull AllocationID allocationID,
+            @Nonnull JobVertexID jobVertexID,
+            @Nonnegative int subtaskIndex,
+            @Nonnull LocalRecoveryConfig localRecoveryConfig,
+            @Nonnull Executor discardExecutor) {
+        super(jobID, allocationID, jobVertexID, subtaskIndex, 
localRecoveryConfig, discardExecutor);
+        this.mapToMaterializationId = new HashMap<>();
+    }
+
+    private void updateReference(long checkpointId, TaskStateSnapshot 
localState) {
+        if (localState == null) {
+            localState = NULL_DUMMY;
+        }
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = 
changelogStateBackendHandle.getMaterializationID();
+                    if (mapToMaterializationId.getOrDefault(checkpointId, 
Long.MAX_VALUE)
+                            < materializationID) {
+                        LOG.info(
+                                "Update checkpoint {}, old materializationID 
{}, new materializationID {}.",
+                                checkpointId,
+                                mapToMaterializationId.get(checkpointId),
+                                materializationID);
+                    }
+                    mapToMaterializationId.put(checkpointId, 
materializationID);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot 
localState) {
+        if (checkpointId < lastCheckpointId) {
+            LOG.info(
+                    "Current checkpoint {} is out of order, smaller than last 
CheckpointId {}.",
+                    lastCheckpointId,
+                    checkpointId);
+            return;
+        } else {
+            lastCheckpointId = checkpointId;
+        }
+        synchronized (lock) {
+            updateReference(checkpointId, localState);
+        }
+        super.storeLocalState(checkpointId, localState);
+    }
+
+    @Override
+    protected File getCheckpointDirectory(long checkpointId) {
+        final File checkpointDirectory =
+                localRecoveryConfig
+                        .getLocalStateDirectoryProvider()
+                        .orElseThrow(
+                                () -> new IllegalStateException("Local 
recovery must be enabled."))
+                        .subtaskBaseDirectory(checkpointId);
+        File directoryForChangelog =
+                new File(checkpointDirectory, CHANGE_LOG_CHECKPOINT_PREFIX + 
checkpointId);
+
+        if (!directoryForChangelog.exists() && 
!directoryForChangelog.mkdirs()) {
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "Could not create the checkpoint directory '%s'",
+                            directoryForChangelog));
+        }
+
+        return directoryForChangelog;
+    }
+
+    private void deleteMaterialization(LongPredicate pruningChecker) {
+        final Set<Long> materializationToRemove = new HashSet<>();
+        synchronized (lock) {
+            Iterator<Entry<Long, Long>> iterator = 
mapToMaterializationId.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, Long> entry = iterator.next();
+                long entryCheckpointId = entry.getKey();
+                if (pruningChecker.test(entryCheckpointId)) {
+                    materializationToRemove.add(entry.getValue());
+                    iterator.remove();
+                }
+            }
+
+            iterator = mapToMaterializationId.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, Long> entry = iterator.next();
+                materializationToRemove.remove(entry.getValue());
+            }

Review Comment:
   How about simplifying this code?
   ```
   Set<Long> checkpoints =
           mapToMaterializationId.keySet().stream()
                   .filter(pruningChecker::test)
                   .collect(Collectors.toSet());
   materializationToRemove =
           checkpoints.stream()
                   .map(mapToMaterializationId::remove)
                   .collect(Collectors.toSet());
   materializationToRemove.removeAll(mapToMaterializationId.values());
   ```
   
   Alternatively, we could maintain a Map<materializationId, Set<checkpointId>>
   and discard a materialization whenever its set becomes empty. But two maps
   seem more complex.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.LongPredicate;
+
+/** Changelog's implementation of a {@link TaskLocalStateStore}. */
+public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogTaskLocalStateStore.class);
+
+    private static final String CHANGE_LOG_CHECKPOINT_PREFIX = 
"changelog_chk_";
+
+    /**
+     * The mapper of checkpointId and materializationId. (cp3, 
materializationId2) means cp3 refer
+     * to m1.
+     */
+    private final Map<Long, Long> mapToMaterializationId;
+
+    /** Last checkpointId, to check whether checkpoint is out of order. */
+    private long lastCheckpointId = -1L;
+
+    public ChangelogTaskLocalStateStore(
+            @Nonnull JobID jobID,
+            @Nonnull AllocationID allocationID,
+            @Nonnull JobVertexID jobVertexID,
+            @Nonnegative int subtaskIndex,
+            @Nonnull LocalRecoveryConfig localRecoveryConfig,
+            @Nonnull Executor discardExecutor) {
+        super(jobID, allocationID, jobVertexID, subtaskIndex, 
localRecoveryConfig, discardExecutor);
+        this.mapToMaterializationId = new HashMap<>();
+    }
+
+    private void updateReference(long checkpointId, TaskStateSnapshot 
localState) {
+        if (localState == null) {
+            localState = NULL_DUMMY;
+        }
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = 
changelogStateBackendHandle.getMaterializationID();
+                    if (mapToMaterializationId.getOrDefault(checkpointId, 
Long.MAX_VALUE)
+                            < materializationID) {
+                        LOG.info(
+                                "Update checkpoint {}, old materializationID 
{}, new materializationID {}.",
+                                checkpointId,
+                                mapToMaterializationId.get(checkpointId),
+                                materializationID);
+                    }
+                    mapToMaterializationId.put(checkpointId, 
materializationID);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot 
localState) {
+        if (checkpointId < lastCheckpointId) {
+            LOG.info(
+                    "Current checkpoint {} is out of order, smaller than last 
CheckpointId {}.",
+                    lastCheckpointId,
+                    checkpointId);
+            return;
+        } else {
+            lastCheckpointId = checkpointId;
+        }
+        synchronized (lock) {
+            updateReference(checkpointId, localState);
+        }
+        super.storeLocalState(checkpointId, localState);
+    }
+
+    @Override
+    protected File getCheckpointDirectory(long checkpointId) {
+        final File checkpointDirectory =
+                localRecoveryConfig
+                        .getLocalStateDirectoryProvider()
+                        .orElseThrow(
+                                () -> new IllegalStateException("Local 
recovery must be enabled."))
+                        .subtaskBaseDirectory(checkpointId);
+        File directoryForChangelog =
+                new File(checkpointDirectory, CHANGE_LOG_CHECKPOINT_PREFIX + 
checkpointId);
+
+        if (!directoryForChangelog.exists() && 
!directoryForChangelog.mkdirs()) {
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "Could not create the checkpoint directory '%s'",
+                            directoryForChangelog));
+        }
+
+        return directoryForChangelog;
+    }
+
+    private void deleteMaterialization(LongPredicate pruningChecker) {
+        final Set<Long> materializationToRemove = new HashSet<>();
+        synchronized (lock) {
+            Iterator<Entry<Long, Long>> iterator = 
mapToMaterializationId.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, Long> entry = iterator.next();
+                long entryCheckpointId = entry.getKey();
+                if (pruningChecker.test(entryCheckpointId)) {
+                    materializationToRemove.add(entry.getValue());
+                    iterator.remove();
+                }
+            }
+
+            iterator = mapToMaterializationId.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, Long> entry = iterator.next();
+                materializationToRemove.remove(entry.getValue());
+            }
+        }
+
+        for (Long materializationId : materializationToRemove) {
+            File materializedDir =
+                    localRecoveryConfig
+                            .getLocalStateDirectoryProvider()
+                            .orElseThrow(
+                                    () ->
+                                            new IllegalStateException(
+                                                    "Local recovery must be 
enabled."))
+                            
.subtaskSpecificCheckpointDirectory(materializationId);
+
+            if (!materializedDir.exists()) {
+                continue;
+            }
+            try {
+                deleteDirectory(materializedDir);
+            } catch (IOException ex) {
+                LOG.warn(
+                        "Exception while deleting local state directory of 
materialized part {} in subtask ({} - {} - {}).",
+                        materializedDir,
+                        jobID,
+                        jobVertexID,
+                        subtaskIndex,
+                        ex);
+            }
+        }
+    }
+
+    @Override
+    public void confirmCheckpoint(long confirmedCheckpointId) {

Review Comment:
   Instead of overriding (and duplicating) these two methods (confirm & abort),
   can we override `pruneCheckpoints`?
   Deleting materialization seems to be a part of pruning...
   
   ```
       @Override
       protected void pruneCheckpoints(LongPredicate pruningChecker, boolean 
breakOnceCheckerFalse) {
           deleteMaterialization(pruningChecker);
           super.pruneCheckpoints(pruningChecker, breakOnceCheckerFalse);
       }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.LongPredicate;
+
+/** Changelog's implementation of a {@link TaskLocalStateStore}. */
+public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogTaskLocalStateStore.class);
+
+    private static final String CHANGE_LOG_CHECKPOINT_PREFIX = 
"changelog_chk_";
+
+    /**
+     * The mapper of checkpointId and materializationId. (cp3, 
materializationId2) means cp3 refer
+     * to m1.
+     */
+    private final Map<Long, Long> mapToMaterializationId;
+
+    /** Last checkpointId, to check whether checkpoint is out of order. */
+    private long lastCheckpointId = -1L;
+
+    public ChangelogTaskLocalStateStore(
+            @Nonnull JobID jobID,
+            @Nonnull AllocationID allocationID,
+            @Nonnull JobVertexID jobVertexID,
+            @Nonnegative int subtaskIndex,
+            @Nonnull LocalRecoveryConfig localRecoveryConfig,
+            @Nonnull Executor discardExecutor) {
+        super(jobID, allocationID, jobVertexID, subtaskIndex, 
localRecoveryConfig, discardExecutor);
+        this.mapToMaterializationId = new HashMap<>();
+    }
+
+    private void updateReference(long checkpointId, TaskStateSnapshot 
localState) {
+        if (localState == null) {
+            localState = NULL_DUMMY;
+        }
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = 
changelogStateBackendHandle.getMaterializationID();
+                    if (mapToMaterializationId.getOrDefault(checkpointId, 
Long.MAX_VALUE)
+                            < materializationID) {
+                        LOG.info(
+                                "Update checkpoint {}, old materializationID 
{}, new materializationID {}.",
+                                checkpointId,
+                                mapToMaterializationId.get(checkpointId),
+                                materializationID);
+                    }
+                    mapToMaterializationId.put(checkpointId, 
materializationID);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot 
localState) {
+        if (checkpointId < lastCheckpointId) {
+            LOG.info(
+                    "Current checkpoint {} is out of order, smaller than last 
CheckpointId {}.",
+                    lastCheckpointId,
+                    checkpointId);
+            return;
+        } else {
+            lastCheckpointId = checkpointId;
+        }
+        synchronized (lock) {
+            updateReference(checkpointId, localState);
+        }
+        super.storeLocalState(checkpointId, localState);
+    }
+
+    @Override
+    protected File getCheckpointDirectory(long checkpointId) {
+        final File checkpointDirectory =
+                localRecoveryConfig
+                        .getLocalStateDirectoryProvider()
+                        .orElseThrow(
+                                () -> new IllegalStateException("Local 
recovery must be enabled."))
+                        .subtaskBaseDirectory(checkpointId);
+        File directoryForChangelog =
+                new File(checkpointDirectory, CHANGE_LOG_CHECKPOINT_PREFIX + 
checkpointId);
+
+        if (!directoryForChangelog.exists() && 
!directoryForChangelog.mkdirs()) {
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "Could not create the checkpoint directory '%s'",
+                            directoryForChangelog));
+        }
+
+        return directoryForChangelog;
+    }
+
+    private void deleteMaterialization(LongPredicate pruningChecker) {
+        final Set<Long> materializationToRemove = new HashSet<>();
+        synchronized (lock) {
+            Iterator<Entry<Long, Long>> iterator = 
mapToMaterializationId.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, Long> entry = iterator.next();
+                long entryCheckpointId = entry.getKey();
+                if (pruningChecker.test(entryCheckpointId)) {
+                    materializationToRemove.add(entry.getValue());
+                    iterator.remove();
+                }
+            }
+
+            iterator = mapToMaterializationId.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, Long> entry = iterator.next();
+                materializationToRemove.remove(entry.getValue());
+            }
+        }
+
+        for (Long materializationId : materializationToRemove) {
+            File materializedDir =
+                    localRecoveryConfig
+                            .getLocalStateDirectoryProvider()
+                            .orElseThrow(
+                                    () ->
+                                            new IllegalStateException(
+                                                    "Local recovery must be 
enabled."))
+                            
.subtaskSpecificCheckpointDirectory(materializationId);
+
+            if (!materializedDir.exists()) {
+                continue;
+            }

Review Comment:
   nit: invert the `if` (delete only when the directory exists) instead of using `continue`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java:
##########
@@ -164,22 +165,27 @@ public TaskLocalStateStore localStateStoreForSubtask(
                 LocalRecoveryConfig localRecoveryConfig =
                         new LocalRecoveryConfig(directoryProvider);
 
-                taskLocalStateStore =
-                        localRecoveryConfig.isLocalRecoveryEnabled()
-                                ?
-
-                                // Real store implementation if local recovery 
is enabled
-                                new TaskLocalStateStoreImpl(
-                                        jobId,
-                                        allocationID,
-                                        jobVertexID,
-                                        subtaskIndex,
-                                        localRecoveryConfig,
-                                        discardExecutor)
-                                :
-
-                                // NOP implementation if local recovery is 
disabled
-                                new 
NoOpTaskLocalStateStoreImpl(localRecoveryConfig);
+                if (localRecoveryConfig.isLocalRecoveryEnabled()) {
+                    taskLocalStateStore =
+                            changelogEnabled
+                                    ? new ChangelogTaskLocalStateStore(

Review Comment:
   nit: I'd flatten the `if`s: the no-op branch first, followed by the changelog
   branch, and then the normal branch.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -686,12 +688,25 @@ public CompletableFuture<Acknowledge> submitTask(
             PartitionProducerStateChecker partitionStateChecker =
                     jobManagerConnection.getPartitionStateChecker();
 
+            // Configuration from application will override the one from env.
+            boolean envChangelogEnabled =
+                    taskManagerConfiguration
+                            .getConfiguration()

Review Comment:
   How about passing the cluster and job configurations to
   `TaskExecutorLocalStateStoresManager.localStateStoreForSubtask`
   and parsing them there?
   
   nit: I'd inline `envChangelogEnabled` but that's a matter of taste :)



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CollectionSink;
+import 
org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CountFunction;
+import org.apache.flink.test.util.InfiniteIntegerSource;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
+import static 
org.apache.flink.configuration.ClusterOptions.JOB_MANAGER_PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.configuration.ClusterOptions.PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.configuration.ClusterOptions.TASK_MANAGER_PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+
+/**
+ * Local recovery IT case for changelog. It never fails because local recovery 
is nice but not
+ * necessary.
+ */
+@RunWith(Parameterized.class)
+public class ChangelogLocalRecoveryITCase {

Review Comment:
   extend `TestLogger`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -409,39 +405,31 @@ private void discardLocalStateForCheckpoint(long 
checkpointID, Optional<TaskStat
                     }
                 });
 
-        Optional<LocalRecoveryDirectoryProvider> directoryProviderOptional =
-                localRecoveryConfig.getLocalStateDirectoryProvider();
+        File checkpointDir = getCheckpointDirectory(checkpointID);
 
-        if (directoryProviderOptional.isPresent()) {
-            File checkpointDir =
-                    directoryProviderOptional
-                            .get()
-                            .subtaskSpecificCheckpointDirectory(checkpointID);

Review Comment:
   This slightly changes the semantics - it now tries to re-create the folder 
if it doesn't exist. Is it intentional? If not, is it safe?
   
   (asserting that the provider exists looks safe)



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CollectionSink;
+import 
org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CountFunction;
+import org.apache.flink.test.util.InfiniteIntegerSource;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
+import static 
org.apache.flink.configuration.ClusterOptions.JOB_MANAGER_PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.configuration.ClusterOptions.PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.configuration.ClusterOptions.TASK_MANAGER_PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+
+/**
+ * Local recovery IT case for changelog. It never fails because local recovery 
is nice but not
+ * necessary.
+ */
+@RunWith(Parameterized.class)
+public class ChangelogLocalRecoveryITCase {
+
+    private static final int NUM_TASK_MANAGERS = 2;
+    private static final int NUM_TASK_SLOTS = 1;
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @Parameterized.Parameter public AbstractStateBackend delegatedStateBackend;
+
+    @Parameterized.Parameters(name = "delegated state backend type = {0}")
+    public static Collection<AbstractStateBackend> parameter() {
+        return Arrays.asList(
+                new HashMapStateBackend(),
+                new EmbeddedRocksDBStateBackend(false),
+                new EmbeddedRocksDBStateBackend(true));
+    }
+
+    private MiniClusterWithClientResource cluster;
+    private static String workingDir;
+
+    @BeforeClass
+    public static void setWorkingDir() throws IOException {
+        workingDir = TEMPORARY_FOLDER.newFolder("work").getAbsolutePath();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        Configuration configuration = new Configuration();
+        
configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
+
+        configuration.setString(PROCESS_WORKING_DIR_BASE, workingDir);
+        configuration.setString(JOB_MANAGER_PROCESS_WORKING_DIR_BASE, 
workingDir);
+        configuration.setString(TASK_MANAGER_PROCESS_WORKING_DIR_BASE, 
workingDir);
+        configuration.setBoolean(LOCAL_RECOVERY, true);
+        FsStateChangelogStorageFactory.configure(
+                configuration, TEMPORARY_FOLDER.newFolder(), 
Duration.ofMillis(1000), 1);
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configuration)
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+                                .build());
+        cluster.before();

Review Comment:
   I think cluster teardown is missing.
   
   nit: and probably we could use a rule for that (something like `@ClassRule 
... MiniClusterWithClientResource miniClusterResource`).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -464,7 +452,7 @@ private void pruneCheckpoints(LongPredicate pruningChecker, 
boolean breakOnceChe
                 long entryCheckpointId = snapshotEntry.getKey();
 
                 if (pruningChecker.test(entryCheckpointId)) {
-                    toRemove.add(snapshotEntry);
+                    toRemove.add(new SimpleEntry<>(snapshotEntry));

Review Comment:
   Please add a comment explaining why it's necessary (or, even better I guess, 
refactor the code and replace `Map.Entry` with `Tuple2`, but that's probably out of scope).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java:
##########
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.LongPredicate;
+
+/** Changelog's implementation of a {@link TaskLocalStateStore}. */
+public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogTaskLocalStateStore.class);
+
+    private static final String CHANGE_LOG_CHECKPOINT_PREFIX = 
"changelog_chk_";
+
+    /**
+     * The mapper of checkpointId and materializationId. (cp3, 
materializationId2) means cp3 refer
+     * to m1.
+     */
+    private final Map<Long, Long> mapToMaterializationId;
+
+    /** Last checkpointId, to check whether checkpoint is out of order. */
+    private long lastCheckpointId = -1L;
+
+    public ChangelogTaskLocalStateStore(
+            @Nonnull JobID jobID,
+            @Nonnull AllocationID allocationID,
+            @Nonnull JobVertexID jobVertexID,
+            @Nonnegative int subtaskIndex,
+            @Nonnull LocalRecoveryConfig localRecoveryConfig,
+            @Nonnull Executor discardExecutor) {
+        super(jobID, allocationID, jobVertexID, subtaskIndex, 
localRecoveryConfig, discardExecutor);
+        this.mapToMaterializationId = new HashMap<>();
+    }
+
+    private void updateReference(long checkpointId, TaskStateSnapshot 
localState) {
+        if (localState == null) {
+            localState = NULL_DUMMY;
+        }
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = 
changelogStateBackendHandle.getMaterializationID();
+                    if (mapToMaterializationId.getOrDefault(checkpointId, 
Long.MAX_VALUE)
+                            < materializationID) {
+                        LOG.info(
+                                "Update checkpoint {}, old materializationID 
{}, new materializationID {}.",
+                                checkpointId,
+                                mapToMaterializationId.get(checkpointId),
+                                materializationID);
+                    }
+                    mapToMaterializationId.put(checkpointId, 
materializationID);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot 
localState) {
+        if (checkpointId < lastCheckpointId) {
+            LOG.info(
+                    "Current checkpoint {} is out of order, smaller than last 
CheckpointId {}.",
+                    lastCheckpointId,
+                    checkpointId);
+            return;
+        } else {
+            lastCheckpointId = checkpointId;
+        }
+        synchronized (lock) {
+            updateReference(checkpointId, localState);
+        }
+        super.storeLocalState(checkpointId, localState);
+    }
+
+    @Override
+    protected File getCheckpointDirectory(long checkpointId) {
+        final File checkpointDirectory =
+                localRecoveryConfig
+                        .getLocalStateDirectoryProvider()
+                        .orElseThrow(
+                                () -> new IllegalStateException("Local 
recovery must be enabled."))
+                        .subtaskBaseDirectory(checkpointId);
+        File directoryForChangelog =
+                new File(checkpointDirectory, CHANGE_LOG_CHECKPOINT_PREFIX + 
checkpointId);
+
+        if (!directoryForChangelog.exists() && 
!directoryForChangelog.mkdirs()) {
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "Could not create the checkpoint directory '%s'",
+                            directoryForChangelog));
+        }
+
+        return directoryForChangelog;
+    }
+
+    private void deleteMaterialization(LongPredicate pruningChecker) {
+        final Set<Long> materializationToRemove = new HashSet<>();
+        synchronized (lock) {
+            Iterator<Entry<Long, Long>> iterator = 
mapToMaterializationId.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, Long> entry = iterator.next();
+                long entryCheckpointId = entry.getKey();
+                if (pruningChecker.test(entryCheckpointId)) {
+                    materializationToRemove.add(entry.getValue());
+                    iterator.remove();
+                }
+            }
+
+            iterator = mapToMaterializationId.entrySet().iterator();
+            while (iterator.hasNext()) {
+                Map.Entry<Long, Long> entry = iterator.next();
+                materializationToRemove.remove(entry.getValue());
+            }
+        }
+
+        for (Long materializationId : materializationToRemove) {
+            File materializedDir =
+                    localRecoveryConfig
+                            .getLocalStateDirectoryProvider()
+                            .orElseThrow(
+                                    () ->
+                                            new IllegalStateException(
+                                                    "Local recovery must be 
enabled."))
+                            
.subtaskSpecificCheckpointDirectory(materializationId);
+
+            if (!materializedDir.exists()) {
+                continue;
+            }
+            try {
+                deleteDirectory(materializedDir);
+            } catch (IOException ex) {
+                LOG.warn(
+                        "Exception while deleting local state directory of 
materialized part {} in subtask ({} - {} - {}).",
+                        materializedDir,
+                        jobID,
+                        jobVertexID,
+                        subtaskIndex,
+                        ex);
+            }
+        }
+    }
+
+    @Override
+    public void confirmCheckpoint(long confirmedCheckpointId) {
+        // Scenarios:
+        //   c1,m1
+        //   confirm c1, do nothing.
+        //   c2,m1
+        //   confirm c2, delete c1, don't delete m1
+        //   c3,m2
+        //   confirm c3, delete c2, delete m1
+        LOG.debug(
+                "Received confirmation for checkpoint {} in subtask ({} - {} - 
{}). Starting to prune history.",
+                confirmedCheckpointId,
+                jobID,
+                jobVertexID,
+                subtaskIndex);
+        // delete changelog-chk
+        pruneCheckpoints(checkpointId -> checkpointId < confirmedCheckpointId, 
false);

Review Comment:
   Parent method has `true` here (break iteration).
   Is it intentional?



##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogLocalRecoveryITCase.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CollectionSink;
+import 
org.apache.flink.test.checkpointing.ChangelogPeriodicMaterializationTestBase.CountFunction;
+import org.apache.flink.test.util.InfiniteIntegerSource;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.configuration.CheckpointingOptions.LOCAL_RECOVERY;
+import static 
org.apache.flink.configuration.ClusterOptions.JOB_MANAGER_PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.configuration.ClusterOptions.PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.configuration.ClusterOptions.TASK_MANAGER_PROCESS_WORKING_DIR_BASE;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+
+/**
+ * Local recovery IT case for changelog. It never fails because local recovery 
is nice but not
+ * necessary.
+ */
+@RunWith(Parameterized.class)
+public class ChangelogLocalRecoveryITCase {
+
+    private static final int NUM_TASK_MANAGERS = 2;
+    private static final int NUM_TASK_SLOTS = 1;
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @Parameterized.Parameter public AbstractStateBackend delegatedStateBackend;
+
+    @Parameterized.Parameters(name = "delegated state backend type = {0}")
+    public static Collection<AbstractStateBackend> parameter() {
+        return Arrays.asList(
+                new HashMapStateBackend(),
+                new EmbeddedRocksDBStateBackend(false),
+                new EmbeddedRocksDBStateBackend(true));
+    }
+
+    private MiniClusterWithClientResource cluster;
+    private static String workingDir;
+
+    @BeforeClass
+    public static void setWorkingDir() throws IOException {
+        workingDir = TEMPORARY_FOLDER.newFolder("work").getAbsolutePath();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        Configuration configuration = new Configuration();
+        
configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
+
+        configuration.setString(PROCESS_WORKING_DIR_BASE, workingDir);
+        configuration.setString(JOB_MANAGER_PROCESS_WORKING_DIR_BASE, 
workingDir);
+        configuration.setString(TASK_MANAGER_PROCESS_WORKING_DIR_BASE, 
workingDir);
+        configuration.setBoolean(LOCAL_RECOVERY, true);
+        FsStateChangelogStorageFactory.configure(
+                configuration, TEMPORARY_FOLDER.newFolder(), 
Duration.ofMillis(1000), 1);
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configuration)
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+                                .build());
+        cluster.before();
+        cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
+    }
+
+    private JobGraph buildJobGraph(StreamExecutionEnvironment env) {
+        env.addSource(new InfiniteIntegerSource())
+                .setParallelism(1)
+                .keyBy(element -> element)
+                .process(new CountFunction())
+                .addSink(new CollectionSink())
+                .setParallelism(1);
+        return env.getStreamGraph().getJobGraph();
+    }
+
+    @Test
+    public void testRestartTM() throws Exception {
+        File checkpointFolder = TEMPORARY_FOLDER.newFolder();
+        MiniCluster miniCluster = cluster.getMiniCluster();
+        StreamExecutionEnvironment env1 =
+                getEnv(delegatedStateBackend, checkpointFolder, true, 200, 
800);
+        JobGraph firstJobGraph = buildJobGraph(env1);
+
+        miniCluster.submitJob(firstJobGraph).get();
+        waitForAllTaskRunning(miniCluster, firstJobGraph.getJobID(), false);
+        // wait job for doing materialization.
+        Thread.sleep(2000);

Review Comment:
   Could you replace it with an actual check that there's a checkpoint with 
some materialized state? (Some tests are already doing that.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to