This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new c7fa59bc5 [Improve] Flinkclient minor improvement
c7fa59bc5 is described below
commit c7fa59bc5a1150750657bff12dc21ba09796a786
Author: benjobs <[email protected]>
AuthorDate: Wed Dec 20 13:21:12 2023 +0800
[Improve] Flinkclient minor improvement
---
.../flink/client/trait/FlinkClientTrait.scala | 69 +++++++++++-----------
1 file changed, 35 insertions(+), 34 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 8e3ae8cba..4e3d5adcd 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -288,10 +288,7 @@ trait FlinkClientTrait extends Logger {
// 1. find the configuration directory
val configurationDirectory = s"$flinkHome/conf"
// 2. load the custom command lines
- val customCommandLines =
- loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
- new CliFrontend(flinkDefaultConfiguration, customCommandLines)
- customCommandLines
+ loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
}
private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
@@ -312,21 +309,25 @@ trait FlinkClientTrait extends Logger {
val cliArgs = {
val optionMap = new mutable.HashMap[String, Any]()
submitRequest.appOption
- .filter(
- x => {
- val verify = commandLineOptions.hasOption(x._1)
- if (!verify) logWarn(s"param:${x._1} is error,skip it.")
+ .filter {
+ opt =>
+ val verify = commandLineOptions.hasOption(opt._1)
+ if (!verify) {
+ logWarn(s"param:${opt._1} is error,skip it.")
+ }
verify
- })
- .foreach(
- x => {
- val opt = commandLineOptions.getOption(x._1.trim).getOpt
- Try(x._2.toBoolean).getOrElse(x._2) match {
+ }
+ .foreach {
+ opt =>
+ val option = commandLineOptions.getOption(opt._1.trim).getOpt
+ Try(opt._2.toBoolean).getOrElse(opt._2) match {
case b if b.isInstanceOf[Boolean] =>
- if (b.asInstanceOf[Boolean]) optionMap += s"-$opt" -> true
- case v => optionMap += s"-$opt" -> v
+ if (b.asInstanceOf[Boolean]) {
+ optionMap += s"-$option" -> true
+ }
+ case v => optionMap += s"-$option" -> v
}
- })
+ }
// fromSavePoint
if (submitRequest.savePoint != null) {
@@ -339,24 +340,24 @@ trait FlinkClientTrait extends Logger {
}
val array = new ArrayBuffer[String]()
- optionMap.foreach(
- x => {
- array += x._1
- x._2 match {
- case v: String => array += v
- case _ =>
+ optionMap.foreach {
+ opt =>
+ array += opt._1
+ if (opt._2.isInstanceOf[String]) {
+ array += opt._2.toString
}
- })
+ }
// app properties
if (MapUtils.isNotEmpty(submitRequest.properties)) {
- submitRequest.properties.foreach(
- x => {
- if (!x._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
- array += s"-D${x._1}=${x._2}"
+ submitRequest.properties.foreach {
+ key =>
+ if (!key._1.startsWith(CoreOptions.FLINK_JVM_OPTIONS.key())) {
+ array += s"-D${key._1}=${key._2}"
}
- })
+ }
}
+
array.toArray
}
@@ -438,10 +439,10 @@ trait FlinkClientTrait extends Logger {
val configuration = new Configuration()
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
flinkDefaultConfiguration.keySet.foreach(
- x => {
- flinkDefaultConfiguration.getString(x, null) match {
- case v if v != null => configuration.setString(x, v)
- case _ =>
+ key => {
+ val value = flinkDefaultConfiguration.getString(key, null)
+ if (value != null) {
+ configuration.setString(key, value)
}
})
configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
@@ -451,8 +452,8 @@ trait FlinkClientTrait extends Logger {
implicit private[client] class EnhanceFlinkConfiguration(flinkConfig: Configuration) {
def safeSet[T](option: ConfigOption[T], value: T): Configuration = {
flinkConfig match {
- case x if value != null && value.toString.nonEmpty => x.set(option, value)
- case x => x
+ case conf if value != null && value.toString.nonEmpty => conf.set(option, value)
+ case conf => conf
}
}
}