This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 417431fd83e [Enhancement](hdfs-file-system) Change fs_handler ptr to shared_ptr and remove ref count operations. (#34049)
417431fd83e is described below

commit 417431fd83e6a927345a7384113981e36453a6f1
Author: Qi Chen <[email protected]>
AuthorDate: Sun Apr 28 19:45:30 2024 +0800

    [Enhancement](hdfs-file-system) Change fs_handler ptr to shared_ptr and remove ref count operations. (#34049)
    
    Backport #33959.
---
 be/src/io/fs/hdfs_file_system.cpp | 39 +++++++++++++--------------------------
 be/src/io/fs/hdfs_file_system.h   | 26 +++++++-------------------
 2 files changed, 20 insertions(+), 45 deletions(-)

diff --git a/be/src/io/fs/hdfs_file_system.cpp 
b/be/src/io/fs/hdfs_file_system.cpp
index e41671e493b..1340a3078e2 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -71,11 +71,11 @@ public:
 
     // This function is thread-safe
     Status get_connection(const THdfsParams& hdfs_params, const std::string& 
fs_name,
-                          HdfsFileSystemHandle** fs_handle);
+                          std::shared_ptr<HdfsFileSystemHandle>* fs_handle);
 
 private:
     std::mutex _lock;
-    std::unordered_map<uint64, std::unique_ptr<HdfsFileSystemHandle>> _cache;
+    std::unordered_map<uint64, std::shared_ptr<HdfsFileSystemHandle>> _cache;
 
     HdfsFileSystemCache() = default;
 
@@ -148,15 +148,7 @@ HdfsFileSystem::HdfsFileSystem(const THdfsParams& 
hdfs_params, std::string id,
     }
 }
 
-HdfsFileSystem::~HdfsFileSystem() {
-    if (_fs_handle != nullptr) {
-        if (_fs_handle->from_cache) {
-            _fs_handle->dec_ref();
-        } else {
-            delete _fs_handle;
-        }
-    }
-}
+HdfsFileSystem::~HdfsFileSystem() = default;
 
 Status HdfsFileSystem::connect_impl() {
     RETURN_IF_ERROR(
@@ -384,10 +376,6 @@ Status HdfsFileSystem::download_impl(const Path& 
remote_file, const Path& local_
     return local_writer->close();
 }
 
-HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
-    return _fs_handle;
-}
-
 // ************* HdfsFileSystemCache ******************
 int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
 
@@ -406,7 +394,7 @@ Status HdfsFileSystemCache::_create_fs(const THdfsParams& 
hdfs_params, const std
 void HdfsFileSystemCache::_clean_invalid() {
     std::vector<uint64> removed_handle;
     for (auto& item : _cache) {
-        if (item.second->invalid() && item.second->ref_cnt() == 0) {
+        if (item.second.use_count() == 1 && item.second->invalid()) {
             removed_handle.emplace_back(item.first);
         }
     }
@@ -419,7 +407,7 @@ void HdfsFileSystemCache::_clean_oldest() {
     uint64_t oldest_time = ULONG_MAX;
     uint64 oldest = 0;
     for (auto& item : _cache) {
-        if (item.second->ref_cnt() == 0 && item.second->last_access_time() < 
oldest_time) {
+        if (item.second.use_count() == 1 && item.second->last_access_time() < 
oldest_time) {
             oldest_time = item.second->last_access_time();
             oldest = item.first;
         }
@@ -429,16 +417,16 @@ void HdfsFileSystemCache::_clean_oldest() {
 
 Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params,
                                            const std::string& fs_name,
-                                           HdfsFileSystemHandle** fs_handle) {
+                                           
std::shared_ptr<HdfsFileSystemHandle>* fs_handle) {
     uint64 hash_code = _hdfs_hash_code(hdfs_params, fs_name);
     {
         std::lock_guard<std::mutex> l(_lock);
         auto it = _cache.find(hash_code);
         if (it != _cache.end()) {
-            HdfsFileSystemHandle* handle = it->second.get();
+            std::shared_ptr<HdfsFileSystemHandle> handle = it->second;
             if (!handle->invalid()) {
-                handle->inc_ref();
-                *fs_handle = handle;
+                handle->update_last_access_time();
+                *fs_handle = std::move(handle);
                 return Status::OK();
             }
             // fs handle is invalid, erase it.
@@ -455,13 +443,12 @@ Status HdfsFileSystemCache::get_connection(const 
THdfsParams& hdfs_params,
             _clean_oldest();
         }
         if (_cache.size() < MAX_CACHE_HANDLE) {
-            std::unique_ptr<HdfsFileSystemHandle> handle =
-                    std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true);
-            handle->inc_ref();
-            *fs_handle = handle.get();
+            auto handle = std::make_shared<HdfsFileSystemHandle>(hdfs_fs, 
true);
+            handle->update_last_access_time();
+            *fs_handle = handle;
             _cache[hash_code] = std::move(handle);
         } else {
-            *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
+            *fs_handle = std::make_shared<HdfsFileSystemHandle>(hdfs_fs, 
false);
         }
     }
     return Status::OK();
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index db854cafa9e..74d098004ab 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -45,13 +45,11 @@ public:
     HdfsFileSystemHandle(hdfsFS fs, bool cached)
             : hdfs_fs(fs),
               from_cache(cached),
-              _ref_cnt(0),
               _create_time(_now()),
               _last_access_time(0),
               _invalid(false) {}
 
     ~HdfsFileSystemHandle() {
-        DCHECK(_ref_cnt == 0) << _ref_cnt;
         if (hdfs_fs != nullptr) {
             // DO NOT call hdfsDisconnect(), or we will meet "Filesystem 
closed"
             // even if we create a new one
@@ -62,18 +60,14 @@ public:
 
     int64_t last_access_time() { return _last_access_time; }
 
-    void inc_ref() {
-        _ref_cnt++;
-        _last_access_time = _now();
-    }
-
-    void dec_ref() {
-        _ref_cnt--;
-        _last_access_time = _now();
+    void update_last_access_time() {
+        if (from_cache) {
+            _last_access_time = 
std::chrono::duration_cast<std::chrono::milliseconds>(
+                                        
std::chrono::system_clock::now().time_since_epoch())
+                                        .count();
+        }
     }
 
-    int ref_cnt() { return _ref_cnt; }
-
     bool invalid() { return _invalid; }
 
     void set_invalid() { _invalid = true; }
@@ -84,8 +78,6 @@ public:
     const bool from_cache;
 
 private:
-    // the number of referenced client
-    std::atomic<int> _ref_cnt;
     // For kerberos authentication, we need to save create time so that
     // we can know if the kerberos ticket is expired.
     std::atomic<uint64_t> _create_time;
@@ -109,8 +101,6 @@ public:
 
     ~HdfsFileSystem() override;
 
-    HdfsFileSystemHandle* get_handle();
-
     friend class HdfsFileHandleCache;
 
 protected:
@@ -143,9 +133,7 @@ private:
                    RuntimeProfile* profile);
     const THdfsParams& _hdfs_params;
     std::string _fs_name;
-    // do not use std::shared_ptr or std::unique_ptr
-    // _fs_handle is managed by HdfsFileSystemCache
-    HdfsFileSystemHandle* _fs_handle = nullptr;
+    std::shared_ptr<HdfsFileSystemHandle> _fs_handle = nullptr;
     RuntimeProfile* _profile = nullptr;
 };
 } // namespace io


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to