This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new e03ae9226 flink job batchMode improvement
e03ae9226 is described below

commit e03ae92265c21a673b0a92a082143a8c6a3a5f16
Author: benjobs <[email protected]>
AuthorDate: Sat Aug 26 16:21:04 2023 +0800

    flink job batchMode improvement
---
 .../flink/client/trait/FlinkClientTrait.scala      |  8 +++++
 .../apache/streampark/flink/cli/SqlClient.scala    | 40 ++++++++++++++--------
 2 files changed, 34 insertions(+), 14 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 325bc9ff3..074d9fad5 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -474,6 +474,14 @@ trait FlinkClientTrait extends Logger {
           programArgs += submitRequest.appConf
       }
     }
+
+    // execution.runtime-mode
+    if (submitRequest.properties.nonEmpty) {
+      if 
(submitRequest.properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
+        programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
+        programArgs += 
submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key())
+      }
+    }
     programArgs.toList.asJava
   }
 
diff --git 
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
 
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 2edf338c1..76239c6cb 100644
--- 
a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ 
b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -26,30 +26,42 @@ import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.ExecutionOptions
 
 import scala.language.implicitConversions
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 object SqlClient extends App {
 
-  val parameterTool = ParameterTool.fromArgs(args)
+  private[this] val parameterTool = ParameterTool.fromArgs(args)
 
-  val flinkSql = {
+  private[this] val flinkSql = {
     val sql = parameterTool.get(KEY_FLINK_SQL())
     require(sql != null && sql.trim.nonEmpty, "Usage: flink sql cannot be 
null")
-    Try(DeflaterUtils.unzipString(sql)).getOrElse(
-      throw new IllegalArgumentException("Usage: flink sql is invalid or null, 
please check"))
+    Try(DeflaterUtils.unzipString(sql)) match {
+      case Success(value) => value
+      case Failure(_) =>
+        throw new IllegalArgumentException("Usage: flink sql is invalid or 
null, please check")
+    }
   }
 
-  val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == 
SqlCommand.SET)
+  private[this] val sets = 
SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
 
-  val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) 
match {
-    case Some(e) => e.operands(1)
+  private[this] val defaultMode = "streaming"
+
+  private[this] val mode = sets.find(_.operands.head == 
ExecutionOptions.RUNTIME_MODE.key()) match {
+    case Some(e) =>
+      // 1) flink sql execution.runtime-mode has highest priority
+      e.operands(1)
     case None =>
-      val appConf = parameterTool.get(KEY_APP_CONF(), null)
-      val defaultMode = "streaming"
-      if (appConf == null) defaultMode
-      else {
-        val parameter = 
PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(appConf.drop(7)))
-        parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+      // 2) dynamic properties execution.runtime-mode
+      parameterTool.get(ExecutionOptions.RUNTIME_MODE.key(), null) match {
+        case null =>
+          parameterTool.get(KEY_APP_CONF(), null) match {
+            case null => defaultMode
+            case f =>
+              val parameter = 
PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(f.drop(7)))
+              // 3) application conf execution.runtime-mode
+              parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+          }
+        case m => m
       }
   }
 

Reply via email to