dataroaring commented on code in PR #20492:
URL: https://github.com/apache/doris/pull/20492#discussion_r1219591910


##########
be/src/runtime/snapshot_loader.cpp:
##########
@@ -370,6 +373,272 @@ Status SnapshotLoader::download(const 
std::map<std::string, std::string>& src_to
     return status;
 }
 
+Status SnapshotLoader::remote_http_download(
+        const std::vector<TRemoteTabletSnapshot>& remote_tablet_snapshots,
+        std::vector<int64_t>* downloaded_tablet_ids) {
+    LOG(INFO) << fmt::format("begin to download snapshots via http. job: {}, 
task id: {}", _job_id,
+                             _task_id);
+    constexpr uint32_t kListRemoteFileTimeout = 15;
+    constexpr uint32_t kDownloadFileMaxRetry = 3;
+    constexpr uint32_t kGetLengthTimeout = 10;
+
+    // check if job has already been cancelled
+    int tmp_counter = 1;
+    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, 
TTaskType::type::DOWNLOAD));
+    Status status = Status::OK();
+
+    // Step before, validate all remote
+
+    // Step 1: Validate local tablet snapshot paths
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        auto& path = remote_tablet_snapshot.local_snapshot_path;
+        bool res = true;
+        RETURN_IF_ERROR(io::global_local_filesystem()->is_directory(path, 
&res));
+        if (!res) {
+            std::stringstream ss;
+            auto err_msg =
+                    fmt::format("snapshot path is not directory or does not 
exist: {}", path);
+            LOG(WARNING) << err_msg;
+            return Status::RuntimeError(err_msg);
+        }
+    }
+
+    // Step 2: get all local files
+    struct LocalFileStat {
+        uint64_t size;
+        // TODO(Drogon): add md5sum
+    };
+    std::unordered_map<std::string, std::unordered_map<std::string, 
LocalFileStat>> local_files_map;
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
+        std::vector<std::string> local_files;
+        RETURN_IF_ERROR(_get_existing_files_from_local(local_path, 
&local_files));
+
+        auto& local_filestat = local_files_map[local_path];
+        for (auto& local_file : local_files) {
+            // add file size
+            std::string local_file_path = local_path + "/" + local_file;
+            std::error_code ec;
+            uint64_t local_file_size = 
std::filesystem::file_size(local_file_path, ec);
+            if (ec) {
+                LOG(WARNING) << "download file error" << ec.message();
+                return Status::IOError("can't retrive file_size of {}, due to 
{}", local_file_path,
+                                       ec.message());
+            }
+            local_filestat[local_file] = {local_file_size};
+        }
+    }
+
+    // Step 3: Validate remote tablet snapshot paths && remote files map
+    // TODO(Drogon): Add md5sum check
+    // key is remote snapshot paths, value is filelist
+    // get all these use http download action
+    // 
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180//2774718/217609978/2774718.hdr
+    int report_counter = 0;
+    int total_num = remote_tablet_snapshots.size();
+    int finished_num = 0;
+    struct RemoteFileStat {
+        // TODO(Drogon): Add md5sum
+        std::string url;
+        uint64_t size;
+    };
+    std::unordered_map<std::string, std::unordered_map<std::string, 
RemoteFileStat>>
+            remote_files_map;
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+        auto& remote_files = remote_files_map[remote_path];
+        const auto& token = remote_tablet_snapshot.remote_token;
+        const auto& remote_be_addr = remote_tablet_snapshot.remote_be_addr;
+
+        // HEAD 
http://172.16.0.14:6781/api/_tablet/_download?token=e804dd27-86da-4072-af58-70724075d2a4&file=/home/ubuntu/doris_master/output/be/storage/snapshot/20230410102306.9.180/
+        std::string remote_url_prefix =
+                
fmt::format("http://{}:{}/api/_tablet/_download?token={}&file={}";,
+                            remote_be_addr.hostname, remote_be_addr.port, 
token, remote_path);
+
+        string file_list_str;
+        auto list_files_cb = [&remote_url_prefix, &file_list_str](HttpClient* 
client) {
+            RETURN_IF_ERROR(client->init(remote_url_prefix));
+            client->set_timeout_ms(kListRemoteFileTimeout * 1000);
+            return client->execute(&file_list_str);
+        };
+        RETURN_IF_ERROR(HttpClient::execute_with_retry(kDownloadFileMaxRetry, 
1, list_files_cb));
+        std::vector<string> filename_list =
+                strings::Split(file_list_str, "\n", strings::SkipWhitespace());
+
+        for (const auto& filename : filename_list) {
+            std::string remote_file_url = fmt::format(
+                    "http://{}:{}/api/_tablet/_download?token={}&file={}/{}";,
+                    remote_tablet_snapshot.remote_be_addr.hostname,
+                    remote_tablet_snapshot.remote_be_addr.port, 
remote_tablet_snapshot.remote_token,
+                    remote_tablet_snapshot.remote_snapshot_path, filename);
+
+            // get file length
+            uint64_t file_size = 0;
+            auto get_file_size_cb = [&remote_file_url, &file_size](HttpClient* 
client) {
+                RETURN_IF_ERROR(client->init(remote_file_url));
+                client->set_timeout_ms(kGetLengthTimeout * 1000);
+                RETURN_IF_ERROR(client->head());
+                RETURN_IF_ERROR(client->get_content_length(&file_size));
+                return Status::OK();
+            };
+            RETURN_IF_ERROR(
+                    HttpClient::execute_with_retry(kDownloadFileMaxRetry, 1, 
get_file_size_cb));
+
+            remote_files[filename] = RemoteFileStat {remote_file_url, 
file_size};
+        }
+    }
+
+    // Step 4: Compare local and remote files && get all need download files
+    for (auto& remote_tablet_snapshot : remote_tablet_snapshots) {
+        RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, 
total_num,
+                                      TTaskType::type::DOWNLOAD));
+
+        const auto& remote_path = remote_tablet_snapshot.remote_snapshot_path;
+        const auto& local_path = remote_tablet_snapshot.local_snapshot_path;
+        auto& remote_files = remote_files_map[remote_path];
+        auto& local_files = local_files_map[local_path];
+        auto remote_tablet_id = remote_tablet_snapshot.remote_tablet_id;
+
+        // get all need download files
+        std::vector<std::string> need_download_files;
+        for (const auto& [remote_file, remote_filestat] : remote_files) {
+            LOG(INFO) << fmt::format("remote file: {}, size: {}", remote_file,
+                                     remote_filestat.size);
+            auto it = local_files.find(remote_file);
+            if (it == local_files.end()) {
+                need_download_files.emplace_back(remote_file);
+                continue;
+            }
+            if (_end_with(remote_file, ".hdr")) {
+                need_download_files.emplace_back(remote_file);
+                continue;
+            }
+
+            if (auto& local_filestat = it->second; local_filestat.size != 
remote_filestat.size) {
+                need_download_files.emplace_back(remote_file);
+                continue;
+            }
+            // TODO(Drogon): check by md5sum, if not match then download
+
+            LOG(INFO) << fmt::format("file {} already exists, skip download", 
remote_file);
+        }
+
+        auto local_tablet_id = remote_tablet_snapshot.local_tablet_id;
+        TabletSharedPtr tablet =
+                
_env->storage_engine()->tablet_manager()->get_tablet(local_tablet_id);
+        if (tablet == nullptr) {
+            std::stringstream ss;
+            ss << "failed to get local tablet: " << local_tablet_id;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+        DataDir* data_dir = tablet->data_dir();
+
+        // download all need download files
+        uint64_t total_file_size = 0;
+        MonotonicStopWatch watch;
+        watch.start();
+        for (auto& filename : need_download_files) {

Review Comment:
   We should control the download parallelism via a config option or a queue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to