[
https://issues.apache.org/jira/browse/FLINK-7068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188033#comment-16188033
]
ASF GitHub Bot commented on FLINK-7068:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4358#discussion_r142137151
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
---
@@ -64,39 +65,60 @@ public FileSystemBlobStore(FileSystem fileSystem,
String storagePath) throws IOE
// - Put
------------------------------------------------------------------
@Override
- public void put(File localFile, JobID jobId, BlobKey blobKey) throws
IOException {
- put(localFile, BlobUtils.getStorageLocationPath(basePath,
jobId, blobKey));
+ public boolean put(File localFile, JobID jobId, BlobKey blobKey) throws
IOException {
+ return put(localFile,
BlobUtils.getStorageLocationPath(basePath, jobId, blobKey));
}
- private void put(File fromFile, String toBlobPath) throws IOException {
+ private boolean put(File fromFile, String toBlobPath) throws
IOException {
try (OutputStream os = fileSystem.create(new Path(toBlobPath),
FileSystem.WriteMode.OVERWRITE)) {
LOG.debug("Copying from {} to {}.", fromFile,
toBlobPath);
Files.copy(fromFile, os);
}
+ return true;
}
// - Get
------------------------------------------------------------------
@Override
- public void get(JobID jobId, BlobKey blobKey, File localFile) throws
IOException {
- get(BlobUtils.getStorageLocationPath(basePath, jobId, blobKey),
localFile);
+ public boolean get(JobID jobId, BlobKey blobKey, File localFile) throws
IOException {
+ return get(BlobUtils.getStorageLocationPath(basePath, jobId,
blobKey), localFile, blobKey);
}
- private void get(String fromBlobPath, File toFile) throws IOException {
+ private boolean get(String fromBlobPath, File toFile, BlobKey blobKey)
throws IOException {
checkNotNull(fromBlobPath, "Blob path");
checkNotNull(toFile, "File");
+ checkNotNull(blobKey, "Blob key");
if (!toFile.exists() && !toFile.createNewFile()) {
throw new IOException("Failed to create target file to
copy to");
}
final Path fromPath = new Path(fromBlobPath);
+ MessageDigest md = BlobUtils.createMessageDigest();
+
+ final int buffSize = 4096; // like IOUtils#BLOCKSIZE, for
chunked file copying
boolean success = false;
try (InputStream is = fileSystem.open(fromPath);
FileOutputStream fos = new FileOutputStream(toFile)) {
LOG.debug("Copying from {} to {}.", fromBlobPath,
toFile);
- IOUtils.copyBytes(is, fos); // closes the streams
+
+ // not using IOUtils.copyBytes(is, fos) here to be able
to create a hash on-the-fly
+ final byte[] buf = new byte[buffSize];
+ int bytesRead = is.read(buf);
+ while (bytesRead >= 0) {
+ fos.write(buf, 0, bytesRead);
+ md.update(buf, 0, bytesRead);
+
+ bytesRead = is.read(buf);
+ }
+
+ // verify that file contents are correct
+ final byte[] computedKey = md.digest();
+ if (!Arrays.equals(computedKey, blobKey.getHash())) {
+ throw new IOException("Detected data corruption
during transfer");
+ }
--- End diff --
Nice addition :-)
> change BlobService sub-classes for permanent and transient BLOBs
> ----------------------------------------------------------------
>
> Key: FLINK-7068
> URL: https://issues.apache.org/jira/browse/FLINK-7068
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination, Network
> Affects Versions: 1.4.0
> Reporter: Nico Kruber
> Assignee: Nico Kruber
>
> A {{PermanentBlobStore}} should represent use cases for BLOBs that are
> stored permanently for a job's lifetime (HA and non-HA).
> A {{TransientBlobStore}} should reflect BLOB offloading for logs, RPC, etc.,
> which does not even have to be backed by files.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)