This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new acdf9a810 [CH] Support merge MergeTree files (#6472)
acdf9a810 is described below

commit acdf9a810f5a2ca945649833efe3e9bccc90e81a
Author: LiuNeng <[email protected]>
AuthorDate: Thu Jul 18 10:21:55 2024 +0800

    [CH] Support merge MergeTree files (#6472)
    
    Support merge MergeTree files
---
 ...GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala |   2 +-
 cpp-ch/clickhouse.version                          |   2 +-
 .../CompactObjectStorageDiskTransaction.cpp        | 133 ++++++++++++++++
 .../CompactObjectStorageDiskTransaction.h          | 175 +++++++++++++++++++++
 .../Disks/ObjectStorages/GlutenDiskHDFS.cpp        |   7 +
 .../Disks/ObjectStorages/GlutenDiskHDFS.h          |  16 +-
 .../Storages/CustomStorageMergeTree.cpp            |  25 ++-
 .../local-engine/Storages/CustomStorageMergeTree.h |   1 +
 .../Storages/Mergetree/MergeSparkMergeTreeTask.cpp |   2 +-
 .../Storages/Mergetree/MetaDataHelper.cpp          |   2 -
 .../Storages/Mergetree/SparkMergeTreeWriter.cpp    |  30 +++-
 11 files changed, 384 insertions(+), 11 deletions(-)

diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
index 99b212059..bbfac80a7 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteOnHDFSSuite.scala
@@ -652,7 +652,7 @@ class GlutenClickHouseMergeTreeWriteOnHDFSSuite
           it.next()
           files += 1
         }
-        assertResult(72)(files)
+        assertResult(4)(files)
       }
     }
   }
diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version
index 6afd19152..58fe073de 100644
--- a/cpp-ch/clickhouse.version
+++ b/cpp-ch/clickhouse.version
@@ -1,4 +1,4 @@
 CH_ORG=Kyligence
 CH_BRANCH=rebase_ch/20240711
-CH_COMMIT=4ab4aa7fe04
+CH_COMMIT=6632e76fd32940940749b53335ccc4843f3f2638
 
diff --git 
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
 
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
new file mode 100644
index 000000000..7a3ba4bed
--- /dev/null
+++ 
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.cpp
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "CompactObjectStorageDiskTransaction.h"
+
+#include <format>
+#include <ranges>
+
+namespace local_engine
+{
+/// Ranks a file path by its suffix. Suffixes are tested strictly in table order and the first
+/// match wins: the named .txt files rank 1-6, mark files 10, names ending in "idx" 20, names
+/// ending in "bin" 1000, and anything unmatched 100.
+/// NOTE(review): not called anywhere else in this file.
+int getFileOrder(const std::string & path)
+{
+    static const std::pair<const char *, int> suffix_ranks[] = {
+        {"columns.txt", 1},
+        {"metadata_version.txt", 2},
+        {"count.txt", 3},
+        {"default_compression_codec.txt", 4},
+        {"checksums.txt", 5},
+        {"uuid.txt", 6},
+        {".cmrk3", 10},
+        {".cmrk2", 10},
+        {".cmrk1", 10},
+        {".mrk3", 10},
+        {".mrk2", 10},
+        {".mrk1", 10},
+        {"idx", 20},
+        {"bin", 1000},
+    };
+    for (const auto & [suffix, rank] : suffix_ranks)
+    {
+        if (path.ends_with(suffix))
+            return rank;
+    }
+    return 100;
+}
+
+/// Treats every file as metadata unless its name ends with "bin" (e.g. "data.bin").
+/// commit() uses this to split buffered files between meta.bin and data.bin.
+bool isMetaDataFile(const std::string & path)
+{
+    const bool ends_like_data = path.ends_with("bin");
+    return !ends_like_data;
+}
+
+/// Sequence of (target path, local temporary file holding that path's bytes) entries -- the same
+/// element type as CompactObjectStorageDiskTransaction's `files` member.
+/// NOTE(review): this alias is not referenced anywhere else in this file.
+using FileMappings = std::vector<std::pair<std::string, std::shared_ptr<DB::TemporaryFileOnDisk>>>;
+
+/// Uploads every file buffered by writeFile() as two objects under `prefix_path`.
+/// Files whose names end in "bin" are concatenated into data.bin; everything else goes into meta.bin.
+/// For each logical file, metadata is written that points at its [offset, offset + size) slice
+/// of the shared object. An extra metadata entry for data.bin / meta.bin covers the whole object.
+/// All metadata is committed in one metadata transaction after both objects are finalized.
+void CompactObjectStorageDiskTransaction::commit()
+{
+    // Nothing buffered: either writeFile() was never called (prefix_path is empty, so the objects
+    // would land at the disk root) or a previous commit() already flushed and cleared `files`
+    // (rewriting would replace data.bin/meta.bin with empty objects). Treat both as a no-op.
+    if (files.empty())
+        return;
+
+    auto metadata_tx = disk.getMetadataStorage()->createTransaction();
+    std::filesystem::path data_path = std::filesystem::path(prefix_path) / "data.bin";
+    std::filesystem::path meta_path = std::filesystem::path(prefix_path) / "meta.bin";
+
+    auto object_storage = disk.getObjectStorage();
+    auto data_key = object_storage->generateObjectKeyForPath(data_path);
+    auto meta_key = object_storage->generateObjectKeyForPath(meta_path);
+
+    disk.createDirectories(prefix_path);
+    auto data_write_buffer = object_storage->writeObject(DB::StoredObject(data_key.serialize(), data_path), DB::WriteMode::Rewrite);
+    auto meta_write_buffer = object_storage->writeObject(DB::StoredObject(meta_key.serialize(), meta_path), DB::WriteMode::Rewrite);
+    String buffer;
+    buffer.resize(1024 * 1024);
+
+    /// Streams each temp file in `list` into `out` back to back.
+    /// Records per-file metadata (offset, size) and one metadata entry for the whole object at `local_path`.
+    auto merge_files = [&](std::ranges::input_range auto && list, DB::WriteBuffer & out, const DB::ObjectStorageKey & key, const String & local_path)
+    {
+        size_t offset = 0;
+        std::ranges::for_each(
+            list,
+            [&](auto & item)
+            {
+                DB::DiskObjectStorageMetadata metadata(object_storage->getCommonKeyPrefix(), item.first);
+                DB::ReadBufferFromFilePRead read(item.second->getAbsolutePath());
+                // size_t, not int: readBig() returns size_t and a single part file can exceed 2 GiB.
+                size_t file_size = 0;
+                while (size_t count = read.readBig(buffer.data(), buffer.size()))
+                {
+                    file_size += count;
+                    out.write(buffer.data(), count);
+                }
+                metadata.addObject(key, offset, file_size);
+                metadata_tx->writeStringToFile(item.first, metadata.serializeToString());
+                offset += file_size;
+            });
+
+        // Metadata for the complete merged object lets readers fetch it in one go, which makes
+        // downloading MergeTree metadata more efficient.
+        DB::DiskObjectStorageMetadata whole_meta(object_storage->getCommonKeyPrefix(), local_path);
+        whole_meta.addObject(key, 0, offset);
+        metadata_tx->writeStringToFile(local_path, whole_meta.serializeToString());
+        out.sync();
+        // Finalize before metadata_tx->commit(): otherwise the final flush happens in the buffer's
+        // destructor, after the metadata already references this object, and its errors are lost.
+        out.finalize();
+    };
+
+    // Predicates take the element by const reference; by value they copied the path string and
+    // bumped the shared_ptr refcount for every element visited.
+    merge_files(files | std::ranges::views::filter([](const auto & file) { return !isMetaDataFile(file.first); }), *data_write_buffer, data_key, data_path);
+    merge_files(files | std::ranges::views::filter([](const auto & file) { return isMetaDataFile(file.first); }), *meta_write_buffer, meta_key, meta_path);
+
+    metadata_tx->commit();
+    files.clear();
+}
+
+/// Buffers `path` in a local temporary file on `tmp_data`; the bytes reach object storage only in commit().
+///
+/// Only DB::WriteMode::Rewrite is supported. The first call fixes the transaction's directory
+/// (everything before the last '/' of `path`). Later paths must lie inside that directory.
+/// An empty metadata entry for `path` is created and committed immediately.
+/// @return A buffer writing the local temporary file. It should be finalized before commit(),
+///         which reads that file back.
+/// @throws DB::Exception NOT_IMPLEMENTED for non-Rewrite modes or a path outside the directory.
+std::unique_ptr<DB::WriteBufferFromFileBase> CompactObjectStorageDiskTransaction::writeFile(
+    const std::string & path,
+    size_t buf_size,
+    DB::WriteMode mode,
+    const DB::WriteSettings &,
+    bool)
+{
+    if (mode != DB::WriteMode::Rewrite)
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `writeFile` with Append is not implemented");
+    }
+    if (prefix_path.empty())
+        prefix_path = path.substr(0, path.find_last_of('/'));
+    else
+    {
+        // A bare starts_with() would also accept a sibling directory sharing a name prefix
+        // (prefix "dir/a" vs path "dir/ab/x"), silently packing another directory's file into
+        // this one. Require that the prefix ends at a path separator.
+        const bool inside_prefix = path.starts_with(prefix_path)
+            && (path.size() == prefix_path.size() || prefix_path.ends_with('/') || path[prefix_path.size()] == '/');
+        if (!inside_prefix)
+            throw DB::Exception(
+                DB::ErrorCodes::NOT_IMPLEMENTED,
+                "Don't support write file in different dirs, path {}, prefix path: {}",
+                path,
+                prefix_path);
+    }
+    auto tmp = std::make_shared<DB::TemporaryFileOnDisk>(tmp_data);
+    files.emplace_back(path, tmp);
+    auto tx = disk.getMetadataStorage()->createTransaction();
+    tx->createDirectoryRecursive(std::filesystem::path(path).parent_path());
+    tx->createEmptyMetadataFile(path);
+    tx->commit();
+    return std::make_unique<DB::WriteBufferFromFile>(tmp->getAbsolutePath(), buf_size);
+}
+}
\ No newline at end of file
diff --git 
a/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
 
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
new file mode 100644
index 000000000..e15c362f3
--- /dev/null
+++ 
b/cpp-ch/local-engine/Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include <Disks/IDiskTransaction.h>
+#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
+#include <Disks/ObjectStorages/IMetadataStorage.h>
+#include <Interpreters/TemporaryDataOnDisk.h>
+
+
+namespace DB::ErrorCodes
+{
+/// Error code thrown by the unsupported operations of CompactObjectStorageDiskTransaction.
+extern const int NOT_IMPLEMENTED;
+}
+
+namespace local_engine
+{
+
+class CompactObjectStorageDiskTransaction: public DB::IDiskTransaction {
+    public:
+    explicit CompactObjectStorageDiskTransaction(DB::IDisk & disk_, const 
DB::DiskPtr tmp_)
+        : disk(disk_), tmp_data(tmp_)
+    {
+        chassert(!tmp_->isRemote());
+    }
+
+    void commit() override;
+
+    void undo() override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation `undo` 
is not implemented");
+    }
+
+    void createDirectory(const std::string & path) override
+    {
+        disk.createDirectory(path);
+    }
+
+    void createDirectories(const std::string & path) override
+    {
+        disk.createDirectories(path);
+    }
+
+    void createFile(const std::string & path) override
+    {
+        disk.createFile(path);
+    }
+
+    void clearDirectory(const std::string & path) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`clearDirectory` is not implemented");
+    }
+
+    void moveDirectory(const std::string & from_path, const std::string & 
to_path) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`moveDirectory` is not implemented");
+    }
+
+    void moveFile(const String & from_path, const String & to_path) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`moveFile` is not implemented");
+    }
+
+    void replaceFile(const std::string & from_path, const std::string & 
to_path) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`replaceFile` is not implemented");
+    }
+
+    void copyFile(const std::string & from_file_path, const std::string & 
to_file_path, const DB::ReadSettings & read_settings, const DB::WriteSettings & 
write_settings) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`copyFile` is not implemented");
+    }
+
+    std::unique_ptr<DB::WriteBufferFromFileBase> writeFile( /// NOLINT
+        const std::string & path,
+        size_t buf_size,
+        DB::WriteMode mode,
+        const DB::WriteSettings & settings,
+        bool /*autocommit */) override;
+
+
+    void writeFileUsingBlobWritingFunction(const String & path, DB::WriteMode 
mode, WriteBlobFunction && write_blob_function) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`writeFileUsingBlobWritingFunction` is not implemented");
+    }
+
+    void removeFile(const std::string & path) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`removeFile` is not implemented");
+    }
+
+    void removeFileIfExists(const std::string & path) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`removeFileIfExists` is not implemented");
+    }
+
+    void removeDirectory(const std::string & path) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`removeDirectory` is not implemented");
+    }
+
+    void removeRecursive(const std::string & path) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`removeRecursive` is not implemented");
+    }
+
+    void removeSharedFile(const std::string & path, bool keep_shared_data) 
override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`removeSharedFile` is not implemented");
+    }
+
+    void removeSharedRecursive(const std::string & path, bool 
keep_all_shared_data, const DB::NameSet & file_names_remove_metadata_only) 
override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`removeSharedRecursive` is not implemented");
+    }
+
+    void removeSharedFileIfExists(const std::string & path, bool 
keep_shared_data) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`removeSharedFileIfExists` is not implemented");
+    }
+
+    void removeSharedFiles(const DB::RemoveBatchRequest & files, bool 
keep_all_batch_data, const DB::NameSet & file_names_remove_metadata_only) 
override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`removeSharedFiles` is not implemented");
+    }
+
+    void setLastModified(const std::string & path, const Poco::Timestamp & 
timestamp) override
+    {
+        disk.setLastModified(path, timestamp);
+    }
+
+    void chmod(const String & path, mode_t mode) override
+    {
+        disk.chmod(path, mode);
+    }
+
+    void setReadOnly(const std::string & path) override
+    {
+        disk.setReadOnly(path);
+    }
+
+    void createHardLink(const std::string & src_path, const std::string & 
dst_path) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`createHardLink` is not implemented");
+    }
+
+    void truncateFile(const std::string & /* src_path */, size_t /* 
target_size */) override
+    {
+        throw DB::Exception(DB::ErrorCodes::NOT_IMPLEMENTED, "Operation 
`truncateFile` is not implemented");
+    }
+
+private:
+    DB::IDisk & disk;
+    DB::DiskPtr tmp_data;
+    std::vector<std::pair<String, std::shared_ptr<DB::TemporaryFileOnDisk>>> 
files;
+    String prefix_path = "";
+};
+}
+
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp 
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
index f207ad232..9c4b390ea 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.cpp
@@ -20,12 +20,19 @@
 
 #include <Common/Throttler.h>
 #include <Parser/SerializedPlanParser.h>
+
+#include "CompactObjectStorageDiskTransaction.h"
 #if USE_HDFS
 
 namespace local_engine
 {
 using namespace DB;
 
+/// Every transaction on this disk is a CompactObjectStorageDiskTransaction. It buffers writes
+/// locally and packs them on commit; the buffers go to the disk of the global context's
+/// temporary-data volume.
+DiskTransactionPtr GlutenDiskHDFS::createTransaction()
+{
+    const auto temporary_volume = SerializedPlanParser::global_context->getTempDataOnDisk()->getVolume();
+    return std::make_shared<CompactObjectStorageDiskTransaction>(*this, temporary_volume->getDisk());
+}
+
 void GlutenDiskHDFS::createDirectory(const String & path)
 {
     DiskObjectStorage::createDirectory(path);
diff --git a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h 
b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
index 97a99f1de..b0f82a340 100644
--- a/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
+++ b/cpp-ch/local-engine/Disks/ObjectStorages/GlutenDiskHDFS.h
@@ -21,6 +21,8 @@
 
 #include <Common/Throttler.h>
 #include <Disks/ObjectStorages/DiskObjectStorage.h>
+#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
+#include <Interpreters/Cache/FileCacheFactory.h>
 #if USE_HDFS
 #include <Disks/ObjectStorages/GlutenHDFSObjectStorage.h>
 #endif
@@ -51,6 +53,8 @@ public:
         throttler = std::make_shared<DB::Throttler>(max_speed);
     }
 
+    DB::DiskTransactionPtr createTransaction() override;
+
     void createDirectory(const String & path) override;
 
     void createDirectories(const String & path) override;
@@ -72,7 +76,17 @@ public:
     {
         DB::ObjectStoragePtr tmp = object_storage_creator(config, context);
         hdfs_object_storage = 
typeid_cast<std::shared_ptr<GlutenHDFSObjectStorage>>(tmp);
-        object_storage = hdfs_object_storage;
+        // only for java ut
+        bool is_cache = object_storage->supportsCache();
+        if (is_cache)
+        {
+            auto cache_os = 
reinterpret_cast<DB::CachedObjectStorage*>(object_storage.get());
+            object_storage = hdfs_object_storage;
+            auto cache = 
DB::FileCacheFactory::instance().getOrCreate(cache_os->getCacheName(), 
cache_os->getCacheSettings(), "storage_configuration.disks.hdfs_cache");
+            wrapWithCache(cache, cache_os->getCacheSettings(), 
cache_os->getCacheConfigName());
+        }
+        else
+            object_storage = hdfs_object_storage;
     }
 private:
     std::shared_ptr<GlutenHDFSObjectStorage> hdfs_object_storage;
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp 
b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
index 3e93edaaa..961f482c7 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.cpp
@@ -148,9 +148,26 @@ CustomStorageMergeTree::CustomStorageMergeTree(
 
 std::atomic<int> CustomStorageMergeTree::part_num;
 
+
+
+/// On remote disks, reads each listed part's "meta.bin" once, front to back, before the parts are
+/// loaded. meta.bin is the object into which CompactObjectStorageDiskTransaction packs a part's
+/// non-"bin" files. Presumably the reads populate a read-through cache, so later per-file metadata
+/// reads are served locally -- NOTE(review): confirm against the disk's cache configuration.
+/// Parts without a meta.bin are skipped; local disks return immediately.
+void CustomStorageMergeTree::prefectchMetaDataFile(std::unordered_set<std::string> parts)
+{
+    auto disk = getDisks().front();
+    if (!disk->isRemote())
+        return;
+    for (const auto & name : parts)
+    {
+        const String meta_path = (fs::path(relative_data_path) / name / "meta.bin").string();
+        if (!disk->exists(meta_path))
+            continue;
+        auto in = disk->readFile(meta_path);
+        // Only the act of reading matters: stream to EOF without keeping the bytes, instead of
+        // materialising the whole file in a throwaway String.
+        in->ignoreAll();
+    }
+}
+
 std::vector<MergeTreeDataPartPtr> 
CustomStorageMergeTree::loadDataPartsWithNames(std::unordered_set<std::string> 
parts)
 {
-    auto parts_lock = lockParts();
+    prefectchMetaDataFile(parts);
     std::vector<MergeTreeDataPartPtr> data_parts;
     const auto disk = getStoragePolicy()->getDisks().at(0);
     for (const auto& name : parts)
@@ -161,8 +178,6 @@ std::vector<MergeTreeDataPartPtr> 
CustomStorageMergeTree::loadDataPartsWithNames
         data_parts.emplace_back(res.part);
     }
 
-    // without it "test mergetree optimize partitioned by one low card column" 
will log ERROR
-    calculateColumnAndSecondaryIndexSizesImpl();
     return data_parts;
 }
 
@@ -211,6 +226,7 @@ MergeTreeData::LoadPartResult 
CustomStorageMergeTree::loadDataPart(
     res.part->loadVersionMetadata();
 
     res.part->setState(to_state);
+    auto parts_lock = lockParts();
 
     DataPartIteratorByInfo it;
     bool inserted;
@@ -239,6 +255,9 @@ MergeTreeData::LoadPartResult 
CustomStorageMergeTree::loadDataPart(
     if (res.part->hasLightweightDelete())
         has_lightweight_delete_parts.store(true);
 
+    // without it "test mergetree optimize partitioned by one low card column" 
will log ERROR
+    calculateColumnAndSecondaryIndexSizesImpl();
+
     LOG_TRACE(log, "Finished loading {} part {} on disk {}", 
magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName());
     return res;
 }
diff --git a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h 
b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
index cd507a3ac..9144aba42 100644
--- a/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
+++ b/cpp-ch/local-engine/Storages/CustomStorageMergeTree.h
@@ -65,6 +65,7 @@ public:
 private:
     SimpleIncrement increment;
 
+    void prefectchMetaDataFile(std::unordered_set<std::string> parts);
     void startBackgroundMovesIfNeeded() override;
     std::unique_ptr<MergeTreeSettings> getDefaultSettings() const override;
     LoadPartResult loadDataPart(
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp 
b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
index 8e8e4c556..05b2623b4 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MergeSparkMergeTreeTask.cpp
@@ -164,7 +164,7 @@ void MergeSparkMergeTreeTask::finish()
     // MergeTreeData::Transaction transaction(storage, txn.get());
     // storage.merger_mutator.renameMergedTemporaryPart(new_part, 
future_part->parts, txn, transaction);
     // transaction.commit();
-
+    new_part->getDataPartStoragePtr()->commitTransaction();
     ThreadFuzzer::maybeInjectSleep();
     ThreadFuzzer::maybeInjectMemoryLimitException();
 
diff --git a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp 
b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
index c6a8e03a6..8d43af068 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/MetaDataHelper.cpp
@@ -113,8 +113,6 @@ void restoreMetaData(CustomStorageMergeTreePtr & storage, 
const MergeTreeTable &
                     auto item_path = part_path / item.first;
                     auto out = metadata_disk->writeFile(item_path);
                     out->write(item.second.data(), item.second.size());
-                    out->finalize();
-                    out->sync();
                 }
             };
             thread_pool.scheduleOrThrow(job);
diff --git a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp 
b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
index 406f2aaa2..0f0df0050 100644
--- a/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/Mergetree/SparkMergeTreeWriter.cpp
@@ -232,18 +232,42 @@ void 
SparkMergeTreeWriter::commitPartToRemoteStorageIfNeeded()
     auto read_settings = context->getReadSettings();
     auto write_settings = context->getWriteSettings();
     Stopwatch watch;
+
+    // Temporary support for S3
+    bool s3_disk = 
dest_storage->getStoragePolicy()->getAnyDisk()->getName().contains("s3");
     for (const auto & merge_tree_data_part : new_parts.unsafeGet())
     {
         String local_relative_path = storage->getRelativeDataPath() + "/" + 
merge_tree_data_part->name;
         String remote_relative_path = dest_storage->getRelativeDataPath() + 
"/" + merge_tree_data_part->name;
 
-        storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
+        if (s3_disk)
+        {
+            storage->getStoragePolicy()->getAnyDisk()->copyDirectoryContent(
             local_relative_path,
             dest_storage->getStoragePolicy()->getAnyDisk(),
             remote_relative_path,
             read_settings,
             write_settings,
             nullptr);
+        }
+        else
+        {
+            std::vector<String> files;
+            
storage->getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, 
files);
+            auto src_disk = storage->getStoragePolicy()->getAnyDisk();
+            auto dest_disk = dest_storage->getStoragePolicy()->getAnyDisk();
+            auto tx = dest_disk->createTransaction();
+            for (const auto & file : files)
+            {
+                auto read_buffer = src_disk->readFile(local_relative_path + 
"/" + file, read_settings);
+                auto write_buffer = tx->writeFile(remote_relative_path + "/" + 
file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
+                copyData(*read_buffer, *write_buffer);
+                write_buffer->finalize();
+            }
+            tx->commit();
+        }
+
+
         LOG_DEBUG(
             &Poco::Logger::get("SparkMergeTreeWriter"),
             "Upload part {} to disk {} success.",
@@ -306,7 +330,6 @@ DB::MergeTreeDataWriter::TemporaryPart 
SparkMergeTreeWriter::writeTempPartAndFin
 {
     MergeTreeDataWriter::TemporaryPart temp_part;
     writeTempPart(temp_part, block_with_partition, metadata_snapshot);
-    temp_part.finalize();
     return temp_part;
 }
 
@@ -399,6 +422,7 @@ void SparkMergeTreeWriter::writeTempPart(
     new_data_part->partition = std::move(partition);
     new_data_part->minmax_idx = std::move(minmax_idx);
 
+    data_part_storage->beginTransaction();
     SyncGuardPtr sync_guard;
     if (new_data_part->isStoredOnDisk())
     {
@@ -441,6 +465,8 @@ void SparkMergeTreeWriter::writeTempPart(
 
     temp_part.part = new_data_part;
     
temp_part.streams.emplace_back(MergeTreeDataWriter::TemporaryPart::Stream{.stream
 = std::move(out), .finalizer = std::move(finalizer)});
+    temp_part.finalize();
+    data_part_storage->commitTransaction();
 }
 
 std::vector<PartInfo> SparkMergeTreeWriter::getAllPartInfo()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to