Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6281#discussion_r201369555
  
    --- Diff: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
 ---
    @@ -253,4 +265,39 @@ public CommitRecoverable getRecoverable() {
                        return recoverable;
                }
        }
    +
    +   /**
    +    * Called when resuming execution after a failure and waits until the 
lease
    +    * of the file we are resuming is free.
    +    *
    +    * <p>The lease of the file we are resuming writing/committing to may 
still
    +    * belong to the process that failed previously and whose state we are
    +    * recovering.
    +    *
    +    * @param path The path to the file we want to resume writing to.
    +    */
    +   private boolean waitUntilLeaseIsRevoked(final Path path) throws 
IOException {
    +           Preconditions.checkState(fs instanceof DistributedFileSystem);
    +
    +           final DistributedFileSystem dfs = (DistributedFileSystem) fs;
    +           dfs.recoverLease(path);
    +           boolean isclosed = dfs.isFileClosed(path);
    +
    +           final StopWatch sw = new StopWatch();
    --- End diff --
    
    Let's use `Deadline` from the Flink utils instead to reduce external 
dependencies.


---

Reply via email to