[FLINK-8801][yarn/s3] fix Utils#setupLocalResource() relying on consistent read-after-write

"Amazon S3 provides read-after-write consistency for PUTS of new objects in your
S3 bucket in all regions with one caveat. The caveat is that if you make a HEAD
or GET request to the key name (to find if the object exists) before creating
the object, Amazon S3 provides eventual consistency for read-after-write."
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel

Some S3 file system implementations may actually execute such a request for the
about-to-write object, and thus the read-after-write is only eventually
consistent.

org.apache.flink.yarn.Utils#setupLocalResource() currently relies on a
consistent read-after-write since it accesses the remote resource to get the
file size and modification timestamp. Since the local resource is available at
that point, we can take this metadata from it directly and circumvent the
problem.

This closes #5602.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c90a757b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c90a757b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c90a757b

Branch: refs/heads/master
Commit: c90a757b29f168144b1bae99df532911ae682e63
Parents: 6eb91a1
Author: Nico Kruber <[email protected]>
Authored: Tue Feb 27 17:23:20 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Mar 20 14:54:03 2018 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/yarn/Utils.java | 34 ++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c90a757b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index b9f7fac..b895784 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -141,7 +141,8 @@ public final class Utils {
 		Path homedir,
 		String relativeTargetPath) throws IOException {
 
-		if (new File(localSrcPath.toUri().getPath()).isDirectory()) {
+		File localFile = new File(localSrcPath.toUri().getPath());
+		if (localFile.isDirectory()) {
 			throw new IllegalArgumentException("File to copy must not be a directory: " + localSrcPath);
 		}
 
@@ -159,11 +160,40 @@ public final class Utils {
 
 		fs.copyFromLocalFile(false, true, localSrcPath, dst);
 
+		// Note: If we used registerLocalResource(FileSystem, Path) here, we would access the remote
+		//       file once again which has problems with eventually consistent read-after-write file
+		//       systems. Instead, we decide to preserve the modification time at the remote
+		//       location because this and the size of the resource will be checked by YARN based on
+		//       the values we provide to #registerLocalResource() below.
+		fs.setTimes(dst, localFile.lastModified(), -1);
 		// now create the resource instance
-		LocalResource resource = registerLocalResource(fs, dst);
+		LocalResource resource = registerLocalResource(dst, localFile.length(), localFile.lastModified());
+
 		return Tuple2.of(dst, resource);
 	}
 
+	/**
+	 * Creates a YARN resource for the remote object at the given location.
+	 *
+	 * @param remoteRsrcPath	remote location of the resource
+	 * @param resourceSize		size of the resource
+	 * @param resourceModificationTime last modification time of the resource
+	 *
+	 * @return YARN resource
+	 */
+	private static LocalResource registerLocalResource(
+			Path remoteRsrcPath,
+			long resourceSize,
+			long resourceModificationTime) {
+		LocalResource localResource = Records.newRecord(LocalResource.class);
+		localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
+		localResource.setSize(resourceSize);
+		localResource.setTimestamp(resourceModificationTime);
+		localResource.setType(LocalResourceType.FILE);
+		localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+		return localResource;
+	}
+
 	private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException {
 		LocalResource localResource = Records.newRecord(LocalResource.class);
 		FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
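
The core idea of this change, taking the size and modification time from the local file instead of reading them back from the just-written remote object, can be sketched in isolation. The following is a minimal illustration using only the JDK, not the Flink code itself: the class LocalMetadataSketch, the holder ResourceMeta, and the helper writeTempFile are hypothetical names introduced here, and no Hadoop or YARN types are involved.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

public class LocalMetadataSketch {

    /** Hypothetical holder for the two values YARN later checks (not a Flink or YARN type). */
    static final class ResourceMeta {
        final long size;
        final long modificationTime;

        ResourceMeta(long size, long modificationTime) {
            this.size = size;
            this.modificationTime = modificationTime;
        }
    }

    /** Reads size and modification time from the local file; no remote lookup is made. */
    static ResourceMeta metaFromLocalFile(File localFile) {
        if (localFile.isDirectory()) {
            throw new IllegalArgumentException("File to copy must not be a directory: " + localFile);
        }
        return new ResourceMeta(localFile.length(), localFile.lastModified());
    }

    /** Hypothetical helper: creates a temporary file with the given number of bytes. */
    static File writeTempFile(int numBytes) {
        try {
            File tmp = File.createTempFile("resource", ".bin");
            tmp.deleteOnExit();
            Files.write(tmp.toPath(), new byte[numBytes]);
            return tmp;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        File local = writeTempFile(5);
        ResourceMeta meta = metaFromLocalFile(local);
        System.out.println("size=" + meta.size + ", mtime=" + meta.modificationTime);
    }
}
```

In the commit itself, the same two values are passed to registerLocalResource(dst, localFile.length(), localFile.lastModified()), and fs.setTimes(dst, localFile.lastModified(), -1) sets the remote modification time to the same value, since YARN checks the resource against the values provided.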
