This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c9a6648f8 [GLUTEN-6078][CH] Enable mergetree hdfs suite (#6080)
c9a6648f8 is described below
commit c9a6648f8887bd97af8f547d1091324bd56618b3
Author: Shuai li <[email protected]>
AuthorDate: Wed Jun 19 10:18:46 2024 +0800
[GLUTEN-6078][CH] Enable mergetree hdfs suite (#6080)
What changes were proposed in this pull request?
Re-enable the MergeTree-on-HDFS test suite and fix the HDFS disk and object-storage handling in the ClickHouse backend so that the suite passes.
(Fixes: #6078)
How was this patch tested?
Tested by unit tests (the re-enabled suite).
---
...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 10 ++++----
cpp-ch/clickhouse.version | 2 +-
cpp-ch/local-engine/Common/CHUtil.cpp | 8 +++---
.../Disks/ObjectStorages/GlutenDiskHDFS.cpp | 12 +++------
.../Disks/ObjectStorages/GlutenDiskHDFS.h | 29 ++++++++++++++++------
.../ObjectStorages/GlutenHDFSObjectStorage.cpp | 2 +-
.../Disks/ObjectStorages/GlutenHDFSObjectStorage.h | 4 +--
.../registerGlutenDiskObjectStorage.cpp | 2 +-
cpp-ch/local-engine/Disks/registerGlutenDisks.cpp | 17 ++++++++++---
9 files changed, 54 insertions(+), 32 deletions(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index ca5b39fff..56b8f056b 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -74,7 +74,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
FileUtils.deleteDirectory(new File(HDFS_METADATA_PATH))
}
- ignore("test mergetree table write") {
+ test("test mergetree table write") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_hdfs;
|""".stripMargin)
@@ -157,7 +157,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_hdfs")
}
- ignore("test mergetree write with orderby keys / primary keys") {
+ test("test mergetree write with orderby keys / primary keys") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_hdfs;
|""".stripMargin)
@@ -254,7 +254,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_orderbykey_hdfs")
}
- ignore("test mergetree write with partition") {
+ test("test mergetree write with partition") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_partition_hdfs;
|""".stripMargin)
@@ -435,7 +435,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_partition_hdfs")
}
- ignore("test mergetree write with bucket table") {
+ test("test mergetree write with bucket table") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_bucket_hdfs;
|""".stripMargin)
@@ -537,7 +537,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
spark.sql("drop table lineitem_mergetree_bucket_hdfs")
}
- ignore("test mergetree write with the path based") {
+ test("test mergetree write with the path based") {
val dataPath = s"$HDFS_URL/test/lineitem_mergetree_bucket_hdfs"
val sourceDF = spark.sql(s"""
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index e374d3f5f..2bbb29453 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,3 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240616
-CH_COMMIT=e0e4b947245
+CH_COMMIT=803ee50cdb9fd56a5d77c710da1cbd071a74d1da
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index a4634c3f3..937beae99 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -765,6 +765,11 @@ void BackendInitializerUtil::initContexts(DB::Context::ConfigurationPtr config)
// We must set the application type to CLIENT to avoid ServerUUID::get() throw exception
global_context->setApplicationType(Context::ApplicationType::CLIENT);
}
+ else
+ {
+ // just for ut
+ global_context->updateStorageConfiguration(*config);
+ }
}
void BackendInitializerUtil::applyGlobalConfigAndSettings(DB::Context::ConfigurationPtr config, DB::Settings & settings)
@@ -801,10 +806,7 @@ void registerAllFunctions()
void registerGlutenDisks()
{
registerDisks(true);
-
-#if USE_AWS_S3
registerGlutenDisks(true);
-#endif
}
void BackendInitializerUtil::registerAllFactories()
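The new else branch refreshes the storage configuration on the shared global context; as the "just for ut" comment notes, unit-test runs reuse one long-lived context across suites, so disks registered later must see the current test's settings rather than stale ones. Below is a minimal, self-contained sketch of that refresh pattern; GlobalContext and Configuration here are illustrative stand-ins, not ClickHouse's actual classes:

    #include <iostream>
    #include <map>
    #include <string>

    // Illustrative stand-ins, not ClickHouse's Context/AbstractConfiguration.
    struct Configuration { std::map<std::string, std::string> values; };

    class GlobalContext
    {
    public:
        // Mirrors the intent of updateStorageConfiguration(): replace the
        // storage-related settings wholesale, so disks registered later see
        // the configuration of the *current* test rather than a stale one.
        void updateStorageConfiguration(const Configuration & config) { storage_config = config; }

        std::string get(const std::string & key) const
        {
            auto it = storage_config.values.find(key);
            return it == storage_config.values.end() ? "" : it->second;
        }

    private:
        Configuration storage_config;
    };

    int main()
    {
        GlobalContext ctx;                                                   // one context, reused across suites
        ctx.updateStorageConfiguration({{{"hdfs.uri", "hdfs://nn1:8020"}}}); // first suite
        ctx.updateStorageConfiguration({{{"hdfs.uri", "hdfs://nn2:8020"}}}); // a later suite must refresh it
        std::cout << ctx.get("hdfs.uri") << '\n';                            // prints hdfs://nn2:8020
    }

The same hunk drops the USE_AWS_S3 guard around registerGlutenDisks(true); that guard moves into registerGlutenDisks.cpp itself (further below), so HDFS disk registration no longer depends on S3 support being compiled in.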
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index cdbe6c728..07a7aa6bd 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -32,16 +32,11 @@ void GlutenDiskHDFS::createDirectory(const String & path)
hdfsCreateDirectory(hdfs_object_storage->getHDFSFS(), path.c_str());
}
-String GlutenDiskHDFS::path2AbsPath(const String & path)
-{
- return getObjectStorage()->generateObjectKeyForPath(path).serialize();
-}
-
void GlutenDiskHDFS::createDirectories(const String & path)
{
DiskObjectStorage::createDirectories(path);
- auto* hdfs = hdfs_object_storage->getHDFSFS();
- fs::path p = path;
+ auto * hdfs = hdfs_object_storage->getHDFSFS();
+ fs::path p = "/" + path;
std::vector<std::string> paths_created;
while (hdfsExists(hdfs, p.c_str()) < 0)
{
@@ -69,7 +64,8 @@ DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
getMetadataStorage(),
getObjectStorage(),
SerializedPlanParser::global_context->getConfigRef(),
- config_prefix);
+ config_prefix,
+ object_storage_creator);
}
std::unique_ptr<DB::WriteBufferFromFileBase> GlutenDiskHDFS::writeFile(
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
index 4e375b283..222b9f892 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -37,12 +37,15 @@ public:
DB::MetadataStoragePtr metadata_storage_,
DB::ObjectStoragePtr object_storage_,
const Poco::Util::AbstractConfiguration & config,
- const String & config_prefix)
+ const String & config_prefix,
+ std::function<DB::ObjectStoragePtr(
+ const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> _object_storage_creator)
: DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix)
+ , object_key_prefix(object_key_prefix_)
+ , hdfs_config_prefix(config_prefix)
+ , object_storage_creator(_object_storage_creator)
{
- chassert(dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get()) != nullptr);
- object_key_prefix = object_key_prefix_;
- hdfs_object_storage = dynamic_cast<local_engine::GlutenHDFSObjectStorage *>(object_storage_.get());
+ hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(object_storage_);
hdfsSetWorkingDirectory(hdfs_object_storage->getHDFSFS(), "/");
auto max_speed = config.getUInt(config_prefix + ".write_speed", 450);
throttler = std::make_shared<DB::Throttler>(max_speed);
@@ -59,12 +62,24 @@ public:
std::unique_ptr<DB::WriteBufferFromFileBase> writeFile(const String& path,
size_t buf_size, DB::WriteMode mode,
const DB::WriteSettings& settings) override;
+ void applyNewSettings(
+ const Poco::Util::AbstractConfiguration & config,
+ DB::ContextPtr context,
+ const String & config_prefix,
+ const DB::DisksMap & map) override
+ {
+ DB::ObjectStoragePtr tmp = object_storage_creator(config, context);
+ hdfs_object_storage = typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(tmp);
+ object_storage = hdfs_object_storage;
+ }
private:
- String path2AbsPath(const String & path);
-
- GlutenHDFSObjectStorage * hdfs_object_storage;
+ std::shared_ptr<GlutenHDFSObjectStorage> hdfs_object_storage;
String object_key_prefix;
DB::ThrottlerPtr throttler;
+ const String hdfs_config_prefix;
+ std::function<DB::ObjectStoragePtr(
+ const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)>
+ object_storage_creator;
};
#endif
}
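The core of this header change is that GlutenDiskHDFS now saves the object-storage factory callback and overrides applyNewSettings() to rebuild its storage from it, instead of keeping a pointer that goes stale when settings change. A compilable sketch of that store-the-creator pattern follows; Disk, ObjectStorage and Config are hypothetical stand-ins for the real DiskObjectStorage, DB::ObjectStoragePtr and Poco configuration types:

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>

    // Hypothetical stand-ins for DB::ObjectStoragePtr and the Poco config types.
    struct Config { std::string hdfs_uri; };
    struct ObjectStorage { std::string uri; };
    using ObjectStoragePtr = std::shared_ptr<ObjectStorage>;
    using StorageCreator = std::function<ObjectStoragePtr(const Config &)>;

    class Disk
    {
    public:
        Disk(ObjectStoragePtr storage_, StorageCreator creator_)
            : storage(std::move(storage_)), creator(std::move(creator_)) {}

        // Mirrors GlutenDiskHDFS::applyNewSettings(): rather than mutating the
        // old storage in place, recreate it from the saved factory so every
        // setting is re-read from the new configuration.
        void applyNewSettings(const Config & config) { storage = creator(config); }

        const std::string & uri() const { return storage->uri; }

    private:
        ObjectStoragePtr storage;   // shared_ptr, as in the diff's raw-pointer replacement
        StorageCreator creator;     // saved at construction, as the diff does
    };

    int main()
    {
        StorageCreator creator = [](const Config & c) { return std::make_shared<ObjectStorage>(ObjectStorage{c.hdfs_uri}); };
        Disk disk(creator(Config{"hdfs://old-nn:8020"}), creator);
        disk.applyNewSettings(Config{"hdfs://new-nn:8020"});
        std::cout << disk.uri() << '\n'; // prints hdfs://new-nn:8020
    }

Holding the storage as a std::shared_ptr (rather than the previous raw dynamic_cast result) matters for the same reason: once applyNewSettings() swaps in a new storage, a cached raw pointer into the old one would dangle.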
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
index 60b82ec84..cab87d66d 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.cpp
@@ -38,7 +38,7 @@ DB::ObjectStorageKey local_engine::GlutenHDFSObjectStorage::generateObjectKeyFor
initializeHDFSFS();
/// what ever data_source_description.description value is, consider that key as relative key
chassert(data_directory.starts_with("/"));
- return ObjectStorageKey::createAsRelative(fs::path(url_without_path) / data_directory.substr(1) / path);
+ return ObjectStorageKey::createAsRelative(fs::path(url_without_path) / data_directory.substr(1), path);
}
}
#endif
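The one-character change above ('/ path' becoming ', path') switches from building the whole key as a single path to passing the object's relative name as a separate suffix argument of createAsRelative. As a toy model of why a key type might distinguish prefix from suffix (illustrative only, not ClickHouse's ObjectStorageKey):

    #include <filesystem>
    #include <iostream>
    #include <string>
    namespace fs = std::filesystem;

    // Simplified model of an object-storage key split into prefix + suffix.
    struct Key
    {
        std::string prefix; // where the disk roots its objects
        std::string suffix; // the object's own relative name
        std::string serialize() const { return suffix.empty() ? prefix : (fs::path(prefix) / suffix).string(); }
    };

    int main()
    {
        const std::string url_without_path = "hdfs://nn:8020";
        const std::string data_directory = "/data";
        const std::string path = "part-0/col.bin";

        // Old form: everything folded into one string; the object name is lost.
        Key merged{(fs::path(url_without_path) / data_directory.substr(1) / path).string(), ""};
        // New form: prefix and suffix stay distinct, so callers needing the
        // relative object name can still recover it.
        Key split{(fs::path(url_without_path) / data_directory.substr(1)).string(), path};

        std::cout << merged.serialize() << '\n'; // hdfs://nn:8020/data/part-0/col.bin
        std::cout << split.serialize() << '\n';  // same serialized form
        std::cout << split.suffix << '\n';       // part-0/col.bin is still available
    }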
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
index a532c98cb..da37e1d78 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenHDFSObjectStorage.h
@@ -33,7 +33,7 @@ public:
const String & hdfs_root_path_,
SettingsPtr settings_,
const Poco::Util::AbstractConfiguration & config_)
- : HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_, /* lazy_initialize */true), config(config_)
+ : HDFSObjectStorage(hdfs_root_path_, std::move(settings_), config_, /* lazy_initialize */false)
{
}
std::unique_ptr<DB::ReadBufferFromFileBase> readObject( /// NOLINT
@@ -43,8 +43,6 @@ public:
std::optional<size_t> file_size = {}) const override;
DB::ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override;
hdfsFS getHDFSFS() const { return hdfs_fs.get(); }
-private:
- const Poco::Util::AbstractConfiguration & config;
};
#endif
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
index c080e0525..9e4546498 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/registerGlutenDiskObjectStorage.cpp
@@ -121,7 +121,7 @@ void registerGlutenHDFSObjectStorage(ObjectStorageFactory & factory)
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
context->getSettingsRef().hdfs_replication
);
- return std::make_unique<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
+ return std::make_shared<GlutenHDFSObjectStorage>(uri, std::move(settings), config);
});
}
#endif
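The make_unique-to-make_shared switch pairs with the typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>> in GlutenDiskHDFS.h: downcasting while sharing ownership requires the object to have been created shared in the first place. The standard-library analogue of that cast is std::dynamic_pointer_cast, sketched here with stand-in types:

    #include <iostream>
    #include <memory>

    struct IObjectStorage { virtual ~IObjectStorage() = default; };
    struct HDFSObjectStorage : IObjectStorage { const char * scheme() const { return "hdfs"; } };

    int main()
    {
        // Created shared, as the factory now does with std::make_shared:
        std::shared_ptr<IObjectStorage> base = std::make_shared<HDFSObjectStorage>();

        // Downcast while sharing ownership. ClickHouse's typeid_cast over a
        // shared_ptr plays the role std::dynamic_pointer_cast plays here; a
        // unique_ptr could not be downcast this way without giving up ownership.
        auto hdfs = std::dynamic_pointer_cast<HDFSObjectStorage>(base);
        if (hdfs)
            std::cout << hdfs->scheme() << " use_count=" << base.use_count() << '\n'; // hdfs use_count=2
    }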
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
index c7e9c5fd3..8a920edcc 100644
--- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -40,6 +40,8 @@ void registerGlutenHDFSObjectStorage(DB::ObjectStorageFactory & factory);
void registerGlutenDisks(bool global_skip_access_check)
{
auto & factory = DB::DiskFactory::instance();
+
+#if USE_AWS_S3
auto creator = [global_skip_access_check](
const String & name,
const Poco::Util::AbstractConfiguration & config,
@@ -66,7 +68,7 @@ void registerGlutenDisks(bool global_skip_access_check)
};
auto & object_factory = DB::ObjectStorageFactory::instance();
-#if USE_AWS_S3
+
registerGlutenS3ObjectStorage(object_factory);
factory.registerDiskType("s3_gluten", creator); /// For compatibility
#endif
@@ -82,11 +84,20 @@ void registerGlutenDisks(bool global_skip_access_check)
bool) -> DB::DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
- auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
+ auto object_storage_creator = [name, skip_access_check, config_prefix](
+ const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) -> DB::ObjectStoragePtr
+ { return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
+ auto object_storage = object_storage_creator(config, context);
auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");
DB::DiskObjectStoragePtr disk = std::make_shared<local_engine::GlutenDiskHDFS>(
- name, object_storage->getCommonKeyPrefix(), std::move(metadata_storage), std::move(object_storage), config, config_prefix);
+ name,
+ object_storage->getCommonKeyPrefix(),
+ std::move(metadata_storage),
+ std::move(object_storage),
+ config,
+ config_prefix,
+ object_storage_creator);
disk->startup(context, skip_access_check);
return disk;
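This registration change is the producing side of the creator callback stored in GlutenDiskHDFS: name, skip_access_check and config_prefix are captured by value, so the disk can re-run the factory (e.g. from applyNewSettings) long after the registration lambda's scope has ended. A small self-contained sketch of that lifetime point, with hypothetical names:

    #include <functional>
    #include <iostream>
    #include <string>

    using Creator = std::function<std::string(const std::string &)>;

    // Builds a creator the way the diff's registration lambda does: the
    // registration-time parameters are captured by value, so the callable
    // stays valid after the registering scope has ended.
    Creator makeCreator()
    {
        std::string name = "hdfs_gluten";                               // would dangle if captured by reference
        std::string config_prefix = "storage_configuration.disks.hdfs"; // hypothetical prefix
        bool skip_access_check = true;
        return [name, config_prefix, skip_access_check](const std::string & conf)
        {
            return "create(" + name + ", " + conf + ", " + config_prefix + ", "
                + (skip_access_check ? "skip" : "check") + ")";
        };
    }

    int main()
    {
        Creator object_storage_creator = makeCreator(); // registration scope is gone
        // Later, e.g. from applyNewSettings(), the disk re-runs the factory
        // against a new configuration:
        std::cout << object_storage_creator("conf_v2") << '\n';
    }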