This is an automated email from the ASF dual-hosted git repository.
kriszu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new a077b91c7 [Improve] ingress improvement (#2702)
a077b91c7 is described below
commit a077b91c776dc83405125040246e44a0a591a58b
Author: benjobs <[email protected]>
AuthorDate: Tue Apr 25 23:22:03 2023 +0800
[Improve] ingress improvement (#2702)
---
.../apache/streampark/common/util/FileUtils.scala | 2 +-
.../org/apache/streampark/common/util/Utils.scala | 12 +---
.../flink/kubernetes/KubernetesRetriever.scala | 6 +-
.../helper/KubernetesDeploymentHelper.scala | 12 ++--
.../flink/kubernetes/ingress/IngressStrategy.scala | 30 +++++++++-
.../kubernetes/ingress/IngressStrategyV1.scala | 56 +++++++-----------
.../ingress/IngressStrategyV1beta1.scala | 57 ++++++++----------
.../flink/kubernetes/FlinkRestJsonTest.scala | 36 ------------
.../flink/kubernetes/IngressMetaTestUtil.scala | 68 ----------------------
.../impl/FlinkYarnApplicationBuildPipeline.scala | 2 +-
.../streampark/flink/proxy/FlinkShimsProxy.scala | 5 +-
11 files changed, 86 insertions(+), 200 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index e9d3c68bb..708943098 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -44,7 +44,7 @@ object FileUtils {
if (input == null) {
throw new RuntimeException("The inputStream can not be null")
}
- Utils.tryWithResource(input) {
+ Utils.using(input) {
in =>
val b = new Array[Byte](4)
in.read(b, 0, b.length)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 9d696e545..8b6ce53ab 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -110,7 +110,7 @@ object Utils {
/*
* Mimicking the try-with-resource syntax of Java-8+
*/
- def tryWithResource[R, T <: AutoCloseable](handle: T)(func: T => R)(implicit
+ def using[R, T <: AutoCloseable](handle: T)(func: T => R)(implicit
excFunc: Throwable => R = null): R = {
try {
func(handle)
@@ -136,16 +136,6 @@ object Utils {
})
}
- def using[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
- try {
- block(resource)
- } finally {
- if (resource != null) {
- resource.close()
- }
- }
- }
-
/**
* calculate the percentage of num1 / num2, the result range from 0 to 100,
with one small digit
* reserve.
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index d4e6ec0ed..17a5df525 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.kubernetes
import org.apache.streampark.common.util.{Logger, Utils}
-import org.apache.streampark.common.util.Utils.tryWithResource
+import org.apache.streampark.common.util.Utils.using
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.kubernetes.model.ClusterKey
@@ -107,7 +107,7 @@ object KubernetesRetriever extends Logger {
* deployment namespace
*/
def isDeploymentExists(name: String, namespace: String): Boolean = {
- tryWithResource(KubernetesRetriever.newK8sClient()) {
+ using(KubernetesRetriever.newK8sClient()) {
client =>
client
.apps()
@@ -123,7 +123,7 @@ object KubernetesRetriever extends Logger {
/** retrieve flink jobManager rest url */
def retrieveFlinkRestUrl(clusterKey: ClusterKey): Option[String] = {
- Utils.tryWithResource(
+ Utils.using(
KubernetesRetriever
.newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace,
clusterKey.executeMode)
.getOrElse(return None)) {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 82bbfe8c3..59c82f471 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.kubernetes.helper
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
-import org.apache.streampark.common.util.Utils.tryWithResource
+import org.apache.streampark.common.util.Utils.using
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import com.google.common.base.Charsets
@@ -34,7 +34,7 @@ import scala.util.{Success, Try}
object KubernetesDeploymentHelper extends Logger {
private[this] def getPods(nameSpace: String, deploymentName: String):
List[Pod] = {
- tryWithResource(KubernetesRetriever.newK8sClient()) {
+ using(KubernetesRetriever.newK8sClient()) {
client =>
Try {
client.pods
@@ -68,7 +68,7 @@ object KubernetesDeploymentHelper extends Logger {
}
def deleteTaskDeployment(nameSpace: String, deploymentName: String): Boolean
= {
- tryWithResource(KubernetesRetriever.newK8sClient()) {
+ using(KubernetesRetriever.newK8sClient()) {
client =>
Try {
val r = client.apps.deployments
@@ -90,7 +90,7 @@ object KubernetesDeploymentHelper extends Logger {
}
def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String):
String = {
- tryWithResource(KubernetesRetriever.newK8sClient()) {
+ using(KubernetesRetriever.newK8sClient()) {
client =>
val path = KubernetesDeploymentHelper.getJobLog(jobId)
val file = new File(path)
@@ -101,7 +101,7 @@ object KubernetesDeploymentHelper extends Logger {
}
def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId:
String): String = {
- tryWithResource(KubernetesRetriever.newK8sClient()) {
+ using(KubernetesRetriever.newK8sClient()) {
client =>
Try {
val podName = getPods(nameSpace, jobName).head.getMetadata.getName
@@ -120,7 +120,7 @@ object KubernetesDeploymentHelper extends Logger {
}
def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean
= {
- tryWithResource(KubernetesRetriever.newK8sClient()) {
+ using(KubernetesRetriever.newK8sClient()) {
client =>
Try {
val r = client
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index 4df560b96..81466ed9f 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.kubernetes.ingress
+import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.commons.io.FileUtils
import org.apache.flink.client.program.ClusterClient
@@ -24,10 +25,12 @@ import org.apache.flink.client.program.ClusterClient
import java.io.File
trait IngressStrategy {
+
def ingressUrlAddress(
nameSpace: String,
clusterId: String,
clusterClient: ClusterClient[_]): String
+
def configureIngress(domainName: String, clusterId: String, nameSpace:
String): Unit
def prepareIngressTemplateFiles(buildWorkspace: String, ingressTemplates:
String): String = {
@@ -58,8 +61,31 @@ trait IngressStrategy {
)
}
- def getDeployment(nameSpace: String, clusterId: String, client:
DefaultKubernetesClient) = {
-
client.apps().deployments().inNamespace(nameSpace).withName(clusterId).get()
+ def getOwnerReference(
+ nameSpace: String,
+ clusterId: String,
+ client: DefaultKubernetesClient): OwnerReference = {
+
+ val deployment = client
+ .apps()
+ .deployments()
+ .inNamespace(nameSpace)
+ .withName(clusterId)
+ .get()
+
+ require(
+ deployment != null,
+ s"Deployment with name $clusterId not found in namespace $nameSpace")
+
+ val uid = deployment.getMetadata.getUid
+ new OwnerReferenceBuilder()
+ .withApiVersion("apps/v1")
+ .withKind("Deployment")
+ .withName(clusterId)
+ .withUid(uid)
+ .withController(true)
+ .withBlockOwnerDeletion(true)
+ .build()
}
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index 9bd40c2d1..234eaee56 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -17,14 +17,15 @@
package org.apache.streampark.flink.kubernetes.ingress
-import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder
+import org.apache.streampark.common.util.Utils
+
import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.flink.client.program.ClusterClient
import scala.collection.JavaConverters._
import scala.language.postfixOps
-import scala.util.{Success, Try}
+import scala.util.Try
class IngressStrategyV1 extends IngressStrategy {
@@ -32,40 +33,26 @@ class IngressStrategyV1 extends IngressStrategy {
nameSpace: String,
clusterId: String,
clusterClient: ClusterClient[_]): String = {
- val client = new DefaultKubernetesClient
- val fromV1 =
-
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
- .map(ingress => ingress.getSpec.getRules.get(0))
- .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- Try(
- fromV1
- .map { case (host, path) => s"https://$host$path" }
- .getOrElse(clusterClient.getWebInterfaceURL)
- ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress
error."))
- }
- override def configureIngress(domainName: String, clusterId: String,
nameSpace: String): Unit = {
- Try(new DefaultKubernetesClient) match {
- case Success(client) =>
- val deployment = getDeployment(nameSpace, clusterId, client)
-
- val deploymentUid = if (deployment != null) {
- deployment.getMetadata.getUid
- } else {
- throw new RuntimeException(
- s"Deployment with name $clusterId not found in namespace
$nameSpace")
- }
+ Utils.using(new DefaultKubernetesClient) {
+ client =>
+ val hosts =
+
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+ .map(ingress => ingress.getSpec.getRules.get(0))
+ .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
+ Try(
+ hosts
+ .map { case (host, path) => s"https://$host$path" }
+ .getOrElse(clusterClient.getWebInterfaceURL)
+ ).getOrElse(throw new RuntimeException("[StreamPark] get
ingressUrlAddress error."))
+ }
- // Create OwnerReference object
- val ownerReference = new OwnerReferenceBuilder()
- .withApiVersion("apps/v1")
- .withKind("Deployment")
- .withName(clusterId)
- .withUid(deploymentUid)
- .withController(true)
- .withBlockOwnerDeletion(true)
- .build()
+ }
+ override def configureIngress(domainName: String, clusterId: String,
nameSpace: String): Unit = {
+ Utils.using(new DefaultKubernetesClient) {
+ client =>
+ val ownerReference = getOwnerReference(nameSpace, clusterId, client)
val ingress = new IngressBuilder()
.withNewMetadata()
.withName(clusterId)
@@ -104,9 +91,8 @@ class IngressStrategyV1 extends IngressStrategy {
.endHttp()
.endRule()
.endSpec()
- .build();
+ .build()
client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
- case _ =>
}
}
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index f3d5a9189..1d4891db8 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -17,14 +17,16 @@
package org.apache.streampark.flink.kubernetes.ingress
-import io.fabric8.kubernetes.api.model.{IntOrString, OwnerReferenceBuilder}
+import org.apache.streampark.common.util.Utils
+
+import io.fabric8.kubernetes.api.model.IntOrString
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.flink.client.program.ClusterClient
import scala.collection.JavaConverters._
import scala.language.postfixOps
-import scala.util.{Success, Try}
+import scala.util.Try
class IngressStrategyV1beta1 extends IngressStrategy {
@@ -32,41 +34,28 @@ class IngressStrategyV1beta1 extends IngressStrategy {
nameSpace: String,
clusterId: String,
clusterClient: ClusterClient[_]): String = {
- val client = new DefaultKubernetesClient
- // for kubernetes 1.19-
- val fromV1beta1 =
-
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
- .map(ingress => ingress.getSpec.getRules.get(0))
- .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- Try(
- fromV1beta1
- .map { case (host, path) => s"https://$host$path" }
- .getOrElse(clusterClient.getWebInterfaceURL)
- ).getOrElse(throw new RuntimeException("[StreamPark] get ingressUrlAddress
error."))
- }
- override def configureIngress(domainName: String, clusterId: String,
nameSpace: String): Unit = {
- Try(new DefaultKubernetesClient) match {
- case Success(client) =>
- val deployment = getDeployment(nameSpace, clusterId, client)
+ Utils.using(new DefaultKubernetesClient) {
+ client =>
+ // for kubernetes 1.19-
+ val hosts =
+
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+ .map(ingress => ingress.getSpec.getRules.get(0))
+ .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- val deploymentUid = if (deployment != null) {
- deployment.getMetadata.getUid
- } else {
- throw new RuntimeException(
- s"Deployment with name $clusterId not found in namespace
$nameSpace")
- }
+ Try(
+ hosts
+ .map { case (host, path) => s"https://$host$path" }
+ .getOrElse(clusterClient.getWebInterfaceURL)
+ ).getOrElse(throw new RuntimeException("[StreamPark] get
ingressUrlAddress error."))
- // Create OwnerReference object
- val ownerReference = new OwnerReferenceBuilder()
- .withApiVersion("apps/v1")
- .withKind("Deployment")
- .withName(clusterId)
- .withUid(deploymentUid)
- .withController(true)
- .withBlockOwnerDeletion(true)
- .build()
+ }
+ }
+ override def configureIngress(domainName: String, clusterId: String,
nameSpace: String): Unit = {
+ Utils.using(new DefaultKubernetesClient) {
+ client =>
+ val ownerReference = getOwnerReference(nameSpace, clusterId, client)
val ingress = new IngressBuilder()
.withNewMetadata()
.withName(clusterId)
@@ -96,8 +85,8 @@ class IngressStrategyV1beta1 extends IngressStrategy {
.endRule()
.endSpec()
.build()
+
client.network.ingress.inNamespace(nameSpace).create(ingress)
- case _ =>
}
}
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index 24abbcdfb..ce431e3a2 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -263,42 +263,6 @@ class FlinkRestJsonTest {
println(checkpoint)
}
- @Test def testIngress(): Unit = {
- val json =
- """
- |[
- | {
- | "addresses":[
- | "192.168.0.1",
- | "192.168.0.2",
- | "192.168.0.3"
- | ],
- | "port":80,
- | "protocol":"HTTP",
- | "serviceName":"native-flink:statebackend12788-rest",
- | "ingressName":"native-flink:statebackend12788",
- | "hostname":"streampark.com",
- | "path":"/native-flink/statebackend12788/",
- | "allNodes":false
- | },
- | {
- | "addresses":[
- | ],
- | "port":80,
- | "protocol":"HTTP",
- | "serviceName":"native-flink:statebackend12788-rest",
- | "ingressName":"native-flink:statebackend12788",
- | "hostname":"streampark.com",
- | "path":"/native-flink/statebackend12788(/|$)(.*)",
- | "allNodes":false
- | }
- |]
- |""".stripMargin
-
- val ingressMeta = new IngressMetaTestUtil().IngressMeta.as(json)
- println(ingressMeta.get)
- }
-
@Test def testHistoryArchives(): Unit = {
@transient
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
deleted file mode 100644
index 3bd8ff10b..000000000
---
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/IngressMetaTestUtil.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.flink.kubernetes
-
-import org.json4s.{DefaultFormats, JArray, JsonAST}
-import org.json4s.jackson.JsonMethods.parse
-
-import scala.util.{Failure, Success, Try}
-
-class IngressMetaTestUtil {
-
- case class IngressMeta(
- addresses: List[String],
- port: Integer,
- protocol: String,
- serviceName: String,
- ingressName: String,
- hostname: String,
- path: String,
- allNodes: Boolean)
-
- object IngressMeta {
-
- @transient implicit lazy val formats: DefaultFormats.type =
org.json4s.DefaultFormats
-
- def as(json: String): Option[List[IngressMeta]] = {
- Try(parse(json)) match {
- case Success(ok) =>
- ok match {
- case JArray(arr) =>
- val list = arr.map(
- x => {
- IngressMeta(
- addresses =
- (x \
"addresses").extractOpt[List[String]].getOrElse(List.empty[String]),
- port = (x \ "port").extractOpt[Integer].getOrElse(0),
- protocol = (x \ "protocol").extractOpt[String].orNull,
- serviceName = (x \
"serviceName").extractOpt[String].orNull,
- ingressName = (x \
"ingressName").extractOpt[String].orNull,
- hostname = (x \ "hostname").extractOpt[String].orNull,
- path = (x \ "path").extractOpt[String].orNull,
- allNodes = (x \
"allNodes").extractOpt[Boolean].getOrElse(false)
- )
- })
- Some(list)
- case _ => None
- }
- case Failure(_) => None
- }
- }
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index 2129b13f4..92c1d9199 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -88,7 +88,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
case FsOperator.hdfs =>
val uploadFile =
s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
if (fsOperator.exists(uploadFile)) {
- Utils.tryWithResource(new FileInputStream(originFile))(
+ Utils.using(new FileInputStream(originFile))(
inputStream => {
if (DigestUtils.md5Hex(inputStream) !=
fsOperator.fileMd5(uploadFile)) {
fsOperator.upload(originFile.getAbsolutePath, uploadFile)
diff --git
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 3641735f9..56dede3e6 100644
---
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -196,12 +196,11 @@ object FlinkShimsProxy extends Logger {
@throws[Exception]
def getObject[T](loader: ClassLoader, obj: Object): T = {
val arrayOutputStream = new ByteArrayOutputStream
- val result = Utils.tryWithResource(new
ObjectOutputStream(arrayOutputStream))(
+ val result = Utils.using(new ObjectOutputStream(arrayOutputStream))(
objectOutputStream => {
objectOutputStream.writeObject(obj)
val byteArrayInputStream = new
ByteArrayInputStream(arrayOutputStream.toByteArray)
- Utils.tryWithResource(new ClassLoaderObjectInputStream(loader,
byteArrayInputStream))(
- _.readObject)
+ Utils.using(new ClassLoaderObjectInputStream(loader,
byteArrayInputStream))(_.readObject)
})
result.asInstanceOf[T]
}