This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 2529d272d29db12fe9835e3c8260a661ae011e28
Author: Igal Shilman <[email protected]>
AuthorDate: Thu Apr 2 00:01:51 2020 +0200

    [FLINK-16927][core] Add legacy scheduler property validation
    
    This commit adds a validation that requires the `jobmanager.scheduler`
    configuration parameter to be set to `legacy`.
    
    This closes #91
---
 .../core/StatefulFunctionsConfigValidator.java     | 10 ++++++++
 .../flink/core/StatefulFunctionsConfigTest.java    | 28 ++++++++++++++++++++++
 .../flink/statefun/flink/harness/Harness.java      |  3 +++
 3 files changed, 41 insertions(+)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index 9a7b0d1..88cc943 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -26,6 +26,7 @@ import java.util.Locale;
 import java.util.Set;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 
@@ -42,6 +43,7 @@ public final class StatefulFunctionsConfigValidator {
   static void validate(Configuration configuration) {
     validateParentFirstClassloaderPatterns(configuration);
     validateMaxConcurrentCheckpoints(configuration);
+    validateLegacyScheduler(configuration);
   }
 
   private static void validateParentFirstClassloaderPatterns(Configuration configuration) {
@@ -64,6 +66,14 @@ public final class StatefulFunctionsConfigValidator {
     }
   }
 
+  private static void validateLegacyScheduler(Configuration configuration) {
+    String configuredScheduler = configuration.get(JobManagerOptions.SCHEDULER);
+    if (!"legacy".equalsIgnoreCase(configuredScheduler)) {
+      throw new StatefulFunctionsInvalidConfigException(
+          JobManagerOptions.SCHEDULER, "Currently the only supported scheduler is 'legacy'");
+    }
+  }
+
   private static Set<String> parentFirstClassloaderPatterns(Configuration configuration) {
     final String[] split =
         configuration.get(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL).split(";");
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
index 90dad85..cc7f8f3 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.statefun.flink.core;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
@@ -44,6 +45,7 @@ public class StatefulFunctionsConfigTest {
     configuration.set(
         CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
         "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
+    configuration.set(JobManagerOptions.SCHEDULER, "legacy");
     configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
     configuration.setString("statefun.module.global-config.key1", "value1");
     configuration.setString("statefun.module.global-config.key2", "value2");
@@ -61,8 +63,34 @@ public class StatefulFunctionsConfigTest {
   }
 
   @Test(expected = StatefulFunctionsInvalidConfigException.class)
+  public void testMissingScheduler() {
+    Configuration configuration = validConfiguration();
+
+    configuration.removeConfig(JobManagerOptions.SCHEDULER);
+
+    new StatefulFunctionsConfig(configuration);
+  }
+
+  @Test(expected = StatefulFunctionsInvalidConfigException.class)
   public void invalidStrictFlinkConfigsThrows() {
     Configuration configuration = new Configuration();
     new StatefulFunctionsConfig(configuration);
   }
+
+  private static Configuration validConfiguration() {
+    Configuration configuration = new Configuration();
+    configuration.set(StatefulFunctionsConfig.FLINK_JOB_NAME, "name");
+    configuration.set(
+        StatefulFunctionsConfig.USER_MESSAGE_SERIALIZER, MessageFactoryType.WITH_KRYO_PAYLOADS);
+    configuration.set(
+        StatefulFunctionsConfig.TOTAL_MEMORY_USED_FOR_FEEDBACK_CHECKPOINTING,
+        MemorySize.ofMebiBytes(100));
+    configuration.set(StatefulFunctionsConfig.ASYNC_MAX_OPERATIONS_PER_TASK, 100);
+    configuration.set(
+        CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
+        "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
+    configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
+    configuration.set(JobManagerOptions.SCHEDULER, "legacy");
+    return configuration;
+  }
 }
diff --git a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
index 6fe3856..bcab218 100644
--- a/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
+++ b/statefun-flink/statefun-flink-harness/src/main/java/org/apache/flink/statefun/flink/harness/Harness.java
@@ -22,6 +22,8 @@ import java.util.Map;
 import java.util.Objects;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.runtime.dispatcher.SchedulerNGFactoryFactory;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsConfigValidator;
 import org.apache.flink.statefun.flink.core.StatefulFunctionsJob;
@@ -167,5 +169,6 @@ public class Harness {
     flinkConfig.set(
         ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS,
         StatefulFunctionsConfigValidator.MAX_CONCURRENT_CHECKPOINTS);
+    flinkConfig.set(JobManagerOptions.SCHEDULER, SchedulerNGFactoryFactory.SCHEDULER_TYPE_LEGACY);
   }
 }

Reply via email to