This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2b177ea1f [Improve] Packer module code optimization (#3010)
2b177ea1f is described below

commit 2b177ea1f1ab930514725b50811e670e7ce079b7
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Sep 1 23:30:50 2023 +0800

    [Improve] Packer module code optimization (#3010)
    
    * DockerResolveProgress Code optimization
    
    * uploadToHdfs rename
    
    * Modified K8sPodTemplates
    
    * Modified PodTemplateTool
    
    * Modified FlinkK8sApplicationBuildPipeline
---
 .../scala/org/apache/streampark/common/util/Utils.scala |  2 +-
 .../streampark/flink/kubernetes/PodTemplateTool.scala   |  3 ++-
 .../flink/kubernetes/model/K8sPodTemplates.scala        |  8 +++++---
 .../flink/packer/pipeline/DockerResolveProgress.scala   | 17 ++++++++++++-----
 .../impl/FlinkK8sApplicationBuildPipeline.scala         |  4 +---
 .../impl/FlinkYarnApplicationBuildPipeline.scala        |  9 ++++++---
 6 files changed, 27 insertions(+), 16 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 30534cf5d..cd6981717 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -114,7 +114,7 @@ object Utils {
    */
   def calPercent(num1: Long, num2: Long): Double =
     if (num1 == 0 || num2 == 0) 0.0
-    else (num1.toDouble / num2.toDouble * 100).formatted("%.1f").toDouble
+    else "%.1f".format(num1.toDouble / num2.toDouble * 100).toDouble
 
   def hashCode(elements: Any*): Int = {
     if (elements == null) return 0
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
index 1461e4ab8..31c743212 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
@@ -20,6 +20,7 @@ package org.apache.streampark.flink.kubernetes
 import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates
 
 import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.configuration.Configuration
 
 import java.io.File
@@ -57,7 +58,7 @@ object PodTemplateTool {
 
     val podTempleMap = mutable.Map[String, String]()
     val outputTmplContent = (tmplContent: String, podTmpl: PodTemplateType) => 
{
-      if (tmplContent.nonEmpty) {
+      if (StringUtils.isNotBlank(tmplContent)) {
         val outputPath = s"$buildWorkspace/${podTmpl.fileName}"
         val outputFile = new File(outputPath)
         FileUtils.write(outputFile, tmplContent, "UTF-8")
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sPodTemplates.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sPodTemplates.scala
index bbe2349c7..a65d7f0aa 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sPodTemplates.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sPodTemplates.scala
@@ -19,6 +19,8 @@ package org.apache.streampark.flink.kubernetes.model
 
 import org.apache.streampark.common.util.Utils
 
+import org.apache.commons.lang3.StringUtils
+
 import scala.util.Try
 
 /** Pod template for flink k8s cluster */
@@ -27,9 +29,9 @@ case class K8sPodTemplates(
     jmPodTemplate: String = "",
     tmPodTemplate: String = "") {
 
-  def nonEmpty: Boolean = Option(podTemplate).exists(_.trim.nonEmpty) ||
-    Option(jmPodTemplate).exists(_.trim.nonEmpty) ||
-    Option(tmPodTemplate).exists(_.trim.nonEmpty)
+  def nonEmpty: Boolean = StringUtils.isNotBlank(podTemplate) ||
+    StringUtils.isNotBlank(jmPodTemplate) ||
+    StringUtils.isNotBlank(tmPodTemplate)
 
   def isEmpty: Boolean = !nonEmpty
 
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
index 12306293d..eb9b402a7 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
@@ -20,6 +20,7 @@ package org.apache.streampark.flink.packer.pipeline
 import org.apache.streampark.common.util.Utils
 
 import com.github.dockerjava.api.model.{PullResponseItem, PushResponseItem}
+import org.apache.commons.lang3.StringUtils
 
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -36,7 +37,10 @@ class DockerPullProgress(
     var lastTime: Long) {
   // noinspection DuplicatedCode
   def update(pullRsp: PullResponseItem): Unit = {
-    if (pullRsp == null || pullRsp.getId == null || pullRsp.getStatus == null) 
{
+    if (
+      pullRsp == null || StringUtils.isBlank(pullRsp.getId) || 
StringUtils.isBlank(
+        pullRsp.getStatus)
+    ) {
       return
     }
     if (pullRsp.getStatus.contains("complete")) {
@@ -60,7 +64,7 @@ class DockerPullProgress(
 
 class DockerBuildProgress(val steps: ArrayBuffer[String], var lastTime: Long) {
   def update(buildStep: String): Unit = {
-    if (buildStep != null && buildStep.nonEmpty) {
+    if (StringUtils.isNotBlank(buildStep)) {
       steps += buildStep
       lastTime = System.currentTimeMillis
     }
@@ -75,7 +79,10 @@ class DockerPushProgress(
     var lastTime: Long) {
   // noinspection DuplicatedCode
   def update(pushRsp: PushResponseItem): Unit = {
-    if (pushRsp == null || pushRsp.getId == null || pushRsp.getStatus == null) 
{
+    if (
+      pushRsp == null || StringUtils.isBlank(pushRsp.getId) || 
StringUtils.isBlank(
+        pushRsp.getStatus)
+    ) {
       return
     }
     if (pushRsp.getStatus.contains("complete")) {
@@ -125,8 +132,8 @@ case class DockerLayerProgress(layerId: String, status: 
String, current: Long, t
   def percent: Double = Utils.calPercent(current, total)
 
   def currentMb: Double =
-    if (current == 0) 0 else (current.toDouble / (1024 * 
1024)).formatted("%.2f").toDouble
+    if (current == 0) 0 else "%.2f".format(current.toDouble / (1024 * 
1024)).toDouble
 
   def totalMb: Double =
-    if (total == 0) 0 else (total.toDouble / (1024 * 
1024)).formatted("%.2f").toDouble
+    if (total == 0) 0 else "%.2f".format(total.toDouble / (1024 * 
1024)).toDouble
 }
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 72d47991e..5e0449aee 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -229,9 +229,7 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
       registerAddress: String,
       imageNamespace: String): String = {
     var tagName = if (tag.contains("/")) tag else s"$imageNamespace/$tag"
-    if (
-      registerAddress != null && registerAddress.nonEmpty && 
!tagName.startsWith(registerAddress)
-    ) {
+    if (StringUtils.isNotBlank(registerAddress) && 
!tagName.startsWith(registerAddress)) {
       tagName = s"$registerAddress/$tagName"
     }
     tagName.toLowerCase
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 6fbb0de50..3606107c9 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -68,8 +68,8 @@ class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildReques
     execStep(3) {
       mavenJars.foreach(
         jar => {
-          uploadToHdfs(FsOperator.lfs, jar, request.localWorkspace)
-          uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath)
+          uploadJarToHdfsOrLfs(FsOperator.lfs, jar, request.localWorkspace)
+          uploadJarToHdfsOrLfs(FsOperator.hdfs, jar, request.yarnProvidedPath)
         })
     }.getOrElse(throw getError.exception)
 
@@ -77,7 +77,10 @@ class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildReques
   }
 
   @throws[IOException]
-  private[this] def uploadToHdfs(fsOperator: FsOperator, origin: String, 
target: String): Unit = {
+  private[this] def uploadJarToHdfsOrLfs(
+      fsOperator: FsOperator,
+      origin: String,
+      target: String): Unit = {
     val originFile = new File(origin)
     if (!fsOperator.exists(target)) {
       fsOperator.mkdirs(target)

Reply via email to