This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit d146a90499f47c8410e4bac1e81a4f63644c7e57 Author: Tzu-Li (Gordon) Tai <[email protected]> AuthorDate: Fri May 15 14:24:23 2020 +0800 [FLINK-16928] [core] Remove legacy scheduler config validation --- .../flink/core/StatefulFunctionsConfigValidator.java | 10 ---------- .../statefun/flink/core/StatefulFunctionsConfigTest.java | 12 ------------ 2 files changed, 22 deletions(-) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java index 88cc943..9a7b0d1 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java @@ -26,7 +26,6 @@ import java.util.Locale; import java.util.Set; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; @@ -43,7 +42,6 @@ public final class StatefulFunctionsConfigValidator { static void validate(Configuration configuration) { validateParentFirstClassloaderPatterns(configuration); validateMaxConcurrentCheckpoints(configuration); - validateLegacyScheduler(configuration); } private static void validateParentFirstClassloaderPatterns(Configuration configuration) { @@ -66,14 +64,6 @@ public final class StatefulFunctionsConfigValidator { } } - private static void validateLegacyScheduler(Configuration configuration) { - String configuredScheduler = configuration.get(JobManagerOptions.SCHEDULER); - if (!"legacy".equalsIgnoreCase(configuredScheduler)) { - throw new StatefulFunctionsInvalidConfigException( - JobManagerOptions.SCHEDULER, "Currently the only supported scheduler is 'legacy'"); - } - } - private static Set<String> parentFirstClassloaderPatterns(Configuration configuration) { final String[] split = configuration.get(CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL).split(";"); diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java index cc7f8f3..5a93dd7 100644 --- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java +++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigTest.java @@ -19,7 +19,6 @@ package org.apache.flink.statefun.flink.core; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; -import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MemorySize; import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException; import org.apache.flink.statefun.flink.core.message.MessageFactoryType; @@ -45,7 +44,6 @@ public class StatefulFunctionsConfigTest { configuration.set( CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"); - configuration.set(JobManagerOptions.SCHEDULER, "legacy"); configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1); configuration.setString("statefun.module.global-config.key1", "value1"); configuration.setString("statefun.module.global-config.key2", "value2"); @@ -63,15 +61,6 @@ public class StatefulFunctionsConfigTest { } @Test(expected = StatefulFunctionsInvalidConfigException.class) - public void testMissingScheduler() { - Configuration configuration = validConfiguration(); - - configuration.removeConfig(JobManagerOptions.SCHEDULER); - - new StatefulFunctionsConfig(configuration); - } - - @Test(expected = StatefulFunctionsInvalidConfigException.class) public void invalidStrictFlinkConfigsThrows() { Configuration configuration = new Configuration(); new StatefulFunctionsConfig(configuration); @@ -90,7 +79,6 @@ public class StatefulFunctionsConfigTest { CoreOptions.ALWAYS_PARENT_FIRST_LOADER_PATTERNS_ADDITIONAL, "org.apache.flink.statefun;org.apache.kafka;com.google.protobuf"); configuration.set(ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, 1); - configuration.set(JobManagerOptions.SCHEDULER, "legacy"); return configuration; } }
