This is an automated email from the ASF dual-hosted git repository.

liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new c9a6648f8 [GLUTEN-6078][CH] Enable mergetree hdfs suite (#6080)
c9a6648f8 is described below

commit c9a6648f8887bd97af8f547d1091324bd56618b3
Author: Shuai li <[email protected]>
AuthorDate: Wed Jun 19 10:18:46 2024 +0800

    [GLUTEN-6078][CH] Enable mergetree hdfs suite (#6080)
    
    What changes were proposed in this pull request?
    Re-enable the previously ignored MergeTree-on-HDFS test suite and fix HDFS
    disk registration and reconfiguration in the ClickHouse backend so the
    suite passes.
    (Fixes: #6078)
    
    How was this patch tested?
    Tested by unit tests (the re-enabled GlutenClickHouseMergeTreeWriteOnHDFSSuite).
---
 ...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 10 ++++----
 cpp-ch/clickhouse.version                          |  2 +-
 cpp-ch/local-engine/Common/CHUtil.cpp              |  8 +++---
 .../Disks/ObjectStorages/GlutenDiskHDFS.cpp        | 12 +++------
 .../Disks/ObjectStorages/GlutenDiskHDFS.h          | 29 ++++++++++++++++------
 .../ObjectStorages/GlutenHDFSObjectStorage.cpp     |  2 +-
 .../Disks/ObjectStorages/GlutenHDFSObjectStorage.h |  4 +--
 .../registerGlutenDiskObjectStorage.cpp            |  2 +-
 cpp-ch/local-engine/Disks/registerGlutenDisks.cpp  | 17 ++++++++++---
 9 files changed, 54 insertions(+), 32 deletions(-)

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index ca5b39fff..56b8f056b 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -74,7 +74,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
   }
 
-  ignore("test mergetree table write") {
+  test("test mergetree table write") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
                  |""".stripMargin)
@@ -157,7 +157,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     spark.sql("drop table lineitem_mergetree_hdfs")
   }
 
-  ignore("test mergetree write with orderby keys / primary keys") {
+  test("test mergetree write with orderby keys / primary keys") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_hdfs;
                  |""".stripMargin)
@@ -254,7 +254,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
   }
 
-  ignore("test mergetree write with partition") {
+  test("test mergetree write with partition") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_partition_hdfs;
                  |""".stripMargin)
@@ -435,7 +435,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     spark.sql("drop table lineitem_mergetree_partition_hdfs")
   }
 
-  ignore("test mergetree write with bucket table") {
+  test("test mergetree write with bucket table") {
     spark.sql(s"""
                  |DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs;
                  |""".stripMargin)
@@ -537,7 +537,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
     spark.sql("drop table lineitem_mergetree_bucket_hdfs")
   }
 
-  ignore("test mergetree write with the path based") {
+  test("test mergetree write with the path based") {
     val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
 
     val sourceDF = spark.sql(s"""
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index e374d3f5f..2bbb29453 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
 CH_ORG=Kyligence
 CH_BRANCH=rebase_ch/20240616
-CH_COMMIT=e0e4b947245
+CH_COMMIT=803ee50cdb9fd56a5d77c710da1cbd071a74d1da
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index a4634c3f3..937beae99 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -765,6 +765,11 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
        // We must set the application type to CLIENT to avoid ServerUUID::get() throw exception
         global_context->setApplicationType(Context::ApplicationType::CLIENT);
     }
+    else
+    {
+        // just for ut
+        global_context->updateStorageConfiguration(*config);
+    }
 }
 
 void BackendInitializerUtil::applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr config, DB::Settings & settings)
@@ -801,10 +806,7 @@ void registerAllFunctions()
 void registerGlutenDisks()
 {
     registerDisks(true);
-
-#if USE_AWS_S3
     registerGlutenDisks(true);
-#endif
 }
 
 void BackendInitializerUtil::registerAllFactories()
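
Note: the hunk above removes the #if USE_AWS_S3 guard around registerGlutenDisks(true); as the registerGlutenDisks.cpp hunk further below shows, the guard moves inside that function, so HDFS disk registration no longer depends on the S3 build flag. A minimal standalone sketch of that guard relocation, with illustrative names rather than the real Gluten registration code:

    #include <iostream>

    #define USE_AWS_S3 0   // pretend S3 support is compiled out
    #define USE_HDFS 1

    // Each backend is guarded individually inside the function, instead of one
    // S3 guard at the call site that previously also skipped HDFS.
    void registerBackends()
    {
    #if USE_AWS_S3
        std::cout << "s3_gluten disk registered\n";
    #endif
    #if USE_HDFS
        std::cout << "hdfs_gluten disk registered\n";  // still runs with S3 disabled
    #endif
    }

    int main() { registerBackends(); }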
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index cdbe6c728..07a7aa6bd 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -32,16 +32,11 @@ void GlutenDiskHDFS::createDirectory(const String & path)
     hdfsCreateDirectory(hdfs_object_storage->getHDFSFS(), path.c_str());
 }
 
-String GlutenDiskHDFS::path2AbsPath(const String & path)
-{
-    return getObjectStorage()->generateObjectKeyForPath(path).serialize();
-}
-
 void GlutenDiskHDFS::createDirectories(const String & path)
 {
     DiskObjectStorage::createDirectories(path);
-    auto* hdfs = hdfs_object_storage->getHDFSFS();
-    fs::path p = path;
+    auto * hdfs = hdfs_object_storage->getHDFSFS();
+    fs::path p = "/" + path;
     std::vector<std::string> paths_created;
     while (hdfsExists(hdfs, p.c_str()) < 0)
     {
@@ -69,7 +64,8 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
         getMetadataStorage(),
         getObjectStorage(),
         SerializedPlanParser::global_context->getConfigRef(),
-        config_prefix);
+        config_prefix,
+        object_storage_creator);
 }
 
 std::unique_ptr<DB::WriteBufferFromFileBase> GlutenDiskHDFS::writeFile(
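
Note: the createDirectories hunk in this file prepends "/" so the existence probe uses an absolute HDFS path, then walks up the missing ancestors and creates them top-down. A rough sketch of that loop under an in-memory stand-in for HDFS (a std::set plays the role of hdfsExists/hdfsCreateDirectory):

    #include <filesystem>
    #include <iostream>
    #include <set>
    #include <string>
    #include <vector>

    namespace fs = std::filesystem;

    int main()
    {
        std::set<std::string> existing = {"/", "/user"};     // pretend HDFS namespace
        std::string path = "user/gluten/table1/parts";
        fs::path p = "/" + path;                             // "/" + path, as in the hunk

        std::vector<std::string> paths_created;
        while (!existing.count(p.string()))                  // role of hdfsExists(...) < 0
        {
            paths_created.push_back(p.string());
            p = p.parent_path();
        }
        for (auto it = paths_created.rbegin(); it != paths_created.rend(); ++it)
        {
            existing.insert(*it);                            // role of hdfsCreateDirectory
            std::cout << "created " << *it << '\n';
        }
    }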
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
index 4e375b283..222b9f892 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -37,12 +37,15 @@ public:
         DB::MetadataStoragePtr metadata_storage_,
         DB::ObjectStoragePtr object_storage_,
         const Poco::Util::AbstractConfiguration & config,
-        const String & config_prefix)
+        const String & config_prefix,
+        std::function<DB::ObjectStoragePtr(
+            const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> _object_storage_creator)
         : DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix)
+        , object_key_prefix(object_key_prefix_)
+        , hdfs_config_prefix(config_prefix)
+        , object_storage_creator(_object_storage_creator)
     {
-        chassert(dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get()) != nullptr);
-        object_key_prefix = object_key_prefix_;
-        hdfs_object_storage = dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get());
+        hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(object_storage_);
         hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/");
         auto max_speed = config.getUInt(config_prefix + ".write_speed", 450);
         throttler = std::make_shared<DB::Throttler>(max_speed);
@@ -59,12 +62,24 @@ public:
     std::unique_ptr<DB::WriteBufferFromFileBase> writeFile(const String& path, size_t buf_size, DB::WriteMode mode,
         const DB::WriteSettings& settings) override;
 
+    void applyNewSettings(
+        const Poco::Util::AbstractConfiguration & config,
+        DB::ContextPtr context,
+        const String & config_prefix,
+        const DB::DisksMap & map) override
+    {
+        DB::ObjectStoragePtr tmp = object_storage_creator(config, context);
+        hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(tmp);
+        object_storage = hdfs_object_storage;
+    }
 private:
-    String path2AbsPath(const String & path);
-
-    GlutenHDFSObjectStorage * hdfs_object_storage;
+    std::shared_ptr<GlutenHDFSObjectStorage> hdfs_object_storage;
     String object_key_prefix;
     DB::ThrottlerPtr throttler;
+    const String hdfs_config_prefix;
+    std::function<DB::ObjectStoragePtr(
+        const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)>
+        object_storage_creator;
 };
 #endif
 }
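
Note: the substance of this header change is that GlutenDiskHDFS now stores the std::function that originally created its object storage, and the new applyNewSettings() override rebuilds the storage from a fresh configuration instead of patching the old instance. A minimal sketch of that pattern in standard C++ (Config, Storage and Disk are illustrative stand-ins, not the Gluten types):

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>

    struct Config { std::string endpoint; };  // stand-in for AbstractConfiguration

    struct Storage
    {
        explicit Storage(std::string e) : endpoint(std::move(e)) {}
        std::string endpoint;
    };

    class Disk
    {
    public:
        using Creator = std::function<std::shared_ptr<Storage>(const Config &)>;

        Disk(std::shared_ptr<Storage> s, Creator c)
            : storage(std::move(s)), creator(std::move(c)) {}

        // Mirrors the applyNewSettings override: recreate the storage through
        // the stored creator rather than mutating the existing object.
        void applyNewSettings(const Config & config) { storage = creator(config); }

        const Storage & current() const { return *storage; }

    private:
        std::shared_ptr<Storage> storage;
        Creator creator;
    };

    int main()
    {
        Disk::Creator creator = [](const Config & c) { return std::make_shared<Storage>(c.endpoint); };
        Disk disk(creator(Config{"hdfs://ns1"}), creator);

        disk.applyNewSettings(Config{"hdfs://ns2"});   // settings changed
        std::cout << disk.current().endpoint << '\n';  // prints hdfs://ns2
    }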
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
index 60b82ec84..cab87d66d 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
@@ -38,7 +38,7 @@ DB::ObjectStorageKey local_engine::GlutenHDFSObjectStorage::generateObjectKeyFor
     initializeHDFSFS();
     /// what ever data_source_description.description value is, consider that key as relative key
     chassert(data_directory.starts_with("/"));
-    return ObjectStorageKey::createAsRelative(fs::path(url_without_path) / data_directory.substr(1) / path);
+    return ObjectStorageKey::createAsRelative(fs::path(url_without_path) / data_directory.substr(1), path);
 }
 }
 #endif
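
Note: the one-line change above stops folding the object path into a single joined key (prefix / path) and instead passes it as a separate suffix argument, keeping the relative part of the key distinct from the HDFS root prefix. An illustrative sketch of the difference, not the actual ClickHouse ObjectStorageKey API, assuming the serialized form is simply prefix/suffix:

    #include <filesystem>
    #include <iostream>
    #include <string>

    namespace fs = std::filesystem;

    // Toy key type: keeps prefix and suffix separate but serializes the same way.
    struct Key
    {
        std::string prefix;   // e.g. HDFS url + data directory
        std::string suffix;   // object path relative to the prefix
        std::string serialize() const { return (fs::path(prefix) / suffix).string(); }
    };

    int main()
    {
        // Before: one pre-joined string; the relative part is no longer recoverable.
        fs::path joined = fs::path("hdfs://ns1:8020") / "data" / "part-0.bin";
        std::cout << joined.string() << '\n';  // hdfs://ns1:8020/data/part-0.bin

        // After: same serialized key, but prefix and relative suffix stay distinct.
        Key key{(fs::path("hdfs://ns1:8020") / "data").string(), "part-0.bin"};
        std::cout << key.serialize() << '\n';  // hdfs://ns1:8020/data/part-0.bin
    }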
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
index a532c98cb..da37e1d78 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
@@ -33,7 +33,7 @@ public:
             const String & hdfs_root_path_,
             SettingsPtr settings_,
             const Poco::Util::AbstractConfiguration & config_)
-        : HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_, /* lazy_initialize */true), config(config_)
+        : HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_, /* lazy_initialize */false)
     {
     }
     std::unique_ptr<DB::ReadBufferFromFileBase> readObject( /// NOLINT
@@ -43,8 +43,6 @@ public:
       std::optional<size_t> file_size = {}) const override;
     DB::ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
     hdfsFS getHDFSFS() const { return hdfs_fs.get(); }
-private:
-    const Poco::Util::AbstractConfiguration & config;
 };
 #endif
 
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
index c080e0525..9e4546498 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
@@ -121,7 +121,7 @@ void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
                 config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
                 context->getSettingsRef().hdfs_replication
             );
-            return std::make_unique<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
+            return std::make_shared<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
         });
 }
 #endif
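
Note: the factory result above becomes make_shared rather than make_unique; GlutenDiskHDFS now keeps its own shared_ptr to the derived storage (the typeid_cast over std::shared_ptr in GlutenDiskHDFS.h), so the storage is shared between the disk wrapper and the factory's callers. A small sketch with illustrative types, using std::dynamic_pointer_cast where the ClickHouse code uses typeid_cast:

    #include <iostream>
    #include <memory>

    struct ObjectStorage { virtual ~ObjectStorage() = default; };
    struct HDFSStorage : ObjectStorage { void hello() const { std::cout << "hdfs storage\n"; } };

    int main()
    {
        // The factory hands out shared ownership of the base type...
        std::shared_ptr<ObjectStorage> base = std::make_shared<HDFSStorage>();

        // ...and the disk downcasts while keeping shared ownership;
        // std::dynamic_pointer_cast plays the role of ClickHouse's typeid_cast.
        if (auto hdfs = std::dynamic_pointer_cast<HDFSStorage>(base))
            hdfs->hello();
    }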
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
index c7e9c5fd3..8a920edcc 100644
--- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -40,6 +40,8 @@ void registerGlutenHDFSObjectStorage(DB::ObjectStorageFactory & factory);
 void registerGlutenDisks(bool global_skip_access_check)
 {
     auto & factory = DB::DiskFactory::instance();
+
+#if USE_AWS_S3
     auto creator = [global_skip_access_check](
                        const String & name,
                        const Poco::Util::AbstractConfiguration & config,
@@ -66,7 +68,7 @@ void registerGlutenDisks(bool global_skip_access_check)
     };
 
     auto & object_factory = DB::ObjectStorageFactory::instance();
-#if USE_AWS_S3
+
     registerGlutenS3ObjectStorage(object_factory);
     factory.registerDiskType("s3_gluten", creator); /// For compatibility
 #endif
@@ -82,11 +84,20 @@ void registerGlutenDisks(bool global_skip_access_check)
                             bool) -> DB::DiskPtr
     {
        bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
-        auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
+        auto object_storage_creator = [name, skip_access_check, config_prefix](
+                                          const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) -> DB::ObjectStoragePtr
+        { return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
+        auto object_storage = object_storage_creator(config, context);
        auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");
 
        DB::DiskObjectStoragePtr disk = std::make_shared<local_engine::GlutenDiskHDFS>(
-            name, object_storage->getCommonKeyPrefix(), std::move(metadata_storage), std::move(object_storage), config, config_prefix);
+            name,
+            object_storage->getCommonKeyPrefix(),
+            std::move(metadata_storage),
+            std::move(object_storage),
+            config,
+            config_prefix,
+            object_storage_creator);
 
         disk->startup(context, skip_access_check);
         return disk;
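
Note: the final hunk wraps the factory call in a lambda that captures the registration-time parameters (name, config_prefix, skip_access_check); the same lambda is invoked once for the initial object storage and then handed to GlutenDiskHDFS so applyNewSettings() can rebuild the storage later. A condensed sketch of that capture-and-reuse flow, with illustrative types rather than the real factory signatures:

    #include <functional>
    #include <iostream>
    #include <string>

    struct Config { std::string endpoint; };

    // Stand-in for ObjectStorageFactory::instance().create(...).
    std::string factoryCreate(const std::string & name, const Config & c, bool skip_check)
    {
        return name + "@" + c.endpoint + (skip_check ? " (access check skipped)" : "");
    }

    int main()
    {
        std::string name = "hdfs_gluten";   // captured like name/config_prefix
        bool skip_access_check = true;

        // One lambda, used twice: once now, once whenever new settings arrive.
        auto object_storage_creator = [name, skip_access_check](const Config & c)
        { return factoryCreate(name, c, skip_access_check); };

        std::cout << object_storage_creator(Config{"hdfs://ns1"}) << '\n';  // initial creation
        std::cout << object_storage_creator(Config{"hdfs://ns2"}) << '\n';  // later rebuild
    }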


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
