gaoyunhaii commented on a change in pull request #13377:
URL: https://github.com/apache/flink/pull/13377#discussion_r492593704
##########
File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
##########
@@ -357,6 +364,126 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
 			}
 			isClosed = dfs.isFileClosed(path);
 		}
+		// [FLINK-18592] recover lease after the lease timeout passed but file was still not closed
+		if (!isClosed && !deadline.hasTimeLeft()) {
+			recoverLease(path, dfs);
Review comment:
I think we could merge this process with the original lease recovery logic.
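Just to illustrate what merging could look like, here is a minimal sketch (not the actual patch): the post-timeout retry is folded into the same polling loop, which periodically re-triggers `recoverLease` while waiting on `isFileClosed`. The class name, constants, and retry intervals below are placeholders, not the real Flink configuration.

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

final class MergedLeaseRecoverySketch {

	// Placeholder values, not the real Flink configuration.
	private static final long LEASE_TIMEOUT_MS = 100_000L;
	private static final long RETRIGGER_INTERVAL_MS = 61_000L;
	private static final long POLL_INTERVAL_MS = 1_000L;

	/**
	 * Waits for the lease on the given file to be revoked, re-triggering the
	 * asynchronous lease recovery from inside the same polling loop instead of
	 * falling back to a separate recoverLease() helper after the deadline.
	 */
	static boolean waitUntilLeaseIsRevoked(DistributedFileSystem dfs, Path path) throws IOException {
		final long start = System.currentTimeMillis();
		final long deadline = start + LEASE_TIMEOUT_MS;
		long nextRecoverAttempt = start; // trigger the first recoverLease immediately

		boolean isClosed = dfs.isFileClosed(path);
		while (!isClosed && System.currentTimeMillis() < deadline) {
			if (System.currentTimeMillis() >= nextRecoverAttempt) {
				// recoverLease is asynchronous; true means the lease is already
				// recovered or the file is closed, so we can stop right away.
				if (dfs.recoverLease(path)) {
					return true;
				}
				nextRecoverAttempt = System.currentTimeMillis() + RETRIGGER_INTERVAL_MS;
			}
			try {
				Thread.sleep(POLL_INTERVAL_MS);
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
				throw new IOException("Interrupted while recovering the lease for " + path, e);
			}
			isClosed = dfs.isFileClosed(path);
		}
		return isClosed;
	}

	private MergedLeaseRecoverySketch() {}
}
```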
##########
File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
##########
@@ -357,6 +364,126 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
 			}
 			isClosed = dfs.isFileClosed(path);
 		}
+		// [FLINK-18592] recover lease after the lease timeout passed but file was still not closed
+		if (!isClosed && !deadline.hasTimeLeft()) {
+			recoverLease(path, dfs);
+		}
 		return isClosed;
 	}
+
+
+	/*
+	 * Run the dfs recover lease. recoverLease is asynchronous. It returns:
+	 *   - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
+	 *   - true when the lease recovery has succeeded or the file is closed.
+	 *
+	 * But, we have to be careful. Each time we call recoverLease, it starts the recovery process over
+	 * from the beginning. We could put ourselves in a situation where we are doing nothing but starting
+	 * a recovery, interrupting it to start again, and so on.
+	 *
+	 * The namenode will try to recover the lease on the file's primary node. If all is well, it should
+	 * return near immediately. But, as is common, it is the very primary node that has crashed and so
+	 * the namenode will be stuck waiting on a socket timeout before it will ask another datanode to
+	 * start the recovery. It does not help if we call recoverLease in the meantime; in particular,
+	 * subsequent to the socket timeout, a recoverLease invocation will cause us to start over from
+	 * square one (possibly waiting on a socket timeout against the primary node).
+	 * So, in the below, we do the following:
+	 * 1. Call recoverLease.
+	 * 2. If it returns true, break.
+	 * 3. If it returns false, wait a few seconds and then call it again.
+	 * 4. If it returns true, break.
+	 * 5. If it returns false, wait for what we think the datanode socket timeout is (configurable) and
+	 *    then try again.
+	 * 6. If it returns true, break.
+	 * 7. If it returns false, repeat starting at step 5 above. If HDFS-4525 is available, check
+	 *    isFileClosed every second and we might be able to exit early.
+	 */
+	private static boolean recoverLease(Path path, DistributedFileSystem dfs) throws IOException {
+		LOG.info("Recover lease on dfs file " + path);
+		long startWaiting = System.currentTimeMillis();
+		// Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
+		// usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
+		// beyond that limit 'to be safe'.
+		long recoveryTimeout = HdfsConstants.LEASE_HARDLIMIT_PERIOD / 4;
+		long recoveryTargetTimeout = recoveryTimeout + startWaiting;
+		// This setting should be a little bit above what the cluster dfs heartbeat is set to.
+		long firstPause = 4000L;
+		long pause = 1000L;
+		// This should be set to how long it'll take for us to time out against the primary datanode
+		// if it is dead. We set it to 64 seconds, 4 seconds more than the default READ_TIMEOUT in HDFS,
+		// the default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this
+		// timeout, then further recovery attempts take a linear backoff with this base, to avoid endless
+		// preemptions when this value is not properly configured.
+		long subsequentPauseBase = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
+
+		Method isFileClosedMeth = null;
+		// whether we need to look for the isFileClosed method
+		boolean findIsFileClosedMeth = true;
+		boolean recovered = false;
+		// We break the loop if we succeed in the lease recovery, time out, or throw an exception.
+		for (int nbAttempt = 0; !recovered; nbAttempt++) {
+			recovered = recoverLease(dfs, nbAttempt, path, startWaiting);
+			if (recovered) {
+				break;
+			}
+			if (recoveryTargetTimeout < System.currentTimeMillis()) {
+				LOG.warn("Cannot recoverLease after trying for " + recoveryTimeout +
+					"ms; continuing, but may be DATALOSS!!!; " +
+					String.format("attempt=%d, on file=%s, after %d ms",
+						nbAttempt, path.toString(), System.currentTimeMillis() - startWaiting));
+				break;
+			}
+			try {
+				// On the first time through, wait the short 'firstPause'.
+				if (nbAttempt == 0) {
+					Thread.sleep(firstPause);
+				} else {
+					// Cycle here until (subsequentPauseBase * nbAttempt) elapses. While spinning, check
+					// isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though).
+					long localStartWaiting = System.currentTimeMillis();
+					while ((System.currentTimeMillis() - localStartWaiting) < subsequentPauseBase * nbAttempt) {
+						Thread.sleep(pause);
+						if (findIsFileClosedMeth) {
Review comment:
I think Flink has always assumed that the `close` method is available, because Flink limits the minimum Hadoop version it supports. Thus we could remove the reflection-based logic that detects the method.
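For illustration, a minimal sketch of the simplification being suggested here, assuming the minimum Hadoop version Flink supports always ships `DistributedFileSystem#isFileClosed` (HDFS-4525); the class and method names below are placeholders, not part of the patch:

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

final class IsFileClosedSketch {

	/**
	 * Calls isFileClosed directly rather than looking it up via reflection and
	 * tracking whether the lookup succeeded; with HDFS-4525 guaranteed to be
	 * present, the Method/boolean bookkeeping in the patch can be dropped.
	 */
	static boolean isFileClosed(DistributedFileSystem dfs, Path path) throws IOException {
		return dfs.isFileClosed(path);
	}

	private IsFileClosedSketch() {}
}
```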