This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-2.2 in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit c317be06cfbe27455cec93a955d0b07cbc4e15ef Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Thu Dec 17 13:06:01 2020 +0800 [FLINK-20636] Validate that unaligned checkpoints is not enabled This closes #187. --- .../flink/core/StatefulFunctionsConfigValidator.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java index 8b464d9..de3f2f5 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java @@ -43,6 +43,7 @@ public final class StatefulFunctionsConfigValidator { static void validate(Configuration configuration) { validateParentFirstClassloaderPatterns(configuration); validateNoHeapBackedTimers(configuration); + validateUnalignedCheckpointsDisabled(configuration); } private static void validateParentFirstClassloaderPatterns(Configuration configuration) { @@ -70,6 +71,9 @@ public final class StatefulFunctionsConfigValidator { .stringType() .defaultValue("rocksdb"); + private static final ConfigOption<Boolean> ENABLE_UNALIGNED_CHECKPOINTS = + ConfigOptions.key("execution.checkpointing.unaligned").booleanType().defaultValue(false); + private static void validateNoHeapBackedTimers(Configuration configuration) { final String timerFactory = configuration.getString(TIMER_SERVICE_FACTORY); if (!timerFactory.equalsIgnoreCase("rocksdb")) { @@ -78,4 +82,13 @@ public final class StatefulFunctionsConfigValidator { "StateFun only supports non-heap timers with a rocksdb state backend."); } } + + private static void validateUnalignedCheckpointsDisabled(Configuration configuration) { + final boolean unalignedCheckpoints = configuration.getBoolean(ENABLE_UNALIGNED_CHECKPOINTS); + if (unalignedCheckpoints) { + throw new StatefulFunctionsInvalidConfigException( + ENABLE_UNALIGNED_CHECKPOINTS, + "StateFun currently does not support unaligned checkpointing."); + } + } }
