This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 163dd030767f823a097168578bd2e05b7747c30d
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Thu Dec 17 13:06:01 2020 +0800

    [FLINK-20636] Validate that unaligned checkpoints are not enabled
    
    This closes #187.
---
 .../flink/core/StatefulFunctionsConfigValidator.java        | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index b523e77..0defdf8 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -46,6 +46,7 @@ public final class StatefulFunctionsConfigValidator {
     validateParentFirstClassloaderPatterns(configuration);
     validateCustomPayloadSerializerClassName(configuration);
     validateNoHeapBackedTimers(configuration);
+    validateUnalignedCheckpointsDisabled(configuration);
   }
 
   private static void validateParentFirstClassloaderPatterns(Configuration 
configuration) {
@@ -95,6 +96,9 @@ public final class StatefulFunctionsConfigValidator {
           .stringType()
           .defaultValue("rocksdb");
 
+  private static final ConfigOption<Boolean> ENABLE_UNALIGNED_CHECKPOINTS =
+      
ConfigOptions.key("execution.checkpointing.unaligned").booleanType().defaultValue(false);
+
   private static void validateNoHeapBackedTimers(Configuration configuration) {
     final String timerFactory = configuration.getString(TIMER_SERVICE_FACTORY);
     if (!timerFactory.equalsIgnoreCase("rocksdb")) {
@@ -103,4 +107,13 @@ public final class StatefulFunctionsConfigValidator {
           "StateFun only supports non-heap timers with a rocksdb state 
backend.");
     }
   }
+
+  private static void validateUnalignedCheckpointsDisabled(Configuration 
configuration) {
+    final boolean unalignedCheckpoints = 
configuration.getBoolean(ENABLE_UNALIGNED_CHECKPOINTS);
+    if (unalignedCheckpoints) {
+      throw new StatefulFunctionsInvalidConfigException(
+          ENABLE_UNALIGNED_CHECKPOINTS,
+          "StateFun currently does not support unaligned checkpointing.");
+    }
+  }
 }

Reply via email to