This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new 18915cc35 Flink job batchMode bug fixed.
18915cc35 is described below
commit 18915cc3594a9c208fec8e73358a43989709891a
Author: benjobs <[email protected]>
AuthorDate: Fri Aug 25 21:30:20 2023 +0800
Flink job batchMode bug fixed.
---
.../streampark/flink/core/EnhancerImplicit.scala | 28 ++-
.../flink/core/FlinkStreamingInitializer.scala | 70 ++----
.../flink/core/FlinkTableInitializer.scala | 270 ++++++++-------------
3 files changed, 150 insertions(+), 218 deletions(-)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
index e8b00e353..528e47907 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -21,6 +21,9 @@ import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_AP
import org.apache.streampark.common.util.DeflaterUtils
import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import scala.util.Try
@@ -28,7 +31,7 @@ object EnhancerImplicit {
implicit class EnhanceParameterTool(parameterTool: ParameterTool) {
- def getAppName(name: String = null, required: Boolean = false): String = {
+ private[flink] def getAppName(name: String = null, required: Boolean =
false): String = {
val appName = name match {
case null =>
Try(DeflaterUtils.unzipString(parameterTool.get(KEY_APP_NAME(),
null)))
@@ -43,4 +46,27 @@ object EnhancerImplicit {
}
+ implicit class EnhanceTableEnvironment(env: TableEnvironment) {
+
+ private[flink] def setAppName(implicit parameter: ParameterTool):
TableEnvironment = {
+ val appName = parameter.getAppName()
+ if (appName != null) {
+ env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
+ }
+ env
+ }
+
+ }
+
+ implicit class EnhanceStreamExecutionEnvironment(env:
StreamTableEnvironment) {
+
+ private[flink] def setAppName(implicit parameter: ParameterTool):
StreamTableEnvironment = {
+ val appName = parameter.getAppName()
+ if (appName != null) {
+ env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
+ }
+ env
+ }
+ }
+
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 7de68502f..b3ee24c33 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -34,33 +34,17 @@ import java.io.File
private[flink] object FlinkStreamingInitializer {
- private[this] var flinkInitializer: FlinkStreamingInitializer = _
-
def initialize(args: Array[String], config: (StreamExecutionEnvironment,
ParameterTool) => Unit)
: (ParameterTool, StreamExecutionEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
- flinkInitializer.streamEnvConfFunc = config
- flinkInitializer.initEnvironment()
- }
- }
- }
- (flinkInitializer.configuration.parameter,
flinkInitializer.streamEnvironment)
+ val flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
+ flinkInitializer.streamEnvConfFunc = config
+ (flinkInitializer.configuration.parameter, flinkInitializer.streamEnv)
}
def initialize(args: StreamEnvConfig): (ParameterTool,
StreamExecutionEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkStreamingInitializer(args.args,
ApiType.java)
- flinkInitializer.javaStreamEnvConfFunc = args.conf
- flinkInitializer.initEnvironment()
- }
- }
- }
- (flinkInitializer.configuration.parameter,
flinkInitializer.streamEnvironment)
+ val flinkInitializer = new FlinkStreamingInitializer(args.args,
ApiType.java)
+ flinkInitializer.javaStreamEnvConfFunc = args.conf
+ (flinkInitializer.configuration.parameter, flinkInitializer.streamEnv)
}
}
@@ -75,7 +59,22 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
var javaTableEnvConfFunc: TableEnvConfigFunction = _
- private[this] var localStreamEnv: StreamExecutionEnvironment = _
+ implicit private[flink] val parameter: ParameterTool =
configuration.parameter
+
+ lazy val streamEnv: StreamExecutionEnvironment = {
+ val env = new StreamExecutionEnvironment(
+ JavaStreamEnv.getExecutionEnvironment(configuration.envConfig))
+
+ apiType match {
+ case ApiType.java if javaStreamEnvConfFunc != null =>
+ javaStreamEnvConfFunc.configuration(env.getJavaEnv,
configuration.parameter)
+ case ApiType.scala if streamEnvConfFunc != null =>
+ streamEnvConfFunc(env, configuration.parameter)
+ case _ =>
+ }
+ env.getConfig.setGlobalJobParameters(configuration.parameter)
+ env
+ }
lazy val configuration: FlinkConfiguration = initParameter()
@@ -147,29 +146,4 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
map
}
- def streamEnvironment: StreamExecutionEnvironment = {
- if (localStreamEnv == null) {
- this.synchronized {
- if (localStreamEnv == null) {
- initEnvironment()
- }
- }
- }
- localStreamEnv
- }
-
- def initEnvironment(): Unit = {
- localStreamEnv = new StreamExecutionEnvironment(
- JavaStreamEnv.getExecutionEnvironment(configuration.envConfig))
-
- apiType match {
- case ApiType.java if javaStreamEnvConfFunc != null =>
- javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv,
configuration.parameter)
- case ApiType.scala if streamEnvConfFunc != null =>
- streamEnvConfFunc(localStreamEnv, configuration.parameter)
- case _ =>
- }
- localStreamEnv.getConfig.setGlobalJobParameters(configuration.parameter)
- }
-
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 31f0e4233..a7d52749d 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -17,15 +17,14 @@
package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
+import org.apache.streampark.common.enums.{ApiType, PlannerType}
import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.TableMode.TableMode
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.streampark.flink.core.EnhancerImplicit._
import org.apache.streampark.flink.core.conf.FlinkConfiguration
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{Configuration, PipelineOptions}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig,
TableEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
@@ -38,34 +37,18 @@ import scala.util.{Failure, Success, Try}
private[flink] object FlinkTableInitializer {
- private[this] var flinkInitializer: FlinkTableInitializer = _
-
def initialize(
args: Array[String],
config: (TableConfig, ParameterTool) => Unit): (ParameterTool,
TableEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
- flinkInitializer.tableConfFunc = config
- flinkInitializer.initEnvironment(TableMode.batch)
- }
- }
- }
- (flinkInitializer.configuration.parameter,
flinkInitializer.tableEnvironment)
+ val flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
+ flinkInitializer.tableConfFunc = config
+ (flinkInitializer.configuration.parameter, flinkInitializer.tableEnv)
}
def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
- flinkInitializer.javaTableEnvConfFunc = args.conf
- flinkInitializer.initEnvironment(TableMode.batch)
- }
- }
- }
- (flinkInitializer.configuration.parameter,
flinkInitializer.tableEnvironment)
+ val flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
+ flinkInitializer.javaTableEnvConfFunc = args.conf
+ (flinkInitializer.configuration.parameter, flinkInitializer.tableEnv)
}
def initialize(
@@ -73,38 +56,25 @@ private[flink] object FlinkTableInitializer {
configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
configTable: (TableConfig, ParameterTool) => Unit)
: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
- flinkInitializer.streamEnvConfFunc = configStream
- flinkInitializer.tableConfFunc = configTable
- flinkInitializer.initEnvironment(TableMode.streaming)
- }
- }
- }
+
+ val flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
+ flinkInitializer.streamEnvConfFunc = configStream
+ flinkInitializer.tableConfFunc = configTable
(
flinkInitializer.configuration.parameter,
- flinkInitializer.streamEnvironment,
- flinkInitializer.streamTableEnvironment)
+ flinkInitializer.streamEnv,
+ flinkInitializer.streamTableEnv)
}
def initialize(args: StreamTableEnvConfig)
: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
- flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
- flinkInitializer.javaTableEnvConfFunc = args.tableConfig
- flinkInitializer.initEnvironment(TableMode.streaming)
- }
- }
- }
+ val flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
+ flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
+ flinkInitializer.javaTableEnvConfFunc = args.tableConfig
(
flinkInitializer.configuration.parameter,
- flinkInitializer.streamEnvironment,
- flinkInitializer.streamTableEnvironment)
+ flinkInitializer.streamEnv,
+ flinkInitializer.streamTableEnv)
}
}
@@ -112,30 +82,95 @@ private[flink] object FlinkTableInitializer {
private[flink] class FlinkTableInitializer(args: Array[String], apiType:
ApiType)
extends FlinkStreamingInitializer(args, apiType) {
- private[this] var localStreamTableEnv: StreamTableEnvironment = _
+ private[this] lazy val envSettings = {
- private[this] var localTableEnv: TableEnvironment = _
+ val builder = EnvironmentSettings.newInstance()
- def streamTableEnvironment: StreamTableEnvironment = {
- if (localStreamTableEnv == null) {
- this.synchronized {
- if (localStreamTableEnv == null) {
- initEnvironment(TableMode.streaming)
+ Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER)))
+ .getOrElse(PlannerType.blink) match {
+ case PlannerType.blink =>
+ val useBlinkPlanner =
+
Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null)
+ if (useBlinkPlanner == null) {
+ logWarn("useBlinkPlanner deprecated")
+ } else {
+ useBlinkPlanner.setAccessible(true)
+ useBlinkPlanner.invoke(builder)
+ logInfo("blinkPlanner will be use.")
}
- }
+ case PlannerType.old =>
+ val useOldPlanner =
Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null)
+ if (useOldPlanner == null) {
+ logWarn("useOldPlanner deprecated")
+ } else {
+ useOldPlanner.setAccessible(true)
+ useOldPlanner.invoke(builder)
+ logInfo("useOldPlanner will be use.")
+ }
+ case PlannerType.any =>
+ val useAnyPlanner =
Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null)
+ if (useAnyPlanner == null) {
+ logWarn("useAnyPlanner deprecated")
+ } else {
+ logInfo("useAnyPlanner will be use.")
+ useAnyPlanner.setAccessible(true)
+ useAnyPlanner.invoke(builder)
+ }
+ }
+
+ val buildWith =
+ (parameter.get(KEY_FLINK_TABLE_CATALOG),
parameter.get(KEY_FLINK_TABLE_DATABASE))
+ buildWith match {
+ case (x: String, y: String) if x != null && y != null =>
+ logInfo(s"with built in catalog: $x")
+ logInfo(s"with built in database: $y")
+ builder.withBuiltInCatalogName(x)
+ builder.withBuiltInDatabaseName(y)
+ case (x: String, _) if x != null =>
+ logInfo(s"with built in catalog: $x")
+ builder.withBuiltInCatalogName(x)
+ case (_, y: String) if y != null =>
+ logInfo(s"with built in database: $y")
+ builder.withBuiltInDatabaseName(y)
+ case _ =>
}
- localStreamTableEnv
+ builder
}
- def tableEnvironment: TableEnvironment = {
- if (localTableEnv == null) {
- this.synchronized {
- if (localTableEnv == null) {
- initEnvironment(TableMode.batch)
- }
- }
+ lazy val tableEnv: TableEnvironment = {
+ logInfo(s"job working in batch mode")
+ envSettings.inBatchMode()
+ val tableEnv = TableEnvironment.create(envSettings.build()).setAppName
+ apiType match {
+ case ApiType.java if javaTableEnvConfFunc != null =>
+ javaTableEnvConfFunc.configuration(tableEnv.getConfig, parameter)
+ case ApiType.scala if tableConfFunc != null =>
+ tableConfFunc(tableEnv.getConfig, parameter)
+ case _ =>
}
- localTableEnv
+ tableEnv
+ }
+
+ lazy val streamTableEnv: StreamTableEnvironment = {
+ logInfo(s"components should work in streaming mode")
+ envSettings.inStreamingMode()
+ val setting = envSettings.build()
+
+ if (streamEnvConfFunc != null) {
+ streamEnvConfFunc(streamEnv, parameter)
+ }
+ if (javaStreamEnvConfFunc != null) {
+ javaStreamEnvConfFunc.configuration(streamEnv.getJavaEnv, parameter)
+ }
+ val streamTableEnv = StreamTableEnvironment.create(streamEnv,
setting).setAppName
+ apiType match {
+ case ApiType.java if javaTableEnvConfFunc != null =>
+ javaTableEnvConfFunc.configuration(streamTableEnv.getConfig, parameter)
+ case ApiType.scala if tableConfFunc != null =>
+ tableConfFunc(streamTableEnv.getConfig, parameter)
+ case _ =>
+ }
+ streamTableEnv
}
/** In case of table SQL, the parameter conf is not required, it depends on
the developer. */
@@ -201,107 +236,4 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
}
}
- def initEnvironment(tableMode: TableMode): Unit = {
- val builder = EnvironmentSettings.newInstance()
- val parameter = configuration.parameter
- Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER)))
- .getOrElse(PlannerType.blink) match {
- case PlannerType.blink =>
- val useBlinkPlanner =
-
Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null)
- if (useBlinkPlanner == null) {
- logWarn("useBlinkPlanner deprecated")
- } else {
- useBlinkPlanner.setAccessible(true)
- useBlinkPlanner.invoke(builder)
- logInfo("blinkPlanner will be use.")
- }
- case PlannerType.old =>
- val useOldPlanner =
Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null)
- if (useOldPlanner == null) {
- logWarn("useOldPlanner deprecated")
- } else {
- useOldPlanner.setAccessible(true)
- useOldPlanner.invoke(builder)
- logInfo("useOldPlanner will be use.")
- }
- case PlannerType.any =>
- val useAnyPlanner =
Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null)
- if (useAnyPlanner == null) {
- logWarn("useAnyPlanner deprecated")
- } else {
- logInfo("useAnyPlanner will be use.")
- useAnyPlanner.setAccessible(true)
- useAnyPlanner.invoke(builder)
- }
- }
-
- tableMode match {
- case TableMode.batch =>
- logInfo(s"components should work in $tableMode mode")
- builder.inBatchMode()
- case TableMode.streaming =>
- logInfo(s"components should work in $tableMode mode")
- builder.inStreamingMode()
- }
-
- val buildWith =
- (parameter.get(KEY_FLINK_TABLE_CATALOG),
parameter.get(KEY_FLINK_TABLE_DATABASE))
- buildWith match {
- case (x: String, y: String) if x != null && y != null =>
- logInfo(s"with built in catalog: $x")
- logInfo(s"with built in database: $y")
- builder.withBuiltInCatalogName(x)
- builder.withBuiltInDatabaseName(y)
- case (x: String, _) if x != null =>
- logInfo(s"with built in catalog: $x")
- builder.withBuiltInCatalogName(x)
- case (_, y: String) if y != null =>
- logInfo(s"with built in database: $y")
- builder.withBuiltInDatabaseName(y)
- case _ =>
- }
- val setting = builder.build()
- tableMode match {
- case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
- case TableMode.streaming =>
- initEnvironment()
- if (streamEnvConfFunc != null) {
- streamEnvConfFunc(streamEnvironment, parameter)
- }
- if (javaStreamEnvConfFunc != null) {
- javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv,
parameter)
- }
- localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment,
setting)
- }
- val appName = parameter.getAppName()
- if (appName != null) {
- tableMode match {
- case TableMode.batch =>
-
localTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME,
appName)
- case TableMode.streaming =>
-
localStreamTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME,
appName)
- }
- }
-
- apiType match {
- case ApiType.java =>
- if (javaTableEnvConfFunc != null) {
- tableMode match {
- case TableMode.batch =>
- javaTableEnvConfFunc.configuration(localTableEnv.getConfig,
parameter)
- case TableMode.streaming =>
-
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
- }
- }
- case ApiType.scala =>
- if (tableConfFunc != null) {
- tableMode match {
- case TableMode.batch => tableConfFunc(localTableEnv.getConfig,
parameter)
- case TableMode.streaming =>
tableConfFunc(localStreamTableEnv.getConfig, parameter)
- }
- }
- }
- }
-
}