rkhachatryan commented on a change in pull request #18224: URL: https://github.com/apache/flink/pull/18224#discussion_r805742655
########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); Review comment: This field is not used. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + triggerDelegatedSnapshot.set(false); + } + + /** Recovery from checkpoint only containing non-materialized state. 
*/ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMinutes(3))); Review comment: Why do we need to explicitly set the interval here? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + triggerDelegatedSnapshot.set(false); + } + + /** Recovery from checkpoint only containing non-materialized state. 
*/ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMinutes(3))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + })); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); + return snapshotResultFuture; + }; + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, snapshotResultConsumer)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(10))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + PENDING_MATERIALIZATION.clear(); + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 + && currentIndex == TOTAL_ELEMENTS / 2) { Review comment: Don't we already have a checkpoint with materialized state? Could you clarify the intention of this branch? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + triggerDelegatedSnapshot.set(false); + } + + /** Recovery from checkpoint only containing non-materialized state. */ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMinutes(3))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + })); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); + return snapshotResultFuture; + }; + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, snapshotResultConsumer)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(10))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + PENDING_MATERIALIZATION.clear(); + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> + completedCheckpointNum.get() <= 1 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + throwArtificialFailure(); + } + } + })); + } + + @Test + public void testFailedMaterialization() throws Exception { Review comment: I think it makes sense to make sure that materialization failure doesn't affect subsequent materializations. I can imagine that some bug in error handling can prevent re-scheduling. So we probably need one more materialization after the failed one. WDYT? 
########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java ########## @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.fs.CloseableRegistry; 
+import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.Keyed; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.PriorityComparable; +import org.apache.flink.runtime.state.SavepointResources; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; + +import org.junit.After; +import org.junit.Before; +import 
org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** Base class for tests related to period materialization of ChangelogStateBackend. */ +@RunWith(Parameterized.class) +public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogger { + + private static final int NUM_TASK_MANAGERS = 1; + private static final int NUM_TASK_SLOTS = 4; + protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; + protected static final int TOTAL_ELEMENTS = 10_000; + + protected final AbstractStateBackend delegatedStateBackend; + + protected MiniClusterWithClientResource cluster; + + // Maintain the RunnableFuture of SnapshotResult to make sure the materialization phase has + // been done + protected static final Queue<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + PENDING_MATERIALIZATION = new ConcurrentLinkedQueue<>(); + + @Rule public Timeout globalTimeout = Timeout.seconds(60); Review comment: The timeout here is probably unnecessary. 
IIRC, there was a consensus on dev ML to rely on global timeouts. When running multiple times locally, the test failed a couple of times because currently the timeout is too tight. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java ########## @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.Keyed; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.PriorityComparable; +import 
org.apache.flink.runtime.state.SavepointResources; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get; +import static 
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** Base class for tests related to period materialization of ChangelogStateBackend. */ +@RunWith(Parameterized.class) +public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogger { + + private static final int NUM_TASK_MANAGERS = 1; + private static final int NUM_TASK_SLOTS = 4; + protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; + protected static final int TOTAL_ELEMENTS = 10_000; + + protected final AbstractStateBackend delegatedStateBackend; + + protected MiniClusterWithClientResource cluster; + + // Maintain the RunnableFuture of SnapshotResult to make sure the materialization phase has + // been done + protected static final Queue<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + PENDING_MATERIALIZATION = new ConcurrentLinkedQueue<>(); + + @Rule public Timeout globalTimeout = Timeout.seconds(60); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Parameterized.Parameters(name = "delegated state backend type ={0}") + public static Collection<AbstractStateBackend> parameter() { + return Arrays.asList( + new HashMapStateBackend(), + new EmbeddedRocksDBStateBackend(true), + new EmbeddedRocksDBStateBackend()); + } + + public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend delegatedStateBackend) { + this.delegatedStateBackend = delegatedStateBackend; + } + + @Before + public void setup() throws Exception { + cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configureDSTL()) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) + .build()); + cluster.before(); + cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend(); + + // init source data randomly + ControlledSource.initSourceData(TOTAL_ELEMENTS); + } + + @After + public void tearDown() throws IOException { + 
cluster.after(); + // clear result in sink + CollectionSink.clearExpectedResult(); + PENDING_MATERIALIZATION.clear(); + } + + protected StreamGraph buildStreamGraph( + StreamExecutionEnvironment env, ControlledSource controlledSource) { + env.addSource(controlledSource) + .keyBy(element -> element) + .map(new CountMapper()) + .addSink(new CollectionSink()) + .setParallelism(1); + return env.getStreamGraph(); + } + + protected void waitAndAssert(StreamExecutionEnvironment env, StreamGraph streamGraph) + throws Exception { + waitUntilJobFinished(env, streamGraph); + assertEquals(CollectionSink.getExpectedResult(), ControlledSource.getActualResult()); Review comment: I think it's actually the source that holds "expected" result, and sink collects the actual result. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + triggerDelegatedSnapshot.set(false); + } + + /** Recovery from checkpoint only containing non-materialized state. */ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( Review comment: Could you extract the common code of initializing the environment? 
########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + triggerDelegatedSnapshot.set(false); + } + + /** Recovery from checkpoint only containing non-materialized state. */ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMinutes(3))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + })); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); + return snapshotResultFuture; + }; + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, snapshotResultConsumer)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(10))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + PENDING_MATERIALIZATION.clear(); + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> + completedCheckpointNum.get() <= 1 Review comment: Is it possible that before the 1st failure, two checkpoints were completed; and now we don't have to wait for the 3rd anymore; so no checkpoint is performed after the recovery? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + triggerDelegatedSnapshot.set(false); + } + + /** Recovery from checkpoint only containing non-materialized state. */ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMinutes(3))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + })); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); + return snapshotResultFuture; + }; + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, snapshotResultConsumer)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(10))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + PENDING_MATERIALIZATION.clear(); + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> + completedCheckpointNum.get() <= 1 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + throwArtificialFailure(); + } + } + })); + } + + @Test + public void testFailedMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, + snapshotResultFuture -> { + 
RunnableFuture<SnapshotResult<KeyedStateHandle>> + snapshotResultFutureWrapper = + new FutureTaskWithException<>( + () -> { + SnapshotResult<KeyedStateHandle> + snapshotResult = + snapshotResultFuture + .get(); + if (PENDING_MATERIALIZATION.size() + == 0) { + throw new RuntimeException(); + } else { + return snapshotResult; + } + }); + PENDING_MATERIALIZATION.add(snapshotResultFutureWrapper); + return snapshotResultFutureWrapper; + })) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(20)) + .set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, 1)); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + waitWhile( + () -> + currentIndex >= TOTAL_ELEMENTS / 2 + && PENDING_MATERIALIZATION.size() == 0); Review comment: nit: replace `PENDING_MATERIALIZATION.size() == 0` with `PENDING_MATERIALIZATION.isEmpty()` ? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + triggerDelegatedSnapshot.set(false); + } + + /** Recovery from checkpoint only containing non-materialized state. 
*/ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMinutes(3))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + })); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); + return snapshotResultFuture; + }; + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, snapshotResultConsumer)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(10))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + PENDING_MATERIALIZATION.clear(); + throwArtificialFailure(); Review comment: IIUC, the intention here is to wait for a checkpoint that has some materialized state (PCIIW). At this point (before `throwArtificialFailure`), the materialization was started (thanks to `PENDING_MATERIALIZATION....isDone`). But there is no guarantee that it was finished, made it to the `ChangelogKeyedStateBackend.updateChangelogSnapshotState`, and finally was included into the new checkpoint. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + triggerDelegatedSnapshot.set(false); + } + + /** Recovery from checkpoint only containing non-materialized state. */ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMinutes(3))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + })); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); Review comment: Could you explain why this variable (`PENDING_MATERIALIZATION`) cannot be local? It would be easier for me to follow the logic, and less error-prone, if it were local. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.io.File; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; + +import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint; +import static org.junit.Assert.assertTrue; + +/** + * This verifies that rescale works correctly for Changelog state backend with materialized state / + * non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationRescaleITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationRescaleITCase( + AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Test + public void testRescaleOut() throws Exception { + testRescale(NUM_SLOTS / 2, NUM_SLOTS); + } + + @Test + public void testRescaleIn() throws Exception { + testRescale(NUM_SLOTS, NUM_SLOTS / 2); + } + + private void testRescale(int firstParallelism, int secondParallelism) throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(checkpointFile.toURI()); + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); + return snapshotResultFuture; + }; + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, snapshotResultConsumer)) + .setRestartStrategy(RestartStrategies.noRestart()); + env.getCheckpointConfig() + .setExternalizedCheckpointCleanup( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(10))); + env.setParallelism(firstParallelism); + + StreamGraph firstStreamGraph = + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + } else if (currentIndex > TOTAL_ELEMENTS / 2) { + throwArtificialFailure(); + } + } + }); + + JobGraph firstJobGraph = 
firstStreamGraph.getJobGraph(); + try { + cluster.getMiniCluster().submitJob(firstJobGraph).get(); + cluster.getMiniCluster().requestJobResult(firstJobGraph.getJobID()).get(); Review comment: I think this future returns the submission result, not the job result, right? Should we wait for the job completion here and check the result? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + triggerDelegatedSnapshot.set(false); + } + + /** Recovery from checkpoint only containing non-materialized state. 
*/ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMinutes(3))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + })); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); + return snapshotResultFuture; + }; + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, snapshotResultConsumer)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(10))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + PENDING_MATERIALIZATION.clear(); + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> + completedCheckpointNum.get() <= 1 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + throwArtificialFailure(); + } + } + })); + } + + @Test + public void testFailedMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, + snapshotResultFuture -> { + 
RunnableFuture<SnapshotResult<KeyedStateHandle>> + snapshotResultFutureWrapper = + new FutureTaskWithException<>( + () -> { + SnapshotResult<KeyedStateHandle> + snapshotResult = + snapshotResultFuture + .get(); + if (PENDING_MATERIALIZATION.size() + == 0) { + throw new RuntimeException(); + } else { + return snapshotResult; + } + }); + PENDING_MATERIALIZATION.add(snapshotResultFutureWrapper); + return snapshotResultFutureWrapper; + })) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(20)) + .set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, 1)); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + waitWhile( + () -> + currentIndex >= TOTAL_ELEMENTS / 2 + && PENDING_MATERIALIZATION.size() == 0); Review comment: This condition doesn't seem correct to me, could you clarify the intention? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.function.FutureTaskWithException; + +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + private static final AtomicBoolean triggerDelegatedSnapshot = new AtomicBoolean(); + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + triggerDelegatedSnapshot.set(false); + } + + /** Recovery from checkpoint only containing non-materialized state. 
*/ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMinutes(3))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + })); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); + return snapshotResultFuture; + }; + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, snapshotResultConsumer)) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(2, 0)); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(10))); + waitAndAssert( + env, + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + PENDING_MATERIALIZATION.clear(); + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> + completedCheckpointNum.get() <= 1 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + throwArtificialFailure(); + } + } + })); + } + + @Test + public void testFailedMaterialization() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI()); + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, + snapshotResultFuture -> { + 
RunnableFuture<SnapshotResult<KeyedStateHandle>> + snapshotResultFutureWrapper = + new FutureTaskWithException<>( + () -> { + SnapshotResult<KeyedStateHandle> + snapshotResult = + snapshotResultFuture + .get(); + if (PENDING_MATERIALIZATION.size() + == 0) { Review comment: nit: replace `PENDING_MATERIALIZATION.size() == 0` with `PENDING_MATERIALIZATION.isEmpty()` ? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.io.File; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; + +import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint; +import static org.junit.Assert.assertTrue; + +/** + * This verifies that rescale works correctly for Changelog state backend with materialized state / + * non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationRescaleITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationRescaleITCase( + AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Test + public void testRescaleOut() throws Exception { + testRescale(NUM_SLOTS / 2, NUM_SLOTS); + } + + @Test + public void testRescaleIn() throws Exception { + testRescale(NUM_SLOTS, NUM_SLOTS / 2); + } + + private void testRescale(int firstParallelism, int secondParallelism) throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(checkpointFile.toURI()); + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); + return snapshotResultFuture; + }; + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, snapshotResultConsumer)) + .setRestartStrategy(RestartStrategies.noRestart()); + env.getCheckpointConfig() + .setExternalizedCheckpointCleanup( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(10))); + env.setParallelism(firstParallelism); + + StreamGraph firstStreamGraph = + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + } else if (currentIndex > TOTAL_ELEMENTS / 2) { + throwArtificialFailure(); + } + } + }); + + JobGraph firstJobGraph = 
firstStreamGraph.getJobGraph(); + try { + cluster.getMiniCluster().submitJob(firstJobGraph).get(); + cluster.getMiniCluster().requestJobResult(firstJobGraph.getJobID()).get(); + } catch (Exception ex) { + ExceptionUtils.findThrowable(ex, ArtificialFailure.class); Review comment: Shouldn't we check `isPresent` here and rethrow if not? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java ########## @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.Keyed; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.PriorityComparable; +import 
org.apache.flink.runtime.state.SavepointResources; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get; +import static 
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** Base class for tests related to period materialization of ChangelogStateBackend. */ +@RunWith(Parameterized.class) +public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogger { + + private static final int NUM_TASK_MANAGERS = 1; + private static final int NUM_TASK_SLOTS = 4; + protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; + protected static final int TOTAL_ELEMENTS = 10_000; + + protected final AbstractStateBackend delegatedStateBackend; + + protected MiniClusterWithClientResource cluster; + + // Maintain the RunnableFuture of SnapshotResult to make sure the materialization phase has + // been done + protected static final Queue<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + PENDING_MATERIALIZATION = new ConcurrentLinkedQueue<>(); + + @Rule public Timeout globalTimeout = Timeout.seconds(60); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Parameterized.Parameters(name = "delegated state backend type ={0}") + public static Collection<AbstractStateBackend> parameter() { + return Arrays.asList( + new HashMapStateBackend(), + new EmbeddedRocksDBStateBackend(true), + new EmbeddedRocksDBStateBackend()); Review comment: nit: explicitly pass `false` to the 2nd constructor call? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.io.File; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; + +import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint; +import static org.junit.Assert.assertTrue; + +/** + * This verifies that rescale works correctly for Changelog state backend with materialized state / + * non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationRescaleITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationRescaleITCase( + AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Test + public void testRescaleOut() throws Exception { + testRescale(NUM_SLOTS / 2, NUM_SLOTS); + } + + @Test + public void testRescaleIn() throws Exception { + testRescale(NUM_SLOTS, NUM_SLOTS / 2); + } + + private void testRescale(int firstParallelism, int secondParallelism) throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(checkpointFile.toURI()); + SerializableFunctionWithException<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + snapshotResultConsumer = + snapshotResultFuture -> { + PENDING_MATERIALIZATION.add(snapshotResultFuture); + return snapshotResultFuture; + }; + env.setStateBackend( + new DelegatedStateBackendWrapper( + delegatedStateBackend, snapshotResultConsumer)) + .setRestartStrategy(RestartStrategies.noRestart()); + env.getCheckpointConfig() + .setExternalizedCheckpointCleanup( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.configure( + new Configuration() + .set( + StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(10))); + env.setParallelism(firstParallelism); + + StreamGraph firstStreamGraph = + buildStreamGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement() throws Exception { + if (currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || PENDING_MATERIALIZATION.stream() + .noneMatch(Future::isDone)); + } else if (currentIndex > TOTAL_ELEMENTS / 2) { + throwArtificialFailure(); + } + } + }); + + JobGraph firstJobGraph = 
firstStreamGraph.getJobGraph(); + try { + cluster.getMiniCluster().submitJob(firstJobGraph).get(); + cluster.getMiniCluster().requestJobResult(firstJobGraph.getJobID()).get(); + } catch (Exception ex) { + ExceptionUtils.findThrowable(ex, ArtificialFailure.class); + } + + env.setParallelism(secondParallelism); + StreamGraph streamGraph = buildStreamGraph(env, new ControlledSource()); + Optional<Path> checkpoint = + findExternalizedCheckpoint(checkpointFile, firstJobGraph.getJobID()); + assertTrue(checkpoint.isPresent()); Review comment: The absence of a checkpoint here most likely indicates some bug in test code, so I'd rather use something like `Preconditions.checkState` instead of `assertTrue`. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java ########## @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.Keyed; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.PriorityComparable; +import 
org.apache.flink.runtime.state.SavepointResources; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get; +import static 
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** Base class for tests related to period materialization of ChangelogStateBackend. */ +@RunWith(Parameterized.class) +public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogger { + + private static final int NUM_TASK_MANAGERS = 1; + private static final int NUM_TASK_SLOTS = 4; + protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; + protected static final int TOTAL_ELEMENTS = 10_000; + + protected final AbstractStateBackend delegatedStateBackend; + + protected MiniClusterWithClientResource cluster; + + // Maintain the RunnableFuture of SnapshotResult to make sure the materialization phase has + // been done + protected static final Queue<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + PENDING_MATERIALIZATION = new ConcurrentLinkedQueue<>(); + + @Rule public Timeout globalTimeout = Timeout.seconds(60); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Parameterized.Parameters(name = "delegated state backend type ={0}") + public static Collection<AbstractStateBackend> parameter() { + return Arrays.asList( + new HashMapStateBackend(), + new EmbeddedRocksDBStateBackend(true), + new EmbeddedRocksDBStateBackend()); + } + + public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend delegatedStateBackend) { + this.delegatedStateBackend = delegatedStateBackend; + } + + @Before + public void setup() throws Exception { + cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configureDSTL()) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) + .build()); + cluster.before(); + cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend(); + + // init source data randomly + ControlledSource.initSourceData(TOTAL_ELEMENTS); + } + + @After + public void tearDown() throws IOException { + 
cluster.after(); + // clear result in sink + CollectionSink.clearExpectedResult(); + PENDING_MATERIALIZATION.clear(); + } + + protected StreamGraph buildStreamGraph( + StreamExecutionEnvironment env, ControlledSource controlledSource) { + env.addSource(controlledSource) + .keyBy(element -> element) + .map(new CountMapper()) + .addSink(new CollectionSink()) + .setParallelism(1); + return env.getStreamGraph(); + } + + protected void waitAndAssert(StreamExecutionEnvironment env, StreamGraph streamGraph) + throws Exception { + waitUntilJobFinished(env, streamGraph); + assertEquals(CollectionSink.getExpectedResult(), ControlledSource.getActualResult()); + } + + private void waitUntilJobFinished(StreamExecutionEnvironment env, StreamGraph streamGraph) + throws Exception { + JobExecutionResult jobExecutionResult = env.execute(streamGraph); + JobStatus jobStatus = + cluster.getClusterClient().getJobStatus(jobExecutionResult.getJobID()).get(); + assertSame(jobStatus, JobStatus.FINISHED); + } + + private Configuration configureDSTL() throws IOException { + Configuration configuration = new Configuration(); + FsStateChangelogStorageFactory.configure(configuration, TEMPORARY_FOLDER.newFolder()); + return configuration; + } + + /** A source consumes data which is generated randomly and supports pre-handling record. */ + protected static class ControlledSource extends RichSourceFunction<Integer> + implements CheckpointedFunction, CheckpointListener { + + private static final long serialVersionUID = 1L; + + protected volatile int currentIndex; + + protected final AtomicInteger completedCheckpointNum; + + protected volatile boolean isCanceled; + + private static List<Integer> sourceList; Review comment: Can we avoid at least mutable static fields? Maybe make the list (reference and contents) immutable and initialize it on class loading? Or pass the elements into the constructor - so there is no static field? 
########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.io.File; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; + +import static 
org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint; +import static org.junit.Assert.assertTrue; + +/** + * This verifies that rescale works correctly for Changelog state backend with materialized state / + * non-materialized state. + */ +public class ChangelogPeriodicMaterializationRescaleITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationRescaleITCase( + AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Test + public void testRescaleOut() throws Exception { + testRescale(NUM_SLOTS / 2, NUM_SLOTS); + } + + @Test + public void testRescaleIn() throws Exception { + testRescale(NUM_SLOTS, NUM_SLOTS / 2); + } + + private void testRescale(int firstParallelism, int secondParallelism) throws Exception { Review comment: The test induces a single failure and then rescales, right? Shouldn't it perform one more materialization + recovery after rescaling? That would check for example that the up-scaled tasks don't discard each other's state ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java ########## @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.Keyed; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateBackend; +import 
org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.PriorityComparable; +import org.apache.flink.runtime.state.SavepointResources; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BooleanSupplier; +import java.util.function.Function; +import java.util.stream.Collectors; +import 
java.util.stream.Stream; + +import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** Base class for tests related to period materialization of ChangelogStateBackend. */ +@RunWith(Parameterized.class) +public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogger { + + private static final int NUM_TASK_MANAGERS = 1; + private static final int NUM_TASK_SLOTS = 4; + protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; + protected static final int TOTAL_ELEMENTS = 10_000; + + protected final AbstractStateBackend delegatedStateBackend; + + protected MiniClusterWithClientResource cluster; + + // Maintain the RunnableFuture of SnapshotResult to make sure the materialization phase has + // been done + protected static final Queue<RunnableFuture<SnapshotResult<KeyedStateHandle>>> + PENDING_MATERIALIZATION = new ConcurrentLinkedQueue<>(); + + @Rule public Timeout globalTimeout = Timeout.seconds(60); + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Parameterized.Parameters(name = "delegated state backend type ={0}") + public static Collection<AbstractStateBackend> parameter() { + return Arrays.asList( + new HashMapStateBackend(), + new EmbeddedRocksDBStateBackend(true), + new EmbeddedRocksDBStateBackend()); + } + + public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend delegatedStateBackend) { + this.delegatedStateBackend = delegatedStateBackend; + } + + @Before + public void setup() throws Exception { + cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configureDSTL()) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) + .build()); + cluster.before(); + cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend(); + + // init source 
data randomly + ControlledSource.initSourceData(TOTAL_ELEMENTS); + } + + @After + public void tearDown() throws IOException { + cluster.after(); + // clear result in sink + CollectionSink.clearExpectedResult(); + PENDING_MATERIALIZATION.clear(); + } + + protected StreamGraph buildStreamGraph( + StreamExecutionEnvironment env, ControlledSource controlledSource) { + env.addSource(controlledSource) + .keyBy(element -> element) + .map(new CountMapper()) Review comment: The nested state backend has to maintain state per key; I wonder if `keyBy element` can become a bottleneck if the number of elements is increased? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
