This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e71a0c414 [CH] Support use dynamic disk path #6232
e71a0c414 is described below
commit e71a0c414ecd2595b166a03ee381845f9977302c
Author: LiuNeng <[email protected]>
AuthorDate: Thu Jun 27 15:06:11 2024 +0800
[CH] Support use dynamic disk path #6232
What changes were proposed in this pull request?
Support using dynamic disk paths.
spark.gluten.sql.columnar.backend.ch.runtime_config.use_current_directory_as_tmp=true
When enabled, the current working directory is prepended to disk.metadata_path
(for s3 and hdfs_gluten disks) and to cache.path (for cache disks).
spark.gluten.sql.columnar.backend.ch.runtime_config.reuse_disk_cache=false
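Append the current process ID as a subdirectory ("/<pid>") to disk.metadata_path and cache.path; these per-process directories are deleted when the backend is finalized globally.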
How was this patch tested?
unit tests
---
cpp-ch/local-engine/Common/CHUtil.cpp | 75 +++++++++++++++++++++++++++++++++++
cpp-ch/local-engine/Common/CHUtil.h | 4 ++
2 files changed, 79 insertions(+)
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 148e78bfb..76c71ce75 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -17,6 +17,7 @@
#include "CHUtil.h"
#include <filesystem>
+#include <format>
#include <memory>
#include <optional>
#include <unistd.h>
@@ -527,6 +528,50 @@ std::map<std::string, std::string>
BackendInitializerUtil::getBackendConfMap(con
return ch_backend_conf;
}
+std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
+ const String & path_prefix,
+ const String & path_suffix,
+ Poco::Util::AbstractConfiguration & config)
+{
+ std::vector<String> changed_paths;
+ if (path_prefix.empty() && path_suffix.empty())
+ return changed_paths;
+ Poco::Util::AbstractConfiguration::Keys disks;
+ std::unordered_set<String> disk_types = {"s3", "hdfs_gluten", "cache"};
+ config.keys("storage_configuration.disks", disks);
+
+ std::ranges::for_each(
+ disks,
+ [&](const auto & disk_name)
+ {
+ String disk_prefix = "storage_configuration.disks." + disk_name;
+ String disk_type = config.getString(disk_prefix + ".type", "");
+ if (!disk_types.contains(disk_type))
+ return;
+ if (disk_type == "cache")
+ {
+ String path = config.getString(disk_prefix + ".path", "");
+ if (!path.empty())
+ {
+ String final_path = path_prefix + path + path_suffix;
+ config.setString(disk_prefix + ".path", final_path);
+ changed_paths.emplace_back(final_path);
+ }
+ }
+ else if (disk_type == "s3" || disk_type == "hdfs_gluten")
+ {
+ String metadata_path = config.getString(disk_prefix +
".metadata_path", "");
+ if (!metadata_path.empty())
+ {
+ String final_path = path_prefix + metadata_path +
path_suffix;
+ config.setString(disk_prefix + ".metadata_path",
final_path);
+ changed_paths.emplace_back(final_path);
+ }
+ }
+ });
+ return changed_paths;
+}
+
DB::Context::ConfigurationPtr
BackendInitializerUtil::initConfig(std::map<std::string, std::string> &
backend_conf_map)
{
DB::Context::ConfigurationPtr config;
@@ -566,6 +611,25 @@ DB::Context::ConfigurationPtr
BackendInitializerUtil::initConfig(std::map<std::s
config->setString(CH_TASK_MEMORY,
backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
}
+ const bool use_current_directory_as_tmp =
config->getBool("use_current_directory_as_tmp", false);
+ char buffer[PATH_MAX];
+ if (use_current_directory_as_tmp && getcwd(buffer, sizeof(buffer)) !=
nullptr)
+ {
+ wrapDiskPathConfig(String(buffer), "", *config);
+ }
+
+ const bool reuse_disk_cache = config->getBool("reuse_disk_cache", true);
+
+ if (!reuse_disk_cache)
+ {
+ String pid = std::to_string(static_cast<Int64>(getpid()));
+ auto path_need_clean = wrapDiskPathConfig("", "/" + pid, *config);
+ std::lock_guard lock(BackendFinalizerUtil::paths_mutex);
+ BackendFinalizerUtil::paths_need_to_clean.insert(
+ BackendFinalizerUtil::paths_need_to_clean.end(),
+ path_need_clean.begin(),
+ path_need_clean.end());
+ }
return config;
}
@@ -936,12 +1000,23 @@ void BackendFinalizerUtil::finalizeGlobally()
global_context.reset();
shared_context.reset();
}
+ std::lock_guard lock(paths_mutex);
+ std::ranges::for_each(paths_need_to_clean, [](const auto & path)
+ {
+ if (fs::exists(path))
+ fs::remove_all(path);
+ });
+ paths_need_to_clean.clear();
}
void BackendFinalizerUtil::finalizeSessionally()
{
}
+std::vector<String> BackendFinalizerUtil::paths_need_to_clean;
+
+std::mutex BackendFinalizerUtil::paths_mutex;
+
Int64 DateTimeUtil::currentTimeMillis()
{
return timeInMilliseconds(std::chrono::system_clock::now());
diff --git a/cpp-ch/local-engine/Common/CHUtil.h
b/cpp-ch/local-engine/Common/CHUtil.h
index 0321d410a..1198cfa21 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -196,6 +196,7 @@ private:
static void registerAllFactories();
static void applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr,
DB::Settings &);
static void updateNewSettings(const DB::ContextMutablePtr &, const
DB::Settings &);
+ static std::vector<String> wrapDiskPathConfig(const String & path_prefix,
const String & path_suffix, Poco::Util::AbstractConfiguration & config);
static std::map<std::string, std::string> getBackendConfMap(const
std::string & plan);
@@ -212,6 +213,9 @@ public:
/// Release session level resources like StorageJoinBuilder. Invoked every
time executor/driver shutdown.
static void finalizeSessionally();
+
+ static std::vector<String> paths_need_to_clean;
+ static std::mutex paths_mutex;
};
// Ignore memory track, memory should free before IgnoreMemoryTracker
deconstruction
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]