github-actions[bot] commented on code in PR #64224:
URL: https://github.com/apache/doris/pull/64224#discussion_r3377509191


##########
fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/ClientContextManager.java:
##########
@@ -128,38 +130,20 @@ public synchronized void removeOutputStream(TBrokerFD fd) {
         BrokerOutputStream brokerOutputStream = clientContext.outputStreams.remove(fd);
         try {
             if (brokerOutputStream != null) {
+                brokerOutputStream.brokerFileSystem.decrementActiveStreams();
                 brokerOutputStream.outputStream.close();
             }
         } catch (Exception e) {
             logger.error("errors while close file data output stream", e);
         }
     }
 
-    public synchronized void remoteExpireInputStreams() {
-        int inputStreamExpireSeconds = BrokerConfig.input_stream_expire_seconds;

Review Comment:
   Removing the per-input-stream expiration path leaves whole-client expiration 
as the only mechanism that decrements `activeStreamCount` for recycled file 
systems. If a client context stays active because it keeps pinging or uses 
other FDs, but one reader FD is leaked or goes idle and is never closed, 
`activeStreamCount` stays positive forever and 
`CheckBrokerFileSystemExpirationTask` keeps re-queueing the evicted FS instead 
of closing it. That is exactly the leak/OOM class this PR is trying to fix. The 
change also removes the existing operator knob 
(`enable_input_stream_expire_check`/`input_stream_expire_seconds`) that could 
clean up stale streams independently. Please keep an independent 
stream-expiration mechanism, or otherwise prove and enforce that every leaked 
stream causes the whole client context to expire.
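
A minimal sketch of what an independent idle-stream sweep could look like, to
make the request above concrete. Every name here (`FsHandle`, `TrackedStream`,
`StreamExpirySketch`, `expireIdle`) is a hypothetical stand-in rather than the
broker's actual types, and timestamps are passed in explicitly so the logic is
easy to test. A real implementation would presumably also close the underlying
stream and run from a scheduled task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for BrokerFileSystem: only the stream counter matters here.
class FsHandle {
    final AtomicInteger activeStreamCount = new AtomicInteger();
}

// Hypothetical stand-in for an open reader FD and its last-access bookkeeping.
class TrackedStream {
    final FsHandle fs;
    volatile long lastAccessMillis;

    TrackedStream(FsHandle fs, long nowMillis) {
        this.fs = fs;
        this.lastAccessMillis = nowMillis;
        fs.activeStreamCount.incrementAndGet(); // an open stream pins its file system
    }
}

class StreamExpirySketch {
    private final Map<String, TrackedStream> streams = new ConcurrentHashMap<>();
    private final long expireMillis;

    StreamExpirySketch(long expireMillis) {
        this.expireMillis = expireMillis;
    }

    void open(String fd, FsHandle fs, long nowMillis) {
        streams.put(fd, new TrackedStream(fs, nowMillis));
    }

    // Called on every read so that active streams are never swept.
    void touch(String fd, long nowMillis) {
        TrackedStream s = streams.get(fd);
        if (s != null) {
            s.lastAccessMillis = nowMillis;
        }
    }

    // Sweeps streams idle for longer than expireMillis, independent of client pings.
    // Returns the FDs that were expired.
    List<String> expireIdle(long nowMillis) {
        List<String> expired = new ArrayList<>();
        for (Map.Entry<String, TrackedStream> e : streams.entrySet()) {
            TrackedStream s = e.getValue();
            boolean idle = nowMillis - s.lastAccessMillis > expireMillis;
            // remove(key, value) ensures the counter is decremented exactly once per stream.
            if (idle && streams.remove(e.getKey(), s)) {
                s.fs.activeStreamCount.decrementAndGet();
                expired.add(e.getKey());
            }
        }
        return expired;
    }
}
```

With a sweep of this shape, a leaked reader FD releases its hold on the
recycled file system after the idle timeout even while the owning client
context keeps pinging.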



##########
fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java:
##########
@@ -1385,55 +1370,113 @@ private static TBrokerFD parseUUIDToFD(UUID uuid) {
      *   2. For other authentication modes, the lastAccessTime is used to determine whether it has expired
      */
     private BrokerFileSystem updateCachedFileSystem(FileSystemIdentity fileSystemIdentity, Map<String, String> properties) {
-        BrokerFileSystem brokerFileSystem;
-        if (cachedFileSystem.containsKey(fileSystemIdentity)) {
-            brokerFileSystem = cachedFileSystem.get(fileSystemIdentity);
+        return cachedFileSystem.compute(fileSystemIdentity, (key, brokerFileSystemInMap) -> {
+            if (brokerFileSystemInMap == null) {
+                BrokerFileSystem newBrokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
+                newBrokerFileSystem.incrementActiveOperations();
+                return newBrokerFileSystem;
+            }
             if (UserGroupInformation.isSecurityEnabled()) {
                 try {
                     UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
                 } catch (Exception e) {
                     logger.error("errors while refresh TGT: ", e);
                 }
-            } else if (brokerFileSystem.isExpiredByLastAccessTime()) {
-                brokerFileSystem.getLock().lock();
-                BrokerFileSystem bfs = cachedFileSystem.get(fileSystemIdentity);
-                if (!bfs.isExpiredByLastAccessTime()) {
-                  return bfs;
-                }
-                try {
-                    logger.info("file system " + brokerFileSystem + " is expired, update it.");
-                    brokerFileSystem.closeFileSystem();
-                } catch (Throwable t) {
-                    logger.error("errors while close file system: ", t);
-                } finally {
-                    brokerFileSystem.getLock().unlock();
-                }
-                brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
-                cachedFileSystem.put(fileSystemIdentity, brokerFileSystem);
+            } else if (brokerFileSystemInMap.isExpiredByLastAccessTime()) {
+                logger.info("file system " + brokerFileSystemInMap + " is expired, move to recycle bin and update it.");
+                fileSystemRecycleBin.add(brokerFileSystemInMap);
+                BrokerFileSystem newBrokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
+                newBrokerFileSystem.incrementActiveOperations();
+                return newBrokerFileSystem;
             }
-        } else {
-            brokerFileSystem = new BrokerFileSystem(fileSystemIdentity);
-            cachedFileSystem.put(fileSystemIdentity, brokerFileSystem);
-        }
-        return brokerFileSystem;
+            brokerFileSystemInMap.incrementActiveOperations();
+            brokerFileSystemInMap.updateLastUpdateAccessTime();
+            return brokerFileSystemInMap;
+        });
     }
 
     public long fileSize(String path, Map<String, String> properties) {
         WildcardURI pathUri = new WildcardURI(path);
-        BrokerFileSystem fileSystem = getFileSystem(path, properties);
+        BrokerFileSystem fileSystem = null;
         Path filePath = new Path(pathUri.getPath());
         try {
+            fileSystem = getFileSystem(path, properties);
             FileStatus fileStatus = fileSystem.getDFSFileSystem().getFileStatus(filePath);
             if (fileStatus.isDirectory()) {
                 throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                     "not a file: {}", path);
             }
             return fileStatus.getLen();
-        } catch (IOException e) {
+        } catch (Exception e) {
             logger.error("errors while getting file size: " + path);

Review Comment:
   This broad `catch (Exception)` now catches `BrokerException`s thrown by this 
method itself, for example the `INVALID_INPUT_FILE_PATH` raised just above when 
`fileStatus.isDirectory()` is true. Before this PR only `IOException` was 
caught, so that validation error propagated with its original status and did 
not evict the cached filesystem. With this change it is converted to 
`TARGET_STORAGE_SERVICE_ERROR` and the healthy filesystem is moved to the 
recycle bin. The same pattern appears in `renamePath` for the explicit rename 
failure and in other methods where `getFileSystem` validation exceptions are 
now inside the broad catch. Please preserve existing `BrokerException` 
semantics, e.g. catch and rethrow `BrokerException` before handling storage 
`IOException`/unexpected errors, and avoid recycling the FS for local 
validation failures.
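
A minimal sketch of the catch ordering suggested above, under stated
assumptions: `SketchBrokerException`, `SizeLookup`, `FileSizeSketch`, and
`recycledCount` are hypothetical stand-ins, the exception is assumed to be
unchecked, status codes are plain strings, and a negative length stands in for
the `fileStatus.isDirectory()` check. It is not the broker's actual code.

```java
import java.io.IOException;

// Hypothetical, simplified stand-in for the broker's status-carrying exception.
class SketchBrokerException extends RuntimeException {
    final String statusCode;

    SketchBrokerException(String statusCode, String message) {
        super(message);
        this.statusCode = statusCode;
    }
}

// Hypothetical stand-in for the storage call that may fail with an IOException.
interface SizeLookup {
    long sizeOf(String path) throws IOException;
}

class FileSizeSketch {
    // Counts how often the cached file system would be moved to the recycle bin.
    int recycledCount = 0;

    long fileSize(String path, SizeLookup lookup) {
        try {
            long len = lookup.sizeOf(path);
            if (len < 0) { // stands in for fileStatus.isDirectory()
                throw new SketchBrokerException("INVALID_INPUT_FILE_PATH", "not a file: " + path);
            }
            return len;
        } catch (SketchBrokerException e) {
            // Local validation failure: keep the original status, do not recycle the FS.
            throw e;
        } catch (Exception e) {
            // Storage IOException or unexpected error: recycle the FS and wrap the cause.
            recycledCount++;
            throw new SketchBrokerException("TARGET_STORAGE_SERVICE_ERROR",
                    "errors while getting file size: " + path);
        }
    }
}
```

The first `catch` keeps validation errors out of the broad handler, so only
genuine storage failures change the status code or evict the cached file
system.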



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

