This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit e3ee2b6c0798478545a8c87c3b918ff59a56799d
Author: Igal Shilman <[email protected]>
AuthorDate: Wed Apr 1 18:06:30 2020 +0200

    [FLINK-16926][core] Configure Execution Env in 
StatefulFunctionsClusterEntryPoint
    
    This commit adds a workaround for FLINK-16560.
    When submitting the job via StatefulFunctionsClusterEntryPoint (an adapted version of a
    JobClusterEntryPoint), the resulting StreamExecutionEnvironment is started with an empty
    configuration object, and might miss important config options set in flink-conf.yaml.
    
    This closes #90
---
 .../statefun/flink/core/StatefulFunctionsConfig.java     | 11 +++++++++++
 .../flink/statefun/flink/core/StatefulFunctionsJob.java  | 16 ++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
index 293f917..3487d35 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfig.java
@@ -120,6 +120,8 @@ public class StatefulFunctionsConfig implements 
Serializable {
     }
   }
 
+  private final Configuration flinkConfiguration;
+
   private MessageFactoryType factoryType;
 
   private String flinkJobName;
@@ -139,6 +141,7 @@ public class StatefulFunctionsConfig implements 
Serializable {
    */
   public StatefulFunctionsConfig(Configuration configuration) {
     StatefulFunctionsConfigValidator.validate(configuration);
+    this.flinkConfiguration = configuration;
 
     this.factoryType = configuration.get(USER_MESSAGE_SERIALIZER);
     this.flinkJobName = configuration.get(FLINK_JOB_NAME);
@@ -240,4 +243,12 @@ public class StatefulFunctionsConfig implements 
Serializable {
   public void setGlobalConfiguration(String key, String value) {
     this.globalConfigurations.put(key, value);
   }
+
+  /**
+   * Returns the underlying Flink configuration that was used to initialize this {@link
+   * StatefulFunctionsConfig}.
+   */
+  public Configuration getFlinkConfiguration() {
+    return flinkConfiguration;
+  }
 }
diff --git 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
index 57ddfc6..25ed940 100644
--- 
a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
+++ 
b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsJob.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.utils.ParameterTool;
 import 
org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.statefun.flink.core.translation.FlinkUniverse;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamPlanEnvironment;
 
 public class StatefulFunctionsJob {
 
@@ -52,6 +53,7 @@ public class StatefulFunctionsJob {
     Objects.requireNonNull(stateFunConfig);
 
     setDefaultContextClassLoaderIfAbsent();
+    configureExecutionEnvironment(env, stateFunConfig);
 
     env.getConfig().enableObjectReuse();
 
@@ -69,6 +71,20 @@ public class StatefulFunctionsJob {
     env.execute(stateFunConfig.getFlinkJobName());
   }
 
+  private static void configureExecutionEnvironment(
+      StreamExecutionEnvironment env, StatefulFunctionsConfig stateFunConfig) {
+    if (!(env instanceof StreamPlanEnvironment)) {
+      return;
+    }
+    // This is a workaround until FLINK-16560 is resolved.
+    // When submitting the job via StatefulFunctionsClusterEntryPoint (an adapted version of a
+    // JobClusterEntryPoint), the resulting StreamExecutionEnvironment is started with an empty
+    // configuration object, and hence might miss important config options set in flink-conf.yaml.
+    ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
+    Objects.requireNonNull(contextClassLoader);
+    env.configure(stateFunConfig.getFlinkConfiguration(), contextClassLoader);
+  }
+
   private static void setDefaultContextClassLoaderIfAbsent() {
     ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
     if (classLoader == null) {

Reply via email to