ZanderXu commented on code in PR #16927:
URL: https://github.com/apache/flink/pull/16927#discussion_r959323071
##########
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java:
##########
@@ -360,4 +391,37 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
         }
         return isClosed;
     }
+
+    /**
+     * If the last block of the file that the previous execution was writing to is not in COMPLETE
+     * state, HDFS will perform block recovery, which blocks truncate. Thus we have to wait for
+     * block recovery to ensure the truncate is successful.
+     */
+    private static boolean waitForBlockRecovery(final FileSystem fs, final Path path)
Review Comment:
I have one question: why not determine this with `isFileClosed(path)`? For example:
```java
final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
boolean isClosed = dfs.isFileClosed(path);
while (!isClosed && deadline.hasTimeLeft()) {
    try {
        Thread.sleep(500L);
    } catch (InterruptedException e1) {
        LOG.warn("Interrupted when waiting for block recovery for file {}.", path, e1);
        break;
    }
    isClosed = dfs.isFileClosed(path);
}
return isClosed;
```
If the path points to a very large file, `LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(absolutePath, 0, Long.MAX_VALUE);` will be expensive.
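
For reference, here is a minimal sketch (not part of the PR; the helper name is hypothetical) of how the last block's state could be checked without listing every block. It assumes `dfs` is a `DistributedFileSystem` and requests locations for only the final byte of the file, since the NameNode includes the last block's completeness flag in every `getBlockLocations` response:
```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

/** Hypothetical helper: checks the last block's state without fetching the full block list. */
private static boolean isLastBlockComplete(final DistributedFileSystem dfs, final Path path)
        throws IOException {
    final long fileLength = dfs.getFileStatus(path).getLen();
    // Request locations for the final byte only; the response still carries
    // the isLastBlockComplete flag, so the payload stays small for huge files.
    final LocatedBlocks blocks =
            dfs.getClient()
                    .getLocatedBlocks(
                            path.toUri().getPath(), Math.max(fileLength - 1, 0), 1);
    return blocks.isLastBlockComplete();
}
```
Either way, the per-poll cost would no longer grow with the number of blocks in the file, which is the concern raised above.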