This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0ed44a428 [GLUTEN-6590] Support compact mergetree file on s3 (#6591)
0ed44a428 is described below

commit 0ed44a4289f2c725aa18565083c21d847a94029c
Author: Wenzheng Liu <[email protected]>
AuthorDate: Wed Jul 31 23:45:26 2024 +0800

    [GLUTEN-6590] Support compact mergetree file on s3 (#6591)
---
 .../GlutenClickHouseMergeTreeWriteOnS3Suite.scala  | 47 ++++++++++++++--
 cpp-ch/local-engine/Common/CHUtil.cpp              |  4 +-
 .../CompactObjectStorageDiskTransaction.cpp        |  1 +
 .../Disks/ObjectStorages/GlutenDiskS3.cpp          | 59 +++++++++++++++++++++
 .../Disks/ObjectStorages/GlutenDiskS3.h            | 57 ++++++++++++++++++++
 cpp-ch/local-engine/Disks/registerGlutenDisks.cpp  | 12 ++++-
 .../Storages/Mergetree/SparkMergeTreeWriter.cpp    | 40 ++++----------
 7 files changed, 182 insertions(+), 38 deletions(-)
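For context: with this change, a MergeTree part uploaded to S3 is compacted into a
handful of objects per part (data.bin, meta.bin, metadata.gluten, as the test below
asserts) instead of one object per column file. A minimal sketch of turning the path
on from the Spark side, assuming a plain SparkSession; only the two runtime_settings
keys are taken from this commit:

    // Hedged sketch: the two runtime_settings keys appear in this commit; the
    // session setup around them is an illustrative assumption.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("mergetree-s3-compact").getOrCreate()

    // Write MergeTree parts straight to object storage instead of staging locally.
    spark.conf.set(
      "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage",
      "true")
    // Merge (and thereby compact) freshly inserted parts right after the insert.
    spark.conf.set(
      "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert",
      "true")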

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 30f443265..c95b78858 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -30,6 +30,8 @@ import io.minio.messages.DeleteObject
 import java.io.File
 import java.util
 
+import scala.concurrent.duration.DurationInt
+
 // Some sqls' line length exceeds 100
 // scalastyle:off line.size.limit
 
@@ -43,6 +45,12 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
   override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
   override protected val queriesResults: String = rootPath + "mergetree-queries-output"
 
+  private val client = MinioClient
+    .builder()
+    .endpoint(MINIO_ENDPOINT)
+    .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
+    .build()
+
   override protected def createTPCHNotNullTables(): Unit = {
     createNotNullTPCHTablesInParquet(tablesPath)
   }
@@ -60,11 +68,6 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
 
   override protected def beforeEach(): Unit = {
     super.beforeEach()
-    val client = MinioClient
-      .builder()
-      .endpoint(MINIO_ENDPOINT)
-      .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
-      .build()
     if (client.bucketExists(BucketExistsArgs.builder().bucket(BUCKET_NAME).build())) {
       val results =
         client.listObjects(ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).build())
@@ -168,9 +171,42 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
         assertResult(1)(addFiles.size)
         assertResult(600572)(addFiles.head.rows)
     }
+    eventually(timeout(10.seconds), interval(2.seconds)) {
+      verifyS3CompactFileExist("lineitem_mergetree_s3")
+    }
     spark.sql("drop table lineitem_mergetree_s3") // clean up
   }
 
+  private def verifyS3CompactFileExist(table: String): Unit = {
+    val args = ListObjectsArgs
+      .builder()
+      .bucket(BUCKET_NAME)
+      .recursive(true)
+      .prefix(table)
+      .build()
+    var objectCount: Int = 0
+    var metadataGlutenExist: Boolean = false
+    var metadataBinExist: Boolean = false
+    var dataBinExist: Boolean = false
+    client
+      .listObjects(args)
+      .forEach(
+        obj => {
+          objectCount += 1
+          if (obj.get().objectName().contains("metadata.gluten")) {
+            metadataGlutenExist = true
+          } else if (obj.get().objectName().contains("meta.bin")) {
+            metadataBinExist = true
+          } else if (obj.get().objectName().contains("data.bin")) {
+            dataBinExist = true
+          }
+        })
+    assertResult(5)(objectCount)
+    assert(metadataGlutenExist)
+    assert(metadataBinExist)
+    assert(dataBinExist)
+  }
+
   test("test mergetree write with orderby keys / primary keys") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_s3;
@@ -635,6 +671,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
 
     withSQLConf(
       "spark.databricks.delta.optimize.minFileSize" -> "200000000",
+      "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage" -> "true",
       "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true"
     ) {
       spark.sql(s"""
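The post-insert merge can complete after the INSERT statement returns, so the
compacted objects may not be visible on S3 immediately; that is why the check in the
first hunk above is wrapped in ScalaTest's eventually rather than asserted once. A
standalone sketch of the same polling pattern, where checkCompacted() is a
hypothetical stand-in for verifyS3CompactFileExist:

    // Hedged sketch of the retry loop used above (ScalaTest Eventually).
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    eventually(timeout(10.seconds), interval(2.seconds)) {
      checkCompacted() // retried every 2 seconds until it passes or 10 seconds elapse
    }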
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index b6867f656..e642a67a5 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -569,7 +569,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
     if (path_prefix.empty() && path_suffix.empty())
         return changed_paths;
     Poco::Util::AbstractConfiguration::Keys disks;
-    std::unordered_set<String> disk_types = {"s3", "hdfs_gluten", "cache"};
+    std::unordered_set<String> disk_types = {"s3_gluten", "hdfs_gluten", "cache"};
     config.keys("storage_configuration.disks", disks);
 
     std::ranges::for_each(
@@ -590,7 +590,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
                     changed_paths.emplace_back(final_path);
                 }
             }
-            else if (disk_type == "s3" || disk_type == "hdfs_gluten")
+            else if (disk_type == "s3_gluten" || disk_type == "hdfs_gluten")
             {
                 String metadata_path = config.getString(disk_prefix + ".metadata_path", "");
                 if (!metadata_path.empty())
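Note the rename in this hunk: wrapDiskPathConfig now treats "s3_gluten" rather than
"s3" as the S3 disk type whose metadata_path gets prefixed, matching the GlutenDiskS3
disk registered further below. A hedged sketch of declaring such a disk through Spark
confs; only the "s3_gluten" type and the metadata_path key come from this change,
while the runtime_config prefix and the remaining keys are illustrative assumptions:

    // Hedged sketch: an S3 disk declared with the renamed "s3_gluten" type.
    val disk = "spark.gluten.sql.columnar.backend.ch.runtime_config.storage_configuration.disks.s3"
    spark.conf.set(s"$disk.type", "s3_gluten")                // was "s3" before this change
    spark.conf.set(s"$disk.endpoint", "http://127.0.0.1:9000/bucket/") // assumed endpoint
    spark.conf.set(s"$disk.metadata_path", "/tmp/metadata")   // rewritten by wrapDiskPathConfig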
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
index 7a3ba4bed..66c447010 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
@@ -94,6 +94,7 @@ void CompactObjectStorageDiskTransaction::commit()
         whole_meta.addObject(key, 0, offset);
         metadata_tx->writeStringToFile(local_path, whole_meta.serializeToString());
         out.sync();
+        out.finalize();
     };
 
     merge_files(files | std::ranges::views::filter([](auto file) { return !isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path);
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
new file mode 100644
index 000000000..4a73c5a49
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenDiskS3.h"
+#include <Disks/ObjectStorages/DiskObjectStorage.h>
+#include <Parser/SerializedPlanParser.h>
+#include "CompactObjectStorageDiskTransaction.h"
+
+#if USE_AWS_S3
+namespace local_engine
+{
+
+    DB::DiskTransactionPtr GlutenDiskS3::createTransaction()
+    {
+        return std::make_shared<CompactObjectStorageDiskTransaction>(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
+    }
+
+    std::unique_ptr<ReadBufferFromFileBase> GlutenDiskS3::readFile(
+        const String & path,
+        const ReadSettings & settings,
+        std::optional<size_t> read_hint,
+        std::optional<size_t> file_size) const
+    {
+        ReadSettings copy_settings = settings;
+        // Threadpool read is not supported for s3 compact version currently
+        copy_settings.remote_fs_method = RemoteFSReadMethod::read;
+        return DiskObjectStorage::readFile(path, copy_settings, read_hint, file_size);
+    }
+
+    DiskObjectStoragePtr GlutenDiskS3::createDiskObjectStorage()
+    {
+        const auto config_prefix = "storage_configuration.disks." + name;
+        return std::make_shared<GlutenDiskS3>(
+            getName(),
+            object_key_prefix,
+            getMetadataStorage(),
+            getObjectStorage(),
+            SerializedPlanParser::global_context->getConfigRef(),
+            config_prefix,
+            object_storage_creator);
+    }
+
+}
+
+#endif
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
new file mode 100644
index 000000000..4f0d7a029
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+
+#include <Disks/ObjectStorages/DiskObjectStorage.h>
+#include <Parser/SerializedPlanParser.h>
+#include "CompactObjectStorageDiskTransaction.h"
+
+#if USE_AWS_S3
+namespace local_engine
+{
+class GlutenDiskS3 : public DB::DiskObjectStorage
+{
+public:
+    GlutenDiskS3(
+        const String & name_,
+        const String & object_key_prefix_,
+        DB::MetadataStoragePtr metadata_storage_,
+        DB::ObjectStoragePtr object_storage_,
+        const Poco::Util::AbstractConfiguration & config,
+        const String & config_prefix,
+        std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> creator)
+        : DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix),
+        object_storage_creator(creator) {}
+
+    DB::DiskTransactionPtr createTransaction() override;
+
+    std::unique_ptr<ReadBufferFromFileBase> readFile(
+        const String & path,
+        const ReadSettings & settings,
+        std::optional<size_t> read_hint,
+        std::optional<size_t> file_size) const override;
+
+    DiskObjectStoragePtr createDiskObjectStorage() override;
+
+private:
+    std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> object_storage_creator;
+};
+}
+
+#endif
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
index 8a920edcc..52398b5f2 100644
--- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -25,6 +25,10 @@
 #include <Disks/ObjectStorages/GlutenDiskHDFS.h>
 #endif
 
+#if USE_AWS_S3
+#include <Disks/ObjectStorages/GlutenDiskS3.h>
+#endif
+
 #include "registerGlutenDisks.h"
 
 namespace local_engine
@@ -52,16 +56,20 @@ void registerGlutenDisks(bool global_skip_access_check)
                        bool) -> DB::DiskPtr
     {
         bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
+        auto object_storage_creator = [name, skip_access_check, config_prefix](
+                                          const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) -> DB::ObjectStoragePtr
+        { return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
         auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
         auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");
 
-        DB::DiskObjectStoragePtr disk = std::make_shared<DB::DiskObjectStorage>(
+        DB::DiskObjectStoragePtr disk = std::make_shared<local_engine::GlutenDiskS3>(
             name,
             object_storage->getCommonKeyPrefix(),
             std::move(metadata_storage),
             std::move(object_storage),
             config,
-            config_prefix);
+            config_prefix,
+            object_storage_creator);
 
         disk->startup(context, skip_access_check);
         return disk;
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index caee87cb9..6fee65efe 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -202,42 +202,24 @@ void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded()
     auto read_settings = context->getReadSettings();
     auto write_settings = context->getWriteSettings();
     Stopwatch watch;
-
-    // Temporary support for S3
-    bool s3_disk = dest_storage->getStoragePolicy()->getAnyDisk()->getName().contains("s3");
     for (const auto & merge_tree_data_part : new_parts.unsafeGet())
     {
         String local_relative_path = storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
         String remote_relative_path = dest_storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
 
-        if (s3_disk)
+        std::vector<String> files;
+        storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
+        auto src_disk = storage->getStoragePolicy()->getAnyDisk();
+        auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
+        auto tx = dest_disk->createTransaction();
+        for (const auto & file : files)
         {
-            storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
-            local_relative_path,
-            dest_storage->getStoragePolicy()->getAnyDisk(),
-            remote_relative_path,
-            read_settings,
-            write_settings,
-            nullptr);
+            auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
+            auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
+            copyData(*read_buffer, *write_buffer);
+            write_buffer->finalize();
         }
-        else
-        {
-            std::vector<String> files;
-            storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
-            auto src_disk = storage->getStoragePolicy()->getAnyDisk();
-            auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
-            auto tx = dest_disk->createTransaction();
-            for (const auto & file : files)
-            {
-                auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
-                auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
-                copyData(*read_buffer, *write_buffer);
-                write_buffer->finalize();
-            }
-            tx->commit();
-        }
-
-
+        tx->commit();
         LOG_DEBUG(
             &Poco::Logger::get("SparkMergeTreeWriter"),
             "Upload part {} to disk {} success.",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
