This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch flinkenv in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit d8f902fe74e2b3e924ddd7c5a27d452bd81e1735 Author: benjobs <[email protected]> AuthorDate: Fri Aug 25 17:14:40 2023 +0800 [Improve] Flink job env initializer improvement --- .../streampark/flink/core/EnhancerImplicit.scala | 28 ++- .../flink/core/FlinkStreamingInitializer.scala | 70 ++---- .../flink/core/FlinkTableInitializer.scala | 271 ++++++++------------- 3 files changed, 150 insertions(+), 219 deletions(-) diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala index e8b00e353..528e47907 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala @@ -21,6 +21,9 @@ import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_AP import org.apache.streampark.common.util.DeflaterUtils import org.apache.flink.api.java.utils.ParameterTool +import org.apache.flink.configuration.PipelineOptions +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import scala.util.Try @@ -28,7 +31,7 @@ object EnhancerImplicit { implicit class EnhanceParameterTool(parameterTool: ParameterTool) { - def getAppName(name: String = null, required: Boolean = false): String = { + private[flink] def getAppName(name: String = null, required: Boolean = false): String = { val appName = name match { case null => Try(DeflaterUtils.unzipString(parameterTool.get(KEY_APP_NAME(), null))) @@ -43,4 +46,27 @@ object EnhancerImplicit { } + implicit class EnhanceTableEnvironment(env: TableEnvironment) { + + private[flink] def setAppName(implicit parameter: ParameterTool): TableEnvironment = { + val appName = parameter.getAppName() + if (appName != null) { + env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName) + } + env + } + + } + + implicit class EnhanceStreamExecutionEnvironment(env: StreamTableEnvironment) { + + private[flink] def setAppName(implicit parameter: ParameterTool): StreamTableEnvironment = { + val appName = parameter.getAppName() + if (appName != null) { + env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName) + } + env + } + } + } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index 7de68502f..b3ee24c33 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -34,33 +34,17 @@ import java.io.File private[flink] object FlinkStreamingInitializer { - private[this] var flinkInitializer: FlinkStreamingInitializer = _ - def initialize(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit) : (ParameterTool, StreamExecutionEnvironment) = { - if (flinkInitializer == null) { - this.synchronized { - if (flinkInitializer == null) { - flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala) - flinkInitializer.streamEnvConfFunc = config - flinkInitializer.initEnvironment() - } - } - } - (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment) + val flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala) + flinkInitializer.streamEnvConfFunc = config + (flinkInitializer.configuration.parameter, flinkInitializer.streamEnv) } def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = { - if (flinkInitializer == null) { - this.synchronized { - if (flinkInitializer == null) { - flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java) - flinkInitializer.javaStreamEnvConfFunc = args.conf - flinkInitializer.initEnvironment() - } - } - } - (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment) + val flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java) + flinkInitializer.javaStreamEnvConfFunc = args.conf + (flinkInitializer.configuration.parameter, flinkInitializer.streamEnv) } } @@ -75,7 +59,22 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api var javaTableEnvConfFunc: TableEnvConfigFunction = _ - private[this] var localStreamEnv: StreamExecutionEnvironment = _ + implicit private[flink] val parameter: ParameterTool = configuration.parameter + + lazy val streamEnv: StreamExecutionEnvironment = { + val env = new StreamExecutionEnvironment( + JavaStreamEnv.getExecutionEnvironment(configuration.envConfig)) + + apiType match { + case ApiType.java if javaStreamEnvConfFunc != null => + javaStreamEnvConfFunc.configuration(env.getJavaEnv, configuration.parameter) + case ApiType.scala if streamEnvConfFunc != null => + streamEnvConfFunc(env, configuration.parameter) + case _ => + } + env.getConfig.setGlobalJobParameters(configuration.parameter) + env + } lazy val configuration: FlinkConfiguration = initParameter() @@ -147,29 +146,4 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api map } - def streamEnvironment: StreamExecutionEnvironment = { - if (localStreamEnv == null) { - this.synchronized { - if (localStreamEnv == null) { - initEnvironment() - } - } - } - localStreamEnv - } - - def initEnvironment(): Unit = { - localStreamEnv = new StreamExecutionEnvironment( - JavaStreamEnv.getExecutionEnvironment(configuration.envConfig)) - - apiType match { - case ApiType.java if javaStreamEnvConfFunc != null => - javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, configuration.parameter) - case ApiType.scala if streamEnvConfFunc != null => - streamEnvConfFunc(localStreamEnv, configuration.parameter) - case _ => - } - localStreamEnv.getConfig.setGlobalJobParameters(configuration.parameter) - } - } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala index 2a8cda83f..a401b437e 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala @@ -17,15 +17,14 @@ package org.apache.streampark.flink.core import org.apache.streampark.common.conf.ConfigConst._ -import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode} +import org.apache.streampark.common.enums.{ApiType, PlannerType} import org.apache.streampark.common.enums.ApiType.ApiType -import org.apache.streampark.common.enums.TableMode.TableMode import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils} import org.apache.streampark.flink.core.EnhancerImplicit._ import org.apache.streampark.flink.core.conf.FlinkConfiguration import org.apache.flink.api.java.utils.ParameterTool -import org.apache.flink.configuration.{Configuration, PipelineOptions} +import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment} import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment @@ -38,34 +37,18 @@ import scala.util.{Failure, Success, Try} private[flink] object FlinkTableInitializer { - private[this] var flinkInitializer: FlinkTableInitializer = _ - def initialize( args: Array[String], config: (TableConfig, ParameterTool) => Unit): (ParameterTool, TableEnvironment) = { - if (flinkInitializer == null) { - this.synchronized { - if (flinkInitializer == null) { - flinkInitializer = new FlinkTableInitializer(args, ApiType.scala) - flinkInitializer.tableConfFunc = config - flinkInitializer.initEnvironment(TableMode.batch) - } - } - } - (flinkInitializer.configuration.parameter, flinkInitializer.tableEnvironment) + val flinkInitializer = new FlinkTableInitializer(args, ApiType.scala) + flinkInitializer.tableConfFunc = config + (flinkInitializer.configuration.parameter, flinkInitializer.tableEnv) } def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = { - if (flinkInitializer == null) { - this.synchronized { - if (flinkInitializer == null) { - flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java) - flinkInitializer.javaTableEnvConfFunc = args.conf - flinkInitializer.initEnvironment(TableMode.batch) - } - } - } - (flinkInitializer.configuration.parameter, flinkInitializer.tableEnvironment) + val flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java) + flinkInitializer.javaTableEnvConfFunc = args.conf + (flinkInitializer.configuration.parameter, flinkInitializer.tableEnv) } def initialize( @@ -73,38 +56,25 @@ private[flink] object FlinkTableInitializer { configStream: (StreamExecutionEnvironment, ParameterTool) => Unit, configTable: (TableConfig, ParameterTool) => Unit) : (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = { - if (flinkInitializer == null) { - this.synchronized { - if (flinkInitializer == null) { - flinkInitializer = new FlinkTableInitializer(args, ApiType.scala) - flinkInitializer.streamEnvConfFunc = configStream - flinkInitializer.tableConfFunc = configTable - flinkInitializer.initEnvironment(TableMode.streaming) - } - } - } + + val flinkInitializer = new FlinkTableInitializer(args, ApiType.scala) + flinkInitializer.streamEnvConfFunc = configStream + flinkInitializer.tableConfFunc = configTable ( flinkInitializer.configuration.parameter, - flinkInitializer.streamEnvironment, - flinkInitializer.streamTableEnvironment) + flinkInitializer.streamEnv, + flinkInitializer.streamTableEnv) } def initialize(args: StreamTableEnvConfig) : (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = { - if (flinkInitializer == null) { - this.synchronized { - if (flinkInitializer == null) { - flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java) - flinkInitializer.javaStreamEnvConfFunc = args.streamConfig - flinkInitializer.javaTableEnvConfFunc = args.tableConfig - flinkInitializer.initEnvironment(TableMode.streaming) - } - } - } + val flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java) + flinkInitializer.javaStreamEnvConfFunc = args.streamConfig + flinkInitializer.javaTableEnvConfFunc = args.tableConfig ( flinkInitializer.configuration.parameter, - flinkInitializer.streamEnvironment, - flinkInitializer.streamTableEnvironment) + flinkInitializer.streamEnv, + flinkInitializer.streamTableEnv) } } @@ -112,30 +82,95 @@ private[flink] object FlinkTableInitializer { private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType) extends FlinkStreamingInitializer(args, apiType) { - private[this] var localStreamTableEnv: StreamTableEnvironment = _ + private[this] lazy val envSettings = { - private[this] var localTableEnv: TableEnvironment = _ + val builder = EnvironmentSettings.newInstance() - def streamTableEnvironment: StreamTableEnvironment = { - if (localStreamTableEnv == null) { - this.synchronized { - if (localStreamTableEnv == null) { - initEnvironment(TableMode.streaming) + Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))) + .getOrElse(PlannerType.blink) match { + case PlannerType.blink => + val useBlinkPlanner = + Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null) + if (useBlinkPlanner == null) { + logWarn("useBlinkPlanner deprecated") + } else { + useBlinkPlanner.setAccessible(true) + useBlinkPlanner.invoke(builder) + logInfo("blinkPlanner will be use.") } - } + case PlannerType.old => + val useOldPlanner = Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null) + if (useOldPlanner == null) { + logWarn("useOldPlanner deprecated") + } else { + useOldPlanner.setAccessible(true) + useOldPlanner.invoke(builder) + logInfo("useOldPlanner will be use.") + } + case PlannerType.any => + val useAnyPlanner = Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null) + if (useAnyPlanner == null) { + logWarn("useAnyPlanner deprecated") + } else { + logInfo("useAnyPlanner will be use.") + useAnyPlanner.setAccessible(true) + useAnyPlanner.invoke(builder) + } + } + + val buildWith = + (parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE)) + buildWith match { + case (x: String, y: String) if x != null && y != null => + logInfo(s"with built in catalog: $x") + logInfo(s"with built in database: $y") + builder.withBuiltInCatalogName(x) + builder.withBuiltInDatabaseName(y) + case (x: String, _) if x != null => + logInfo(s"with built in catalog: $x") + builder.withBuiltInCatalogName(x) + case (_, y: String) if y != null => + logInfo(s"with built in database: $y") + builder.withBuiltInDatabaseName(y) + case _ => } - localStreamTableEnv + builder } - def tableEnvironment: TableEnvironment = { - if (localTableEnv == null) { - this.synchronized { - if (localTableEnv == null) { - initEnvironment(TableMode.batch) - } - } + lazy val tableEnv: TableEnvironment = { + logInfo(s"job working in batch mode") + envSettings.inBatchMode() + val tableEnv = TableEnvironment.create(envSettings.build()).setAppName + apiType match { + case ApiType.java if javaTableEnvConfFunc != null => + javaTableEnvConfFunc.configuration(tableEnv.getConfig, parameter) + case ApiType.scala if tableConfFunc != null => + tableConfFunc(tableEnv.getConfig, parameter) + case _ => } - localTableEnv + tableEnv + } + + lazy val streamTableEnv: StreamTableEnvironment = { + logInfo(s"components should work in streaming mode") + envSettings.inStreamingMode() + val setting = envSettings.build() + + if (streamEnvConfFunc != null) { + streamEnvConfFunc(streamEnv, parameter) + } + if (javaStreamEnvConfFunc != null) { + javaStreamEnvConfFunc.configuration(streamEnv.getJavaEnv, parameter) + } + val streamTableEnv = StreamTableEnvironment.create(streamEnv, setting).setAppName + apiType match { + case ApiType.java if javaTableEnvConfFunc != null => + javaTableEnvConfFunc.configuration(streamTableEnv.getConfig, parameter) + case ApiType.scala if tableConfFunc != null => + tableConfFunc(streamTableEnv.getConfig, parameter) + case _ => + } + streamTableEnv } /** In case of table SQL, the parameter conf is not required, it depends on the developer. */ @@ -201,108 +236,4 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType } } - def initEnvironment(tableMode: TableMode): Unit = { - val builder = EnvironmentSettings.newInstance() - val parameter = configuration.parameter - Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))) - .getOrElse(PlannerType.blink) match { - case PlannerType.blink => - val useBlinkPlanner = - Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null) - if (useBlinkPlanner == null) { - logWarn("useBlinkPlanner deprecated") - } else { - useBlinkPlanner.setAccessible(true) - useBlinkPlanner.invoke(builder) - logInfo("blinkPlanner will be use.") - } - case PlannerType.old => - val useOldPlanner = Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null) - if (useOldPlanner == null) { - logWarn("useOldPlanner deprecated") - } else { - useOldPlanner.setAccessible(true) - useOldPlanner.invoke(builder) - logInfo("useOldPlanner will be use.") - } - case PlannerType.any => - val useAnyPlanner = Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null) - if (useAnyPlanner == null) { - logWarn("useAnyPlanner deprecated") - } else { - logInfo("useAnyPlanner will be use.") - useAnyPlanner.setAccessible(true) - useAnyPlanner.invoke(builder) - } - } - - val mode = Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode) - mode match { - case TableMode.batch => - logInfo(s"components should work in $tableMode mode") - builder.inBatchMode() - case TableMode.streaming => - logInfo(s"components should work in $tableMode mode") - builder.inStreamingMode() - } - - val buildWith = - (parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE)) - buildWith match { - case (x: String, y: String) if x != null && y != null => - logInfo(s"with built in catalog: $x") - logInfo(s"with built in database: $y") - builder.withBuiltInCatalogName(x) - builder.withBuiltInDatabaseName(y) - case (x: String, _) if x != null => - logInfo(s"with built in catalog: $x") - builder.withBuiltInCatalogName(x) - case (_, y: String) if y != null => - logInfo(s"with built in database: $y") - builder.withBuiltInDatabaseName(y) - case _ => - } - val setting = builder.build() - tableMode match { - case TableMode.batch => localTableEnv = TableEnvironment.create(setting) - case TableMode.streaming => - initEnvironment() - if (streamEnvConfFunc != null) { - streamEnvConfFunc(streamEnvironment, parameter) - } - if (javaStreamEnvConfFunc != null) { - javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, parameter) - } - localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, setting) - } - val appName = parameter.getAppName() - if (appName != null) { - tableMode match { - case TableMode.batch => - localTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName) - case TableMode.streaming => - localStreamTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName) - } - } - - apiType match { - case ApiType.java => - if (javaTableEnvConfFunc != null) { - tableMode match { - case TableMode.batch => - javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter) - case TableMode.streaming => - javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter) - } - } - case ApiType.scala => - if (tableConfFunc != null) { - tableMode match { - case TableMode.batch => tableConfFunc(localTableEnv.getConfig, parameter) - case TableMode.streaming => tableConfFunc(localStreamTableEnv.getConfig, parameter) - } - } - } - } - }
