This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 163dd030767f823a097168578bd2e05b7747c30d Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Thu Dec 17 13:06:01 2020 +0800 [FLINK-20636] Validate that unaligned checkpoints is not enabled This closes #187. --- .../flink/core/StatefulFunctionsConfigValidator.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java index b523e77..0defdf8 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java @@ -46,6 +46,7 @@ public final class StatefulFunctionsConfigValidator { validateParentFirstClassloaderPatterns(configuration); validateCustomPayloadSerializerClassName(configuration); validateNoHeapBackedTimers(configuration); + validateUnalignedCheckpointsDisabled(configuration); } private static void validateParentFirstClassloaderPatterns(Configuration configuration) { @@ -95,6 +96,9 @@ public final class StatefulFunctionsConfigValidator { .stringType() .defaultValue("rocksdb"); + private static final ConfigOption<Boolean> ENABLE_UNALIGNED_CHECKPOINTS = + ConfigOptions.key("execution.checkpointing.unaligned").booleanType().defaultValue(false); + private static void validateNoHeapBackedTimers(Configuration configuration) { final String timerFactory = configuration.getString(TIMER_SERVICE_FACTORY); if (!timerFactory.equalsIgnoreCase("rocksdb")) { @@ -103,4 +107,13 @@ public final class StatefulFunctionsConfigValidator { "StateFun only supports non-heap timers with a rocksdb state backend."); } } + + private static void validateUnalignedCheckpointsDisabled(Configuration configuration) { + final boolean unalignedCheckpoints = configuration.getBoolean(ENABLE_UNALIGNED_CHECKPOINTS); + if (unalignedCheckpoints) { + throw new StatefulFunctionsInvalidConfigException( + ENABLE_UNALIGNED_CHECKPOINTS, + "StateFun currently does not support unaligned checkpointing."); + } + } }
