This is an automated email from the ASF dual-hosted git repository.

linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 77c4237f8 [Improve] FileUtils minor improvement (#2981)
77c4237f8 is described below

commit 77c4237f86a26b59ccf107bea1da085e03b0d8db
Author: benjobs <[email protected]>
AuthorDate: Fri Sep 1 09:31:14 2023 -0500

    [Improve] FileUtils minor improvement (#2981)
    
    * [Improve] FileUtils minor improvement
    
    * minor improvement
    
    * minor improvement
    
    * Implicits improvement
    
    * check style improvement
    
    * minor improvement
    
    * minor bug fixed.
---
 .../streampark/common/conf/ConfigOption.scala      |   2 +-
 .../common/conf/InternalConfigHolder.scala         |   2 +-
 .../apache/streampark/common/conf/Workspace.scala  |   2 +-
 .../apache/streampark/common/util/FileUtils.scala  | 147 ++++++++++++++++++---
 .../streampark/common/util/HadoopUtils.scala       |   5 +-
 .../streampark/common/util/ImplicitsUtils.scala    |  62 +++++++++
 .../org/apache/streampark/common/util/Logger.scala |   2 +-
 .../org/apache/streampark/common/util/Utils.scala  |  79 +++--------
 .../streampark/console/base/util/FileUtils.java    | 107 ---------------
 .../core/service/impl/AppBuildPipeServiceImpl.java |   2 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  14 +-
 .../core/service/impl/ProjectServiceImpl.java      |   2 +-
 .../console/base/util/FileUtilsTest.java           |   2 +
 .../flink/client/trait/YarnClientTrait.scala       |  37 +++---
 .../flink/kubernetes/KubernetesRetriever.scala     |  51 +++----
 .../helper/KubernetesDeploymentHelper.scala        | 136 ++++++++++---------
 .../kubernetes/ingress/IngressController.scala     |   8 +-
 .../kubernetes/ingress/IngressStrategyV1.scala     |  15 +--
 .../ingress/IngressStrategyV1beta1.scala           |  14 +-
 .../impl/FlinkYarnApplicationBuildPipeline.scala   |   4 +-
 .../streampark/flink/proxy/FlinkShimsProxy.scala   |  16 ++-
 .../flink/core/FlinkStreamingInitializer.scala     |   2 +-
 22 files changed, 372 insertions(+), 339 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala
index 500fcf3f6..22ac7de64 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.common.conf
 
-import org.apache.streampark.common.util.Utils.StringCasts
+import org.apache.streampark.common.util.ImplicitsUtils._
 
 import java.util.Properties
 
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
index 73c23a4db..5f00c2088 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.common.conf
 
 import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
-import org.apache.streampark.common.util.Utils.StringCasts
+import org.apache.streampark.common.util.ImplicitsUtils._
 
 import javax.annotation.{Nonnull, Nullable}
 
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index d914780a1..8f3e7b10c 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -18,7 +18,7 @@ package org.apache.streampark.common.conf
 
 import org.apache.streampark.common.enums.StorageType
 import org.apache.streampark.common.util.{HdfsUtils, SystemPropertyUtils}
-import org.apache.streampark.common.util.Utils.StringCasts
+import org.apache.streampark.common.util.ImplicitsUtils._
 
 import java.net.URI
 
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 719715e22..1522a4e8a 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -16,10 +16,16 @@
  */
 package org.apache.streampark.common.util
 
+import org.apache.streampark.common.util.ImplicitsUtils._
+
 import java.io._
 import java.net.URL
+import java.nio.ByteBuffer
+import java.nio.channels.Channels
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
 import java.util
-import java.util.Scanner
+import java.util.stream.Collectors
 
 import scala.collection.convert.ImplicitConversions._
 import scala.collection.mutable
@@ -44,12 +50,12 @@ object FileUtils {
     if (input == null) {
       throw new RuntimeException("The inputStream can not be null")
     }
-    Utils.using(input) {
-      in =>
+    input.autoClose(
+      in => {
         val b = new Array[Byte](4)
         in.read(b, 0, b.length)
         bytesToHexString(b)
-    } == "504B0304"
+      }) == "504B0304"
   }
 
   def isJarFileType(file: File): Boolean = {
@@ -73,12 +79,6 @@ object FileUtils {
       s"[StreamPark] Failed to create directory within $TEMP_DIR_ATTEMPTS  
attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})")
   }
 
-  def exists(path: String): Unit = {
-    require(
-      path != null && path.nonEmpty && new File(path).exists(),
-      s"[StreamPark] FileUtils.exists: file $path is not exist!")
-  }
-
   def getPathFromEnv(env: String): String = {
     val path = Option(System.getenv(env)).getOrElse(System.getProperty(env))
     require(
@@ -115,6 +115,14 @@ object FileUtils {
     }
   }
 
+  def exists(file: Serializable): Boolean = {
+    file match {
+      case null => false
+      case f: File => f.exists()
+      case p => new File(p.toString).exists()
+    }
+  }
+
   def equals(file1: File, file2: File): Boolean = {
     (file1, file2) match {
       case (a, b) if a == null || b == null => false
@@ -143,20 +151,115 @@ object FileUtils {
   }
 
   @throws[IOException]
-  def readString(file: File): String = {
-    require(file != null && file.isFile)
-    val reader = new FileReader(file)
-    val scanner = new Scanner(reader)
-    val buffer = new mutable.StringBuilder()
-    if (scanner.hasNextLine) {
-      buffer.append(scanner.nextLine())
+  def readInputStream(in: InputStream, array: Array[Byte]): Unit = {
+    in.autoClose(
+      is => {
+        var toRead = array.length
+        var ret = 0
+        var off = 0
+        while (toRead > 0) {
+          ret = is.read(array, off, toRead)
+          if (ret < 0) throw new IOException("Bad inputStream, premature EOF")
+          toRead -= ret
+          off += ret
+        }
+      })
+  }
+
+  @throws[IOException]
+  def readFile(file: File): String = {
+    if (file.length >= Int.MaxValue) {
+      throw new IOException("Too large file, unexpected!")
+    } else {
+      val len = file.length
+      val array = new Array[Byte](len.toInt)
+      Files
+        .newInputStream(file.toPath)
+        .autoClose(
+          is => {
+            readInputStream(is, array)
+            new String(array, StandardCharsets.UTF_8)
+          })
     }
-    while (scanner.hasNextLine) {
-      buffer.append("\r\n")
-      buffer.append(scanner.nextLine())
+  }
+
+  @throws[IOException]
+  def writeFile(content: String, file: File): Unit = {
+    val outputStream = Files.newOutputStream(file.toPath)
+    val channel = Channels.newChannel(outputStream)
+    val buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))
+    channel.write(buffer)
+    Utils.close(channel, outputStream)
+  }
+
+  @throws[IOException]
+  def readEndOfFile(file: File, maxSize: Long): Array[Byte] = {
+    var readSize = maxSize
+    new RandomAccessFile(file, "r").autoClose(
+      raFile => {
+        if (raFile.length > maxSize) {
+          raFile.seek(raFile.length - maxSize)
+        } else if (raFile.length < maxSize) {
+          readSize = raFile.length.toInt
+        }
+        val fileContent = new Array[Byte](readSize.toInt)
+        raFile.read(fileContent)
+        fileContent
+      })
+  }
+
+  /**
+   * Read the content of a file from a specified offset.
+   *
+   * @param file
+   *   The file to read from
+   * @param startOffset
+   *   The offset from where to start reading the file
+   * @param maxSize
+   *   The maximum size of the file to read
+   * @return
+   *   The content of the file as a byte array
+   * @throws IOException
+   *   if an I/O error occurs while reading the file
+   * @throws IllegalArgumentException
+   *   if the startOffset is greater than the file length
+   */
+  @throws[IOException]
+  def readFileFromOffset(file: File, startOffset: Long, maxSize: Long): Array[Byte] = {
+    if (file.length < startOffset) {
+      throw new IllegalArgumentException(
+        s"The startOffset $startOffset is great than the file length 
${file.length}")
+    }
+    new RandomAccessFile(file, "r").autoClose(
+      raFile => {
+        val readSize = Math.min(maxSize, file.length - startOffset)
+        raFile.seek(startOffset)
+        val fileContent = new Array[Byte](readSize.toInt)
+        raFile.read(fileContent)
+        fileContent
+      })
+  }
+
+  /**
+   * Roll View Log.
+   *
+   * @param path
+   *   The file path.
+   * @param offset
+   *   The offset.
+   * @param limit
+   *   The limit.
+   * @return
+   *   The content of the file.
+   */
+  def tailOf(path: String, offset: Int, limit: Int): String = try {
+    val file = new File(path)
+    if (file.exists && file.isFile) {
+      Files
+        .lines(Paths.get(path))
+        .autoClose(stream => stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n")))
     }
-    Utils.close(scanner, reader)
-    buffer.toString()
+    null
   }
 
 }
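
The reworked common-module FileUtils above replaces the Scanner-based readString with NIO-backed readFile/writeFile and absorbs readEndOfFile, readFileFromOffset and tailOf from the console module. A minimal sketch of the new API in use (the path and contents are illustrative, not part of this commit):

    import org.apache.streampark.common.util.FileUtils

    import java.io.File

    object FileUtilsSketch extends App {
      val file = new File("/tmp/streampark-demo.log") // illustrative path
      FileUtils.writeFile("line-1\r\nline-2\r\nline-3", file)

      // readFile replaces the removed readString: whole file as UTF-8.
      println(FileUtils.readFile(file))

      // At most the last 16 bytes; clamped when the file is shorter.
      val tail = FileUtils.readEndOfFile(file, 16)

      // Up to 1024 bytes starting at byte offset 7.
      val slice = FileUtils.readFileFromOffset(file, 7, 1024)
      println(s"tail=${tail.length} bytes, slice=${slice.length} bytes")
    }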
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index b9823d8f0..715afa00d 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -137,7 +137,10 @@ object HadoopUtils extends Logger {
 
   def getConfigurationFromHadoopConfDir(confDir: String = hadoopConfDir): Configuration = {
     if (!configurationCache.containsKey(confDir)) {
-      FileUtils.exists(confDir)
+      if (!FileUtils.exists(confDir)) {
+        throw new ExceptionInInitializerError(
+          s"[StreamPark] hadoop conf file " + confDir + " is not exist!")
+      }
       val hadoopConfDir = new File(confDir)
       val confName = List("hdfs-default.xml", "core-site.xml", "hdfs-site.xml", "yarn-site.xml")
       val files =
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ImplicitsUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ImplicitsUtils.scala
new file mode 100644
index 000000000..cfcb95107
--- /dev/null
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ImplicitsUtils.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.common.util
+
+import org.apache.streampark.common.util.Utils.close
+
+import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble, Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort}
+
+object ImplicitsUtils {
+
+  implicit class AutoCloseImplicits[T <: AutoCloseable](autoCloseable: T) {
+
+    implicit def autoClose[R](func: T => R)(implicit excFunc: Throwable => R = null): R = {
+      try {
+        func(autoCloseable)
+      } catch {
+        case e: Throwable if excFunc != null => excFunc(e)
+      } finally {
+        close(autoCloseable)
+      }
+    }
+
+  }
+
+  implicit class StringImplicits(v: String) {
+    def cast[T](classType: Class[_]): T = {
+      classType match {
+        case c if c == classOf[String] => v.asInstanceOf[T]
+        case c if c == classOf[Byte] => v.toByte.asInstanceOf[T]
+        case c if c == classOf[Int] => v.toInt.asInstanceOf[T]
+        case c if c == classOf[Long] => v.toLong.asInstanceOf[T]
+        case c if c == classOf[Float] => v.toFloat.asInstanceOf[T]
+        case c if c == classOf[Double] => v.toDouble.asInstanceOf[T]
+        case c if c == classOf[Short] => v.toShort.asInstanceOf[T]
+        case c if c == classOf[Boolean] => v.toBoolean.asInstanceOf[T]
+        case c if c == classOf[JavaByte] => v.toByte.asInstanceOf[T]
+        case c if c == classOf[JavaInt] => JavaInt.valueOf(v).asInstanceOf[T]
+        case c if c == classOf[JavaLong] => JavaLong.valueOf(v).asInstanceOf[T]
+        case c if c == classOf[JavaFloat] => JavaFloat.valueOf(v).asInstanceOf[T]
+        case c if c == classOf[JavaDouble] => JavaDouble.valueOf(v).asInstanceOf[T]
+        case c if c == classOf[JavaShort] => JavaShort.valueOf(v).asInstanceOf[T]
+        case c if c == classOf[JavaBool] => JavaBool.valueOf(v).asInstanceOf[T]
+        case _ =>
+          throw new IllegalArgumentException(s"Unsupported type: $classType")
+      }
+    }
+  }
+}
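
ImplicitsUtils is the new home of the autoClose and cast extensions (cast previously lived in Utils.StringCasts). A small sketch of both implicits in use, assuming /tmp/example.txt exists; all names here are illustrative, not part of the commit:

    import org.apache.streampark.common.util.ImplicitsUtils._

    import java.io.FileInputStream

    object ImplicitsSketch extends App {
      // autoClose applies the function, then closes the resource in a
      // finally block (via Utils.close).
      val firstByte = new FileInputStream("/tmp/example.txt").autoClose(in => in.read())

      // cast converts a String to the requested primitive or boxed type.
      val port: Int = "8080".cast[Int](classOf[Int])
      println(s"firstByte=$firstByte, port=$port")
    }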
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index 583fc91f2..0b97498b1 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -140,7 +140,7 @@ private[this] object LoggerFactory extends LoggerFactoryBinder {
         val configurator = new JoranConfigurator()
         configurator.setContext(loggerContext)
         val text = FileUtils
-          .readString(new File(path))
+          .readFile(new File(path))
           .replaceAll("org.slf4j", s"$shadedPackage.org.slf4j")
           .replaceAll("ch.qos.logback", s"$shadedPackage.ch.qos.logback")
           .replaceAll("org.apache.log4j", s"$shadedPackage.org.apache.log4j")
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 98ec6ce6c..30534cf5d 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -16,10 +16,11 @@
  */
 package org.apache.streampark.common.util
 
+import org.apache.streampark.common.util.ImplicitsUtils._
+
 import org.apache.commons.lang3.StringUtils
 
-import java.io.{BufferedInputStream, File, FileInputStream, IOException, PrintWriter, StringWriter}
-import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble, Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort}
+import java.io._
 import java.net.URL
 import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID}
 import java.util.jar.{JarFile, JarInputStream}
@@ -107,35 +108,6 @@ object Utils {
   /** if any blank strings exist */
   def isAnyBank(items: String*): Boolean = items == null || items.exists(StringUtils.isBlank)
 
-  /*
-   * Mimicking the try-with-resource syntax of Java-8+
-   */
-  def using[R, T <: AutoCloseable](handle: T)(func: T => R)(implicit
-      excFunc: Throwable => R = null): R = {
-    try {
-      func(handle)
-    } catch {
-      case e: Throwable if excFunc != null => excFunc(e)
-    } finally {
-      if (handle != null) {
-        handle.close()
-      }
-    }
-  }
-
-  def close(closeable: AutoCloseable*)(implicit func: Throwable => Unit = null): Unit = {
-    closeable.foreach(
-      c => {
-        try {
-          if (c != null) {
-            c.close()
-          }
-        } catch {
-          case e: Throwable if func != null => func(e)
-        }
-      })
-  }
-
   /**
    * calculate the percentage of num1 / num2, the result range from 0 to 100, with one small digit
    * reserve.
@@ -159,38 +131,31 @@ object Utils {
     else {
       try {
         val stm = new StringWriter
-        val wrt = new PrintWriter(stm)
-        e.printStackTrace(wrt)
-        wrt.close()
-        stm.toString
+        new PrintWriter(stm).autoClose {
+          writer =>
+            e.printStackTrace(writer)
+            stm.toString
+        }
       } catch {
         case _: Throwable => e.getClass.getName + " (error while printing stack trace)"
       }
     }
   }
 
-  implicit class StringCasts(v: String) {
-    def cast[T](classType: Class[_]): T = {
-      classType match {
-        case c if c == classOf[String] => v.asInstanceOf[T]
-        case c if c == classOf[Byte] => v.toByte.asInstanceOf[T]
-        case c if c == classOf[Int] => v.toInt.asInstanceOf[T]
-        case c if c == classOf[Long] => v.toLong.asInstanceOf[T]
-        case c if c == classOf[Float] => v.toFloat.asInstanceOf[T]
-        case c if c == classOf[Double] => v.toDouble.asInstanceOf[T]
-        case c if c == classOf[Short] => v.toShort.asInstanceOf[T]
-        case c if c == classOf[Boolean] => v.toBoolean.asInstanceOf[T]
-        case c if c == classOf[JavaByte] => v.toByte.asInstanceOf[T]
-        case c if c == classOf[JavaInt] => JavaInt.valueOf(v).asInstanceOf[T]
-        case c if c == classOf[JavaLong] => JavaLong.valueOf(v).asInstanceOf[T]
-        case c if c == classOf[JavaFloat] => JavaFloat.valueOf(v).asInstanceOf[T]
-        case c if c == classOf[JavaDouble] => JavaDouble.valueOf(v).asInstanceOf[T]
-        case c if c == classOf[JavaShort] => JavaShort.valueOf(v).asInstanceOf[T]
-        case c if c == classOf[JavaBool] => JavaBool.valueOf(v).asInstanceOf[T]
-        case _ =>
-          throw new IllegalArgumentException(s"Unsupported type: $classType")
-      }
-    }
+  def close(closeable: AutoCloseable*)(implicit func: Throwable => Unit = null): Unit = {
+    closeable.foreach(
+      c => {
+        try {
+          if (c != null) {
+            if (c.isInstanceOf[Flushable]) {
+              c.asInstanceOf[Flushable].flush()
+            }
+            c.close()
+          }
+        } catch {
+          case e: Throwable if func != null => func(e)
+        }
+      })
   }
 
 }
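
Note the behavioural change in Utils.close: any Flushable resource is now flushed before close(), and the old Utils.using is gone in favour of autoClose. A sketch of the flush-then-close behaviour (the file name is illustrative):

    import org.apache.streampark.common.util.Utils

    import java.io.{BufferedWriter, FileWriter}

    object CloseSketch extends App {
      val writer = new BufferedWriter(new FileWriter("/tmp/close-demo.txt"))
      writer.write("buffered content")
      // close() now flushes first, which matters for wrappers whose
      // close() does not flush on its own.
      Utils.close(writer)
    }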
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
deleted file mode 100644
index 4aaa624f2..000000000
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.base.util;
-
-import org.apache.streampark.console.base.exception.ApiDetailException;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** The file utils. */
-public class FileUtils {
-
-  private FileUtils() {}
-
-  /**
-   * Reads the last portion of a file as a byte array.
-   *
-   * @param file the file to read
-   * @param maxSize the maximum number of bytes to read from the end of the file
-   * @return the byte array containing the content read from the file
-   * @throws IOException if an I/O error occurs
-   */
-  public static byte[] readEndOfFile(File file, long maxSize) throws IOException {
-    long readSize = maxSize;
-    byte[] fileContent;
-    try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) {
-      if (raFile.length() > maxSize) {
-        raFile.seek(raFile.length() - maxSize);
-      } else if (raFile.length() < maxSize) {
-        readSize = (int) raFile.length();
-      }
-      fileContent = new byte[(int) readSize];
-      raFile.read(fileContent);
-    }
-    return fileContent;
-  }
-
-  /**
-   * Read the content of a file from a specified offset.
-   *
-   * @param file The file to read from
-   * @param startOffset The offset from where to start reading the file
-   * @param maxSize The maximum size of the file to read
-   * @return The content of the file as a byte array
-   * @throws IOException if an I/O error occurs while reading the file
-   * @throws IllegalArgumentException if the startOffset is greater than the file length
-   */
-  public static byte[] readFileFromOffset(File file, long startOffset, long maxSize)
-      throws IOException {
-    if (file.length() < startOffset) {
-      throw new IllegalArgumentException(
-          String.format(
-              "The startOffset %s is great than the file length %s", 
startOffset, file.length()));
-    }
-    byte[] fileContent;
-    try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) {
-      long readSize = Math.min(maxSize, file.length() - startOffset);
-      raFile.seek(startOffset);
-      fileContent = new byte[(int) readSize];
-      raFile.read(fileContent);
-    }
-    return fileContent;
-  }
-
-  /**
-   * Roll View Log.
-   *
-   * @param path The file path.
-   * @param offset The offset.
-   * @param limit The limit.
-   * @return The content of the file.
-   * @throws ApiDetailException if there's an error rolling the view log.
-   */
-  public static String rollViewLog(String path, int offset, int limit) {
-    try {
-      File file = new File(path);
-      if (file.exists() && file.isFile()) {
-        try (Stream<String> stream = Files.lines(Paths.get(path))) {
-          return stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n"));
-        }
-      }
-      return null;
-    } catch (Exception e) {
-      throw new ApiDetailException("roll view log error: " + e);
-    }
-  }
-}
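
The deleted console-side helper survives in the common module: rollViewLog becomes FileUtils.tailOf with the same skip/limit pagination intent. A sketch of the migrated call (path, offset and limit are illustrative):

    import org.apache.streampark.common.util.FileUtils

    object TailSketch extends App {
      // Formerly org.apache.streampark.console.base.util.FileUtils
      //   .rollViewLog(path, 0, 100)
      val page = FileUtils.tailOf("/tmp/streampark-demo.log", 0, 100)
      println(page)
    }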
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index f9017a724..a20f5a08c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -424,7 +424,7 @@ public class AppBuildPipeServiceImpl
     FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
     String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
 
-    if (!new File(flinkUserJar).exists()) {
+    if (!FileUtils.exists(flinkUserJar)) {
       Resource resource = resourceService.findByResourceName(app.getTeamId(), app.getJar());
       if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
         flinkUserJar = resource.getFilePath();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 9e970408f..c76540eb4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.streampark.common.fs.HdfsOperator;
 import org.apache.streampark.common.fs.LfsOperator;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.FileUtils;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.common.util.ThreadUtils;
@@ -97,7 +98,6 @@ import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CoreOptions;
@@ -129,7 +129,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
@@ -619,9 +618,8 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
                 if (!future.isDone()) {
                   future.cancel(true);
                 }
-                if (path != null) {
-                  return org.apache.streampark.console.base.util.FileUtils.rollViewLog(
-                      path, offset, limit);
+                if (FileUtils.exists(path)) {
+                  return FileUtils.tailOf(path, offset, limit);
                 }
                 return null;
               })
@@ -738,7 +736,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
           jarPath = resource.getFilePath();
         }
       }
-      appParam.setJarCheckSum(FileUtils.checksumCRC32(new File(jarPath)));
+      appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new File(jarPath)));
     }
 
     if (save(appParam)) {
@@ -866,7 +864,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
         if (jarFile.exists()) {
           try {
-            long checkSum = FileUtils.checksumCRC32(jarFile);
+            long checkSum = org.apache.commons.io.FileUtils.checksumCRC32(jarFile);
             if (!Objects.equals(checkSum, application.getJarCheckSum())) {
               application.setBuild(true);
             }
@@ -1144,7 +1142,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
   @Override
   public String readConf(Application appParam) throws IOException {
     File file = new File(appParam.getConfig());
-    String conf = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
+    String conf = FileUtils.readFile(file);
     return Base64.getEncoder().encodeToString(conf.getBytes());
   }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index e568d976b..d48228706 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.util.CompletableFutureUtils;
+import org.apache.streampark.common.util.FileUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.ResponseCode;
@@ -28,7 +29,6 @@ import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
-import org.apache.streampark.console.base.util.FileUtils;
 import org.apache.streampark.console.base.util.GZipUtils;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.Project;
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
index 57db8581c..d31083327 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.console.base.util;
 
+import org.apache.streampark.common.util.FileUtils;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 0ce7cb9c7..6cc667c6e 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.client.`trait`
 
+import org.apache.streampark.common.util.ImplicitsUtils._
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.bean._
 
@@ -43,25 +44,25 @@ trait YarnClientTrait extends FlinkClientTrait {
       flinkConf: Configuration,
       actionFunc: (JobID, ClusterClient[_]) => O): O = {
 
-    Utils.using {
-      flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
-      val clusterClientFactory = new YarnClusterClientFactory
-      val applicationId = clusterClientFactory.getClusterId(flinkConf)
-      if (applicationId == null) {
-        throw new FlinkException(
-          "[StreamPark] getClusterClient error. No cluster id was specified. 
Please specify a cluster to which you would like to connect.")
-      }
-      val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf)
-      clusterDescriptor.retrieve(applicationId).getClusterClient
-    } {
-      client =>
-        Try(actionFunc(getJobID(request.jobId), client)).recover {
-          case e =>
-            throw new FlinkException(
-              s"[StreamPark] Do ${request.getClass.getSimpleName} for the job 
${request.jobId} failed. " +
-                s"detail: ${Utils.stringifyException(e)}");
-        }.get
+    flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+    val clusterClientFactory = new YarnClusterClientFactory
+    val applicationId = clusterClientFactory.getClusterId(flinkConf)
+    if (applicationId == null) {
+      throw new FlinkException(
+        "[StreamPark] getClusterClient error. No cluster id was specified. 
Please specify a cluster to which you would like to connect.")
     }
+    val clusterDescriptor = clusterClientFactory.createClusterDescriptor(flinkConf)
+    clusterDescriptor
+      .retrieve(applicationId)
+      .getClusterClient
+      .autoClose(
+        client =>
+          Try(actionFunc(getJobID(request.jobId), client)).recover {
+            case e =>
+              throw new FlinkException(
+                s"[StreamPark] Do ${request.getClass.getSimpleName} for the 
job ${request.jobId} failed. " +
+                  s"detail: ${Utils.stringifyException(e)}");
+          }.get)
   }
 
   override def doTriggerSavepoint(
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 01b86f1ca..7418e8c50 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -18,8 +18,8 @@
 package org.apache.streampark.flink.kubernetes
 
 import org.apache.streampark.common.conf.ConfigConst
-import org.apache.streampark.common.util.{Logger, Utils}
-import org.apache.streampark.common.util.Utils.using
+import org.apache.streampark.common.util.ImplicitsUtils._
+import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
 import org.apache.streampark.flink.kubernetes.ingress.IngressController
 import org.apache.streampark.flink.kubernetes.model.ClusterKey
@@ -108,32 +108,33 @@ object KubernetesRetriever extends Logger {
    *   deployment namespace
    */
   def isDeploymentExists(name: String, namespace: String): Boolean = {
-    using(KubernetesRetriever.newK8sClient()) {
-      client =>
-        client
-          .apps()
-          .deployments()
-          .inNamespace(namespace)
-          .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL)
-          .list()
-          .getItems
-          .asScala
-          .exists(e => e.getMetadata.getName == name)
-    }(_ => false)
+    KubernetesRetriever
+      .newK8sClient()
+      .autoClose(
+        client =>
+          client
+            .apps()
+            .deployments()
+            .inNamespace(namespace)
+            .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL)
+            .list()
+            .getItems
+            .asScala
+            .exists(e => e.getMetadata.getName == name))(_ => false)
   }
 
   /** retrieve flink jobManager rest url */
   def retrieveFlinkRestUrl(clusterKey: ClusterKey): Option[String] = {
-    Utils.using(
-      KubernetesRetriever
-        .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace, clusterKey.executeMode)
-        .getOrElse(return None)) {
-      client =>
-        val url =
-          IngressController.ingressUrlAddress(clusterKey.namespace, clusterKey.clusterId, client)
-        logger.info(s"retrieve flink jobManager rest url: $url")
-        client.close()
-        Some(url)
-    }
+    KubernetesRetriever
+      .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace, clusterKey.executeMode)
+      .getOrElse(return None)
+      .autoClose(
+        client => {
+          val url =
+            IngressController.ingressUrlAddress(clusterKey.namespace, clusterKey.clusterId, client)
+          logger.info(s"retrieve flink jobManager rest url: $url")
+          Some(url)
+        })
   }
+
 }
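
The trailing (_ => false) in isDeploymentExists fills autoClose's implicit second parameter list: if the block throws, the handler supplies false instead of propagating. A minimal sketch of the same fallback pattern (resource and path are illustrative; the file is assumed to exist, since the handler only covers failures inside the block):

    import org.apache.streampark.common.util.ImplicitsUtils._

    import java.io.FileInputStream

    object FallbackSketch extends App {
      // A failure inside the block is routed to the explicit handler in
      // the second parameter list, yielding -1 instead of an exception.
      val available =
        new FileInputStream("/tmp/fallback-demo.txt").autoClose(in => in.available())(_ => -1)
      println(available)
    }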
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 8e622fe04..ff2dc7161 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.kubernetes.helper
 
 import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
-import org.apache.streampark.common.util.Utils.using
+import org.apache.streampark.common.util.ImplicitsUtils._
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 
 import com.google.common.base.Charsets
@@ -34,25 +34,26 @@ import scala.util.{Success, Try}
 object KubernetesDeploymentHelper extends Logger {
 
   private[this] def getPods(nameSpace: String, deploymentName: String): List[Pod] = {
-    using(KubernetesRetriever.newK8sClient()) {
-      client =>
-        Try {
-          client.pods
-            .inNamespace(nameSpace)
-            .withLabels {
-              client.apps.deployments
-                .inNamespace(nameSpace)
-                .withName(deploymentName)
-                .get
-                .getSpec
-                .getSelector
-                .getMatchLabels
-            }
-            .list
-            .getItems
-            .toList
-        }.getOrElse(List.empty[Pod])
-    }
+    KubernetesRetriever
+      .newK8sClient()
+      .autoClose(
+        client =>
+          Try {
+            client.pods
+              .inNamespace(nameSpace)
+              .withLabels {
+                client.apps.deployments
+                  .inNamespace(nameSpace)
+                  .withName(deploymentName)
+                  .get
+                  .getSpec
+                  .getSelector
+                  .getMatchLabels
+              }
+              .list
+              .getItems
+              .toList
+          }.getOrElse(List.empty[Pod]))
   }
 
   def getDeploymentStatusChanges(nameSpace: String, deploymentName: String): Boolean = {
@@ -74,16 +75,17 @@ object KubernetesDeploymentHelper extends Logger {
   }
 
   def deleteTaskDeployment(nameSpace: String, deploymentName: String): Boolean = {
-    using(KubernetesRetriever.newK8sClient()) {
-      client =>
-        Try {
-          val r = client.apps.deployments
-            .inNamespace(nameSpace)
-            .withName(deploymentName)
-            .delete
-          Boolean.unbox(r)
-        }.getOrElse(false)
-    }
+    KubernetesRetriever
+      .newK8sClient()
+      .autoClose(
+        client =>
+          Try {
+            val r = client.apps.deployments
+              .inNamespace(nameSpace)
+              .withName(deploymentName)
+              .delete
+            Boolean.unbox(r)
+          }.getOrElse(false))
   }
 
   def isTheK8sConnectionNormal(): Boolean = {
@@ -96,47 +98,51 @@ object KubernetesDeploymentHelper extends Logger {
   }
 
   def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = {
-    using(KubernetesRetriever.newK8sClient()) {
-      client =>
-        val path = KubernetesDeploymentHelper.getJobLog(jobId)
-        val file = new File(path)
-        val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
-        Files.asCharSink(file, Charsets.UTF_8).write(log)
-        path
-    }
-  }
-
-  def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId: String): String = {
-    using(KubernetesRetriever.newK8sClient()) {
-      client =>
-        Try {
-          val podName = getPods(nameSpace, jobName).head.getMetadata.getName
-          val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+    KubernetesRetriever
+      .newK8sClient()
+      .autoClose(
+        client => {
+          val path = KubernetesDeploymentHelper.getJobLog(jobId)
           val file = new File(path)
-          val log = client.pods
-            .inNamespace(nameSpace)
-            .withName(podName)
-            .terminated()
-            .withPrettyOutput
-            .getLog
+          val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
           Files.asCharSink(file, Charsets.UTF_8).write(log)
           path
-        }.getOrElse(null)
-    }(error => throw error)
+        })
+  }
+
+  def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId: String): String = {
+    KubernetesRetriever
+      .newK8sClient()
+      .autoClose(
+        client =>
+          Try {
+            val podName = getPods(nameSpace, jobName).head.getMetadata.getName
+            val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+            val file = new File(path)
+            val log = client.pods
+              .inNamespace(nameSpace)
+              .withName(podName)
+              .terminated()
+              .withPrettyOutput
+              .getLog
+            Files.asCharSink(file, Charsets.UTF_8).write(log)
+            path
+          }.getOrElse(null))(error => throw error)
   }
 
   def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean = {
-    using(KubernetesRetriever.newK8sClient()) {
-      client =>
-        Try {
-          val r = client
-            .configMaps()
-            .inNamespace(nameSpace)
-            .withLabel("app", deploymentName)
-            .delete
-          Boolean.unbox(r)
-        }.getOrElse(false)
-    }
+    KubernetesRetriever
+      .newK8sClient()
+      .autoClose(
+        client =>
+          Try {
+            val r = client
+              .configMaps()
+              .inNamespace(nameSpace)
+              .withLabel("app", deploymentName)
+              .delete
+            Boolean.unbox(r)
+          }.getOrElse(false))
   }
 
   private[kubernetes] def getJobLog(jobId: String): String = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index d443a22bd..b8fc0e817 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
+import org.apache.streampark.common.util.ImplicitsUtils._
 import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.util.Utils.using
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.apache.flink.client.program.ClusterClient
@@ -30,15 +30,15 @@ object IngressController extends Logger {
   private[this] val VERSION_REGEXP = "(\\d+\\.\\d+)".r
 
   private lazy val ingressStrategy: IngressStrategy = {
-    using(new DefaultKubernetesClient()) {
-      client =>
+    new DefaultKubernetesClient().autoClose(
+      client => {
         val version = VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble
         if (version >= 1.19) {
           new IngressStrategyV1()
         } else {
           new IngressStrategyV1beta1()
         }
-    }
+      })
   }
 
   def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index c5c2fdd0b..b8346cc1d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.ImplicitsUtils._
 
 import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
@@ -34,7 +34,7 @@ class IngressStrategyV1 extends IngressStrategy {
       clusterId: String,
       clusterClient: ClusterClient[_]): String = {
 
-    Utils.using(new DefaultKubernetesClient) {
+    new DefaultKubernetesClient().autoClose(
       client =>
         Try {
           Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
@@ -45,14 +45,12 @@ class IngressStrategyV1 extends IngressStrategy {
         }.recover {
           case e =>
             throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e")
-        }.get
-    }
-
+        }.get)
   }
 
   override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
-    Utils.using(new DefaultKubernetesClient) {
-      client =>
+    new DefaultKubernetesClient().autoClose(
+      client => {
         val ownerReference = getOwnerReference(nameSpace, clusterId, client)
         val ingress = new IngressBuilder()
           .withNewMetadata()
@@ -94,6 +92,7 @@ class IngressStrategyV1 extends IngressStrategy {
           .endSpec()
           .build()
         client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
-    }
+      })
   }
+
 }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 8cbc276e4..a5ca4258e 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.ImplicitsUtils._
 
 import io.fabric8.kubernetes.api.model.IntOrString
 import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
@@ -35,7 +35,7 @@ class IngressStrategyV1beta1 extends IngressStrategy {
       clusterId: String,
       clusterClient: ClusterClient[_]): String = {
 
-    Utils.using(new DefaultKubernetesClient) {
+    new DefaultKubernetesClient().autoClose(
       client =>
         Try {
           Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
@@ -46,13 +46,12 @@ class IngressStrategyV1beta1 extends IngressStrategy {
         }.recover {
           case e =>
             throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e")
-        }.get
-    }
+        }.get)
   }
 
   override def configureIngress(domainName: String, clusterId: String, nameSpace: String): Unit = {
-    Utils.using(new DefaultKubernetesClient) {
-      client =>
+    new DefaultKubernetesClient().autoClose(
+      client => {
         val ownerReference = getOwnerReference(nameSpace, clusterId, client)
         val ingress = new IngressBuilder()
           .withNewMetadata()
@@ -83,8 +82,7 @@ class IngressStrategyV1beta1 extends IngressStrategy {
           .endRule()
           .endSpec()
           .build()
-
         client.network.ingress.inNamespace(nameSpace).create(ingress)
-    }
+      })
   }
 }
diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index b9167d437..6fbb0de50 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.packer.pipeline.impl
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.enums.DevelopmentMode
 import org.apache.streampark.common.fs.{FsOperator, HdfsOperator, LfsOperator}
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.ImplicitsUtils._
 import org.apache.streampark.flink.packer.maven.MavenTool
 import org.apache.streampark.flink.packer.pipeline._
 
@@ -90,7 +90,7 @@ class FlinkYarnApplicationBuildPipeline(request: FlinkYarnApplicationBuildReques
         case FsOperator.hdfs =>
           val uploadFile = s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
           if (fsOperator.exists(uploadFile)) {
-            Utils.using(new FileInputStream(originFile))(
+            new FileInputStream(originFile).autoClose(
               inputStream => {
                 if (DigestUtils.md5Hex(inputStream) != fsOperator.fileMd5(uploadFile)) {
                   fsOperator.upload(originFile.getAbsolutePath, uploadFile)
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 56dede3e6..44294d756 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.proxy
 
 import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion}
 import org.apache.streampark.common.util.{ClassLoaderUtils, Logger, Utils}
+import org.apache.streampark.common.util.ImplicitsUtils._
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
 import java.net.URL
@@ -196,13 +197,14 @@ object FlinkShimsProxy extends Logger {
   @throws[Exception]
   def getObject[T](loader: ClassLoader, obj: Object): T = {
     val arrayOutputStream = new ByteArrayOutputStream
-    val result = Utils.using(new ObjectOutputStream(arrayOutputStream))(
-      objectOutputStream => {
-        objectOutputStream.writeObject(obj)
-        val byteArrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray)
-        Utils.using(new ClassLoaderObjectInputStream(loader, byteArrayInputStream))(_.readObject)
-      })
-    result.asInstanceOf[T]
+    new ObjectOutputStream(arrayOutputStream)
+      .autoClose(
+        objectOutputStream => {
+          objectOutputStream.writeObject(obj)
+          val byteArrayInputStream = new ByteArrayInputStream(arrayOutputStream.toByteArray)
+          new ClassLoaderObjectInputStream(loader, byteArrayInputStream).autoClose(_.readObject())
+        })
+      .asInstanceOf[T]
   }
 
 }
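
getObject above now nests two autoClose blocks: an ObjectOutputStream that serializes the object, and a ClassLoaderObjectInputStream that re-reads it through the target class loader. A sketch of a round-trip call (payload and loader are illustrative):

    import org.apache.streampark.flink.proxy.FlinkShimsProxy

    object ProxySketch extends App {
      // Copy a Serializable value through the supplied class loader.
      val copy: String =
        FlinkShimsProxy.getObject[String](Thread.currentThread().getContextClassLoader, "demo")
      println(copy) // prints: demo
    }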
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index b3ee24c33..bf2e2c68b 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -130,7 +130,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
         require(
           configFile.exists(),
           s"[StreamPark] Usage: application config file: $configFile is not 
found!!!")
-        val text = FileUtils.readString(configFile)
+        val text = FileUtils.readFile(configFile)
         readConfig(text)
     }
     map.filter(_._2.nonEmpty)
