This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/ratis.git.
commit a219ccc1979269c16e5badf992618594c8cfaa36 Author: William Song <[email protected]> AuthorDate: Sat Jul 2 22:01:51 2022 +0800 RATIS-1597. Compute MD5 during snapshot streaming (#661) --- .../apache/ratis/server/storage/FileChunkReader.java | 19 ++++++++++++++++--- .../apache/ratis/server/storage/SnapshotManager.java | 11 +++++++++-- .../apache/ratis/InstallSnapshotFromLeaderTests.java | 14 ++++++-------- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java index a9288efc..15ed9815 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.server.storage; +import org.apache.ratis.io.MD5Hash; import org.apache.ratis.proto.RaftProtos.FileChunkProto; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.JavaUtils; @@ -25,14 +26,18 @@ import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Path; +import java.security.DigestInputStream; +import java.security.MessageDigest; import java.util.Optional; /** Read {@link FileChunkProto}s from a file. */ public class FileChunkReader implements Closeable { private final FileInfo info; private final Path relativePath; - private final FileInputStream in; + private final InputStream in; + private final MessageDigest digester; /** The offset position of the current chunk. */ private long offset = 0; /** The index of the current chunk. 
*/ @@ -52,7 +57,13 @@ public class FileChunkReader implements Closeable { .map(p -> directory.getRoot().toPath().relativize(p)) .orElse(info.getPath()); final File f = info.getPath().toFile(); - this.in = new FileInputStream(f); + if (info.getFileDigest() == null) { + digester = MD5Hash.getDigester(); + this.in = new DigestInputStream(new FileInputStream(f), digester); + } else { + digester = null; + this.in = new FileInputStream(f); + } } /** @@ -66,6 +77,8 @@ public class FileChunkReader implements Closeable { final long remaining = info.getFileSize() - offset; final int chunkLength = remaining < chunkMaxSize ? (int) remaining : chunkMaxSize; final ByteString data = ByteString.readFrom(in, chunkLength); + final ByteString fileDigest = ByteString.copyFrom( + digester != null? digester.digest(): info.getFileDigest().getDigest()); final FileChunkProto proto = FileChunkProto.newBuilder() .setFilename(relativePath.toString()) @@ -73,7 +86,7 @@ public class FileChunkReader implements Closeable { .setChunkIndex(chunkIndex) .setDone(offset + chunkLength == info.getFileSize()) .setData(data) - .setFileDigest(ByteString.copyFrom(info.getFileDigest().getDigest())) + .setFileDigest(fileDigest) .build(); chunkIndex++; offset += chunkLength; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java index 8b5c6396..8748de5e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java @@ -22,6 +22,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.file.Path; +import java.security.DigestOutputStream; +import java.security.MessageDigest; +import java.util.function.Supplier; import org.apache.ratis.io.CorruptedFileException; import org.apache.ratis.io.MD5Hash; @@ -32,6 +35,7 @@ 
import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.util.FileUtils; import org.apache.ratis.util.IOUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MD5FileUtil; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.StringUtils; @@ -51,6 +55,7 @@ public class SnapshotManager { private final RaftStorage storage; private final RaftPeerId selfId; + private final Supplier<MessageDigest> digester = JavaUtils.memoize(MD5Hash::getDigester); public SnapshotManager(RaftStorage storage, RaftPeerId selfId) { this.storage = storage; @@ -106,7 +111,9 @@ public class SnapshotManager { } // write data to the file - out.write(chunk.getData().toByteArray()); + try (DigestOutputStream digestOut = new DigestOutputStream(out, digester.get())) { + digestOut.write(chunk.getData().toByteArray()); + } } finally { IOUtils.cleanup(null, out); } @@ -118,7 +125,7 @@ public class SnapshotManager { new MD5Hash(chunk.getFileDigest().toByteArray()); // calculate the checksum of the snapshot file and compare it with the // file digest in the request - MD5Hash digest = MD5FileUtil.computeMd5ForFile(tmpSnapshotFile); + final MD5Hash digest = new MD5Hash(digester.get().digest()); if (!digest.equals(expectedDigest)) { LOG.warn("The snapshot md5 digest {} does not match expected {}", digest, expectedDigest); diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java index b4cad6a4..98be1e64 100644 --- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java @@ -153,14 +153,12 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu return null; } List<FileInfo> files = new ArrayList<>(); - try { - files.add(new FileInfo( - 
file1.toPath(), - MD5FileUtil.computeMd5ForFile(file1))); - files.add(new FileInfo( - file2.toPath(), - MD5FileUtil.computeMd5ForFile(file2))); - } catch (IOException ignored) {} + files.add(new FileInfo( + file1.toPath(), + null)); + files.add(new FileInfo( + file2.toPath(), + null)); Assert.assertEquals(files.size(), 2); SnapshotInfo info = super.getLatestSnapshot();
