This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new e0fcfe586 [GLUTEN-6178][CH] Add config to insert remote file system
directly #6192
e0fcfe586 is described below
commit e0fcfe586efc7efb3ec0c349d5ca8b2371d969d4
Author: Shuai li <[email protected]>
AuthorDate: Mon Jun 24 13:55:39 2024 +0800
[GLUTEN-6178][CH] Add config to insert remote file system directly #6192
What changes were proposed in this pull request?
Adds a config option (`mergetree.insert_without_local_storage`) that lets MergeTree inserts write to the remote file system (e.g. HDFS) directly, instead of staging parts in local storage first.
(Fixes: #6178)
How was this patch tested?
Tested by unit test.
---
...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 44 +++++++++++++++++++++-
cpp-ch/local-engine/Common/CHUtil.cpp | 3 +-
cpp-ch/local-engine/Common/CHUtil.h | 4 +-
.../Disks/ObjectStorages/GlutenDiskHDFS.cpp | 10 ++++-
.../Disks/ObjectStorages/GlutenDiskHDFS.h | 2 +
.../Storages/Mergetree/SparkMergeTreeWriter.cpp | 40 +++++++++++---------
.../Storages/Mergetree/SparkMergeTreeWriter.h | 2 +
7 files changed, 83 insertions(+), 22 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 572d0cd50..99b212059 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -25,10 +25,12 @@ import
org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMerg
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.File
+import scala.concurrent.duration.DurationInt
+
// Some sqls' line length exceeds 100
// scalastyle:off line.size.limit
@@ -614,5 +616,45 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
.count()
assertResult(600572)(result)
}
+
+ test("test mergetree insert with optimize basic") {
+ val tableName = "lineitem_mergetree_insert_optimize_basic_hdfs"
+ val dataPath = s"$HDFS_URL/test/$tableName"
+
+ withSQLConf(
+ "spark.databricks.delta.optimize.minFileSize" -> "200000000",
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.merge_after_insert"
-> "true",
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.mergetree.insert_without_local_storage"
-> "true",
+
"spark.gluten.sql.columnar.backend.ch.runtime_settings.min_insert_block_size_rows"
-> "10000"
+ ) {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS $tableName;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS $tableName
+ |USING clickhouse
+ |LOCATION '$dataPath'
+ |TBLPROPERTIES (storage_policy='__hdfs_main')
+ | as select * from lineitem
+ |""".stripMargin)
+
+ val ret = spark.sql(s"select count(*) from $tableName").collect()
+ assertResult(600572)(ret.apply(0).get(0))
+ val conf = new Configuration
+ conf.set("fs.defaultFS", HDFS_URL)
+ val fs = FileSystem.get(conf)
+
+ eventually(timeout(60.seconds), interval(2.seconds)) {
+ val it = fs.listFiles(new Path(dataPath), true)
+ var files = 0
+ while (it.hasNext) {
+ it.next()
+ files += 1
+ }
+ assertResult(72)(files)
+ }
+ }
+ }
}
// scalastyle:on line.size.limit
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp
b/cpp-ch/local-engine/Common/CHUtil.cpp
index be66d8ecc..94cd38003 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -623,7 +623,8 @@ void
BackendInitializerUtil::initSettings(std::map<std::string, std::string> & b
{
/// Initialize default setting.
settings.set("date_time_input_format", "best_effort");
- settings.set("mergetree.merge_after_insert", true);
+ settings.set(MERGETREE_MERGE_AFTER_INSERT, true);
+ settings.set(MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE, false);
for (const auto & [key, value] : backend_conf_map)
{
diff --git a/cpp-ch/local-engine/Common/CHUtil.h
b/cpp-ch/local-engine/Common/CHUtil.h
index 50de9461f..94e0f0168 100644
--- a/cpp-ch/local-engine/Common/CHUtil.h
+++ b/cpp-ch/local-engine/Common/CHUtil.h
@@ -35,7 +35,9 @@ class QueryPlan;
namespace local_engine
{
-static const std::unordered_set<String>
BOOL_VALUE_SETTINGS{"mergetree.merge_after_insert"};
+static const String MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE =
"mergetree.insert_without_local_storage";
+static const String MERGETREE_MERGE_AFTER_INSERT =
"mergetree.merge_after_insert";
+static const std::unordered_set<String>
BOOL_VALUE_SETTINGS{MERGETREE_MERGE_AFTER_INSERT,
MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE};
static const std::unordered_set<String> LONG_VALUE_SETTINGS{
"optimize.maxfilesize", "optimize.minFileSize",
"mergetree.max_num_part_per_merge_task"};
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index 07a7aa6bd..f207ad232 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -52,7 +52,15 @@ void GlutenDiskHDFS::createDirectories(const String & path)
void GlutenDiskHDFS::removeDirectory(const String & path)
{
DiskObjectStorage::removeDirectory(path);
- hdfsDelete(hdfs_object_storage->getHDFSFS(), path.c_str(), 1);
+ String abs_path = "/" + path;
+ hdfsDelete(hdfs_object_storage->getHDFSFS(), abs_path.c_str(), 1);
+}
+
+void GlutenDiskHDFS::removeRecursive(const String & path)
+{
+ DiskObjectStorage::removeRecursive(path);
+ String abs_path = "/" + path;
+ hdfsDelete(hdfs_object_storage->getHDFSFS(), abs_path.c_str(), 1);
}
DiskObjectStoragePtr GlutenDiskHDFS::createDiskObjectStorage()
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
index 222b9f892..97a99f1de 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -57,6 +57,8 @@ public:
void removeDirectory(const String & path) override;
+ void removeRecursive(const String & path) override;
+
DB::DiskObjectStoragePtr createDiskObjectStorage() override;
std::unique_ptr<DB::WriteBufferFromFileBase> writeFile(const String& path,
size_t buf_size, DB::WriteMode mode,
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index c1f2391a2..406f2aaa2 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -69,11 +69,23 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
, bucket_dir(bucket_dir_)
, thread_pool(CurrentMetrics::LocalThread,
CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1,
100000)
{
+ const DB::Settings & settings = context->getSettingsRef();
+ merge_after_insert =
settings.get(MERGETREE_MERGE_AFTER_INSERT).get<bool>();
+ insert_without_local_storage =
settings.get(MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE).get<bool>();
+
+ Field limit_size_field;
+ if (settings.tryGet("optimize.minFileSize", limit_size_field))
+ merge_min_size = limit_size_field.get<Int64>() <= 0 ? merge_min_size :
limit_size_field.get<Int64>();
+
+ Field limit_cnt_field;
+ if (settings.tryGet("mergetree.max_num_part_per_merge_task",
limit_cnt_field))
+ merge_limit_parts = limit_cnt_field.get<Int64>() <= 0 ?
merge_limit_parts : limit_cnt_field.get<Int64>();
+
dest_storage = MergeTreeRelParser::parseStorage(merge_tree_table,
SerializedPlanParser::global_context);
+ isRemoteStorage =
dest_storage->getStoragePolicy()->getAnyDisk()->isRemote();
- if (dest_storage->getStoragePolicy()->getAnyDisk()->isRemote())
+ if (useLocalStorage())
{
- isRemoteStorage = true;
temp_storage =
MergeTreeRelParser::copyToDefaultPolicyStorage(merge_tree_table,
SerializedPlanParser::global_context);
storage = temp_storage;
LOG_DEBUG(
@@ -86,22 +98,14 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
metadata_snapshot = storage->getInMemoryMetadataPtr();
header = metadata_snapshot->getSampleBlock();
- const DB::Settings & settings = context->getSettingsRef();
squashing = std::make_unique<DB::Squashing>(header,
settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
if (!partition_dir.empty())
extractPartitionValues(partition_dir, partition_values);
+}
- Field is_merge;
- if (settings.tryGet("mergetree.merge_after_insert", is_merge))
- merge_after_insert = is_merge.get<bool>();
-
- Field limit_size_field;
- if (settings.tryGet("optimize.minFileSize", limit_size_field))
- merge_min_size = limit_size_field.get<Int64>() <= 0 ? merge_min_size :
limit_size_field.get<Int64>();
-
- Field limit_cnt_field;
- if (settings.tryGet("mergetree.max_num_part_per_merge_task",
limit_cnt_field))
- merge_limit_parts = limit_cnt_field.get<Int64>() <= 0 ?
merge_limit_parts : limit_cnt_field.get<Int64>();
+bool SparkMergeTreeWriter::useLocalStorage() const
+{
+ return !insert_without_local_storage && isRemoteStorage;
}
void SparkMergeTreeWriter::write(const DB::Block & block)
@@ -161,7 +165,7 @@ void SparkMergeTreeWriter::manualFreeMemory(size_t
before_write_memory)
// it may alloc memory in the current thread, and free it on a global thread.
// Currently, we have no way to clear global memory via the Spark thread's
memory tracker.
// So we manually correct the memory usage.
- if (!isRemoteStorage)
+ if (isRemoteStorage && insert_without_local_storage)
return;
auto disk = storage->getStoragePolicy()->getAnyDisk();
@@ -219,7 +223,7 @@ void SparkMergeTreeWriter::saveMetadata()
void SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded()
{
- if (!isRemoteStorage)
+ if (!useLocalStorage())
return;
LOG_DEBUG(
@@ -289,8 +293,8 @@ void SparkMergeTreeWriter::finalizeMerge()
{
for (const auto & disk : storage->getDisks())
{
- auto full_path = storage->getFullPathOnDisk(disk);
- disk->removeRecursive(full_path + "/" + tmp_part);
+ auto rel_path = storage->getRelativeDataPath() + "/" +
tmp_part;
+ disk->removeRecursive(rel_path);
}
});
}
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index 2b07521ed..13ac22394 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -79,6 +79,7 @@ private:
void finalizeMerge();
bool chunkToPart(Chunk && chunk);
bool blockToPart(Block & block);
+ bool useLocalStorage() const;
CustomStorageMergeTreePtr storage = nullptr;
CustomStorageMergeTreePtr dest_storage = nullptr;
@@ -97,6 +98,7 @@ private:
std::unordered_set<String> tmp_parts;
DB::Block header;
bool merge_after_insert;
+ bool insert_without_local_storage;
FreeThreadPool thread_pool;
size_t merge_min_size = 1024 * 1024 * 1024;
size_t merge_limit_parts = 10;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]