This is an automated email from the ASF dual-hosted git repository.
loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 62ed4ab037 [GLUTEN-7765][CH] Support CACHE META command for MergeTree table (#7774)
62ed4ab037 is described below
commit 62ed4ab0376922d34cefcf8e0636b8afe724d07a
Author: Shuai li <[email protected]>
AuthorDate: Fri Nov 1 17:57:53 2024 +0800
[GLUTEN-7765][CH] Support CACHE META command for MergeTree table (#7774)
---
.../gluten/execution/CHNativeCacheManager.java | 7 ++++---
.../apache/spark/rpc/GlutenExecutorEndpoint.scala | 4 ++--
.../org/apache/spark/rpc/GlutenRpcMessages.scala | 5 ++++-
.../commands/GlutenCHCacheDataCommand.scala | 4 ++--
.../local-engine/Storages/Cache/CacheManager.cpp | 24 ++++++++++++++++------
cpp-ch/local-engine/Storages/Cache/CacheManager.h | 5 +++--
.../Storages/MergeTree/SparkStorageMergeTree.cpp | 11 ++++++++++
cpp-ch/local-engine/local_engine_jni.cpp | 6 +++---
8 files changed, 47 insertions(+), 19 deletions(-)
diff --git
a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
index 4033d8c6b1..7c89cf6b41 100644
---
a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
+++
b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
@@ -19,11 +19,12 @@ package org.apache.gluten.execution;
import java.util.Set;
public class CHNativeCacheManager {
- public static String cacheParts(String table, Set<String> columns) {
- return nativeCacheParts(table, String.join(",", columns));
+ public static String cacheParts(String table, Set<String> columns, boolean
onlyMetaCache) {
+ return nativeCacheParts(table, String.join(",", columns), onlyMetaCache);
}
- private static native String nativeCacheParts(String table, String columns);
+ private static native String nativeCacheParts(
+ String table, String columns, boolean onlyMetaCache);
public static CacheResult getCacheStatus(String jobId) {
return nativeGetCacheStatus(jobId);
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 559a22cb12..b9ae9ff363 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -70,9 +70,9 @@ class GlutenExecutorEndpoint(val executorId: String, val
conf: SparkConf)
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any,
Unit] = {
- case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
+ case GlutenMergeTreeCacheLoad(mergeTreeTable, columns, onlyMetaCache) =>
try {
- val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
+ val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns,
onlyMetaCache)
context.reply(CacheJobInfo(status = true, jobId))
} catch {
case e: Exception =>
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
index e596e94fed..8127c324b7 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
@@ -36,7 +36,10 @@ object GlutenRpcMessages {
extends GlutenRpcMessage
// for mergetree cache
- case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns:
util.Set[String])
+ case class GlutenMergeTreeCacheLoad(
+ mergeTreeTable: String,
+ columns: util.Set[String],
+ onlyMetaCache: Boolean)
extends GlutenRpcMessage
case class GlutenCacheLoadStatus(jobId: String)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 1c7b4f2322..69a8c42187 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -201,7 +201,7 @@ case class GlutenCHCacheDataCommand(
(
executorId,
executor.executorEndpointRef.ask[CacheJobInfo](
- GlutenMergeTreeCacheLoad(tableMessage,
selectedColumns.toSet.asJava)
+ GlutenMergeTreeCacheLoad(tableMessage,
selectedColumns.toSet.asJava, onlyMetaCache)
)))
})
} else {
@@ -213,7 +213,7 @@ case class GlutenCHCacheDataCommand(
(
value._1,
executorData.executorEndpointRef.ask[CacheJobInfo](
- GlutenMergeTreeCacheLoad(value._2,
selectedColumns.toSet.asJava)
+ GlutenMergeTreeCacheLoad(value._2,
selectedColumns.toSet.asJava, onlyMetaCache)
)))
})
}
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index bb688724c3..e2ba48e9d2 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -81,17 +81,31 @@ struct CacheJobContext
MergeTreeTableInstance table;
};
-Task CacheManager::cachePart(const MergeTreeTableInstance & table, const
MergeTreePart & part, const std::unordered_set<String> & columns)
+Task CacheManager::cachePart(
+ const MergeTreeTableInstance & table, const MergeTreePart & part, const
std::unordered_set<String> & columns, bool only_meta_cache)
{
CacheJobContext job_context{table};
job_context.table.parts.clear();
job_context.table.parts.push_back(part);
job_context.table.snapshot_id = "";
- Task task = [job_detail = job_context, context = this->context,
read_columns = columns]()
+ Task task = [job_detail = job_context, context = this->context,
read_columns = columns, only_meta_cache]()
{
try
{
auto storage = job_detail.table.restoreStorage(context);
+ std::vector<DataPartPtr> selected_parts
+ =
StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "",
{job_detail.table.parts.front().name});
+
+ if (only_meta_cache)
+ {
+ LOG_INFO(
+ getLogger("CacheManager"),
+ "Load meta cache of table {}.{} part {} success.",
+ job_detail.table.database,
+ job_detail.table.table,
+ job_detail.table.parts.front().name);
+ return;
+ }
auto storage_snapshot =
std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
NamesAndTypesList names_and_types_list;
@@ -102,8 +116,6 @@ Task CacheManager::cachePart(const MergeTreeTableInstance &
table, const MergeTr
names_and_types_list.push_back(NameAndTypePair(column.name, column.type));
}
auto query_info = buildQueryInfo(names_and_types_list);
- std::vector<DataPartPtr> selected_parts
- =
StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "",
{job_detail.table.parts.front().name});
auto read_step = storage->reader.readFromParts(
selected_parts,
storage->getMutationsSnapshot({}),
@@ -135,13 +147,13 @@ Task CacheManager::cachePart(const MergeTreeTableInstance
& table, const MergeTr
return std::move(task);
}
-JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const
std::unordered_set<String>& columns)
+JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const
std::unordered_set<String>& columns, bool only_meta_cache)
{
JobId id = toString(UUIDHelpers::generateV4());
Job job(id);
for (const auto & part : table.parts)
{
- job.addTask(cachePart(table, part, columns));
+ job.addTask(cachePart(table, part, columns, only_meta_cache));
}
auto& scheduler = JobScheduler::instance();
scheduler.scheduleJob(std::move(job));
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
index b59963ec4f..8fd26d249a 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
@@ -40,7 +40,7 @@ public:
static CacheManager & instance();
static void initialize(const DB::ContextMutablePtr & context);
- JobId cacheParts(const MergeTreeTableInstance & table, const
std::unordered_set<String> & columns);
+ JobId cacheParts(const MergeTreeTableInstance & table, const
std::unordered_set<String> & columns, bool only_meta_cache);
static jobject getCacheStatus(JNIEnv * env, const String & jobId);
Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file,
ReadBufferBuilderPtr read_buffer_builder);
@@ -48,7 +48,8 @@ public:
static void removeFiles(String file, String cache_name);
private:
- Task cachePart(const MergeTreeTableInstance & table, const MergeTreePart &
part, const std::unordered_set<String> & columns);
+ Task cachePart(
+ const MergeTreeTableInstance & table, const MergeTreePart & part,
const std::unordered_set<String> & columns, bool only_meta_cache);
CacheManager() = default;
DB::ContextMutablePtr context;
};
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
index 9c90c67f69..45be9dcf74 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
@@ -22,6 +22,12 @@
#include <Storages/MergeTree/SparkMergeTreeSink.h>
#include <Storages/MergeTree/checkDataPart.h>
+namespace ProfileEvents
+{
+extern const Event LoadedDataParts;
+extern const Event LoadedDataPartsMicroseconds;
+}
+
namespace DB
{
namespace MergeTreeSetting
@@ -176,6 +182,7 @@ void
SparkStorageMergeTree::prefetchMetaDataFile(std::unordered_set<std::string>
std::vector<MergeTreeDataPartPtr>
SparkStorageMergeTree::loadDataPartsWithNames(const
std::unordered_set<std::string> & parts)
{
+ Stopwatch watch;
prefetchMetaDataFile(parts);
std::vector<MergeTreeDataPartPtr> data_parts;
const auto disk = getStoragePolicy()->getDisks().at(0);
@@ -187,6 +194,10 @@ std::vector<MergeTreeDataPartPtr>
SparkStorageMergeTree::loadDataPartsWithNames(
data_parts.emplace_back(res.part);
}
+ watch.stop();
+ LOG_DEBUG(log, "Loaded data parts ({} items) took {} microseconds",
parts.size(), watch.elapsedMicroseconds());
+ ProfileEvents::increment(ProfileEvents::LoadedDataParts, parts.size());
+ ProfileEvents::increment(ProfileEvents::LoadedDataPartsMicroseconds,
watch.elapsedMicroseconds());
return data_parts;
}
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp
b/cpp-ch/local-engine/local_engine_jni.cpp
index 02e606e1ed..8ff8a866b7 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1340,8 +1340,8 @@ JNIEXPORT void
Java_org_apache_gluten_utils_TestExceptionUtils_generateNativeExc
}
-JNIEXPORT jstring
-Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv
* env, jobject, jstring table_, jstring columns_)
+JNIEXPORT jstring
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(
+ JNIEnv * env, jobject, jstring table_, jstring columns_, jboolean
only_meta_cache_)
{
LOCAL_ENGINE_JNI_METHOD_START
auto table_def = jstring2string(env, table_);
@@ -1351,7 +1351,7 @@
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv *
for (const auto & col : tokenizer)
column_set.insert(col);
local_engine::MergeTreeTableInstance table(table_def);
- auto id = local_engine::CacheManager::instance().cacheParts(table,
column_set);
+ auto id = local_engine::CacheManager::instance().cacheParts(table,
column_set, only_meta_cache_);
return local_engine::charTojstring(env, id.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]