dataroaring commented on code in PR #57770:
URL: https://github.com/apache/doris/pull/57770#discussion_r2512083128


##########
be/src/io/fs/merge_file_manager.cpp:
##########
@@ -0,0 +1,511 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/merge_file_manager.h"
+
+#include <algorithm>
+#include <chrono>
+#include <ctime>
+#include <random>
+#include <sstream>
+#include <unordered_set>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/config.h"
+#include "common/config.h"
+#include "gen_cpp/cloud.pb.h"
+#include "olap/storage_engine.h"
+#include "runtime/exec_env.h"
+#include "util/uid_util.h"
+
+namespace doris::io {
+
+// Returns the process-wide MergeFileManager. The object is a function-local
+// static, so it is constructed the first time this function is called.
+MergeFileManager* MergeFileManager::instance() {
+    static MergeFileManager s_manager;
+    return &s_manager;
+}
+
+// Stops the background worker before the manager is destroyed.
+MergeFileManager::~MergeFileManager() {
+    this->stop_background_manager();
+}
+
+// Initialization hook. Currently performs no work and always returns OK.
+Status MergeFileManager::init() {
+    return Status::OK();
+}
+
+// Replaces `merge_file_state` with a fresh MergeFileState in INIT state and
+// opens a file writer for it.
+//
+// The merge file path has the form "data/merge_file/<unix time>/<uuid>".
+// Note that the out-parameter is assigned before the writer is created, so
+// when create_file() fails the caller is left with a state whose `writer` was
+// never set.
+//
+// Returns the error from ensure_file_system() or create_file(), else OK.
+Status MergeFileManager::create_new_merge_file_state(
+        std::unique_ptr<MergeFileState>& merge_file_state) {
+    RETURN_IF_ERROR(ensure_file_system());
+    if (!_file_system) {
+        return Status::InternalError("File system is not available for merge file creation");
+    }
+
+    std::stringstream target;
+    target << "data/merge_file/" << std::time(nullptr) << "/" << generate_uuid_string();
+
+    merge_file_state = std::make_unique<MergeFileState>();
+    auto& fresh = *merge_file_state;
+    fresh.merge_file_path = target.str();
+    fresh.create_time = std::time(nullptr);
+    fresh.create_timestamp = std::chrono::steady_clock::now();
+    fresh.state = MergeFileStateEnum::INIT;
+
+    // Open the writer that will receive the appended small files.
+    FileWriterPtr opened_writer;
+    FileWriterOptions writer_opts;
+    RETURN_IF_ERROR(_file_system->create_file(Path(fresh.merge_file_path), &opened_writer,
+                                              &writer_opts));
+    fresh.writer = std::move(opened_writer);
+
+    return Status::OK();
+}
+
+// Resolves the file system used for merge files on first use and caches it in
+// `_file_system`; later calls return immediately.
+//
+// Returns InternalError when not running in cloud mode, when ExecEnv has no
+// instance, or when the cloud storage engine returns a null file system.
+Status MergeFileManager::ensure_file_system() {
+    if (_file_system) {
+        return Status::OK(); // already resolved
+    }
+    if (!config::is_cloud_mode()) {
+        return Status::InternalError("Cloud file system is not available in local mode");
+    }
+
+    auto* env = ExecEnv::GetInstance();
+    if (!env) {
+        return Status::InternalError("ExecEnv instance is not initialized");
+    }
+
+    // TODO(luwei) support multi vault
+    auto latest = env->storage_engine().to_cloud().latest_fs();
+    if (!latest) {
+        return Status::InternalError("Cloud file system is not ready");
+    }
+
+    _file_system = latest;
+    return Status::OK();
+}
+
+// Appends the contents of one small file to the current merge file and
+// records where the bytes landed.
+//
+// @param path  key under which the segment is indexed (per-merge-file index
+//              and global index).
+// @param data  bytes to append.
+// @return OK on success. Also returns OK, without writing or indexing
+//         anything, when `data` is larger than
+//         config::small_file_threshold_bytes. Otherwise returns the first
+//         error from creating/rotating the merge file or from the writer.
+//
+// The whole operation runs under `_current_merge_file_mutex`.
+Status MergeFileManager::append(const std::string& path, const Slice& data) {
+    const auto data_size = data.get_size();
+
+    // Inputs above the small-file threshold are not merged.
+    if (data_size > config::small_file_threshold_bytes) {
+        return Status::OK(); // Skip merging for large files
+    }
+
+    std::lock_guard<std::mutex> lock(_current_merge_file_mutex);
+
+    if (!_current_merge_file || !_current_merge_file->writer) {
+        RETURN_IF_ERROR(create_new_merge_file_state(_current_merge_file));
+    }
+
+    // Roll over to a new merge file once this write would reach the size
+    // threshold. A file that has received no bytes yet is never rolled over:
+    // doing so would hand an empty merge file to the uploader and gain
+    // nothing, because the replacement file would be just as empty. This
+    // matches the time-based rotation in background_manager(), which only
+    // rotates files that have been written to (state ACTIVE).
+    const bool has_payload = _current_merge_file->total_size > 0;
+    if (has_payload && _current_merge_file->total_size + data_size >=
+                               config::merge_file_size_threshold_bytes) {
+        RETURN_IF_ERROR(mark_current_merge_file_for_upload_locked());
+    }
+
+    // Write data to current merge file
+    RETURN_IF_ERROR(_current_merge_file->writer->append(data));
+
+    // Describe the segment just written: it starts at the offset the file had
+    // before this append.
+    MergeFileSegmentIndex index;
+    index.merge_file_path = _current_merge_file->merge_file_path;
+    index.offset = _current_merge_file->current_offset;
+    index.size = data_size;
+
+    _current_merge_file->index_map[path] = index;
+    _current_merge_file->current_offset += data_size;
+    _current_merge_file->total_size += data_size;
+
+    // Mark as active if this is the first write
+    if (_current_merge_file->state == MergeFileStateEnum::INIT) {
+        _current_merge_file->state = MergeFileStateEnum::ACTIVE;
+    }
+
+    // Publish the segment to the global index (separate mutex).
+    {
+        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
+        _global_index_map[path] = index;
+    }
+
+    return Status::OK();
+}
+
+// Blocks on the merge file's condition variable until its state is UPLOADED
+// or FAILED.
+//
+// Returns OK for UPLOADED. For FAILED returns InternalError whose message is
+// the file's `last_error`, or "Merge file upload failed" when that is empty.
+// `merge_file_ptr` is dereferenced unconditionally and must not be null.
+Status MergeFileManager::wait_for_merge_file_upload(MergeFileState* merge_file_ptr) {
+    const auto reached_terminal_state = [merge_file_ptr] {
+        const auto current = merge_file_ptr->state.load(std::memory_order_acquire);
+        return current == MergeFileStateEnum::UPLOADED || current == MergeFileStateEnum::FAILED;
+    };
+
+    std::unique_lock<std::mutex> guard(merge_file_ptr->upload_mutex);
+    merge_file_ptr->upload_cv.wait(guard, reached_terminal_state);
+
+    if (merge_file_ptr->state == MergeFileStateEnum::FAILED) {
+        // `last_error` is read while `upload_mutex` is still held.
+        std::string message = merge_file_ptr->last_error;
+        if (message.empty()) {
+            message = "Merge file upload failed";
+        }
+        return Status::InternalError(message);
+    }
+    return Status::OK();
+}
+
+// Blocks until the merge file that contains `path` has finished uploading.
+//
+// Returns OK once that merge file is UPLOADED. Returns InternalError when
+// `path` is missing from the global index, when the owning merge file is not
+// found among the current, uploading or uploaded files, or when its upload
+// FAILED (message: the stored `last_error`, or a generic text if empty).
+Status MergeFileManager::wait_write_done(const std::string& path) {
+    // Step 1: resolve which merge file this small file was appended to.
+    std::string owner_path;
+    {
+        std::lock_guard<std::mutex> index_guard(_global_index_mutex);
+        const auto entry = _global_index_map.find(path);
+        if (entry == _global_index_map.end()) {
+            return Status::InternalError("File not found in global index: " + path);
+        }
+        owner_path = entry->second.merge_file_path;
+    }
+
+    // Step 2: consult the uploaded set first; a finished upload needs no wait.
+    std::shared_ptr<MergeFileState> keep_alive;
+    std::shared_ptr<MergeFileState> failed;
+    {
+        std::lock_guard<std::mutex> files_guard(_merge_files_mutex);
+        const auto done = _uploaded_merge_files.find(owner_path);
+        if (done != _uploaded_merge_files.end()) {
+            switch (done->second->state.load(std::memory_order_acquire)) {
+            case MergeFileStateEnum::UPLOADED:
+                return Status::OK(); // Already uploaded, no need to wait
+            case MergeFileStateEnum::FAILED:
+                failed = done->second;
+                break;
+            default:
+                keep_alive = done->second;
+                break;
+            }
+        }
+    }
+
+    // Step 3: a recorded failure is reported immediately.
+    if (failed) {
+        std::lock_guard<std::mutex> upload_guard(failed->upload_mutex);
+        std::string message = failed->last_error;
+        if (message.empty()) {
+            message = "Merge file upload failed";
+        }
+        return Status::InternalError(message);
+    }
+
+    // Step 4: otherwise locate the state object to wait on, checking the
+    // current merge file first and then the uploading/uploaded maps.
+    MergeFileState* target = nullptr;
+    {
+        std::lock_guard<std::mutex> current_guard(_current_merge_file_mutex);
+        if (_current_merge_file && _current_merge_file->merge_file_path == owner_path) {
+            target = _current_merge_file.get();
+        }
+    }
+
+    if (target == nullptr) {
+        std::lock_guard<std::mutex> files_guard(_merge_files_mutex);
+        const auto in_flight = _uploading_merge_files.find(owner_path);
+        if (in_flight != _uploading_merge_files.end()) {
+            keep_alive = in_flight->second;
+            target = keep_alive.get();
+        } else {
+            const auto done = _uploaded_merge_files.find(owner_path);
+            if (done != _uploaded_merge_files.end()) {
+                keep_alive = done->second;
+                target = keep_alive.get();
+            }
+        }
+    }
+
+    if (target == nullptr) {
+        // Merge file not found in any location, this is unexpected
+        return Status::InternalError("Merge file not found for path: " + path);
+    }
+
+    // `keep_alive` (when set from one of the maps) stays in scope for the
+    // whole wait, holding shared ownership of the state being waited on.
+    return wait_for_merge_file_upload(target);
+}
+
+// Copies the global-index entry for `path` into `*index`.
+//
+// Returns NotFound when `path` has no entry; `*index` is left untouched in
+// that case. `index` is written through without a null check.
+Status MergeFileManager::get_merge_file_index(const std::string& path,
+                                              MergeFileSegmentIndex* index) {
+    std::lock_guard<std::mutex> guard(_global_index_mutex);
+    const auto found = _global_index_map.find(path);
+    if (found != _global_index_map.end()) {
+        *index = found->second;
+        return Status::OK();
+    }
+    return Status::NotFound("File not found in global merge index: {}", path);
+}
+
+// Launches the background worker thread running background_manager().
+// Does nothing when a thread object already exists.
+void MergeFileManager::start_background_manager() {
+    if (!_background_thread) {
+        _stop_background_thread = false;
+        _background_thread = std::make_unique<std::thread>([this] { background_manager(); });
+    }
+}
+
+// Raises the stop flag, joins the worker thread if it is joinable, and
+// releases the thread object.
+void MergeFileManager::stop_background_manager() {
+    _stop_background_thread = true;
+
+    const bool must_join = _background_thread && _background_thread->joinable();
+    if (must_join) {
+        _background_thread->join();
+    }
+    _background_thread.reset();
+}
+
+// Hands the current merge file over to the uploading set and opens a
+// replacement.
+//
+// The `_locked` suffix reflects how it is used: callers in this file hold
+// `_current_merge_file_mutex` while calling it. Returns OK without doing
+// anything when there is no current file or it has no writer; otherwise
+// returns the result of creating the replacement file.
+Status MergeFileManager::mark_current_merge_file_for_upload_locked() {
+    const bool has_open_file = _current_merge_file && _current_merge_file->writer;
+    if (!has_open_file) {
+        return Status::OK(); // Nothing to mark for upload
+    }
+
+    // Flag the file as ready for the uploader.
+    _current_merge_file->state = MergeFileStateEnum::UPLOADING;
+
+    // Convert unique ownership into shared ownership and register the file in
+    // the uploading map, keyed by its path.
+    {
+        std::shared_ptr<MergeFileState> handed_off(std::move(_current_merge_file));
+        std::lock_guard<std::mutex> files_guard(_merge_files_mutex);
+        _uploading_merge_files[handed_off->merge_file_path] = handed_off;
+    }
+
+    // Open the next merge file so subsequent appends have a target.
+    RETURN_IF_ERROR(create_new_merge_file_state(_current_merge_file));
+
+    return Status::OK();
+}
+
+// Locking wrapper: takes `_current_merge_file_mutex` and delegates to
+// mark_current_merge_file_for_upload_locked().
+Status MergeFileManager::mark_current_merge_file_for_upload() {
+    std::lock_guard<std::mutex> current_guard(_current_merge_file_mutex);
+    return mark_current_merge_file_for_upload_locked();
+}
+
+// Body of the background worker thread. Loops until `_stop_background_thread`
+// is set, and on every iteration:
+//   1. sleeps for half of config::merge_file_time_threshold_ms (at least 1 ms);
+//   2. rotates the current merge file if it is ACTIVE and at least
+//      config::merge_file_time_threshold_ms old;
+//   3. processes files waiting for upload;
+//   4. runs cleanup_expired_data() once
+//      config::merge_file_cleanup_interval_seconds (at least 1 s) have passed
+//      since the previous cleanup.
+void MergeFileManager::background_manager() {
+    auto last_cleanup_time = std::chrono::steady_clock::now();
+
+    while (!_stop_background_thread.load()) {
+        const int64_t check_interval_ms =
+                std::max<int64_t>(1, config::merge_file_time_threshold_ms / 2);
+        std::this_thread::sleep_for(std::chrono::milliseconds(check_interval_ms));
+
+        // Check the age of the current merge file and rotate it under ONE
+        // acquisition of `_current_merge_file_mutex`. Releasing the lock
+        // between the check and the rotation would let a concurrent append()
+        // swap in a new file, and the rotation would then hit that new
+        // (possibly still empty) file instead of the one that was checked.
+        {
+            std::lock_guard<std::mutex> current_lock(_current_merge_file_mutex);
+            if (_current_merge_file &&
+                _current_merge_file->state == MergeFileStateEnum::ACTIVE) {
+                const auto elapsed_ms =
+                        std::chrono::duration_cast<std::chrono::milliseconds>(
+                                std::chrono::steady_clock::now() -
+                                _current_merge_file->create_timestamp)
+                                .count();
+                if (elapsed_ms >= config::merge_file_time_threshold_ms) {
+                    Status st = mark_current_merge_file_for_upload_locked();
+                    if (!st.ok()) {
+                        LOG(WARNING) << "Failed to close current merge file: " << st.to_string();
+                    }
+                }
+            }
+        }
+
+        // Process uploading files
+        process_uploading_files();
+
+        const auto now = std::chrono::steady_clock::now();
+        const int64_t cleanup_interval_sec =
+                std::max<int64_t>(1, config::merge_file_cleanup_interval_seconds);
+        if (now - last_cleanup_time >= std::chrono::seconds(cleanup_interval_sec)) {
+            cleanup_expired_data();
+            last_cleanup_time = now;
+        }
+    }
+}
+
+// Uploads every merge file that is waiting in the uploading set.
+//
+// For each claimed file the meta service is updated first, then the file is
+// uploaded. Whatever the outcome, the file's terminal state (UPLOADED or
+// FAILED) is published to waiters and the file is moved from the uploading
+// map to the uploaded map.
+void MergeFileManager::process_uploading_files() {
+    // Claim the UPLOADING files whose `processing` flag we manage to flip from
+    // false to true; files already claimed elsewhere are left alone.
+    std::vector<std::shared_ptr<MergeFileState>> claimed;
+    {
+        std::lock_guard<std::mutex> files_guard(_merge_files_mutex);
+        for (auto& entry : _uploading_merge_files) {
+            auto& candidate = entry.second;
+            if (candidate->state != MergeFileStateEnum::UPLOADING) {
+                continue;
+            }
+            bool expected = false;
+            if (candidate->processing.compare_exchange_strong(expected, true)) {
+                claimed.emplace_back(candidate);
+            }
+        }
+    }
+
+    // Records the outcome under the file's upload mutex, wakes all waiters,
+    // then moves the file from the uploading map to the uploaded map.
+    // `failure` is null for a successful upload.
+    const auto finish = [this](const std::shared_ptr<MergeFileState>& file,
+                               MergeFileStateEnum outcome, const Status* failure) {
+        {
+            std::lock_guard<std::mutex> upload_guard(file->upload_mutex);
+            file->state = outcome;
+            if (failure != nullptr) {
+                file->last_error = failure->to_string();
+            }
+            file->upload_time = std::time(nullptr);
+        }
+        file->upload_cv.notify_all();
+
+        std::lock_guard<std::mutex> files_guard(_merge_files_mutex);
+        _uploading_merge_files.erase(file->merge_file_path);
+        _uploaded_merge_files[file->merge_file_path] = file;
+    };
+
+    for (auto& file : claimed) {
+        // The meta service is updated BEFORE the file itself is uploaded.
+        cloud::MergedFileInfoPB info;
+        info.set_ref_cnt(file->index_map.size());
+        info.set_total_file_num(file->index_map.size());
+        info.set_left_file_num(file->index_map.size());
+        info.set_total_file_bytes(file->total_size);
+        info.set_left_file_bytes(file->total_size);
+        info.set_created_at_sec(file->create_time);
+        info.set_corrected(false);
+        info.set_state(doris::cloud::MergedFileInfoPB::NORMAL);
+
+        // One entry per small file contained in this merge file.
+        for (const auto& [small_file_path, segment] : file->index_map) {
+            auto* small_file = info.add_small_files();
+            small_file->set_path(small_file_path);
+            small_file->set_offset(segment.offset);
+            small_file->set_size(segment.size);
+            small_file->set_deleted(false);
+        }
+
+        Status meta_status = update_meta_service(file->merge_file_path, info);
+        if (!meta_status.ok()) {
+            LOG(WARNING) << "Failed to update meta service for merge file: "
+                         << file->merge_file_path << ", error: " << meta_status.to_string();
+            finish(file, MergeFileStateEnum::FAILED, &meta_status);
+            continue;
+        }
+
+        // Now upload the file
+        Status upload_status = upload_merge_file(file->merge_file_path, file->writer.get());
+        if (!upload_status.ok()) {
+            LOG(WARNING) << "Failed to upload merge file: " << file->merge_file_path
+                         << ", error: " << upload_status.to_string();
+            finish(file, MergeFileStateEnum::FAILED, &upload_status);
+            continue;
+        }
+
+        // Success: the file stays in the uploaded map for the retention period.
+        finish(file, MergeFileStateEnum::UPLOADED, nullptr);
+    }
+}
+
+// Finalizes a merge file by closing its writer; the close is what triggers
+// the upload.
+//
+// Returns InternalError when `writer` is null, otherwise the result of
+// writer->close(). `merge_file_path` is used only for messages.
+Status MergeFileManager::upload_merge_file(const std::string& merge_file_path, FileWriter* writer) {
+    if (!writer) {
+        return Status::InternalError("File writer is null for merge file: " + merge_file_path);
+    }
+
+    RETURN_IF_ERROR(writer->close());
+
+    VLOG_DEBUG << "Merge file upload completed: " << merge_file_path;
+    return Status::OK();
+}
+
+// Sends the description of a merge file to the cloud meta service.
+//
+// @param merge_file_path  path of the merge file being registered.
+// @param merge_file_info  protobuf describing the merge file and its small
+//                         files.
+// @return InternalError when not in cloud mode or when ExecEnv has no
+//         instance; otherwise whatever
+//         CloudMetaMgr::update_merge_file_info() returns.
+Status MergeFileManager::update_meta_service(const std::string& merge_file_path,
+                                             const cloud::MergedFileInfoPB& merge_file_info) {
+    VLOG_DEBUG << "Updating meta service for merge file: " << merge_file_path << " with "
+               << merge_file_info.total_file_num() << " small files"
+               << ", total bytes: " << merge_file_info.total_file_bytes();
+
+    // Get CloudMetaMgr through StorageEngine
+    if (!config::is_cloud_mode()) {
+        return Status::InternalError("Storage engine is not cloud mode");
+    }
+
+    // Guard against a missing ExecEnv instead of dereferencing a null
+    // pointer; this is the same check and message ensure_file_system() uses.
+    auto* exec_env = ExecEnv::GetInstance();
+    if (exec_env == nullptr) {
+        return Status::InternalError("ExecEnv instance is not initialized");
+    }
+
+    auto& cloud_meta_mgr = exec_env->storage_engine().to_cloud().meta_mgr();
+    return cloud_meta_mgr.update_merge_file_info(merge_file_path, merge_file_info);
+}
+
+void MergeFileManager::cleanup_expired_data() {
+    auto current_time = std::time(nullptr);
+
+    // Clean up expired uploaded files
+    {
+        std::lock_guard<std::mutex> uploaded_lock(_merge_files_mutex);
+        auto it = _uploaded_merge_files.begin();
+        while (it != _uploaded_merge_files.end()) {
+            if (it->second->state == MergeFileStateEnum::UPLOADED &&
+                current_time - it->second->upload_time > 
config::uploaded_file_retention_seconds) {
+                it = _uploaded_merge_files.erase(it);
+            } else if (it->second->state == MergeFileStateEnum::FAILED &&
+                       current_time - it->second->upload_time >
+                               config::uploaded_file_retention_seconds) {
+                it = _uploaded_merge_files.erase(it);
+            } else {
+                ++it;
+            }
+        }
+    }
+
+    // Clean up expired global index entries
+    {
+        std::unordered_set<std::string> active_merge_files;
+        {
+            std::lock_guard<std::mutex> 
current_lock(_current_merge_file_mutex);
+            if (_current_merge_file) {
+                
active_merge_files.insert(_current_merge_file->merge_file_path);
+            }
+        }
+        {
+            std::lock_guard<std::mutex> merge_lock(_merge_files_mutex);
+            for (const auto& [path, state] : _uploading_merge_files) {
+                active_merge_files.insert(path);
+            }
+            for (const auto& [path, state] : _uploaded_merge_files) {
+                active_merge_files.insert(path);
+            }
+        }
+
+        std::lock_guard<std::mutex> global_lock(_global_index_mutex);
+        auto it = _global_index_map.begin();
+        while (it != _global_index_map.end()) {
+            const auto& index = it->second;
+            if (active_merge_files.find(index.merge_file_path) == 
active_merge_files.end()) {
+                it = _global_index_map.erase(it);

Review Comment:
   We'd better clear this immediately on the normal path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to