This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 2b177ea1f [Improve] Packer module code optimization (#3010)
2b177ea1f is described below
commit 2b177ea1f1ab930514725b50811e670e7ce079b7
Author: ChengJie1053 <[email protected]>
AuthorDate: Fri Sep 1 23:30:50 2023 +0800
[Improve] Packer module code optimization (#3010)
* DockerResolveProgress Code optimization
* uploadToHdfs rename
* Modified K8sPodTemplates
* Modified PodTemplateTool
* Modified FlinkK8sApplicationBuildPipeline
---
.../scala/org/apache/streampark/common/util/Utils.scala | 2 +-
.../streampark/flink/kubernetes/PodTemplateTool.scala | 3 ++-
.../flink/kubernetes/model/K8sPodTemplates.scala | 8 +++++---
.../flink/packer/pipeline/DockerResolveProgress.scala | 17 ++++++++++++-----
.../impl/FlinkK8sApplicationBuildPipeline.scala | 4 +---
.../impl/FlinkYarnApplicationBuildPipeline.scala | 9 ++++++---
6 files changed, 27 insertions(+), 16 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 30534cf5d..cd6981717 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -114,7 +114,7 @@ object Utils {
*/
def calPercent(num1: Long, num2: Long): Double =
if (num1 == 0 || num2 == 0) 0.0
- else (num1.toDouble / num2.toDouble * 100).formatted("%.1f").toDouble
+ else "%.1f".format(num1.toDouble / num2.toDouble * 100).toDouble
def hashCode(elements: Any*): Int = {
if (elements == null) return 0
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
index 1461e4ab8..31c743212 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/PodTemplateTool.scala
@@ -20,6 +20,7 @@ package org.apache.streampark.flink.kubernetes
import org.apache.streampark.flink.kubernetes.model.K8sPodTemplates
import org.apache.commons.io.FileUtils
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.configuration.Configuration
import java.io.File
@@ -57,7 +58,7 @@ object PodTemplateTool {
val podTempleMap = mutable.Map[String, String]()
val outputTmplContent = (tmplContent: String, podTmpl: PodTemplateType) =>
{
- if (tmplContent.nonEmpty) {
+ if (StringUtils.isNotBlank(tmplContent)) {
val outputPath = s"$buildWorkspace/${podTmpl.fileName}"
val outputFile = new File(outputPath)
FileUtils.write(outputFile, tmplContent, "UTF-8")
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sPodTemplates.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sPodTemplates.scala
index bbe2349c7..a65d7f0aa 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sPodTemplates.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/K8sPodTemplates.scala
@@ -19,6 +19,8 @@ package org.apache.streampark.flink.kubernetes.model
import org.apache.streampark.common.util.Utils
+import org.apache.commons.lang3.StringUtils
+
import scala.util.Try
/** Pod template for flink k8s cluster */
@@ -27,9 +29,9 @@ case class K8sPodTemplates(
jmPodTemplate: String = "",
tmPodTemplate: String = "") {
- def nonEmpty: Boolean = Option(podTemplate).exists(_.trim.nonEmpty) ||
- Option(jmPodTemplate).exists(_.trim.nonEmpty) ||
- Option(tmPodTemplate).exists(_.trim.nonEmpty)
+ def nonEmpty: Boolean = StringUtils.isNotBlank(podTemplate) ||
+ StringUtils.isNotBlank(jmPodTemplate) ||
+ StringUtils.isNotBlank(tmPodTemplate)
def isEmpty: Boolean = !nonEmpty
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
index 12306293d..eb9b402a7 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/DockerResolveProgress.scala
@@ -20,6 +20,7 @@ package org.apache.streampark.flink.packer.pipeline
import org.apache.streampark.common.util.Utils
import com.github.dockerjava.api.model.{PullResponseItem, PushResponseItem}
+import org.apache.commons.lang3.StringUtils
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -36,7 +37,10 @@ class DockerPullProgress(
var lastTime: Long) {
// noinspection DuplicatedCode
def update(pullRsp: PullResponseItem): Unit = {
- if (pullRsp == null || pullRsp.getId == null || pullRsp.getStatus == null)
{
+ if (
+ pullRsp == null || StringUtils.isBlank(pullRsp.getId) ||
StringUtils.isBlank(
+ pullRsp.getStatus)
+ ) {
return
}
if (pullRsp.getStatus.contains("complete")) {
@@ -60,7 +64,7 @@ class DockerPullProgress(
class DockerBuildProgress(val steps: ArrayBuffer[String], var lastTime: Long) {
def update(buildStep: String): Unit = {
- if (buildStep != null && buildStep.nonEmpty) {
+ if (StringUtils.isNotBlank(buildStep)) {
steps += buildStep
lastTime = System.currentTimeMillis
}
@@ -75,7 +79,10 @@ class DockerPushProgress(
var lastTime: Long) {
// noinspection DuplicatedCode
def update(pushRsp: PushResponseItem): Unit = {
- if (pushRsp == null || pushRsp.getId == null || pushRsp.getStatus == null)
{
+ if (
+ pushRsp == null || StringUtils.isBlank(pushRsp.getId) ||
StringUtils.isBlank(
+ pushRsp.getStatus)
+ ) {
return
}
if (pushRsp.getStatus.contains("complete")) {
@@ -125,8 +132,8 @@ case class DockerLayerProgress(layerId: String, status:
String, current: Long, t
def percent: Double = Utils.calPercent(current, total)
def currentMb: Double =
- if (current == 0) 0 else (current.toDouble / (1024 *
1024)).formatted("%.2f").toDouble
+ if (current == 0) 0 else "%.2f".format(current.toDouble / (1024 *
1024)).toDouble
def totalMb: Double =
- if (total == 0) 0 else (total.toDouble / (1024 *
1024)).formatted("%.2f").toDouble
+ if (total == 0) 0 else "%.2f".format(total.toDouble / (1024 *
1024)).toDouble
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 72d47991e..5e0449aee 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -229,9 +229,7 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
registerAddress: String,
imageNamespace: String): String = {
var tagName = if (tag.contains("/")) tag else s"$imageNamespace/$tag"
- if (
- registerAddress != null && registerAddress.nonEmpty &&
!tagName.startsWith(registerAddress)
- ) {
+ if (StringUtils.isNotBlank(registerAddress) &&
!tagName.startsWith(registerAddress)) {
tagName = s"$registerAddress/$tagName"
}
tagName.toLowerCase
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 6fbb0de50..3606107c9 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -68,8 +68,8 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
execStep(3) {
mavenJars.foreach(
jar => {
- uploadToHdfs(FsOperator.lfs, jar, request.localWorkspace)
- uploadToHdfs(FsOperator.hdfs, jar, request.yarnProvidedPath)
+ uploadJarToHdfsOrLfs(FsOperator.lfs, jar, request.localWorkspace)
+ uploadJarToHdfsOrLfs(FsOperator.hdfs, jar, request.yarnProvidedPath)
})
}.getOrElse(throw getError.exception)
@@ -77,7 +77,10 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
}
@throws[IOException]
- private[this] def uploadToHdfs(fsOperator: FsOperator, origin: String,
target: String): Unit = {
+ private[this] def uploadJarToHdfsOrLfs(
+ fsOperator: FsOperator,
+ origin: String,
+ target: String): Unit = {
val originFile = new File(origin)
if (!fsOperator.exists(target)) {
fsOperator.mkdirs(target)