This is an automated email from the ASF dual-hosted git repository.
linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 77c4237f8 [Improve] FileUtils minor improvement (#2981)
77c4237f8 is described below
commit 77c4237f86a26b59ccf107bea1da085e03b0d8db
Author: benjobs <[email protected]>
AuthorDate: Fri Sep 1 09:31:14 2023 -0500
[Improve] FileUtils minor improvement (#2981)
* [Improve] FileUtils minor improvement
* minor improvement
* minor improvement
* Implicits improvement
* check style improvement
* minor improvement
* minor bug fixed.
---
.../streampark/common/conf/ConfigOption.scala | 2 +-
.../common/conf/InternalConfigHolder.scala | 2 +-
.../apache/streampark/common/conf/Workspace.scala | 2 +-
.../apache/streampark/common/util/FileUtils.scala | 147 ++++++++++++++++++---
.../streampark/common/util/HadoopUtils.scala | 5 +-
.../streampark/common/util/ImplicitsUtils.scala | 62 +++++++++
.../org/apache/streampark/common/util/Logger.scala | 2 +-
.../org/apache/streampark/common/util/Utils.scala | 79 +++--------
.../streampark/console/base/util/FileUtils.java | 107 ---------------
.../core/service/impl/AppBuildPipeServiceImpl.java | 2 +-
.../core/service/impl/ApplicationServiceImpl.java | 14 +-
.../core/service/impl/ProjectServiceImpl.java | 2 +-
.../console/base/util/FileUtilsTest.java | 2 +
.../flink/client/trait/YarnClientTrait.scala | 37 +++---
.../flink/kubernetes/KubernetesRetriever.scala | 51 +++----
.../helper/KubernetesDeploymentHelper.scala | 136 ++++++++++---------
.../kubernetes/ingress/IngressController.scala | 8 +-
.../kubernetes/ingress/IngressStrategyV1.scala | 15 +--
.../ingress/IngressStrategyV1beta1.scala | 14 +-
.../impl/FlinkYarnApplicationBuildPipeline.scala | 4 +-
.../streampark/flink/proxy/FlinkShimsProxy.scala | 16 ++-
.../flink/core/FlinkStreamingInitializer.scala | 2 +-
22 files changed, 372 insertions(+), 339 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala
index 500fcf3f6..22ac7de64 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigOption.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.common.conf
-import org.apache.streampark.common.util.Utils.StringCasts
+import org.apache.streampark.common.util.ImplicitsUtils._
import java.util.Properties
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
index 73c23a4db..5f00c2088 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.common.conf
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
-import org.apache.streampark.common.util.Utils.StringCasts
+import org.apache.streampark.common.util.ImplicitsUtils._
import javax.annotation.{Nonnull, Nullable}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index d914780a1..8f3e7b10c 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -18,7 +18,7 @@ package org.apache.streampark.common.conf
import org.apache.streampark.common.enums.StorageType
import org.apache.streampark.common.util.{HdfsUtils, SystemPropertyUtils}
-import org.apache.streampark.common.util.Utils.StringCasts
+import org.apache.streampark.common.util.ImplicitsUtils._
import java.net.URI
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 719715e22..1522a4e8a 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -16,10 +16,16 @@
*/
package org.apache.streampark.common.util
+import org.apache.streampark.common.util.ImplicitsUtils._
+
import java.io._
import java.net.URL
+import java.nio.ByteBuffer
+import java.nio.channels.Channels
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
import java.util
-import java.util.Scanner
+import java.util.stream.Collectors
import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
@@ -44,12 +50,12 @@ object FileUtils {
if (input == null) {
throw new RuntimeException("The inputStream can not be null")
}
- Utils.using(input) {
- in =>
+ input.autoClose(
+ in => {
val b = new Array[Byte](4)
in.read(b, 0, b.length)
bytesToHexString(b)
- } == "504B0304"
+ }) == "504B0304"
}
def isJarFileType(file: File): Boolean = {
@@ -73,12 +79,6 @@ object FileUtils {
s"[StreamPark] Failed to create directory within $TEMP_DIR_ATTEMPTS
attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})")
}
- def exists(path: String): Unit = {
- require(
- path != null && path.nonEmpty && new File(path).exists(),
- s"[StreamPark] FileUtils.exists: file $path is not exist!")
- }
-
def getPathFromEnv(env: String): String = {
val path = Option(System.getenv(env)).getOrElse(System.getProperty(env))
require(
@@ -115,6 +115,14 @@ object FileUtils {
}
}
+ def exists(file: Serializable): Boolean = {
+ file match {
+ case null => false
+ case f: File => f.exists()
+ case p => new File(p.toString).exists()
+ }
+ }
+
def equals(file1: File, file2: File): Boolean = {
(file1, file2) match {
case (a, b) if a == null || b == null => false
@@ -143,20 +151,115 @@ object FileUtils {
}
@throws[IOException]
- def readString(file: File): String = {
- require(file != null && file.isFile)
- val reader = new FileReader(file)
- val scanner = new Scanner(reader)
- val buffer = new mutable.StringBuilder()
- if (scanner.hasNextLine) {
- buffer.append(scanner.nextLine())
+ def readInputStream(in: InputStream, array: Array[Byte]): Unit = {
+ in.autoClose(
+ is => {
+ var toRead = array.length
+ var ret = 0
+ var off = 0
+ while (toRead > 0) {
+ ret = is.read(array, off, toRead)
+ if (ret < 0) throw new IOException("Bad inputStream, premature EOF")
+ toRead -= ret
+ off += ret
+ }
+ })
+ }
+
+ @throws[IOException]
+ def readFile(file: File): String = {
+ if (file.length >= Int.MaxValue) {
+ throw new IOException("Too large file, unexpected!")
+ } else {
+ val len = file.length
+ val array = new Array[Byte](len.toInt)
+ Files
+ .newInputStream(file.toPath)
+ .autoClose(
+ is => {
+ readInputStream(is, array)
+ new String(array, StandardCharsets.UTF_8)
+ })
}
- while (scanner.hasNextLine) {
- buffer.append("\r\n")
- buffer.append(scanner.nextLine())
+ }
+
+ @throws[IOException]
+ def writeFile(content: String, file: File): Unit = {
+ val outputStream = Files.newOutputStream(file.toPath)
+ val channel = Channels.newChannel(outputStream)
+ val buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))
+ channel.write(buffer)
+ Utils.close(channel, outputStream)
+ }
+
+ @throws[IOException]
+ def readEndOfFile(file: File, maxSize: Long): Array[Byte] = {
+ var readSize = maxSize
+ new RandomAccessFile(file, "r").autoClose(
+ raFile => {
+ if (raFile.length > maxSize) {
+ raFile.seek(raFile.length - maxSize)
+ } else if (raFile.length < maxSize) {
+ readSize = raFile.length.toInt
+ }
+ val fileContent = new Array[Byte](readSize.toInt)
+ raFile.read(fileContent)
+ fileContent
+ })
+ }
+
+ /**
+ * Read the content of a file from a specified offset.
+ *
+ * @param file
+ * The file to read from
+ * @param startOffset
+ * The offset from where to start reading the file
+ * @param maxSize
+ * The maximum size of the file to read
+ * @return
+ * The content of the file as a byte array
+ * @throws IOException
+ * if an I/O error occurs while reading the file
+ * @throws IllegalArgumentException
+ * if the startOffset is greater than the file length
+ */
+ @throws[IOException]
+ def readFileFromOffset(file: File, startOffset: Long, maxSize: Long):
Array[Byte] = {
+ if (file.length < startOffset) {
+ throw new IllegalArgumentException(
+ s"The startOffset $startOffset is great than the file length
${file.length}")
+ }
+ new RandomAccessFile(file, "r").autoClose(
+ raFile => {
+ val readSize = Math.min(maxSize, file.length - startOffset)
+ raFile.seek(startOffset)
+ val fileContent = new Array[Byte](readSize.toInt)
+ raFile.read(fileContent)
+ fileContent
+ })
+ }
+
+ /**
+ * Roll View Log.
+ *
+ * @param path
+ * The file path.
+ * @param offset
+ * The offset.
+ * @param limit
+ * The limit.
+ * @return
+ * The content of the file.
+ */
+ def tailOf(path: String, offset: Int, limit: Int): String = try {
+ val file = new File(path)
+ if (file.exists && file.isFile) {
+ Files
+ .lines(Paths.get(path))
+ .autoClose(stream =>
stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n")))
}
- Utils.close(scanner, reader)
- buffer.toString()
+ null
}
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index b9823d8f0..715afa00d 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -137,7 +137,10 @@ object HadoopUtils extends Logger {
def getConfigurationFromHadoopConfDir(confDir: String = hadoopConfDir):
Configuration = {
if (!configurationCache.containsKey(confDir)) {
- FileUtils.exists(confDir)
+ if (!FileUtils.exists(confDir)) {
+ throw new ExceptionInInitializerError(
+ s"[StreamPark] hadoop conf file " + confDir + " is not exist!")
+ }
val hadoopConfDir = new File(confDir)
val confName = List("hdfs-default.xml", "core-site.xml",
"hdfs-site.xml", "yarn-site.xml")
val files =
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ImplicitsUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ImplicitsUtils.scala
new file mode 100644
index 000000000..cfcb95107
--- /dev/null
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ImplicitsUtils.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.common.util
+
+import org.apache.streampark.common.util.Utils.close
+
+import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble,
Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort}
+
+object ImplicitsUtils {
+
+ implicit class AutoCloseImplicits[T <: AutoCloseable](autoCloseable: T) {
+
+ implicit def autoClose[R](func: T => R)(implicit excFunc: Throwable => R =
null): R = {
+ try {
+ func(autoCloseable)
+ } catch {
+ case e: Throwable if excFunc != null => excFunc(e)
+ } finally {
+ close(autoCloseable)
+ }
+ }
+
+ }
+
+ implicit class StringImplicits(v: String) {
+ def cast[T](classType: Class[_]): T = {
+ classType match {
+ case c if c == classOf[String] => v.asInstanceOf[T]
+ case c if c == classOf[Byte] => v.toByte.asInstanceOf[T]
+ case c if c == classOf[Int] => v.toInt.asInstanceOf[T]
+ case c if c == classOf[Long] => v.toLong.asInstanceOf[T]
+ case c if c == classOf[Float] => v.toFloat.asInstanceOf[T]
+ case c if c == classOf[Double] => v.toDouble.asInstanceOf[T]
+ case c if c == classOf[Short] => v.toShort.asInstanceOf[T]
+ case c if c == classOf[Boolean] => v.toBoolean.asInstanceOf[T]
+ case c if c == classOf[JavaByte] => v.toByte.asInstanceOf[T]
+ case c if c == classOf[JavaInt] => JavaInt.valueOf(v).asInstanceOf[T]
+ case c if c == classOf[JavaLong] => JavaLong.valueOf(v).asInstanceOf[T]
+ case c if c == classOf[JavaFloat] =>
JavaFloat.valueOf(v).asInstanceOf[T]
+ case c if c == classOf[JavaDouble] =>
JavaDouble.valueOf(v).asInstanceOf[T]
+ case c if c == classOf[JavaShort] =>
JavaShort.valueOf(v).asInstanceOf[T]
+ case c if c == classOf[JavaBool] => JavaBool.valueOf(v).asInstanceOf[T]
+ case _ =>
+ throw new IllegalArgumentException(s"Unsupported type: $classType")
+ }
+ }
+ }
+}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index 583fc91f2..0b97498b1 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -140,7 +140,7 @@ private[this] object LoggerFactory extends
LoggerFactoryBinder {
val configurator = new JoranConfigurator()
configurator.setContext(loggerContext)
val text = FileUtils
- .readString(new File(path))
+ .readFile(new File(path))
.replaceAll("org.slf4j", s"$shadedPackage.org.slf4j")
.replaceAll("ch.qos.logback", s"$shadedPackage.ch.qos.logback")
.replaceAll("org.apache.log4j", s"$shadedPackage.org.apache.log4j")
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 98ec6ce6c..30534cf5d 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -16,10 +16,11 @@
*/
package org.apache.streampark.common.util
+import org.apache.streampark.common.util.ImplicitsUtils._
+
import org.apache.commons.lang3.StringUtils
-import java.io.{BufferedInputStream, File, FileInputStream, IOException,
PrintWriter, StringWriter}
-import java.lang.{Boolean => JavaBool, Byte => JavaByte, Double => JavaDouble,
Float => JavaFloat, Integer => JavaInt, Long => JavaLong, Short => JavaShort}
+import java.io._
import java.net.URL
import java.util.{jar, Collection => JavaCollection, Map => JavaMap,
Properties, UUID}
import java.util.jar.{JarFile, JarInputStream}
@@ -107,35 +108,6 @@ object Utils {
/** if any blank strings exist */
def isAnyBank(items: String*): Boolean = items == null ||
items.exists(StringUtils.isBlank)
- /*
- * Mimicking the try-with-resource syntax of Java-8+
- */
- def using[R, T <: AutoCloseable](handle: T)(func: T => R)(implicit
- excFunc: Throwable => R = null): R = {
- try {
- func(handle)
- } catch {
- case e: Throwable if excFunc != null => excFunc(e)
- } finally {
- if (handle != null) {
- handle.close()
- }
- }
- }
-
- def close(closeable: AutoCloseable*)(implicit func: Throwable => Unit =
null): Unit = {
- closeable.foreach(
- c => {
- try {
- if (c != null) {
- c.close()
- }
- } catch {
- case e: Throwable if func != null => func(e)
- }
- })
- }
-
/**
* calculate the percentage of num1 / num2, the result range from 0 to 100,
with one small digit
* reserve.
@@ -159,38 +131,31 @@ object Utils {
else {
try {
val stm = new StringWriter
- val wrt = new PrintWriter(stm)
- e.printStackTrace(wrt)
- wrt.close()
- stm.toString
+ new PrintWriter(stm).autoClose {
+ writer =>
+ e.printStackTrace(writer)
+ stm.toString
+ }
} catch {
case _: Throwable => e.getClass.getName + " (error while printing
stack trace)"
}
}
}
- implicit class StringCasts(v: String) {
- def cast[T](classType: Class[_]): T = {
- classType match {
- case c if c == classOf[String] => v.asInstanceOf[T]
- case c if c == classOf[Byte] => v.toByte.asInstanceOf[T]
- case c if c == classOf[Int] => v.toInt.asInstanceOf[T]
- case c if c == classOf[Long] => v.toLong.asInstanceOf[T]
- case c if c == classOf[Float] => v.toFloat.asInstanceOf[T]
- case c if c == classOf[Double] => v.toDouble.asInstanceOf[T]
- case c if c == classOf[Short] => v.toShort.asInstanceOf[T]
- case c if c == classOf[Boolean] => v.toBoolean.asInstanceOf[T]
- case c if c == classOf[JavaByte] => v.toByte.asInstanceOf[T]
- case c if c == classOf[JavaInt] => JavaInt.valueOf(v).asInstanceOf[T]
- case c if c == classOf[JavaLong] => JavaLong.valueOf(v).asInstanceOf[T]
- case c if c == classOf[JavaFloat] =>
JavaFloat.valueOf(v).asInstanceOf[T]
- case c if c == classOf[JavaDouble] =>
JavaDouble.valueOf(v).asInstanceOf[T]
- case c if c == classOf[JavaShort] =>
JavaShort.valueOf(v).asInstanceOf[T]
- case c if c == classOf[JavaBool] => JavaBool.valueOf(v).asInstanceOf[T]
- case _ =>
- throw new IllegalArgumentException(s"Unsupported type: $classType")
- }
- }
+ def close(closeable: AutoCloseable*)(implicit func: Throwable => Unit =
null): Unit = {
+ closeable.foreach(
+ c => {
+ try {
+ if (c != null) {
+ if (c.isInstanceOf[Flushable]) {
+ c.asInstanceOf[Flushable].flush()
+ }
+ c.close()
+ }
+ } catch {
+ case e: Throwable if func != null => func(e)
+ }
+ })
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
deleted file mode 100644
index 4aaa624f2..000000000
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.base.util;
-
-import org.apache.streampark.console.base.exception.ApiDetailException;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** The file utils. */
-public class FileUtils {
-
- private FileUtils() {}
-
- /**
- * Reads the last portion of a file as a byte array.
- *
- * @param file the file to read
- * @param maxSize the maximum number of bytes to read from the end of the
file
- * @return the byte array containing the content read from the file
- * @throws IOException if an I/O error occurs
- */
- public static byte[] readEndOfFile(File file, long maxSize) throws
IOException {
- long readSize = maxSize;
- byte[] fileContent;
- try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) {
- if (raFile.length() > maxSize) {
- raFile.seek(raFile.length() - maxSize);
- } else if (raFile.length() < maxSize) {
- readSize = (int) raFile.length();
- }
- fileContent = new byte[(int) readSize];
- raFile.read(fileContent);
- }
- return fileContent;
- }
-
- /**
- * Read the content of a file from a specified offset.
- *
- * @param file The file to read from
- * @param startOffset The offset from where to start reading the file
- * @param maxSize The maximum size of the file to read
- * @return The content of the file as a byte array
- * @throws IOException if an I/O error occurs while reading the file
- * @throws IllegalArgumentException if the startOffset is greater than the
file length
- */
- public static byte[] readFileFromOffset(File file, long startOffset, long
maxSize)
- throws IOException {
- if (file.length() < startOffset) {
- throw new IllegalArgumentException(
- String.format(
- "The startOffset %s is great than the file length %s",
startOffset, file.length()));
- }
- byte[] fileContent;
- try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) {
- long readSize = Math.min(maxSize, file.length() - startOffset);
- raFile.seek(startOffset);
- fileContent = new byte[(int) readSize];
- raFile.read(fileContent);
- }
- return fileContent;
- }
-
- /**
- * Roll View Log.
- *
- * @param path The file path.
- * @param offset The offset.
- * @param limit The limit.
- * @return The content of the file.
- * @throws ApiDetailException if there's an error rolling the view log.
- */
- public static String rollViewLog(String path, int offset, int limit) {
- try {
- File file = new File(path);
- if (file.exists() && file.isFile()) {
- try (Stream<String> stream = Files.lines(Paths.get(path))) {
- return
stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n"));
- }
- }
- return null;
- } catch (Exception e) {
- throw new ApiDetailException("roll view log error: " + e);
- }
- }
-}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index f9017a724..a20f5a08c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -424,7 +424,7 @@ public class AppBuildPipeServiceImpl
FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
- if (!new File(flinkUserJar).exists()) {
+ if (!FileUtils.exists(flinkUserJar)) {
Resource resource = resourceService.findByResourceName(app.getTeamId(),
app.getJar());
if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
flinkUserJar = resource.getFilePath();
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 9e970408f..c76540eb4 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.fs.LfsOperator;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.HadoopUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.common.util.ThreadUtils;
@@ -97,7 +98,6 @@ import org.apache.streampark.flink.kubernetes.model.TrackId;
import org.apache.streampark.flink.packer.pipeline.BuildResult;
import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CoreOptions;
@@ -129,7 +129,6 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
@@ -619,9 +618,8 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (!future.isDone()) {
future.cancel(true);
}
- if (path != null) {
- return
org.apache.streampark.console.base.util.FileUtils.rollViewLog(
- path, offset, limit);
+ if (FileUtils.exists(path)) {
+ return FileUtils.tailOf(path, offset, limit);
}
return null;
})
@@ -738,7 +736,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
jarPath = resource.getFilePath();
}
}
- appParam.setJarCheckSum(FileUtils.checksumCRC32(new File(jarPath)));
+
appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new
File(jarPath)));
}
if (save(appParam)) {
@@ -866,7 +864,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
if (jarFile.exists()) {
try {
- long checkSum = FileUtils.checksumCRC32(jarFile);
+ long checkSum =
org.apache.commons.io.FileUtils.checksumCRC32(jarFile);
if (!Objects.equals(checkSum, application.getJarCheckSum())) {
application.setBuild(true);
}
@@ -1144,7 +1142,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Override
public String readConf(Application appParam) throws IOException {
File file = new File(appParam.getConfig());
- String conf = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
+ String conf = FileUtils.readFile(file);
return Base64.getEncoder().encodeToString(conf.getBytes());
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index e568d976b..d48228706 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.util.CompletableFutureUtils;
+import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.domain.ResponseCode;
@@ -28,7 +29,6 @@ import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
-import org.apache.streampark.console.base.util.FileUtils;
import org.apache.streampark.console.base.util.GZipUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.Project;
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
index 57db8581c..d31083327 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
@@ -17,6 +17,8 @@
package org.apache.streampark.console.base.util;
+import org.apache.streampark.common.util.FileUtils;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 0ce7cb9c7..6cc667c6e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.client.`trait`
+import org.apache.streampark.common.util.ImplicitsUtils._
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.bean._
@@ -43,25 +44,25 @@ trait YarnClientTrait extends FlinkClientTrait {
flinkConf: Configuration,
actionFunc: (JobID, ClusterClient[_]) => O): O = {
- Utils.using {
- flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
- val clusterClientFactory = new YarnClusterClientFactory
- val applicationId = clusterClientFactory.getClusterId(flinkConf)
- if (applicationId == null) {
- throw new FlinkException(
- "[StreamPark] getClusterClient error. No cluster id was specified.
Please specify a cluster to which you would like to connect.")
- }
- val clusterDescriptor =
clusterClientFactory.createClusterDescriptor(flinkConf)
- clusterDescriptor.retrieve(applicationId).getClusterClient
- } {
- client =>
- Try(actionFunc(getJobID(request.jobId), client)).recover {
- case e =>
- throw new FlinkException(
- s"[StreamPark] Do ${request.getClass.getSimpleName} for the job
${request.jobId} failed. " +
- s"detail: ${Utils.stringifyException(e)}");
- }.get
+ flinkConf.safeSet(YarnConfigOptions.APPLICATION_ID, request.clusterId)
+ val clusterClientFactory = new YarnClusterClientFactory
+ val applicationId = clusterClientFactory.getClusterId(flinkConf)
+ if (applicationId == null) {
+ throw new FlinkException(
+ "[StreamPark] getClusterClient error. No cluster id was specified.
Please specify a cluster to which you would like to connect.")
}
+ val clusterDescriptor =
clusterClientFactory.createClusterDescriptor(flinkConf)
+ clusterDescriptor
+ .retrieve(applicationId)
+ .getClusterClient
+ .autoClose(
+ client =>
+ Try(actionFunc(getJobID(request.jobId), client)).recover {
+ case e =>
+ throw new FlinkException(
+ s"[StreamPark] Do ${request.getClass.getSimpleName} for the
job ${request.jobId} failed. " +
+ s"detail: ${Utils.stringifyException(e)}");
+ }.get)
}
override def doTriggerSavepoint(
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 01b86f1ca..7418e8c50 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -18,8 +18,8 @@
package org.apache.streampark.flink.kubernetes
import org.apache.streampark.common.conf.ConfigConst
-import org.apache.streampark.common.util.{Logger, Utils}
-import org.apache.streampark.common.util.Utils.using
+import org.apache.streampark.common.util.ImplicitsUtils._
+import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import org.apache.streampark.flink.kubernetes.ingress.IngressController
import org.apache.streampark.flink.kubernetes.model.ClusterKey
@@ -108,32 +108,33 @@ object KubernetesRetriever extends Logger {
* deployment namespace
*/
def isDeploymentExists(name: String, namespace: String): Boolean = {
- using(KubernetesRetriever.newK8sClient()) {
- client =>
- client
- .apps()
- .deployments()
- .inNamespace(namespace)
- .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL)
- .list()
- .getItems
- .asScala
- .exists(e => e.getMetadata.getName == name)
- }(_ => false)
+ KubernetesRetriever
+ .newK8sClient()
+ .autoClose(
+ client =>
+ client
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+ .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL)
+ .list()
+ .getItems
+ .asScala
+ .exists(e => e.getMetadata.getName == name))(_ => false)
}
/** retrieve flink jobManager rest url */
def retrieveFlinkRestUrl(clusterKey: ClusterKey): Option[String] = {
- Utils.using(
- KubernetesRetriever
- .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace,
clusterKey.executeMode)
- .getOrElse(return None)) {
- client =>
- val url =
- IngressController.ingressUrlAddress(clusterKey.namespace,
clusterKey.clusterId, client)
- logger.info(s"retrieve flink jobManager rest url: $url")
- client.close()
- Some(url)
- }
+ KubernetesRetriever
+ .newFinkClusterClient(clusterKey.clusterId, clusterKey.namespace,
clusterKey.executeMode)
+ .getOrElse(return None)
+ .autoClose(
+ client => {
+ val url =
+ IngressController.ingressUrlAddress(clusterKey.namespace,
clusterKey.clusterId, client)
+ logger.info(s"retrieve flink jobManager rest url: $url")
+ Some(url)
+ })
}
+
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 8e622fe04..ff2dc7161 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.kubernetes.helper
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
-import org.apache.streampark.common.util.Utils.using
+import org.apache.streampark.common.util.ImplicitsUtils._
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import com.google.common.base.Charsets
@@ -34,25 +34,26 @@ import scala.util.{Success, Try}
object KubernetesDeploymentHelper extends Logger {
private[this] def getPods(nameSpace: String, deploymentName: String):
List[Pod] = {
- using(KubernetesRetriever.newK8sClient()) {
- client =>
- Try {
- client.pods
- .inNamespace(nameSpace)
- .withLabels {
- client.apps.deployments
- .inNamespace(nameSpace)
- .withName(deploymentName)
- .get
- .getSpec
- .getSelector
- .getMatchLabels
- }
- .list
- .getItems
- .toList
- }.getOrElse(List.empty[Pod])
- }
+ KubernetesRetriever
+ .newK8sClient()
+ .autoClose(
+ client =>
+ Try {
+ client.pods
+ .inNamespace(nameSpace)
+ .withLabels {
+ client.apps.deployments
+ .inNamespace(nameSpace)
+ .withName(deploymentName)
+ .get
+ .getSpec
+ .getSelector
+ .getMatchLabels
+ }
+ .list
+ .getItems
+ .toList
+ }.getOrElse(List.empty[Pod]))
}
def getDeploymentStatusChanges(nameSpace: String, deploymentName: String):
Boolean = {
@@ -74,16 +75,17 @@ object KubernetesDeploymentHelper extends Logger {
}
def deleteTaskDeployment(nameSpace: String, deploymentName: String): Boolean
= {
- using(KubernetesRetriever.newK8sClient()) {
- client =>
- Try {
- val r = client.apps.deployments
- .inNamespace(nameSpace)
- .withName(deploymentName)
- .delete
- Boolean.unbox(r)
- }.getOrElse(false)
- }
+ KubernetesRetriever
+ .newK8sClient()
+ .autoClose(
+ client =>
+ Try {
+ val r = client.apps.deployments
+ .inNamespace(nameSpace)
+ .withName(deploymentName)
+ .delete
+ Boolean.unbox(r)
+ }.getOrElse(false))
}
def isTheK8sConnectionNormal(): Boolean = {
@@ -96,47 +98,51 @@ object KubernetesDeploymentHelper extends Logger {
}
def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String):
String = {
- using(KubernetesRetriever.newK8sClient()) {
- client =>
- val path = KubernetesDeploymentHelper.getJobLog(jobId)
- val file = new File(path)
- val log =
client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
- Files.asCharSink(file, Charsets.UTF_8).write(log)
- path
- }
- }
-
- def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId:
String): String = {
- using(KubernetesRetriever.newK8sClient()) {
- client =>
- Try {
- val podName = getPods(nameSpace, jobName).head.getMetadata.getName
- val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+ KubernetesRetriever
+ .newK8sClient()
+ .autoClose(
+ client => {
+ val path = KubernetesDeploymentHelper.getJobLog(jobId)
val file = new File(path)
- val log = client.pods
- .inNamespace(nameSpace)
- .withName(podName)
- .terminated()
- .withPrettyOutput
- .getLog
+ val log =
client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
Files.asCharSink(file, Charsets.UTF_8).write(log)
path
- }.getOrElse(null)
- }(error => throw error)
+ })
+ }
+
+ def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId:
String): String = {
+ KubernetesRetriever
+ .newK8sClient()
+ .autoClose(
+ client =>
+ Try {
+ val podName = getPods(nameSpace, jobName).head.getMetadata.getName
+ val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
+ val file = new File(path)
+ val log = client.pods
+ .inNamespace(nameSpace)
+ .withName(podName)
+ .terminated()
+ .withPrettyOutput
+ .getLog
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
+ path
+ }.getOrElse(null))(error => throw error)
}
def deleteTaskConfigMap(nameSpace: String, deploymentName: String): Boolean
= {
- using(KubernetesRetriever.newK8sClient()) {
- client =>
- Try {
- val r = client
- .configMaps()
- .inNamespace(nameSpace)
- .withLabel("app", deploymentName)
- .delete
- Boolean.unbox(r)
- }.getOrElse(false)
- }
+ KubernetesRetriever
+ .newK8sClient()
+ .autoClose(
+ client =>
+ Try {
+ val r = client
+ .configMaps()
+ .inNamespace(nameSpace)
+ .withLabel("app", deploymentName)
+ .delete
+ Boolean.unbox(r)
+ }.getOrElse(false))
}
private[kubernetes] def getJobLog(jobId: String): String = {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
index d443a22bd..b8fc0e817 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressController.scala
@@ -17,8 +17,8 @@
package org.apache.streampark.flink.kubernetes.ingress
+import org.apache.streampark.common.util.ImplicitsUtils._
import org.apache.streampark.common.util.Logger
-import org.apache.streampark.common.util.Utils.using
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.flink.client.program.ClusterClient
@@ -30,15 +30,15 @@ object IngressController extends Logger {
private[this] val VERSION_REGEXP = "(\\d+\\.\\d+)".r
private lazy val ingressStrategy: IngressStrategy = {
- using(new DefaultKubernetesClient()) {
- client =>
+ new DefaultKubernetesClient().autoClose(
+ client => {
val version =
VERSION_REGEXP.findFirstIn(client.getVersion.getGitVersion).get.toDouble
if (version >= 1.19) {
new IngressStrategyV1()
} else {
new IngressStrategyV1beta1()
}
- }
+ })
}
def configureIngress(domainName: String, clusterId: String, nameSpace:
String): Unit = {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index c5c2fdd0b..b8346cc1d 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes.ingress
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.ImplicitsUtils._
import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient
@@ -34,7 +34,7 @@ class IngressStrategyV1 extends IngressStrategy {
clusterId: String,
clusterClient: ClusterClient[_]): String = {
- Utils.using(new DefaultKubernetesClient) {
+ new DefaultKubernetesClient().autoClose(
client =>
Try {
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
@@ -45,14 +45,12 @@ class IngressStrategyV1 extends IngressStrategy {
}.recover {
case e =>
throw new RuntimeException(s"[StreamPark] get ingressUrlAddress
error: $e")
- }.get
- }
-
+ }.get)
}
override def configureIngress(domainName: String, clusterId: String,
nameSpace: String): Unit = {
- Utils.using(new DefaultKubernetesClient) {
- client =>
+ new DefaultKubernetesClient().autoClose(
+ client => {
val ownerReference = getOwnerReference(nameSpace, clusterId, client)
val ingress = new IngressBuilder()
.withNewMetadata()
@@ -94,6 +92,7 @@ class IngressStrategyV1 extends IngressStrategy {
.endSpec()
.build()
client.network.v1.ingresses().inNamespace(nameSpace).create(ingress)
- }
+ })
}
+
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 8cbc276e4..a5ca4258e 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes.ingress
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.ImplicitsUtils._
import io.fabric8.kubernetes.api.model.IntOrString
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder
@@ -35,7 +35,7 @@ class IngressStrategyV1beta1 extends IngressStrategy {
clusterId: String,
clusterClient: ClusterClient[_]): String = {
- Utils.using(new DefaultKubernetesClient) {
+ new DefaultKubernetesClient().autoClose(
client =>
Try {
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
@@ -46,13 +46,12 @@ class IngressStrategyV1beta1 extends IngressStrategy {
}.recover {
case e =>
throw new RuntimeException(s"[StreamPark] get ingressUrlAddress
error: $e")
- }.get
- }
+ }.get)
}
override def configureIngress(domainName: String, clusterId: String,
nameSpace: String): Unit = {
- Utils.using(new DefaultKubernetesClient) {
- client =>
+ new DefaultKubernetesClient().autoClose(
+ client => {
val ownerReference = getOwnerReference(nameSpace, clusterId, client)
val ingress = new IngressBuilder()
.withNewMetadata()
@@ -83,8 +82,7 @@ class IngressStrategyV1beta1 extends IngressStrategy {
.endRule()
.endSpec()
.build()
-
client.network.ingress.inNamespace(nameSpace).create(ingress)
- }
+ })
}
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index b9167d437..6fbb0de50 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.packer.pipeline.impl
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.enums.DevelopmentMode
import org.apache.streampark.common.fs.{FsOperator, HdfsOperator, LfsOperator}
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.ImplicitsUtils._
import org.apache.streampark.flink.packer.maven.MavenTool
import org.apache.streampark.flink.packer.pipeline._
@@ -90,7 +90,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
case FsOperator.hdfs =>
val uploadFile =
s"${Workspace.remote.APP_UPLOADS}/${originFile.getName}"
if (fsOperator.exists(uploadFile)) {
- Utils.using(new FileInputStream(originFile))(
+ new FileInputStream(originFile).autoClose(
inputStream => {
if (DigestUtils.md5Hex(inputStream) !=
fsOperator.fileMd5(uploadFile)) {
fsOperator.upload(originFile.getAbsolutePath, uploadFile)
diff --git
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 56dede3e6..44294d756 100644
---
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.proxy
import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion}
import org.apache.streampark.common.util.{ClassLoaderUtils, Logger, Utils}
+import org.apache.streampark.common.util.ImplicitsUtils._
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File,
ObjectOutputStream}
import java.net.URL
@@ -196,13 +197,14 @@ object FlinkShimsProxy extends Logger {
@throws[Exception]
def getObject[T](loader: ClassLoader, obj: Object): T = {
val arrayOutputStream = new ByteArrayOutputStream
- val result = Utils.using(new ObjectOutputStream(arrayOutputStream))(
- objectOutputStream => {
- objectOutputStream.writeObject(obj)
- val byteArrayInputStream = new
ByteArrayInputStream(arrayOutputStream.toByteArray)
- Utils.using(new ClassLoaderObjectInputStream(loader,
byteArrayInputStream))(_.readObject)
- })
- result.asInstanceOf[T]
+ new ObjectOutputStream(arrayOutputStream)
+ .autoClose(
+ objectOutputStream => {
+ objectOutputStream.writeObject(obj)
+ val byteArrayInputStream = new
ByteArrayInputStream(arrayOutputStream.toByteArray)
+ new ClassLoaderObjectInputStream(loader,
byteArrayInputStream).autoClose(_.readObject())
+ })
+ .asInstanceOf[T]
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index b3ee24c33..bf2e2c68b 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -130,7 +130,7 @@ private[flink] class FlinkStreamingInitializer(args:
Array[String], apiType: Api
require(
configFile.exists(),
s"[StreamPark] Usage: application config file: $configFile is not
found!!!")
- val text = FileUtils.readString(configFile)
+ val text = FileUtils.readFile(configFile)
readConfig(text)
}
map.filter(_._2.nonEmpty)