This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.2
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit c317be06cfbe27455cec93a955d0b07cbc4e15ef
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Thu Dec 17 13:06:01 2020 +0800

    [FLINK-20636] Validate that unaligned checkpoints is not enabled
    
    This closes #187.
---
 .../flink/core/StatefulFunctionsConfigValidator.java        | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index 8b464d9..de3f2f5 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -43,6 +43,7 @@ public final class StatefulFunctionsConfigValidator {
   static void validate(Configuration configuration) {
     validateParentFirstClassloaderPatterns(configuration);
     validateNoHeapBackedTimers(configuration);
+    validateUnalignedCheckpointsDisabled(configuration);
   }
 
   private static void validateParentFirstClassloaderPatterns(Configuration configuration) {
@@ -70,6 +71,9 @@ public final class StatefulFunctionsConfigValidator {
           .stringType()
           .defaultValue("rocksdb");
 
+  private static final ConfigOption<Boolean> ENABLE_UNALIGNED_CHECKPOINTS =
+      ConfigOptions.key("execution.checkpointing.unaligned").booleanType().defaultValue(false);
+
   private static void validateNoHeapBackedTimers(Configuration configuration) {
     final String timerFactory = configuration.getString(TIMER_SERVICE_FACTORY);
     if (!timerFactory.equalsIgnoreCase("rocksdb")) {
@@ -78,4 +82,13 @@ public final class StatefulFunctionsConfigValidator {
           "StateFun only supports non-heap timers with a rocksdb state backend.");
     }
   }
+
+  private static void validateUnalignedCheckpointsDisabled(Configuration configuration) {
+    final boolean unalignedCheckpoints = configuration.getBoolean(ENABLE_UNALIGNED_CHECKPOINTS);
+    if (unalignedCheckpoints) {
+      throw new StatefulFunctionsInvalidConfigException(
+          ENABLE_UNALIGNED_CHECKPOINTS,
+          "StateFun currently does not support unaligned checkpointing.");
+    }
+  }
 }

Reply via email to