This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e71a0c414 [CH] Support use dynamic disk path #6232
e71a0c414 is described below

commit e71a0c414ecd2595b166a03ee381845f9977302c
Author: LiuNeng <[email protected]>
AuthorDate: Thu Jun 27 15:06:11 2024 +0800

    [CH] Support use dynamic disk path #6232
    
    What changes were proposed in this pull request?
    Support using dynamic disk paths.
    
    spark.gluten.sql.columnar.backend.ch.runtime_config.use_current_directory_as_tmp=true
    disk.metadata_path and cache.path are automatically prefixed with the
    current working directory.
    spark.gluten.sql.columnar.backend.ch.runtime_config.reuse_disk_cache=false
    The current process ID is appended to disk.metadata_path and cache.path.
    
    How was this patch tested?
    unit tests
---
 cpp-ch/local-engine/Common/CHUtil.cpp | 75 +++++++++++++++++++++++++++++++++++
 cpp-ch/local-engine/Common/CHUtil.h   |  4 ++
 2 files changed, 79 insertions(+)

diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp 
b/cpp-ch/local-engine/Common/CHUtil.cpp
index 148e78bfb..76c71ce75 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -17,6 +17,7 @@
 
 #include "CHUtil.h"
 #include <filesystem>
+#include <format>
 #include <memory>
 #include <optional>
 #include <unistd.h>
@@ -527,6 +528,50 @@ std::map<std::string, std::string> 
BackendInitializerUtil::getBackendConfMap(con
     return ch_backend_conf;
 }
 
+std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
+    const String & path_prefix,
+    const String & path_suffix,
+    Poco::Util::AbstractConfiguration & config)
+{
+    std::vector<String> changed_paths;
+    if (path_prefix.empty() && path_suffix.empty())
+        return changed_paths;
+    Poco::Util::AbstractConfiguration::Keys disks;
+    std::unordered_set<String> disk_types = {"s3", "hdfs_gluten", "cache"};
+    config.keys("storage_configuration.disks", disks);
+
+    std::ranges::for_each(
+        disks,
+        [&](const auto & disk_name)
+        {
+            String disk_prefix = "storage_configuration.disks." + disk_name;
+            String disk_type = config.getString(disk_prefix + ".type", "");
+            if (!disk_types.contains(disk_type))
+                return;
+            if (disk_type == "cache")
+            {
+                String path = config.getString(disk_prefix + ".path", "");
+                if (!path.empty())
+                {
+                    String final_path = path_prefix + path + path_suffix;
+                    config.setString(disk_prefix + ".path", final_path);
+                    changed_paths.emplace_back(final_path);
+                }
+            }
+            else if (disk_type == "s3" || disk_type == "hdfs_gluten")
+            {
+                String metadata_path = config.getString(disk_prefix + 
".metadata_path", "");
+                if (!metadata_path.empty())
+                {
+                    String final_path = path_prefix + metadata_path + 
path_suffix;
+                    config.setString(disk_prefix + ".metadata_path", 
final_path);
+                    changed_paths.emplace_back(final_path);
+                }
+            }
+        });
+    return changed_paths;
+}
+
 DB::Context::ConfigurationPtr 
BackendInitializerUtil::initConfig(std::map<std::string, std::string> & 
backend_conf_map)
 {
     DB::Context::ConfigurationPtr config;
@@ -566,6 +611,25 @@ DB::Context::ConfigurationPtr 
BackendInitializerUtil::initConfig(std::map<std::s
         config->setString(CH_TASK_MEMORY, 
backend_conf_map.at(GLUTEN_TASK_OFFHEAP));
     }
 
+    const bool use_current_directory_as_tmp = 
config->getBool("use_current_directory_as_tmp", false);
+    char buffer[PATH_MAX];
+    if (use_current_directory_as_tmp && getcwd(buffer, sizeof(buffer)) != 
nullptr)
+    {
+        wrapDiskPathConfig(String(buffer), "", *config);
+    }
+
+    const bool reuse_disk_cache = config->getBool("reuse_disk_cache", true);
+
+    if (!reuse_disk_cache)
+    {
+        String pid = std::to_string(static_cast<Int64>(getpid()));
+        auto path_need_clean = wrapDiskPathConfig("", "/" + pid, *config);
+        std::lock_guard lock(BackendFinalizerUtil::paths_mutex);
+        BackendFinalizerUtil::paths_need_to_clean.insert(
+            BackendFinalizerUtil::paths_need_to_clean.end(),
+            path_need_clean.begin(),
+            path_need_clean.end());
+    }
     return config;
 }
 
@@ -936,12 +1000,23 @@ void BackendFinalizerUtil::finalizeGlobally()
         global_context.reset();
         shared_context.reset();
     }
+    std::lock_guard lock(paths_mutex);
+    std::ranges::for_each(paths_need_to_clean, [](const auto & path)
+    {
+        if (fs::exists(path))
+            fs::remove_all(path);
+    });
+    paths_need_to_clean.clear();
 }
 
 void BackendFinalizerUtil::finalizeSessionally()
 {
 }
 
+std::vector<String> BackendFinalizerUtil::paths_need_to_clean;
+
+std::mutex BackendFinalizerUtil::paths_mutex;
+
 Int64 DateTimeUtil::currentTimeMillis()
 {
     return timeInMilliseconds(std::chrono::system_clock::now());
diff --git a/cpp-ch/local-engine/Common/CHUtil.h 
b/cpp-ch/local-engine/Common/CHUtil.h
index 0321d410a..1198cfa21 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -196,6 +196,7 @@ private:
     static void registerAllFactories();
     static void applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr, 
DB::Settings &);
     static void updateNewSettings(const DB::ContextMutablePtr &, const 
DB::Settings &);
+    static std::vector<String> wrapDiskPathConfig(const String & path_prefix, 
const String & path_suffix, Poco::Util::AbstractConfiguration & config);
 
 
     static std::map<std::string, std::string> getBackendConfMap(const 
std::string & plan);
@@ -212,6 +213,9 @@ public:
 
     /// Release session level resources like StorageJoinBuilder. Invoked every 
time executor/driver shutdown.
     static void finalizeSessionally();
+
+    static std::vector<String> paths_need_to_clean;
+    static std::mutex paths_mutex;
 };
 
 // Ignore memory track, memory should free before IgnoreMemoryTracker 
deconstruction


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to