This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new c7fa59bc5 [Improve] Flinkclient minor improvement
c7fa59bc5 is described below

commit c7fa59bc5a1150750657bff12dc21ba09796a786
Author: benjobs <[email protected]>
AuthorDate: Wed Dec 20 13:21:12 2023 +0800

    [Improve] Flinkclient minor improvement
---
 .../flink/client/trait/FlinkClientTrait.scala      | 69 +++++++++++-----------
 1 file changed, 35 insertions(+), 34 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 8e3ae8cba..4e3d5adcd 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -288,10 +288,7 @@ trait FlinkClientTrait extends Logger {
     // 1. find the configuration directory
     val configurationDirectory = s"$flinkHome/conf"
     // 2. load the custom command lines
-    val customCommandLines =
-      loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
-    new CliFrontend(flinkDefaultConfiguration, customCommandLines)
-    customCommandLines
+    loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
   }
 
   private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
@@ -312,21 +309,25 @@ trait FlinkClientTrait extends Logger {
     val cliArgs = {
       val optionMap = new mutable.HashMap[String, Any]()
       submitRequest.appOption
-        .filter(
-          x => {
-            val verify = commandLineOptions.hasOption(x._1)
-            if (!verify) logWarn(s"param:${x._1} is error,skip it.")
+        .filter {
+          opt =>
+            val verify = commandLineOptions.hasOption(opt._1)
+            if (!verify) {
+              logWarn(s"param:${opt._1} is error,skip it.")
+            }
             verify
-          })
-        .foreach(
-          x => {
-            val opt = commandLineOptions.getOption(x._1.trim).getOpt
-            Try(x._2.toBoolean).getOrElse(x._2) match {
+        }
+        .foreach {
+          opt =>
+            val option = commandLineOptions.getOption(opt._1.trim).getOpt
+            Try(opt._2.toBoolean).getOrElse(opt._2) match {
               case b if b.isInstanceOf[Boolean] =>
-                if (b.asInstanceOf[Boolean]) optionMap += s"-$opt" -> true
-              case v => optionMap += s"-$opt" -> v
+                if (b.asInstanceOf[Boolean]) {
+                  optionMap += s"-$option" -> true
+                }
+              case v => optionMap += s"-$option" -> v
             }
-          })
+        }
 
       // fromSavePoint
       if (submitRequest.savePoint != null) {
@@ -339,24 +340,24 @@ trait FlinkClientTrait extends Logger {
       }
 
       val array = new ArrayBuffer[String]()
-      optionMap.foreach(
-        x => {
-          array += x._1
-          x._2 match {
-            case v: String => array += v
-            case _ =>
+      optionMap.foreach {
+        opt =>
+          array += opt._1
+          if (opt._2.isInstanceOf[String]) {
+            array += opt._2.toString
           }
-        })
+      }
 
       // app properties
       if (MapUtils.isNotEmpty(submitRequest.properties)) {
-        submitRequest.properties.foreach(
-          x => {
-            if (!x._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
-              array += s"-D${x._1}=${x._2}"
+        submitRequest.properties.foreach {
+          key =>
+            if (!key._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
+              array += s"-D${key._1}=${key._2}"
             }
-          })
+        }
       }
+
       array.toArray
     }
 
@@ -438,10 +439,10 @@ trait FlinkClientTrait extends Logger {
     val configuration = new Configuration()
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
     flinkDefaultConfiguration.keySet.foreach(
-      x => {
-        flinkDefaultConfiguration.getString(x, null) match {
-          case v if v != null => configuration.setString(x, v)
-          case _ =>
+      key => {
+        val value = flinkDefaultConfiguration.getString(key, null)
+        if (value != null) {
+          configuration.setString(key, value)
         }
       })
     configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
@@ -451,8 +452,8 @@ trait FlinkClientTrait extends Logger {
   implicit private[client] class EnhanceFlinkConfiguration(flinkConfig: Configuration) {
     def safeSet[T](option: ConfigOption[T], value: T): Configuration = {
       flinkConfig match {
-        case x if value != null && value.toString.nonEmpty => x.set(option, value)
-        case x => x
+        case conf if value != null && value.toString.nonEmpty => conf.set(option, value)
+        case conf => conf
       }
     }
   }

Reply via email to