This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 17080e73cd [CH] Rename Mergetree part file name to avoid duplicated
file name (#7769)
17080e73cd is described below
commit 17080e73cd15d6bc5889e3837ac2c5a28627edac
Author: LiuNeng <[email protected]>
AuthorDate: Wed Nov 6 13:59:34 2024 +0800
[CH] Rename Mergetree part file name to avoid duplicated file name (#7769)
What changes were proposed in this pull request?
Rename the MergeTree part files to avoid duplicate file names, and support
prefetching MergeTree part data files.
How was this patch tested?
unit tests
(If this patch involves UI changes, please attach a screenshot; otherwise,
remove this)
---
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 4 ++--
cpp-ch/local-engine/Common/GlutenConfig.cpp | 6 +++++
cpp-ch/local-engine/Common/GlutenConfig.h | 9 ++++++++
.../CompactObjectStorageDiskTransaction.cpp | 4 ++--
.../CompactObjectStorageDiskTransaction.h | 3 +++
.../local-engine/Storages/Cache/CacheManager.cpp | 7 +++++-
cpp-ch/local-engine/Storages/Cache/CacheManager.h | 2 +-
.../Storages/MergeTree/SparkStorageMergeTree.cpp | 26 +++++++++++++++-------
.../Storages/MergeTree/SparkStorageMergeTree.h | 4 +++-
9 files changed, 50 insertions(+), 15 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index af67b01f49..331009d12c 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -196,9 +196,9 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
val objectName = obj.get().objectName()
if (objectName.contains("metadata.gluten")) {
metadataGlutenExist = true
- } else if (objectName.contains("meta.bin")) {
+ } else if (objectName.contains("part_meta.gluten")) {
metadataBinExist = true
- } else if (objectName.contains("data.bin")) {
+ } else if (objectName.contains("part_data.gluten")) {
dataBinExist = true
} else if (objectName.contains("_commits")) {
// Spark 35 has _commits directory
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.cpp
b/cpp-ch/local-engine/Common/GlutenConfig.cpp
index 93d074ecc2..eb6c2dcab6 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.cpp
+++ b/cpp-ch/local-engine/Common/GlutenConfig.cpp
@@ -137,4 +137,10 @@ GlutenJobSchedulerConfig
GlutenJobSchedulerConfig::loadFromContext(const DB::Con
config.job_scheduler_max_threads =
context->getConfigRef().getUInt64(JOB_SCHEDULER_MAX_THREADS, 10);
return config;
}
+MergeTreeCacheConfig MergeTreeCacheConfig::loadFromContext(const
DB::ContextPtr & context)
+{
+ MergeTreeCacheConfig config;
+ config.enable_data_prefetch =
context->getConfigRef().getBool(ENABLE_DATA_PREFETCH,
config.enable_data_prefetch);
+ return config;
+}
}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h
b/cpp-ch/local-engine/Common/GlutenConfig.h
index 82402eaafa..11220afb48 100644
--- a/cpp-ch/local-engine/Common/GlutenConfig.h
+++ b/cpp-ch/local-engine/Common/GlutenConfig.h
@@ -142,4 +142,13 @@ struct GlutenJobSchedulerConfig
static GlutenJobSchedulerConfig loadFromContext(const DB::ContextPtr &
context);
};
+
+struct MergeTreeCacheConfig
+{
+ inline static const String ENABLE_DATA_PREFETCH = "enable_data_prefetch";
+
+ bool enable_data_prefetch = true;
+
+ static MergeTreeCacheConfig loadFromContext(const DB::ContextPtr &
context);
+};
}
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
index 82afeb85e2..5b1fe63a09 100644
---
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
+++
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
@@ -31,8 +31,8 @@ bool isMetaDataFile(const std::string & path)
void CompactObjectStorageDiskTransaction::commit()
{
auto metadata_tx = disk.getMetadataStorage()->createTransaction();
- std::filesystem::path data_path = std::filesystem::path(prefix_path) /
"data.bin";
- std::filesystem::path meta_path = std::filesystem::path(prefix_path) /
"meta.bin";
+ std::filesystem::path data_path = std::filesystem::path(prefix_path) /
PART_DATA_FILE_NAME;
+ std::filesystem::path meta_path = std::filesystem::path(prefix_path) /
PART_META_FILE_NAME;
auto object_storage = disk.getObjectStorage();
auto data_key = object_storage->generateObjectKeyForPath(data_path,
std::nullopt);
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
index e15c362f30..becb5371aa 100644
---
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
+++
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
@@ -34,6 +34,9 @@ namespace local_engine
class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
public:
+ static inline const String PART_DATA_FILE_NAME = "part_data.gluten";
+ static inline const String PART_META_FILE_NAME = "part_meta.gluten";
+
explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const
DB::DiskPtr tmp_)
: disk(disk_), tmp_data(tmp_)
{
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index e2ba48e9d2..3218db2741 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -88,7 +88,9 @@ Task CacheManager::cachePart(
job_context.table.parts.clear();
job_context.table.parts.push_back(part);
job_context.table.snapshot_id = "";
- Task task = [job_detail = job_context, context = this->context,
read_columns = columns, only_meta_cache]()
+ MergeTreeCacheConfig config =
MergeTreeCacheConfig::loadFromContext(context);
+ Task task = [job_detail = job_context, context = this->context,
read_columns = columns, only_meta_cache,
+ prefetch_data = config.enable_data_prefetch]()
{
try
{
@@ -106,6 +108,9 @@ Task CacheManager::cachePart(
job_detail.table.parts.front().name);
return;
}
+ // prefetch part data
+ if (prefetch_data)
+
storage->prefetchPartDataFile({job_detail.table.parts.front().name});
auto storage_snapshot =
std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
NamesAndTypesList names_and_types_list;
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
index 8fd26d249a..c44026ce0b 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
@@ -29,7 +29,7 @@ struct MergeTreePart;
struct MergeTreeTableInstance;
/***
- * Manage the cache of the MergeTree, mainly including meta.bin, data.bin,
metadata.gluten
+ * Manage the cache of the MergeTree, mainly including part_data.gluten,
part_meta.gluten, metadata.gluten
*/
class CacheManager
{
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
index 45be9dcf74..15da00fbee 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
@@ -16,6 +16,7 @@
*/
#include "SparkStorageMergeTree.h"
+#include <Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
@@ -159,27 +160,36 @@ SparkStorageMergeTree::SparkStorageMergeTree(
std::atomic<int> SparkStorageMergeTree::part_num;
-void
SparkStorageMergeTree::prefetchMetaDataFile(std::unordered_set<std::string>
parts) const
+void SparkStorageMergeTree::prefetchPartDataFile(const
std::unordered_set<std::string>& parts) const
+{
+ prefetchPartFiles(parts,
CompactObjectStorageDiskTransaction::PART_DATA_FILE_NAME);
+}
+
+void SparkStorageMergeTree::prefetchPartFiles(const
std::unordered_set<std::string>& parts, String file_name) const
{
auto disk = getDisks().front();
if (!disk->isRemote())
return;
- std::vector<String> meta_paths;
- std::ranges::for_each(parts, [&](const String & name) {
meta_paths.emplace_back(fs::path(relative_data_path) / name / "meta.bin"); });
+ std::vector<String> data_paths;
+ std::ranges::for_each(parts, [&](const String & name) {
data_paths.emplace_back(fs::path(relative_data_path) / name / file_name); });
auto read_settings = ReadSettings{};
- // read_settings.enable_filesystem_cache = false;
read_settings.remote_fs_method = RemoteFSReadMethod::read;
- for (const auto & meta_path : meta_paths)
+ for (const auto & data_path : data_paths)
{
- if (!disk->existsDirectory(meta_path))
+ if (!disk->existsFile(data_path))
continue;
-
- auto in = disk->readFile(meta_path, read_settings);
+ LOG_DEBUG(log, "Prefetching part file {}", data_path);
+ auto in = disk->readFile(data_path, read_settings);
String ignore_data;
readStringUntilEOF(ignore_data, *in);
}
}
+void SparkStorageMergeTree::prefetchMetaDataFile(const
std::unordered_set<std::string>& parts) const
+{
+ prefetchPartFiles(parts,
CompactObjectStorageDiskTransaction::PART_META_FILE_NAME);
+}
+
std::vector<MergeTreeDataPartPtr>
SparkStorageMergeTree::loadDataPartsWithNames(const
std::unordered_set<std::string> & parts)
{
Stopwatch watch;
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h
b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h
index cec1597eab..237cf69192 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.h
@@ -71,6 +71,7 @@ public:
std::map<std::string, MutationCommands> getUnfinishedMutationCommands()
const override;
std::vector<MergeTreeDataPartPtr> loadDataPartsWithNames(const
std::unordered_set<std::string> & parts);
void removePartFromMemory(const MergeTreeData::DataPart & part_to_detach);
+ void prefetchPartDataFile(const std::unordered_set<std::string>& parts)
const;
MergeTreeDataSelectExecutor reader;
MergeTreeDataMergerMutator merger_mutator;
@@ -91,7 +92,8 @@ private:
static std::atomic<int> part_num;
SimpleIncrement increment;
- void prefetchMetaDataFile(std::unordered_set<std::string> parts) const;
+ void prefetchPartFiles(const std::unordered_set<std::string>& parts,
String file_name) const;
+ void prefetchMetaDataFile(const std::unordered_set<std::string>& parts)
const;
void startBackgroundMovesIfNeeded() override;
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
LoadPartResult loadDataPart(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]