This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch upload
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit ea968bb28873b33cca7714dc6746cd2fbb880fca
Author: benjobs <[email protected]>
AuthorDate: Sun Aug 27 16:32:52 2023 +0800

    [Improve] FileUtils minor improvement
---
 .../apache/streampark/common/util/FileUtils.scala  | 138 ++++++++++++++++++---
 .../streampark/common/util/HadoopUtils.scala       |   5 +-
 .../org/apache/streampark/common/util/Logger.scala |   2 +-
 .../streampark/console/base/util/FileUtils.java    | 107 ----------------
 .../core/service/impl/AppBuildPipeServiceImpl.java |   2 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  14 +--
 .../core/service/impl/ProjectServiceImpl.java      |   2 +-
 .../console/base/util/FileUtilsTest.java           |   2 +
 .../flink/core/FlinkStreamingInitializer.scala     |   2 +-
 9 files changed, 134 insertions(+), 140 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 719715e22..40dd1d120 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -18,13 +18,17 @@ package org.apache.streampark.common.util
 
 import java.io._
 import java.net.URL
+import java.nio.ByteBuffer
+import java.nio.channels.Channels
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
 import java.util
-import java.util.Scanner
+import java.util.stream.Collectors
 
 import scala.collection.convert.ImplicitConversions._
 import scala.collection.mutable
 
-object FileUtils {
+object FileUtils extends org.apache.commons.io.FileUtils {
 
   private[this] def bytesToHexString(src: Array[Byte]): String = {
     val stringBuilder = new mutable.StringBuilder
@@ -73,12 +77,6 @@ object FileUtils {
       s"[StreamPark] Failed to create directory within $TEMP_DIR_ATTEMPTS  
attempts (tried $baseName 0 to $baseName ${TEMP_DIR_ATTEMPTS - 1})")
   }
 
-  def exists(path: String): Unit = {
-    require(
-      path != null && path.nonEmpty && new File(path).exists(),
-      s"[StreamPark] FileUtils.exists: file $path is not exist!")
-  }
-
   def getPathFromEnv(env: String): String = {
     val path = Option(System.getenv(env)).getOrElse(System.getProperty(env))
     require(
@@ -115,6 +113,14 @@ object FileUtils {
     }
   }
 
+  def exists(file: Serializable): Boolean = {
+    file match {
+      case null => false
+      case f: java.io.File => f.exists()
+      case p => new java.io.File(p.toString).exists()
+    }
+  }
+
   def equals(file1: File, file2: File): Boolean = {
     (file1, file2) match {
       case (a, b) if a == null || b == null => false
@@ -143,20 +149,112 @@ object FileUtils {
   }
 
   @throws[IOException]
-  def readString(file: File): String = {
-    require(file != null && file.isFile)
-    val reader = new FileReader(file)
-    val scanner = new Scanner(reader)
-    val buffer = new mutable.StringBuilder()
-    if (scanner.hasNextLine) {
-      buffer.append(scanner.nextLine())
+  def readInputStream(in: InputStream, array: Array[Byte]): Unit = {
+    var toRead = array.length
+    var ret = 0
+    var off = 0
+    while (toRead > 0) {
+      ret = in.read(array, off, toRead)
+      if (ret < 0) throw new IOException("Bad inputStream, premature EOF")
+      toRead -= ret
+      off += ret
+    }
+    in.close()
+  }
+
+  @throws[IOException]
+  def readFile(file: File): String = {
+    if (file.length >= Int.MaxValue) {
+      throw new IOException("Too large file, unexpected!")
+    } else {
+      val len = file.length
+      val array = new Array[Byte](len.toInt)
+      val is = Files.newInputStream(file.toPath)
+      readInputStream(is, array)
+      is.close()
+      new String(array, StandardCharsets.UTF_8)
+    }
+  }
+
+  @throws[IOException]
+  def writeFile(content: String, file: File): Unit = {
+    val outputStream = Files.newOutputStream(file.toPath)
+    val channel = Channels.newChannel(outputStream)
+    val buffer = ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))
+    channel.write(buffer)
+    channel.close()
+    outputStream.flush()
+    outputStream.close()
+  }
+
+  @throws[IOException]
+  def readEndOfFile(file: File, maxSize: Long): Array[Byte] = {
+    var readSize = maxSize
+    var fileContent: Array[Byte] = null
+    try {
+      val raFile = new RandomAccessFile(file, "r")
+      try {
+        if (raFile.length > maxSize) raFile.seek(raFile.length - maxSize)
+        else if (raFile.length < maxSize) readSize = raFile.length.toInt
+        fileContent = new Array[Byte](readSize.toInt)
+        raFile.read(fileContent)
+      } finally if (raFile != null) raFile.close()
     }
-    while (scanner.hasNextLine) {
-      buffer.append("\r\n")
-      buffer.append(scanner.nextLine())
+    fileContent
+  }
+
+  /**
+   * Read the content of a file from a specified offset.
+   *
+   * @param file
+   *   The file to read from
+   * @param startOffset
+   *   The offset from where to start reading the file
+   * @param maxSize
+   *   The maximum size of the file to read
+   * @return
+   *   The content of the file as a byte array
+   * @throws IOException
+   *   if an I/O error occurs while reading the file
+   * @throws IllegalArgumentException
+   *   if the startOffset is greater than the file length
+   */
+  @throws[IOException]
+  def readFileFromOffset(file: File, startOffset: Long, maxSize: Long): 
Array[Byte] = {
+    if (file.length < startOffset) {
+      throw new IllegalArgumentException(
+        s"The startOffset $startOffset is greater than the file length 
${file.length}")
+    }
+    Utils.using(new RandomAccessFile(file, "r")) {
+      raFile =>
+        val readSize = Math.min(maxSize, file.length - startOffset)
+        raFile.seek(startOffset)
+        val fileContent = new Array[Byte](readSize.toInt)
+        raFile.read(fileContent)
+        fileContent
+    }
+  }
+
+  /**
+   * Roll View Log.
+   *
+   * @param path
+   *   The file path.
+   * @param offset
+   *   The offset.
+   * @param limit
+   *   The limit.
+   * @return
+   *   The content of the file.
+   */
+  def tailOf(path: String, offset: Int, limit: Int): String = try {
+    val file = new File(path)
+    if (file.exists && file.isFile) {
+      Utils.using(Files.lines(Paths.get(path))) {
+        stream => 
stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n"))
+      }
     }
-    Utils.close(scanner, reader)
-    buffer.toString()
+    null
   }
 
 }
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index b9823d8f0..715afa00d 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -137,7 +137,10 @@ object HadoopUtils extends Logger {
 
   def getConfigurationFromHadoopConfDir(confDir: String = hadoopConfDir): 
Configuration = {
     if (!configurationCache.containsKey(confDir)) {
-      FileUtils.exists(confDir)
+      if (!FileUtils.exists(confDir)) {
+        throw new ExceptionInInitializerError(
+          s"[StreamPark] hadoop conf file " + confDir + " does not exist!")
+      }
       val hadoopConfDir = new File(confDir)
       val confName = List("hdfs-default.xml", "core-site.xml", 
"hdfs-site.xml", "yarn-site.xml")
       val files =
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
index 583fc91f2..0b97498b1 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Logger.scala
@@ -140,7 +140,7 @@ private[this] object LoggerFactory extends 
LoggerFactoryBinder {
         val configurator = new JoranConfigurator()
         configurator.setContext(loggerContext)
         val text = FileUtils
-          .readString(new File(path))
+          .readFile(new File(path))
           .replaceAll("org.slf4j", s"$shadedPackage.org.slf4j")
           .replaceAll("ch.qos.logback", s"$shadedPackage.ch.qos.logback")
           .replaceAll("org.apache.log4j", s"$shadedPackage.org.apache.log4j")
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
deleted file mode 100644
index 4aaa624f2..000000000
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.base.util;
-
-import org.apache.streampark.console.base.exception.ApiDetailException;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** The file utils. */
-public class FileUtils {
-
-  private FileUtils() {}
-
-  /**
-   * Reads the last portion of a file as a byte array.
-   *
-   * @param file the file to read
-   * @param maxSize the maximum number of bytes to read from the end of the 
file
-   * @return the byte array containing the content read from the file
-   * @throws IOException if an I/O error occurs
-   */
-  public static byte[] readEndOfFile(File file, long maxSize) throws 
IOException {
-    long readSize = maxSize;
-    byte[] fileContent;
-    try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) {
-      if (raFile.length() > maxSize) {
-        raFile.seek(raFile.length() - maxSize);
-      } else if (raFile.length() < maxSize) {
-        readSize = (int) raFile.length();
-      }
-      fileContent = new byte[(int) readSize];
-      raFile.read(fileContent);
-    }
-    return fileContent;
-  }
-
-  /**
-   * Read the content of a file from a specified offset.
-   *
-   * @param file The file to read from
-   * @param startOffset The offset from where to start reading the file
-   * @param maxSize The maximum size of the file to read
-   * @return The content of the file as a byte array
-   * @throws IOException if an I/O error occurs while reading the file
-   * @throws IllegalArgumentException if the startOffset is greater than the 
file length
-   */
-  public static byte[] readFileFromOffset(File file, long startOffset, long 
maxSize)
-      throws IOException {
-    if (file.length() < startOffset) {
-      throw new IllegalArgumentException(
-          String.format(
-              "The startOffset %s is great than the file length %s", 
startOffset, file.length()));
-    }
-    byte[] fileContent;
-    try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) {
-      long readSize = Math.min(maxSize, file.length() - startOffset);
-      raFile.seek(startOffset);
-      fileContent = new byte[(int) readSize];
-      raFile.read(fileContent);
-    }
-    return fileContent;
-  }
-
-  /**
-   * Roll View Log.
-   *
-   * @param path The file path.
-   * @param offset The offset.
-   * @param limit The limit.
-   * @return The content of the file.
-   * @throws ApiDetailException if there's an error rolling the view log.
-   */
-  public static String rollViewLog(String path, int offset, int limit) {
-    try {
-      File file = new File(path);
-      if (file.exists() && file.isFile()) {
-        try (Stream<String> stream = Files.lines(Paths.get(path))) {
-          return 
stream.skip(offset).limit(limit).collect(Collectors.joining("\r\n"));
-        }
-      }
-      return null;
-    } catch (Exception e) {
-      throw new ApiDetailException("roll view log error: " + e);
-    }
-  }
-}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index f9017a724..a20f5a08c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -424,7 +424,7 @@ public class AppBuildPipeServiceImpl
     FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId());
     String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app);
 
-    if (!new File(flinkUserJar).exists()) {
+    if (!FileUtils.exists(flinkUserJar)) {
       Resource resource = resourceService.findByResourceName(app.getTeamId(), 
app.getJar());
       if (resource != null && StringUtils.isNotBlank(resource.getFilePath())) {
         flinkUserJar = resource.getFilePath();
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index a94879640..6806825b5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.streampark.common.fs.HdfsOperator;
 import org.apache.streampark.common.fs.LfsOperator;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.FileUtils;
 import org.apache.streampark.common.util.HadoopUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.common.util.ThreadUtils;
@@ -97,7 +98,6 @@ import org.apache.streampark.flink.kubernetes.model.TrackId;
 import org.apache.streampark.flink.packer.pipeline.BuildResult;
 import org.apache.streampark.flink.packer.pipeline.ShadedBuildResponse;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.CoreOptions;
@@ -129,7 +129,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Base64;
 import java.util.Collection;
@@ -619,9 +618,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
                 if (!future.isDone()) {
                   future.cancel(true);
                 }
-                if (path != null) {
-                  return 
org.apache.streampark.console.base.util.FileUtils.rollViewLog(
-                      path, offset, limit);
+                if (FileUtils.exists(path)) {
+                  return FileUtils.tailOf(path, offset, limit);
                 }
                 return null;
               })
@@ -738,7 +736,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
           jarPath = resource.getFilePath();
         }
       }
-      appParam.setJarCheckSum(FileUtils.checksumCRC32(new File(jarPath)));
+      
appParam.setJarCheckSum(org.apache.commons.io.FileUtils.checksumCRC32(new 
File(jarPath)));
     }
 
     if (save(appParam)) {
@@ -866,7 +864,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
         File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
         if (jarFile.exists()) {
           try {
-            long checkSum = FileUtils.checksumCRC32(jarFile);
+            long checkSum = 
org.apache.commons.io.FileUtils.checksumCRC32(jarFile);
             if (!Objects.equals(checkSum, application.getJarCheckSum())) {
               application.setBuild(true);
             }
@@ -1144,7 +1142,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
   @Override
   public String readConf(Application appParam) throws IOException {
     File file = new File(appParam.getConfig());
-    String conf = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
+    String conf = FileUtils.readFile(file);
     return Base64.getEncoder().encodeToString(conf.getBytes());
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index e568d976b..d48228706 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.util.CompletableFutureUtils;
+import org.apache.streampark.common.util.FileUtils;
 import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.domain.ResponseCode;
@@ -28,7 +29,6 @@ import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
-import org.apache.streampark.console.base.util.FileUtils;
 import org.apache.streampark.console.base.util.GZipUtils;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.Project;
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
index 57db8581c..d31083327 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.console.base.util;
 
+import org.apache.streampark.common.util.FileUtils;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index b3ee24c33..bf2e2c68b 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -130,7 +130,7 @@ private[flink] class FlinkStreamingInitializer(args: 
Array[String], apiType: Api
         require(
           configFile.exists(),
           s"[StreamPark] Usage: application config file: $configFile is not 
found!!!")
-        val text = FileUtils.readString(configFile)
+        val text = FileUtils.readFile(configFile)
         readConfig(text)
     }
     map.filter(_._2.nonEmpty)

Reply via email to