This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 17080e73cd [CH] Rename Mergetree part file name to avoid duplicated 
file name (#7769)
17080e73cd is described below

commit 17080e73cd15d6bc5889e3837ac2c5a28627edac
Author: LiuNeng <[email protected]>
AuthorDate: Wed Nov 6 13:59:34 2024 +0800

    [CH] Rename Mergetree part file name to avoid duplicated file name (#7769)
    
    What changes were proposed in this pull request?
    Rename the MergeTree part files (data.bin -> part_data.gluten, meta.bin -> part_meta.gluten) to avoid duplicated file names, and add support for prefetching the MergeTree part data file.
    
    How was this patch tested?
    unit tests
    
    (If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)
---
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  |  4 ++--
 cpp-ch/local-engine/Common/GlutenConfig.cpp        |  6 +++++
 cpp-ch/local-engine/Common/GlutenConfig.h          |  9 ++++++++
 .../CompactObjectStorageDiskTransaction.cpp        |  4 ++--
 .../CompactObjectStorageDiskTransaction.h          |  3 +++
 .../local-engine/Storages/Cache/CacheManager.cpp   |  7 +++++-
 cpp-ch/local-engine/Storages/Cache/CacheManager.h  |  2 +-
 .../Storages/MergeTree/SparkStorageMergeTree.cpp   | 26 +++++++++++++++-------
 .../Storages/MergeTree/SparkStorageMergeTree.h     |  4 +++-
 9 files changed, 50 insertions(+), 15 deletions(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index af67b01f49..331009d12c 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -196,9 +196,9 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
           val objectName = obj.get().objectName()
           if (objectName.contains("metadata.gluten")) {
             metadataGlutenExist = true
-          } else if (objectName.contains("meta.bin")) {
+          } else if (objectName.contains("part_meta.gluten")) {
             metadataBinExist = true
-          } else if (objectName.contains("data.bin")) {
+          } else if (objectName.contains("part_data.gluten")) {
             dataBinExist = true
           } else if (objectName.contains("_commits")) {
             // Spark 35 has _commits directory
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.cpp 
b/cpp-ch/local-engine/Common/GlutenConfig.cpp
index 93d074ecc2..eb6c2dcab6 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.cpp
+++ b/cpp-ch/local-engine/Common/GlutenConfig.cpp
@@ -137,4 +137,10 @@ GlutenJobSchedulerConfig 
GlutenJobSchedulerConfig::loadFromContext(const DB::Con
     config.job_scheduler_max_threads = 
context->getConfigRef().getUInt64(JOB_SCHEDULER_MAX_THREADS, 10);
     return config;
 }
+MergeTreeCacheConfig MergeTreeCacheConfig::loadFromContext(const 
DB::ContextPtr & context)
+{
+    MergeTreeCacheConfig config;
+    config.enable_data_prefetch = 
context->getConfigRef().getBool(ENABLE_DATA_PREFETCH, 
config.enable_data_prefetch);
+    return config;
+}
 }
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h 
b/cpp-ch/local-engine/Common/GlutenConfig.h
index 82402eaafa..11220afb48 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -142,4 +142,13 @@ struct GlutenJobSchedulerConfig
 
     static GlutenJobSchedulerConfig loadFromContext(const DB::ContextPtr & 
context);
 };
+
+struct MergeTreeCacheConfig
+{
+    inline static const String ENABLE_DATA_PREFETCH = "enable_data_prefetch";
+
+    bool enable_data_prefetch = true;
+
+    static MergeTreeCacheConfig loadFromContext(const DB::ContextPtr & 
context);
+};
 }
diff --git 
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
 
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
index 82afeb85e2..5b1fe63a09 100644
--- 
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
+++ 
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
@@ -31,8 +31,8 @@ bool isMetaDataFile(const std::string & path)
 void CompactObjectStorageDiskTransaction::commit()
 {
     auto metadata_tx = disk.getMetadataStorage()->createTransaction();
-    std::filesystem::path data_path = std::filesystem::path(prefix_path) / 
"data.bin";
-    std::filesystem::path meta_path = std::filesystem::path(prefix_path) / 
"meta.bin";
+    std::filesystem::path data_path = std::filesystem::path(prefix_path) / 
PART_DATA_FILE_NAME;
+    std::filesystem::path meta_path = std::filesystem::path(prefix_path) / 
PART_META_FILE_NAME;
 
     auto object_storage = disk.getObjectStorage();
     auto data_key = object_storage->generateObjectKeyForPath(data_path, 
std::nullopt);
diff --git 
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
 
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
index e15c362f30..becb5371aa 100644
--- 
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
+++ 
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
@@ -34,6 +34,9 @@ namespace local_engine
 
 class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
     public:
+    static inline const String PART_DATA_FILE_NAME = "part_data.gluten";
+    static inline const String PART_META_FILE_NAME = "part_meta.gluten";
+
     explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const 
DB::DiskPtr tmp_)
         : disk(disk_), tmp_data(tmp_)
     {
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp 
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index e2ba48e9d2..3218db2741 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -88,7 +88,9 @@ Task CacheManager::cachePart(
     job_context.table.parts.clear();
     job_context.table.parts.push_back(part);
     job_context.table.snapshot_id = "";
-    Task task = [job_detail = job_context, context = this->context, 
read_columns = columns, only_meta_cache]()
+    MergeTreeCacheConfig config = 
MergeTreeCacheConfig::loadFromContext(context);
+    Task task = [job_detail = job_context, context = this->context, 
read_columns = columns, only_meta_cache,
+        prefetch_data = config.enable_data_prefetch]()
     {
         try
         {
@@ -106,6 +108,9 @@ Task CacheManager::cachePart(
                     job_detail.table.parts.front().name);
                 return;
             }
+            // prefetch part data
+            if (prefetch_data)
+                
storage->prefetchPartDataFile({job_detail.table.parts.front().name});
 
             auto storage_snapshot = 
std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
             NamesAndTypesList names_and_types_list;
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h 
b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
index 8fd26d249a..c44026ce0b 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
@@ -29,7 +29,7 @@ struct MergeTreePart;
 struct MergeTreeTableInstance;
 
 /***
- * Manage the cache of the MergeTree, mainly including meta.bin, data.bin, 
metadata.gluten
+ * Manage the cache of the MergeTree, mainly including part_data.gluten, 
part_meta.gluten, metadata.gluten
  */
 class CacheManager
 {
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp 
b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
index 45be9dcf74..15da00fbee 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
@@ -16,6 +16,7 @@
  */
 #include "SparkStorageMergeTree.h"
 
+#include <Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h>
 #include <Interpreters/MergeTreeTransaction.h>
 #include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
 #include <Storages/MergeTree/MergeTreeSettings.h>
@@ -159,27 +160,36 @@ SparkStorageMergeTree::SparkStorageMergeTree(
 
 std::atomic<int> SparkStorageMergeTree::part_num;
 
-void 
SparkStorageMergeTree::prefetchMetaDataFile(std::unordered_set<std::string> 
parts) const
+void SparkStorageMergeTree::prefetchPartDataFile(const 
std::unordered_set<std::string>& parts) const
+{
+    prefetchPartFiles(parts, 
CompactObjectStorageDiskTransaction::PART_DATA_FILE_NAME);
+}
+
+void SparkStorageMergeTree::prefetchPartFiles(const 
std::unordered_set<std::string>& parts, String file_name) const
 {
     auto disk = getDisks().front();
     if (!disk->isRemote())
         return;
-    std::vector<String> meta_paths;
-    std::ranges::for_each(parts, [&](const String & name) { 
meta_paths.emplace_back(fs::path(relative_data_path) / name / "meta.bin"); });
+    std::vector<String> data_paths;
+    std::ranges::for_each(parts, [&](const String & name) { 
data_paths.emplace_back(fs::path(relative_data_path) / name / file_name); });
     auto read_settings = ReadSettings{};
-    // read_settings.enable_filesystem_cache = false;
     read_settings.remote_fs_method = RemoteFSReadMethod::read;
-    for (const auto & meta_path : meta_paths)
+    for (const auto & data_path : data_paths)
     {
-        if (!disk->existsDirectory(meta_path))
+        if (!disk->existsFile(data_path))
             continue;
-
-        auto in = disk->readFile(meta_path, read_settings);
+        LOG_DEBUG(log, "Prefetching part file {}", data_path);
+        auto in = disk->readFile(data_path, read_settings);
         String ignore_data;
         readStringUntilEOF(ignore_data, *in);
     }
 }
 
+void SparkStorageMergeTree::prefetchMetaDataFile(const 
std::unordered_set<std::string>& parts) const
+{
+    prefetchPartFiles(parts, 
CompactObjectStorageDiskTransaction::PART_META_FILE_NAME);
+}
+
 std::vector<MergeTreeDataPartPtr> 
SparkStorageMergeTree::loadDataPartsWithNames(const 
std::unordered_set<std::string> & parts)
 {
     Stopwatch watch;
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h 
b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h
index cec1597eab..237cf69192 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h
@@ -71,6 +71,7 @@ public:
     std::map<std::string, MutationCommands> getUnfinishedMutationCommands() 
const override;
     std::vector<MergeTreeDataPartPtr> loadDataPartsWithNames(const 
std::unordered_set<std::string> & parts);
     void removePartFromMemory(const MergeTreeData::DataPart & part_to_detach);
+    void prefetchPartDataFile(const std::unordered_set<std::string>& parts) 
const;
 
     MergeTreeDataSelectExecutor reader;
     MergeTreeDataMergerMutator merger_mutator;
@@ -91,7 +92,8 @@ private:
     static std::atomic<int> part_num;
     SimpleIncrement increment;
 
-    void prefetchMetaDataFile(std::unordered_set<std::string> parts) const;
+    void prefetchPartFiles(const std::unordered_set<std::string>& parts, 
String file_name) const;
+    void prefetchMetaDataFile(const std::unordered_set<std::string>& parts) 
const;
     void startBackgroundMovesIfNeeded() override;
     std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
     LoadPartResult loadDataPart(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to