[FLINK-8801][yarn/s3] fix Utils#setupLocalResource() relying on consistent 
read-after-write

"Amazon S3 provides read-after-write consistency for PUTS of new objects in your
S3 bucket in all regions with one caveat. The caveat is that if you make a HEAD
or GET request to the key name (to find if the object exists) before creating
the object, Amazon S3 provides eventual consistency for read-after-write."
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel

Some S3 file system implementations may actually execute such a request for the
about-to-write object and thus the read-after-write is only eventually
consistent. org.apache.flink.yarn.Utils#setupLocalResource() currently relies on
a consistent read-after-write since it accesses the remote resource to get file
size and modification timestamp. Since there we have access to the local
resource, we can use this metadata directly instead and circumvent the problem.

This closes #5602.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c90a757b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c90a757b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c90a757b

Branch: refs/heads/master
Commit: c90a757b29f168144b1bae99df532911ae682e63
Parents: 6eb91a1
Author: Nico Kruber <[email protected]>
Authored: Tue Feb 27 17:23:20 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Mar 20 14:54:03 2018 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/yarn/Utils.java  | 34 ++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c90a757b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java 
b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index b9f7fac..b895784 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -141,7 +141,8 @@ public final class Utils {
                Path homedir,
                String relativeTargetPath) throws IOException {
 
-               if (new File(localSrcPath.toUri().getPath()).isDirectory()) {
+               File localFile = new File(localSrcPath.toUri().getPath());
+               if (localFile.isDirectory()) {
                        throw new IllegalArgumentException("File to copy must 
not be a directory: " +
                                localSrcPath);
                }
@@ -159,11 +160,40 @@ public final class Utils {
 
                fs.copyFromLocalFile(false, true, localSrcPath, dst);
 
+               // Note: If we used registerLocalResource(FileSystem, Path) 
here, we would access the remote
+               //       file once again which has problems with eventually 
consistent read-after-write file
+               //       systems. Instead, we decide to preserve the 
modification time at the remote
+               //       location because this and the size of the resource 
will be checked by YARN based on
+               //       the values we provide to #registerLocalResource() 
below.
+               fs.setTimes(dst, localFile.lastModified(), -1);
                // now create the resource instance
-               LocalResource resource = registerLocalResource(fs, dst);
+               LocalResource resource = registerLocalResource(dst, 
localFile.length(), localFile.lastModified());
+
                return Tuple2.of(dst, resource);
        }
 
+       /**
+        * Creates a YARN resource for the remote object at the given location.
+        *
+        * @param remoteRsrcPath        remote location of the resource
+        * @param resourceSize          size of the resource
+        * @param resourceModificationTime last modification time of the 
resource
+        *
+        * @return YARN resource
+        */
+       private static LocalResource registerLocalResource(
+                       Path remoteRsrcPath,
+                       long resourceSize,
+                       long resourceModificationTime) {
+               LocalResource localResource = 
Records.newRecord(LocalResource.class);
+               
localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
+               localResource.setSize(resourceSize);
+               localResource.setTimestamp(resourceModificationTime);
+               localResource.setType(LocalResourceType.FILE);
+               
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+               return localResource;
+       }
+
        private static LocalResource registerLocalResource(FileSystem fs, Path 
remoteRsrcPath) throws IOException {
                LocalResource localResource = 
Records.newRecord(LocalResource.class);
                FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);

Reply via email to