Repository: flink
Updated Branches:
  refs/heads/release-1.4 cb4e088e6 -> 8b1376b18


[FLINK-8801][yarn/s3] fix Utils#setupLocalResource() relying on consistent read-after-write

"Amazon S3 provides read-after-write consistency for PUTS of new objects in your
S3 bucket in all regions with one caveat. The caveat is that if you make a HEAD
or GET request to the key name (to find if the object exists) before creating
the object, Amazon S3 provides eventual consistency for read-after-write."
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
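
For illustration only, the caveat can be modelled with a tiny in-memory store.
This is a hypothetical toy: the names ToyObjectStore and staleReads are invented
here, and it says nothing about how S3 works internally. A lookup of a missing
key is remembered, and that negative answer keeps being returned for a few reads
after the object is PUT. An object that is PUT without a prior lookup is visible
immediately.

```java
import java.util.HashMap;
import java.util.Map;

final class EventualConsistencyDemo {

    /**
     * Hypothetical toy store: a lookup of a missing key leaves behind a cached
     * "not found" that is served for the next staleReads lookups, even after a PUT.
     */
    static final class ToyObjectStore {
        private final Map<String, byte[]> objects = new HashMap<>();
        private final Map<String, Integer> staleReadsLeft = new HashMap<>();
        private final int staleReads;

        ToyObjectStore(int staleReads) {
            this.staleReads = staleReads;
        }

        /** HEAD-like existence check; returns whether the object is visible. */
        boolean head(String key) {
            Integer left = staleReadsLeft.get(key);
            if (left != null && left > 0) {
                staleReadsLeft.put(key, left - 1);
                return false; // stale negative answer from an earlier lookup
            }
            if (!objects.containsKey(key)) {
                staleReadsLeft.put(key, staleReads); // remember the negative lookup
                return false;
            }
            return true;
        }

        void put(String key, byte[] data) {
            objects.put(key, data);
        }
    }

    public static void main(String[] args) {
        ToyObjectStore store = new ToyObjectStore(2);
        store.head("flink.jar");                     // existence check BEFORE the upload
        store.put("flink.jar", new byte[] {42});
        System.out.println(store.head("flink.jar")); // false: stale "not found"
    }
}
```

With staleReads = 2, the first two checks after the PUT still report the object
as missing. This is analogous to a client that reads a freshly uploaded file's
metadata back right away.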

Some S3 file system implementations may actually issue such a request for the
about-to-be-written object, in which case read-after-write is only eventually
consistent. org.apache.flink.yarn.Utils#setupLocalResource() currently relies on
consistent read-after-write because it reads the remote resource back to obtain
its file size and modification timestamp. Since the local file is available at
that point, we can use its metadata directly instead and avoid the problem.
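
A minimal sketch of the idea above in plain Java, without YARN dependencies: the
size and timestamp are taken from the local java.io.File rather than from the
remote copy. ResourceInfo and describe() are hypothetical stand-ins for YARN's
LocalResource record and for the new registerLocalResource(Path, long, long)
overload in the diff below. The remote location is just a string, and the
s3://my-bucket/... path is made up.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

final class LocalMetadataSketch {

    /** Hypothetical stand-in for the values stored in YARN's LocalResource. */
    static final class ResourceInfo {
        final String remotePath; // where the file was uploaded to
        final long size;         // bytes, taken from the local file
        final long timestamp;    // ms since the epoch, taken from the local file

        ResourceInfo(String remotePath, long size, long timestamp) {
            this.remotePath = remotePath;
            this.size = size;
            this.timestamp = timestamp;
        }
    }

    /**
     * Describes an already-uploaded file using only local metadata, so no
     * (possibly stale) HEAD/GET is sent to the remote file system.
     */
    static ResourceInfo describe(File localFile, String remotePath) {
        if (localFile.isDirectory()) {
            throw new IllegalArgumentException("File to copy must not be a directory: " + localFile);
        }
        return new ResourceInfo(remotePath, localFile.length(), localFile.lastModified());
    }

    public static void main(String[] args) throws IOException {
        File jar = File.createTempFile("flink-sketch", ".jar");
        jar.deleteOnExit();
        Files.write(jar.toPath(), "hello".getBytes(StandardCharsets.UTF_8));
        ResourceInfo info = describe(jar, "s3://my-bucket/flink/flink-sketch.jar");
        System.out.println(info.size); // 5
    }
}
```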

This closes #5602.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b1376b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b1376b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b1376b1

Branch: refs/heads/release-1.4
Commit: 8b1376b1808d302574073ce180dc561244adc6f5
Parents: cb4e088
Author: Nico Kruber
Authored: Tue Feb 27 17:23:20 2018 +0100
Committer: Till Rohrmann
Committed: Tue Mar 20 17:40:37 2018 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/yarn/Utils.java  | 34 ++++++++++++++++++--
 1 file changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b1376b1/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 652afec..cbf7808 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -140,7 +140,8 @@ public final class Utils {
                Path homedir,
                String relativeTargetPath) throws IOException {
 
-               if (new File(localSrcPath.toUri().getPath()).isDirectory()) {
+               File localFile = new File(localSrcPath.toUri().getPath());
+               if (localFile.isDirectory()) {
                        throw new IllegalArgumentException("File to copy must not be a directory: " +
                                localSrcPath);
                }
@@ -158,11 +159,40 @@ public final class Utils {
 
                fs.copyFromLocalFile(false, true, localSrcPath, dst);
 
+               // Note: If we used registerLocalResource(FileSystem, Path) here, we would access the remote
+               //       file once again which has problems with eventually consistent read-after-write file
+               //       systems. Instead, we decide to preserve the modification time at the remote
+               //       location because this and the size of the resource will be checked by YARN based on
+               //       the values we provide to #registerLocalResource() below.
+               fs.setTimes(dst, localFile.lastModified(), -1);
                // now create the resource instance
-               LocalResource resource = registerLocalResource(fs, dst);
+               LocalResource resource = registerLocalResource(dst, localFile.length(), localFile.lastModified());
+
                return Tuple2.of(dst, resource);
        }
 
+       /**
+        * Creates a YARN resource for the remote object at the given location.
+        *
+        * @param remoteRsrcPath        remote location of the resource
+        * @param resourceSize          size of the resource
+        * @param resourceModificationTime last modification time of the resource
+        *
+        * @return YARN resource
+        */
+       private static LocalResource registerLocalResource(
+                       Path remoteRsrcPath,
+                       long resourceSize,
+                       long resourceModificationTime) {
+               LocalResource localResource = Records.newRecord(LocalResource.class);
+               localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
+               localResource.setSize(resourceSize);
+               localResource.setTimestamp(resourceModificationTime);
+               localResource.setType(LocalResourceType.FILE);
+               localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+               return localResource;
+       }
+
        private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsrcPath) throws IOException {
                LocalResource localResource = Records.newRecord(LocalResource.class);
                FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);

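The fs.setTimes() call keeps both sides consistent: the timestamp recorded in the
resource comes from the local file, so the uploaded copy must carry the same
modification time. The sketch below mimics this with java.nio on two local
directories, where the "remote" one merely stands in for S3/HDFS.
uploadPreservingMtime() and timestampMatches() are hypothetical helpers. The
latter is only a simplified stand-in for the check that, according to the
comment in the diff, YARN performs against the values passed to
registerLocalResource().

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;

final class PreserveMtimeSketch {

    /** Copies src into targetDir and stamps the copy with src's modification time. */
    static Path uploadPreservingMtime(File src, Path targetDir) throws IOException {
        Path dst = targetDir.resolve(src.getName());
        Files.copy(src.toPath(), dst, StandardCopyOption.REPLACE_EXISTING);
        // Analogue of fs.setTimes(dst, localFile.lastModified(), -1) in the diff:
        // set only the modification time and leave the access time alone.
        Files.setLastModifiedTime(dst, FileTime.fromMillis(src.lastModified()));
        return dst;
    }

    /** Simplified, hypothetical consumer-side check: recorded vs. actual mtime. */
    static boolean timestampMatches(Path remote, long recordedTimestamp) throws IOException {
        return Files.getLastModifiedTime(remote).toMillis() == recordedTimestamp;
    }

    public static void main(String[] args) throws IOException {
        Path localDir = Files.createTempDirectory("local");
        Path remoteDir = Files.createTempDirectory("remote");
        File jar = localDir.resolve("flink-dist.jar").toFile();
        Files.write(jar.toPath(), new byte[] {1, 2, 3});
        Path uploaded = uploadPreservingMtime(jar, remoteDir);
        // The timestamp taken from the LOCAL file should match the uploaded copy.
        System.out.println(timestampMatches(uploaded, jar.lastModified()));
    }
}
```

Without the setLastModifiedTime() step, the copy would carry the time of the
upload instead, and a check against the local timestamp would fail.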