zxealous commented on code in PR #11984:
URL: https://github.com/apache/doris/pull/11984#discussion_r952217473
##########
be/src/io/cache/whole_file_cache.cpp:
##########
@@ -60,55 +60,94 @@ Status WholeFileCache::_generate_cache_reader(size_t
offset, size_t req_size) {
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_done_file,
&done_file_exist),
"Check local cache done file exist failed.");
+
+ std::promise<Status> download_st;
+ std::future<Status> future = download_st.get_future();
if (!done_file_exist) {
- bool cache_dir_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(_cache_dir,
&cache_dir_exist),
- fmt::format("Check local cache dir exist failed. {}",
_cache_dir.native()));
- if (!cache_dir_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
-
io::global_local_filesystem()->create_directory(_cache_dir),
- fmt::format("Create local cache dir failed. {}",
_cache_dir.native()));
+ ThreadPoolToken* thread_token =
+
ExecEnv::GetInstance()->get_serial_download_cache_thread_token();
+ if (thread_token != nullptr) {
+ auto st = thread_token->submit_func([this, &download_st,
cache_done_file, cache_file] {
+ auto func = [this, cache_done_file, cache_file] {
+ bool done_file_exist = false;
+ bool cache_dir_exist = false;
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ io::global_local_filesystem()->exists(_cache_dir,
&cache_dir_exist),
+ fmt::format("Check local cache dir exist failed.
{}",
+ _cache_dir.native()));
+ if (!cache_dir_exist) {
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+
io::global_local_filesystem()->create_directory(_cache_dir),
+ fmt::format("Create local cache dir failed.
{}",
+ _cache_dir.native()));
+ } else {
+ // Judge again whether cache_done_file exists; it is possible that the cache
+ // is downloaded while waiting in the thread pool
+
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(
+
cache_done_file, &done_file_exist),
+ "Check local cache done
file exist failed.");
+ }
+ bool cache_file_exist = false;
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ io::global_local_filesystem()->exists(cache_file,
&cache_file_exist),
+ "Check local cache file exist failed.");
+ if (!done_file_exist && cache_file_exist) {
Review Comment:
done
##########
be/src/io/cache/whole_file_cache.cpp:
##########
@@ -60,55 +60,94 @@ Status WholeFileCache::_generate_cache_reader(size_t
offset, size_t req_size) {
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_done_file,
&done_file_exist),
"Check local cache done file exist failed.");
+
+ std::promise<Status> download_st;
+ std::future<Status> future = download_st.get_future();
if (!done_file_exist) {
- bool cache_dir_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(_cache_dir,
&cache_dir_exist),
- fmt::format("Check local cache dir exist failed. {}",
_cache_dir.native()));
- if (!cache_dir_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
-
io::global_local_filesystem()->create_directory(_cache_dir),
- fmt::format("Create local cache dir failed. {}",
_cache_dir.native()));
+ ThreadPoolToken* thread_token =
+
ExecEnv::GetInstance()->get_serial_download_cache_thread_token();
+ if (thread_token != nullptr) {
+ auto st = thread_token->submit_func([this, &download_st,
cache_done_file, cache_file] {
+ auto func = [this, cache_done_file, cache_file] {
+ bool done_file_exist = false;
+ bool cache_dir_exist = false;
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ io::global_local_filesystem()->exists(_cache_dir,
&cache_dir_exist),
+ fmt::format("Check local cache dir exist failed.
{}",
+ _cache_dir.native()));
+ if (!cache_dir_exist) {
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+
io::global_local_filesystem()->create_directory(_cache_dir),
+ fmt::format("Create local cache dir failed.
{}",
+ _cache_dir.native()));
+ } else {
+ // Judge again whether cache_done_file exists; it is possible that the cache
+ // is downloaded while waiting in the thread pool
+
RETURN_NOT_OK_STATUS_WITH_WARN(io::global_local_filesystem()->exists(
+
cache_done_file, &done_file_exist),
+ "Check local cache done
file exist failed.");
+ }
+ bool cache_file_exist = false;
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ io::global_local_filesystem()->exists(cache_file,
&cache_file_exist),
+ "Check local cache file exist failed.");
+ if (!done_file_exist && cache_file_exist) {
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+
io::global_local_filesystem()->delete_file(cache_file),
+ "Check local cache file exist failed.");
+ } else if (done_file_exist && cache_file_exist) {
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]