This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 2e8b61874bdb2eff6dd92cef7e2d1445afbbf888
Author: Tzu-Li (Gordon) Tai <[email protected]>
AuthorDate: Fri May 15 14:24:23 2020 +0800

    [FLINK-16928] [core] Remove legacy scheduler config validation
---
 .../flink/core/StatefulFunctionsConfigValidator.java         | 10 ----------
 .../statefun/flink/core/StatefulFunctionsConfigTest.java     | 12 ------------
 2 files changed, 22 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index 88cc943..9a7b0d1 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -26,7 +26,6 @@ import java.util.Locale;
 import java.util.Set;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
 import 
org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 
@@ -43,7 +42,6 @@ public final class StatefulFunctionsConfigValidator {
   static void validate(Configuration configuration) {
     validateParentFirstClassloaderPatterns(configuration);
     validateMaxConcurrentCheckpoints(configuration);
-    validateLegacyScheduler(configuration);
   }
 
   private static void validateParentFirstClassloaderPatterns(Configuration 
configuration) {
@@ -66,14 +64,6 @@ public final class StatefulFunctionsConfigValidator {
     }
   }
 
-  private static void validateLegacyScheduler(Configuration configuration) {
-    String configuredScheduler = 
configuration.get(JobManagerOptions.SCHEDULER);
-    if (!"legacy".equalsIgnoreCase(configuredScheduler)) {
-      throw new StatefulFunctionsInvalidConfigException(
-          JobManagerOptions.SCHEDULER, "Currently the only supported scheduler 
is 'legacy'");
-    }
-  }
-
   private static Set<String> parentFirstClassloaderPatterns(Configuration 
configuration) {
     final String[] split =
         
configuration.get(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL).split(";");
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
index cc7f8f3..5a93dd7 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java
@@ -19,7 +19,6 @@ package org.apache.flink.statefun.flink.core;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import 
org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
 import org.apache.flink.statefun.flink.core.message.MessageFactoryType;
@@ -45,7 +44,6 @@ public class StatefulFunctionsConfigTest {
     configuration.set(
         CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
         "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
-    configuration.set(JobManagerOptions.SCHEDULER, "legacy");
     
configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
     configuration.setString("statefun.module.global-config.key1", "value1");
     configuration.setString("statefun.module.global-config.key2", "value2");
@@ -63,15 +61,6 @@ public class StatefulFunctionsConfigTest {
   }
 
   @Test(expected = StatefulFunctionsInvalidConfigException.class)
-  public void testMissingScheduler() {
-    Configuration configuration = validConfiguration();
-
-    configuration.removeConfig(JobManagerOptions.SCHEDULER);
-
-    new StatefulFunctionsConfig(configuration);
-  }
-
-  @Test(expected = StatefulFunctionsInvalidConfigException.class)
   public void invalidStrictFlinkConfigsThrows() {
     Configuration configuration = new Configuration();
     new StatefulFunctionsConfig(configuration);
@@ -90,7 +79,6 @@ public class StatefulFunctionsConfigTest {
         CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL,
         "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf");
     
configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1);
-    configuration.set(JobManagerOptions.SCHEDULER, "legacy");
     return configuration;
   }
 }

Reply via email to