This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new acdf9a810 [CH] Support merge MergeTree files (#6472)
acdf9a810 is described below
commit acdf9a810f5a2ca945649833efe3e9bccc90e81a
Author: LiuNeng <[email protected]>
AuthorDate: Thu Jul 18 10:21:55 2024 +0800
[CH] Support merge MergeTree files (#6472)
Support merge MergeTree files
---
...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala | 2 +-
cpp-ch/clickhouse.version | 2 +-
.../CompactObjectStorageDiskTransaction.cpp | 133 ++++++++++++++++
.../CompactObjectStorageDiskTransaction.h | 175 +++++++++++++++++++++
.../Disks/ObjectStorages/GlutenDiskHDFS.cpp | 7 +
.../Disks/ObjectStorages/GlutenDiskHDFS.h | 16 +-
.../Storages/CustomStorageMergeTree.cpp | 25 ++-
.../local-engine/Storages/CustomStorageMergeTree.h | 1 +
.../Storages/Mergetree/MergeSparkMergeTreeTask.cpp | 2 +-
.../Storages/Mergetree/MetaDataHelper.cpp | 2 -
.../Storages/Mergetree/SparkMergeTreeWriter.cpp | 30 +++-
11 files changed, 384 insertions(+), 11 deletions(-)
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 99b212059..bbfac80a7 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -652,7 +652,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
it.next()
files += 1
}
- assertResult(72)(files)
+ assertResult(4)(files)
}
}
}
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 6afd19152..58fe073de 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,4 +1,4 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240711
-CH_COMMIT=4ab4aa7fe04
+CH_COMMIT=6632e76fd32940940749b53335ccc4843f3f2638
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
new file mode 100644
index 000000000..7a3ba4bed
--- /dev/null
+++
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CompactObjectStorageDiskTransaction.h"
+
+#include <format>
+#include <ranges>
+
+namespace local_engine
+{
+int getFileOrder(const std::string & path)
+{
+ if (path.ends_with("columns.txt"))
+ return 1;
+ if (path.ends_with("metadata_version.txt"))
+ return 2;
+ if (path.ends_with("count.txt"))
+ return 3;
+ if (path.ends_with("default_compression_codec.txt"))
+ return 4;
+ if (path.ends_with("checksums.txt"))
+ return 5;
+ if (path.ends_with("uuid.txt"))
+ return 6;
+ if (path.ends_with(".cmrk3") || path.ends_with(".cmrk2") ||
path.ends_with(".cmrk1") ||
+ path.ends_with(".mrk3") || path.ends_with(".mrk2") ||
path.ends_with(".mrk1"))
+ return 10;
+ if (path.ends_with("idx"))
+ return 20;
+ if (path.ends_with("bin"))
+ return 1000;
+ return 100;
+}
+
+bool isMetaDataFile(const std::string & path)
+{
+ return !path.ends_with("bin");
+}
+
+using FileMappings = std::vector<std::pair<String,
std::shared_ptr<DB::TemporaryFileOnDisk>>>;
+
+void CompactObjectStorageDiskTransaction::commit()
+{
+ auto metadata_tx = disk.getMetadataStorage()->createTransaction();
+ std::filesystem::path data_path = std::filesystem::path(prefix_path) /
"data.bin";
+ std::filesystem::path meta_path = std::filesystem::path(prefix_path) /
"meta.bin";
+
+ auto object_storage = disk.getObjectStorage();
+ auto data_key = object_storage->generateObjectKeyForPath(data_path);
+ auto meta_key = object_storage->generateObjectKeyForPath(meta_path);
+
+ disk.createDirectories(prefix_path);
+ auto data_write_buffer =
object_storage->writeObject(DB::StoredObject(data_key.serialize(), data_path),
DB::WriteMode::Rewrite);
+ auto meta_write_buffer =
object_storage->writeObject(DB::StoredObject(meta_key.serialize(), meta_path),
DB::WriteMode::Rewrite);
+ String buffer;
+ buffer.resize(1024 * 1024);
+
+ auto merge_files = [&](std::ranges::input_range auto && list,
DB::WriteBuffer & out, const DB::ObjectStorageKey & key , const String
&local_path)
+ {
+ size_t offset = 0;
+ std::ranges::for_each(
+ list,
+ [&](auto & item)
+ {
+ DB::DiskObjectStorageMetadata
metadata(object_storage->getCommonKeyPrefix(), item.first);
+ DB::ReadBufferFromFilePRead
read(item.second->getAbsolutePath());
+ int file_size = 0;
+ while (int count = read.readBig(buffer.data(), buffer.size()))
+ {
+ file_size += count;
+ out.write(buffer.data(), count);
+ }
+ metadata.addObject(key, offset, file_size);
+ metadata_tx->writeStringToFile(item.first,
metadata.serializeToString());
+ offset += file_size;
+ });
+
+        // Through this whole-file metadata entry, the complete file can be
loaded in advance, which improves the download efficiency of MergeTree metadata.
+ DB::DiskObjectStorageMetadata
whole_meta(object_storage->getCommonKeyPrefix(), local_path);
+ whole_meta.addObject(key, 0, offset);
+ metadata_tx->writeStringToFile(local_path,
whole_meta.serializeToString());
+ out.sync();
+ };
+
+ merge_files(files | std::ranges::views::filter([](auto file) { return
!isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path);
+ merge_files(files | std::ranges::views::filter([](auto file) { return
isMetaDataFile(file.first); }), *meta_write_buffer, meta_key, meta_path);
+
+ metadata_tx->commit();
+ files.clear();
+}
+
+std::unique_ptr<DB::WriteBufferFromFileBase>
CompactObjectStorageDiskTransaction::writeFile(
+ const std::string & path,
+ size_t buf_size,
+ DB::WriteMode mode,
+ const DB::WriteSettings &,
+ bool)
+{
+ if (mode != DB::WriteMode::Rewrite)
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`writeFile` with Append is not implemented");
+ }
+ if (prefix_path.empty())
+ prefix_path = path.substr(0, path.find_last_of('/'));
+ else if (!path.starts_with(prefix_path))
+ throw DB::Exception(
+ DB::ErrorCodes::NOT_IMPLEMENTED,
+ "Don't support write file in different dirs, path {}, prefix path:
{}",
+ path,
+ prefix_path);
+ auto tmp = std::make_shared<DB::TemporaryFileOnDisk>(tmp_data);
+ files.emplace_back(path, tmp);
+ auto tx = disk.getMetadataStorage()->createTransaction();
+ tx->createDirectoryRecursive(std::filesystem::path(path).parent_path());
+ tx->createEmptyMetadataFile(path);
+ tx->commit();
+ return std::make_unique<DB::WriteBufferFromFile>(tmp->getAbsolutePath(),
buf_size);
+}
+}
\ No newline at end of file
diff --git
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
new file mode 100644
index 000000000..e15c362f3
--- /dev/null
+++
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <Disks/IDiskTransaction.h>
+#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
+#include <Disks/ObjectStorages/IMetadataStorage.h>
+#include <Interpreters/TemporaryDataOnDisk.h>
+
+
+namespace DB
+{
+namespace ErrorCodes
+{
+extern const int NOT_IMPLEMENTED;
+}
+}
+
+namespace local_engine
+{
+
+class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
+ public:
+ explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const
DB::DiskPtr tmp_)
+ : disk(disk_), tmp_data(tmp_)
+ {
+ chassert(!tmp_->isRemote());
+ }
+
+ void commit() override;
+
+ void undo() override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `undo`
is not implemented");
+ }
+
+ void createDirectory(const std::string & path) override
+ {
+ disk.createDirectory(path);
+ }
+
+ void createDirectories(const std::string & path) override
+ {
+ disk.createDirectories(path);
+ }
+
+ void createFile(const std::string & path) override
+ {
+ disk.createFile(path);
+ }
+
+ void clearDirectory(const std::string & path) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`clearDirectory` is not implemented");
+ }
+
+ void moveDirectory(const std::string & from_path, const std::string &
to_path) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`moveDirectory` is not implemented");
+ }
+
+ void moveFile(const String & from_path, const String & to_path) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`moveFile` is not implemented");
+ }
+
+ void replaceFile(const std::string & from_path, const std::string &
to_path) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`replaceFile` is not implemented");
+ }
+
+ void copyFile(const std::string & from_file_path, const std::string &
to_file_path, const DB::ReadSettings & read_settings, const DB::WriteSettings &
write_settings) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`copyFile` is not implemented");
+ }
+
+ std::unique_ptr<DB::WriteBufferFromFileBase> writeFile( /// NOLINT
+ const std::string & path,
+ size_t buf_size,
+ DB::WriteMode mode,
+ const DB::WriteSettings & settings,
+ bool /*autocommit */) override;
+
+
+ void writeFileUsingBlobWritingFunction(const String & path, DB::WriteMode
mode, WriteBlobFunction && write_blob_function) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`writeFileUsingBlobWritingFunction` is not implemented");
+ }
+
+ void removeFile(const std::string & path) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`removeFile` is not implemented");
+ }
+
+ void removeFileIfExists(const std::string & path) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`removeFileIfExists` is not implemented");
+ }
+
+ void removeDirectory(const std::string & path) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`removeDirectory` is not implemented");
+ }
+
+ void removeRecursive(const std::string & path) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`removeRecursive` is not implemented");
+ }
+
+ void removeSharedFile(const std::string & path, bool keep_shared_data)
override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`removeSharedFile` is not implemented");
+ }
+
+ void removeSharedRecursive(const std::string & path, bool
keep_all_shared_data, const DB::NameSet & file_names_remove_metadata_only)
override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`removeSharedRecursive` is not implemented");
+ }
+
+ void removeSharedFileIfExists(const std::string & path, bool
keep_shared_data) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`removeSharedFileIfExists` is not implemented");
+ }
+
+ void removeSharedFiles(const DB::RemoveBatchRequest & files, bool
keep_all_batch_data, const DB::NameSet & file_names_remove_metadata_only)
override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`removeSharedFiles` is not implemented");
+ }
+
+ void setLastModified(const std::string & path, const Poco::Timestamp &
timestamp) override
+ {
+ disk.setLastModified(path, timestamp);
+ }
+
+ void chmod(const String & path, mode_t mode) override
+ {
+ disk.chmod(path, mode);
+ }
+
+ void setReadOnly(const std::string & path) override
+ {
+ disk.setReadOnly(path);
+ }
+
+ void createHardLink(const std::string & src_path, const std::string &
dst_path) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`createHardLink` is not implemented");
+ }
+
+ void truncateFile(const std::string & /* src_path */, size_t /*
target_size */) override
+ {
+ throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation
`truncateFile` is not implemented");
+ }
+
+private:
+ DB::IDisk & disk;
+ DB::DiskPtr tmp_data;
+ std::vector<std::pair<String, std::shared_ptr<DB::TemporaryFileOnDisk>>>
files;
+ String prefix_path = "";
+};
+}
+
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index f207ad232..9c4b390ea 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -20,12 +20,19 @@
#include <Common/Throttler.h>
#include <Parser/SerializedPlanParser.h>
+
+#include "CompactObjectStorageDiskTransaction.h"
#if USE_HDFS
namespace local_engine
{
using namespace DB;
+DiskTransactionPtr GlutenDiskHDFS::createTransaction()
+{
+ return std::make_shared<CompactObjectStorageDiskTransaction>(*this,
SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume()->getDisk());
+}
+
void GlutenDiskHDFS::createDirectory(const String & path)
{
DiskObjectStorage::createDirectory(path);
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
index 97a99f1de..b0f82a340 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -21,6 +21,8 @@
#include <Common/Throttler.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
+#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
+#include <Interpreters/Cache/FileCacheFactory.h>
#if USE_HDFS
#include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
#endif
@@ -51,6 +53,8 @@ public:
throttler = std::make_shared<DB::Throttler>(max_speed);
}
+ DB::DiskTransactionPtr createTransaction() override;
+
void createDirectory(const String & path) override;
void createDirectories(const String & path) override;
@@ -72,7 +76,17 @@ public:
{
DB::ObjectStoragePtr tmp = object_storage_creator(config, context);
hdfs_object_storage =
typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(tmp);
- object_storage = hdfs_object_storage;
+    // Only used by Java unit tests
+ bool is_cache = object_storage->supportsCache();
+ if (is_cache)
+ {
+ auto cache_os =
reinterpret_cast<DB::CachedObjectStorage*>(object_storage.get());
+ object_storage = hdfs_object_storage;
+ auto cache =
DB::FileCacheFactory::instance().getOrCreate(cache_os->getCacheName(),
cache_os->getCacheSettings(), "storage_configuration.disks.hdfs_cache");
+ wrapWithCache(cache, cache_os->getCacheSettings(),
cache_os->getCacheConfigName());
+ }
+ else
+ object_storage = hdfs_object_storage;
}
private:
std::shared_ptr<GlutenHDFSObjectStorage> hdfs_object_storage;
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
index 3e93edaaa..961f482c7 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
@@ -148,9 +148,26 @@ CustomStorageMergeTree::CustomStorageMergeTree(
std::atomic<int> CustomStorageMergeTree::part_num;
+
+
+void
CustomStorageMergeTree::prefectchMetaDataFile(std::unordered_set<std::string>
parts)
+{
+ auto disk = getDisks().front();
+ if (!disk->isRemote()) return;
+ std::vector<String> meta_paths;
+ std::ranges::for_each(parts, [&](const String & name) {
meta_paths.emplace_back(fs::path(relative_data_path) / name / "meta.bin"); });
+ for (const auto & meta_path: meta_paths)
+ {
+ if (!disk->exists(meta_path)) continue;
+ auto in = disk->readFile(meta_path);
+ String ignore_data;
+ readStringUntilEOF(ignore_data, *in);
+ }
+}
+
std::vector<MergeTreeDataPartPtr>
CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string>
parts)
{
- auto parts_lock = lockParts();
+ prefectchMetaDataFile(parts);
std::vector<MergeTreeDataPartPtr> data_parts;
const auto disk = getStoragePolicy()->getDisks().at(0);
for (const auto& name : parts)
@@ -161,8 +178,6 @@ std::vector<MergeTreeDataPartPtr>
CustomStorageMergeTree::loadDataPartsWithNames
data_parts.emplace_back(res.part);
}
- // without it "test mergetree optimize partitioned by one low card column"
will log ERROR
- calculateColumnAndSecondaryIndexSizesImpl();
return data_parts;
}
@@ -211,6 +226,7 @@ MergeTreeData::LoadPartResult
CustomStorageMergeTree::loadDataPart(
res.part->loadVersionMetadata();
res.part->setState(to_state);
+ auto parts_lock = lockParts();
DataPartIteratorByInfo it;
bool inserted;
@@ -239,6 +255,9 @@ MergeTreeData::LoadPartResult
CustomStorageMergeTree::loadDataPart(
if (res.part->hasLightweightDelete())
has_lightweight_delete_parts.store(true);
+ // without it "test mergetree optimize partitioned by one low card column"
will log ERROR
+ calculateColumnAndSecondaryIndexSizesImpl();
+
LOG_TRACE(log, "Finished loading {} part {} on disk {}",
magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName());
return res;
}
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
index cd507a3ac..9144aba42 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
@@ -65,6 +65,7 @@ public:
private:
SimpleIncrement increment;
+ void prefectchMetaDataFile(std::unordered_set<std::string> parts);
void startBackgroundMovesIfNeeded() override;
std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
LoadPartResult loadDataPart(
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
index 8e8e4c556..05b2623b4 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
@@ -164,7 +164,7 @@ void MergeSparkMergeTreeTask::finish()
// MergeTreeData::Transaction transaction(storage, txn.get());
// storage.merger_mutator.renameMergedTemporaryPart(new_part,
future_part->parts, txn, transaction);
// transaction.commit();
-
+ new_part->getDataPartStoragePtr()->commitTransaction();
ThreadFuzzer::maybeInjectSleep();
ThreadFuzzer::maybeInjectMemoryLimitException();
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
index c6a8e03a6..8d43af068 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
@@ -113,8 +113,6 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage,
const MergeTreeTable &
auto item_path = part_path / item.first;
auto out = metadata_disk->writeFile(item_path);
out->write(item.second.data(), item.second.size());
- out->finalize();
- out->sync();
}
};
thread_pool.scheduleOrThrow(job);
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index 406f2aaa2..0f0df0050 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -232,18 +232,42 @@ void
SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded()
auto read_settings = context->getReadSettings();
auto write_settings = context->getWriteSettings();
Stopwatch watch;
+
+ // Temporary support for S3
+ bool s3_disk =
dest_storage->getStoragePolicy()->getAnyDisk()->getName().contains("s3");
for (const auto & merge_tree_data_part : new_parts.unsafeGet())
{
String local_relative_path = storage->getRelativeDataPath() + "/" +
merge_tree_data_part->name;
String remote_relative_path = dest_storage->getRelativeDataPath() +
"/" + merge_tree_data_part->name;
- storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
+ if (s3_disk)
+ {
+ storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
local_relative_path,
dest_storage->getStoragePolicy()->getAnyDisk(),
remote_relative_path,
read_settings,
write_settings,
nullptr);
+ }
+ else
+ {
+ std::vector<String> files;
+
storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path,
files);
+ auto src_disk = storage->getStoragePolicy()->getAnyDisk();
+ auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
+ auto tx = dest_disk->createTransaction();
+ for (const auto & file : files)
+ {
+ auto read_buffer = src_disk->readFile(local_relative_path +
"/" + file, read_settings);
+ auto write_buffer = tx->writeFile(remote_relative_path + "/" +
file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
+ copyData(*read_buffer, *write_buffer);
+ write_buffer->finalize();
+ }
+ tx->commit();
+ }
+
+
LOG_DEBUG(
&Poco::Logger::get("SparkMergeTreeWriter"),
"Upload part {} to disk {} success.",
@@ -306,7 +330,6 @@ DB::MergeTreeDataWriter::TemporaryPart
SparkMergeTreeWriter::writeTempPartAndFin
{
MergeTreeDataWriter::TemporaryPart temp_part;
writeTempPart(temp_part, block_with_partition, metadata_snapshot);
- temp_part.finalize();
return temp_part;
}
@@ -399,6 +422,7 @@ void SparkMergeTreeWriter::writeTempPart(
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
+ data_part_storage->beginTransaction();
SyncGuardPtr sync_guard;
if (new_data_part->isStoredOnDisk())
{
@@ -441,6 +465,8 @@ void SparkMergeTreeWriter::writeTempPart(
temp_part.part = new_data_part;
temp_part.streams.emplace_back(MergeTreeDataWriter::TemporaryPart::Stream{.stream
= std::move(out), .finalizer = std::move(finalizer)});
+ temp_part.finalize();
+ data_part_storage->commitTransaction();
}
std::vector<PartInfo> SparkMergeTreeWriter::getAllPartInfo()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]