This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 746acb487be3e4fb8f94505aa9b79cb60b4ce851 Author: Roman Khachatryan <[email protected]> AuthorDate: Fri Feb 11 14:22:51 2022 +0100 [FLINK-26093][tests] Adjust SavepointFormatITCase for ChangelogStateBackend --- .../test/checkpointing/SavepointFormatITCase.java | 276 +++++++++++---------- 1 file changed, 148 insertions(+), 128 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java index 7757354..580ff38 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointFormatITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.StateBackendOptions; +import org.apache.flink.configuration.StateChangelogOptions; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.OperatorState; @@ -41,6 +42,7 @@ import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SavepointKeyedStateHandle; +import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -59,11 +61,13 @@ import org.slf4j.event.Level; import java.io.DataInputStream; import java.io.IOException; import java.nio.file.Path; +import java.util.LinkedList; +import java.util.List; import java.util.Optional; -import java.util.function.Consumer; +import java.util.function.BiFunction; import java.util.function.Predicate; -import java.util.stream.Stream; +import static java.util.Arrays.asList; import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -78,138 +82,151 @@ public class SavepointFormatITCase { LoggerAuditingExtension loggerAuditingExtension = new LoggerAuditingExtension(SavepointFormatITCase.class, Level.INFO); - private static Stream<Arguments> parameters() { - return Stream.of( - Arguments.of( - SavepointFormatType.CANONICAL, - HEAP, - (Consumer<KeyedStateHandle>) - keyedState -> - assertThat( - keyedState, - instanceOf(SavepointKeyedStateHandle.class))), - Arguments.of( - SavepointFormatType.NATIVE, - HEAP, - (Consumer<KeyedStateHandle>) - keyedState -> - assertThat( - keyedState, - instanceOf(KeyGroupsStateHandle.class))), - Arguments.of( - SavepointFormatType.CANONICAL, - ROCKSDB_FULL_SNAPSHOTS, - (Consumer<KeyedStateHandle>) - keyedState -> - assertThat( - keyedState, - instanceOf(SavepointKeyedStateHandle.class))), - Arguments.of( - SavepointFormatType.NATIVE, - ROCKSDB_FULL_SNAPSHOTS, - (Consumer<KeyedStateHandle>) - keyedState -> - assertThat( - keyedState, - instanceOf(KeyGroupsStateHandle.class))), - Arguments.of( - SavepointFormatType.CANONICAL, - ROCKSDB_INCREMENTAL_SNAPSHOTS, - (Consumer<KeyedStateHandle>) - keyedState -> - assertThat( - keyedState, - instanceOf(SavepointKeyedStateHandle.class))), - Arguments.of( - SavepointFormatType.NATIVE, - ROCKSDB_INCREMENTAL_SNAPSHOTS, - (Consumer<KeyedStateHandle>) - keyedState -> - assertThat( - keyedState, - instanceOf( - IncrementalRemoteKeyedStateHandle.class)))); + private static List<Arguments> parameters() { + // iterate through all combinations of backends, isIncremental, isChangelogEnabled + List<Arguments> result = new LinkedList<>(); + for (BiFunction<Boolean, Boolean, StateBackendConfig> builder : + StateBackendConfig.builders) { + for (boolean incremental : new boolean[] {true, false}) { + for (boolean changelog : new boolean[] {true, false}) { + for (SavepointFormatType formatType : SavepointFormatType.values()) { + result.add(Arguments.of(formatType, builder.apply(incremental, changelog))); + } + } + } + } + return result; + } + + private void validateState( + KeyedStateHandle state, + SavepointFormatType formatType, + StateBackendConfig backendConfig) { + if (formatType == SavepointFormatType.CANONICAL) { + assertThat(state, instanceOf(SavepointKeyedStateHandle.class)); + } else if (backendConfig.isChangelogEnabled()) { + assertThat(state, instanceOf(ChangelogStateBackendHandle.class)); + for (KeyedStateHandle nestedState : + ((ChangelogStateBackendHandle) state).getMaterializedStateHandles()) { + validateNativeNonChangelogState(nestedState, backendConfig); + } + } else { + validateNativeNonChangelogState(state, backendConfig); + } + } + + private void validateNativeNonChangelogState( + KeyedStateHandle state, StateBackendConfig backendConfig) { + if (backendConfig.isIncremental()) { + assertThat(state, instanceOf(IncrementalRemoteKeyedStateHandle.class)); + } else { + assertThat(state, instanceOf(KeyGroupsStateHandle.class)); + } } private abstract static class StateBackendConfig { + protected final boolean changelogEnabled; + protected final boolean incremental; + + protected StateBackendConfig(boolean changelogEnabled, boolean incremental) { + this.changelogEnabled = changelogEnabled; + this.incremental = incremental; + } + public abstract String getName(); - public abstract Configuration getConfiguration(); + public Configuration getConfiguration() { + Configuration stateBackendConfig = new Configuration(); + stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, getConfigName()); + stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental); + stateBackendConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, changelogEnabled); + return stateBackendConfig; + } public int getCheckpointsBeforeSavepoint() { return 0; } + protected abstract String getConfigName(); + @Override public final String toString() { - return getName(); + return String.format( + "%s, incremental: %b, changelog: %b", getName(), incremental, changelogEnabled); } - } - private static final StateBackendConfig HEAP = - new StateBackendConfig() { - @Override - public String getName() { - return "HEAP"; - } - - @Override - public Configuration getConfiguration() { - Configuration stateBackendConfig = new Configuration(); - stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "filesystem"); - stateBackendConfig.set( - CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO); - return stateBackendConfig; - } - }; - - private static final StateBackendConfig ROCKSDB_FULL_SNAPSHOTS = - new StateBackendConfig() { - @Override - public String getName() { - return "ROCKSDB_FULL_SNAPSHOTS"; - } + private static final List<BiFunction<Boolean, Boolean, StateBackendConfig>> builders = + asList(SavepointFormatITCase::getRocksdb, SavepointFormatITCase::heap); - @Override - public Configuration getConfiguration() { - Configuration stateBackendConfig = new Configuration(); - stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "rocksdb"); - stateBackendConfig.set( - CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO); - stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, false); - return stateBackendConfig; - } - }; + public abstract boolean isIncremental(); - private static final StateBackendConfig ROCKSDB_INCREMENTAL_SNAPSHOTS = - new StateBackendConfig() { - @Override - public String getName() { - return "ROCKSDB_INCREMENTAL_SNAPSHOTS"; - } + private boolean isChangelogEnabled() { + return changelogEnabled; + } + } - @Override - public int getCheckpointsBeforeSavepoint() { - return 1; - } + private static StateBackendConfig heap(boolean incremental, boolean changelogEnabled) { + return new StateBackendConfig(changelogEnabled, incremental /* ignored for now */) { + @Override + public String getName() { + return "HEAP"; + } + + @Override + public Configuration getConfiguration() { + Configuration stateBackendConfig = super.getConfiguration(); + stateBackendConfig.set( + CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO); + return stateBackendConfig; + } + + @Override + protected String getConfigName() { + return "filesystem"; + } + + @Override + public boolean isIncremental() { + return false; + } + }; + } - @Override - public Configuration getConfiguration() { - Configuration stateBackendConfig = new Configuration(); - stateBackendConfig.setString(StateBackendOptions.STATE_BACKEND, "rocksdb"); - stateBackendConfig.set( - CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO); - stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); - return stateBackendConfig; - } - }; + private static StateBackendConfig getRocksdb(boolean incremental, boolean changelogEnabled) { + return new StateBackendConfig(changelogEnabled, incremental) { + @Override + public String getName() { + return "ROCKSDB"; + } + + @Override + public int getCheckpointsBeforeSavepoint() { + return 1; + } + + @Override + public boolean isIncremental() { + return this.incremental; + } + + @Override + public Configuration getConfiguration() { + Configuration stateBackendConfig = super.getConfiguration(); + stateBackendConfig.set( + CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO); + return stateBackendConfig; + } + + protected String getConfigName() { + return "rocksdb"; + } + }; + } @ParameterizedTest(name = "[{index}] {0}, {1}") @MethodSource("parameters") public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath( - SavepointFormatType formatType, - StateBackendConfig stateBackendConfig, - Consumer<KeyedStateHandle> stateHandleVerification) + SavepointFormatType formatType, StateBackendConfig stateBackendConfig) throws Exception { final int numTaskManagers = 2; final int numSlotsPerTaskManager = 2; @@ -231,20 +248,16 @@ public class SavepointFormatITCase { submitJobAndTakeSavepoint( miniClusterResource, formatType, - stateBackendConfig.getCheckpointsBeforeSavepoint()); + stateBackendConfig.getCheckpointsBeforeSavepoint(), + config); final CheckpointMetadata metadata = loadCheckpointMetadata(savepointPath); final OperatorState operatorState = metadata.getOperatorStates().stream().filter(hasKeyedState()).findFirst().get(); - operatorState - .getStates() - .forEach( - subtaskState -> { - subtaskState - .getManagedKeyedState() - .forEach(stateHandleVerification); - }); - relocateAndVerify(miniClusterResource, savepointPath, renamedSavepointDir); + operatorState.getStates().stream() + .flatMap(subtaskState -> subtaskState.getManagedKeyedState().stream()) + .forEach(handle -> validateState(handle, formatType, stateBackendConfig)); + relocateAndVerify(miniClusterResource, savepointPath, renamedSavepointDir, config); } finally { miniClusterResource.after(); } @@ -272,14 +285,17 @@ public class SavepointFormatITCase { } private void relocateAndVerify( - MiniClusterWithClientResource cluster, String savepointPath, Path renamedSavepointDir) + MiniClusterWithClientResource cluster, + String savepointPath, + Path renamedSavepointDir, + Configuration config) throws Exception { final org.apache.flink.core.fs.Path oldPath = new org.apache.flink.core.fs.Path(savepointPath); final org.apache.flink.core.fs.Path newPath = new org.apache.flink.core.fs.Path(renamedSavepointDir.toUri().toString()); (new org.apache.flink.core.fs.Path(savepointPath).getFileSystem()).rename(oldPath, newPath); - final JobGraph jobGraph = createJobGraph(); + final JobGraph jobGraph = createJobGraph(config); jobGraph.setSavepointRestoreSettings( SavepointRestoreSettings.forPath( renamedSavepointDir.toUri().toString(), false, RestoreMode.CLAIM)); @@ -293,9 +309,10 @@ public class SavepointFormatITCase { private String submitJobAndTakeSavepoint( MiniClusterWithClientResource cluster, SavepointFormatType formatType, - int checkpointBeforeSavepoint) + int checkpointBeforeSavepoint, + Configuration config) throws Exception { - final JobGraph jobGraph = createJobGraph(); + final JobGraph jobGraph = createJobGraph(config); final JobID jobId = jobGraph.getJobID(); ClusterClient<?> client = cluster.getClusterClient(); @@ -311,8 +328,11 @@ public class SavepointFormatITCase { .get(); } - private static JobGraph createJobGraph() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + private static JobGraph createJobGraph(Configuration config) { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment( + /* pass configuration to prevent any conflicting randomization*/ + config); env.setParallelism(4); env.setRuntimeMode(RuntimeExecutionMode.STREAMING); env.disableOperatorChaining();
