yanyan300300 commented on a change in pull request #8215: [FLINK-8801][yarn/s3] Fix jars downloading issues due to inconsistent timestamp in S3 Filesystem
URL: https://github.com/apache/flink/pull/8215#discussion_r300175437
 
 

 ##########
 File path: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
 ##########
 @@ -161,15 +169,37 @@ public static void setupYarnClassPath(Configuration conf, Map<String, String> ap
 
                fs.copyFromLocalFile(false, true, localSrcPath, dst);
 
-               // Note: If we used registerLocalResource(FileSystem, Path) here, we would access the remote
+               // Note: If we directly used registerLocalResource(FileSystem, Path) here, we would access the remote
                //       file once again which has problems with eventually consistent read-after-write file
-               //       systems. Instead, we decide to preserve the modification time at the remote
-               //       location because this and the size of the resource will be checked by YARN based on
-               //       the values we provide to #registerLocalResource() below.
-               fs.setTimes(dst, localFile.lastModified(), -1);
-               // now create the resource instance
-               LocalResource resource = registerLocalResource(dst, localFile.length(), localFile.lastModified());
+               //       systems. Instead, we decide to wait until the remote file be available.
 
+               FileStatus[] fss = null;
+               int iter = 1;
+               while (iter <= REMOTE_RESOURCES_FETCH_NUM_RETRY + 1) {
+                       try {
+                               fss = fs.listStatus(dst);
+                               break;
+                       } catch (FileNotFoundException e) {
+                               LOG.debug("Got FileNotFoundException while fetching uploaded remote resources at retry num {}", iter);
+                               try {
+                                       LOG.debug("Sleeping for {}ms", REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI);
+                                       TimeUnit.MILLISECONDS.sleep(REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI);
+                               } catch (InterruptedException ie) {
+                                       LOG.warn("Failed to sleep for {}ms at retry num {} while fetching uploaded remote resources",
+                                               REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI, iter, ie);
+                               }
+                               iter++;
+                       }
+               }
+               long dstModificationTime = -1;
+               if (fss != null && fss.length >  0) {
+                       dstModificationTime = fss[0].getModificationTime();
+               }
+               LOG.debug("Got modification time {} from remote path {}", dstModificationTime, dst);
+
+               // now create the resource instance
+               LocalResource resource = registerLocalResource(dst, localFile.length(), dstModificationTime > 0 ? dstModificationTime
+                       : localFile.lastModified());
 
 Review comment:
  Sorry, I should have made it clearer. For 1), I meant that writing it with `else` requires one less level of indentation, which I personally find easier to read. For 2), branch coverage is usually looked at alongside line coverage to check how many conditions are actually tested.
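To illustrate point 2) with the ternary at the end of the diff: one source line can contain two branches, so line coverage can report it as fully covered even though only one branch has run. The class and method names below are hypothetical and not part of the PR. The method only mirrors the `dstModificationTime > 0 ? dstModificationTime : localFile.lastModified()` fallback.

```java
// Hypothetical illustration, not code from the PR.
public class CoverageDemo {

    // Mirrors the fallback in the diff: prefer the remote modification time when one
    // was found (> 0), otherwise use the local file's modification time.
    static long chooseModificationTime(long remoteTime, long localTime) {
        return remoteTime > 0 ? remoteTime : localTime;
    }

    public static void main(String[] args) {
        // This call alone executes the return line, so line coverage reports it as
        // covered, but only the "remote time" branch of the ternary has been taken.
        System.out.println(chooseModificationTime(1_000L, 500L));

        // A second call without a remote time is needed to exercise the fallback
        // branch. Branch coverage (e.g. as reported by JaCoCo) makes this gap visible.
        System.out.println(chooseModificationTime(-1L, 500L));
    }
}
```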
   
  For this case, sure, I can update it according to @NicoK's suggestion. Could you confirm that this is also what you are suggesting?
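Below is a minimal, self-contained sketch of the poll-then-fall-back idea from the diff. It uses a hypothetical `ModificationTimeSource` interface in place of Hadoop's `FileSystem#listStatus(Path)` so that it runs without Hadoop on the classpath. Here `maxAttempts` plays the role of `REMOTE_RESOURCES_FETCH_NUM_RETRY + 1` and `waitMillis` that of `REMOTE_RESOURCES_FETCH_WAIT_IN_MILLI`. The structure is illustrative and not necessarily what @NicoK suggested.

```java
import java.io.FileNotFoundException;
import java.util.concurrent.TimeUnit;

public class RemoteTimestampPoller {

    /** Hypothetical stand-in for a remote lookup such as FileSystem#listStatus(Path). */
    @FunctionalInterface
    interface ModificationTimeSource {
        long fetch() throws FileNotFoundException;
    }

    /**
     * Tries to read the remote modification time up to maxAttempts times, sleeping
     * waitMillis between attempts. Falls back to localTime if the file never becomes
     * visible, the wait is interrupted, or the remote time is not positive.
     */
    static long modificationTimeOrFallback(
            ModificationTimeSource source, int maxAttempts, long waitMillis, long localTime) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                long remoteTime = source.fetch();
                return remoteTime > 0 ? remoteTime : localTime;
            } catch (FileNotFoundException e) {
                if (attempt == maxAttempts) {
                    break; // no point in sleeping after the last attempt
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(waitMillis);
                } catch (InterruptedException ie) {
                    // Restore the interrupt flag for callers and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return localTime;
    }

    public static void main(String[] args) {
        // Simulated eventually consistent store: the file becomes visible on the third read.
        int[] calls = {0};
        ModificationTimeSource flaky = () -> {
            if (++calls[0] < 3) {
                throw new FileNotFoundException("not visible yet");
            }
            return 1_700_000_000_000L;
        };
        System.out.println(modificationTimeOrFallback(flaky, 5, 10, 42L));

        // A file that never becomes visible falls back to the local modification time.
        ModificationTimeSource missing = () -> {
            throw new FileNotFoundException("never visible");
        };
        System.out.println(modificationTimeOrFallback(missing, 3, 10, 42L));
    }
}
```

Unlike the diff, this sketch does not sleep after the final failed attempt. When interrupted, it restores the thread's interrupt status and stops retrying rather than only logging a warning. Both are design choices in the sketch, not requirements from the PR.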

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
