rkhachatryan commented on a change in pull request #18224: URL: https://github.com/apache/flink/pull/18224#discussion_r818237157
########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + } Review comment: Is this override necessary? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + } + + /** Recovery from checkpoint only containing non-materialized state. */ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = + getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } Review comment: Is it guaranteed that there was no materialization? (or it's not a strict requirement?) ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + } + + /** Recovery from checkpoint only containing non-materialized state. 
*/ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = + getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + }, + generateJobID())); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. */ + @Test + public void testMaterialization() throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + JobID jobID = generateJobID(); + SharedReference<AtomicInteger> currentCheckpointNum = + sharedObjects.add(new AtomicInteger()); + SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>()); Review comment: Concurrent access? (Although there are likely memory barriers between accesses, it's safer to use a concurrent collection IMO.) 
########## File path: flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java ########## @@ -94,6 +102,59 @@ public static void waitUntilJobInitializationFinished( userCodeClassloader); } + public static CheckpointMetadata loadCheckpointMetadata(String savepointPath) + throws IOException { + CompletedCheckpointStorageLocation location = + AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(savepointPath); + + try (DataInputStream stream = + new DataInputStream(location.getMetadataHandle().openInputStream())) { + return Checkpoints.loadCheckpointMetadata( + stream, Thread.currentThread().getContextClassLoader(), savepointPath); + } + } + + public static List<Path> findAllSortedExternalizedCheckpoint(File checkpointDir, JobID jobId) + throws IOException { + return findAllExternalizedCheckpoint(checkpointDir, jobId).stream() + .sorted( + Comparator.comparingInt( + path -> + Integer.parseInt( + path.getFileName() + .toString() + .substring( + CHECKPOINT_DIR_PREFIX.length())))) + .collect(Collectors.toList()); + } + + public static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) Review comment: Can't some existing methods be used here, e.g. `getMostRecentCompletedCheckpointMaybe`? Are these methods specific to externalized checkpoints? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.StreamGraph; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Test; + +import java.io.File; +import java.nio.file.Path; +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.RunnableFuture; + +import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint; +import static org.junit.Assert.assertTrue; + +/** + * This verifies that rescale works correctly for Changelog state backend with materialized state / + * non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationRescaleITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationRescaleITCase( + AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Test + public void testRescaleOut() throws Exception { + testRescale(NUM_SLOTS / 2, NUM_SLOTS); + } + + @Test + public void testRescaleIn() throws Exception { + testRescale(NUM_SLOTS, NUM_SLOTS / 2); + } + + private void testRescale(int firstParallelism, int secondParallelism) throws Exception { Review comment: Sure, I've found two issues: 1. FLINK-26455 which causes `CancellationException` 2. Unaligned checkpoints have some [limitations](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/unaligned_checkpoints/#certain-data-distribution-patterns-are-not-checkpointed) with rescaling which fails the recovery. I think we can simply disable UC for now ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationRescaleITCase.java ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import org.junit.Test; + +import java.io.File; +import java.nio.file.Path; +import java.util.Optional; + +import static org.apache.flink.test.util.TestUtils.findExternalizedCheckpoint; + +/** + * This verifies that rescale works correctly for Changelog state backend with materialized state / + * non-materialized state. + */ +public class ChangelogPeriodicMaterializationRescaleITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationRescaleITCase( + AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Test + public void testRescaleOut() throws Exception { + testRescale(NUM_SLOTS / 2, NUM_SLOTS); + } + + @Test + public void testRescaleIn() throws Exception { + testRescale(NUM_SLOTS, NUM_SLOTS / 2); + } + + private void testRescale(int firstParallelism, int secondParallelism) throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + JobID firstJobID = generateJobID(); + + StreamExecutionEnvironment env = + getEnv(delegatedStateBackend, checkpointFile, 50, 0, 20, 0); + env.getCheckpointConfig() + .setExternalizedCheckpointCleanup( + CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); + env.setParallelism(firstParallelism); + + JobGraph firstJobGraph = + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (currentIndex == TOTAL_ELEMENTS / 4) { + 
waitWhile( + () -> + completedCheckpointNum.get() <= 0 + || getAllMaterializationId( + checkpointFile, + firstJobID) + .isEmpty()); + } else if (currentIndex > TOTAL_ELEMENTS / 4) { + throwArtificialFailure(); + } + } + }, + firstJobID); + + try { + cluster.getMiniCluster().submitJob(firstJobGraph).get(); + cluster.getMiniCluster().requestJobResult(firstJobGraph.getJobID()).get(); + } catch (Exception ex) { + Preconditions.checkState( + ExceptionUtils.findThrowable(ex, ArtificialFailure.class).isPresent()); + } + + env.setParallelism(secondParallelism); + JobID secondJobId = generateJobID(); + JobGraph jobGraph = + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> + getAllMaterializationId( + checkpointFile, secondJobId) + .isEmpty()); Review comment: Does this include also old (pre-recovery) materializations? If so, the test currently validates recovery, but not the materialization after recovery. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + } + + /** Recovery from checkpoint only containing non-materialized state. */ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = + getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + }, + generateJobID())); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + JobID jobID = generateJobID(); + SharedReference<AtomicInteger> currentCheckpointNum = + sharedObjects.add(new AtomicInteger()); + SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>()); + StreamExecutionEnvironment env = + getEnv(delegatedStateBackend, checkpointFile, 100, 2, 10, 0); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> { + if (completedCheckpointNum.get() <= 0) { + return true; + } + List<Long> allMaterializationId = + getAllMaterializationId( + checkpointFile, jobID); + if (!allMaterializationId.isEmpty()) { + currentMaterializationId + .get() + .addAll(allMaterializationId); + Collections.sort( + currentMaterializationId.get()); + currentCheckpointNum + .get() + .compareAndSet( + 0, + completedCheckpointNum.get()); + return false; + } + return true; + }); + + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> { + if (completedCheckpointNum.get() + <= currentCheckpointNum.get().get()) { + return true; + } + List<Long> allMaterializationId = + getAllMaterializationId( + checkpointFile, jobID); + Collections.sort(allMaterializationId); Review comment: Can't we use Set instead of sorting to check for equality? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + } + + /** Recovery from checkpoint only containing non-materialized state. 
*/ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = + getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + }, + generateJobID())); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. */ + @Test + public void testMaterialization() throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + JobID jobID = generateJobID(); + SharedReference<AtomicInteger> currentCheckpointNum = + sharedObjects.add(new AtomicInteger()); + SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>()); + StreamExecutionEnvironment env = + getEnv(delegatedStateBackend, checkpointFile, 100, 2, 10, 0); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> { + if (completedCheckpointNum.get() <= 0) { + return true; + } + List<Long> allMaterializationId = + getAllMaterializationId( + checkpointFile, jobID); + if (!allMaterializationId.isEmpty()) { + currentMaterializationId + .get() + .addAll(allMaterializationId); + Collections.sort( + currentMaterializationId.get()); + currentCheckpointNum + .get() + .compareAndSet( + 0, + completedCheckpointNum.get()); + return false; + } + return true; + }); + + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 Review comment: What if some other failure happens and `getAttemptNumber` returns 2? 
I guess it's not allowed by maxRestartAttempts == 2, but probably it makes sense to enforce this (e.g. with `Preconditions.checkState`)? ########## File path: flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java ########## @@ -94,6 +102,59 @@ public static void waitUntilJobInitializationFinished( userCodeClassloader); } + public static CheckpointMetadata loadCheckpointMetadata(String savepointPath) + throws IOException { + CompletedCheckpointStorageLocation location = + AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(savepointPath); + + try (DataInputStream stream = + new DataInputStream(location.getMetadataHandle().openInputStream())) { + return Checkpoints.loadCheckpointMetadata( + stream, Thread.currentThread().getContextClassLoader(), savepointPath); + } + } + + public static List<Path> findAllSortedExternalizedCheckpoint(File checkpointDir, JobID jobId) + throws IOException { + return findAllExternalizedCheckpoint(checkpointDir, jobId).stream() + .sorted( + Comparator.comparingInt( + path -> + Integer.parseInt( + path.getFileName() + .toString() + .substring( + CHECKPOINT_DIR_PREFIX.length())))) + .collect(Collectors.toList()); + } + + public static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) + throws IOException { + return findAllExternalizedCheckpoint(checkpointDir, jobId).stream().findAny(); + } + + public static List<Path> findAllExternalizedCheckpoint(File checkpointDir, JobID jobId) + throws IOException { + try (Stream<Path> checkpoints = + Files.list(checkpointDir.toPath().resolve(jobId.toString()))) { + return checkpoints + .filter(path -> path.getFileName().toString().startsWith(CHECKPOINT_DIR_PREFIX)) + .filter( + path -> { + try (Stream<Path> checkpointFiles = Files.list(path)) { + return checkpointFiles.anyMatch( + child -> + child.getFileName() + .toString() + .equals(METADATA_FILE_NAME)); + } catch (IOException ignored) { + return false; + } Review comment: Could you explain why is the 
exception ignored here? With externalized checkpoints, the checkpoint shouldn't be removed, should it? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. 
+ */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + } + + /** Recovery from checkpoint only containing non-materialized state. */ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = + getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + }, + generateJobID())); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. */ + @Test + public void testMaterialization() throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); Review comment: Rename `checkpointFile` to `checkpointFolder`? 
ditto: the next test ########## File path: flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java ########## @@ -94,6 +102,59 @@ public static void waitUntilJobInitializationFinished( userCodeClassloader); } + public static CheckpointMetadata loadCheckpointMetadata(String savepointPath) + throws IOException { + CompletedCheckpointStorageLocation location = + AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(savepointPath); + + try (DataInputStream stream = + new DataInputStream(location.getMetadataHandle().openInputStream())) { + return Checkpoints.loadCheckpointMetadata( + stream, Thread.currentThread().getContextClassLoader(), savepointPath); + } + } + + public static List<Path> findAllSortedExternalizedCheckpoint(File checkpointDir, JobID jobId) + throws IOException { + return findAllExternalizedCheckpoint(checkpointDir, jobId).stream() + .sorted( + Comparator.comparingInt( + path -> + Integer.parseInt( + path.getFileName() + .toString() + .substring( + CHECKPOINT_DIR_PREFIX.length())))) + .collect(Collectors.toList()); + } + + public static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) Review comment: Can't some existing methods be used here, e.g. `getMostRecentCompletedCheckpointMaybe`? Are these methods specific to externalized checkpoints? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + } + + /** Recovery from checkpoint only containing non-materialized state. 
*/ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = + getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + }, + generateJobID())); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. */ + @Test + public void testMaterialization() throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + JobID jobID = generateJobID(); + SharedReference<AtomicInteger> currentCheckpointNum = + sharedObjects.add(new AtomicInteger()); + SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>()); + StreamExecutionEnvironment env = + getEnv(delegatedStateBackend, checkpointFile, 100, 2, 10, 0); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> { + if (completedCheckpointNum.get() <= 0) { + return true; + } + List<Long> allMaterializationId = + getAllMaterializationId( + checkpointFile, jobID); + if (!allMaterializationId.isEmpty()) { + currentMaterializationId + .get() + .addAll(allMaterializationId); + Collections.sort( + currentMaterializationId.get()); + currentCheckpointNum + .get() + .compareAndSet( + 0, + completedCheckpointNum.get()); + return false; + } + return true; + }); + + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> { + if (completedCheckpointNum.get() + 
<= currentCheckpointNum.get().get()) { + return true; + } + List<Long> allMaterializationId = + getAllMaterializationId( + checkpointFile, jobID); + Collections.sort(allMaterializationId); + return allMaterializationId.isEmpty() + || !currentMaterializationId + .get() + .equals(allMaterializationId); + }); + throwArtificialFailure(); + } + } + }, + jobID)); + } + + @Test + public void testFailedMaterialization() throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + JobID jobID = generateJobID(); + SharedReference<AtomicBoolean> hasTriggerCheckpoint = + sharedObjects.add(new AtomicBoolean()); + SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>()); + StreamExecutionEnvironment env = + getEnv( + new DelegatedStateBackendWrapper( + delegatedStateBackend, + snapshotResultFuture -> { + if (hasTriggerCheckpoint.get().compareAndSet(false, true)) { + throw new RuntimeException(); Review comment: Rename `hasTriggerCheckpoint` to `hasFailed`? ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + } + + /** Recovery from checkpoint only containing non-materialized state. */ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = + getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + }, + generateJobID())); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + JobID jobID = generateJobID(); + SharedReference<AtomicInteger> currentCheckpointNum = + sharedObjects.add(new AtomicInteger()); + SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>()); + StreamExecutionEnvironment env = + getEnv(delegatedStateBackend, checkpointFile, 100, 2, 10, 0); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> { + if (completedCheckpointNum.get() <= 0) { + return true; + } + List<Long> allMaterializationId = + getAllMaterializationId( + checkpointFile, jobID); + if (!allMaterializationId.isEmpty()) { + currentMaterializationId + .get() + .addAll(allMaterializationId); + Collections.sort( + currentMaterializationId.get()); + currentCheckpointNum + .get() + .compareAndSet( + 0, + completedCheckpointNum.get()); + return false; + } + return true; + }); + + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> { + if (completedCheckpointNum.get() + <= currentCheckpointNum.get().get()) { + return true; + } + List<Long> allMaterializationId = + getAllMaterializationId( + checkpointFile, jobID); + Collections.sort(allMaterializationId); Review comment: And probably it's better to use `StateHandleID` collected from the materialized states to avoid collisions between different subtasks if DoP > 1. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java ########## @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import 
org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.Keyed; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.PriorityComparable; +import org.apache.flink.runtime.state.SavepointResources; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import 
org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get; +import static org.apache.flink.test.util.TestUtils.findAllSortedExternalizedCheckpoint; +import static org.apache.flink.test.util.TestUtils.loadCheckpointMetadata; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** Base class for tests related to period materialization of ChangelogStateBackend. 
*/ +@RunWith(Parameterized.class) +public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogger { + + private static final int NUM_TASK_MANAGERS = 1; + private static final int NUM_TASK_SLOTS = 4; + protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; + protected static final int TOTAL_ELEMENTS = 10_000; + + protected final AbstractStateBackend delegatedStateBackend; + + protected MiniClusterWithClientResource cluster; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public final SharedObjects sharedObjects = SharedObjects.create(); + + @Parameterized.Parameters(name = "delegated state backend type ={0}") + public static Collection<AbstractStateBackend> parameter() { + return Arrays.asList( + new HashMapStateBackend(), + new EmbeddedRocksDBStateBackend(true), + new EmbeddedRocksDBStateBackend(false)); + } + + public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend delegatedStateBackend) { + this.delegatedStateBackend = delegatedStateBackend; + } + + @Before + public void setup() throws Exception { + cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configure()) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) + .build()); + cluster.before(); + cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend(); + } + + @After + public void tearDown() throws IOException { + cluster.after(); + // clear result in sink + CollectionSink.clearExpectedResult(); + } + + protected StreamExecutionEnvironment getEnv( + StateBackend stateBackend, + File checkpointFile, + long checkpointInterval, + int restartAttempts, + int materializationInterval, + int materializationMaxFailure) { + StreamExecutionEnvironment env = + getEnv(stateBackend, checkpointFile, checkpointInterval, restartAttempts); + env.configure( + new Configuration() + .set( + 
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(materializationInterval)) + .set( + StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, + materializationMaxFailure)); + return env; + } + + protected StreamExecutionEnvironment getEnv( + StateBackend stateBackend, + File checkpointFile, + long checkpointInterval, + int restartAttempts) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(checkpointInterval) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(checkpointFile.toURI()); + env.setStateBackend(stateBackend) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, 0)); + return env; + } + + protected StreamExecutionEnvironment getEnv(StateBackend stateBackend) throws IOException { + return getEnv(stateBackend, TEMPORARY_FOLDER.newFolder(), 10, 1); + } + + protected JobGraph buildJobGraph( + StreamExecutionEnvironment env, ControlledSource controlledSource, JobID jobId) { + env.addSource(controlledSource) + .keyBy(element -> element) + .map(new CountMapper()) + .addSink(new CollectionSink()) + .setParallelism(1); + return env.getStreamGraph().getJobGraph(jobId); + } + + protected void waitAndAssert(JobGraph jobGraph) throws Exception { + waitUntilJobFinished(jobGraph); + assertEquals(CollectionSink.getActualResult(), ControlledSource.getExpectedResult()); + } + + protected JobID generateJobID() { + byte[] randomBytes = new byte[AbstractID.SIZE]; + ThreadLocalRandom.current().nextBytes(randomBytes); + return JobID.fromByteArray(randomBytes); + } + + protected static List<Long> getAllMaterializationId(File checkpointFile, JobID jobID) + throws IOException { + List<Path> checkpointPaths = findAllSortedExternalizedCheckpoint(checkpointFile, jobID); + for (Path checkpointPath : checkpointPaths) { + CheckpointMetadata checkpointMetadata = + loadCheckpointMetadata(checkpointPath.toString()); + List<Long> 
materializationIds = + checkpointMetadata.getOperatorStates().stream() + .flatMap(operatorState -> operatorState.getStates().stream()) + .flatMap( + operatorSubtaskState -> + operatorSubtaskState.getManagedKeyedState().stream()) + .map(keyedStateHandle -> (ChangelogStateBackendHandle) keyedStateHandle) + .filter( + changelogStateBackendHandle -> + !changelogStateBackendHandle + .getMaterializedStateHandles() + .isEmpty()) + .map(ChangelogStateBackendHandle::getMaterializationID) + .collect(Collectors.toList()); + if (!materializationIds.isEmpty()) { + return materializationIds; + } + } + return Collections.emptyList(); + } + + private void waitUntilJobFinished(JobGraph jobGraph) throws Exception { + JobSubmissionResult jobSubmissionResult = + cluster.getMiniCluster().submitJob(jobGraph).get(); + JobResult jobResult = + cluster.getMiniCluster().requestJobResult(jobSubmissionResult.getJobID()).get(); + assertSame(ApplicationStatus.SUCCEEDED, jobResult.getApplicationStatus()); Review comment: Could you please throw `jobResult.getSerializedThrowable()` if status isn't `SUCCEEDED`? 
########## File path: flink-tests/src/test/java/org/apache/flink/test/util/TestUtils.java ########## @@ -94,6 +102,59 @@ public static void waitUntilJobInitializationFinished( userCodeClassloader); } + public static CheckpointMetadata loadCheckpointMetadata(String savepointPath) + throws IOException { + CompletedCheckpointStorageLocation location = + AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(savepointPath); + + try (DataInputStream stream = + new DataInputStream(location.getMetadataHandle().openInputStream())) { + return Checkpoints.loadCheckpointMetadata( + stream, Thread.currentThread().getContextClassLoader(), savepointPath); + } + } + + public static List<Path> findAllSortedExternalizedCheckpoint(File checkpointDir, JobID jobId) + throws IOException { + return findAllExternalizedCheckpoint(checkpointDir, jobId).stream() + .sorted( + Comparator.comparingInt( + path -> + Integer.parseInt( + path.getFileName() + .toString() + .substring( + CHECKPOINT_DIR_PREFIX.length())))) + .collect(Collectors.toList()); + } + + public static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) + throws IOException { + return findAllExternalizedCheckpoint(checkpointDir, jobId).stream().findAny(); + } + + public static List<Path> findAllExternalizedCheckpoint(File checkpointDir, JobID jobId) + throws IOException { + try (Stream<Path> checkpoints = + Files.list(checkpointDir.toPath().resolve(jobId.toString()))) { + return checkpoints + .filter(path -> path.getFileName().toString().startsWith(CHECKPOINT_DIR_PREFIX)) + .filter( + path -> { + try (Stream<Path> checkpointFiles = Files.list(path)) { + return checkpointFiles.anyMatch( + child -> + child.getFileName() + .toString() + .equals(METADATA_FILE_NAME)); + } catch (IOException ignored) { + return false; + } Review comment: Could you explain why the exception is ignored here? With externalized checkpoints, the checkpoint shouldn't be removed, should it?
########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationTestBase.java ########## @@ -0,0 +1,569 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory; +import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.Configuration; +import 
org.apache.flink.configuration.StateChangelogOptions; +import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue; +import org.apache.flink.runtime.state.Keyed; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.PriorityComparable; +import org.apache.flink.runtime.state.SavepointResources; +import org.apache.flink.runtime.state.SnapshotResult; +import org.apache.flink.runtime.state.StateBackend; +import org.apache.flink.runtime.state.StateSnapshotTransformer; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; +import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; +import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement; +import org.apache.flink.runtime.state.ttl.TtlTimeProvider; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.test.util.MiniClusterWithClientResource; +import org.apache.flink.testutils.junit.SharedObjects; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.FunctionWithException; + +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.file.Path; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.shaded.guava30.com.google.common.collect.Iterables.get; +import static org.apache.flink.test.util.TestUtils.findAllSortedExternalizedCheckpoint; +import static org.apache.flink.test.util.TestUtils.loadCheckpointMetadata; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** Base class for tests related to period materialization of ChangelogStateBackend. 
*/ +@RunWith(Parameterized.class) +public abstract class ChangelogPeriodicMaterializationTestBase extends TestLogger { + + private static final int NUM_TASK_MANAGERS = 1; + private static final int NUM_TASK_SLOTS = 4; + protected static final int NUM_SLOTS = NUM_TASK_MANAGERS * NUM_TASK_SLOTS; + protected static final int TOTAL_ELEMENTS = 10_000; + + protected final AbstractStateBackend delegatedStateBackend; + + protected MiniClusterWithClientResource cluster; + + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Rule public final SharedObjects sharedObjects = SharedObjects.create(); + + @Parameterized.Parameters(name = "delegated state backend type ={0}") + public static Collection<AbstractStateBackend> parameter() { + return Arrays.asList( + new HashMapStateBackend(), + new EmbeddedRocksDBStateBackend(true), + new EmbeddedRocksDBStateBackend(false)); + } + + public ChangelogPeriodicMaterializationTestBase(AbstractStateBackend delegatedStateBackend) { + this.delegatedStateBackend = delegatedStateBackend; + } + + @Before + public void setup() throws Exception { + cluster = + new MiniClusterWithClientResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configure()) + .setNumberTaskManagers(NUM_TASK_MANAGERS) + .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS) + .build()); + cluster.before(); + cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend(); + } + + @After + public void tearDown() throws IOException { + cluster.after(); + // clear result in sink + CollectionSink.clearExpectedResult(); + } + + protected StreamExecutionEnvironment getEnv( + StateBackend stateBackend, + File checkpointFile, + long checkpointInterval, + int restartAttempts, + int materializationInterval, + int materializationMaxFailure) { + StreamExecutionEnvironment env = + getEnv(stateBackend, checkpointFile, checkpointInterval, restartAttempts); + env.configure( + new Configuration() + .set( + 
StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, + Duration.ofMillis(materializationInterval)) + .set( + StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, + materializationMaxFailure)); + return env; + } + + protected StreamExecutionEnvironment getEnv( + StateBackend stateBackend, + File checkpointFile, + long checkpointInterval, + int restartAttempts) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(checkpointInterval) + .enableChangelogStateBackend(true) + .getCheckpointConfig() + .setCheckpointStorage(checkpointFile.toURI()); + env.setStateBackend(stateBackend) + .setRestartStrategy(RestartStrategies.fixedDelayRestart(restartAttempts, 0)); + return env; + } + + protected StreamExecutionEnvironment getEnv(StateBackend stateBackend) throws IOException { + return getEnv(stateBackend, TEMPORARY_FOLDER.newFolder(), 10, 1); + } + + protected JobGraph buildJobGraph( + StreamExecutionEnvironment env, ControlledSource controlledSource, JobID jobId) { + env.addSource(controlledSource) + .keyBy(element -> element) + .map(new CountMapper()) + .addSink(new CollectionSink()) + .setParallelism(1); + return env.getStreamGraph().getJobGraph(jobId); + } + + protected void waitAndAssert(JobGraph jobGraph) throws Exception { + waitUntilJobFinished(jobGraph); + assertEquals(CollectionSink.getActualResult(), ControlledSource.getExpectedResult()); + } + + protected JobID generateJobID() { + byte[] randomBytes = new byte[AbstractID.SIZE]; + ThreadLocalRandom.current().nextBytes(randomBytes); + return JobID.fromByteArray(randomBytes); + } + + protected static List<Long> getAllMaterializationId(File checkpointFile, JobID jobID) + throws IOException { + List<Path> checkpointPaths = findAllSortedExternalizedCheckpoint(checkpointFile, jobID); + for (Path checkpointPath : checkpointPaths) { + CheckpointMetadata checkpointMetadata = + loadCheckpointMetadata(checkpointPath.toString()); + List<Long> 
materializationIds = + checkpointMetadata.getOperatorStates().stream() + .flatMap(operatorState -> operatorState.getStates().stream()) + .flatMap( + operatorSubtaskState -> + operatorSubtaskState.getManagedKeyedState().stream()) + .map(keyedStateHandle -> (ChangelogStateBackendHandle) keyedStateHandle) + .filter( + changelogStateBackendHandle -> + !changelogStateBackendHandle + .getMaterializedStateHandles() + .isEmpty()) + .map(ChangelogStateBackendHandle::getMaterializationID) + .collect(Collectors.toList()); + if (!materializationIds.isEmpty()) { + return materializationIds; + } Review comment: This means the 1st checkpoint with some materialized state will be returned, right? Is there any particular reason why not the last one? I think at least in case of failover, it could break the check that there was a materialization after the recovery. ########## File path: flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogPeriodicMaterializationITCase.java ########## @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.test.checkpointing; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.SharedReference; + +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This verifies that checkpointing works correctly for Changelog state backend with materialized + * state / non-materialized state. + */ +public class ChangelogPeriodicMaterializationITCase + extends ChangelogPeriodicMaterializationTestBase { + + public ChangelogPeriodicMaterializationITCase(AbstractStateBackend delegatedStateBackend) { + super(delegatedStateBackend); + } + + @Before + public void setup() throws Exception { + super.setup(); + } + + /** Recovery from checkpoint only containing non-materialized state. */ + @Test + public void testNonMaterialization() throws Exception { + StreamExecutionEnvironment env = + getEnv(new DelegatedStateBackendWrapper(delegatedStateBackend, t -> t)); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile(() -> completedCheckpointNum.get() <= 0); + throwArtificialFailure(); + } + } + }, + generateJobID())); + } + + /** Recovery from checkpoint containing non-materialized state and materialized state. 
*/ + @Test + public void testMaterialization() throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + JobID jobID = generateJobID(); + SharedReference<AtomicInteger> currentCheckpointNum = + sharedObjects.add(new AtomicInteger()); + SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>()); + StreamExecutionEnvironment env = + getEnv(delegatedStateBackend, checkpointFile, 100, 2, 10, 0); + waitAndAssert( + buildJobGraph( + env, + new ControlledSource() { + @Override + protected void beforeElement(SourceContext<Integer> ctx) + throws Exception { + if (getRuntimeContext().getAttemptNumber() == 0 + && currentIndex == TOTAL_ELEMENTS / 4) { + waitWhile( + () -> { + if (completedCheckpointNum.get() <= 0) { + return true; + } + List<Long> allMaterializationId = + getAllMaterializationId( + checkpointFile, jobID); + if (!allMaterializationId.isEmpty()) { + currentMaterializationId + .get() + .addAll(allMaterializationId); + Collections.sort( + currentMaterializationId.get()); + currentCheckpointNum + .get() + .compareAndSet( + 0, + completedCheckpointNum.get()); + return false; + } + return true; + }); + + throwArtificialFailure(); + } else if (getRuntimeContext().getAttemptNumber() == 1 + && currentIndex == TOTAL_ELEMENTS / 2) { + waitWhile( + () -> { + if (completedCheckpointNum.get() + <= currentCheckpointNum.get().get()) { + return true; + } + List<Long> allMaterializationId = + getAllMaterializationId( + checkpointFile, jobID); + Collections.sort(allMaterializationId); + return allMaterializationId.isEmpty() + || !currentMaterializationId + .get() + .equals(allMaterializationId); + }); + throwArtificialFailure(); + } + } + }, + jobID)); + } + + @Test + public void testFailedMaterialization() throws Exception { + File checkpointFile = TEMPORARY_FOLDER.newFolder(); + JobID jobID = generateJobID(); + SharedReference<AtomicBoolean> hasTriggerCheckpoint = + sharedObjects.add(new AtomicBoolean()); + 
SharedReference<List<Long>> currentMaterializationId = sharedObjects.add(new ArrayList<>()); + StreamExecutionEnvironment env = + getEnv( + new DelegatedStateBackendWrapper( + delegatedStateBackend, + snapshotResultFuture -> { + if (hasTriggerCheckpoint.get().compareAndSet(false, true)) { + throw new RuntimeException(); Review comment: Is it possible that one subtask fails while a different subtask materializes? How about setting parallelism to 1 explicitly in `getEnv`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org