This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new db3623a2 RATIS-1597. Compute MD5 during snapshot streaming (#661)
db3623a2 is described below
commit db3623a22e39a7957d9985fc9fe6784fe977f070
Author: William Song <[email protected]>
AuthorDate: Sat Jul 2 22:01:51 2022 +0800
RATIS-1597. Compute MD5 during snapshot streaming (#661)
---
.../apache/ratis/server/storage/FileChunkReader.java | 19 ++++++++++++++++---
.../apache/ratis/server/storage/SnapshotManager.java | 11 +++++++++--
.../apache/ratis/InstallSnapshotFromLeaderTests.java | 14 ++++++--------
3 files changed, 31 insertions(+), 13 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
index a9288efc..15ed9815 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/FileChunkReader.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.storage;
+import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.proto.RaftProtos.FileChunkProto;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
@@ -25,14 +26,18 @@ import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.file.Path;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
import java.util.Optional;
/** Read {@link FileChunkProto}s from a file. */
public class FileChunkReader implements Closeable {
private final FileInfo info;
private final Path relativePath;
- private final FileInputStream in;
+ private final InputStream in;
+ private final MessageDigest digester;
/** The offset position of the current chunk. */
private long offset = 0;
/** The index of the current chunk. */
@@ -52,7 +57,13 @@ public class FileChunkReader implements Closeable {
.map(p -> directory.getRoot().toPath().relativize(p))
.orElse(info.getPath());
final File f = info.getPath().toFile();
- this.in = new FileInputStream(f);
+ if (info.getFileDigest() == null) {
+ digester = MD5Hash.getDigester();
+ this.in = new DigestInputStream(new FileInputStream(f), digester);
+ } else {
+ digester = null;
+ this.in = new FileInputStream(f);
+ }
}
/**
@@ -66,6 +77,8 @@ public class FileChunkReader implements Closeable {
final long remaining = info.getFileSize() - offset;
final int chunkLength = remaining < chunkMaxSize ? (int) remaining :
chunkMaxSize;
final ByteString data = ByteString.readFrom(in, chunkLength);
+ final ByteString fileDigest = ByteString.copyFrom(
+ digester != null? digester.digest():
info.getFileDigest().getDigest());
final FileChunkProto proto = FileChunkProto.newBuilder()
.setFilename(relativePath.toString())
@@ -73,7 +86,7 @@ public class FileChunkReader implements Closeable {
.setChunkIndex(chunkIndex)
.setDone(offset + chunkLength == info.getFileSize())
.setData(data)
- .setFileDigest(ByteString.copyFrom(info.getFileDigest().getDigest()))
+ .setFileDigest(fileDigest)
.build();
chunkIndex++;
offset += chunkLength;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
index 8b5c6396..8748de5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SnapshotManager.java
@@ -22,6 +22,9 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.util.function.Supplier;
import org.apache.ratis.io.CorruptedFileException;
import org.apache.ratis.io.MD5Hash;
@@ -32,6 +35,7 @@ import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.IOUtils;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MD5FileUtil;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.StringUtils;
@@ -51,6 +55,7 @@ public class SnapshotManager {
private final RaftStorage storage;
private final RaftPeerId selfId;
+ private final Supplier<MessageDigest> digester =
JavaUtils.memoize(MD5Hash::getDigester);
public SnapshotManager(RaftStorage storage, RaftPeerId selfId) {
this.storage = storage;
@@ -106,7 +111,9 @@ public class SnapshotManager {
}
// write data to the file
- out.write(chunk.getData().toByteArray());
+ try (DigestOutputStream digestOut = new DigestOutputStream(out,
digester.get())) {
+ digestOut.write(chunk.getData().toByteArray());
+ }
} finally {
IOUtils.cleanup(null, out);
}
@@ -118,7 +125,7 @@ public class SnapshotManager {
new MD5Hash(chunk.getFileDigest().toByteArray());
// calculate the checksum of the snapshot file and compare it with the
// file digest in the request
- MD5Hash digest = MD5FileUtil.computeMd5ForFile(tmpSnapshotFile);
+ final MD5Hash digest = new MD5Hash(digester.get().digest());
if (!digest.equals(expectedDigest)) {
LOG.warn("The snapshot md5 digest {} does not match expected {}",
digest, expectedDigest);
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
index b4cad6a4..98be1e64 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotFromLeaderTests.java
@@ -153,14 +153,12 @@ public abstract class InstallSnapshotFromLeaderTests<CLUSTER extends MiniRaftClu
return null;
}
List<FileInfo> files = new ArrayList<>();
- try {
- files.add(new FileInfo(
- file1.toPath(),
- MD5FileUtil.computeMd5ForFile(file1)));
- files.add(new FileInfo(
- file2.toPath(),
- MD5FileUtil.computeMd5ForFile(file2)));
- } catch (IOException ignored) {}
+ files.add(new FileInfo(
+ file1.toPath(),
+ null));
+ files.add(new FileInfo(
+ file2.toPath(),
+ null));
Assert.assertEquals(files.size(), 2);
SnapshotInfo info = super.getLatestSnapshot();