This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0ed44a428 [GLUTEN-6590] Support compact mergetree file on s3 (#6591)
0ed44a428 is described below
commit 0ed44a4289f2c725aa18565083c21d847a94029c
Author: Wenzheng Liu <[email protected]>
AuthorDate: Wed Jul 31 23:45:26 2024 +0800
[GLUTEN-6590] Support compact mergetree file on s3 (#6591)
---
.../GlutenClickHouseMergeTreeWriteOnS3Suite.scala | 47 ++++++++++++++--
cpp-ch/local-engine/Common/CHUtil.cpp | 4 +-
.../CompactObjectStorageDiskTransaction.cpp | 1 +
 .../Disks/ObjectStorages/GlutenDiskS3.cpp | 59 ++++++++++++++++++++++
.../Disks/ObjectStorages/GlutenDiskS3.h | 57 ++++++++++++++++++++
cpp-ch/local-engine/Disks/registerGlutenDisks.cpp | 12 ++++-
.../Storages/Mergetree/SparkMergeTreeWriter.cpp | 40 ++++----------
7 files changed, 182 insertions(+), 38 deletions(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 30f443265..c95b78858 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -30,6 +30,8 @@ import io.minio.messages.DeleteObject
import java.io.File
import java.util
+import scala.concurrent.duration.DurationInt
+
// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit
@@ -43,6 +45,12 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
override protected val tpchQueries: String = rootPath + "queries/tpch-queries-ch"
override protected val queriesResults: String = rootPath + "mergetree-queries-output"
+ private val client = MinioClient
+ .builder()
+ .endpoint(MINIO_ENDPOINT)
+ .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
+ .build()
+
override protected def createTPCHNotNullTables(): Unit = {
createNotNullTPCHTablesInParquet(tablesPath)
}
@@ -60,11 +68,6 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
override protected def beforeEach(): Unit = {
super.beforeEach()
- val client = MinioClient
- .builder()
- .endpoint(MINIO_ENDPOINT)
- .credentials(S3_ACCESS_KEY, S3_SECRET_KEY)
- .build()
if (client.bucketExists(BucketExistsArgs.builder().bucket(BUCKET_NAME).build())) {
val results = client.listObjects(ListObjectsArgs.builder().bucket(BUCKET_NAME).recursive(true).build())
@@ -168,9 +171,42 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
assertResult(1)(addFiles.size)
assertResult(600572)(addFiles.head.rows)
}
+ eventually(timeout(10.seconds), interval(2.seconds)) {
+ verifyS3CompactFileExist("lineitem_mergetree_s3")
+ }
spark.sql("drop table lineitem_mergetree_s3") // clean up
}
+ private def verifyS3CompactFileExist(table: String): Unit = {
+ val args = ListObjectsArgs
+ .builder()
+ .bucket(BUCKET_NAME)
+ .recursive(true)
+ .prefix(table)
+ .build()
+ var objectCount: Int = 0
+ var metadataGlutenExist: Boolean = false
+ var metadataBinExist: Boolean = false
+ var dataBinExist: Boolean = false
+ client
+ .listObjects(args)
+ .forEach(
+ obj => {
+ objectCount += 1
+ if (obj.get().objectName().contains("metadata.gluten")) {
+ metadataGlutenExist = true
+ } else if (obj.get().objectName().contains("meta.bin")) {
+ metadataBinExist = true
+ } else if (obj.get().objectName().contains("data.bin")) {
+ dataBinExist = true
+ }
+ })
+ assertResult(5)(objectCount)
+ assert(metadataGlutenExist)
+ assert(metadataBinExist)
+ assert(dataBinExist)
+ }
+
test("test mergetree write with orderby keys / primary keys") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree_orderbykey_s3;
@@ -635,6 +671,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
withSQLConf(
"spark.databricks.delta.optimize.minFileSize" -> "200000000",
+ "spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage" -> "true",
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert" -> "true"
) {
spark.sql(s"""
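The suite now keeps the MinIO client as a field so the new verifyS3CompactFileExist helper can reuse it, and the check runs inside eventually() to give the backend time to finish the compact upload after the insert commits. What the helper pins down is the compact layout itself: a fixed handful of objects per table (including metadata.gluten, meta.bin and data.bin) rather than one object per MergeTree part file. A self-contained sketch of that idea, with illustrative types rather than the actual Gluten implementation:

    #include <cstddef>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    // File name -> (offset, length) inside the combined object.
    using CompactIndex = std::map<std::string, std::pair<size_t, size_t>>;

    // Concatenate many small part files into one blob and remember where each
    // file starts, so a reader can seek back to it later; one PUT then replaces
    // one PUT per file.
    static CompactIndex compact(const std::vector<std::pair<std::string, std::string>> & files, std::string & blob)
    {
        CompactIndex index;
        for (const auto & [name, contents] : files)
        {
            index.emplace(name, std::make_pair(blob.size(), contents.size()));
            blob += contents;
        }
        return index;
    }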
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index b6867f656..e642a67a5 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -569,7 +569,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
if (path_prefix.empty() && path_suffix.empty())
return changed_paths;
Poco::Util::AbstractConfiguration::Keys disks;
- std::unordered_set<String> disk_types = {"s3", "hdfs_gluten", "cache"};
+ std::unordered_set<String> disk_types = {"s3_gluten", "hdfs_gluten", "cache"};
config.keys("storage_configuration.disks", disks);
std::ranges::for_each(
@@ -590,7 +590,7 @@ std::vector<String> BackendInitializerUtil::wrapDiskPathConfig(
changed_paths.emplace_back(final_path);
}
}
- else if (disk_type == "s3" || disk_type == "hdfs_gluten")
+ else if (disk_type == "s3_gluten" || disk_type == "hdfs_gluten")
{
String metadata_path = config.getString(disk_prefix + ".metadata_path", "");
if (!metadata_path.empty())
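Note that the gluten-managed S3 disk type is renamed from "s3" to "s3_gluten" in this filter, so storage_configuration entries that rely on per-query path wrapping presumably have to declare the new type; disks declared as plain "s3" are now skipped here. A minimal standalone illustration of the updated check (mirroring, not reusing, the code above):

    #include <iostream>
    #include <string>
    #include <unordered_set>

    int main()
    {
        // Same membership test wrapDiskPathConfig applies after this patch.
        const std::unordered_set<std::string> disk_types = {"s3_gluten", "hdfs_gluten", "cache"};
        for (const std::string type : {"s3", "s3_gluten", "hdfs_gluten"})
            std::cout << type << (disk_types.contains(type) ? ": paths wrapped" : ": skipped") << '\n';
    }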
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
index 7a3ba4bed..66c447010 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
@@ -94,6 +94,7 @@ void CompactObjectStorageDiskTransaction::commit()
whole_meta.addObject(key, 0, offset);
metadata_tx->writeStringToFile(local_path, whole_meta.serializeToString());
out.sync();
+ out.finalize();
};
merge_files(files | std::ranges::views::filter([](auto file) { return !isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path);
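The single added line above is the actual fix: sync() only provides a durability barrier for bytes already flushed, while finalize() flushes the remainder and completes the write (for object storage, the point where the upload is closed out), and ClickHouse write buffers are expected to be finalized before destruction. A hedged sketch of the intended calling pattern against the IDisk interface, assuming ClickHouse's IO headers:

    #include <Disks/IDisk.h>
    #include <string>

    void writeWholeFile(const DB::DiskPtr & disk, const std::string & path,
                        const std::string & payload, const DB::WriteSettings & settings)
    {
        auto out = disk->writeFile(path, DBMS_DEFAULT_BUFFER_SIZE, DB::WriteMode::Rewrite, settings);
        out->write(payload.data(), payload.size());
        out->sync();      // durability barrier for what is already flushed
        out->finalize();  // flush the rest and complete the upload
    }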
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
new file mode 100644
index 000000000..4a73c5a49
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "GlutenDiskS3.h"
+#include <Disks/ObjectStorages/DiskObjectStorage.h>
+#include <Parser/SerializedPlanParser.h>
+#include "CompactObjectStorageDiskTransaction.h"
+
+#if USE_AWS_S3
+namespace local_engine
+{
+
+ DB::DiskTransactionPtr GlutenDiskS3::createTransaction()
+ {
+ return std::make_shared<CompactObjectStorageDiskTransaction>(*this, SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
+ }
+
+ std::unique_ptr<ReadBufferFromFileBase> GlutenDiskS3::readFile(
+ const String & path,
+ const ReadSettings & settings,
+ std::optional<size_t> read_hint,
+ std::optional<size_t> file_size) const
+ {
+ ReadSettings copy_settings = settings;
+ // Threadpool read is not supported for s3 compact version currently
+ copy_settings.remote_fs_method = RemoteFSReadMethod::read;
+ return DiskObjectStorage::readFile(path, copy_settings, read_hint, file_size);
+ }
+
+ DiskObjectStoragePtr GlutenDiskS3::createDiskObjectStorage()
+ {
+ const auto config_prefix = "storage_configuration.disks." + name;
+ return std::make_shared<GlutenDiskS3>(
+ getName(),
+ object_key_prefix,
+ getMetadataStorage(),
+ getObjectStorage(),
+ SerializedPlanParser::global_context->getConfigRef(),
+ config_prefix,
+ object_storage_creator);
+ }
+
+}
+
+#endif
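Two details of this new translation unit carry the behaviour change: createTransaction() swaps in the compacting transaction for every write issued through this disk, and the readFile() override copies the caller's settings and pins them to the plain synchronous path, since threadpool reads are not yet supported for the compact format. A hedged illustration of what that override guarantees to callers (the path is purely illustrative):

    // A caller may ask for threadpool reads...
    DB::ReadSettings settings;
    settings.remote_fs_method = DB::RemoteFSReadMethod::threadpool;

    // ...but GlutenDiskS3::readFile downgrades its private copy before
    // delegating, so DiskObjectStorage::readFile effectively sees
    //     copy_settings.remote_fs_method == DB::RemoteFSReadMethod::read
    auto buf = disk->readFile("store/lineitem_mergetree_s3/all_1_1_0/data.bin", settings, std::nullopt, std::nullopt);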
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
new file mode 100644
index 000000000..4f0d7a029
--- /dev/null
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskS3.h
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+
+#include <Disks/ObjectStorages/DiskObjectStorage.h>
+#include <Parser/SerializedPlanParser.h>
+#include "CompactObjectStorageDiskTransaction.h"
+
+#if USE_AWS_S3
+namespace local_engine
+{
+class GlutenDiskS3 : public DB::DiskObjectStorage
+{
+public:
+ GlutenDiskS3(
+ const String & name_,
+ const String & object_key_prefix_,
+ DB::MetadataStoragePtr metadata_storage_,
+ DB::ObjectStoragePtr object_storage_,
+ const Poco::Util::AbstractConfiguration & config,
+ const String & config_prefix,
+ std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> creator)
+ : DiskObjectStorage(name_, object_key_prefix_, metadata_storage_, object_storage_, config, config_prefix),
+ object_storage_creator(creator) {}
+
+ DB::DiskTransactionPtr createTransaction() override;
+
+ std::unique_ptr<ReadBufferFromFileBase> readFile(
+ const String & path,
+ const ReadSettings & settings,
+ std::optional<size_t> read_hint,
+ std::optional<size_t> file_size) const override;
+
+ DiskObjectStoragePtr createDiskObjectStorage() override;
+
+private:
+ std::function<DB::ObjectStoragePtr(const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr context)> object_storage_creator;
+};
+}
+
+#endif
diff --git a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
index 8a920edcc..52398b5f2 100644
--- a/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
+++ b/cpp-ch/local-engine/Disks/registerGlutenDisks.cpp
@@ -25,6 +25,10 @@
#include <Disks/ObjectStorages/GlutenDiskHDFS.h>
#endif
+#if USE_AWS_S3
+#include <Disks/ObjectStorages/GlutenDiskS3.h>
+#endif
+
#include "registerGlutenDisks.h"
namespace local_engine
@@ -52,16 +56,20 @@ void registerGlutenDisks(bool global_skip_access_check)
bool) -> DB::DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
+ auto object_storage_creator = [name, skip_access_check, config_prefix](
+ const Poco::Util::AbstractConfiguration & conf, DB::ContextPtr ctx) -> DB::ObjectStoragePtr
+ { return DB::ObjectStorageFactory::instance().create(name, conf, config_prefix, ctx, skip_access_check); };
auto object_storage = DB::ObjectStorageFactory::instance().create(name, config, config_prefix, context, skip_access_check);
auto metadata_storage = DB::MetadataStorageFactory::instance().create(name, config, config_prefix, object_storage, "local");
- DB::DiskObjectStoragePtr disk = std::make_shared<DB::DiskObjectStorage>(
+ DB::DiskObjectStoragePtr disk = std::make_shared<local_engine::GlutenDiskS3>(
name,
object_storage->getCommonKeyPrefix(),
std::move(metadata_storage),
std::move(object_storage),
config,
- config_prefix);
+ config_prefix,
+ object_storage_creator);
disk->startup(context, skip_access_check);
return disk;
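Registration now builds a local_engine::GlutenDiskS3 instead of a plain DB::DiskObjectStorage, and it additionally hands the disk a creator lambda that captures everything needed to rebuild the object storage. Presumably this is what lets GlutenDiskS3::createDiskObjectStorage() (above) clone the disk later without re-deriving the factory arguments. A minimal standalone sketch of that capture-a-factory pattern, with illustrative types:

    #include <functional>
    #include <iostream>
    #include <memory>
    #include <string>

    struct ObjectStorage { std::string endpoint; };
    using Creator = std::function<std::shared_ptr<ObjectStorage>()>;

    struct Disk
    {
        std::shared_ptr<ObjectStorage> storage;
        Creator creator; // stored for later, like object_storage_creator above

        // A clone can rebuild its own storage without re-reading the config.
        Disk clone() const { return Disk{creator(), creator}; }
    };

    int main()
    {
        const std::string endpoint = "http://minio:9000"; // captured configuration state
        Disk disk{nullptr, [endpoint] { return std::make_shared<ObjectStorage>(ObjectStorage{endpoint}); }};
        std::cout << disk.clone().storage->endpoint << '\n';
    }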
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index caee87cb9..6fee65efe 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -202,42 +202,24 @@ void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded()
auto read_settings = context->getReadSettings();
auto write_settings = context->getWriteSettings();
Stopwatch watch;
-
- // Temporary support for S3
- bool s3_disk = dest_storage->getStoragePolicy()->getAnyDisk()->getName().contains("s3");
for (const auto & merge_tree_data_part : new_parts.unsafeGet())
{
String local_relative_path = storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
String remote_relative_path = dest_storage->getRelativeDataPath() + "/" + merge_tree_data_part->name;
- if (s3_disk)
+ std::vector<String> files;
+ storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
+ auto src_disk = storage->getStoragePolicy()->getAnyDisk();
+ auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
+ auto tx = dest_disk->createTransaction();
+ for (const auto & file : files)
{
- storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
- local_relative_path,
- dest_storage->getStoragePolicy()->getAnyDisk(),
- remote_relative_path,
- read_settings,
- write_settings,
- nullptr);
+ auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
+ auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
+ copyData(*read_buffer, *write_buffer);
+ write_buffer->finalize();
}
- else
- {
- std::vector<String> files;
- storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
- auto src_disk = storage->getStoragePolicy()->getAnyDisk();
- auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
- auto tx = dest_disk->createTransaction();
- for (const auto & file : files)
- {
- auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
- auto write_buffer = tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
- copyData(*read_buffer, *write_buffer);
- write_buffer->finalize();
- }
- tx->commit();
- }
-
-
+ tx->commit();
LOG_DEBUG(
&Poco::Logger::get("SparkMergeTreeWriter"),
"Upload part {} to disk {} success.",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]