This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new db3623a2 RATIS-1597. Compute MD5 during snapshot streaming (#661)
db3623a2 is described below

commit db3623a22e39a7957d9985fc9fe6784fe977f070
Author: William Song <[email protected]>
AuthorDate: Sat Jul 2 22:01:51 2022 +0800

    RATIS-1597. Compute MD5 during snapshot streaming (#661)
---
 .../apache/ratis/server/storage/FileChunkReader.java  | 19 ++++++++++++++++---
 .../apache/ratis/server/storage/SnapshotManager.java  | 11 +++++++++--
 .../apache/ratis/InstallSnapshotFromLeaderTests.java  | 14 ++++++--------
 3 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
index a9288efc..15ed9815 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.server.storage;
 
+import org.apache.ratis.io.MD5Hash;
 import org.apache.ratis.proto.RaftProtos.FileChunkProto;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.JavaUtils;
@@ -25,14 +26,18 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.nio.file.Path;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
 import java.util.Optional;
 
 /** Read {@link FileChunkProto}s from a file. */
 public class FileChunkReader implements Closeable {
   private final FileInfo info;
   private final Path relativePath;
-  private final FileInputStream in;
+  private final InputStream in;
+  private final MessageDigest digester;
   /** The offset position of the current chunk. */
   private long offset = 0;
   /** The index of the current chunk. */
@@ -52,7 +57,13 @@ public class FileChunkReader implements Closeable {
         .map(p -> directory.getRoot().toPath().relativize(p))
         .orElse(info.getPath());
     final File f = info.getPath().toFile();
-    this.in = new FileInputStream(f);
+    if (info.getFileDigest() == null) {
+      digester = MD5Hash.getDigester();
+      this.in = new DigestInputStream(new FileInputStream(f), digester);
+    } else {
+      digester = null;
+      this.in = new FileInputStream(f);
+    }
   }
 
   /**
@@ -66,6 +77,8 @@ public class FileChunkReader implements Closeable {
     final long remaining = info.getFileSize() - offset;
     final int chunkLength = remaining < chunkMaxSize ? (int) remaining : 
chunkMaxSize;
     final ByteString data = ByteString.readFrom(in, chunkLength);
+    final ByteString fileDigest = ByteString.copyFrom(
+            digester != null? digester.digest(): 
info.getFileDigest().getDigest());
 
     final FileChunkProto proto = FileChunkProto.newBuilder()
         .setFilename(relativePath.toString())
@@ -73,7 +86,7 @@ public class FileChunkReader implements Closeable {
         .setChunkIndex(chunkIndex)
         .setDone(offset + chunkLength == info.getFileSize())
         .setData(data)
-        .setFileDigest(ByteString.copyFrom(info.getFileDigest().getDigest()))
+        .setFileDigest(fileDigest)
         .build();
     chunkIndex++;
     offset += chunkLength;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 8b5c6396..8748de5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -22,6 +22,9 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Path;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.function.Supplier;
 
 import org.apache.ratis.io.CorruptedFileException;
 import org.apache.ratis.io.MD5Hash;
@@ -32,6 +35,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MD5FileUtil;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.StringUtils;
@@ -51,6 +55,7 @@ public class SnapshotManager {
 
   private final RaftStorage storage;
   private final RaftPeerId selfId;
+  private final Supplier<MessageDigest> digester = 
JavaUtils.memoize(MD5Hash::getDigester);
 
   public SnapshotManager(RaftStorage storage, RaftPeerId selfId) {
     this.storage = storage;
@@ -106,7 +111,9 @@ public class SnapshotManager {
         }
 
         // write data to the file
-        out.write(chunk.getData().toByteArray());
+        try (DigestOutputStream digestOut = new DigestOutputStream(out, 
digester.get())) {
+          digestOut.write(chunk.getData().toByteArray());
+        }
       } finally {
         IOUtils.cleanup(null, out);
       }
@@ -118,7 +125,7 @@ public class SnapshotManager {
             new MD5Hash(chunk.getFileDigest().toByteArray());
         // calculate the checksum of the snapshot file and compare it with the
         // file digest in the request
-        MD5Hash digest = MD5FileUtil.computeMd5ForFile(tmpSnapshotFile);
+        final MD5Hash digest = new MD5Hash(digester.get().digest());
         if (!digest.equals(expectedDigest)) {
           LOG.warn("The snapshot md5 digest {} does not match expected {}",
               digest, expectedDigest);
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index b4cad6a4..98be1e64 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -153,14 +153,12 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu
                 return null;
             }
             List<FileInfo> files = new ArrayList<>();
-            try {
-                files.add(new FileInfo(
-                        file1.toPath(),
-                        MD5FileUtil.computeMd5ForFile(file1)));
-                files.add(new FileInfo(
-                        file2.toPath(),
-                        MD5FileUtil.computeMd5ForFile(file2)));
-            } catch (IOException ignored) {}
+            files.add(new FileInfo(
+                    file1.toPath(),
+                    null));
+            files.add(new FileInfo(
+                    file2.toPath(),
+                    null));
             Assert.assertEquals(files.size(), 2);
 
             SnapshotInfo info = super.getLatestSnapshot();

Reply via email to