rkhachatryan commented on a change in pull request #18224:
URL: https://github.com/apache/flink/pull/18224#discussion_r818237157



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+    }

Review comment:
       Is this override necessary?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t));
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(() -> completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }

Review comment:
       Is it guaranteed that there was no materialization? (Or is that not a strict requirement?)

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t));
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(() -> completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        },
+                        generateJobID()));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        JobID jobID = generateJobID();
+        SharedReference<AtomicInteger> currentCheckpointNum =
+                sharedObjects.add(new AtomicInteger());
+        SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>());

Review comment:
       Concurrent access? (Although there will likely be memory barriers between accesses, it's safer to use a concurrent collection IMO.)
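   For example (just a sketch, assuming the rest of the test stays unchanged), the shared list could be backed by a thread-safe implementation:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch: a CopyOnWriteArrayList (or Collections.synchronizedList) makes the
// cross-thread reads/writes safe without relying on implicit memory barriers.
SharedReference<List<Long>> currentMaterializationId =
        sharedObjects.add(new CopyOnWriteArrayList<>());
```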

##########
File path: flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
##########
@@ -94,6 +102,59 @@ public static void waitUntilJobInitializationFinished(
                 userCodeClassloader);
     }
 
+    public static CheckpointMetadata loadCheckpointMetadata(String savepointPath)
+            throws IOException {
+        CompletedCheckpointStorageLocation location =
+                AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(savepointPath);
+
+        try (DataInputStream stream =
+                new DataInputStream(location.getMetadataHandle().openInputStream())) {
+            return Checkpoints.loadCheckpointMetadata(
+                    stream, Thread.currentThread().getContextClassLoader(), savepointPath);
+        }
+    }
+
+    public static List<Path> findAllSortedExternalizedCheckpoint(File checkpointDir, JobID jobId)
+            throws IOException {
+        return findAllExternalizedCheckpoint(checkpointDir, jobId).stream()
+                .sorted(
+                        Comparator.comparingInt(
+                                path ->
+                                        Integer.parseInt(
+                                                path.getFileName()
+                                                        .toString()
+                                                        .substring(
+                                                                CHECKPOINT_DIR_PREFIX.length()))))
+                .collect(Collectors.toList());
+    }
+
+    public static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId)

Review comment:
       Can't some existing methods be used here, e.g. `getMostRecentCompletedCheckpointMaybe`?
   Are these methods specific to externalized checkpoints?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+
+import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This verifies that rescale works correctly for Changelog state backend with materialized state /
+ * non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationRescaleITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationRescaleITCase(
+            AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Test
+    public void testRescaleOut() throws Exception {
+        testRescale(NUM_SLOTS / 2, NUM_SLOTS);
+    }
+
+    @Test
+    public void testRescaleIn() throws Exception {
+        testRescale(NUM_SLOTS, NUM_SLOTS / 2);
+    }
+
+    private void testRescale(int firstParallelism, int secondParallelism) throws Exception {

Review comment:
       Sure, I've found two issues:
   1. FLINK-26455, which causes `CancellationException`
   2. Unaligned checkpoints have some [limitations](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/unaligned_checkpoints/#certain-data-distribution-patterns-are-not-checkpointed) with rescaling that make the recovery fail. I think we can simply disable UC for now.
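   For reference, a minimal sketch of what I mean by disabling UC for these tests (assuming it is done wherever the environment is configured, e.g. in `testRescale`):

```java
// Sketch: explicitly disable unaligned checkpoints for the rescaling tests,
// since UC + rescaling is not supported for all data distribution patterns.
env.getCheckpointConfig().enableUnalignedCheckpoints(false);
```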

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint;
+
+/**
+ * This verifies that rescale works correctly for Changelog state backend with materialized state /
+ * non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationRescaleITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationRescaleITCase(
+            AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Test
+    public void testRescaleOut() throws Exception {
+        testRescale(NUM_SLOTS / 2, NUM_SLOTS);
+    }
+
+    @Test
+    public void testRescaleIn() throws Exception {
+        testRescale(NUM_SLOTS, NUM_SLOTS / 2);
+    }
+
+    private void testRescale(int firstParallelism, int secondParallelism) throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        JobID firstJobID = generateJobID();
+
+        StreamExecutionEnvironment env =
+                getEnv(delegatedStateBackend, checkpointFile, 50, 0, 20, 0);
+        env.getCheckpointConfig()
+                .setExternalizedCheckpointCleanup(
+                        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.setParallelism(firstParallelism);
+
+        JobGraph firstJobGraph =
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (currentIndex == TOTAL_ELEMENTS / 4) {
+                                    waitWhile(
+                                            () ->
+                                                    completedCheckpointNum.get() <= 0
+                                                            || getAllMaterializationId(
+                                                                            checkpointFile,
+                                                                            firstJobID)
+                                                                    .isEmpty());
+                                } else if (currentIndex > TOTAL_ELEMENTS / 4) {
+                                    throwArtificialFailure();
+                                }
+                            }
+                        },
+                        firstJobID);
+
+        try {
+            cluster.getMiniCluster().submitJob(firstJobGraph).get();
+            cluster.getMiniCluster().requestJobResult(firstJobGraph.getJobID()).get();
+        } catch (Exception ex) {
+            Preconditions.checkState(
+                    ExceptionUtils.findThrowable(ex, ArtificialFailure.class).isPresent());
+        }
+
+        env.setParallelism(secondParallelism);
+        JobID secondJobId = generateJobID();
+        JobGraph jobGraph =
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(
+                                            () ->
+                                                    getAllMaterializationId(
+                                                                    checkpointFile, secondJobId)
+                                                            .isEmpty());

Review comment:
       Does this also include old (pre-recovery) materializations?
   If so, the test currently validates recovery, but not the materialization after recovery.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t));
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(() -> completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        },
+                        generateJobID()));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        JobID jobID = generateJobID();
+        SharedReference<AtomicInteger> currentCheckpointNum =
+                sharedObjects.add(new AtomicInteger());
+        SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>());
+        StreamExecutionEnvironment env =
+                getEnv(delegatedStateBackend, checkpointFile, 100, 2, 10, 0);
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) {
+                                    waitWhile(
+                                            () -> {
+                                                if (completedCheckpointNum.get() <= 0) {
+                                                    return true;
+                                                }
+                                                List<Long> allMaterializationId =
+                                                        getAllMaterializationId(
+                                                                checkpointFile, jobID);
+                                                if (!allMaterializationId.isEmpty()) {
+                                                    currentMaterializationId
+                                                            .get()
+                                                            .addAll(allMaterializationId);
+                                                    Collections.sort(
+                                                            currentMaterializationId.get());
+                                                    currentCheckpointNum
+                                                            .get()
+                                                            .compareAndSet(
+                                                                    0,
+                                                                    completedCheckpointNum.get());
+                                                    return false;
+                                                }
+                                                return true;
+                                            });
+
+                                    throwArtificialFailure();
+                                } else if (getRuntimeContext().getAttemptNumber() == 1
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(
+                                            () -> {
+                                                if (completedCheckpointNum.get()
+                                                        <= currentCheckpointNum.get().get()) {
+                                                    return true;
+                                                }
+                                                List<Long> allMaterializationId =
+                                                        getAllMaterializationId(
+                                                                checkpointFile, jobID);
+                                                Collections.sort(allMaterializationId);

Review comment:
       Can't we use a Set instead of sorting to check for equality?
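   Something along these lines (just a sketch, keeping the existing predicate semantics and the `getAllMaterializationId` helper):

```java
// Sketch: compare the materialization ids as sets, so no sorting is needed
// and the order of ids doesn't matter.
Set<Long> allMaterializationId =
        new HashSet<>(getAllMaterializationId(checkpointFile, jobID));
return allMaterializationId.isEmpty()
        || !new HashSet<>(currentMaterializationId.get()).equals(allMaterializationId);
```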

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t));
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(() -> completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        },
+                        generateJobID()));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        JobID jobID = generateJobID();
+        SharedReference<AtomicInteger> currentCheckpointNum =
+                sharedObjects.add(new AtomicInteger());
+        SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>());
+        StreamExecutionEnvironment env =
+                getEnv(delegatedStateBackend, checkpointFile, 100, 2, 10, 0);
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) {
+                                    waitWhile(
+                                            () -> {
+                                                if (completedCheckpointNum.get() <= 0) {
+                                                    return true;
+                                                }
+                                                List<Long> allMaterializationId =
+                                                        getAllMaterializationId(
+                                                                checkpointFile, jobID);
+                                                if (!allMaterializationId.isEmpty()) {
+                                                    currentMaterializationId
+                                                            .get()
+                                                            .addAll(allMaterializationId);
+                                                    Collections.sort(
+                                                            currentMaterializationId.get());
+                                                    currentCheckpointNum
+                                                            .get()
+                                                            .compareAndSet(
+                                                                    0,
+                                                                    completedCheckpointNum.get());
+                                                    return false;
+                                                }
+                                                return true;
+                                            });
+
+                                    throwArtificialFailure();
+                                } else if (getRuntimeContext().getAttemptNumber() == 1

Review comment:
       What if some other failure happens and `getAttemptNumber` returns 2?
   I guess it's not allowed by maxRestartAttempts == 2, but it probably makes sense to enforce this (e.g. with `Preconditions.checkState`)?
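   For example, something like this at the top of `beforeElement` (just a sketch):

```java
// Sketch: fail fast on an unexpected third attempt instead of silently
// skipping the waitWhile/failure logic.
int attempt = getRuntimeContext().getAttemptNumber();
Preconditions.checkState(
        attempt <= 1, "Unexpected attempt number %s, expected at most 1.", attempt);
```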

##########
File path: flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
##########
@@ -94,6 +102,59 @@ public static void waitUntilJobInitializationFinished(
                 userCodeClassloader);
     }
 
+    public static CheckpointMetadata loadCheckpointMetadata(String savepointPath)
+            throws IOException {
+        CompletedCheckpointStorageLocation location =
+                AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(savepointPath);
+
+        try (DataInputStream stream =
+                new DataInputStream(location.getMetadataHandle().openInputStream())) {
+            return Checkpoints.loadCheckpointMetadata(
+                    stream, Thread.currentThread().getContextClassLoader(), savepointPath);
+        }
+    }
+
+    public static List<Path> findAllSortedExternalizedCheckpoint(File checkpointDir, JobID jobId)
+            throws IOException {
+        return findAllExternalizedCheckpoint(checkpointDir, jobId).stream()
+                .sorted(
+                        Comparator.comparingInt(
+                                path ->
+                                        Integer.parseInt(
+                                                path.getFileName()
+                                                        .toString()
+                                                        .substring(
+                                                                CHECKPOINT_DIR_PREFIX.length()))))
+                .collect(Collectors.toList());
+    }
+
+    public static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId)
+            throws IOException {
+        return findAllExternalizedCheckpoint(checkpointDir, jobId).stream().findAny();
+    }
+
+    public static List<Path> findAllExternalizedCheckpoint(File checkpointDir, JobID jobId)
+            throws IOException {
+        try (Stream<Path> checkpoints =
+                Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
+            return checkpoints
+                    .filter(path -> path.getFileName().toString().startsWith(CHECKPOINT_DIR_PREFIX))
+                    .filter(
+                            path -> {
+                                try (Stream<Path> checkpointFiles = Files.list(path)) {
+                                    return checkpointFiles.anyMatch(
+                                            child ->
+                                                    child.getFileName()
+                                                            .toString()
+                                                            .equals(METADATA_FILE_NAME));
+                                } catch (IOException ignored) {
+                                    return false;
+                                }

Review comment:
       Could you explain why the exception is ignored here?
   With externalized checkpoints, the checkpoint shouldn't be removed, should it?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t));
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(() -> completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        },
+                        generateJobID()));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();

Review comment:
       Rename `checkpointFile` to `checkpointFolder`? 
   
   ditto: the next test

##########
File path: flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
##########
@@ -94,6 +102,59 @@ public static void waitUntilJobInitializationFinished(
                 userCodeClassloader);
     }
 
+    public static CheckpointMetadata loadCheckpointMetadata(String savepointPath)
+            throws IOException {
+        CompletedCheckpointStorageLocation location =
+                AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(savepointPath);
+
+        try (DataInputStream stream =
+                new DataInputStream(location.getMetadataHandle().openInputStream())) {
+            return Checkpoints.loadCheckpointMetadata(
+                    stream, Thread.currentThread().getContextClassLoader(), savepointPath);
+        }
+    }
+
+    public static List<Path> findAllSortedExternalizedCheckpoint(File checkpointDir, JobID jobId)
+            throws IOException {
+        return findAllExternalizedCheckpoint(checkpointDir, jobId).stream()
+                .sorted(
+                        Comparator.comparingInt(
+                                path ->
+                                        Integer.parseInt(
+                                                path.getFileName()
+                                                        .toString()
+                                                        .substring(
+                                                                CHECKPOINT_DIR_PREFIX.length()))))
+                .collect(Collectors.toList());
+    }
+
+    public static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId)

Review comment:
       Can't some existing methods be used here, e.g. `getMostRecentCompletedCheckpointMaybe`?
   Are these methods specific to externalized checkpoints?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t));
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(() -> completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        },
+                        generateJobID()));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        JobID jobID = generateJobID();
+        SharedReference<AtomicInteger> currentCheckpointNum =
+                sharedObjects.add(new AtomicInteger());
+        SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>());
+        StreamExecutionEnvironment env =
+                getEnv(delegatedStateBackend, checkpointFile, 100, 2, 10, 0);
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) {
+                                    waitWhile(
+                                            () -> {
+                                                if (completedCheckpointNum.get() <= 0) {
+                                                    return true;
+                                                }
+                                                List<Long> allMaterializationId =
+                                                        getAllMaterializationId(
+                                                                checkpointFile, jobID);
+                                                if (!allMaterializationId.isEmpty()) {
+                                                    currentMaterializationId
+                                                            .get()
+                                                            .addAll(allMaterializationId);
+                                                    Collections.sort(
+                                                            currentMaterializationId.get());
+                                                    currentCheckpointNum
+                                                            .get()
+                                                            .compareAndSet(
+                                                                    0,
+                                                                    completedCheckpointNum.get());
+                                                    return false;
+                                                }
+                                                return true;
+                                            });
+
+                                    throwArtificialFailure();
+                                } else if (getRuntimeContext().getAttemptNumber() == 1
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(
+                                            () -> {
+                                                if (completedCheckpointNum.get()
+                                                        <= currentCheckpointNum.get().get()) {
+                                                    return true;
+                                                }
+                                                List<Long> allMaterializationId =
+                                                        getAllMaterializationId(
+                                                                checkpointFile, jobID);
+                                                Collections.sort(allMaterializationId);
+                                                return allMaterializationId.isEmpty()
+                                                        || !currentMaterializationId
+                                                                .get()
+                                                                .equals(allMaterializationId);
+                                            });
+                                    throwArtificialFailure();
+                                }
+                            }
+                        },
+                        jobID));
+    }
+
+    @Test
+    public void testFailedMaterialization() throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        JobID jobID = generateJobID();
+        SharedReference<AtomicBoolean> hasTriggerCheckpoint =
+                sharedObjects.add(new AtomicBoolean());
+        SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>());
+        StreamExecutionEnvironment env =
+                getEnv(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend,
+                                snapshotResultFuture -> {
+                                    if (hasTriggerCheckpoint.get().compareAndSet(false, true)) {
+                                        throw new RuntimeException();

Review comment:
       Rename `hasTriggerCheckpoint` to `hasFailed`?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t));
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(() -> completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        },
+                        generateJobID()));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        JobID jobID = generateJobID();
+        SharedReference<AtomicInteger> currentCheckpointNum =
+                sharedObjects.add(new AtomicInteger());
+        SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>());
+        StreamExecutionEnvironment env =
+                getEnv(delegatedStateBackend, checkpointFile, 100, 2, 10, 0);
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) {
+                                    waitWhile(
+                                            () -> {
+                                                if (completedCheckpointNum.get() <= 0) {
+                                                    return true;
+                                                }
+                                                List<Long> allMaterializationId =
+                                                        getAllMaterializationId(
+                                                                checkpointFile, jobID);
+                                                if (!allMaterializationId.isEmpty()) {
+                                                    currentMaterializationId
+                                                            .get()
+                                                            .addAll(allMaterializationId);
+                                                    Collections.sort(
+                                                            currentMaterializationId.get());
+                                                    currentCheckpointNum
+                                                            .get()
+                                                            .compareAndSet(
+                                                                    0,
+                                                                    completedCheckpointNum.get());
+                                                    return false;
+                                                }
+                                                return true;
+                                            });
+
+                                    throwArtificialFailure();
+                                } else if (getRuntimeContext().getAttemptNumber() == 1
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(
+                                            () -> {
+                                                if (completedCheckpointNum.get()
+                                                        <= currentCheckpointNum.get().get()) {
+                                                    return true;
+                                                }
+                                                List<Long> allMaterializationId =
+                                                        getAllMaterializationId(
+                                                                checkpointFile, jobID);
+                                                Collections.sort(allMaterializationId);

Review comment:
       And it's probably better to use `StateHandleID`s collected from the materialized states, to avoid collisions between different subtasks if DoP > 1.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java
##########
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get;
+import static org.apache.flink.test.util.TestUtils.findAllSortedExternalizedCheckpoint;
+import static org.apache.flink.test.util.TestUtils.loadCheckpointMetadata;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/** Base class for tests related to periodic materialization of ChangelogStateBackend. */
+@RunWith(Parameterized.class)
+public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogger {
+
+    private static final int NUM_TASK_MANAGERS = 1;
+    private static final int NUM_TASK_SLOTS = 4;
+    protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+    protected static final int TOTAL_ELEMENTS = 10_000;
+
+    protected final AbstractStateBackend delegatedStateBackend;
+
+    protected MiniClusterWithClientResource cluster;
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    @Parameterized.Parameters(name = "delegated state backend type ={0}")
+    public static Collection<AbstractStateBackend> parameter() {
+        return Arrays.asList(
+                new HashMapStateBackend(),
+                new EmbeddedRocksDBStateBackend(true),
+                new EmbeddedRocksDBStateBackend(false));
+    }
+
+    public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend delegatedStateBackend) {
+        this.delegatedStateBackend = delegatedStateBackend;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configure())
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+                                .build());
+        cluster.before();
+        cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        cluster.after();
+        // clear result in sink
+        CollectionSink.clearExpectedResult();
+    }
+
+    protected StreamExecutionEnvironment getEnv(
+            StateBackend stateBackend,
+            File checkpointFile,
+            long checkpointInterval,
+            int restartAttempts,
+            int materializationInterval,
+            int materializationMaxFailure) {
+        StreamExecutionEnvironment env =
+                getEnv(stateBackend, checkpointFile, checkpointInterval, restartAttempts);
+        env.configure(
+                new Configuration()
+                        .set(
+                                StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(materializationInterval))
+                        .set(
+                                StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED,
+                                materializationMaxFailure));
+        return env;
+    }
+
+    protected StreamExecutionEnvironment getEnv(
+            StateBackend stateBackend,
+            File checkpointFile,
+            long checkpointInterval,
+            int restartAttempts) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(checkpointInterval)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(checkpointFile.toURI());
+        env.setStateBackend(stateBackend)
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, 0));
+        return env;
+    }
+
+    protected StreamExecutionEnvironment getEnv(StateBackend stateBackend) throws IOException {
+        return getEnv(stateBackend, TEMPORARY_FOLDER.newFolder(), 10, 1);
+    }
+
+    protected JobGraph buildJobGraph(
+            StreamExecutionEnvironment env, ControlledSource controlledSource, JobID jobId) {
+        env.addSource(controlledSource)
+                .keyBy(element -> element)
+                .map(new CountMapper())
+                .addSink(new CollectionSink())
+                .setParallelism(1);
+        return env.getStreamGraph().getJobGraph(jobId);
+    }
+
+    protected void waitAndAssert(JobGraph jobGraph) throws Exception {
+        waitUntilJobFinished(jobGraph);
+        assertEquals(CollectionSink.getActualResult(), ControlledSource.getExpectedResult());
+    }
+
+    protected JobID generateJobID() {
+        byte[] randomBytes = new byte[AbstractID.SIZE];
+        ThreadLocalRandom.current().nextBytes(randomBytes);
+        return JobID.fromByteArray(randomBytes);
+    }
+
+    protected static List<Long> getAllMaterializationId(File checkpointFile, JobID jobID)
+            throws IOException {
+        List<Path> checkpointPaths = findAllSortedExternalizedCheckpoint(checkpointFile, jobID);
+        for (Path checkpointPath : checkpointPaths) {
+            CheckpointMetadata checkpointMetadata =
+                    loadCheckpointMetadata(checkpointPath.toString());
+            List<Long> materializationIds =
+                    checkpointMetadata.getOperatorStates().stream()
+                            .flatMap(operatorState -> operatorState.getStates().stream())
+                            .flatMap(
+                                    operatorSubtaskState ->
+                                            operatorSubtaskState.getManagedKeyedState().stream())
+                            .map(keyedStateHandle -> (ChangelogStateBackendHandle) keyedStateHandle)
+                            .filter(
+                                    changelogStateBackendHandle ->
+                                            !changelogStateBackendHandle
+                                                    .getMaterializedStateHandles()
+                                                    .isEmpty())
+                            .map(ChangelogStateBackendHandle::getMaterializationID)
+                            .collect(Collectors.toList());
+            if (!materializationIds.isEmpty()) {
+                return materializationIds;
+            }
+        }
+        return Collections.emptyList();
+    }
+
+    private void waitUntilJobFinished(JobGraph jobGraph) throws Exception {
+        JobSubmissionResult jobSubmissionResult =
+                cluster.getMiniCluster().submitJob(jobGraph).get();
+        JobResult jobResult =
+                cluster.getMiniCluster().requestJobResult(jobSubmissionResult.getJobID()).get();
+        assertSame(ApplicationStatus.SUCCEEDED, jobResult.getApplicationStatus());

Review comment:
       Could you please throw `jobResult.getSerializedThrowable()` if status 
isn't `SUCCEEDED`?
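   A minimal sketch of that change in `waitUntilJobFinished`, assuming `getSerializedThrowable()` returns an `Optional` of a serialized throwable as suggested above:

   ```java
   // Hedged sketch: surface the job's failure cause instead of a bare assertion failure.
   if (jobResult.getApplicationStatus() != ApplicationStatus.SUCCEEDED) {
       Throwable cause =
               jobResult
                       .getSerializedThrowable()
                       .map(serialized -> serialized.deserializeError(getClass().getClassLoader()))
                       .orElse(null);
       throw new AssertionError("Job did not finish with SUCCEEDED status", cause);
   }
   ```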

##########
File path: flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java
##########
@@ -94,6 +102,59 @@ public static void waitUntilJobInitializationFinished(
                 userCodeClassloader);
     }
 
+    public static CheckpointMetadata loadCheckpointMetadata(String savepointPath)
+            throws IOException {
+        CompletedCheckpointStorageLocation location =
+                AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(savepointPath);
+
+        try (DataInputStream stream =
+                new DataInputStream(location.getMetadataHandle().openInputStream())) {
+            return Checkpoints.loadCheckpointMetadata(
+                    stream, Thread.currentThread().getContextClassLoader(), savepointPath);
+        }
+    }
+
+    public static List<Path> findAllSortedExternalizedCheckpoint(File checkpointDir, JobID jobId)
+            throws IOException {
+        return findAllExternalizedCheckpoint(checkpointDir, jobId).stream()
+                .sorted(
+                        Comparator.comparingInt(
+                                path ->
+                                        Integer.parseInt(
+                                                path.getFileName()
+                                                        .toString()
+                                                        .substring(
+                                                                CHECKPOINT_DIR_PREFIX.length()))))
+                .collect(Collectors.toList());
+    }
+
+    public static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId)
+            throws IOException {
+        return findAllExternalizedCheckpoint(checkpointDir, jobId).stream().findAny();
+    }
+
+    public static List<Path> findAllExternalizedCheckpoint(File checkpointDir, JobID jobId)
+            throws IOException {
+        try (Stream<Path> checkpoints =
+                Files.list(checkpointDir.toPath().resolve(jobId.toString()))) {
+            return checkpoints
+                    .filter(path -> path.getFileName().toString().startsWith(CHECKPOINT_DIR_PREFIX))
+                    .filter(
+                            path -> {
+                                try (Stream<Path> checkpointFiles = Files.list(path)) {
+                                    return checkpointFiles.anyMatch(
+                                            child ->
+                                                    child.getFileName()
+                                                            .toString()
+                                                            .equals(METADATA_FILE_NAME));
+                                } catch (IOException ignored) {
+                                    return false;
+                                }

Review comment:
       Could you explain why the exception is ignored here?
   With externalized checkpoints, the checkpoint shouldn't be removed, should it?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java
##########
@@ -0,0 +1,569 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.testutils.junit.SharedObjects;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get;
+import static org.apache.flink.test.util.TestUtils.findAllSortedExternalizedCheckpoint;
+import static org.apache.flink.test.util.TestUtils.loadCheckpointMetadata;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/** Base class for tests related to periodic materialization of ChangelogStateBackend. */
+@RunWith(Parameterized.class)
+public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogger {
+
+    private static final int NUM_TASK_MANAGERS = 1;
+    private static final int NUM_TASK_SLOTS = 4;
+    protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+    protected static final int TOTAL_ELEMENTS = 10_000;
+
+    protected final AbstractStateBackend delegatedStateBackend;
+
+    protected MiniClusterWithClientResource cluster;
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+    @Rule public final SharedObjects sharedObjects = SharedObjects.create();
+
+    @Parameterized.Parameters(name = "delegated state backend type ={0}")
+    public static Collection<AbstractStateBackend> parameter() {
+        return Arrays.asList(
+                new HashMapStateBackend(),
+                new EmbeddedRocksDBStateBackend(true),
+                new EmbeddedRocksDBStateBackend(false));
+    }
+
+    public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend delegatedStateBackend) {
+        this.delegatedStateBackend = delegatedStateBackend;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configure())
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+                                .build());
+        cluster.before();
+        cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        cluster.after();
+        // clear result in sink
+        CollectionSink.clearExpectedResult();
+    }
+
+    protected StreamExecutionEnvironment getEnv(
+            StateBackend stateBackend,
+            File checkpointFile,
+            long checkpointInterval,
+            int restartAttempts,
+            int materializationInterval,
+            int materializationMaxFailure) {
+        StreamExecutionEnvironment env =
+                getEnv(stateBackend, checkpointFile, checkpointInterval, restartAttempts);
+        env.configure(
+                new Configuration()
+                        .set(
+                                StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(materializationInterval))
+                        .set(
+                                StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED,
+                                materializationMaxFailure));
+        return env;
+    }
+
+    protected StreamExecutionEnvironment getEnv(
+            StateBackend stateBackend,
+            File checkpointFile,
+            long checkpointInterval,
+            int restartAttempts) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(checkpointInterval)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(checkpointFile.toURI());
+        env.setStateBackend(stateBackend)
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, 0));
+        return env;
+    }
+
+    protected StreamExecutionEnvironment getEnv(StateBackend stateBackend) throws IOException {
+        return getEnv(stateBackend, TEMPORARY_FOLDER.newFolder(), 10, 1);
+    }
+
+    protected JobGraph buildJobGraph(
+            StreamExecutionEnvironment env, ControlledSource controlledSource, JobID jobId) {
+        env.addSource(controlledSource)
+                .keyBy(element -> element)
+                .map(new CountMapper())
+                .addSink(new CollectionSink())
+                .setParallelism(1);
+        return env.getStreamGraph().getJobGraph(jobId);
+    }
+
+    protected void waitAndAssert(JobGraph jobGraph) throws Exception {
+        waitUntilJobFinished(jobGraph);
+        assertEquals(CollectionSink.getActualResult(), ControlledSource.getExpectedResult());
+    }
+
+    protected JobID generateJobID() {
+        byte[] randomBytes = new byte[AbstractID.SIZE];
+        ThreadLocalRandom.current().nextBytes(randomBytes);
+        return JobID.fromByteArray(randomBytes);
+    }
+
+    protected static List<Long> getAllMaterializationId(File checkpointFile, JobID jobID)
+            throws IOException {
+        List<Path> checkpointPaths = findAllSortedExternalizedCheckpoint(checkpointFile, jobID);
+        for (Path checkpointPath : checkpointPaths) {
+            CheckpointMetadata checkpointMetadata =
+                    loadCheckpointMetadata(checkpointPath.toString());
+            List<Long> materializationIds =
+                    checkpointMetadata.getOperatorStates().stream()
+                            .flatMap(operatorState -> operatorState.getStates().stream())
+                            .flatMap(
+                                    operatorSubtaskState ->
+                                            operatorSubtaskState.getManagedKeyedState().stream())
+                            .map(keyedStateHandle -> (ChangelogStateBackendHandle) keyedStateHandle)
+                            .filter(
+                                    changelogStateBackendHandle ->
+                                            !changelogStateBackendHandle
+                                                    .getMaterializedStateHandles()
+                                                    .isEmpty())
+                            .map(ChangelogStateBackendHandle::getMaterializationID)
+                            .collect(Collectors.toList());
+            if (!materializationIds.isEmpty()) {
+                return materializationIds;
+            }

Review comment:
       This means the 1st checkpoint with some materialized state will be returned, right?
   Is there any particular reason why not the last one?
   I think, at least in case of failover, it could break the check that there was a materialization after the recovery.
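   A minimal sketch of iterating from the newest externalized checkpoint instead, assuming `findAllSortedExternalizedCheckpoint` keeps returning checkpoints in ascending order as in this PR:

   ```java
   // Hedged sketch: reverse the sorted list so the most recent checkpoint's
   // materialization IDs win, which matters after a failover.
   List<Path> checkpointPaths = findAllSortedExternalizedCheckpoint(checkpointFile, jobID);
   Collections.reverse(checkpointPaths);
   for (Path checkpointPath : checkpointPaths) {
       // ... extract materialization IDs as above and return the first non-empty result
   }
   ```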

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.testutils.junit.SharedReference;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env =
+                getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t));
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(() -> completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        },
+                        generateJobID()));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        JobID jobID = generateJobID();
+        SharedReference<AtomicInteger> currentCheckpointNum =
+                sharedObjects.add(new AtomicInteger());
+        SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>());
+        StreamExecutionEnvironment env =
+                getEnv(delegatedStateBackend, checkpointFile, 100, 2, 10, 0);
+        waitAndAssert(
+                buildJobGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement(SourceContext<Integer> ctx)
+                                    throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) {
+                                    waitWhile(
+                                            () -> {
+                                                if (completedCheckpointNum.get() <= 0) {
+                                                    return true;
+                                                }
+                                                List<Long> allMaterializationId =
+                                                        getAllMaterializationId(
+                                                                checkpointFile, jobID);
+                                                if (!allMaterializationId.isEmpty()) {
+                                                    currentMaterializationId
+                                                            .get()
+                                                            .addAll(allMaterializationId);
+                                                    Collections.sort(
+                                                            currentMaterializationId.get());
+                                                    currentCheckpointNum
+                                                            .get()
+                                                            .compareAndSet(
+                                                                    0,
+                                                                    completedCheckpointNum.get());
+                                                    return false;
+                                                }
+                                                return true;
+                                            });
+
+                                    throwArtificialFailure();
+                                } else if (getRuntimeContext().getAttemptNumber() == 1
+                                        && currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(
+                                            () -> {
+                                                if (completedCheckpointNum.get()
+                                                        <= currentCheckpointNum.get().get()) {
+                                                    return true;
+                                                }
+                                                List<Long> allMaterializationId =
+                                                        getAllMaterializationId(
+                                                                checkpointFile, jobID);
+                                                Collections.sort(allMaterializationId);
+                                                return allMaterializationId.isEmpty()
+                                                        || !currentMaterializationId
+                                                                .get()
+                                                                .equals(allMaterializationId);
+                                            });
+                                    throwArtificialFailure();
+                                }
+                            }
+                        },
+                        jobID));
+    }
+
+    @Test
+    public void testFailedMaterialization() throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        JobID jobID = generateJobID();
+        SharedReference<AtomicBoolean> hasTriggerCheckpoint =
+                sharedObjects.add(new AtomicBoolean());
+        SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>());
+        StreamExecutionEnvironment env =
+                getEnv(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend,
+                                snapshotResultFuture -> {
+                                    if (hasTriggerCheckpoint.get().compareAndSet(false, true)) {
+                                        throw new RuntimeException();

Review comment:
       Is it possible that one subtask fails while another one materializes?
   How about setting parallelism to 1 explicitly in `getEnv`?
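   A one-line sketch of that suggestion, assuming it is added to the `getEnv` overload built in the test base:

   ```java
   // Hedged sketch: pin the pipeline to a single subtask so the task that fails the
   // materialization is also the one whose checkpoints the test inspects.
   env.setParallelism(1);
   ```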




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

