This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e0fcfe586 [GLUTEN-6178][CH] Add config to insert remote file system directly #6192
e0fcfe586 is described below

commit e0fcfe586efc7efb3ec0c349d5ca8b2371d969d4
Author: Shuai li <[email protected]>
AuthorDate: Mon Jun 24 13:55:39 2024 +0800

    [GLUTEN-6178][CH] Add config to insert remote file system directly #6192
    
    What changes were proposed in this pull request?
    Add a new boolean setting, mergetree.insert_without_local_storage
    (default: false). When enabled and the destination storage is remote,
    MergeTree inserts write parts directly to the remote file system
    (e.g. HDFS) instead of first staging them on local storage and then
    committing them to the remote disk.
    
    (Fixes: #6178)
    
    How was this patch tested?
    Tested by unit test (GlutenClickHouseMergeTreeWriteOnHDFSSuite).
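    
    A usage sketch, mirroring the new UT: the key is surfaced through the
    runtime_settings prefix of the Spark conf. The table name below is a
    hypothetical example, not part of this patch:
    
        // Route MergeTree inserts straight to the remote file system,
        // skipping the local staging copy
        spark.conf.set(
          "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage",
          "true")
        spark.sql("INSERT INTO some_mergetree_table SELECT * FROM lineitem")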
---
 ...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 44 +++++++++++++++++++++-
 cpp-ch/local-engine/Common/CHUtil.cpp              |  3 +-
 cpp-ch/local-engine/Common/CHUtil.h                |  4 +-
 .../Disks/ObjectStorages/GlutenDiskHDFS.cpp        | 10 ++++-
 .../Disks/ObjectStorages/GlutenDiskHDFS.h          |  2 +
 .../Storages/Mergetree/SparkMergeTreeWriter.cpp    | 40 +++++++++++---------
 .../Storages/Mergetree/SparkMergeTreeWriter.h      |  2 +
 7 files changed, 83 insertions(+), 22 deletions(-)

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 572d0cd50..99b212059 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -25,10 +25,12 @@ import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMerg
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 import java.io.File
 
+import scala.concurrent.duration.DurationInt
+
 // Some sqls' line length exceeds 100
 // scalastyle:off line.size.limit
 
@@ -614,5 +616,45 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
       .count()
     assertResult(600572)(result)
   }
+
+  test("test mergetree insert with optimize basic") {
+    val tableName = "lineitem_mergetree_insert_optimize_basic_hdfs"
+    val dataPath = s"$HDFS_URL/test/$tableName"
+
+    withSQLConf(
+      "spark.databricks.delta.optimize.minFileSize" -> "200000000",
+      "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true",
+      "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage" -> "true",
+      "spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows" -> "10000"
+    ) {
+      spark.sql(s"""
+                   |DROP TABLE IF EXISTS $tableName;
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   |CREATE TABLE IF NOT EXISTS $tableName
+                   |USING clickhouse
+                   |LOCATION '$dataPath'
+                   |TBLPROPERTIES (storage_policy='__hdfs_main')
+                   | as select * from lineitem
+                   |""".stripMargin)
+
+      val ret = spark.sql(s"select count(*) from $tableName").collect()
+      assertResult(600572)(ret.apply(0).get(0))
+      val conf = new Configuration
+      conf.set("fs.defaultFS", HDFS_URL)
+      val fs = FileSystem.get(conf)
+
+      eventually(timeout(60.seconds), interval(2.seconds)) {
+        val it = fs.listFiles(new Path(dataPath), true)
+        var files = 0
+        while (it.hasNext) {
+          it.next()
+          files += 1
+        }
+        assertResult(72)(files)
+      }
+    }
+  }
 }
 // scalastyle:off line.size.limit
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index be66d8ecc..94cd38003 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -623,7 +623,8 @@ void BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
 {
     /// Initialize default setting.
     settings.set("date_time_input_format", "best_effort");
-    settings.set("mergetree.merge_after_insert", true);
+    settings.set(MERGETREE_MERGE_AFTER_INSERT, true);
+    settings.set(MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE, false);
 
     for (const auto & [key, value] : backend_conf_map)
     {
diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h
index 50de9461f..94e0f0168 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -35,7 +35,9 @@ class QueryPlan;
 
 namespace local_engine
 {
-static const std::unordered_set<String> BOOL_VALUE_SETTINGS{"mergetree.merge_after_insert"};
+static const String MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE = "mergetree.insert_without_local_storage";
+static const String MERGETREE_MERGE_AFTER_INSERT = "mergetree.merge_after_insert";
+static const std::unordered_set<String> BOOL_VALUE_SETTINGS{MERGETREE_MERGE_AFTER_INSERT, MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE};
 static const std::unordered_set<String> LONG_VALUE_SETTINGS{
     "optimize.maxfilesize", "optimize.minFileSize", "mergetree.max_num_part_per_merge_task"};
 
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index 07a7aa6bd..f207ad232 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -52,7 +52,15 @@ void GlutenDiskHDFS::createDirectories(const String & path)
 void GlutenDiskHDFS::removeDirectory(const String & path)
 {
     DiskObjectStorage::removeDirectory(path);
-    hdfsDelete(hdfs_object_storage->getHDFSFS(), path.c_str(), 1);
+    String abs_path = "/" + path;
+    hdfsDelete(hdfs_object_storage->getHDFSFS(), abs_path.c_str(), 1);
+}
+
+void GlutenDiskHDFS::removeRecursive(const String & path)
+{
+    DiskObjectStorage::removeRecursive(path);
+    String abs_path = "/" + path;
+    hdfsDelete(hdfs_object_storage->getHDFSFS(), abs_path.c_str(), 1);
 }
 
 DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
index 222b9f892..97a99f1de 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -57,6 +57,8 @@ public:
 
     void removeDirectory(const String & path) override;
 
+    void removeRecursive(const String & path) override;
+
     DB::DiskObjectStoragePtr createDiskObjectStorage() override;
 
     std::unique_ptr<DB::WriteBufferFromFileBase> writeFile(const String& path, size_t buf_size, DB::WriteMode mode,
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index c1f2391a2..406f2aaa2 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -69,11 +69,23 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
     , bucket_dir(bucket_dir_)
     , thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000)
 {
+    const DB::Settings & settings = context->getSettingsRef();
+    merge_after_insert = settings.get(MERGETREE_MERGE_AFTER_INSERT).get<bool>();
+    insert_without_local_storage = settings.get(MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE).get<bool>();
+
+    Field limit_size_field;
+    if (settings.tryGet("optimize.minFileSize", limit_size_field))
+        merge_min_size = limit_size_field.get<Int64>() <= 0 ? merge_min_size : limit_size_field.get<Int64>();
+
+    Field limit_cnt_field;
+    if (settings.tryGet("mergetree.max_num_part_per_merge_task", limit_cnt_field))
+        merge_limit_parts = limit_cnt_field.get<Int64>() <= 0 ? merge_limit_parts : limit_cnt_field.get<Int64>();
+
     dest_storage = MergeTreeRelParser::parseStorage(merge_tree_table, SerializedPlanParser::global_context);
+    isRemoteStorage = dest_storage->getStoragePolicy()->getAnyDisk()->isRemote();
 
-    if (dest_storage->getStoragePolicy()->getAnyDisk()->isRemote())
+    if (useLocalStorage())
     {
-        isRemoteStorage = true;
         temp_storage = MergeTreeRelParser::copyToDefaultPolicyStorage(merge_tree_table, SerializedPlanParser::global_context);
         storage = temp_storage;
         LOG_DEBUG(
@@ -86,22 +98,14 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
 
     metadata_snapshot = storage->getInMemoryMetadataPtr();
     header = metadata_snapshot->getSampleBlock();
-    const DB::Settings & settings = context->getSettingsRef();
     squashing = std::make_unique<DB::Squashing>(header, settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
     if (!partition_dir.empty())
         extractPartitionValues(partition_dir, partition_values);
+}
 
-    Field is_merge;
-    if (settings.tryGet("mergetree.merge_after_insert", is_merge))
-        merge_after_insert = is_merge.get<bool>();
-
-    Field limit_size_field;
-    if (settings.tryGet("optimize.minFileSize", limit_size_field))
-        merge_min_size = limit_size_field.get<Int64>() <= 0 ? merge_min_size : limit_size_field.get<Int64>();
-
-    Field limit_cnt_field;
-    if (settings.tryGet("mergetree.max_num_part_per_merge_task", limit_cnt_field))
-        merge_limit_parts = limit_cnt_field.get<Int64>() <= 0 ? merge_limit_parts : limit_cnt_field.get<Int64>();
+bool SparkMergeTreeWriter::useLocalStorage() const
+{
+    return !insert_without_local_storage && isRemoteStorage;
 }
 
 void SparkMergeTreeWriter::write(const DB::Block & block)
@@ -161,7 +165,7 @@ void SparkMergeTreeWriter::manualFreeMemory(size_t before_write_memory)
     // It may allocate memory in the current thread and free it on a global thread.
     // For now, we have no way to clear that global memory through the Spark thread tracker in use.
     // So we manually correct the memory usage.
-    if (!isRemoteStorage)
+    if (isRemoteStorage && insert_without_local_storage)
         return;
 
     auto disk = storage->getStoragePolicy()->getAnyDisk();
@@ -219,7 +223,7 @@ void SparkMergeTreeWriter::saveMetadata()
 
 void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded()
 {
-    if (!isRemoteStorage)
+    if (!useLocalStorage())
         return;
 
     LOG_DEBUG(
@@ -289,8 +293,8 @@ void SparkMergeTreeWriter::finalizeMerge()
                 {
                     for (const auto & disk : storage->getDisks())
                     {
-                        auto full_path = storage->getFullPathOnDisk(disk);
-                        disk->removeRecursive(full_path + "/" + tmp_part);
+                        auto rel_path = storage->getRelativeDataPath() + "/" + tmp_part;
+                        disk->removeRecursive(rel_path);
                     }
                 });
         }
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index 2b07521ed..13ac22394 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -79,6 +79,7 @@ private:
     void finalizeMerge();
     bool chunkToPart(Chunk && chunk);
     bool blockToPart(Block & block);
+    bool useLocalStorage() const;
 
     CustomStorageMergeTreePtr storage = nullptr;
     CustomStorageMergeTreePtr dest_storage = nullptr;
@@ -97,6 +98,7 @@ private:
     std::unordered_set<String> tmp_parts;
     DB::Block header;
     bool merge_after_insert;
+    bool insert_without_local_storage;
     FreeThreadPool thread_pool;
     size_t merge_min_size = 1024 * 1024 * 1024;
     size_t merge_limit_parts = 10;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
