This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 41cdbc365 [ISSUE-2976][Bug] Flink job batchMode bug fixed (#2978)
41cdbc365 is described below

commit 41cdbc36597020b8546433e40957ac577775ec9e
Author: benjobs <[email protected]>
AuthorDate: Sat Aug 26 05:38:38 2023 -0500

    [ISSUE-2976][Bug] Flink job batchMode bug fixed (#2978)
    
    * [ISSUE-2976][Improve] Flink job env initializer improvement
    
    * [ISSUE-2976][Bug] Flink job batchMode bug fixed.
    
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../flink/client/trait/FlinkClientTrait.scala      |   9 +
 .../streampark/flink/core/EnhancerImplicit.scala   |  28 ++-
 .../flink/core/FlinkStreamingInitializer.scala     |  70 ++----
 .../flink/core/FlinkTableInitializer.scala         | 271 ++++++++-------------
 .../streampark/flink/core/TableContext.scala       |   2 +-
 .../streampark/flink/core/TableContext.scala       |   2 +-
 .../streampark/flink/core/TableContext.scala       |   2 +-
 .../streampark/flink/core/TableContext.scala       |   2 +-
 .../apache/streampark/flink/cli/SqlClient.scala    |  40 +--
 9 files changed, 189 insertions(+), 237 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index b75bc2b7a..6ff949bd2 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -471,6 +471,15 @@ trait FlinkClientTrait extends Logger {
       }
 
     }
+
+    // execution.runtime-mode
+    if (submitRequest.properties.nonEmpty) {
+      if (submitRequest.properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
+        programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+        programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
+      }
+    }
+
     programArgs.toList.asJava
   }
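
The hunk above is the client-side half of the fix: when the submit request
carries execution.runtime-mode in its properties, the value is now forwarded
to the user program as a "--execution.runtime-mode <value>" argument, so a
job configured for batch mode no longer silently starts under the default
streaming mode. A minimal sketch of the forwarding logic, with a hypothetical
properties map standing in for submitRequest.properties:

    import org.apache.flink.configuration.ExecutionOptions
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical stand-ins for submitRequest.properties and programArgs.
    val properties = new java.util.HashMap[String, AnyRef]()
    properties.put(ExecutionOptions.RUNTIME_MODE.key(), "batch")

    val programArgs = ArrayBuffer[String]()
    if (!properties.isEmpty && properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
      // Forwarded as: --execution.runtime-mode batch
      programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
      programArgs += properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
    }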
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
index e8b00e353..528e47907 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -21,6 +21,9 @@ import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_AP
 import org.apache.streampark.common.util.DeflaterUtils
 
 import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.PipelineOptions
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
 
 import scala.util.Try
 
@@ -28,7 +31,7 @@ object EnhancerImplicit {
 
   implicit class EnhanceParameterTool(parameterTool: ParameterTool) {
 
-    def getAppName(name: String = null, required: Boolean = false): String = {
+    private[flink] def getAppName(name: String = null, required: Boolean = false): String = {
       val appName = name match {
         case null =>
           Try(DeflaterUtils.unzipString(parameterTool.get(KEY_APP_NAME(), null)))
@@ -43,4 +46,27 @@ object EnhancerImplicit {
 
   }
 
+  implicit class EnhanceTableEnvironment(env: TableEnvironment) {
+
+    private[flink] def setAppName(implicit parameter: ParameterTool): TableEnvironment = {
+      val appName = parameter.getAppName()
+      if (appName != null) {
+        env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
+      }
+      env
+    }
+
+  }
+
+  implicit class EnhanceStreamExecutionEnvironment(env: StreamTableEnvironment) {
+
+    private[flink] def setAppName(implicit parameter: ParameterTool): StreamTableEnvironment = {
+      val appName = parameter.getAppName()
+      if (appName != null) {
+        env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
+      }
+      env
+    }
+  }
+
 }
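
The two implicit classes added above give every TableEnvironment and
StreamTableEnvironment a setAppName that copies the application name resolved
from the ParameterTool into PipelineOptions.NAME (pipeline.name), the name the
job is submitted under. The pattern, reduced to its core (a sketch only:
setAppName is private[flink] in the real code, the parameter key below is
hypothetical, and the real getAppName also decompresses the stored value):

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.configuration.PipelineOptions
    import org.apache.flink.table.api.TableEnvironment

    implicit class EnrichTableEnvironment(env: TableEnvironment) {
      // Writes the resolved application name into the pipeline config.
      def setAppName(implicit parameter: ParameterTool): TableEnvironment = {
        val appName = parameter.get("app.name", null) // hypothetical key
        if (appName != null) {
          env.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
        }
        env
      }
    }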
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 7de68502f..b3ee24c33 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -34,33 +34,17 @@ import java.io.File
 
 private[flink] object FlinkStreamingInitializer {
 
-  private[this] var flinkInitializer: FlinkStreamingInitializer = _
-
   def initialize(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit)
       : (ParameterTool, StreamExecutionEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
-          flinkInitializer.streamEnvConfFunc = config
-          flinkInitializer.initEnvironment()
-        }
-      }
-    }
-    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment)
+    val flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
+    flinkInitializer.streamEnvConfFunc = config
+    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnv)
   }
 
   def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
-          flinkInitializer.javaStreamEnvConfFunc = args.conf
-          flinkInitializer.initEnvironment()
-        }
-      }
-    }
-    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment)
+    val flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
+    flinkInitializer.javaStreamEnvConfFunc = args.conf
+    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnv)
   }
 }
 
@@ -75,7 +59,22 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
 
   var javaTableEnvConfFunc: TableEnvConfigFunction = _
 
-  private[this] var localStreamEnv: StreamExecutionEnvironment = _
+  implicit private[flink] val parameter: ParameterTool = configuration.parameter
+
+  lazy val streamEnv: StreamExecutionEnvironment = {
+    val env = new StreamExecutionEnvironment(
+      JavaStreamEnv.getExecutionEnvironment(configuration.envConfig))
+
+    apiType match {
+      case ApiType.java if javaStreamEnvConfFunc != null =>
+        javaStreamEnvConfFunc.configuration(env.getJavaEnv, configuration.parameter)
+      case ApiType.scala if streamEnvConfFunc != null =>
+        streamEnvConfFunc(env, configuration.parameter)
+      case _ =>
+    }
+    env.getConfig.setGlobalJobParameters(configuration.parameter)
+    env
+  }
 
   lazy val configuration: FlinkConfiguration = initParameter()
 
@@ -147,29 +146,4 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     map
   }
 
-  def streamEnvironment: StreamExecutionEnvironment = {
-    if (localStreamEnv == null) {
-      this.synchronized {
-        if (localStreamEnv == null) {
-          initEnvironment()
-        }
-      }
-    }
-    localStreamEnv
-  }
-
-  def initEnvironment(): Unit = {
-    localStreamEnv = new StreamExecutionEnvironment(
-      JavaStreamEnv.getExecutionEnvironment(configuration.envConfig))
-
-    apiType match {
-      case ApiType.java if javaStreamEnvConfFunc != null =>
-        javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, configuration.parameter)
-      case ApiType.scala if streamEnvConfFunc != null =>
-        streamEnvConfFunc(localStreamEnv, configuration.parameter)
-      case _ =>
-    }
-    localStreamEnv.getConfig.setGlobalJobParameters(configuration.parameter)
-  }
-
 }
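
The refactor above removes the shared singleton and its hand-rolled
double-checked locking: each initialize(...) call now builds a fresh
FlinkStreamingInitializer, and the environment itself becomes a lazy val,
which the Scala compiler already guarantees to initialize at most once with
the necessary synchronization. The idiom in isolation, with a String standing
in for StreamExecutionEnvironment:

    // lazy val replaces double-checked locking: the body runs at most once,
    // and the compiler emits the synchronization.
    class Initializer(args: Array[String]) {
      lazy val env: String = { // stand-in for StreamExecutionEnvironment
        println("initializing environment once")
        s"env(${args.mkString(" ")})"
      }
    }

    val init = new Initializer(Array("--app.name", "demo")) // hypothetical args
    init.env // triggers initialization
    init.env // returns the cached instance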
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 2a8cda83f..a401b437e 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -17,15 +17,14 @@
 package org.apache.streampark.flink.core
 
 import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
+import org.apache.streampark.common.enums.{ApiType, PlannerType}
 import org.apache.streampark.common.enums.ApiType.ApiType
-import org.apache.streampark.common.enums.TableMode.TableMode
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.streampark.flink.core.EnhancerImplicit._
 import org.apache.streampark.flink.core.conf.FlinkConfiguration
 
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{Configuration, PipelineOptions}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableEnvironment}
 import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
@@ -38,34 +37,18 @@ import scala.util.{Failure, Success, Try}
 
 private[flink] object FlinkTableInitializer {
 
-  private[this] var flinkInitializer: FlinkTableInitializer = _
-
   def initialize(
       args: Array[String],
       config: (TableConfig, ParameterTool) => Unit): (ParameterTool, TableEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
-          flinkInitializer.tableConfFunc = config
-          flinkInitializer.initEnvironment(TableMode.batch)
-        }
-      }
-    }
-    (flinkInitializer.configuration.parameter, flinkInitializer.tableEnvironment)
+    val flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
+    flinkInitializer.tableConfFunc = config
+    (flinkInitializer.configuration.parameter, flinkInitializer.tableEnv)
   }
 
   def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
-          flinkInitializer.javaTableEnvConfFunc = args.conf
-          flinkInitializer.initEnvironment(TableMode.batch)
-        }
-      }
-    }
-    (flinkInitializer.configuration.parameter, flinkInitializer.tableEnvironment)
+    val flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
+    flinkInitializer.javaTableEnvConfFunc = args.conf
+    (flinkInitializer.configuration.parameter, flinkInitializer.tableEnv)
   }
 
   def initialize(
@@ -73,38 +56,25 @@ private[flink] object FlinkTableInitializer {
       configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
       configTable: (TableConfig, ParameterTool) => Unit)
       : (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
-          flinkInitializer.streamEnvConfFunc = configStream
-          flinkInitializer.tableConfFunc = configTable
-          flinkInitializer.initEnvironment(TableMode.streaming)
-        }
-      }
-    }
+
+    val flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
+    flinkInitializer.streamEnvConfFunc = configStream
+    flinkInitializer.tableConfFunc = configTable
     (
       flinkInitializer.configuration.parameter,
-      flinkInitializer.streamEnvironment,
-      flinkInitializer.streamTableEnvironment)
+      flinkInitializer.streamEnv,
+      flinkInitializer.streamTableEnv)
   }
 
   def initialize(args: StreamTableEnvConfig)
       : (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
-    if (flinkInitializer == null) {
-      this.synchronized {
-        if (flinkInitializer == null) {
-          flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
-          flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
-          flinkInitializer.javaTableEnvConfFunc = args.tableConfig
-          flinkInitializer.initEnvironment(TableMode.streaming)
-        }
-      }
-    }
+    val flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
+    flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
+    flinkInitializer.javaTableEnvConfFunc = args.tableConfig
     (
       flinkInitializer.configuration.parameter,
-      flinkInitializer.streamEnvironment,
-      flinkInitializer.streamTableEnvironment)
+      flinkInitializer.streamEnv,
+      flinkInitializer.streamTableEnv)
   }
 
 }
@@ -112,30 +82,95 @@ private[flink] object FlinkTableInitializer {
 private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType)
   extends FlinkStreamingInitializer(args, apiType) {
 
-  private[this] var localStreamTableEnv: StreamTableEnvironment = _
+  private[this] lazy val envSettings = {
 
-  private[this] var localTableEnv: TableEnvironment = _
+    val builder = EnvironmentSettings.newInstance()
 
-  def streamTableEnvironment: StreamTableEnvironment = {
-    if (localStreamTableEnv == null) {
-      this.synchronized {
-        if (localStreamTableEnv == null) {
-          initEnvironment(TableMode.streaming)
+    Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER)))
+      .getOrElse(PlannerType.blink) match {
+      case PlannerType.blink =>
+        val useBlinkPlanner =
+          Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null)
+        if (useBlinkPlanner == null) {
+          logWarn("useBlinkPlanner deprecated")
+        } else {
+          useBlinkPlanner.setAccessible(true)
+          useBlinkPlanner.invoke(builder)
+          logInfo("blinkPlanner will be use.")
         }
-      }
+      case PlannerType.old =>
+        val useOldPlanner = Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null)
+        if (useOldPlanner == null) {
+          logWarn("useOldPlanner deprecated")
+        } else {
+          useOldPlanner.setAccessible(true)
+          useOldPlanner.invoke(builder)
+          logInfo("useOldPlanner will be use.")
+        }
+      case PlannerType.any =>
+        val useAnyPlanner = Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null)
+        if (useAnyPlanner == null) {
+          logWarn("useAnyPlanner deprecated")
+        } else {
+          logInfo("useAnyPlanner will be use.")
+          useAnyPlanner.setAccessible(true)
+          useAnyPlanner.invoke(builder)
+        }
+    }
+
+    val buildWith =
+      (parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE))
+    buildWith match {
+      case (x: String, y: String) if x != null && y != null =>
+        logInfo(s"with built in catalog: $x")
+        logInfo(s"with built in database: $y")
+        builder.withBuiltInCatalogName(x)
+        builder.withBuiltInDatabaseName(y)
+      case (x: String, _) if x != null =>
+        logInfo(s"with built in catalog: $x")
+        builder.withBuiltInCatalogName(x)
+      case (_, y: String) if y != null =>
+        logInfo(s"with built in database: $y")
+        builder.withBuiltInDatabaseName(y)
+      case _ =>
     }
-    localStreamTableEnv
+    builder
   }
 
-  def tableEnvironment: TableEnvironment = {
-    if (localTableEnv == null) {
-      this.synchronized {
-        if (localTableEnv == null) {
-          initEnvironment(TableMode.batch)
-        }
-      }
+  lazy val tableEnv: TableEnvironment = {
+    logInfo(s"job working in batch mode")
+    envSettings.inBatchMode()
+    val tableEnv = TableEnvironment.create(envSettings.build()).setAppName
+    apiType match {
+      case ApiType.java if javaTableEnvConfFunc != null =>
+        javaTableEnvConfFunc.configuration(tableEnv.getConfig, parameter)
+      case ApiType.scala if tableConfFunc != null =>
+        tableConfFunc(tableEnv.getConfig, parameter)
+      case _ =>
     }
-    localTableEnv
+    tableEnv
+  }
+
+  lazy val streamTableEnv: StreamTableEnvironment = {
+    logInfo(s"components should work in streaming mode")
+    envSettings.inStreamingMode()
+    val setting = envSettings.build()
+
+    if (streamEnvConfFunc != null) {
+      streamEnvConfFunc(streamEnv, parameter)
+    }
+    if (javaStreamEnvConfFunc != null) {
+      javaStreamEnvConfFunc.configuration(streamEnv.getJavaEnv, parameter)
+    }
+    val streamTableEnv = StreamTableEnvironment.create(streamEnv, setting).setAppName
+    apiType match {
+      case ApiType.java if javaTableEnvConfFunc != null =>
+        javaTableEnvConfFunc.configuration(streamTableEnv.getConfig, parameter)
+      case ApiType.scala if tableConfFunc != null =>
+        tableConfFunc(streamTableEnv.getConfig, parameter)
+      case _ =>
+    }
+    streamTableEnv
   }
 
   /** In case of table SQL, the parameter conf is not required, it depends on the developer. */
@@ -201,108 +236,4 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     }
   }
 
-  def initEnvironment(tableMode: TableMode): Unit = {
-    val builder = EnvironmentSettings.newInstance()
-    val parameter = configuration.parameter
-    Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER)))
-      .getOrElse(PlannerType.blink) match {
-      case PlannerType.blink =>
-        val useBlinkPlanner =
-          Try(builder.getClass.getDeclaredMethod("useBlinkPlanner")).getOrElse(null)
-        if (useBlinkPlanner == null) {
-          logWarn("useBlinkPlanner deprecated")
-        } else {
-          useBlinkPlanner.setAccessible(true)
-          useBlinkPlanner.invoke(builder)
-          logInfo("blinkPlanner will be use.")
-        }
-      case PlannerType.old =>
-        val useOldPlanner = Try(builder.getClass.getDeclaredMethod("useOldPlanner")).getOrElse(null)
-        if (useOldPlanner == null) {
-          logWarn("useOldPlanner deprecated")
-        } else {
-          useOldPlanner.setAccessible(true)
-          useOldPlanner.invoke(builder)
-          logInfo("useOldPlanner will be use.")
-        }
-      case PlannerType.any =>
-        val useAnyPlanner = Try(builder.getClass.getDeclaredMethod("useAnyPlanner")).getOrElse(null)
-        if (useAnyPlanner == null) {
-          logWarn("useAnyPlanner deprecated")
-        } else {
-          logInfo("useAnyPlanner will be use.")
-          useAnyPlanner.setAccessible(true)
-          useAnyPlanner.invoke(builder)
-        }
-    }
-
-    val mode = Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
-    mode match {
-      case TableMode.batch =>
-        logInfo(s"components should work in $tableMode mode")
-        builder.inBatchMode()
-      case TableMode.streaming =>
-        logInfo(s"components should work in $tableMode mode")
-        builder.inStreamingMode()
-    }
-
-    val buildWith =
-      (parameter.get(KEY_FLINK_TABLE_CATALOG), parameter.get(KEY_FLINK_TABLE_DATABASE))
-    buildWith match {
-      case (x: String, y: String) if x != null && y != null =>
-        logInfo(s"with built in catalog: $x")
-        logInfo(s"with built in database: $y")
-        builder.withBuiltInCatalogName(x)
-        builder.withBuiltInDatabaseName(y)
-      case (x: String, _) if x != null =>
-        logInfo(s"with built in catalog: $x")
-        builder.withBuiltInCatalogName(x)
-      case (_, y: String) if y != null =>
-        logInfo(s"with built in database: $y")
-        builder.withBuiltInDatabaseName(y)
-      case _ =>
-    }
-    val setting = builder.build()
-    tableMode match {
-      case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
-      case TableMode.streaming =>
-        initEnvironment()
-        if (streamEnvConfFunc != null) {
-          streamEnvConfFunc(streamEnvironment, parameter)
-        }
-        if (javaStreamEnvConfFunc != null) {
-          javaStreamEnvConfFunc.configuration(streamEnvironment.getJavaEnv, parameter)
-        }
-        localStreamTableEnv = StreamTableEnvironment.create(streamEnvironment, setting)
-    }
-    val appName = parameter.getAppName()
-    if (appName != null) {
-      tableMode match {
-        case TableMode.batch =>
-          localTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
-        case TableMode.streaming =>
-          localStreamTableEnv.getConfig.getConfiguration.setString(PipelineOptions.NAME, appName)
-      }
-    }
-
-    apiType match {
-      case ApiType.java =>
-        if (javaTableEnvConfFunc != null) {
-          tableMode match {
-            case TableMode.batch =>
-              javaTableEnvConfFunc.configuration(localTableEnv.getConfig, parameter)
-            case TableMode.streaming =>
-              javaTableEnvConfFunc.configuration(localStreamTableEnv.getConfig, parameter)
-          }
-        }
-      case ApiType.scala =>
-        if (tableConfFunc != null) {
-          tableMode match {
-            case TableMode.batch => tableConfFunc(localTableEnv.getConfig, parameter)
-            case TableMode.streaming => tableConfFunc(localStreamTableEnv.getConfig, parameter)
-          }
-        }
-    }
-  }
-
 }
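
This file carries the heart of the batchMode fix. Previously a single
initEnvironment(tableMode) populated one process-wide singleton, so whichever
mode the first caller resolved (from KEY_FLINK_TABLE_MODE or the default) was
baked in for good, batch and streaming sharing one mode-dependent code path.
Now the EnvironmentSettings builder is computed once (planner selection via
reflection, catalog/database names), while each lazy environment pins its own
runtime mode: tableEnv always finalizes the builder with inBatchMode(),
streamTableEnv always with inStreamingMode(). A minimal sketch of that split,
assuming a recent Flink Table API:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    val builder = EnvironmentSettings.newInstance() // shared builder config

    // Each environment pins its own runtime mode instead of trusting
    // whatever mode a previously-initialized singleton happened to have.
    lazy val batchEnv: TableEnvironment =
      TableEnvironment.create(builder.inBatchMode().build())
    lazy val streamingEnv: TableEnvironment =
      TableEnvironment.create(builder.inStreamingMode().build())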
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index c512a24ea..ae4edadad 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -47,7 +47,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
 
   override def execute(jobName: String): JobExecutionResult = {
     printLogo(s"FlinkTable $jobName Starting...")
-    tableEnv.execute(jobName)
+    null
   }
 
   @deprecated override def fromTableSource(source: TableSource[_]): Table =
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index e9c950815..f23e4c9c5 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -52,7 +52,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
 
   override def execute(jobName: String): JobExecutionResult = {
     printLogo(s"FlinkTable $jobName Starting...")
-    tableEnv.execute(jobName)
+    null
   }
 
   @deprecated override def fromTableSource(source: TableSource[_]): Table =
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 7e4daf539..abbe13822 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -65,7 +65,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
 
   @deprecated override def execute(jobName: String): JobExecutionResult = {
     printLogo(s"FlinkTable $jobName Starting...")
-    tableEnv.execute(jobName)
+    null
   }
 
   @deprecated override def fromTableSource(source: TableSource[_]): Table =
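
In the three shims above, the deprecated execute(jobName) no longer delegates
to tableEnv.execute; the stub now just prints the banner and returns null,
presumably because in the unified Table API each statement submits its own
job, leaving a trailing global execute() call with nothing to run. A minimal
sketch of the per-statement submission that replaces a global execute(),
assuming Flink 1.13+ semantics:

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    val env = TableEnvironment.create(
      EnvironmentSettings.newInstance().inBatchMode().build())
    // executeSql submits and runs its own Flink job; no surrounding
    // env.execute(jobName) is needed.
    env.executeSql("SELECT 1").print()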
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 471ad45aa..60fa14aaf 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.16/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.core
 
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.table.api.{CompiledPlan, PlanReference, Table, TableDescriptor, TableEnvironment, TableResult}
+import org.apache.flink.table.api.{CompiledPlan, PlanReference, Table, TableDescriptor, TableEnvironment}
 import org.apache.flink.table.module.ModuleEntry
 
 class TableContext(override val parameter: ParameterTool, private val tableEnv: TableEnvironment)
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 2edf338c1..76239c6cb 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -26,30 +26,42 @@ import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.ExecutionOptions
 
 import scala.language.implicitConversions
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 object SqlClient extends App {
 
-  val parameterTool = ParameterTool.fromArgs(args)
+  private[this] val parameterTool = ParameterTool.fromArgs(args)
 
-  val flinkSql = {
+  private[this] val flinkSql = {
     val sql = parameterTool.get(KEY_FLINK_SQL())
     require(sql != null && sql.trim.nonEmpty, "Usage: flink sql cannot be null")
-    Try(DeflaterUtils.unzipString(sql)).getOrElse(
-      throw new IllegalArgumentException("Usage: flink sql is invalid or null, please check"))
+    Try(DeflaterUtils.unzipString(sql)) match {
+      case Success(value) => value
+      case Failure(_) =>
+        throw new IllegalArgumentException("Usage: flink sql is invalid or 
null, please check")
+    }
   }
 
-  val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
+  private[this] val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
 
-  val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
-    case Some(e) => e.operands(1)
+  private[this] val defaultMode = "streaming"
+
+  private[this] val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
+    case Some(e) =>
+      // 1) flink sql execution.runtime-mode has highest priority
+      e.operands(1)
     case None =>
-      val appConf = parameterTool.get(KEY_APP_CONF(), null)
-      val defaultMode = "streaming"
-      if (appConf == null) defaultMode
-      else {
-        val parameter = PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(appConf.drop(7)))
-        parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+      // 2) dynamic properties execution.runtime-mode
+      parameterTool.get(ExecutionOptions.RUNTIME_MODE.key(), null) match {
+        case null =>
+          parameterTool.get(KEY_APP_CONF(), null) match {
+            case null => defaultMode
+            case f =>
+              val parameter = PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(f.drop(7)))
+              // 3) application conf execution.runtime-mode
+              parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+          }
+        case m => m
       }
   }
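
The rewritten SqlClient resolves execution.runtime-mode with an explicit
priority chain: a SET statement inside the SQL wins, then a dynamic property,
then the application conf, and finally the "streaming" default. A minimal
sketch of that fallback order, with hypothetical Options standing in for the
three sources:

    // Resolution order as in the patch; the function name and Option inputs
    // are illustrative stand-ins for the parsed SQL SETs, the dynamic
    // properties, and the application conf (KEY_FLINK_TABLE_MODE).
    def resolveRuntimeMode(
        sqlSet: Option[String],      // 1) SET 'execution.runtime-mode' = ...
        dynamicProp: Option[String], // 2) execution.runtime-mode property
        appConf: Option[String]      // 3) table mode from application conf
    ): String =
      sqlSet.orElse(dynamicProp).orElse(appConf).getOrElse("streaming")

    resolveRuntimeMode(Some("batch"), None, None) // "batch": SQL wins
    resolveRuntimeMode(None, Some("batch"), None) // "batch": dynamic property
    resolveRuntimeMode(None, None, None)          // "streaming": default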
 
