This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch flinkenv
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit d8f902fe74e2b3e924ddd7c5a27d452bd81e1735
Author: benjobs <[email protected]>
AuthorDate: Fri Aug 25 17:14:40 2023 +0800

    [Improve] Flink job env initializer improvement
---
 .../streampark/flink/core/EnhancerImplicit.scala   |  28 ++-
 .../flink/core/FlinkStreamingInitializer.scala     |  70 ++----
 .../flink/core/FlinkTableInitializer.scala         | 271 ++++++++-------------
 3 files changed, 150 insertions(+), 219 deletions(-)

diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
index e8b00e353..528e47907 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -21,6 +21,9 @@ import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_AP
 import org.apache.streampark.common.util.DeflaterUtils
 
 import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 
 import scala.util.Try
 
@@ -28,7 +31,7 @@ object EnhancerImplicit {
 
   implicit class EnhanceParameterTool(parameterTool: ParameterTool) {
 
-    def getAppName(name: String = null, required: Boolean = false): String = {
+    private[flink] def getAppName(name: String = null, required: Boolean = false): String = {
       val appName = name match {
         case null =>
           Try(DeflaterUtils.unzipString(parameterTool.get(KEY_APP_NAME(), null)))
@@ -43,4 +46,27 @@ object EnhancerImplicit {
 
   }
 
+  implicit class EnhanceTableEnvironment(env: TableEnvironment) {
+
+    private[flink] def setAppName(implicit parameter: ParameterTool): TableEnvironment = {
+      val appName = parameter.getAppName()
+      if (appName != null) {
+        env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
+      }
+      env
+    }
+
+  }
+
+  implicit class EnhanceStreamExecutionEnvironment(env: StreamTableEnvironment) {
+
+    private[flink] def setAppName(implicit parameter: ParameterTool): StreamTableEnvironment = {
+      val appName = parameter.getAppName()
+      if (appName != null) {
+        env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
+      }
+      env
+    }
+  }
+
 }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 7de68502f..b3ee24c33 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -34,33 +34,17 @@ import java.io.File
 
 private[flink] object FlinkStreamingInitializer {
 
-  private[this] var flinkInitializer: FlinkStreamingInitializer = _
-
   def initialize(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit)
       : (ParameterTool, StreamExecutionEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
-          flinkInitializer.streamEnvConfFunc = config
-          flinkInitializer.initEnvironment()
-        }
-      }
-    }
-    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment)
+    val flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
+    flinkInitializer.streamEnvConfFunc = config
+    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnv)
   }
 
   def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
-          flinkInitializer.javaStreamEnvConfFunc = args.conf
-          flinkInitializer.initEnvironment()
-        }
-      }
-    }
-    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment)
+    val flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
+    flinkInitializer.javaStreamEnvConfFunc = args.conf
+    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnv)
   }
 }
 
@@ -75,7 +59,22 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
 
   var javaTableEnvConfFunc: TableEnvConfigFunction = _
 
-  private[this] var localStreamEnv: StreamExecutionEnvironment = _
+  implicit private[flink] val parameter: ParameterTool = configuration.parameter
+
+  lazy val streamEnv: StreamExecutionEnvironment = {
+    val env = new StreamExecutionEnvironment(
+      JavaStreamEnv.getExecutionEnvironment(configuration.envConfig))
+
+    apiType match {
+      case ApiType.java if javaStreamEnvConfFunc != null =>
+        javaStreamEnvConfFunc.configuration(env.getJavaEnv, configuration.parameter)
+      case ApiType.scala if streamEnvConfFunc != null =>
+        streamEnvConfFunc(env, configuration.parameter)
+      case _ =>
+    }
+    env.getConfig.setGlobalJobParameters(configuration.parameter)
+    env
+  }
 
   lazy val configuration: FlinkConfiguration = initParameter()
 
@@ -147,29 +146,4 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     map
   }
 
-  def streamEnvironment: StreamExecutionEnvironment = {
-    if (localStreamEnv == null) {
-      this.synchronized {
-        if (localStreamEnv == null) {
-          initEnvironment()
-        }
-      }
-    }
-    localStreamEnv
-  }
-
-  def initEnvironment(): Unit = {
-    localStreamEnv = new StreamExecutionEnvironment(
-      JavaStreamEnv.getExecutionEnvironment(configuration.envConfig))
-
-    apiType match {
-      case ApiType.java if javaStreamEnvConfFunc != null =>
-        javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, configuration.parameter)
-      case ApiType.scala if streamEnvConfFunc != null =>
-        streamEnvConfFunc(localStreamEnv, configuration.parameter)
-      case _ =>
-    }
-    localStreamEnv.getConfig.setGlobalJobParameters(configuration.parameter)
-  }
-
 }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 2a8cda83f..a401b437e 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -17,15 +17,14 @@
 package org.apache.streampark.flink.core
 
 import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
+import org.apache.streampark.common.enums.{ApiType, PlannerType}
 import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.TableMode.TableMode
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.streampark.flink.core.EnhancerImplicit._
 import org.apache.streampark.flink.core.conf.FlinkConfiguration
 
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{Configuration, PipelineOptions}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
@@ -38,34 +37,18 @@ import scala.util.{Failure, Success, Try}
 
 private[flink] object FlinkTableInitializer {
 
-  private[this] var flinkInitializer: FlinkTableInitializer = _
-
   def initialize(
       args: Array[String],
       config: (TableConfig, ParameterTool) => Unit): (ParameterTool, TableEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
-          flinkInitializer.tableConfFunc = config
-          flinkInitializer.initEnvironment(TableMode.batch)
-        }
-      }
-    }
-    (flinkInitializer.configuration.parameter, flinkInitializer.tableEnvironment)
+    val flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
+    flinkInitializer.tableConfFunc = config
+    (flinkInitializer.configuration.parameter, flinkInitializer.tableEnv)
   }
 
   def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
-          flinkInitializer.javaTableEnvConfFunc = args.conf
-          flinkInitializer.initEnvironment(TableMode.batch)
-        }
-      }
-    }
-    (flinkInitializer.configuration.parameter, flinkInitializer.tableEnvironment)
+    val flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
+    flinkInitializer.javaTableEnvConfFunc = args.conf
+    (flinkInitializer.configuration.parameter, flinkInitializer.tableEnv)
   }
 
   def initialize(
@@ -73,38 +56,25 @@ private[flink] object FlinkTableInitializer {
       configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
       configTable: (TableConfig, ParameterTool) => Unit)
       : (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
-          flinkInitializer.streamEnvConfFunc = configStream
-          flinkInitializer.tableConfFunc = configTable
-          flinkInitializer.initEnvironment(TableMode.streaming)
-        }
-      }
-    }
+
+    val flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
+    flinkInitializer.streamEnvConfFunc = configStream
+    flinkInitializer.tableConfFunc = configTable
     (
       flinkInitializer.configuration.parameter,
-      flinkInitializer.streamEnvironment,
-      flinkInitializer.streamTableEnvironment)
+      flinkInitializer.streamEnv,
+      flinkInitializer.streamTableEnv)
   }
 
   def initialize(args: StreamTableEnvConfig)
       : (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
-          flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
-          flinkInitializer.javaTableEnvConfFunc = args.tableConfig
-          flinkInitializer.initEnvironment(TableMode.streaming)
-        }
-      }
-    }
+    val flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
+    flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
+    flinkInitializer.javaTableEnvConfFunc = args.tableConfig
     (
       flinkInitializer.configuration.parameter,
-      flinkInitializer.streamEnvironment,
-      flinkInitializer.streamTableEnvironment)
+      flinkInitializer.streamEnv,
+      flinkInitializer.streamTableEnv)
   }
 
 }
@@ -112,30 +82,95 @@ private[flink] object FlinkTableInitializer {
 private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType)
   extends FlinkStreamingInitializer(args, apiType) {
 
-  private[this] var localStreamTableEnv: StreamTableEnvironment = _
+  private[this] lazy val envSettings = {
 
-  private[this] var localTableEnv: TableEnvironment = _
+    val builder = EnvironmentSettings.newInstance()
 
-  def streamTableEnvironment: StreamTableEnvironment = {
-    if (localStreamTableEnv == null) {
-      this.synchronized {
-        if (localStreamTableEnv == null) {
-          initEnvironment(TableMode.streaming)
+    Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER)))
+      .getOrElse(PlannerType.blink) match {
+      case PlannerType.blink =>
+        val useBlinkPlanner =
+          Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null)
+        if (useBlinkPlanner == null) {
+          logWarn("useBlinkPlanner deprecated")
+        } else {
+          useBlinkPlanner.setAccessible(true)
+          useBlinkPlanner.invoke(builder)
+          logInfo("blinkPlanner will be use.")
         }
-      }
+      case PlannerType.old =>
+        val useOldPlanner = Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null)
+        if (useOldPlanner == null) {
+          logWarn("useOldPlanner deprecated")
+        } else {
+          useOldPlanner.setAccessible(true)
+          useOldPlanner.invoke(builder)
+          logInfo("useOldPlanner will be use.")
+        }
+      case PlannerType.any =>
+        val useAnyPlanner = Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null)
+        if (useAnyPlanner == null) {
+          logWarn("useAnyPlanner deprecated")
+        } else {
+          logInfo("useAnyPlanner will be use.")
+          useAnyPlanner.setAccessible(true)
+          useAnyPlanner.invoke(builder)
+        }
+    }
+
+    val buildWith =
+      (parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE))
+    buildWith match {
+      case (x: String, y: String) if x != null && y != null =>
+        logInfo(s"with built in catalog: $x")
+        logInfo(s"with built in database: $y")
+        builder.withBuiltInCatalogName(x)
+        builder.withBuiltInDatabaseName(y)
+      case (x: String, _) if x != null =>
+        logInfo(s"with built in catalog: $x")
+        builder.withBuiltInCatalogName(x)
+      case (_, y: String) if y != null =>
+        logInfo(s"with built in database: $y")
+        builder.withBuiltInDatabaseName(y)
+      case _ =>
     }
-    localStreamTableEnv
+    builder
   }
 
-  def tableEnvironment: TableEnvironment = {
-    if (localTableEnv == null) {
-      this.synchronized {
-        if (localTableEnv == null) {
-          initEnvironment(TableMode.batch)
-        }
-      }
+  lazy val tableEnv: TableEnvironment = {
+    logInfo(s"job working in batch mode")
+    envSettings.inBatchMode()
+    val tableEnv = TableEnvironment.create(envSettings.build()).setAppName
+    apiType match {
+      case ApiType.java if javaTableEnvConfFunc != null =>
+        javaTableEnvConfFunc.configuration(tableEnv.getConfig, parameter)
+      case ApiType.scala if tableConfFunc != null =>
+        tableConfFunc(tableEnv.getConfig, parameter)
+      case _ =>
     }
-    localTableEnv
+    tableEnv
+  }
+
+  lazy val streamTableEnv: StreamTableEnvironment = {
+    logInfo(s"components should work in streaming mode")
+    envSettings.inStreamingMode()
+    val setting = envSettings.build()
+
+    if (streamEnvConfFunc != null) {
+      streamEnvConfFunc(streamEnv, parameter)
+    }
+    if (javaStreamEnvConfFunc != null) {
+      javaStreamEnvConfFunc.configuration(streamEnv.getJavaEnv, parameter)
+    }
+    val streamTableEnv = StreamTableEnvironment.create(streamEnv, setting).setAppName
+    apiType match {
+      case ApiType.java if javaTableEnvConfFunc != null =>
+        javaTableEnvConfFunc.configuration(streamTableEnv.getConfig, parameter)
+      case ApiType.scala if tableConfFunc != null =>
+        tableConfFunc(streamTableEnv.getConfig, parameter)
+      case _ =>
+    }
+    streamTableEnv
   }
 
   /** In case of table SQL, the parameter conf is not required, it depends on the developer. */
@@ -201,108 +236,4 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     }
   }
 
-  def initEnvironment(tableMode: TableMode): Unit = {
-    val builder = EnvironmentSettings.newInstance()
-    val parameter = configuration.parameter
-    Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER)))
-      .getOrElse(PlannerType.blink) match {
-      case PlannerType.blink =>
-        val useBlinkPlanner =
-          Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null)
-        if (useBlinkPlanner == null) {
-          logWarn("useBlinkPlanner deprecated")
-        } else {
-          useBlinkPlanner.setAccessible(true)
-          useBlinkPlanner.invoke(builder)
-          logInfo("blinkPlanner will be use.")
-        }
-      case PlannerType.old =>
-        val useOldPlanner = Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null)
-        if (useOldPlanner == null) {
-          logWarn("useOldPlanner deprecated")
-        } else {
-          useOldPlanner.setAccessible(true)
-          useOldPlanner.invoke(builder)
-          logInfo("useOldPlanner will be use.")
-        }
-      case PlannerType.any =>
-        val useAnyPlanner = Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null)
-        if (useAnyPlanner == null) {
-          logWarn("useAnyPlanner deprecated")
-        } else {
-          logInfo("useAnyPlanner will be use.")
-          useAnyPlanner.setAccessible(true)
-          useAnyPlanner.invoke(builder)
-        }
-    }
-
-    val mode = Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
-    mode match {
-      case TableMode.batch =>
-        logInfo(s"components should work in $tableMode mode")
-        builder.inBatchMode()
-      case TableMode.streaming =>
-        logInfo(s"components should work in $tableMode mode")
-        builder.inStreamingMode()
-    }
-
-    val buildWith =
-      (parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE))
-    buildWith match {
-      case (x: String, y: String) if x != null && y != null =>
-        logInfo(s"with built in catalog: $x")
-        logInfo(s"with built in database: $y")
-        builder.withBuiltInCatalogName(x)
-        builder.withBuiltInDatabaseName(y)
-      case (x: String, _) if x != null =>
-        logInfo(s"with built in catalog: $x")
-        builder.withBuiltInCatalogName(x)
-      case (_, y: String) if y != null =>
-        logInfo(s"with built in database: $y")
-        builder.withBuiltInDatabaseName(y)
-      case _ =>
-    }
-    val setting = builder.build()
-    tableMode match {
-      case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
-      case TableMode.streaming =>
-        initEnvironment()
-        if (streamEnvConfFunc != null) {
-          streamEnvConfFunc(streamEnvironment, parameter)
-        }
-        if (javaStreamEnvConfFunc != null) {
-          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, parameter)
-        }
-        localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, setting)
-    }
-    val appName = parameter.getAppName()
-    if (appName != null) {
-      tableMode match {
-        case TableMode.batch =>
-          localTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
-        case TableMode.streaming =>
-          localStreamTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
-      }
-    }
-
-    apiType match {
-      case ApiType.java =>
-        if (javaTableEnvConfFunc != null) {
-          tableMode match {
-            case TableMode.batch =>
-              javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
-            case TableMode.streaming =>
-              javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
-          }
-        }
-      case ApiType.scala =>
-        if (tableConfFunc != null) {
-          tableMode match {
-            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, parameter)
-            case TableMode.streaming => tableConfFunc(localStreamTableEnv.getConfig, parameter)
-          }
-        }
-    }
-  }
-
 }

Reply via email to