This is an automated email from the ASF dual-hosted git repository.

kriszu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new a077b91c7 [Improve] ingress improvement (#2702)
a077b91c7 is described below

commit a077b91c776dc83405125040246e44a0a591a58b
Author: benjobs <[email protected]>
AuthorDate: Tue Apr 25 23:22:03 2023 +0800

    [Improve] ingress improvement (#2702)
---
 .../apache/streampark/common/util/FileUtils.scala  |  2 +-
 .../org/apache/streampark/common/util/Utils.scala  | 12 +---
 .../flink/kubernetes/KubernetesRetriever.scala     |  6 +-
 .../helper/KubernetesDeploymentHelper.scala        | 12 ++--
 .../flink/kubernetes/ingress/IngressStrategy.scala | 30 +++++++++-
 .../kubernetes/ingress/IngressStrategyV1.scala     | 56 +++++++-----------
 .../ingress/IngressStrategyV1beta1.scala           | 57 ++++++++----------
 .../flink/kubernetes/FlinkRestJsonTest.scala       | 36 ------------
 .../flink/kubernetes/IngressMetaTestUtil.scala     | 68 ----------------------
 .../impl/FlinkYarnApplicationBuildPipeline.scala   |  2 +-
 .../streampark/flink/proxy/FlinkShimsProxy.scala   |  5 +-
 11 files changed, 86 insertions(+), 200 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index e9d3c68bb..708943098 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -44,7 +44,7 @@ object FileUtils {
     if (input == null) {
       throw new RuntimeException("The inputStream can not be null")
     }
-    Utils.tryWithResource(input) {
+    Utils.using(input) {
       in =>
         val b = new Array[Byte](4)
         in.read(b, 0, b.length)
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 9d696e545..8b6ce53ab 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -110,7 +110,7 @@ object Utils {
   /*
    * Mimicking the try-with-resource syntax of Java-8+
    */
-  def tryWithResource[R, T <: AutoCloseable](handle: T)(func: T => R)(implicit
+  def using[R, T <: AutoCloseable](handle: T)(func: T => R)(implicit
       excFunc: Throwable => R = null): R = {
     try {
       func(handle)
@@ -136,16 +136,6 @@ object Utils {
       })
   }
 
-  def using[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
-    try {
-      block(resource)
-    } finally {
-      if (resource != null) {
-        resource.close()
-      }
-    }
-  }
-
   /**
    * calculate the percentage of num1 / num2, the result range from 0 to 100, 
with one small digit
    * reserve.
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index d4e6ec0ed..17a5df525 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.kubernetes
 
 import org.apache.streampark.common.util.{Logger, Utils}
-import org.apache.streampark.common.util.Utils.tryWithResource
+import org.apache.streampark.common.util.Utils.using
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
@@ -107,7 +107,7 @@ object KubernetesRetriever extends Logger {
    *   deployment namespace
    */
   def isDeploymentExists(name: String, namespace: String): Boolean = {
-    tryWithResource(KubernetesRetriever.newK8sClient()) {
+    using(KubernetesRetriever.newK8sClient()) {
       client =>
         client
           .apps()
@@ -123,7 +123,7 @@ object KubernetesRetriever extends Logger {
 
   /** retrieve flink jobManager rest url */
   def retrieveFlinkRestUrl(clusterKey: ClusterKey): Option[String] = {
-    Utils.tryWithResource(
+    Utils.using(
       KubernetesRetriever
         .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace, 
clusterKey.executeMode)
         .getOrElse(return None)) {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 82bbfe8c3..59c82f471 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.kubernetes.helper
 
 import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
-import org.apache.streampark.common.util.Utils.tryWithResource
+import org.apache.streampark.common.util.Utils.using
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 
 import com.google.common.base.Charsets
@@ -34,7 +34,7 @@ import scala.util.{Success, Try}
 object KubernetesDeploymentHelper extends Logger {
 
   private[this] def getPods(nameSpace: String, deploymentName: String): 
List[Pod] = {
-    tryWithResource(KubernetesRetriever.newK8sClient()) {
+    using(KubernetesRetriever.newK8sClient()) {
       client =>
         Try {
           client.pods
@@ -68,7 +68,7 @@ object KubernetesDeploymentHelper extends Logger {
   }
 
   def deleteTaskDeployment(nameSpace: String, deploymentName: String): Boolean 
= {
-    tryWithResource(KubernetesRetriever.newK8sClient()) {
+    using(KubernetesRetriever.newK8sClient()) {
       client =>
         Try {
           val r = client.apps.deployments
@@ -90,7 +90,7 @@ object KubernetesDeploymentHelper extends Logger {
   }
 
   def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): 
String = {
-    tryWithResource(KubernetesRetriever.newK8sClient()) {
+    using(KubernetesRetriever.newK8sClient()) {
       client =>
         val path = KubernetesDeploymentHelper.getJobLog(jobId)
         val file = new File(path)
@@ -101,7 +101,7 @@ object KubernetesDeploymentHelper extends Logger {
   }
 
   def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId: 
String): String = {
-    tryWithResource(KubernetesRetriever.newK8sClient()) {
+    using(KubernetesRetriever.newK8sClient()) {
       client =>
         Try {
           val podName = getPods(nameSpace, jobName).head.getMetadata.getName
@@ -120,7 +120,7 @@ object KubernetesDeploymentHelper extends Logger {
   }
 
   def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean 
= {
-    tryWithResource(KubernetesRetriever.newK8sClient()) {
+    using(KubernetesRetriever.newK8sClient()) {
       client =>
         Try {
           val r = client
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index 4df560b96..81466ed9f 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
+import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.apache.commons.io.FileUtils
 import org.apache.flink.client.program.ClusterClient
@@ -24,10 +25,12 @@ import org.apache.flink.client.program.ClusterClient
 import java.io.File
 
 trait IngressStrategy {
+
   def ingressUrlAddress(
       nameSpace: String,
       clusterId: String,
       clusterClient: ClusterClient[_]): String
+
   def configureIngress(domainName: String, clusterId: String, nameSpace: 
String): Unit
 
   def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates: 
String): String = {
@@ -58,8 +61,31 @@ trait IngressStrategy {
     )
   }
 
-  def getDeployment(nameSpace: String, clusterId: String, client: 
DefaultKubernetesClient) = {
-    
client.apps().deployments().inNamespace(nameSpace).withName(clusterId).get()
+  def getOwnerReference(
+      nameSpace: String,
+      clusterId: String,
+      client: DefaultKubernetesClient): OwnerReference = {
+
+    val deployment = client
+      .apps()
+      .deployments()
+      .inNamespace(nameSpace)
+      .withName(clusterId)
+      .get()
+
+    require(
+      deployment != null,
+      s"Deployment with name $clusterId not found in namespace $nameSpace")
+
+    val uid = deployment.getMetadata.getUid
+    new OwnerReferenceBuilder()
+      .withApiVersion("apps/v1")
+      .withKind("Deployment")
+      .withName(clusterId)
+      .withUid(uid)
+      .withController(true)
+      .withBlockOwnerDeletion(true)
+      .build()
   }
 
 }
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index 9bd40c2d1..234eaee56 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -17,14 +17,15 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
-import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder
+import org.apache.streampark.common.util.Utils
+
 import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.apache.flink.client.program.ClusterClient
 
 import scala.collection.JavaConverters._
 import scala.language.postfixOps
-import scala.util.{Success, Try}
+import scala.util.Try
 
 class IngressStrategyV1 extends IngressStrategy {
 
@@ -32,40 +33,26 @@ class IngressStrategyV1 extends IngressStrategy {
       nameSpace: String,
       clusterId: String,
       clusterClient: ClusterClient[_]): String = {
-    val client = new DefaultKubernetesClient
-    val fromV1 =
-      
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
-        .map(ingress => ingress.getSpec.getRules.get(0))
-        .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-    Try(
-      fromV1
-        .map { case (host, path) => s"https://$host$path"; }
-        .getOrElse(clusterClient.getWebInterfaceURL)
-    ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress 
error."))
-  }
 
-  override def configureIngress(domainName: String, clusterId: String, 
nameSpace: String): Unit = {
-    Try(new DefaultKubernetesClient) match {
-      case Success(client) =>
-        val deployment = getDeployment(nameSpace, clusterId, client)
-
-        val deploymentUid = if (deployment != null) {
-          deployment.getMetadata.getUid
-        } else {
-          throw new RuntimeException(
-            s"Deployment with name $clusterId not found in namespace 
$nameSpace")
-        }
+    Utils.using(new DefaultKubernetesClient) {
+      client =>
+        val hosts =
+          
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+            .map(ingress => ingress.getSpec.getRules.get(0))
+            .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
+        Try(
+          hosts
+            .map { case (host, path) => s"https://$host$path"; }
+            .getOrElse(clusterClient.getWebInterfaceURL)
+        ).getOrElse(throw new RuntimeException("[StreamPark] get 
ingressUrlAddress error."))
+    }
 
-        // Create OwnerReference object
-        val ownerReference = new OwnerReferenceBuilder()
-          .withApiVersion("apps/v1")
-          .withKind("Deployment")
-          .withName(clusterId)
-          .withUid(deploymentUid)
-          .withController(true)
-          .withBlockOwnerDeletion(true)
-          .build()
+  }
 
+  override def configureIngress(domainName: String, clusterId: String, 
nameSpace: String): Unit = {
+    Utils.using(new DefaultKubernetesClient) {
+      client =>
+        val ownerReference = getOwnerReference(nameSpace, clusterId, client)
         val ingress = new IngressBuilder()
           .withNewMetadata()
           .withName(clusterId)
@@ -104,9 +91,8 @@ class IngressStrategyV1 extends IngressStrategy {
           .endHttp()
           .endRule()
           .endSpec()
-          .build();
+          .build()
         client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
-      case _ =>
     }
   }
 }
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index f3d5a9189..1d4891db8 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -17,14 +17,16 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
-import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
+import org.apache.streampark.common.util.Utils
+
+import io.fabric8.kubernetes.api.model.IntOrString
 import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.apache.flink.client.program.ClusterClient
 
 import scala.collection.JavaConverters._
 import scala.language.postfixOps
-import scala.util.{Success, Try}
+import scala.util.Try
 
 class IngressStrategyV1beta1 extends IngressStrategy {
 
@@ -32,41 +34,28 @@ class IngressStrategyV1beta1 extends IngressStrategy {
       nameSpace: String,
       clusterId: String,
       clusterClient: ClusterClient[_]): String = {
-    val client = new DefaultKubernetesClient
-    // for kubernetes 1.19-
-    val fromV1beta1 =
-      
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
-        .map(ingress => ingress.getSpec.getRules.get(0))
-        .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-    Try(
-      fromV1beta1
-        .map { case (host, path) => s"https://$host$path"; }
-        .getOrElse(clusterClient.getWebInterfaceURL)
-    ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress 
error."))
-  }
 
-  override def configureIngress(domainName: String, clusterId: String, 
nameSpace: String): Unit = {
-    Try(new DefaultKubernetesClient) match {
-      case Success(client) =>
-        val deployment = getDeployment(nameSpace, clusterId, client)
+    Utils.using(new DefaultKubernetesClient) {
+      client =>
+        // for kubernetes 1.19-
+        val hosts =
+          
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+            .map(ingress => ingress.getSpec.getRules.get(0))
+            .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
 
-        val deploymentUid = if (deployment != null) {
-          deployment.getMetadata.getUid
-        } else {
-          throw new RuntimeException(
-            s"Deployment with name $clusterId not found in namespace 
$nameSpace")
-        }
+        Try(
+          hosts
+            .map { case (host, path) => s"https://$host$path"; }
+            .getOrElse(clusterClient.getWebInterfaceURL)
+        ).getOrElse(throw new RuntimeException("[StreamPark] get 
ingressUrlAddress error."))
 
-        // Create OwnerReference object
-        val ownerReference = new OwnerReferenceBuilder()
-          .withApiVersion("apps/v1")
-          .withKind("Deployment")
-          .withName(clusterId)
-          .withUid(deploymentUid)
-          .withController(true)
-          .withBlockOwnerDeletion(true)
-          .build()
+    }
+  }
 
+  override def configureIngress(domainName: String, clusterId: String, 
nameSpace: String): Unit = {
+    Utils.using(new DefaultKubernetesClient) {
+      client =>
+        val ownerReference = getOwnerReference(nameSpace, clusterId, client)
         val ingress = new IngressBuilder()
           .withNewMetadata()
           .withName(clusterId)
@@ -96,8 +85,8 @@ class IngressStrategyV1beta1 extends IngressStrategy {
           .endRule()
           .endSpec()
           .build()
+
         client.network.ingress.inNamespace(nameSpace).create(ingress)
-      case _ =>
     }
   }
 }
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 24abbcdfb..ce431e3a2 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -263,42 +263,6 @@ class FlinkRestJsonTest {
     println(checkpoint)
   }
 
-  @Test def testIngress(): Unit = {
-    val json =
-      """
-        |[
-        |    {
-        |        "addresses":[
-        |            "192.168.0.1",
-        |            "192.168.0.2",
-        |            "192.168.0.3"
-        |        ],
-        |        "port":80,
-        |        "protocol":"HTTP",
-        |        "serviceName":"native-flink:statebackend12788-rest",
-        |        "ingressName":"native-flink:statebackend12788",
-        |        "hostname":"streampark.com",
-        |        "path":"/native-flink/statebackend12788/",
-        |        "allNodes":false
-        |    },
-        |    {
-        |        "addresses":[
-        |        ],
-        |        "port":80,
-        |        "protocol":"HTTP",
-        |        "serviceName":"native-flink:statebackend12788-rest",
-        |        "ingressName":"native-flink:statebackend12788",
-        |        "hostname":"streampark.com",
-        |        "path":"/native-flink/statebackend12788(/|$)(.*)",
-        |        "allNodes":false
-        |    }
-        |]
-        |""".stripMargin
-
-    val ingressMeta = new IngressMetaTestUtil().IngressMeta.as(json)
-    println(ingressMeta.get)
-  }
-
   @Test def testHistoryArchives(): Unit = {
 
     @transient
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
deleted file mode 100644
index 3bd8ff10b..000000000
--- 
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.kubernetes
-
-import org.json4s.{DefaultFormats, JArray, JsonAST}
-import org.json4s.jackson.JsonMethods.parse
-
-import scala.util.{Failure, Success, Try}
-
-class IngressMetaTestUtil {
-
-  case class IngressMeta(
-      addresses: List[String],
-      port: Integer,
-      protocol: String,
-      serviceName: String,
-      ingressName: String,
-      hostname: String,
-      path: String,
-      allNodes: Boolean)
-
-  object IngressMeta {
-
-    @transient implicit lazy val formats: DefaultFormats.type = 
org.json4s.DefaultFormats
-
-    def as(json: String): Option[List[IngressMeta]] = {
-      Try(parse(json)) match {
-        case Success(ok) =>
-          ok match {
-            case JArray(arr) =>
-              val list = arr.map(
-                x => {
-                  IngressMeta(
-                    addresses =
-                      (x \ 
"addresses").extractOpt[List[String]].getOrElse(List.empty[String]),
-                    port = (x \ "port").extractOpt[Integer].getOrElse(0),
-                    protocol = (x \ "protocol").extractOpt[String].orNull,
-                    serviceName = (x \ 
"serviceName").extractOpt[String].orNull,
-                    ingressName = (x \ 
"ingressName").extractOpt[String].orNull,
-                    hostname = (x \ "hostname").extractOpt[String].orNull,
-                    path = (x \ "path").extractOpt[String].orNull,
-                    allNodes = (x \ 
"allNodes").extractOpt[Boolean].getOrElse(false)
-                  )
-                })
-              Some(list)
-            case _ => None
-          }
-        case Failure(_) => None
-      }
-    }
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 2129b13f4..92c1d9199 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -88,7 +88,7 @@ class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildReques
         case FsOperator.hdfs =>
           val uploadFile = 
s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
           if (fsOperator.exists(uploadFile)) {
-            Utils.tryWithResource(new FileInputStream(originFile))(
+            Utils.using(new FileInputStream(originFile))(
               inputStream => {
                 if (DigestUtils.md5Hex(inputStream) != 
fsOperator.fileMd5(uploadFile)) {
                   fsOperator.upload(originFile.getAbsolutePath, uploadFile)
diff --git 
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
 
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 3641735f9..56dede3e6 100644
--- 
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ 
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -196,12 +196,11 @@ object FlinkShimsProxy extends Logger {
   @throws[Exception]
   def getObject[T](loader: ClassLoader, obj: Object): T = {
     val arrayOutputStream = new ByteArrayOutputStream
-    val result = Utils.tryWithResource(new 
ObjectOutputStream(arrayOutputStream))(
+    val result = Utils.using(new ObjectOutputStream(arrayOutputStream))(
       objectOutputStream => {
         objectOutputStream.writeObject(obj)
         val byteArrayInputStream = new 
ByteArrayInputStream(arrayOutputStream.toByteArray)
-        Utils.tryWithResource(new ClassLoaderObjectInputStream(loader, 
byteArrayInputStream))(
-          _.readObject)
+        Utils.using(new ClassLoaderObjectInputStream(loader, 
byteArrayInputStream))(_.readObject)
       })
     result.asInstanceOf[T]
   }

Reply via email to