This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit e3ee2b6c0798478545a8c87c3b918ff59a56799d Author: Igal Shilman <[email protected]> AuthorDate: Wed Apr 1 18:06:30 2020 +0200 [FLINK-16926][core] Configure Execution Env in StatefulFunctionsClusterEntryPoint This commit adds a workaround for FLINK-16560. When submitting the Job via StatefulFunctionsClusterEntryPoint (an adapted version of a JobClusterEntryPoint), the resulting StreamExecutionEnvironment is started with an empty configuration object, and might miss important config options set in flink-conf.yaml. This closes #90 --- .../statefun/flink/core/StatefulFunctionsConfig.java | 11 +++++++++++ .../flink/statefun/flink/core/StatefulFunctionsJob.java | 16 ++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java index 293f917..3487d35 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java @@ -120,6 +120,8 @@ public class StatefulFunctionsConfig implements Serializable { } } + private final Configuration flinkConfiguration; + private MessageFactoryType factoryType; private String flinkJobName; @@ -139,6 +141,7 @@ public class StatefulFunctionsConfig implements Serializable { */ public StatefulFunctionsConfig(Configuration configuration) { StatefulFunctionsConfigValidator.validate(configuration); + this.flinkConfiguration = configuration; this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER); this.flinkJobName = configuration.get(FLINK_JOB_NAME); @@ -240,4 +243,12 @@ public class StatefulFunctionsConfig implements Serializable { public void setGlobalConfiguration(String key, String value) { this.globalConfigurations.put(key, 
value); } + + /** + * Returns the underlying Flink configuration that was used to initialize this {@link + * StatefulFunctionsConfig}. + */ + public Configuration getFlinkConfiguration() { + return flinkConfiguration; + } } diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java index 57ddfc6..25ed940 100644 --- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java +++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java @@ -25,6 +25,7 @@ import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders; import org.apache.flink.statefun.flink.core.translation.FlinkUniverse; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamPlanEnvironment; public class StatefulFunctionsJob { @@ -52,6 +53,7 @@ public class StatefulFunctionsJob { Objects.requireNonNull(stateFunConfig); setDefaultContextClassLoaderIfAbsent(); + configureExecutionEnvironment(env, stateFunConfig); env.getConfig().enableObjectReuse(); @@ -69,6 +71,20 @@ public class StatefulFunctionsJob { env.execute(stateFunConfig.getFlinkJobName()); } + private static void configureExecutionEnvironment( + StreamExecutionEnvironment env, StatefulFunctionsConfig stateFunConfig) { + if (!(env instanceof StreamPlanEnvironment)) { + return; + } + // This is a workaround until FLINK-16560 is resolved. 
+ // When submitting the Job via StatefulFunctionsClusterEntryPoint (an adapted version of a + // JobClusterEntryPoint), the resulting StreamExecutionEnvironment is started with an empty + // configuration object, hence might miss important config options set in flink-conf.yaml. + ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); + Objects.requireNonNull(contextClassLoader); + env.configure(stateFunConfig.getFlinkConfiguration(), contextClassLoader); + } + private static void setDefaultContextClassLoaderIfAbsent() { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); if (classLoader == null) {
