rkhachatryan commented on a change in pull request #18224:
URL: https://github.com/apache/flink/pull/18224#discussion_r805742655



##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();

Review comment:
       This field is not used.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend 
delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        triggerDelegatedSnapshot.set(false);
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(new 
DelegatedStateBackendWrapper(delegatedStateBackend, t -> t))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMinutes(3)));

Review comment:
       Why do we need to explicitly set the interval here?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend 
delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        triggerDelegatedSnapshot.set(false);
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(new 
DelegatedStateBackendWrapper(delegatedStateBackend, t -> t))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMinutes(3)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(() -> 
completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and 
materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);
+                            return snapshotResultFuture;
+                        };
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend, snapshotResultConsumer))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(10)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 0
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    PENDING_MATERIALIZATION.clear();
+                                    throwArtificialFailure();
+                                } else if 
(getRuntimeContext().getAttemptNumber() == 1
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{

Review comment:
       Don't we already have a checkpoint with materialized state?
   Could you clarify the intention of this branch?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend 
delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        triggerDelegatedSnapshot.set(false);
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(new 
DelegatedStateBackendWrapper(delegatedStateBackend, t -> t))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMinutes(3)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(() -> 
completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and 
materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);
+                            return snapshotResultFuture;
+                        };
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend, snapshotResultConsumer))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(10)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 0
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    PENDING_MATERIALIZATION.clear();
+                                    throwArtificialFailure();
+                                } else if 
(getRuntimeContext().getAttemptNumber() == 1
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 1
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    @Test
+    public void testFailedMaterialization() throws Exception {

Review comment:
       I think it makes sense to make sure that materialization failure doesn't 
affect subsequent materializations. I can imagine that some bug in error 
handling can prevent re-scheduling.
   
   So we probably need one more materialization after the failed one.
   
   WDYT?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/** Base class for tests related to period materialization of 
ChangelogStateBackend. */
+@RunWith(Parameterized.class)
+public abstract class ChangelogPeriodicMaterializationTestBase extends 
TestLogger {
+
+    private static final int NUM_TASK_MANAGERS = 1;
+    private static final int NUM_TASK_SLOTS = 4;
+    protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+    protected static final int TOTAL_ELEMENTS = 10_000;
+
+    protected final AbstractStateBackend delegatedStateBackend;
+
+    protected MiniClusterWithClientResource cluster;
+
+    // Maintain the RunnableFuture of SnapshotResult to make sure the 
materialization phase has
+    // been done
+    protected static final 
Queue<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+            PENDING_MATERIALIZATION = new ConcurrentLinkedQueue<>();
+
+    @Rule public Timeout globalTimeout = Timeout.seconds(60);

Review comment:
       The timeout here is probably unnecessary. IIRC, there was a consensus on 
the dev ML to rely on global timeouts.
   
   When running multiple times locally, the test failed a couple of times 
because currently the timeout is too tight.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/** Base class for tests related to period materialization of 
ChangelogStateBackend. */
+@RunWith(Parameterized.class)
+public abstract class ChangelogPeriodicMaterializationTestBase extends 
TestLogger {
+
+    private static final int NUM_TASK_MANAGERS = 1;
+    private static final int NUM_TASK_SLOTS = 4;
+    protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+    protected static final int TOTAL_ELEMENTS = 10_000;
+
+    protected final AbstractStateBackend delegatedStateBackend;
+
+    protected MiniClusterWithClientResource cluster;
+
+    // Maintain the RunnableFuture of SnapshotResult to make sure the 
materialization phase has
+    // been done
+    protected static final 
Queue<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+            PENDING_MATERIALIZATION = new ConcurrentLinkedQueue<>();
+
+    @Rule public Timeout globalTimeout = Timeout.seconds(60);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @Parameterized.Parameters(name = "delegated state backend type ={0}")
+    public static Collection<AbstractStateBackend> parameter() {
+        return Arrays.asList(
+                new HashMapStateBackend(),
+                new EmbeddedRocksDBStateBackend(true),
+                new EmbeddedRocksDBStateBackend());
+    }
+
+    public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend 
delegatedStateBackend) {
+        this.delegatedStateBackend = delegatedStateBackend;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configureDSTL())
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+                                .build());
+        cluster.before();
+        cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
+
+        // init source data randomly
+        ControlledSource.initSourceData(TOTAL_ELEMENTS);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        cluster.after();
+        // clear result in sink
+        CollectionSink.clearExpectedResult();
+        PENDING_MATERIALIZATION.clear();
+    }
+
+    protected StreamGraph buildStreamGraph(
+            StreamExecutionEnvironment env, ControlledSource controlledSource) 
{
+        env.addSource(controlledSource)
+                .keyBy(element -> element)
+                .map(new CountMapper())
+                .addSink(new CollectionSink())
+                .setParallelism(1);
+        return env.getStreamGraph();
+    }
+
+    protected void waitAndAssert(StreamExecutionEnvironment env, StreamGraph 
streamGraph)
+            throws Exception {
+        waitUntilJobFinished(env, streamGraph);
+        assertEquals(CollectionSink.getExpectedResult(), 
ControlledSource.getActualResult());

Review comment:
       I think it's actually the source that holds the "expected" result, and the 
sink collects the actual result.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend 
delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        triggerDelegatedSnapshot.set(false);
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(new 
DelegatedStateBackendWrapper(delegatedStateBackend, t -> t))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(

Review comment:
       Could you extract the common code of initializing the environment?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend 
delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        triggerDelegatedSnapshot.set(false);
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(new 
DelegatedStateBackendWrapper(delegatedStateBackend, t -> t))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMinutes(3)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(() -> 
completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and 
materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);
+                            return snapshotResultFuture;
+                        };
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend, snapshotResultConsumer))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(10)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 0
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    PENDING_MATERIALIZATION.clear();
+                                    throwArtificialFailure();
+                                } else if 
(getRuntimeContext().getAttemptNumber() == 1
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 1

Review comment:
       Is it possible that before the 1st failure, two checkpoints were 
completed;
   and now we don't have to wait for the 3rd anymore;
   so no checkpoint is performed after the recovery?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend 
delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        triggerDelegatedSnapshot.set(false);
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(new 
DelegatedStateBackendWrapper(delegatedStateBackend, t -> t))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMinutes(3)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(() -> 
completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and 
materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);
+                            return snapshotResultFuture;
+                        };
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend, snapshotResultConsumer))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(10)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 0
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    PENDING_MATERIALIZATION.clear();
+                                    throwArtificialFailure();
+                                } else if 
(getRuntimeContext().getAttemptNumber() == 1
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 1
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    @Test
+    public void testFailedMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend,
+                                snapshotResultFuture -> {
+                                    
RunnableFuture<SnapshotResult<KeyedStateHandle>>
+                                            snapshotResultFutureWrapper =
+                                                    new 
FutureTaskWithException<>(
+                                                            () -> {
+                                                                
SnapshotResult<KeyedStateHandle>
+                                                                        
snapshotResult =
+                                                                               
 snapshotResultFuture
+                                                                               
         .get();
+                                                                if 
(PENDING_MATERIALIZATION.size()
+                                                                        == 0) {
+                                                                    throw new 
RuntimeException();
+                                                                } else {
+                                                                    return 
snapshotResult;
+                                                                }
+                                                            });
+                                    
PENDING_MATERIALIZATION.add(snapshotResultFutureWrapper);
+                                    return snapshotResultFutureWrapper;
+                                }))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(20))
+                        
.set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, 1));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                waitWhile(
+                                        () ->
+                                                currentIndex >= TOTAL_ELEMENTS 
/ 2
+                                                        && 
PENDING_MATERIALIZATION.size() == 0);

Review comment:
       nit: replace `PENDING_MATERIALIZATION.size() == 0` with 
`PENDING_MATERIALIZATION.isEmpty()` ?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend 
delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        triggerDelegatedSnapshot.set(false);
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(new 
DelegatedStateBackendWrapper(delegatedStateBackend, t -> t))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMinutes(3)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(() -> 
completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and 
materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);
+                            return snapshotResultFuture;
+                        };
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend, snapshotResultConsumer))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(10)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 0
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    PENDING_MATERIALIZATION.clear();
+                                    throwArtificialFailure();

Review comment:
       IIUC, the intention here is to wait for a checkpoint that has some 
materialized state (please correct me if I'm wrong).
   
   At this point (before `throwArtificialFailure`), the materialization was 
started (thanks to `PENDING_MATERIALIZATION....isDone`).
   
   But there is no guarantee that it finished, made it to 
`ChangelogKeyedStateBackend.updateChangelogSnapshotState`, and was finally 
included in the new checkpoint.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend 
delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        triggerDelegatedSnapshot.set(false);
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(new 
DelegatedStateBackendWrapper(delegatedStateBackend, t -> t))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMinutes(3)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(() -> 
completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and 
materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);

Review comment:
       Could you explain why this variable (`PENDING_MATERIALIZATION`) cannot 
be local?
   
   It would be easier for me to follow the logic, and less error-prone, if it 
were local.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+
+import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This verifies that rescale works correctly for Changelog state backend with 
materialized state /
+ * non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationRescaleITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationRescaleITCase(
+            AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Test
+    public void testRescaleOut() throws Exception {
+        testRescale(NUM_SLOTS / 2, NUM_SLOTS);
+    }
+
+    @Test
+    public void testRescaleIn() throws Exception {
+        testRescale(NUM_SLOTS, NUM_SLOTS / 2);
+    }
+
+    private void testRescale(int firstParallelism, int secondParallelism) 
throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(checkpointFile.toURI());
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);
+                            return snapshotResultFuture;
+                        };
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend, snapshotResultConsumer))
+                .setRestartStrategy(RestartStrategies.noRestart());
+        env.getCheckpointConfig()
+                .setExternalizedCheckpointCleanup(
+                        
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(10)));
+        env.setParallelism(firstParallelism);
+
+        StreamGraph firstStreamGraph =
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 0
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                } else if (currentIndex > TOTAL_ELEMENTS / 2) {
+                                    throwArtificialFailure();
+                                }
+                            }
+                        });
+
+        JobGraph firstJobGraph = firstStreamGraph.getJobGraph();
+        try {
+            cluster.getMiniCluster().submitJob(firstJobGraph).get();
+            
cluster.getMiniCluster().requestJobResult(firstJobGraph.getJobID()).get();

Review comment:
       I think this future returns the submission result, not the job result, 
right?
   Should we wait for the job completion here and check the result?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend 
delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        triggerDelegatedSnapshot.set(false);
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(new 
DelegatedStateBackendWrapper(delegatedStateBackend, t -> t))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMinutes(3)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(() -> 
completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and 
materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);
+                            return snapshotResultFuture;
+                        };
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend, snapshotResultConsumer))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(10)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 0
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    PENDING_MATERIALIZATION.clear();
+                                    throwArtificialFailure();
+                                } else if 
(getRuntimeContext().getAttemptNumber() == 1
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 1
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    @Test
+    public void testFailedMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend,
+                                snapshotResultFuture -> {
+                                    
RunnableFuture<SnapshotResult<KeyedStateHandle>>
+                                            snapshotResultFutureWrapper =
+                                                    new 
FutureTaskWithException<>(
+                                                            () -> {
+                                                                
SnapshotResult<KeyedStateHandle>
+                                                                        
snapshotResult =
+                                                                               
 snapshotResultFuture
+                                                                               
         .get();
+                                                                if 
(PENDING_MATERIALIZATION.size()
+                                                                        == 0) {
+                                                                    throw new 
RuntimeException();
+                                                                } else {
+                                                                    return 
snapshotResult;
+                                                                }
+                                                            });
+                                    
PENDING_MATERIALIZATION.add(snapshotResultFutureWrapper);
+                                    return snapshotResultFutureWrapper;
+                                }))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(20))
+                        
.set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, 1));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                waitWhile(
+                                        () ->
+                                                currentIndex >= TOTAL_ELEMENTS 
/ 2
+                                                        && 
PENDING_MATERIALIZATION.size() == 0);

Review comment:
       This condition doesn't seem correct to me. Could you clarify the 
intention?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.function.FutureTaskWithException;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * This verifies that checkpointing works correctly for Changelog state 
backend with materialized
+ * state / non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    private static final AtomicBoolean triggerDelegatedSnapshot = new 
AtomicBoolean();
+
+    public ChangelogPeriodicMaterializationITCase(AbstractStateBackend 
delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Before
+    public void setup() throws Exception {
+        super.setup();
+        triggerDelegatedSnapshot.set(false);
+    }
+
+    /** Recovery from checkpoint only containing non-materialized state. */
+    @Test
+    public void testNonMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(new 
DelegatedStateBackendWrapper(delegatedStateBackend, t -> t))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMinutes(3)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(() -> 
completedCheckpointNum.get() <= 0);
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    /** Recovery from checkpoint containing non-materialized state and 
materialized state. */
+    @Test
+    public void testMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);
+                            return snapshotResultFuture;
+                        };
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend, snapshotResultConsumer))
+                .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0));
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(10)));
+        waitAndAssert(
+                env,
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (getRuntimeContext().getAttemptNumber() == 0
+                                        && currentIndex == TOTAL_ELEMENTS / 4) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 0
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    PENDING_MATERIALIZATION.clear();
+                                    throwArtificialFailure();
+                                } else if 
(getRuntimeContext().getAttemptNumber() == 1
+                                        && currentIndex == TOTAL_ELEMENTS / 2) 
{
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 1
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                    throwArtificialFailure();
+                                }
+                            }
+                        }));
+    }
+
+    @Test
+    public void testFailedMaterialization() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend,
+                                snapshotResultFuture -> {
+                                    
RunnableFuture<SnapshotResult<KeyedStateHandle>>
+                                            snapshotResultFutureWrapper =
+                                                    new 
FutureTaskWithException<>(
+                                                            () -> {
+                                                                
SnapshotResult<KeyedStateHandle>
+                                                                        
snapshotResult =
+                                                                               
 snapshotResultFuture
+                                                                               
         .get();
+                                                                if 
(PENDING_MATERIALIZATION.size()
+                                                                        == 0) {

Review comment:
       nit: replace `PENDING_MATERIALIZATION.size() == 0` with 
`PENDING_MATERIALIZATION.isEmpty()` ?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+
+import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This verifies that rescale works correctly for Changelog state backend with 
materialized state /
+ * non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationRescaleITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationRescaleITCase(
+            AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Test
+    public void testRescaleOut() throws Exception {
+        testRescale(NUM_SLOTS / 2, NUM_SLOTS);
+    }
+
+    @Test
+    public void testRescaleIn() throws Exception {
+        testRescale(NUM_SLOTS, NUM_SLOTS / 2);
+    }
+
+    private void testRescale(int firstParallelism, int secondParallelism) 
throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(checkpointFile.toURI());
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);
+                            return snapshotResultFuture;
+                        };
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend, snapshotResultConsumer))
+                .setRestartStrategy(RestartStrategies.noRestart());
+        env.getCheckpointConfig()
+                .setExternalizedCheckpointCleanup(
+                        
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(10)));
+        env.setParallelism(firstParallelism);
+
+        StreamGraph firstStreamGraph =
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 0
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                } else if (currentIndex > TOTAL_ELEMENTS / 2) {
+                                    throwArtificialFailure();
+                                }
+                            }
+                        });
+
+        JobGraph firstJobGraph = firstStreamGraph.getJobGraph();
+        try {
+            cluster.getMiniCluster().submitJob(firstJobGraph).get();
+            
cluster.getMiniCluster().requestJobResult(firstJobGraph.getJobID()).get();
+        } catch (Exception ex) {
+            ExceptionUtils.findThrowable(ex, ArtificialFailure.class);

Review comment:
       Shouldn't we check `isPresent` here and rethrow if not?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/** Base class for tests related to period materialization of 
ChangelogStateBackend. */
+@RunWith(Parameterized.class)
+public abstract class ChangelogPeriodicMaterializationTestBase extends 
TestLogger {
+
+    private static final int NUM_TASK_MANAGERS = 1;
+    private static final int NUM_TASK_SLOTS = 4;
+    protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+    protected static final int TOTAL_ELEMENTS = 10_000;
+
+    protected final AbstractStateBackend delegatedStateBackend;
+
+    protected MiniClusterWithClientResource cluster;
+
+    // Maintain the RunnableFuture of SnapshotResult to make sure the 
materialization phase has
+    // been done
+    protected static final 
Queue<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+            PENDING_MATERIALIZATION = new ConcurrentLinkedQueue<>();
+
+    @Rule public Timeout globalTimeout = Timeout.seconds(60);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @Parameterized.Parameters(name = "delegated state backend type ={0}")
+    public static Collection<AbstractStateBackend> parameter() {
+        return Arrays.asList(
+                new HashMapStateBackend(),
+                new EmbeddedRocksDBStateBackend(true),
+                new EmbeddedRocksDBStateBackend());

Review comment:
       nit: explicitly pass `false` to the 2nd constructor call?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+
+import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This verifies that rescale works correctly for Changelog state backend with 
materialized state /
+ * non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationRescaleITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationRescaleITCase(
+            AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Test
+    public void testRescaleOut() throws Exception {
+        testRescale(NUM_SLOTS / 2, NUM_SLOTS);
+    }
+
+    @Test
+    public void testRescaleIn() throws Exception {
+        testRescale(NUM_SLOTS, NUM_SLOTS / 2);
+    }
+
+    private void testRescale(int firstParallelism, int secondParallelism) 
throws Exception {
+        File checkpointFile = TEMPORARY_FOLDER.newFolder();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10)
+                .enableChangelogStateBackend(true)
+                .getCheckpointConfig()
+                .setCheckpointStorage(checkpointFile.toURI());
+        
SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+                snapshotResultConsumer =
+                        snapshotResultFuture -> {
+                            PENDING_MATERIALIZATION.add(snapshotResultFuture);
+                            return snapshotResultFuture;
+                        };
+        env.setStateBackend(
+                        new DelegatedStateBackendWrapper(
+                                delegatedStateBackend, snapshotResultConsumer))
+                .setRestartStrategy(RestartStrategies.noRestart());
+        env.getCheckpointConfig()
+                .setExternalizedCheckpointCleanup(
+                        
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.configure(
+                new Configuration()
+                        .set(
+                                
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL,
+                                Duration.ofMillis(10)));
+        env.setParallelism(firstParallelism);
+
+        StreamGraph firstStreamGraph =
+                buildStreamGraph(
+                        env,
+                        new ControlledSource() {
+                            @Override
+                            protected void beforeElement() throws Exception {
+                                if (currentIndex == TOTAL_ELEMENTS / 2) {
+                                    waitWhile(
+                                            () ->
+                                                    
completedCheckpointNum.get() <= 0
+                                                            || 
PENDING_MATERIALIZATION.stream()
+                                                                    
.noneMatch(Future::isDone));
+                                } else if (currentIndex > TOTAL_ELEMENTS / 2) {
+                                    throwArtificialFailure();
+                                }
+                            }
+                        });
+
+        JobGraph firstJobGraph = firstStreamGraph.getJobGraph();
+        try {
+            cluster.getMiniCluster().submitJob(firstJobGraph).get();
+            
cluster.getMiniCluster().requestJobResult(firstJobGraph.getJobID()).get();
+        } catch (Exception ex) {
+            ExceptionUtils.findThrowable(ex, ArtificialFailure.class);
+        }
+
+        env.setParallelism(secondParallelism);
+        StreamGraph streamGraph = buildStreamGraph(env, new 
ControlledSource());
+        Optional<Path> checkpoint =
+                findExternalizedCheckpoint(checkpointFile, 
firstJobGraph.getJobID());
+        assertTrue(checkpoint.isPresent());

Review comment:
       The absence of a checkpoint here most likely indicates some bug in the test 
code, so I'd rather use something like `Preconditions.checkState` instead of 
`assertTrue`.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/** Base class for tests related to period materialization of 
ChangelogStateBackend. */
+@RunWith(Parameterized.class)
+public abstract class ChangelogPeriodicMaterializationTestBase extends 
TestLogger {
+
+    private static final int NUM_TASK_MANAGERS = 1;
+    private static final int NUM_TASK_SLOTS = 4;
+    protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+    protected static final int TOTAL_ELEMENTS = 10_000;
+
+    protected final AbstractStateBackend delegatedStateBackend;
+
+    protected MiniClusterWithClientResource cluster;
+
+    // Maintain the RunnableFuture of SnapshotResult to make sure the 
materialization phase has
+    // been done
+    protected static final 
Queue<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+            PENDING_MATERIALIZATION = new ConcurrentLinkedQueue<>();
+
+    @Rule public Timeout globalTimeout = Timeout.seconds(60);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @Parameterized.Parameters(name = "delegated state backend type ={0}")
+    public static Collection<AbstractStateBackend> parameter() {
+        return Arrays.asList(
+                new HashMapStateBackend(),
+                new EmbeddedRocksDBStateBackend(true),
+                new EmbeddedRocksDBStateBackend());
+    }
+
+    public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend 
delegatedStateBackend) {
+        this.delegatedStateBackend = delegatedStateBackend;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configureDSTL())
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+                                .build());
+        cluster.before();
+        cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
+
+        // init source data randomly
+        ControlledSource.initSourceData(TOTAL_ELEMENTS);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        cluster.after();
+        // clear result in sink
+        CollectionSink.clearExpectedResult();
+        PENDING_MATERIALIZATION.clear();
+    }
+
+    protected StreamGraph buildStreamGraph(
+            StreamExecutionEnvironment env, ControlledSource controlledSource) 
{
+        env.addSource(controlledSource)
+                .keyBy(element -> element)
+                .map(new CountMapper())
+                .addSink(new CollectionSink())
+                .setParallelism(1);
+        return env.getStreamGraph();
+    }
+
+    protected void waitAndAssert(StreamExecutionEnvironment env, StreamGraph 
streamGraph)
+            throws Exception {
+        waitUntilJobFinished(env, streamGraph);
+        assertEquals(CollectionSink.getExpectedResult(), 
ControlledSource.getActualResult());
+    }
+
+    private void waitUntilJobFinished(StreamExecutionEnvironment env, 
StreamGraph streamGraph)
+            throws Exception {
+        JobExecutionResult jobExecutionResult = env.execute(streamGraph);
+        JobStatus jobStatus =
+                
cluster.getClusterClient().getJobStatus(jobExecutionResult.getJobID()).get();
+        assertSame(jobStatus, JobStatus.FINISHED);
+    }
+
+    private Configuration configureDSTL() throws IOException {
+        Configuration configuration = new Configuration();
+        FsStateChangelogStorageFactory.configure(configuration, 
TEMPORARY_FOLDER.newFolder());
+        return configuration;
+    }
+
+    /** A source consumes data which is generated randomly and supports 
pre-handling record. */
+    protected static class ControlledSource extends RichSourceFunction<Integer>
+            implements CheckpointedFunction, CheckpointListener {
+
+        private static final long serialVersionUID = 1L;
+
+        protected volatile int currentIndex;
+
+        protected final AtomicInteger completedCheckpointNum;
+
+        protected volatile boolean isCanceled;
+
+        private static List<Integer> sourceList;

Review comment:
       Can we at least avoid mutable static fields?
   Maybe make the list (reference and contents) immutable and initialize it on 
class loading?
   Or pass the elements into the constructor - so there is no static field?

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateChangelogOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+
+import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This verifies that rescale works correctly for Changelog state backend with 
materialized state /
+ * non-materialized state.
+ */
+public class ChangelogPeriodicMaterializationRescaleITCase
+        extends ChangelogPeriodicMaterializationTestBase {
+
+    public ChangelogPeriodicMaterializationRescaleITCase(
+            AbstractStateBackend delegatedStateBackend) {
+        super(delegatedStateBackend);
+    }
+
+    @Test
+    public void testRescaleOut() throws Exception {
+        testRescale(NUM_SLOTS / 2, NUM_SLOTS);
+    }
+
+    @Test
+    public void testRescaleIn() throws Exception {
+        testRescale(NUM_SLOTS, NUM_SLOTS / 2);
+    }
+
+    private void testRescale(int firstParallelism, int secondParallelism) 
throws Exception {

Review comment:
       The test induces a single failure and then rescales, right?
   
   Shouldn't it perform one more materialization + recovery after rescaling?
   That would check, for example, that the up-scaled tasks don't discard each 
other's state.

##########
File path: 
flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java
##########
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.Keyed;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PriorityComparable;
+import org.apache.flink.runtime.state.SavepointResources;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateSnapshotTransformer;
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.FunctionWithException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BooleanSupplier;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+/** Base class for tests related to periodic materialization of 
ChangelogStateBackend. */
+@RunWith(Parameterized.class)
+public abstract class ChangelogPeriodicMaterializationTestBase extends 
TestLogger {
+
+    private static final int NUM_TASK_MANAGERS = 1;
+    private static final int NUM_TASK_SLOTS = 4;
+    protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
+    protected static final int TOTAL_ELEMENTS = 10_000;
+
+    protected final AbstractStateBackend delegatedStateBackend;
+
+    protected MiniClusterWithClientResource cluster;
+
+    // Maintain the RunnableFuture of SnapshotResult to make sure the 
materialization phase has
+    // been done
+    protected static final 
Queue<RunnableFuture<SnapshotResult<KeyedStateHandle>>>
+            PENDING_MATERIALIZATION = new ConcurrentLinkedQueue<>();
+
+    @Rule public Timeout globalTimeout = Timeout.seconds(60);
+
+    @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
+
+    @Parameterized.Parameters(name = "delegated state backend type ={0}")
+    public static Collection<AbstractStateBackend> parameter() {
+        return Arrays.asList(
+                new HashMapStateBackend(),
+                new EmbeddedRocksDBStateBackend(true),
+                new EmbeddedRocksDBStateBackend());
+    }
+
+    public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend 
delegatedStateBackend) {
+        this.delegatedStateBackend = delegatedStateBackend;
+    }
+
+    @Before
+    public void setup() throws Exception {
+        cluster =
+                new MiniClusterWithClientResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configureDSTL())
+                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
+                                .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
+                                .build());
+        cluster.before();
+        cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
+
+        // init source data randomly
+        ControlledSource.initSourceData(TOTAL_ELEMENTS);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        cluster.after();
+        // clear result in sink
+        CollectionSink.clearExpectedResult();
+        PENDING_MATERIALIZATION.clear();
+    }
+
+    protected StreamGraph buildStreamGraph(
+            StreamExecutionEnvironment env, ControlledSource controlledSource) 
{
+        env.addSource(controlledSource)
+                .keyBy(element -> element)
+                .map(new CountMapper())

Review comment:
       The nested state backend has to maintain state per key, and 
`keyBy(element -> element)` makes every distinct element its own key.
   I wonder whether that can become a bottleneck if the number of 
elements is increased?
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to