github-actions[bot] commented on code in PR #64224:
URL: https://github.com/apache/doris/pull/64224#discussion_r3377509191
##########
fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java:
##########
@@ -128,38 +130,20 @@ public synchronized void removeOutputStream(TBrokerFD fd)
{
BrokerOutputStream brokerOutputStream =
clientContext.outputStreams.remove(fd);
try {
if (brokerOutputStream != null) {
+ brokerOutputStream.brokerFileSystem.decrementActiveStreams();
brokerOutputStream.outputStream.close();
}
} catch (Exception e) {
logger.error("errors while close file data output stream", e);
}
}
- public synchronized void remoteExpireInputStreams() {
- int inputStreamExpireSeconds = BrokerConfig.input_stream_expire_seconds;
Review Comment:
Removing the per-input-stream expiration path leaves recycled file systems
dependent only on whole-client expiration to decrement `activeStreamCount`. If
a client context remains active because it keeps pinging or uses other FDs, but
one reader FD is leaked or becomes idle and is never closed,
`activeStreamCount` stays positive forever and
`CheckBrokerFileSystemExpirationTask` will keep re-queueing the evicted FS
instead of closing it. That is exactly the class of leak/OOM this PR is trying to
fix. The change also removes the existing operator knobs
(`enable_input_stream_expire_check`/`input_stream_expire_seconds`) that could
clean up stale streams independently. Please keep an independent
stream-expiration mechanism, or otherwise prove and enforce that every leaked
stream causes the whole client context to expire.
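A minimal sketch of what an independent expiration path could look like, assuming a simplified model: `SketchFs`, `SketchStream` and `StreamExpirySketch` are hypothetical stand-ins for `BrokerFileSystem`, a tracked input stream and `ClientContextManager`, not the broker's actual classes. Each idle stream is removed with the atomic `ConcurrentHashMap.remove(key, value)`, so its file system's counter is decremented exactly once, and the recycle-bin pass closes a file system only when that counter is zero, re-queueing it otherwise.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: SketchFs, SketchStream and StreamExpirySketch are illustrative
// stand-ins, not the broker's real BrokerFileSystem / ClientContextManager classes.
public class StreamExpirySketch {

    /** Stand-in for a recycled file system that counts the streams still using it. */
    static final class SketchFs {
        final AtomicInteger activeStreams = new AtomicInteger();
        volatile boolean closed;
    }

    /** Stand-in for an open reader FD; registering it pins its file system. */
    static final class SketchStream {
        final SketchFs fs;
        volatile long lastAccessMillis;

        SketchStream(SketchFs fs, long nowMillis) {
            this.fs = fs;
            this.lastAccessMillis = nowMillis;
            fs.activeStreams.incrementAndGet();
        }

        void touch(long nowMillis) {
            lastAccessMillis = nowMillis; // in this model, called on every read
        }
    }

    final Map<String, SketchStream> inputStreams = new ConcurrentHashMap<>();
    final Queue<SketchFs> recycleBin = new ConcurrentLinkedQueue<>();

    /** Expires idle streams independently of whole-client expiration; returns how many were dropped. */
    int expireIdleStreams(long nowMillis, long expireMillis) {
        int expired = 0;
        for (Map.Entry<String, SketchStream> e : inputStreams.entrySet()) {
            SketchStream s = e.getValue();
            // remove(key, value) is atomic: only one remover wins and decrements the counter.
            if (nowMillis - s.lastAccessMillis > expireMillis && inputStreams.remove(e.getKey(), s)) {
                s.fs.activeStreams.decrementAndGet(); // the real code would also close the stream here
                expired++;
            }
        }
        return expired;
    }

    /** One recycle-bin pass: close file systems with no active streams, re-queue the rest. */
    void sweepRecycleBin() {
        int pending = recycleBin.size();
        for (int i = 0; i < pending; i++) {
            SketchFs fs = recycleBin.poll();
            if (fs == null) {
                break;
            }
            if (fs.activeStreams.get() == 0) {
                fs.closed = true; // the real code would close the underlying FileSystem here
            } else {
                recycleBin.add(fs); // still referenced: retry on the next pass
            }
        }
    }
}
```

In this model, a stream leaked by a client that is still pinging is reclaimed by `expireIdleStreams`, after which the next `sweepRecycleBin` pass can close its file system. Without that call the file system would be re-queued indefinitely, which is the failure mode described above.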
##########
fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java:
##########
@@ -1385,55 +1370,113 @@ private static TBrokerFD parseUUIDToFD(UUID uuid) {
* 2. For other authentication modes, the lastAccessTime is used to
determine whether it has expired
*/
private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity
fileSystemIdentity, Map<String, String> properties) {
- BrokerFileSystem brokerFileSystem;
- if (cachedFileSystem.containsKey(fileSystemIdentity)) {
- brokerFileSystem = cachedFileSystem.get(fileSystemIdentity);
+ return cachedFileSystem.compute(fileSystemIdentity, (key, brokerFileSystemInMap) -> {
+ if (brokerFileSystemInMap == null) {
+ BrokerFileSystem newBrokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
+ newBrokerFileSystem.incrementActiveOperations();
+ return newBrokerFileSystem;
+ }
if (UserGroupInformation.isSecurityEnabled()) {
try {
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
} catch (Exception e) {
logger.error("errors while refresh TGT: ", e);
}
- } else if (brokerFileSystem.isExpiredByLastAccessTime()) {
- brokerFileSystem.getLock().lock();
- BrokerFileSystem bfs = cachedFileSystem.get(fileSystemIdentity);
- if (!bfs.isExpiredByLastAccessTime()) {
- return bfs;
- }
- try {
- logger.info("file system " + brokerFileSystem + " is expired, update it.");
- brokerFileSystem.closeFileSystem();
- } catch (Throwable t) {
- logger.error("errors while close file system: ", t);
- } finally {
- brokerFileSystem.getLock().unlock();
- }
- brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
- cachedFileSystem.put(fileSystemIdentity, brokerFileSystem);
+ } else if (brokerFileSystemInMap.isExpiredByLastAccessTime()) {
+ logger.info("file system " + brokerFileSystemInMap + " is expired, move to recycle bin and update it.");
+ fileSystemRecycleBin.add(brokerFileSystemInMap);
+ BrokerFileSystem newBrokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
+ newBrokerFileSystem.incrementActiveOperations();
+ return newBrokerFileSystem;
}
- } else {
- brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
- cachedFileSystem.put(fileSystemIdentity, brokerFileSystem);
- }
- return brokerFileSystem;
+ brokerFileSystemInMap.incrementActiveOperations();
+ brokerFileSystemInMap.updateLastUpdateAccessTime();
+ return brokerFileSystemInMap;
+ });
}
public long fileSize(String path, Map<String, String> properties) {
WildcardURI pathUri = new WildcardURI(path);
- BrokerFileSystem fileSystem = getFileSystem(path, properties);
+ BrokerFileSystem fileSystem = null;
Path filePath = new Path(pathUri.getPath());
try {
+ fileSystem = getFileSystem(path, properties);
FileStatus fileStatus =
fileSystem.getDFSFileSystem().getFileStatus(filePath);
if (fileStatus.isDirectory()) {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"not a file: {}", path);
}
return fileStatus.getLen();
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("errors while getting file size: " + path);
Review Comment:
This broad `catch (Exception)` now catches `BrokerException`s thrown by this
method itself, for example the `INVALID_INPUT_FILE_PATH` raised just above when
`fileStatus.isDirectory()` is true. Before this PR only `IOException` was
caught, so that validation error propagated with its original status and did
not evict the cached filesystem. With this change it is converted to
`TARGET_STORAGE_SERVICE_ERROR` and the healthy filesystem is moved to the
recycle bin. The same pattern appears in `renamePath` for the explicit rename
failure and in other methods where `getFileSystem` validation exceptions are
now inside the broad catch. Please preserve existing `BrokerException`
semantics, e.g. catch and rethrow `BrokerException` before handling storage
`IOException`/unexpected errors, and avoid recycling the FS for local
validation failures.
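One way to keep the original semantics, sketched with hypothetical stand-ins (`SketchBrokerException`, `StatusCode`, `Storage`, `FileStatusStub`, and a `recycled` flag in place of moving the FS to the recycle bin; none of these are the broker's real types): catch the broker's own exception first and rethrow it unchanged, and only translate the error and recycle the file system on storage or unexpected failures.

```java
import java.io.IOException;

// Hypothetical sketch: StatusCode, SketchBrokerException, FileStatusStub and Storage are
// illustrative stand-ins for the broker's types; `recycled` models "moved to the recycle bin".
public class FileSizeSketch {

    enum StatusCode { INVALID_INPUT_FILE_PATH, TARGET_STORAGE_SERVICE_ERROR }

    static final class SketchBrokerException extends RuntimeException {
        final StatusCode code;

        SketchBrokerException(StatusCode code, String message, Throwable cause) {
            super(message, cause);
            this.code = code;
        }
    }

    static final class FileStatusStub {
        final boolean directory;
        final long length;

        FileStatusStub(boolean directory, long length) {
            this.directory = directory;
            this.length = length;
        }
    }

    /** Stand-in for the storage call that may fail with an IOException. */
    interface Storage {
        FileStatusStub stat(String path) throws IOException;
    }

    boolean recycled = false;

    long fileSize(Storage storage, String path) {
        try {
            FileStatusStub status = storage.stat(path);
            if (status.directory) {
                // Local validation failure raised by this method itself.
                throw new SketchBrokerException(StatusCode.INVALID_INPUT_FILE_PATH, "not a file: " + path, null);
            }
            return status.length;
        } catch (SketchBrokerException e) {
            // Keep the original status code and leave the cached file system alone.
            throw e;
        } catch (Exception e) {
            // Storage or unexpected failure: the file system may be unhealthy, so recycle it.
            recycled = true;
            throw new SketchBrokerException(StatusCode.TARGET_STORAGE_SERVICE_ERROR,
                    "errors while getting file size: " + path, e);
        }
    }
}
```

Java tries `catch` clauses in order, so the more specific clause must come first. With that ordering the directory check surfaces as `INVALID_INPUT_FILE_PATH` and leaves the cached file system in place, while an `IOException` from storage is still translated and triggers recycling.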
--
This is an automated message from the Apache Git Service.