This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3e7f9e5ac RATIS-1915. Do not use FileInputStream/FileOutputStream in ratis-common. (#947)
3e7f9e5ac is described below
commit 3e7f9e5ac8894db7ad5734d380a7f91d65364ff0
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 25 13:29:12 2023 -0700
RATIS-1915. Do not use FileInputStream/FileOutputStream in ratis-common. (#947)
---
.../apache/ratis/util/AtomicFileOutputStream.java | 49 +++-----
.../main/java/org/apache/ratis/util/FileUtils.java | 135 ++++++++++++++++++---
.../main/java/org/apache/ratis/util/LogUtils.java | 3 +-
.../java/org/apache/ratis/util/MD5FileUtil.java | 95 ++++++---------
.../ratis/InstallSnapshotFromLeaderTests.java | 10 +-
5 files changed, 178 insertions(+), 114 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
index b2381ba83..530eb383c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AtomicFileOutputStream.java
@@ -20,11 +20,11 @@ package org.apache.ratis.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -32,13 +32,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
* The output file will not show up until it has been entirely written and
sync'ed to disk.
* It uses a temporary file when it is being written.
* The default temporary file has a .tmp suffix.
- *
+ * <p>
* When the output stream is closed, it is
* (1) flushed
* (2) sync'ed, and
* (3) renamed/moved from the temporary file to the output file.
* If the output file already exists, it will be overwritten.
- *
+ * <p>
* NOTE that on Windows platforms, the output file, if it exists, is deleted
* before the temporary file is moved.
*/
@@ -55,12 +55,12 @@ public class AtomicFileOutputStream extends FilterOutputStream {
private final File tmpFile;
private final AtomicBoolean isClosed = new AtomicBoolean();
- public AtomicFileOutputStream(File outFile) throws FileNotFoundException {
+ public AtomicFileOutputStream(File outFile) throws IOException {
this(outFile, getTemporaryFile(outFile));
}
- public AtomicFileOutputStream(File outFile, File tmpFile) throws FileNotFoundException {
- super(new FileOutputStream(tmpFile));
+ public AtomicFileOutputStream(File outFile, File tmpFile) throws IOException {
+ super(FileUtils.newOutputStreamForceAtClose(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE));
this.outFile = outFile.getAbsoluteFile();
this.tmpFile = tmpFile.getAbsoluteFile();
}
@@ -74,35 +74,16 @@ public class AtomicFileOutputStream extends FilterOutputStream {
if (!isClosed.compareAndSet(false, true)) {
return;
}
- boolean forced = false;
- boolean closed = false;
try {
- flush();
- ((FileOutputStream)out).getChannel().force(true);
- forced = true;
-
super.close();
- closed = true;
-
- final boolean renamed = tmpFile.renameTo(outFile);
- if (!renamed) {
- // On windows, renameTo does not replace.
- if (outFile.exists() && !outFile.delete()) {
- throw new IOException("Could not delete original file " + outFile);
- }
- FileUtils.move(tmpFile, outFile);
- }
- } finally {
- if (!closed) {
- if (!forced) {
- // If we failed when flushing, try to close it to not leak an FD
- IOUtils.cleanup(LOG, out);
- }
- // close wasn't successful, try to delete the tmp file
- if (!tmpFile.delete()) {
- LOG.warn("Unable to delete tmp file " + tmpFile);
- }
+ FileUtils.move(tmpFile, outFile, StandardCopyOption.REPLACE_EXISTING);
+ } catch (Exception e) {
+ try {
+ FileUtils.deleteIfExists(tmpFile);
+ } catch (IOException ioe) {
+ e.addSuppressed(ioe);
}
+ throw e;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
index be736b09f..d5141e917 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/FileUtils.java
@@ -22,11 +22,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
import java.util.function.Supplier;
public interface FileUtils {
@@ -48,17 +53,82 @@ public interface FileUtils {
final long original = f.length();
LogUtils.runAndLog(LOG,
() -> {
- try (FileOutputStream out = new FileOutputStream(f, true)) {
- out.getChannel().truncate(target);
+ try (FileChannel channel = FileChannel.open(f.toPath(),
StandardOpenOption.WRITE)) {
+ channel.truncate(target);
}
},
- () -> "FileOutputStream.getChannel().truncate " + f + " length: " +
original + " -> " + target);
+ () -> "FileChannel.truncate " + f + " length: " + original + " -> " +
target);
}
- static OutputStream createNewFile(Path p) throws IOException {
+ static InputStream newInputStream(String s, OpenOption... options) throws
IOException {
+ return newInputStream(Paths.get(s), options);
+ }
+
+ static InputStream newInputStream(File f, OpenOption... options) throws
IOException {
+ return newInputStream(f.toPath(), options);
+ }
+
+ static InputStream newInputStream(Path p, OpenOption... options) throws
IOException {
+ return LogUtils.supplyAndLog(LOG,
+ () -> Files.newInputStream(p, options),
+ () -> "Files.newInputStream " + p + " with options " +
Arrays.asList(options));
+ }
+
+ static OutputStream newOutputStream(File f, OpenOption... options) throws
IOException {
+ return newOutputStream(f.toPath(), options);
+ }
+
+ static OutputStream newOutputStream(Path p, OpenOption... options) throws
IOException {
+ return LogUtils.supplyAndLog(LOG,
+ () -> Files.newOutputStream(p, options),
+ () -> "Files.newOutputStream " + p + " with options " +
Arrays.asList(options));
+ }
+
+ static OutputStream newOutputStream(FileChannel channel, boolean
forceAtClose) {
+ final byte[] single = {0};
+ return new OutputStream() {
+ @Override
+ public void write(int b) throws IOException {
+ single[0] = (byte) b;
+ write(single);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ for(; len > 0; ) {
+ final int written = channel.write(ByteBuffer.wrap(b, off, len));
+ off += written;
+ len -= written;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try (FileChannel c = channel) {
+ if (forceAtClose) {
+ c.force(true);
+ }
+ }
+ }
+ };
+ }
+
+ static OutputStream newOutputStreamForceAtClose(Path p, OpenOption...
options) throws IOException {
+ return newOutputStream(newFileChannel(p, options), true);
+ }
+
+ static OutputStream newOutputStreamForceAtClose(File f, OpenOption...
options) throws IOException {
+ return newOutputStreamForceAtClose(f.toPath(), options);
+ }
+
+ static FileChannel newFileChannel(File f, OpenOption... options) throws
IOException {
+ return newFileChannel(f.toPath(), options);
+ }
+
+ static FileChannel newFileChannel(Path p, OpenOption... options) throws
IOException {
return LogUtils.supplyAndLog(LOG,
- () -> Files.newOutputStream(p, StandardOpenOption.CREATE_NEW),
- () -> "Files.newOutputStream " + StandardOpenOption.CREATE_NEW + " " +
p);
+ () -> FileChannel.open(p, options),
+ () -> "FileChannel.open " + p + " with options " +
Arrays.asList(options));
}
static void createDirectories(File dir) throws IOException {
@@ -86,19 +156,35 @@ public interface FileUtils {
}
}
- static void move(File src, File dst) throws IOException {
- move(src.toPath(), dst.toPath());
+ static void move(File src, File dst, CopyOption... options) throws
IOException {
+ move(src.toPath(), dst.toPath(), options);
}
- static void move(Path src, Path dst) throws IOException {
+ static void move(Path src, Path dst, CopyOption... options) throws
IOException {
+ Objects.requireNonNull(options, "options == null");
+ final List<CopyOption> original = Arrays.asList(options);
+ final boolean atomicMove =
original.contains(StandardCopyOption.ATOMIC_MOVE);
+ if (atomicMove) {
+ LogUtils.runAndLog(LOG,
+ () -> Files.move(src, dst, options),
+ () -> "Files.move " + src + " to " + dst + " with options " +
original);
+ return;
+ }
+
+ final CopyOption[] optionsWithAtomicMove = new CopyOption[options.length +
1];
+ optionsWithAtomicMove[0] = StandardCopyOption.ATOMIC_MOVE;
+ System.arraycopy(options, 0, optionsWithAtomicMove, 1, options.length);
+
+ final Supplier<String> suffix = () -> original.isEmpty() ? "" : " with options " + original;
try {
LogUtils.runAndLog(LOG,
- () -> Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE),
- () -> "Atomic Files.move " + src + " to " + dst);
+ () -> Files.move(src, dst, optionsWithAtomicMove),
+ () -> "Atomic Files.move " + src + " to " + dst + suffix.get());
} catch (AtomicMoveNotSupportedException e) {
+ // Fallback to non-atomic move.
LogUtils.runAndLog(LOG,
- () -> Files.move(src, dst),
- () -> "Atomic move not supported. Fallback to Files.move " + src + " to " + dst);
+ () -> Files.move(src, dst, options),
+ () -> "Atomic move not supported. Fallback to Files.move " + src + " to " + dst + suffix.get());
}
}
@@ -196,6 +282,24 @@ public interface FileUtils {
() -> "Files.delete " + p);
}
+ /**
+ * Use {@link Files#deleteIfExists(Path)} to delete the given path.
+ * This method may print log messages using {@link #LOG}.
+ */
+ static void deleteIfExists(Path p) throws IOException {
+ LogUtils.runAndLog(LOG,
+ () -> Files.deleteIfExists(p),
+ () -> "Files.deleteIfExists " + p);
+ }
+
+ /**
+ * Use {@link Files#deleteIfExists(Path)} to delete the given path.
+ * This method may print log messages using {@link #LOG}.
+ */
+ static void deleteIfExists(File f) throws IOException {
+ deleteIfExists(f.toPath());
+ }
+
/** The same as passing f.toPath() to {@link #deleteFully(Path)}. */
static void deleteFully(File f) throws IOException {
LOG.trace("deleteFully {}", f);
@@ -204,12 +308,9 @@ public interface FileUtils {
/**
* Delete fully the given path.
- *
* (1) If it is a file, the file will be deleted.
- *
* (2) If it is a directory, the directory and all its contents will be
recursively deleted.
* If an exception is thrown, the directory may possibly be partially
deleted.*
- *
* (3) If it is a symlink, the symlink will be deleted but the symlink
target will not be deleted.
*/
static void deleteFully(Path p) throws IOException {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
index 9cb6e74a5..d29f1e56e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java
@@ -62,8 +62,7 @@ public interface LogUtils {
} else if (log.isWarnEnabled()){
log.warn("Failed to " + name.get() + ": " + e);
}
- final THROWABLE throwable = JavaUtils.cast(e);
- throw throwable;
+ throw e;
}
if (log.isTraceEnabled()) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java b/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java
index 2e5eb2984..8a38f45e6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/MD5FileUtil.java
@@ -23,14 +23,13 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.security.DigestInputStream;
+import java.nio.file.StandardOpenOption;
import java.security.MessageDigest;
+import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -42,8 +41,19 @@ public abstract class MD5FileUtil {
// Keep the checksum and data in the same block format instead of individual
files.
public static final String MD5_SUFFIX = ".md5";
- private static final Pattern LINE_REGEX =
- Pattern.compile("([0-9a-f]{32}) [ *](.+)");
+ private static final String LINE_REGEX = "([0-9a-f]{32}) [ *](.+)";
+ private static final Pattern LINE_PATTERN = Pattern.compile(LINE_REGEX);
+
+ static Matcher getMatcher(String md5) {
+ return Optional.ofNullable(md5)
+ .map(LINE_PATTERN::matcher)
+ .filter(Matcher::matches)
+ .orElse(null);
+ }
+
+ static String getDoesNotMatchString(String line) {
+ return "\"" + line + "\" does not match the pattern " + LINE_REGEX;
+ }
/**
* Verify that the previously saved md5 for the given file matches
@@ -60,36 +70,14 @@ public abstract class MD5FileUtil {
}
}
- /**
- * Read the md5 file stored alongside the given data file and match the md5
- * file content.
- * @param md5File the file containing data
- * @return a matcher with two matched groups where group(1) is the md5 string
- * and group(2) is the data file path.
- */
- private static Matcher readStoredMd5(File md5File) throws IOException {
- BufferedReader reader =
- new BufferedReader(new InputStreamReader(new FileInputStream(
- md5File), StandardCharsets.UTF_8));
- String md5Line;
- try {
- md5Line = reader.readLine();
- if (md5Line == null) {
- md5Line = "";
- }
- md5Line = md5Line.trim();
+ /** Read the first line of the given file. */
+ private static String readFirstLine(File f) throws IOException {
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(
+ FileUtils.newInputStream(f), StandardCharsets.UTF_8))) {
+ return
Optional.ofNullable(reader.readLine()).map(String::trim).orElse(null);
} catch (IOException ioe) {
- throw new IOException("Error reading md5 file at " + md5File, ioe);
- } finally {
- IOUtils.cleanup(LOG, reader);
+ throw new IOException("Failed to read file: " + f, ioe);
}
-
- Matcher matcher = LINE_REGEX.matcher(md5Line);
- if (!matcher.matches()) {
- throw new IOException("Invalid MD5 file " + md5File + ": the content \""
- + md5Line + "\" does not match the expected pattern.");
- }
- return matcher;
}
/**
@@ -103,7 +91,9 @@ public abstract class MD5FileUtil {
return null;
}
- final Matcher matcher = readStoredMd5(md5File);
+ final String md5 = readFirstLine(md5File);
+ final Matcher matcher =
Optional.ofNullable(getMatcher(md5)).orElseThrow(() -> new IOException(
+ "Invalid MD5 file " + md5File + ": the content " +
getDoesNotMatchString(md5)));
String storedHash = matcher.group(1);
File referencedFile = new File(matcher.group(2));
@@ -122,9 +112,15 @@ public abstract class MD5FileUtil {
* Read dataFile and compute its MD5 checksum.
*/
public static MD5Hash computeMd5ForFile(File dataFile) throws IOException {
+ final int bufferSize = SizeInBytes.ONE_MB.getSizeInt();
final MessageDigest digester = MD5Hash.getDigester();
- try (DigestInputStream dis = new
DigestInputStream(Files.newInputStream(dataFile.toPath()), digester)) {
- IOUtils.readFully(dis, 128*1024);
+ try (FileChannel in = FileUtils.newFileChannel(dataFile,
StandardOpenOption.READ)) {
+ final long fileSize = in.size();
+ for (int offset = 0; offset < fileSize; ) {
+ final int readSize = Math.toIntExact(Math.min(fileSize - offset,
bufferSize));
+ digester.update(in.map(FileChannel.MapMode.READ_ONLY, offset,
readSize));
+ offset += readSize;
+ }
}
return new MD5Hash(digester.digest());
}
@@ -157,11 +153,13 @@ public abstract class MD5FileUtil {
private static void saveMD5File(File dataFile, String digestString)
throws IOException {
- File md5File = getDigestFileForFile(dataFile);
- String md5Line = digestString + " *" + dataFile.getName() + "\n";
+ final String md5Line = digestString + " *" + dataFile.getName() + "\n";
+ if (getMatcher(md5Line.trim()) == null) {
+ throw new IllegalArgumentException("Invalid md5 string: " +
getDoesNotMatchString(digestString));
+ }
- try (AtomicFileOutputStream afos
- = new AtomicFileOutputStream(md5File)) {
+ final File md5File = getDigestFileForFile(dataFile);
+ try (AtomicFileOutputStream afos = new AtomicFileOutputStream(md5File)) {
afos.write(md5Line.getBytes(StandardCharsets.UTF_8));
}
@@ -170,21 +168,6 @@ public abstract class MD5FileUtil {
}
}
- public static void renameMD5File(File oldDataFile, File newDataFile)
- throws IOException {
- final File fromFile = getDigestFileForFile(oldDataFile);
- if (!fromFile.exists()) {
- throw new FileNotFoundException(fromFile + " does not exist.");
- }
-
- final String digestString = readStoredMd5(fromFile).group(1);
- saveMD5File(newDataFile, digestString);
-
- if (!fromFile.delete()) {
- LOG.warn("deleting " + fromFile.getAbsolutePath() + " FAILED");
- }
- }
-
/**
* @return a reference to the file with .md5 suffix that will
* contain the md5 checksum for the given data file.
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index 2b8819911..b7eb75204 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -45,11 +45,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -168,12 +169,11 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu
FileUtils.createDirectories(snapshotRoot);
FileUtils.createDirectories(file1.getParentFile());
FileUtils.createDirectories(file2.getParentFile());
- FileUtils.createNewFile(file1.toPath());
- FileUtils.createNewFile(file2.toPath());
- // write 4MB data to simulate multiple chunk scene
+ FileUtils.newOutputStream(file1,
StandardOpenOption.CREATE_NEW).close();
+ // write 4KB data to simulate multiple chunk scene
final byte[] data = new byte[4096];
Arrays.fill(data, (byte)0x01);
- try (FileOutputStream fout = new FileOutputStream(file2)) {
+ try (OutputStream fout = FileUtils.newOutputStream(file2,
StandardOpenOption.CREATE_NEW)) {
fout.write(data);
}
}