This is an automated email from the ASF dual-hosted git repository.

loneylee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 62ed4ab037 [GLUTEN-7765][CH] Support CACHE META command for MergeTree table (#7774)
62ed4ab037 is described below

commit 62ed4ab0376922d34cefcf8e0636b8afe724d07a
Author: Shuai li <[email protected]>
AuthorDate: Fri Nov 1 17:57:53 2024 +0800

    [GLUTEN-7765][CH] Support CACHE META command for MergeTree table (#7774)
---
 .../gluten/execution/CHNativeCacheManager.java     |  7 ++++---
 .../apache/spark/rpc/GlutenExecutorEndpoint.scala  |  4 ++--
 .../org/apache/spark/rpc/GlutenRpcMessages.scala   |  5 ++++-
 .../commands/GlutenCHCacheDataCommand.scala        |  4 ++--
 .../local-engine/Storages/Cache/CacheManager.cpp   | 24 ++++++++++++++++------
 cpp-ch/local-engine/Storages/Cache/CacheManager.h  |  5 +++--
 .../Storages/MergeTree/SparkStorageMergeTree.cpp   | 11 ++++++++++
 cpp-ch/local-engine/local_engine_jni.cpp           |  6 +++---
 8 files changed, 47 insertions(+), 19 deletions(-)

diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
index 4033d8c6b1..7c89cf6b41 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java
@@ -19,11 +19,12 @@ package org.apache.gluten.execution;
 import java.util.Set;
 
 public class CHNativeCacheManager {
-  public static String cacheParts(String table, Set<String> columns) {
-    return nativeCacheParts(table, String.join(",", columns));
+  public static String cacheParts(String table, Set<String> columns, boolean 
onlyMetaCache) {
+    return nativeCacheParts(table, String.join(",", columns), onlyMetaCache);
   }
 
-  private static native String nativeCacheParts(String table, String columns);
+  private static native String nativeCacheParts(
+      String table, String columns, boolean onlyMetaCache);
 
   public static CacheResult getCacheStatus(String jobId) {
     return nativeGetCacheStatus(jobId);
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
index 559a22cb12..b9ae9ff363 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala
@@ -70,9 +70,9 @@ class GlutenExecutorEndpoint(val executorId: String, val 
conf: SparkConf)
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
-    case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
+    case GlutenMergeTreeCacheLoad(mergeTreeTable, columns, onlyMetaCache) =>
       try {
-        val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
+        val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns, 
onlyMetaCache)
         context.reply(CacheJobInfo(status = true, jobId))
       } catch {
         case e: Exception =>
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
index e596e94fed..8127c324b7 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala
@@ -36,7 +36,10 @@ object GlutenRpcMessages {
     extends GlutenRpcMessage
 
   // for mergetree cache
-  case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: 
util.Set[String])
+  case class GlutenMergeTreeCacheLoad(
+      mergeTreeTable: String,
+      columns: util.Set[String],
+      onlyMetaCache: Boolean)
     extends GlutenRpcMessage
 
   case class GlutenCacheLoadStatus(jobId: String)
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 1c7b4f2322..69a8c42187 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -201,7 +201,7 @@ case class GlutenCHCacheDataCommand(
             (
               executorId,
               executor.executorEndpointRef.ask[CacheJobInfo](
-                GlutenMergeTreeCacheLoad(tableMessage, 
selectedColumns.toSet.asJava)
+                GlutenMergeTreeCacheLoad(tableMessage, 
selectedColumns.toSet.asJava, onlyMetaCache)
               )))
         })
     } else {
@@ -213,7 +213,7 @@ case class GlutenCHCacheDataCommand(
             (
               value._1,
               executorData.executorEndpointRef.ask[CacheJobInfo](
-                GlutenMergeTreeCacheLoad(value._2, 
selectedColumns.toSet.asJava)
+                GlutenMergeTreeCacheLoad(value._2, 
selectedColumns.toSet.asJava, onlyMetaCache)
               )))
         })
     }
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp 
b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
index bb688724c3..e2ba48e9d2 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp
@@ -81,17 +81,31 @@ struct CacheJobContext
     MergeTreeTableInstance table;
 };
 
-Task CacheManager::cachePart(const MergeTreeTableInstance & table, const 
MergeTreePart & part, const std::unordered_set<String> & columns)
+Task CacheManager::cachePart(
+    const MergeTreeTableInstance & table, const MergeTreePart & part, const 
std::unordered_set<String> & columns, bool only_meta_cache)
 {
     CacheJobContext job_context{table};
     job_context.table.parts.clear();
     job_context.table.parts.push_back(part);
     job_context.table.snapshot_id = "";
-    Task task = [job_detail = job_context, context = this->context, 
read_columns = columns]()
+    Task task = [job_detail = job_context, context = this->context, 
read_columns = columns, only_meta_cache]()
     {
         try
         {
             auto storage = job_detail.table.restoreStorage(context);
+            std::vector<DataPartPtr> selected_parts
+                = 
StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", 
{job_detail.table.parts.front().name});
+
+            if (only_meta_cache)
+            {
+                LOG_INFO(
+                    getLogger("CacheManager"),
+                    "Load meta cache of table {}.{} part {} success.",
+                    job_detail.table.database,
+                    job_detail.table.table,
+                    job_detail.table.parts.front().name);
+                return;
+            }
 
             auto storage_snapshot = 
std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
             NamesAndTypesList names_and_types_list;
@@ -102,8 +116,6 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & 
table, const MergeTr
                     
names_and_types_list.push_back(NameAndTypePair(column.name, column.type));
             }
             auto query_info = buildQueryInfo(names_and_types_list);
-            std::vector<DataPartPtr> selected_parts
-                = 
StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", 
{job_detail.table.parts.front().name});
             auto read_step = storage->reader.readFromParts(
                 selected_parts,
                 storage->getMutationsSnapshot({}),
@@ -135,13 +147,13 @@ Task CacheManager::cachePart(const MergeTreeTableInstance 
& table, const MergeTr
     return std::move(task);
 }
 
-JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const 
std::unordered_set<String>& columns)
+JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const 
std::unordered_set<String>& columns, bool only_meta_cache)
 {
     JobId id = toString(UUIDHelpers::generateV4());
     Job job(id);
     for (const auto & part : table.parts)
     {
-        job.addTask(cachePart(table, part, columns));
+        job.addTask(cachePart(table, part, columns, only_meta_cache));
     }
     auto& scheduler = JobScheduler::instance();
     scheduler.scheduleJob(std::move(job));
diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h 
b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
index b59963ec4f..8fd26d249a 100644
--- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h
+++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h
@@ -40,7 +40,7 @@ public:
 
     static CacheManager & instance();
     static void initialize(const DB::ContextMutablePtr & context);
-    JobId cacheParts(const MergeTreeTableInstance & table, const 
std::unordered_set<String> & columns);
+    JobId cacheParts(const MergeTreeTableInstance & table, const 
std::unordered_set<String> & columns, bool only_meta_cache);
     static jobject getCacheStatus(JNIEnv * env, const String & jobId);
 
     Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, 
ReadBufferBuilderPtr read_buffer_builder);
@@ -48,7 +48,8 @@ public:
     static void removeFiles(String file, String cache_name);
 
 private:
-    Task cachePart(const MergeTreeTableInstance & table, const MergeTreePart & 
part, const std::unordered_set<String> & columns);
+    Task cachePart(
+        const MergeTreeTableInstance & table, const MergeTreePart & part, 
const std::unordered_set<String> & columns, bool only_meta_cache);
     CacheManager() = default;
     DB::ContextMutablePtr context;
 };
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp 
b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
index 9c90c67f69..45be9dcf74 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp
@@ -22,6 +22,12 @@
 #include <Storages/MergeTree/SparkMergeTreeSink.h>
 #include <Storages/MergeTree/checkDataPart.h>
 
+namespace ProfileEvents
+{
+extern const Event LoadedDataParts;
+extern const Event LoadedDataPartsMicroseconds;
+}
+
 namespace DB
 {
 namespace MergeTreeSetting
@@ -176,6 +182,7 @@ void 
SparkStorageMergeTree::prefetchMetaDataFile(std::unordered_set<std::string>
 
 std::vector<MergeTreeDataPartPtr> 
SparkStorageMergeTree::loadDataPartsWithNames(const 
std::unordered_set<std::string> & parts)
 {
+    Stopwatch watch;
     prefetchMetaDataFile(parts);
     std::vector<MergeTreeDataPartPtr> data_parts;
     const auto disk = getStoragePolicy()->getDisks().at(0);
@@ -187,6 +194,10 @@ std::vector<MergeTreeDataPartPtr> 
SparkStorageMergeTree::loadDataPartsWithNames(
         data_parts.emplace_back(res.part);
     }
 
+    watch.stop();
+    LOG_DEBUG(log, "Loaded data parts ({} items) took {} microseconds", 
parts.size(), watch.elapsedMicroseconds());
+    ProfileEvents::increment(ProfileEvents::LoadedDataParts, parts.size());
+    ProfileEvents::increment(ProfileEvents::LoadedDataPartsMicroseconds, 
watch.elapsedMicroseconds());
     return data_parts;
 }
 
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp 
b/cpp-ch/local-engine/local_engine_jni.cpp
index 02e606e1ed..8ff8a866b7 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -1340,8 +1340,8 @@ JNIEXPORT void 
Java_org_apache_gluten_utils_TestExceptionUtils_generateNativeExc
 }
 
 
-JNIEXPORT jstring
-Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv 
* env, jobject, jstring table_, jstring columns_)
+JNIEXPORT jstring 
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(
+    JNIEnv * env, jobject, jstring table_, jstring columns_, jboolean 
only_meta_cache_)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     auto table_def = jstring2string(env, table_);
@@ -1351,7 +1351,7 @@ 
Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv *
     for (const auto & col : tokenizer)
         column_set.insert(col);
     local_engine::MergeTreeTableInstance table(table_def);
-    auto id = local_engine::CacheManager::instance().cacheParts(table, 
column_set);
+    auto id = local_engine::CacheManager::instance().cacheParts(table, 
column_set, only_meta_cache_);
     return local_engine::charTojstring(env, id.c_str());
     LOCAL_ENGINE_JNI_METHOD_END(env, nullptr);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to