Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6281#discussion_r201374633
  
    --- Diff: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java ---
    @@ -253,4 +265,39 @@ public CommitRecoverable getRecoverable() {
                        return recoverable;
                }
        }
    +
    +   /**
    +    * Called when resuming execution after a failure and waits until the lease
    +    * of the file we are resuming is free.
    +    *
    +    * <p>The lease of the file we are resuming writing/committing to may still
    +    * belong to the process that failed previously and whose state we are
    +    * recovering.
    +    *
    +    * @param path The path to the file we want to resume writing to.
    +    */
    +   private boolean waitUntilLeaseIsRevoked(final Path path) throws IOException {
    +           Preconditions.checkState(fs instanceof DistributedFileSystem);
    +
    +           final DistributedFileSystem dfs = (DistributedFileSystem) fs;
    +           dfs.recoverLease(path);
    +           boolean isclosed = dfs.isFileClosed(path);
    +
    +           final StopWatch sw = new StopWatch();
    +           sw.start();
    +
    +           while (!isclosed) {
    +                   if (sw.getTime() > LEASE_TIMEOUT) {
    +                           break;
    +                   }
    +
    +                   try {
    --- End diff --
    
    This blocks the thread for up to LEASE_TIMEOUT, making it impossible to cancel. I would either propagate the InterruptedException, or rethrow it as an IOException indicating that lease recovery failed (which is fine here, since this is a single-purpose utility function).
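
    A minimal sketch of the second option suggested above: restore the interrupt flag and rethrow as an IOException so a canceling caller is not pinned for the full timeout. The `LeaseProbe` interface, `POLL_INTERVAL` constant, and timeout handling via `System.currentTimeMillis()` are illustrative assumptions standing in for `dfs.isFileClosed(path)` and the StopWatch in the actual patch, not Flink API.

    ```java
    import java.io.IOException;

    public class LeaseWaitSketch {

        /** Hypothetical stand-in for dfs.isFileClosed(path), to keep the sketch self-contained. */
        interface LeaseProbe {
            boolean isFileClosed() throws IOException;
        }

        static final long LEASE_TIMEOUT = 100_000L; // millis; assumed value
        static final long POLL_INTERVAL = 100L;     // millis; assumed value

        /**
         * Returns true if the lease was released within the timeout.
         * An interrupt aborts the wait immediately and is rethrown as an
         * IOException, so cancellation is not delayed by the polling loop.
         */
        static boolean waitUntilLeaseIsRevoked(LeaseProbe probe) throws IOException {
            final long deadline = System.currentTimeMillis() + LEASE_TIMEOUT;
            boolean isClosed = probe.isFileClosed();
            while (!isClosed && System.currentTimeMillis() < deadline) {
                try {
                    Thread.sleep(POLL_INTERVAL);
                } catch (InterruptedException e) {
                    // restore the flag for callers further up the stack,
                    // then surface the failure instead of waiting out the timeout
                    Thread.currentThread().interrupt();
                    throw new IOException("Recovering the lease failed: interrupted while waiting.", e);
                }
                isClosed = probe.isFileClosed();
            }
            return isClosed;
        }
    }
    ```

    Propagating the raw InterruptedException would also work, but would ripple a checked exception through callers that currently only declare IOException.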

