This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 41cdbc365 [ISSUE-2976][Bug] Flink job batchMode bug fixed (#2978)
41cdbc365 is described below
commit 41cdbc36597020b8546433e40957ac577775ec9e
Author: benjobs <[email protected]>
AuthorDate: Sat Aug 26 05:38:38 2023 -0500
[ISSUE-2976][Bug] Flink job batchMode bug fixed (#2978)
* [ISSUE-2976][Improve] Flink job env initializer improvement
* [ISSUE-2976][Bug] Flink job batchMode bug fixed.
---------
Co-authored-by: benjobs <[email protected]>
---
.../flink/client/trait/FlinkClientTrait.scala | 9 +
.../streampark/flink/core/EnhancerImplicit.scala | 28 ++-
.../flink/core/FlinkStreamingInitializer.scala | 70 ++----
.../flink/core/FlinkTableInitializer.scala | 271 ++++++++-------------
.../streampark/flink/core/TableContext.scala | 2 +-
.../streampark/flink/core/TableContext.scala | 2 +-
.../streampark/flink/core/TableContext.scala | 2 +-
.../streampark/flink/core/TableContext.scala | 2 +-
.../apache/streampark/flink/cli/SqlClient.scala | 40 +--
9 files changed, 189 insertions(+), 237 deletions(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index b75bc2b7a..6ff949bd2 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -471,6 +471,15 @@ trait FlinkClientTrait extends Logger {
}
}
+
+ // execution.runtime-mode
+ if (submitRequest.properties.nonEmpty) {
+ if (submitRequest.properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
+ programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+ programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
+ }
+ }
+
programArgs.toList.asJava
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
index e8b00e353..528e47907 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -21,6 +21,9 @@ import
org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_AP
import org.apache.streampark.common.util.DeflaterUtils
import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import scala.util.Try
@@ -28,7 +31,7 @@ object EnhancerImplicit {
implicit class EnhanceParameterTool(parameterTool: ParameterTool) {
- def getAppName(name: String = null, required: Boolean = false): String = {
+ private[flink] def getAppName(name: String = null, required: Boolean = false): String = {
val appName = name match {
case null =>
Try(DeflaterUtils.unzipString(parameterTool.get(KEY_APP_NAME(),
null)))
@@ -43,4 +46,27 @@ object EnhancerImplicit {
}
+ implicit class EnhanceTableEnvironment(env: TableEnvironment) {
+
+ private[flink] def setAppName(implicit parameter: ParameterTool): TableEnvironment = {
+ val appName = parameter.getAppName()
+ if (appName != null) {
+ env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
+ }
+ env
+ }
+
+ }
+
+ implicit class EnhanceStreamExecutionEnvironment(env: StreamTableEnvironment) {
+
+ private[flink] def setAppName(implicit parameter: ParameterTool): StreamTableEnvironment = {
+ val appName = parameter.getAppName()
+ if (appName != null) {
+ env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
+ }
+ env
+ }
+ }
+
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 7de68502f..b3ee24c33 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -34,33 +34,17 @@ import java.io.File
private[flink] object FlinkStreamingInitializer {
- private[this] var flinkInitializer: FlinkStreamingInitializer = _
-
def initialize(args: Array[String], config: (StreamExecutionEnvironment,
ParameterTool) => Unit)
: (ParameterTool, StreamExecutionEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
- flinkInitializer.streamEnvConfFunc = config
- flinkInitializer.initEnvironment()
- }
- }
- }
- (flinkInitializer.configuration.parameter,
flinkInitializer.streamEnvironment)
+ val flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
+ flinkInitializer.streamEnvConfFunc = config
+ (flinkInitializer.configuration.parameter, flinkInitializer.streamEnv)
}
def initialize(args: StreamEnvConfig): (ParameterTool,
StreamExecutionEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkStreamingInitializer(args.args,
ApiType.java)
- flinkInitializer.javaStreamEnvConfFunc = args.conf
- flinkInitializer.initEnvironment()
- }
- }
- }
- (flinkInitializer.configuration.parameter,
flinkInitializer.streamEnvironment)
+ val flinkInitializer = new FlinkStreamingInitializer(args.args,
ApiType.java)
+ flinkInitializer.javaStreamEnvConfFunc = args.conf
+ (flinkInitializer.configuration.parameter, flinkInitializer.streamEnv)
}
}
@@ -75,7 +59,22 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
var javaTableEnvConfFunc: TableEnvConfigFunction = _
- private[this] var localStreamEnv: StreamExecutionEnvironment = _
+ implicit private[flink] val parameter: ParameterTool = configuration.parameter
+
+ lazy val streamEnv: StreamExecutionEnvironment = {
+ val env = new StreamExecutionEnvironment(
+ JavaStreamEnv.getExecutionEnvironment(configuration.envConfig))
+
+ apiType match {
+ case ApiType.java if javaStreamEnvConfFunc != null =>
+ javaStreamEnvConfFunc.configuration(env.getJavaEnv, configuration.parameter)
+ case ApiType.scala if streamEnvConfFunc != null =>
+ streamEnvConfFunc(env, configuration.parameter)
+ case _ =>
+ }
+ env.getConfig.setGlobalJobParameters(configuration.parameter)
+ env
+ }
lazy val configuration: FlinkConfiguration = initParameter()
@@ -147,29 +146,4 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
map
}
- def streamEnvironment: StreamExecutionEnvironment = {
- if (localStreamEnv == null) {
- this.synchronized {
- if (localStreamEnv == null) {
- initEnvironment()
- }
- }
- }
- localStreamEnv
- }
-
- def initEnvironment(): Unit = {
- localStreamEnv = new StreamExecutionEnvironment(
- JavaStreamEnv.getExecutionEnvironment(configuration.envConfig))
-
- apiType match {
- case ApiType.java if javaStreamEnvConfFunc != null =>
- javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv,
configuration.parameter)
- case ApiType.scala if streamEnvConfFunc != null =>
- streamEnvConfFunc(localStreamEnv, configuration.parameter)
- case _ =>
- }
- localStreamEnv.getConfig.setGlobalJobParameters(configuration.parameter)
- }
-
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 2a8cda83f..a401b437e 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -17,15 +17,14 @@
package org.apache.streampark.flink.core
import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
+import org.apache.streampark.common.enums.{ApiType, PlannerType}
import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.TableMode.TableMode
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.streampark.flink.core.EnhancerImplicit._
import org.apache.streampark.flink.core.conf.FlinkConfiguration
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{Configuration, PipelineOptions}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableConfig,
TableEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
@@ -38,34 +37,18 @@ import scala.util.{Failure, Success, Try}
private[flink] object FlinkTableInitializer {
- private[this] var flinkInitializer: FlinkTableInitializer = _
-
def initialize(
args: Array[String],
config: (TableConfig, ParameterTool) => Unit): (ParameterTool,
TableEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
- flinkInitializer.tableConfFunc = config
- flinkInitializer.initEnvironment(TableMode.batch)
- }
- }
- }
- (flinkInitializer.configuration.parameter,
flinkInitializer.tableEnvironment)
+ val flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
+ flinkInitializer.tableConfFunc = config
+ (flinkInitializer.configuration.parameter, flinkInitializer.tableEnv)
}
def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
- flinkInitializer.javaTableEnvConfFunc = args.conf
- flinkInitializer.initEnvironment(TableMode.batch)
- }
- }
- }
- (flinkInitializer.configuration.parameter,
flinkInitializer.tableEnvironment)
+ val flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
+ flinkInitializer.javaTableEnvConfFunc = args.conf
+ (flinkInitializer.configuration.parameter, flinkInitializer.tableEnv)
}
def initialize(
@@ -73,38 +56,25 @@ private[flink] object FlinkTableInitializer {
configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
configTable: (TableConfig, ParameterTool) => Unit)
: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
- flinkInitializer.streamEnvConfFunc = configStream
- flinkInitializer.tableConfFunc = configTable
- flinkInitializer.initEnvironment(TableMode.streaming)
- }
- }
- }
+
+ val flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
+ flinkInitializer.streamEnvConfFunc = configStream
+ flinkInitializer.tableConfFunc = configTable
(
flinkInitializer.configuration.parameter,
- flinkInitializer.streamEnvironment,
- flinkInitializer.streamTableEnvironment)
+ flinkInitializer.streamEnv,
+ flinkInitializer.streamTableEnv)
}
def initialize(args: StreamTableEnvConfig)
: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
- if (flinkInitializer == null) {
- this.synchronized {
- if (flinkInitializer == null) {
- flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
- flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
- flinkInitializer.javaTableEnvConfFunc = args.tableConfig
- flinkInitializer.initEnvironment(TableMode.streaming)
- }
- }
- }
+ val flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
+ flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
+ flinkInitializer.javaTableEnvConfFunc = args.tableConfig
(
flinkInitializer.configuration.parameter,
- flinkInitializer.streamEnvironment,
- flinkInitializer.streamTableEnvironment)
+ flinkInitializer.streamEnv,
+ flinkInitializer.streamTableEnv)
}
}
@@ -112,30 +82,95 @@ private[flink] object FlinkTableInitializer {
private[flink] class FlinkTableInitializer(args: Array[String], apiType:
ApiType)
extends FlinkStreamingInitializer(args, apiType) {
- private[this] var localStreamTableEnv: StreamTableEnvironment = _
+ private[this] lazy val envSettings = {
- private[this] var localTableEnv: TableEnvironment = _
+ val builder = EnvironmentSettings.newInstance()
- def streamTableEnvironment: StreamTableEnvironment = {
- if (localStreamTableEnv == null) {
- this.synchronized {
- if (localStreamTableEnv == null) {
- initEnvironment(TableMode.streaming)
+ Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER)))
+ .getOrElse(PlannerType.blink) match {
+ case PlannerType.blink =>
+ val useBlinkPlanner =
+ Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null)
+ if (useBlinkPlanner == null) {
+ logWarn("useBlinkPlanner deprecated")
+ } else {
+ useBlinkPlanner.setAccessible(true)
+ useBlinkPlanner.invoke(builder)
+ logInfo("blinkPlanner will be use.")
}
- }
+ case PlannerType.old =>
+ val useOldPlanner = Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null)
+ if (useOldPlanner == null) {
+ logWarn("useOldPlanner deprecated")
+ } else {
+ useOldPlanner.setAccessible(true)
+ useOldPlanner.invoke(builder)
+ logInfo("useOldPlanner will be use.")
+ }
+ case PlannerType.any =>
+ val useAnyPlanner = Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null)
+ if (useAnyPlanner == null) {
+ logWarn("useAnyPlanner deprecated")
+ } else {
+ logInfo("useAnyPlanner will be use.")
+ useAnyPlanner.setAccessible(true)
+ useAnyPlanner.invoke(builder)
+ }
+ }
+
+ val buildWith =
+ (parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE))
+ buildWith match {
+ case (x: String, y: String) if x != null && y != null =>
+ logInfo(s"with built in catalog: $x")
+ logInfo(s"with built in database: $y")
+ builder.withBuiltInCatalogName(x)
+ builder.withBuiltInDatabaseName(y)
+ case (x: String, _) if x != null =>
+ logInfo(s"with built in catalog: $x")
+ builder.withBuiltInCatalogName(x)
+ case (_, y: String) if y != null =>
+ logInfo(s"with built in database: $y")
+ builder.withBuiltInDatabaseName(y)
+ case _ =>
}
- localStreamTableEnv
+ builder
}
- def tableEnvironment: TableEnvironment = {
- if (localTableEnv == null) {
- this.synchronized {
- if (localTableEnv == null) {
- initEnvironment(TableMode.batch)
- }
- }
+ lazy val tableEnv: TableEnvironment = {
+ logInfo(s"job working in batch mode")
+ envSettings.inBatchMode()
+ val tableEnv = TableEnvironment.create(envSettings.build()).setAppName
+ apiType match {
+ case ApiType.java if javaTableEnvConfFunc != null =>
+ javaTableEnvConfFunc.configuration(tableEnv.getConfig, parameter)
+ case ApiType.scala if tableConfFunc != null =>
+ tableConfFunc(tableEnv.getConfig, parameter)
+ case _ =>
}
- localTableEnv
+ tableEnv
+ }
+
+ lazy val streamTableEnv: StreamTableEnvironment = {
+ logInfo(s"components should work in streaming mode")
+ envSettings.inStreamingMode()
+ val setting = envSettings.build()
+
+ if (streamEnvConfFunc != null) {
+ streamEnvConfFunc(streamEnv, parameter)
+ }
+ if (javaStreamEnvConfFunc != null) {
+ javaStreamEnvConfFunc.configuration(streamEnv.getJavaEnv, parameter)
+ }
+ val streamTableEnv = StreamTableEnvironment.create(streamEnv, setting).setAppName
+ apiType match {
+ case ApiType.java if javaTableEnvConfFunc != null =>
+ javaTableEnvConfFunc.configuration(streamTableEnv.getConfig, parameter)
+ case ApiType.scala if tableConfFunc != null =>
+ tableConfFunc(streamTableEnv.getConfig, parameter)
+ case _ =>
+ }
+ streamTableEnv
}
/** In case of table SQL, the parameter conf is not required, it depends on the developer. */
@@ -201,108 +236,4 @@ private[flink] class FlinkTableInitializer(args:
Array[String], apiType: ApiType
}
}
- def initEnvironment(tableMode: TableMode): Unit = {
- val builder = EnvironmentSettings.newInstance()
- val parameter = configuration.parameter
- Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER)))
- .getOrElse(PlannerType.blink) match {
- case PlannerType.blink =>
- val useBlinkPlanner =
-
Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null)
- if (useBlinkPlanner == null) {
- logWarn("useBlinkPlanner deprecated")
- } else {
- useBlinkPlanner.setAccessible(true)
- useBlinkPlanner.invoke(builder)
- logInfo("blinkPlanner will be use.")
- }
- case PlannerType.old =>
- val useOldPlanner =
Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null)
- if (useOldPlanner == null) {
- logWarn("useOldPlanner deprecated")
- } else {
- useOldPlanner.setAccessible(true)
- useOldPlanner.invoke(builder)
- logInfo("useOldPlanner will be use.")
- }
- case PlannerType.any =>
- val useAnyPlanner =
Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null)
- if (useAnyPlanner == null) {
- logWarn("useAnyPlanner deprecated")
- } else {
- logInfo("useAnyPlanner will be use.")
- useAnyPlanner.setAccessible(true)
- useAnyPlanner.invoke(builder)
- }
- }
-
- val mode =
Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
- mode match {
- case TableMode.batch =>
- logInfo(s"components should work in $tableMode mode")
- builder.inBatchMode()
- case TableMode.streaming =>
- logInfo(s"components should work in $tableMode mode")
- builder.inStreamingMode()
- }
-
- val buildWith =
- (parameter.get(KEY_FLINK_TABLE_CATALOG),
parameter.get(KEY_FLINK_TABLE_DATABASE))
- buildWith match {
- case (x: String, y: String) if x != null && y != null =>
- logInfo(s"with built in catalog: $x")
- logInfo(s"with built in database: $y")
- builder.withBuiltInCatalogName(x)
- builder.withBuiltInDatabaseName(y)
- case (x: String, _) if x != null =>
- logInfo(s"with built in catalog: $x")
- builder.withBuiltInCatalogName(x)
- case (_, y: String) if y != null =>
- logInfo(s"with built in database: $y")
- builder.withBuiltInDatabaseName(y)
- case _ =>
- }
- val setting = builder.build()
- tableMode match {
- case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
- case TableMode.streaming =>
- initEnvironment()
- if (streamEnvConfFunc != null) {
- streamEnvConfFunc(streamEnvironment, parameter)
- }
- if (javaStreamEnvConfFunc != null) {
- javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv,
parameter)
- }
- localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment,
setting)
- }
- val appName = parameter.getAppName()
- if (appName != null) {
- tableMode match {
- case TableMode.batch =>
-
localTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME,
appName)
- case TableMode.streaming =>
-
localStreamTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME,
appName)
- }
- }
-
- apiType match {
- case ApiType.java =>
- if (javaTableEnvConfFunc != null) {
- tableMode match {
- case TableMode.batch =>
- javaTableEnvConfFunc.configuration(localTableEnv.getConfig,
parameter)
- case TableMode.streaming =>
-
javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
- }
- }
- case ApiType.scala =>
- if (tableConfFunc != null) {
- tableMode match {
- case TableMode.batch => tableConfFunc(localTableEnv.getConfig,
parameter)
- case TableMode.streaming =>
tableConfFunc(localStreamTableEnv.getConfig, parameter)
- }
- }
- }
- }
-
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index c512a24ea..ae4edadad 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -47,7 +47,7 @@ class TableContext(override val parameter: ParameterTool,
private val tableEnv:
override def execute(jobName: String): JobExecutionResult = {
printLogo(s"FlinkTable $jobName Starting...")
- tableEnv.execute(jobName)
+ null
}
@deprecated override def fromTableSource(source: TableSource[_]): Table =
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index e9c950815..f23e4c9c5 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -52,7 +52,7 @@ class TableContext(override val parameter: ParameterTool,
private val tableEnv:
override def execute(jobName: String): JobExecutionResult = {
printLogo(s"FlinkTable $jobName Starting...")
- tableEnv.execute(jobName)
+ null
}
@deprecated override def fromTableSource(source: TableSource[_]): Table =
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 7e4daf539..abbe13822 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -65,7 +65,7 @@ class TableContext(override val parameter: ParameterTool,
private val tableEnv:
@deprecated override def execute(jobName: String): JobExecutionResult = {
printLogo(s"FlinkTable $jobName Starting...")
- tableEnv.execute(jobName)
+ null
}
@deprecated override def fromTableSource(source: TableSource[_]): Table =
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 471ad45aa..60fa14aaf 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.core
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.table.api.{CompiledPlan, PlanReference, Table,
TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.api.{CompiledPlan, PlanReference, Table,
TableDescriptor, TableEnvironment}
import org.apache.flink.table.module.ModuleEntry
class TableContext(override val parameter: ParameterTool, private val
tableEnv: TableEnvironment)
diff --git
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 2edf338c1..76239c6cb 100644
---
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -26,30 +26,42 @@ import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.configuration.ExecutionOptions
import scala.language.implicitConversions
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
object SqlClient extends App {
- val parameterTool = ParameterTool.fromArgs(args)
+ private[this] val parameterTool = ParameterTool.fromArgs(args)
- val flinkSql = {
+ private[this] val flinkSql = {
val sql = parameterTool.get(KEY_FLINK_SQL())
require(sql != null && sql.trim.nonEmpty, "Usage: flink sql cannot be null")
- Try(DeflaterUtils.unzipString(sql)).getOrElse(
- throw new IllegalArgumentException("Usage: flink sql is invalid or null,
please check"))
+ Try(DeflaterUtils.unzipString(sql)) match {
+ case Success(value) => value
+ case Failure(_) =>
+ throw new IllegalArgumentException("Usage: flink sql is invalid or null, please check")
+ }
}
- val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command ==
SqlCommand.SET)
+ private[this] val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
- val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key())
match {
- case Some(e) => e.operands(1)
+ private[this] val defaultMode = "streaming"
+
+ private[this] val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
+ case Some(e) =>
+ // 1) flink sql execution.runtime-mode has highest priority
+ e.operands(1)
case None =>
- val appConf = parameterTool.get(KEY_APP_CONF(), null)
- val defaultMode = "streaming"
- if (appConf == null) defaultMode
- else {
- val parameter =
PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(appConf.drop(7)))
- parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+ // 2) dynamic properties execution.runtime-mode
+ parameterTool.get(ExecutionOptions.RUNTIME_MODE.key(), null) match {
+ case null =>
+ parameterTool.get(KEY_APP_CONF(), null) match {
+ case null => defaultMode
+ case f =>
+ val parameter = PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(f.drop(7)))
+ // 3) application conf execution.runtime-mode
+ parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+ }
+ case m => m
}
}