This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c80dc91a78 [bugfix](memleak) UserFunctionCache may have memory leak during close (#18913)
c80dc91a78 is described below
commit c80dc91a7820eac5724086e926acfadd107109a5
Author: yiguolei <[email protected]>
AuthorDate: Sat Apr 22 10:15:51 2023 +0800
[bugfix](memleak) UserFunctionCache may have memory leak during close (#18913)
* [bugfix](memleak) UserFunctionCache may have memory leak during close
* [bugfix](memleak) UserFunctionCache may have memory leak during close
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/runtime/user_function_cache.cpp | 124 ++++++---------------------------
be/src/runtime/user_function_cache.h | 25 ++-----
2 files changed, 28 insertions(+), 121 deletions(-)
diff --git a/be/src/runtime/user_function_cache.cpp b/be/src/runtime/user_function_cache.cpp
index ac87920cd8..25e7405a0f 100644
--- a/be/src/runtime/user_function_cache.cpp
+++ b/be/src/runtime/user_function_cache.cpp
@@ -32,6 +32,7 @@
#include <vector>
#include "common/config.h"
+#include "common/factory_creator.h"
#include "common/status.h"
#include "gutil/strings/split.h"
#include "http/http_client.h"
@@ -48,16 +49,12 @@ static const int kLibShardNum = 128;
// function cache entry, store information for
struct UserFunctionCacheEntry {
+ ENABLE_FACTORY_CREATOR(UserFunctionCacheEntry);
UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_, const
std::string& lib_file_,
LibType type)
: function_id(fid_), checksum(checksum_), lib_file(lib_file_),
type(type) {}
~UserFunctionCacheEntry();
- void ref() { _refs.fetch_add(1); }
-
- // If unref() returns true, this object should be delete
- bool unref() { return _refs.fetch_sub(1) == 1; }
-
int64_t function_id = 0;
// used to check if this library is valid.
std::string checksum;
@@ -88,9 +85,6 @@ struct UserFunctionCacheEntry {
std::unordered_map<std::string, void*> fptr_map;
LibType type;
-
-private:
- std::atomic<int> _refs {0};
};
UserFunctionCacheEntry::~UserFunctionCacheEntry() {
@@ -114,9 +108,6 @@ UserFunctionCache::~UserFunctionCache() {
while (it != _entry_map.end()) {
auto entry = it->second;
it = _entry_map.erase(it);
- if (entry->unref()) {
- delete entry;
- }
}
}
@@ -162,11 +153,9 @@ Status UserFunctionCache::_load_entry_from_lib(const
std::string& dir, const std
return Status::InternalError("duplicate function id");
}
// create a cache entry and put it into entry map
- UserFunctionCacheEntry* entry =
- new UserFunctionCacheEntry(function_id, checksum, dir + "/" +
file, lib_type);
+ std::shared_ptr<UserFunctionCacheEntry> entry =
UserFunctionCacheEntry::create_shared(
+ function_id, checksum, dir + "/" + file, lib_type);
entry->is_downloaded = true;
-
- entry->ref();
_entry_map[function_id] = entry;
return Status::OK();
@@ -204,64 +193,11 @@ std::string get_real_symbol(const std::string& symbol) {
return str2;
}
-Status UserFunctionCache::get_function_ptr(int64_t fid, const std::string&
orig_symbol,
- const std::string& url, const
std::string& checksum,
- void** fn_ptr,
UserFunctionCacheEntry** output_entry) {
- auto symbol = get_real_symbol(orig_symbol);
- if (fid == 0) {
- // Just loading a function ptr in the current process. No need to take
any locks.
- RETURN_IF_ERROR(dynamic_lookup(_current_process_handle,
symbol.c_str(), fn_ptr));
- return Status::OK();
- }
-
- // if we need to unref entry
- bool need_unref_entry = false;
- UserFunctionCacheEntry* entry = nullptr;
- // find the library entry for this function. If *output_entry is not null
- // find symbol in it without to get other entry
- if (output_entry != nullptr && *output_entry != nullptr) {
- entry = *output_entry;
- } else {
- RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry,
LibType::SO));
- need_unref_entry = true;
- }
-
- Status status;
- {
- std::lock_guard<SpinLock> l(entry->map_lock);
- // now, we have the library entry, we need to lock it to find symbol
- auto it = entry->fptr_map.find(symbol);
- if (it != entry->fptr_map.end()) {
- *fn_ptr = it->second;
- } else {
- status = dynamic_lookup(entry->lib_handle, symbol.c_str(), fn_ptr);
- if (status.ok()) {
- entry->fptr_map.emplace(symbol, *fn_ptr);
- } else {
- LOG(WARNING) << "fail to lookup symbol in library, symbol=" <<
symbol
- << ", file=" << entry->lib_file;
- }
- }
- }
-
- if (status.ok() && output_entry != nullptr && *output_entry == nullptr) {
- *output_entry = entry;
- need_unref_entry = false;
- }
-
- if (need_unref_entry) {
- if (entry->unref()) {
- delete entry;
- }
- }
-
- return status;
-}
-
Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
const std::string& checksum,
- UserFunctionCacheEntry**
output_entry, LibType type) {
- UserFunctionCacheEntry* entry = nullptr;
+
std::shared_ptr<UserFunctionCacheEntry>& output_entry,
+ LibType type) {
+ std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
std::string file_name = _get_file_name_from_url(url);
{
std::lock_guard<std::mutex> l(_cache_lock);
@@ -269,12 +205,10 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid,
const std::string& url,
if (it != _entry_map.end()) {
entry = it->second;
} else {
- entry = new UserFunctionCacheEntry(
+ entry = UserFunctionCacheEntry::create_shared(
fid, checksum, _make_lib_file(fid, checksum, type,
file_name), type);
- entry->ref();
_entry_map.emplace(fid, entry);
}
- entry->ref();
}
auto st = _load_cache_entry(url, entry);
if (!st.ok()) {
@@ -285,28 +219,21 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid,
const std::string& url,
return st;
}
- *output_entry = entry;
+ output_entry = entry;
return Status::OK();
}
-void UserFunctionCache::_destroy_cache_entry(UserFunctionCacheEntry* entry) {
+void
UserFunctionCache::_destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry>
entry) {
// 1. we remove cache entry from entry map
- size_t num_removed = 0;
- {
- std::lock_guard<std::mutex> l(_cache_lock);
- num_removed = _entry_map.erase(entry->function_id);
- }
- if (num_removed > 0) {
- entry->unref();
- }
+ std::lock_guard<std::mutex> l(_cache_lock);
+ // set should delete flag to true, so that the jar file will be removed when
+ // the entry is removed from map, and deconstruct method is called.
entry->should_delete_library.store(true);
- // now we need to drop
- if (entry->unref()) {
- delete entry;
- }
+ _entry_map.erase(entry->function_id);
}
-Status UserFunctionCache::_load_cache_entry(const std::string& url,
UserFunctionCacheEntry* entry) {
+Status UserFunctionCache::_load_cache_entry(const std::string& url,
+
std::shared_ptr<UserFunctionCacheEntry> entry) {
if (entry->is_loaded.load()) {
return Status::OK();
}
@@ -326,7 +253,8 @@ Status UserFunctionCache::_load_cache_entry(const
std::string& url, UserFunction
}
// entry's lock must be held
-Status UserFunctionCache::_download_lib(const std::string& url,
UserFunctionCacheEntry* entry) {
+Status UserFunctionCache::_download_lib(const std::string& url,
+
std::shared_ptr<UserFunctionCacheEntry> entry) {
DCHECK(!entry->is_downloaded);
// get local path to save library
@@ -399,7 +327,8 @@ std::string
UserFunctionCache::_get_file_name_from_url(const std::string& url) c
}
// entry's lock must be held
-Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry*
entry) {
+Status UserFunctionCache::_load_cache_entry_internal(
+ std::shared_ptr<UserFunctionCacheEntry> entry) {
RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), &entry->lib_handle));
entry->is_loaded.store(true);
return Status::OK();
@@ -418,19 +347,10 @@ std::string UserFunctionCache::_make_lib_file(int64_t
function_id, const std::st
return ss.str();
}
-void UserFunctionCache::release_entry(UserFunctionCacheEntry* entry) {
- if (entry == nullptr) {
- return;
- }
- if (entry->unref()) {
- delete entry;
- }
-}
-
Status UserFunctionCache::get_jarpath(int64_t fid, const std::string& url,
const std::string& checksum,
std::string* libpath) {
- UserFunctionCacheEntry* entry = nullptr;
- RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry,
LibType::JAR));
+ std::shared_ptr<UserFunctionCacheEntry> entry = nullptr;
+ RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, entry, LibType::JAR));
*libpath = entry->lib_file;
return Status::OK();
}
diff --git a/be/src/runtime/user_function_cache.h b/be/src/runtime/user_function_cache.h
index 1507642167..e58f02294b 100644
--- a/be/src/runtime/user_function_cache.h
+++ b/be/src/runtime/user_function_cache.h
@@ -56,19 +56,6 @@ public:
static UserFunctionCache* instance();
- // Return function pointer for given fid and symbol.
- // If fid is 0, lookup symbol from this doris-be process.
- // Otherwise find symbol in UserFunction's library.
- // Found function pointer is returned in fn_ptr, and cache entry
- // is returned by entry. Client must call release_entry to release
- // cache entry if didn't need it.
- // If *entry is not true means that we should find symbol in this
- // entry.
- Status get_function_ptr(int64_t fid, const std::string& symbol, const
std::string& url,
- const std::string& checksum, void** fn_ptr,
- UserFunctionCacheEntry** entry);
- void release_entry(UserFunctionCacheEntry* entry);
-
Status get_jarpath(int64_t fid, const std::string& url, const std::string&
checksum,
std::string* libpath);
@@ -76,14 +63,14 @@ private:
Status _load_cached_lib();
Status _load_entry_from_lib(const std::string& dir, const std::string&
file);
Status _get_cache_entry(int64_t fid, const std::string& url, const
std::string& checksum,
- UserFunctionCacheEntry** output_entry, LibType
type);
- Status _load_cache_entry(const std::string& url, UserFunctionCacheEntry*
entry);
- Status _download_lib(const std::string& url, UserFunctionCacheEntry*
entry);
- Status _load_cache_entry_internal(UserFunctionCacheEntry* entry);
+ std::shared_ptr<UserFunctionCacheEntry>&
output_entry, LibType type);
+ Status _load_cache_entry(const std::string& url,
std::shared_ptr<UserFunctionCacheEntry> entry);
+ Status _download_lib(const std::string& url,
std::shared_ptr<UserFunctionCacheEntry> entry);
+ Status _load_cache_entry_internal(std::shared_ptr<UserFunctionCacheEntry>
entry);
std::string _make_lib_file(int64_t function_id, const std::string&
checksum, LibType type,
const std::string& file_name);
- void _destroy_cache_entry(UserFunctionCacheEntry* entry);
+ void _destroy_cache_entry(std::shared_ptr<UserFunctionCacheEntry> entry);
std::string _get_real_url(const std::string& url);
std::string _get_file_name_from_url(const std::string& url) const;
@@ -93,7 +80,7 @@ private:
void* _current_process_handle = nullptr;
std::mutex _cache_lock;
- std::unordered_map<int64_t, UserFunctionCacheEntry*> _entry_map;
+ std::unordered_map<int64_t, std::shared_ptr<UserFunctionCacheEntry>>
_entry_map;
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]