This is an automated email from the ASF dual-hosted git repository.
liuneng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a8b316141 [Gluten-5152][CH] fix bugs for optimizing tables on s3 (#5282)
a8b316141 is described below
commit a8b316141d00e8c849f6098206f3decad9f1657a
Author: Hongbin Ma <[email protected]>
AuthorDate: Fri Apr 12 10:09:51 2024 +0800
[Gluten-5152][CH] fix bugs for optimizing tables on s3 (#5282)
What changes were proposed in this pull request?
Fix multiple bugs for optimizing tables on S3, including:
- take care of the metadata.gluten file
- fix a race condition when two tasks perform restoreMetadata on the same executor at the same time
- avoid creating a tmp folder, because renaming it on S3 is problematic
(Fixes: #5152)
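For context on the race-condition fix: the patch moves the "nothing to restore" early-return inside the lockForAlter scope, so a task that loses the race blocks on the lock until the winner finishes restoring. A minimal standalone sketch of that check-inside-lock pattern follows; it uses a plain std::mutex and hypothetical names, not the actual ClickHouse locking API:

#include <mutex>
#include <string>
#include <unordered_set>

// Hypothetical per-executor restore state shared by concurrent tasks.
static std::mutex restore_mutex;
static std::unordered_set<std::string> restored_parts;

void restoreIfNeeded(const std::unordered_set<std::string> & required_parts)
{
    // Acquire the lock first: a task arriving second blocks here while the
    // first task restores, instead of both writing the same metadata files.
    std::lock_guard<std::mutex> lock(restore_mutex);

    // Re-check under the lock; another task may have restored everything
    // between our first look and the lock acquisition.
    bool all_present = true;
    for (const auto & part : required_parts)
        if (restored_parts.count(part) == 0)
            all_present = false;
    if (all_present)
        return; // the early-return now lives inside the critical section

    for (const auto & part : required_parts)
        restored_parts.insert(part); // stand-in for writing the metadata files
}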
---
.../GlutenClickHouseMergeTreeOptimizeSuite.scala | 61 ++++++++++++++++++----
cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp | 28 +++++++++-
.../Storages/CustomStorageMergeTree.cpp | 4 +-
.../local-engine/Storages/CustomStorageMergeTree.h | 2 +-
.../Storages/Mergetree/MergeSparkMergeTreeTask.cpp | 12 +++--
.../Storages/Mergetree/MetaDataHelper.cpp | 37 +++++++++++--
.../Storages/Mergetree/MetaDataHelper.h | 7 ++-
.../Storages/Mergetree/SparkMergeTreeWriter.cpp | 25 +--------
.../Storages/Mergetree/SparkMergeTreeWriter.h | 1 -
cpp-ch/local-engine/local_engine_jni.cpp | 8 ++-
10 files changed, 135 insertions(+), 50 deletions(-)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
index e553a8c17..9b4b552f0 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeOptimizeSuite.scala
@@ -74,7 +74,6 @@ class GlutenClickHouseMergeTreeOptimizeSuite
val ret = spark.sql("select count(*) from lineitem_mergetree_optimize").collect()
assert(ret.apply(0).get(0) == 600572)
- spark.sql("optimize lineitem_mergetree_optimize")
assert(
countFiles(new File(s"$basePath/lineitem_mergetree_optimize")) == 462
) // many merged parts
@@ -148,7 +147,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret.apply(0).get(0) == 600572)
spark.sql("set spark.gluten.enabled=false")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 815)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 812)
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p2")) == 232)
spark.sql("VACUUM lineitem_mergetree_optimize_p2 RETAIN 0 HOURS")
@@ -179,12 +178,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret.apply(0).get(0) == 600572)
spark.sql("set spark.gluten.enabled=false")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 411)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 398)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
- // for tables with more than one layer of nested table (like partition + bucket, or two partition col
- // the 'tmp_merge' folder is not guarantee to be removed, causing this file number to be unstable
-// assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 290)
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) > 270)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 286)
spark.sql("VACUUM lineitem_mergetree_optimize_p3 RETAIN 0 HOURS")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p3")) == 270)
spark.sql("set spark.gluten.enabled=true")
@@ -213,10 +209,9 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret.apply(0).get(0) == 600572)
spark.sql("set spark.gluten.enabled=false")
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 411)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 398)
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
-// assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 290)
- assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) > 270)
+ assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 286)
spark.sql("VACUUM lineitem_mergetree_optimize_p4 RETAIN 0 HOURS")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p4")) == 270)
spark.sql("set spark.gluten.enabled=true")
@@ -308,7 +303,7 @@ class GlutenClickHouseMergeTreeOptimizeSuite
spark.sql("set spark.gluten.enabled=false")
assert(countFiles(new File(s"$basePath/lineitem_mergetree_optimize_p6")) == {
- if (sparkVersion.equals("3.2")) 940 else 1023
+ if (sparkVersion.equals("3.2")) 931 else 1014
})
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
spark.sql("VACUUM lineitem_mergetree_optimize_p6 RETAIN 0 HOURS")
@@ -321,5 +316,49 @@ class GlutenClickHouseMergeTreeOptimizeSuite
assert(ret2.apply(0).get(0) == 600572)
}
+ test("test skip index after optimize") {
+ withSQLConf(
+ "spark.databricks.delta.optimize.maxFileSize" -> "2000000",
+ "spark.sql.adaptive.enabled" -> "false") {
+ spark.sql(s"""
+ |DROP TABLE IF EXISTS lineitem_mergetree_index;
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE IF NOT EXISTS lineitem_mergetree_index
+ |USING clickhouse
+ |LOCATION '$basePath/lineitem_mergetree_index'
+ |TBLPROPERTIES('bloomfilterIndexKey'='l_orderkey')
+ | as select * from lineitem
+ |""".stripMargin)
+
+ spark.sql("optimize lineitem_mergetree_index")
+ spark.sql("set spark.gluten.enabled=false")
+ spark.sql("vacuum lineitem_mergetree_index")
+ spark.sql("set spark.gluten.enabled=true")
+
+ val df = spark
+ .sql(s"""
+ |select count(*) from lineitem_mergetree_index where l_orderkey = '600000'
+ |""".stripMargin)
+
+ val scanExec = collect(df.queryExecution.executedPlan) {
+ case f: FileSourceScanExecTransformer => f
+ }
+ assert(scanExec.size == 1)
+ val mergetreeScan = scanExec(0)
+ val ret = df.collect()
+ assert(ret.apply(0).get(0) == 2)
+ val marks = mergetreeScan.metrics("selectedMarks").value
+ assert(marks == 1)
+
+ val directory = new File(s"$basePath/lineitem_mergetree_index")
+ val partDir = directory.listFiles().filter(f => f.getName.endsWith("merged")).head
+ assert(
+ partDir.listFiles().exists(p => p.getName.contains("skp_idx__bloomfilter_l_orderkey.idx")))
+
+ }
+ }
+
}
// scalastyle:off line.size.limit
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index eec239c73..57ad97fae 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -80,6 +80,32 @@ CustomStorageMergeTreePtr MergeTreeRelParser::parseStorage(
auto storage_factory = StorageMergeTreeFactory::instance();
auto metadata = buildMetaData(names_and_types_list, context, merge_tree_table);
+ {
+ // use the instance-global table (without uuid) to restore the metadata folder on the current instance;
+ // we need its lock
+
+ auto global_storage = storage_factory.getStorage(
+ StorageID(merge_tree_table.database, merge_tree_table.table),
+ merge_tree_table.snapshot_id,
+ metadata->getColumns(),
+ [&]() -> CustomStorageMergeTreePtr
+ {
+ auto custom_storage_merge_tree = std::make_shared<CustomStorageMergeTree>(
+ StorageID(merge_tree_table.database, merge_tree_table.table),
+ merge_tree_table.relative_path,
+ *metadata,
+ false,
+ context,
+ "",
+ MergeTreeData::MergingParams(),
+ buildMergeTreeSettings(merge_tree_table.table_configs));
+ return custom_storage_merge_tree;
+ });
+
+ restoreMetaData(global_storage, merge_tree_table, *context);
+ }
+
+ // return the local table (with a uuid) for isolation
auto storage = storage_factory.getStorage(
StorageID(merge_tree_table.database, merge_tree_table.table, uuid),
merge_tree_table.snapshot_id,
@@ -146,7 +172,7 @@ MergeTreeRelParser::parseReadRel(
return custom_storage_merge_tree;
});
- restoreMetaData(storage, merge_tree_table, context);
+ restoreMetaData(storage, merge_tree_table, *context);
for (const auto & [name, sizes] : storage->getColumnSizes())
column_sizes[name] = sizes.data_compressed;
query_context.storage_snapshot = std::make_shared<StorageSnapshot>(*storage, metadata);
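Note the shape of the fix above: the parser resolves the table twice through the factory, once under its global StorageID (no uuid), used only as a shared lock point for restoreMetaData, and once under a per-query uuid for isolation. A sketch of the get-or-create-by-key registry shape this relies on; KeyedRegistry is a hypothetical stand-in, not the real StorageMergeTreeFactory API:

#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical stand-in for StorageMergeTreeFactory::getStorage: one shared
// object per key, created on first request by the supplied factory lambda.
template <typename T>
class KeyedRegistry
{
public:
    std::shared_ptr<T> getOrCreate(const std::string & key, const std::function<std::shared_ptr<T>()> & make)
    {
        std::lock_guard<std::mutex> lock(mutex);
        auto it = cache.find(key);
        if (it == cache.end())
            it = cache.emplace(key, make()).first; // create once, then reuse
        return it->second;
    }

private:
    std::mutex mutex;
    std::map<std::string, std::shared_ptr<T>> cache;
};

Keying once by "db.table" gives every task on the executor the same object (and thus the same alter lock), while keying by "db.table.<uuid>" gives each query a private copy.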
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
index c8f0b1f32..ee481c93f 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
@@ -97,9 +97,9 @@ CustomStorageMergeTree::CustomStorageMergeTree(
std::atomic<int> CustomStorageMergeTree::part_num;
-DataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string> parts)
+MergeTreeData::MutableDataPartsVector CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string> parts)
{
- DataPartsVector data_parts;
+ MutableDataPartsVector data_parts;
const auto disk = getStoragePolicy()->getDisks().at(0);
for (const auto& name : parts)
{
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
index 0d5e14998..dfaba5b71 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
@@ -52,7 +52,7 @@ public:
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
std::map<std::string, MutationCommands> getUnfinishedMutationCommands() const override;
- DataPartsVector loadDataPartsWithNames(std::unordered_set<std::string> parts);
+ MutableDataPartsVector loadDataPartsWithNames(std::unordered_set<std::string> parts);
MergeTreeDataWriter writer;
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
index 7ece1d7ce..a74174495 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
@@ -149,7 +149,10 @@ void MergeSparkMergeTreeTask::prepare()
deduplicate_by_columns,
cleanup,
storage.merging_params,
- txn);
+ txn,
+ // need_prefix = false, so CH won't create a tmp_ folder while merging.
+ // the tmp_ folder is problematic on S3 (particularly when renaming)
+ false);
}
@@ -157,9 +160,10 @@ void MergeSparkMergeTreeTask::finish()
{
new_part = merge_task->getFuture().get();
- MergeTreeData::Transaction transaction(storage, txn.get());
- storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
- transaction.commit();
+ // Since there is no tmp_ folder, we don't need renaming
+ // MergeTreeData::Transaction transaction(storage, txn.get());
+ // storage.merger_mutator.renameMergedTemporaryPart(new_part, future_part->parts, txn, transaction);
+ // transaction.commit();
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
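Why need_prefix = false matters here: with the prefix, ClickHouse writes the merged part into a tmp_-prefixed directory and renames it into place at commit time. An object store has no atomic directory rename, so the operation degrades to copy-then-delete per object. The sketch below illustrates this with an in-memory stand-in for an S3-like store, assuming the source and target prefixes are disjoint:

#include <map>
#include <string>

// Hypothetical in-memory stand-in for an S3-like object store: keys map to
// object bytes, and only put/get/delete plus prefix listing exist.
using ObjectStore = std::map<std::string, std::string>;

// "Renaming" tmp_merge_x/ to x/ means rewriting every object under the
// prefix and deleting the originals - neither atomic nor cheap, which is
// why the merge task above skips the tmp_ directory entirely.
void renamePrefix(ObjectStore & store, const std::string & from, const std::string & to)
{
    for (auto it = store.begin(); it != store.end();)
    {
        if (it->first.rfind(from, 0) == 0) // key starts with `from`
        {
            store[to + it->first.substr(from.size())] = it->second; // full copy
            it = store.erase(it);                                   // then delete
        }
        else
            ++it;
    }
}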
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
index a7d167385..688ca8be8 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
@@ -43,7 +43,7 @@ std::unordered_map<String, String> extractPartMetaData(ReadBuffer & in)
return result;
}
-void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context)
+void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context)
{
auto data_disk = storage->getStoragePolicy()->getAnyDisk();
if (!data_disk->isRemote())
@@ -60,11 +60,14 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable &
not_exists_part.emplace(part);
}
- if (not_exists_part.empty())
- return;
- if (auto lock = storage->lockForAlter(context->getSettingsRef().lock_acquire_timeout))
+ if (auto lock = storage->lockForAlter(context.getSettingsRef().lock_acquire_timeout))
{
+ // keep this early return inside lockForAlter,
+ // so that it will not return until another thread finishes restoring
+ if (not_exists_part.empty())
+ return;
+
auto s3 = data_disk->getObjectStorage();
if (!metadata_disk->exists(table_path))
@@ -87,9 +90,35 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable &
auto item_path = part_path / item.first;
auto out = metadata_disk->writeFile(item_path);
out->write(item.second.data(), item.second.size());
+ out->finalize();
+ out->sync();
}
}
}
}
+
+void saveFileStatus(
+ const DB::MergeTreeData & storage,
+ const DB::ContextPtr& context,
+ IDataPartStorage & data_part_storage)
+{
+ const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
+ if (!disk->isRemote())
+ return;
+ if (auto * const disk_metadata = dynamic_cast<MetadataStorageFromDisk *>(disk->getMetadataStorage().get()))
+ {
+ const auto out = data_part_storage.writeFile("metadata.gluten", DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
+ for (const auto it = data_part_storage.iterate(); it->isValid(); it->next())
+ {
+ auto content = disk_metadata->readFileToString(it->path());
+ writeString(it->name(), *out);
+ writeChar('\t', *out);
+ writeIntText(content.length(), *out);
+ writeChar('\n', *out);
+ writeString(content, *out);
+ }
+ out->finalize();
+ }
+}
}
\ No newline at end of file
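For reference, the metadata.gluten file written by saveFileStatus above is a simple sequence of records: file name, a tab, the content length in decimal, a newline, then the raw content. Below is a hypothetical standalone reader for that layout; the real restore path goes through extractPartMetaData over a DB::ReadBuffer:

#include <istream>
#include <map>
#include <string>

// Parses "<name>\t<length>\n<raw content>" records, mirroring the writer in
// saveFileStatus; returns file name -> file content.
std::map<std::string, std::string> readGlutenMetadata(std::istream & in)
{
    std::map<std::string, std::string> files;
    std::string name;
    while (std::getline(in, name, '\t'))
    {
        std::string length_str;
        std::getline(in, length_str); // content length, in decimal
        std::string content(std::stoul(length_str), '\0');
        in.read(content.data(), static_cast<std::streamsize>(content.size()));
        files[name] = std::move(content);
    }
    return files;
}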
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
index 47c5d615d..59d7af4e3 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.h
@@ -23,7 +23,12 @@
namespace local_engine
{
-void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, ContextPtr & context);
+void restoreMetaData(CustomStorageMergeTreePtr & storage, const MergeTreeTable & mergeTreeTable, const Context & context);
+
+void saveFileStatus(
+ const DB::MergeTreeData & storage,
+ const DB::ContextPtr& context,
+ IDataPartStorage & data_part_storage);
}
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index 8df171f99..40e716a56 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -21,6 +21,7 @@
#include <Interpreters/ActionsDAG.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <rapidjson/prettywriter.h>
+#include <Storages/Mergetree/MetaDataHelper.h>
using namespace DB;
@@ -77,32 +78,10 @@ SparkMergeTreeWriter::writeTempPartAndFinalize(
{
auto temp_part = writeTempPart(block_with_partition, metadata_snapshot);
temp_part.finalize();
- saveFileStatus(temp_part);
+ saveFileStatus(storage, context, temp_part.part->getDataPartStorage());
return temp_part;
}
-void SparkMergeTreeWriter::saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const
-{
- auto & data_part_storage = temp_part.part->getDataPartStorage();
-
- const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk();
- if (!disk->isRemote()) return;
- if (auto *const disk_metadata = dynamic_cast<MetadataStorageFromDisk *>(disk->getMetadataStorage().get()))
- {
- const auto out = data_part_storage.writeFile("metadata.gluten", DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
- for (const auto it = data_part_storage.iterate(); it->isValid(); it->next())
- {
- auto content = disk_metadata->readFileToString(it->path());
- writeString(it->name(), *out);
- writeChar('\t', *out);
- writeIntText(content.length(), *out);
- writeChar('\n', *out);
- writeString(content, *out);
- }
- out->finalize();
- }
-}
-
MergeTreeDataWriter::TemporaryPart SparkMergeTreeWriter::writeTempPart(
BlockWithPartition & block_with_partition, const StorageMetadataPtr & metadata_snapshot)
{
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
index d316f208e..000d009fe 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.h
@@ -85,7 +85,6 @@ private:
writeTempPart(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
DB::MergeTreeDataWriter::TemporaryPart
writeTempPartAndFinalize(DB::BlockWithPartition & block_with_partition, const DB::StorageMetadataPtr & metadata_snapshot);
- void saveFileStatus(const DB::MergeTreeDataWriter::TemporaryPart & temp_part) const;
String uuid;
String partition_dir;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 368227163..1518f8518 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -55,6 +55,7 @@
#include <google/protobuf/wrappers.pb.h>
#include <Storages/StorageMergeTreeFactory.h>
#include <Storages/Mergetree/MergeSparkMergeTreeTask.h>
+#include <Storages/Mergetree/MetaDataHelper.h>
#ifdef __cplusplus
@@ -1167,13 +1168,16 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
executeHere(task);
std::unordered_set<std::string> to_load{future_part->name};
- std::vector<DataPartPtr> loaded = storage->loadDataPartsWithNames(to_load);
+ std::vector<std::shared_ptr<DB::IMergeTreeDataPart>> loaded = storage->loadDataPartsWithNames(to_load);
std::vector<local_engine::PartInfo> res;
- for (const MergeTreeDataPartPtr & partPtr : loaded)
+ for (auto & partPtr : loaded)
+ {
+ local_engine::saveFileStatus(*storage, local_engine::SerializedPlanParser::global_context, partPtr->getDataPartStorage());
res.emplace_back(
local_engine::PartInfo{partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count, /*partition_value*/ partition_values, bucket_dir});
+ }
auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);