This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e7f9e5ac RATIS-1915. Do not use FileInputStream/FileOutputStream in 
ratis-common. (#947)
3e7f9e5ac is described below

commit 3e7f9e5ac8894db7ad5734d380a7f91d65364ff0
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 25 13:29:12 2023 -0700

    RATIS-1915. Do not use FileInputStream/FileOutputStream in ratis-common. 
(#947)
---
 .../apache/ratis/util/AtomicFileOutputStream.java  |  49 +++-----
 .../main/java/org/apache/ratis/util/FileUtils.java | 135 ++++++++++++++++++---
 .../main/java/org/apache/ratis/util/LogUtils.java  |   3 +-
 .../java/org/apache/ratis/util/MD5FileUtil.java    |  95 ++++++---------
 .../ratis/InstallSnapshotFromLeaderTests.java      |  10 +-
 5 files changed, 178 insertions(+), 114 deletions(-)

diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java 
b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
index b2381ba83..530eb383c 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
@@ -20,11 +20,11 @@ package org.apache.ratis.util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
 import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -32,13 +32,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * The output file will not show up until it has been entirely written and 
sync'ed to disk.
  * It uses a temporary file when it is being written.
  * The default temporary file has a .tmp suffix.
- *
+ * <p>
  * When the output stream is closed, it is
  * (1) flushed
  * (2) sync'ed, and
  * (3) renamed/moved from the temporary file to the output file.
  * If the output file already exists, it will be overwritten.
- *
+ * <p>
  * NOTE that on Windows platforms, the output file, if it exists, is deleted
  * before the temporary file is moved.
  */
@@ -55,12 +55,12 @@ public class AtomicFileOutputStream extends 
FilterOutputStream {
   private final File tmpFile;
   private final AtomicBoolean isClosed = new AtomicBoolean();
 
-  public AtomicFileOutputStream(File outFile) throws FileNotFoundException {
+  public AtomicFileOutputStream(File outFile) throws IOException {
     this(outFile, getTemporaryFile(outFile));
   }
 
-  public AtomicFileOutputStream(File outFile, File tmpFile) throws 
FileNotFoundException {
-    super(new FileOutputStream(tmpFile));
+  public AtomicFileOutputStream(File outFile, File tmpFile) throws IOException 
{
+    super(FileUtils.newOutputStreamForceAtClose(tmpFile, 
StandardOpenOption.CREATE, StandardOpenOption.WRITE));
     this.outFile = outFile.getAbsoluteFile();
     this.tmpFile = tmpFile.getAbsoluteFile();
   }
@@ -74,35 +74,16 @@ public class AtomicFileOutputStream extends 
FilterOutputStream {
     if (!isClosed.compareAndSet(false, true)) {
       return;
     }
-    boolean forced = false;
-    boolean closed = false;
     try {
-      flush();
-      ((FileOutputStream)out).getChannel().force(true);
-      forced = true;
-
       super.close();
-      closed = true;
-
-      final boolean renamed = tmpFile.renameTo(outFile);
-      if (!renamed) {
-        // On windows, renameTo does not replace.
-        if (outFile.exists() && !outFile.delete()) {
-          throw new IOException("Could not delete original file " + outFile);
-        }
-        FileUtils.move(tmpFile, outFile);
-      }
-    } finally {
-      if (!closed) {
-        if (!forced) {
-          // If we failed when flushing, try to close it to not leak an FD
-          IOUtils.cleanup(LOG, out);
-        }
-        // close wasn't successful, try to delete the tmp file
-        if (!tmpFile.delete()) {
-          LOG.warn("Unable to delete tmp file " + tmpFile);
-        }
+      FileUtils.move(tmpFile, outFile, StandardCopyOption.REPLACE_EXISTING);
+    } catch (Exception e) {
+      try {
+        FileUtils.deleteIfExists(tmpFile);
+      } catch (IOException ioe) {
+        e.addSuppressed(ioe);
       }
+      throw e;
     }
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
index be736b09f..d5141e917 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
@@ -22,11 +22,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.*;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
 import java.util.function.Supplier;
 
 public interface FileUtils {
@@ -48,17 +53,82 @@ public interface FileUtils {
     final long original = f.length();
     LogUtils.runAndLog(LOG,
         () -> {
-          try (FileOutputStream out = new FileOutputStream(f, true)) {
-            out.getChannel().truncate(target);
+          try (FileChannel channel = FileChannel.open(f.toPath(), 
StandardOpenOption.WRITE)) {
+            channel.truncate(target);
           }
         },
-        () -> "FileOutputStream.getChannel().truncate " + f + " length: " + 
original + " -> " + target);
+        () -> "FileChannel.truncate " + f + " length: " + original + " -> " + 
target);
   }
 
-  static OutputStream createNewFile(Path p) throws IOException {
+  static InputStream newInputStream(String s, OpenOption... options) throws 
IOException {
+    return newInputStream(Paths.get(s), options);
+  }
+
+  static InputStream newInputStream(File f, OpenOption... options) throws 
IOException {
+    return newInputStream(f.toPath(), options);
+  }
+
+  static InputStream newInputStream(Path p, OpenOption... options) throws 
IOException {
+    return LogUtils.supplyAndLog(LOG,
+        () -> Files.newInputStream(p, options),
+        () -> "Files.newInputStream " + p + " with options " + 
Arrays.asList(options));
+  }
+
+  static OutputStream newOutputStream(File f, OpenOption... options) throws 
IOException {
+    return newOutputStream(f.toPath(), options);
+  }
+
+  static OutputStream newOutputStream(Path p, OpenOption... options) throws 
IOException {
+    return LogUtils.supplyAndLog(LOG,
+        () -> Files.newOutputStream(p, options),
+        () -> "Files.newOutputStream " + p + " with options " + 
Arrays.asList(options));
+  }
+
+  static OutputStream newOutputStream(FileChannel channel, boolean 
forceAtClose) {
+    final byte[] single = {0};
+    return new OutputStream() {
+      @Override
+      public void write(int b) throws IOException {
+        single[0] = (byte) b;
+        write(single);
+      }
+
+      @Override
+      public void write(byte[] b, int off, int len) throws IOException {
+        for(; len > 0; ) {
+          final int written = channel.write(ByteBuffer.wrap(b, off, len));
+          off += written;
+          len -= written;
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        try (FileChannel c = channel) {
+          if (forceAtClose) {
+            c.force(true);
+          }
+        }
+      }
+    };
+  }
+
+  static OutputStream newOutputStreamForceAtClose(Path p, OpenOption... 
options) throws IOException {
+    return newOutputStream(newFileChannel(p, options), true);
+  }
+
+  static OutputStream newOutputStreamForceAtClose(File f, OpenOption... 
options) throws IOException {
+    return newOutputStreamForceAtClose(f.toPath(), options);
+  }
+
+  static FileChannel newFileChannel(File f, OpenOption... options) throws 
IOException {
+    return newFileChannel(f.toPath(), options);
+  }
+
+  static FileChannel newFileChannel(Path p, OpenOption... options) throws 
IOException {
     return LogUtils.supplyAndLog(LOG,
-        () -> Files.newOutputStream(p, StandardOpenOption.CREATE_NEW),
-        () -> "Files.newOutputStream " + StandardOpenOption.CREATE_NEW + " " + 
p);
+        () -> FileChannel.open(p, options),
+        () -> "FileChannel.open " + p + " with options " + 
Arrays.asList(options));
   }
 
   static void createDirectories(File dir) throws IOException {
@@ -86,19 +156,35 @@ public interface FileUtils {
     }
   }
 
-  static void move(File src, File dst) throws IOException {
-    move(src.toPath(), dst.toPath());
+  static void move(File src, File dst, CopyOption... options) throws 
IOException {
+    move(src.toPath(), dst.toPath(), options);
   }
 
-  static void move(Path src, Path dst) throws IOException {
+  static void move(Path src, Path dst, CopyOption... options) throws 
IOException {
+    Objects.requireNonNull(options, "options == null");
+    final List<CopyOption> original = Arrays.asList(options);
+    final boolean atomicMove = 
original.contains(StandardCopyOption.ATOMIC_MOVE);
+    if (atomicMove) {
+      LogUtils.runAndLog(LOG,
+          () -> Files.move(src, dst, options),
+          () -> "Files.move " + src + " to " + dst + " with options " + 
original);
+      return;
+    }
+
+    final CopyOption[] optionsWithAtomicMove = new CopyOption[options.length + 
1];
+    optionsWithAtomicMove[0] = StandardCopyOption.ATOMIC_MOVE;
+    System.arraycopy(options, 0, optionsWithAtomicMove, 1, options.length);
+
+    final Supplier<String> suffix = () -> original.isEmpty() ? "" : " with 
options " + original;
     try {
       LogUtils.runAndLog(LOG,
-        () -> Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE),
-        () -> "Atomic Files.move " + src + " to " + dst);
+          () -> Files.move(src, dst, optionsWithAtomicMove),
+          () -> "Atomic Files.move " + src + " to " + dst + suffix.get());
     } catch (AtomicMoveNotSupportedException e) {
+      // Fallback to non-atomic move.
       LogUtils.runAndLog(LOG,
-        () -> Files.move(src, dst),
-        () -> "Atomic move not supported. Fallback to Files.move " + src + " 
to " + dst);
+          () -> Files.move(src, dst, options),
+          () -> "Atomic move not supported. Fallback to Files.move " + src + " 
to " + dst + suffix.get());
     }
   }
 
@@ -196,6 +282,24 @@ public interface FileUtils {
         () -> "Files.delete " + p);
   }
 
+  /**
+   * Use {@link Files#deleteIfExists(Path)} to delete the given path.
+   * This method may print log messages using {@link #LOG}.
+   */
+  static void deleteIfExists(Path p) throws IOException {
+    LogUtils.runAndLog(LOG,
+        () -> Files.deleteIfExists(p),
+        () -> "Files.deleteIfExists " + p);
+  }
+
+  /**
+   * Use {@link Files#deleteIfExists(Path)} to delete the given path.
+   * This method may print log messages using {@link #LOG}.
+   */
+  static void deleteIfExists(File f) throws IOException {
+    deleteIfExists(f.toPath());
+  }
+
   /** The same as passing f.toPath() to {@link #deleteFully(Path)}. */
   static void deleteFully(File f) throws IOException {
     LOG.trace("deleteFully {}", f);
@@ -204,12 +308,9 @@ public interface FileUtils {
 
   /**
    * Delete fully the given path.
-   *
    * (1) If it is a file, the file will be deleted.
-   *
    * (2) If it is a directory, the directory and all its contents will be 
recursively deleted.
    *     If an exception is thrown, the directory may possibly be partially 
deleted.*
-   *
    * (3) If it is a symlink, the symlink will be deleted but the symlink 
target will not be deleted.
    */
   static void deleteFully(Path p) throws IOException {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
index 9cb6e74a5..d29f1e56e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
@@ -62,8 +62,7 @@ public interface LogUtils {
       } else if (log.isWarnEnabled()){
         log.warn("Failed to " + name.get() + ": " + e);
       }
-      final THROWABLE throwable = JavaUtils.cast(e);
-      throw throwable;
+      throw e;
     }
 
     if (log.isTraceEnabled()) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java 
b/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java
index 2e5eb2984..8a38f45e6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java
@@ -23,14 +23,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.channels.FileChannel;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.security.DigestInputStream;
+import java.nio.file.StandardOpenOption;
 import java.security.MessageDigest;
+import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -42,8 +41,19 @@ public abstract class MD5FileUtil {
   // Keep the checksum and data in the same block format instead of individual 
files.
 
   public static final String MD5_SUFFIX = ".md5";
-  private static final Pattern LINE_REGEX =
-      Pattern.compile("([0-9a-f]{32}) [ *](.+)");
+  private static final String LINE_REGEX = "([0-9a-f]{32}) [ *](.+)";
+  private static final Pattern LINE_PATTERN = Pattern.compile(LINE_REGEX);
+
+  static Matcher getMatcher(String md5) {
+    return Optional.ofNullable(md5)
+        .map(LINE_PATTERN::matcher)
+        .filter(Matcher::matches)
+        .orElse(null);
+  }
+
+  static String getDoesNotMatchString(String line) {
+    return "\"" + line + "\" does not match the pattern " + LINE_REGEX;
+  }
 
   /**
    * Verify that the previously saved md5 for the given file matches
@@ -60,36 +70,14 @@ public abstract class MD5FileUtil {
     }
   }
 
-  /**
-   * Read the md5 file stored alongside the given data file and match the md5
-   * file content.
-   * @param md5File the file containing data
-   * @return a matcher with two matched groups where group(1) is the md5 string
-   *         and group(2) is the data file path.
-   */
-  private static Matcher readStoredMd5(File md5File) throws IOException {
-    BufferedReader reader =
-          new BufferedReader(new InputStreamReader(new FileInputStream(
-            md5File), StandardCharsets.UTF_8));
-    String md5Line;
-    try {
-      md5Line = reader.readLine();
-      if (md5Line == null) {
-        md5Line = "";
-      }
-      md5Line = md5Line.trim();
+  /** Read the first line of the given file. */
+  private static String readFirstLine(File f) throws IOException {
+    try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+        FileUtils.newInputStream(f), StandardCharsets.UTF_8))) {
+      return 
Optional.ofNullable(reader.readLine()).map(String::trim).orElse(null);
     } catch (IOException ioe) {
-      throw new IOException("Error reading md5 file at " + md5File, ioe);
-    } finally {
-      IOUtils.cleanup(LOG, reader);
+      throw new IOException("Failed to read file: " + f, ioe);
     }
-
-    Matcher matcher = LINE_REGEX.matcher(md5Line);
-    if (!matcher.matches()) {
-      throw new IOException("Invalid MD5 file " + md5File + ": the content \""
-          + md5Line + "\" does not match the expected pattern.");
-    }
-    return matcher;
   }
 
   /**
@@ -103,7 +91,9 @@ public abstract class MD5FileUtil {
       return null;
     }
 
-    final Matcher matcher = readStoredMd5(md5File);
+    final String md5 = readFirstLine(md5File);
+    final Matcher matcher = 
Optional.ofNullable(getMatcher(md5)).orElseThrow(() -> new IOException(
+        "Invalid MD5 file " + md5File + ": the content " + 
getDoesNotMatchString(md5)));
     String storedHash = matcher.group(1);
     File referencedFile = new File(matcher.group(2));
 
@@ -122,9 +112,15 @@ public abstract class MD5FileUtil {
    * Read dataFile and compute its MD5 checksum.
    */
   public static MD5Hash computeMd5ForFile(File dataFile) throws IOException {
+    final int bufferSize = SizeInBytes.ONE_MB.getSizeInt();
     final MessageDigest digester = MD5Hash.getDigester();
-    try (DigestInputStream dis = new 
DigestInputStream(Files.newInputStream(dataFile.toPath()), digester)) {
-      IOUtils.readFully(dis, 128*1024);
+    try (FileChannel in = FileUtils.newFileChannel(dataFile, 
StandardOpenOption.READ)) {
+      final long fileSize = in.size();
+      for (int offset = 0; offset < fileSize; ) {
+        final int readSize = Math.toIntExact(Math.min(fileSize - offset, 
bufferSize));
+        digester.update(in.map(FileChannel.MapMode.READ_ONLY, offset, 
readSize));
+        offset += readSize;
+      }
     }
     return new MD5Hash(digester.digest());
   }
@@ -157,11 +153,13 @@ public abstract class MD5FileUtil {
 
   private static void saveMD5File(File dataFile, String digestString)
       throws IOException {
-    File md5File = getDigestFileForFile(dataFile);
-    String md5Line = digestString + " *" + dataFile.getName() + "\n";
+    final String md5Line = digestString + " *" + dataFile.getName() + "\n";
+    if (getMatcher(md5Line.trim()) == null) {
+      throw new IllegalArgumentException("Invalid md5 string: " + 
getDoesNotMatchString(digestString));
+    }
 
-    try (AtomicFileOutputStream afos
-         = new AtomicFileOutputStream(md5File)) {
+    final File md5File = getDigestFileForFile(dataFile);
+    try (AtomicFileOutputStream afos = new AtomicFileOutputStream(md5File)) {
       afos.write(md5Line.getBytes(StandardCharsets.UTF_8));
     }
 
@@ -170,21 +168,6 @@ public abstract class MD5FileUtil {
     }
   }
 
-  public static void renameMD5File(File oldDataFile, File newDataFile)
-      throws IOException {
-    final File fromFile = getDigestFileForFile(oldDataFile);
-    if (!fromFile.exists()) {
-      throw new FileNotFoundException(fromFile + " does not exist.");
-    }
-
-    final String digestString = readStoredMd5(fromFile).group(1);
-    saveMD5File(newDataFile, digestString);
-
-    if (!fromFile.delete()) {
-      LOG.warn("deleting  " + fromFile.getAbsolutePath() + " FAILED");
-    }
-  }
-
   /**
    * @return a reference to the file with .md5 suffix that will
    * contain the md5 checksum for the given data file.
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index 2b8819911..b7eb75204 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -45,11 +45,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -168,12 +169,11 @@ public abstract class 
InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu
           FileUtils.createDirectories(snapshotRoot);
           FileUtils.createDirectories(file1.getParentFile());
           FileUtils.createDirectories(file2.getParentFile());
-          FileUtils.createNewFile(file1.toPath());
-          FileUtils.createNewFile(file2.toPath());
-          // write 4MB data to simulate multiple chunk scene
+          FileUtils.newOutputStream(file1, 
StandardOpenOption.CREATE_NEW).close();
+          // write 4KB data to simulate multiple chunk scene
           final byte[] data = new byte[4096];
           Arrays.fill(data, (byte)0x01);
-          try (FileOutputStream fout = new FileOutputStream(file2)) {
+          try (OutputStream fout = FileUtils.newOutputStream(file2, 
StandardOpenOption.CREATE_NEW)) {
               fout.write(data);
           }
         }

Reply via email to