chaoyli commented on a change in pull request #1200: Merge refactor code into 
master
URL: https://github.com/apache/incubator-doris/pull/1200#discussion_r288398316
 
 

 ##########
 File path: be/src/olap/task/engine_clone_task.cpp
 ##########
 @@ -0,0 +1,903 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/task/engine_clone_task.h"
+
+#include <set>
+
+#include "olap/olap_snapshot_converter.h"
+#include "olap/snapshot_manager.h"
+#include "olap/rowset/alpha_rowset.h"
+#include "olap/rowset/alpha_rowset_writer.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_id_generator.h"
+#include "olap/rowset/rowset_writer.h"
+
+using std::set;
+using std::stringstream;
+
+namespace doris {
+
+const std::string HTTP_REQUEST_PREFIX = "/api/_tablet/_download?"; // path and start of the query string of the download URL built in _clone_copy
+const std::string HTTP_REQUEST_TOKEN_PARAM = "token="; // query key followed by _master_info.token
+const std::string HTTP_REQUEST_FILE_PARAM = "&file="; // query key followed by the remote directory or file path
+const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3; // upper bound of the list / get-length / download retry loops
+const uint32_t LIST_REMOTE_FILE_TIMEOUT = 15; // curl_opt_timeout used when listing the remote directory (presumably seconds - TODO confirm)
+
+// Builds a clone task from the request, the master info and the task
+// signature. error_msgs, tablet_infos and res_status are output locations
+// that execute() writes to: error descriptions are appended to *error_msgs,
+// the reported tablet info is appended to *tablet_infos and the final
+// AgentStatus is stored in *res_status.
+EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req,
+                                 const TMasterInfo& master_info,
+                                 int64_t signature,
+                                 vector<string>* error_msgs,
+                                 vector<TTabletInfo>* tablet_infos,
+                                 AgentStatus* res_status)
+        : _clone_req(clone_req),
+          _error_msgs(error_msgs),
+          _tablet_infos(tablet_infos),
+          _res_status(res_status),
+          _signature(signature),
+          _master_info(master_info) {
+}
+
+// Runs the clone task for the tablet identified by _clone_req.
+//
+// If the tablet already exists locally, the versions missing up to
+// _clone_req.committed_version are fetched with an incremental clone; when the
+// copy fails or the source backend does not allow an incremental clone, a full
+// clone is attempted instead. If the tablet does not exist, a shard path is
+// obtained, the tablet is copied into it and loaded from that directory; when
+// this fails the tablet directory is removed again.
+//
+// Outputs:
+//   *_res_status   - the resulting AgentStatus (written on every return path)
+//   *_error_msgs   - one message appended per detected failure
+//   *_tablet_infos - the reported tablet info, appended on success only
+//
+// The function itself always returns OLAP_SUCCESS; callers must inspect
+// *_res_status to learn whether the clone succeeded.
+OLAPStatus EngineCloneTask::execute() {
+    AgentStatus status = DORIS_SUCCESS;
+    string src_file_path;
+    TBackend src_host;
+    int32_t snapshot_version = 1;
+    // Check local tablet exist or not
+    TabletSharedPtr tablet =
+            StorageEngine::instance()->tablet_manager()->get_tablet(
+            _clone_req.tablet_id, _clone_req.schema_hash);
+    const bool is_new_tablet = tablet == nullptr;
+    // try to repair a tablet with missing version
+    if (tablet != nullptr) {
+        LOG(INFO) << "clone tablet exist yet, begin to incremental clone. "
+                  << "signature:" << _signature
+                  << ", tablet_id:" << _clone_req.tablet_id
+                  << ", schema_hash:" << _clone_req.schema_hash
+                  << ", committed_version:" << _clone_req.committed_version;
+
+        // try to incremental clone
+        vector<Version> missed_versions;
+        tablet->calc_missed_versions(_clone_req.committed_version, &missed_versions);
+        LOG(INFO) << "finish to calculate missed versions when clone. "
+                  << "tablet=" << tablet->full_name()
+                  << ", committed_version=" << _clone_req.committed_version
+                  << ", missed_versions_size=" << missed_versions.size();
+        // if missed version size is 0, then it is useless to clone from remote be,
+        // it means local data is completed. Or remote be will just return header
+        // not the rowset files. clone will failed.
+        if (missed_versions.empty()) {
+            LOG(INFO) << "missed version size = 0, skip clone and reture success";
+            // Publish the outcome on this path too, otherwise the caller reads
+            // whatever value *_res_status happened to hold before the task ran.
+            *_res_status = status;
+            return OLAP_SUCCESS;
+        }
+        // get download path
+        string local_data_path = tablet->tablet_path() + CLONE_PREFIX;
+
+        bool allow_incremental_clone = false;
+        status = _clone_copy(*(tablet->data_dir()), _clone_req, _signature, local_data_path,
+                             &src_host, &src_file_path, _error_msgs,
+                             &missed_versions,
+                             &allow_incremental_clone,
+                             &snapshot_version, tablet);
+        if (status == DORIS_SUCCESS && allow_incremental_clone) {
+            OLAPStatus olap_status = _finish_clone(tablet, local_data_path,
+                    _clone_req.committed_version, allow_incremental_clone, snapshot_version);
+            if (olap_status != OLAP_SUCCESS) {
+                LOG(WARNING) << "failed to finish incremental clone. [table="
+                             << tablet->full_name()
+                             << " res=" << olap_status << "]";
+                _error_msgs->push_back("incremental clone error.");
+                status = DORIS_ERROR;
+            }
+        } else {
+            // begin to full clone if incremental failed
+            LOG(INFO) << "begin to full clone. [table=" << tablet->full_name();
+            status = _clone_copy(*(tablet->data_dir()), _clone_req, _signature, local_data_path,
+                                 &src_host, &src_file_path, _error_msgs,
+                                 nullptr, nullptr, &snapshot_version, tablet);
+            if (status == DORIS_SUCCESS) {
+                LOG(INFO) << "download successfully when full clone. [table="
+                          << tablet->full_name()
+                          << " src_host=" << src_host.host << " src_file_path=" << src_file_path
+                          << " local_data_path=" << local_data_path << "]";
+
+                OLAPStatus olap_status = _finish_clone(tablet, local_data_path,
+                        _clone_req.committed_version, false, snapshot_version);
+
+                if (olap_status != OLAP_SUCCESS) {
+                    LOG(WARNING) << "fail to finish full clone. [table="
+                                 << tablet->full_name()
+                                 << " res=" << olap_status << "]";
+                    _error_msgs->push_back("full clone error.");
+                    status = DORIS_ERROR;
+                }
+            }
+        }
+    } else {
+        LOG(INFO) << "clone tablet not exist, begin clone a new tablet from remote be. "
+                  << "signature:" << _signature
+                  << ", tablet_id:" << _clone_req.tablet_id
+                  << ", schema_hash:" << _clone_req.schema_hash
+                  << ", committed_version:" << _clone_req.committed_version;
+        // create a new tablet in this be
+        // Get local disk from olap
+        string local_shard_root_path;
+        DataDir* store = nullptr;
+        OLAPStatus olap_status = StorageEngine::instance()->obtain_shard_path(
+            _clone_req.storage_medium, &local_shard_root_path, &store);
+        if (olap_status != OLAP_SUCCESS) {
+            LOG(WARNING) << "clone get local root path failed. signature: " << _signature;
+            _error_msgs->push_back("clone get local root path failed.");
+            status = DORIS_ERROR;
+        }
+        // <shard root>/<tablet id>/<schema hash>: download target and, later,
+        // the directory the tablet is loaded from.
+        stringstream tablet_dir_stream;
+        tablet_dir_stream << local_shard_root_path
+                          << "/" << _clone_req.tablet_id
+                          << "/" << _clone_req.schema_hash;
+        const string tablet_dir = tablet_dir_stream.str();
+
+        // store is only dereferenced when obtain_shard_path succeeded.
+        if (status == DORIS_SUCCESS) {
+            status = _clone_copy(*store,
+                                 _clone_req,
+                                 _signature,
+                                 tablet_dir,
+                                 &src_host,
+                                 &src_file_path,
+                                 _error_msgs,
+                                 nullptr, nullptr, &snapshot_version, nullptr);
+        }
+
+        if (status == DORIS_SUCCESS) {
+            LOG(INFO) << "clone copy done. src_host: " << src_host.host
+                      << " src_file_path: " << src_file_path;
+            string header_path = TabletMeta::construct_header_file_path(
+                tablet_dir, _clone_req.tablet_id);
+            OLAPStatus reset_id_status = TabletMeta::reset_tablet_uid(header_path);
+            if (reset_id_status != OLAP_SUCCESS) {
+                LOG(WARNING) << "errors while set tablet uid: '" << header_path;
+                _error_msgs->push_back("errors while set tablet uid.");
+                status = DORIS_ERROR;
+            } else {
+                OLAPStatus load_header_status =
+                    StorageEngine::instance()->tablet_manager()->load_tablet_from_dir(
+                        store, _clone_req.tablet_id, _clone_req.schema_hash,
+                        tablet_dir, false);
+                if (load_header_status != OLAP_SUCCESS) {
+                    LOG(WARNING) << "load header failed. local_shard_root_path: '"
+                                 << local_shard_root_path
+                                 << "' schema_hash: " << _clone_req.schema_hash
+                                 << ". status: " << load_header_status
+                                 << ". signature: " << _signature;
+                    _error_msgs->push_back("load header failed.");
+                    status = DORIS_ERROR;
+                }
+            }
+            // clone success, delete .hdr file because tablet meta is stored in rocksdb
+            string cloned_meta_file = tablet_dir + "/"
+                + std::to_string(_clone_req.tablet_id) + ".hdr";
+            remove_dir(cloned_meta_file);
+        }
+        // Clean useless dir, if failed, ignore it.
+        if (status != DORIS_SUCCESS && status != DORIS_CREATE_TABLE_EXIST) {
+            stringstream local_data_path_stream;
+            local_data_path_stream << local_shard_root_path
+                                   << "/" << _clone_req.tablet_id;
+            string local_data_path = local_data_path_stream.str();
+            LOG(INFO) << "clone failed. want to delete local dir: " << local_data_path
+                      << ". signature: " << _signature;
+            try {
+                boost::filesystem::path local_path(local_data_path);
+                if (boost::filesystem::exists(local_path)) {
+                    boost::filesystem::remove_all(local_path);
+                }
+            } catch (const boost::filesystem::filesystem_error& e) {
+                // Caught by const reference: no copy of the exception object.
+                // Ignore the error, OLAP will delete it
+                LOG(WARNING) << "clone delete useless dir failed. "
+                             << " error: " << e.what()
+                             << " local dir: " << local_data_path.c_str()
+                             << " signature: " << _signature;
+            }
+        }
+    }
+
+    // Get clone tablet info
+    if (status == DORIS_SUCCESS || status == DORIS_CREATE_TABLE_EXIST) {
+        TTabletInfo tablet_info;
+        tablet_info.__set_tablet_id(_clone_req.tablet_id);
+        tablet_info.__set_schema_hash(_clone_req.schema_hash);
+        OLAPStatus get_tablet_info_status =
+            StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info);
+        if (get_tablet_info_status != OLAP_SUCCESS) {
+            LOG(WARNING) << "clone success, but get tablet info failed."
+                         << " tablet id: " <<  _clone_req.tablet_id
+                         << " schema hash: " << _clone_req.schema_hash
+                         << " signature: " << _signature;
+            _error_msgs->push_back("clone success, but get tablet info failed.");
+            status = DORIS_ERROR;
+        } else if (
+            (_clone_req.__isset.committed_version
+                    && _clone_req.__isset.committed_version_hash)
+                    && (tablet_info.version < _clone_req.committed_version ||
+                        (tablet_info.version == _clone_req.committed_version
+                        && tablet_info.version_hash != _clone_req.committed_version_hash))) {
+            // The local tablet is behind the requested version, or at the same
+            // version with a different hash: the clone did not produce the
+            // expected data.
+            LOG(WARNING) << "failed to clone tablet. tablet_id:" << _clone_req.tablet_id
+                         << ", schema_hash:" << _clone_req.schema_hash
+                         << ", signature:" << _signature
+                         << ", version:" << tablet_info.version
+                         << ", version_hash:" << tablet_info.version_hash
+                         << ", expected_version: " << _clone_req.committed_version
+                         << ", version_hash:" << _clone_req.committed_version_hash;
+            // if it is a new tablet and clone failed, then remove the tablet
+            // if it is incremental clone, then must not drop the tablet
+            if (is_new_tablet) {
+                // we need to check if this cloned table's version is what we expect.
+                // if not, maybe this is a stale remaining table which is waiting for drop.
+                // we drop it.
+                LOG(WARNING) << "begin to drop the stale tablet. tablet_id:"
+                             << _clone_req.tablet_id
+                             << ", schema_hash:" << _clone_req.schema_hash
+                             << ", signature:" << _signature
+                             << ", version:" << tablet_info.version
+                             << ", version_hash:" << tablet_info.version_hash
+                             << ", expected_version: " << _clone_req.committed_version
+                             << ", version_hash:" << _clone_req.committed_version_hash;
+                OLAPStatus drop_status =
+                    StorageEngine::instance()->tablet_manager()->drop_tablet(
+                        _clone_req.tablet_id, _clone_req.schema_hash);
+                if (drop_status != OLAP_SUCCESS && drop_status != OLAP_ERR_TABLE_NOT_FOUND) {
+                    // just log
+                    LOG(WARNING) << "drop stale cloned table failed! tabelt id: "
+                                 << _clone_req.tablet_id;
+                }
+            }
+            status = DORIS_ERROR;
+        } else {
+            LOG(INFO) << "clone get tablet info success. tablet_id:" << _clone_req.tablet_id
+                      << ", schema_hash:" << _clone_req.schema_hash
+                      << ", signature:" << _signature
+                      << ", version:" << tablet_info.version
+                      << ", version_hash:" << tablet_info.version_hash;
+            _tablet_infos->push_back(tablet_info);
+        }
+    }
+    *_res_status = status;
+    return OLAP_SUCCESS;
+}
+
+// Copies a snapshot of the requested tablet from a remote backend into
+// local_data_path.
+//
+// The backends in clone_req.src_backends are tried in order. For each one a
+// snapshot is requested, the snapshot directory is listed over HTTP, every
+// listed file is downloaded (files ending in ".hdr" last) and checked against
+// the remote length, the downloaded snapshot is converted if its version is
+// older than PREFERRED_SNAPSHOT_VERSION, rowset ids are converted, and the
+// remote snapshot is released. The first backend for which all steps succeed
+// ends the loop.
+//
+// Parameters:
+//   data_dir                - passed on to the snapshot / rowset id conversion
+//   clone_req               - tablet id, schema hash and candidate source backends
+//   signature               - task signature, used in log messages only
+//   local_data_path         - local directory that receives the files; it is
+//                             removed and recreated for every attempt
+//   src_host                - [out] the backend tried last
+//   src_file_path           - [out] snapshot path reported by that backend,
+//                             with a trailing '/'
+//   error_msgs              - [out] messages appended on failures
+//   missed_versions         - versions to request for an incremental clone, or
+//                             nullptr for a full clone
+//   allow_incremental_clone - [out] set when the backend reports the flag;
+//                             may be nullptr
+//   snapshot_version        - [out] snapshot version reported by the backend
+//   tablet                  - passed on to the rowset id conversion; may be null
+//
+// Returns DORIS_SUCCESS when one backend was cloned from successfully,
+// otherwise DORIS_ERROR (including the case of an empty backend list).
+AgentStatus EngineCloneTask::_clone_copy(
+        DataDir& data_dir,
+        const TCloneReq& clone_req,
+        int64_t signature,
+        const string& local_data_path,
+        TBackend* src_host,
+        string* src_file_path,
+        vector<string>* error_msgs,
+        const vector<Version>* missed_versions,
+        bool* allow_incremental_clone,
+        int32_t* snapshot_version,
+        TabletSharedPtr tablet) {
+    // Without a source backend nothing can be copied; reporting success here
+    // would let the caller continue with a directory that was never filled.
+    if (clone_req.src_backends.empty()) {
+        LOG(WARNING) << "clone copy failed, no source backend. signature: " << signature;
+        error_msgs->push_back("clone copy failed, no source backend.");
+        return DORIS_ERROR;
+    }
+
+    AgentStatus status = DORIS_SUCCESS;
+    const std::string token = _master_info.token;
+
+    // Files ending in ".hdr" are appended, everything else is put in front, so
+    // that header files are always downloaded after the other files.
+    auto add_file_name = [](vector<string>* names, const string& file_name) {
+        if (file_name.size() > 4 && file_name.substr(file_name.size() - 4, 4) == ".hdr") {
+            names->push_back(file_name);
+        } else {
+            names->insert(names->begin(), file_name);
+        }
+    };
+
+    for (const auto& src_backend : clone_req.src_backends) {
+        stringstream http_host_stream;
+        http_host_stream << "http://" << src_backend.host << ":" << src_backend.http_port;
+        const string http_host = http_host_stream.str();
+        // Make snapshot in remote olap engine
+        *src_host = src_backend;
+        AgentServerClient agent_client(*src_host);
+        TAgentResult make_snapshot_result;
+        status = DORIS_SUCCESS;
+
+        LOG(INFO) << "pre make snapshot. backend_ip: " << src_host->host;
+        TSnapshotRequest snapshot_request;
+        snapshot_request.__set_tablet_id(clone_req.tablet_id);
+        snapshot_request.__set_schema_hash(clone_req.schema_hash);
+        // This is a new version be, should set preferred version to 2
+        snapshot_request.__set_preferred_snapshot_version(PREFERRED_SNAPSHOT_VERSION);
+        if (missed_versions != nullptr) {
+            // TODO: missing version composed of singleton delta.
+            // if not, this place should be rewrote.
+            vector<int64_t> snapshot_versions;
+            for (const Version& version : *missed_versions) {
+                snapshot_versions.push_back(version.first);
+            }
+            snapshot_request.__set_missing_version(snapshot_versions);
+        }
+        agent_client.make_snapshot(
+                snapshot_request,
+                &make_snapshot_result);
+        *snapshot_version = make_snapshot_result.snapshot_version;
+        // Full clone passes nullptr for allow_incremental_clone, so the pointer
+        // has to be checked before it is written through.
+        if (allow_incremental_clone != nullptr
+                && make_snapshot_result.__isset.allow_incremental_clone) {
+            // During upgrading, some BE nodes still be installed an old previous old.
+            // which incremental clone is not ready in those nodes.
+            // should add a symbol to indicate it.
+            *allow_incremental_clone = make_snapshot_result.allow_incremental_clone;
+        }
+        if (make_snapshot_result.status.status_code == TStatusCode::OK) {
+            // An empty path is as unusable as a missing one.
+            if (make_snapshot_result.__isset.snapshot_path
+                    && !make_snapshot_result.snapshot_path.empty()) {
+                *src_file_path = make_snapshot_result.snapshot_path;
+                if (src_file_path->back() != '/') {
+                    src_file_path->append("/");
+                }
+                LOG(INFO) << "make snapshot success. backend_ip: " << src_host->host
+                          << ". src_file_path: " << *src_file_path
+                          << ". signature: " << signature;
+            } else {
+                LOG(WARNING) << "clone make snapshot success, "
+                                 "but get src file path failed. signature: " << signature;
+                status = DORIS_ERROR;
+                continue;
+            }
+        } else {
+            LOG(WARNING) << "make snapshot failed. tablet_id: " << clone_req.tablet_id
+                         << ". schema_hash: " << clone_req.schema_hash
+                         << ". backend_ip: " << src_host->host
+                         << ". backend_port: " << src_host->be_port
+                         << ". signature: " << signature;
+            error_msgs->push_back("make snapshot failed. backend_ip: " + src_host->host);
+            status = DORIS_ERROR;
+            continue;
+        }
+
+        // Get remote and local full path
+        stringstream src_file_full_path_stream;
+        src_file_full_path_stream << *src_file_path
+                                  << "/" << clone_req.tablet_id
+                                  << "/" << clone_req.schema_hash << "/";
+        const string src_file_full_path = src_file_full_path_stream.str();
+        const string local_file_full_path = local_data_path + "/";
+
+#ifndef BE_TEST
+        // Check local path exist, if exist, remove it, then create the dir
+        // local_file_full_path = tabletid/clone, for a specific tablet, there
+        // should be only one folder, if this folder exists, then should remove it
+        {
+            boost::filesystem::path local_file_full_dir(local_file_full_path);
+            if (boost::filesystem::exists(local_file_full_dir)) {
+                boost::filesystem::remove_all(local_file_full_dir);
+            }
+            boost::filesystem::create_directories(local_file_full_dir);
+        }
+#endif
+
+        // Get remote dir file list
+        FileDownloader::FileDownloaderParam downloader_param;
+        downloader_param.remote_file_path = http_host + HTTP_REQUEST_PREFIX
+            + HTTP_REQUEST_TOKEN_PARAM + token
+            + HTTP_REQUEST_FILE_PARAM + src_file_full_path;
+        downloader_param.curl_opt_timeout = LIST_REMOTE_FILE_TIMEOUT;
+
+        string file_list_str;
+        AgentStatus download_status = DORIS_SUCCESS;
+        uint32_t download_retry_time = 0;
+        {
+            // The downloader lives exactly as long as this retry loop and is
+            // released on every exit path, including exceptions.
+#ifndef BE_TEST
+            FileDownloader list_downloader(downloader_param);
+#endif
+            while (download_retry_time < DOWNLOAD_FILE_MAX_RETRY) {
+#ifndef BE_TEST
+                download_status = list_downloader.list_file_dir(&file_list_str);
+#else
+                download_status = _file_downloader_ptr->list_file_dir(&file_list_str);
+#endif
+                if (download_status == DORIS_SUCCESS) {
+                    break;
+                }
+                LOG(WARNING) << "clone get remote file list failed. "
+                             << " backend_ip: " << src_host->host.c_str()
+                             << " src_file_path: " << downloader_param.remote_file_path.c_str()
+                             << " signature: " << signature;
+                ++download_retry_time;
+#ifndef BE_TEST
+                // back off a little longer after every failed attempt
+                sleep(download_retry_time);
+#endif
+            }
+        }
+
+        vector<string> file_name_list;
+        if (download_status != DORIS_SUCCESS) {
+            LOG(WARNING) << "clone get remote file list failed over max time. "
+                         << " backend_ip: " << src_host->host.c_str()
+                         << " src_file_path: " << downloader_param.remote_file_path.c_str()
+                         << " signature: " << signature;
+            status = DORIS_ERROR;
+        } else {
+            // Split file name from file_list_str, one name per '\n' separated line.
+            // If the header file is not exist, the table could't loaded by olap engine.
+            // Avoid of data is not complete, we copy the header file at last.
+            // The header file's name is end of .hdr.
+            size_t start_position = 0;
+            size_t end_position = file_list_str.find("\n");
+            while (end_position != string::npos) {
+                add_file_name(&file_name_list,
+                        file_list_str.substr(start_position, end_position - start_position));
+                start_position = end_position + 1;
+                end_position = file_list_str.find("\n", start_position);
+            }
+            // last name when the list does not end with a newline
+            if (start_position != file_list_str.size()) {
+                add_file_name(&file_name_list,
+                        file_list_str.substr(start_position,
+                                             file_list_str.size() - start_position));
+            }
+        }
+
+        // Get copy from remote
+        for (const auto& file_name : file_name_list) {
+            downloader_param.remote_file_path = http_host + HTTP_REQUEST_PREFIX
+                + HTTP_REQUEST_TOKEN_PARAM + token
+                + HTTP_REQUEST_FILE_PARAM + src_file_full_path + file_name;
+            downloader_param.local_file_path = local_file_full_path + file_name;
+
+            // Get file length
+            uint64_t file_size = 0;
+            download_retry_time = 0;
+            downloader_param.curl_opt_timeout = GET_LENGTH_TIMEOUT;
+            {
+#ifndef BE_TEST
+                FileDownloader length_downloader(downloader_param);
+#endif
+                while (download_retry_time < DOWNLOAD_FILE_MAX_RETRY) {
+#ifndef BE_TEST
+                    download_status = length_downloader.get_length(&file_size);
+#else
+                    download_status = _file_downloader_ptr->get_length(&file_size);
+#endif
+                    if (download_status == DORIS_SUCCESS) {
+                        break;
+                    }
+                    LOG(WARNING) << "clone copy get file length failed. "
+                                 << " backend_ip:  " << src_host->host.c_str()
+                                 << " src_file_path: "
+                                 << downloader_param.remote_file_path.c_str()
+                                 << " signature: " << signature;
+                    ++download_retry_time;
+#ifndef BE_TEST
+                    sleep(download_retry_time);
+#endif
+                }
+            }
+            if (download_status != DORIS_SUCCESS) {
+                LOG(WARNING) << "clone copy get file length failed over max time. "
+                             << " backend_ip: " << src_host->host.c_str()
+                             << " src_file_path: " << downloader_param.remote_file_path.c_str()
+                             << " signature: " << signature;
+                status = DORIS_ERROR;
+                break;
+            }
+
+            // Time budget derived from the file size and the configured minimum
+            // speed, never below config::download_low_speed_time. A
+            // non-positive speed limit would divide by zero, so it falls back
+            // to the minimum budget.
+            uint64_t estimate_time_out = 0;
+            if (config::download_low_speed_limit_kbps > 0) {
+                estimate_time_out = file_size / config::download_low_speed_limit_kbps / 1024;
+            }
+            if (estimate_time_out < config::download_low_speed_time) {
+                estimate_time_out = config::download_low_speed_time;
+            }
+
+            // Download the file
+            download_retry_time = 0;
+            downloader_param.curl_opt_timeout = estimate_time_out;
+            {
+#ifndef BE_TEST
+                FileDownloader file_downloader(downloader_param);
+#endif
+                while (download_retry_time < DOWNLOAD_FILE_MAX_RETRY) {
+#ifndef BE_TEST
+                    download_status = file_downloader.download_file();
+#else
+                    download_status = _file_downloader_ptr->download_file();
+#endif
+                    if (download_status != DORIS_SUCCESS) {
+                        LOG(WARNING) << "download file failed. "
+                                     << " backend_ip: " << src_host->host.c_str()
+                                     << " src_file_path: "
+                                     << downloader_param.remote_file_path.c_str()
+                                     << " signature: " << signature;
+                    } else {
+                        // Check file length
+                        boost::filesystem::path local_file_path(
+                                downloader_param.local_file_path);
+                        uint64_t local_file_size =
+                                boost::filesystem::file_size(local_file_path);
+                        if (local_file_size != file_size) {
+                            LOG(WARNING) << "download file length error. "
+                                         << " backend_ip: " << src_host->host.c_str()
+                                         << " src_file_path: "
+                                         << downloader_param.remote_file_path.c_str()
+                                         << " signature: " << signature
+                                         << " remote file size: " << file_size
+                                         << " local file size: " << local_file_size;
+                            download_status = DORIS_FILE_DOWNLOAD_FAILED;
+                        } else {
+                            // owner read/write only
+                            chmod(downloader_param.local_file_path.c_str(),
+                                  S_IRUSR | S_IWUSR);
+                            break;
+                        }
+                    }
+                    ++download_retry_time;
+#ifndef BE_TEST
+                    sleep(download_retry_time);
+#endif
+                } // Try to download a file from remote backend
+            }
+
+            if (download_status != DORIS_SUCCESS) {
+                LOG(WARNING) << "download file failed over max retry. "
+                             << "backend_ip: " << src_host->host.c_str()
+                             << "src_file_path: " << downloader_param.remote_file_path.c_str()
+                             << " signature: " << signature;
+                status = DORIS_ERROR;
+                break;
+            }
+        } // Clone files from remote backend
+
+        // The conversions only make sense on a completely downloaded snapshot;
+        // after a failed listing or download the directory is empty or partial.
+        if (status == DORIS_SUCCESS
+                && make_snapshot_result.snapshot_version < PREFERRED_SNAPSHOT_VERSION) {
+            OLAPStatus convert_status = _convert_to_new_snapshot(
+                    data_dir, local_data_path, clone_req.tablet_id);
+            if (convert_status != OLAP_SUCCESS) {
+                status = DORIS_ERROR;
+            }
+        }
+        if (status == DORIS_SUCCESS) {
+            // change all rowset ids because they maybe its id same with local rowset
+            OLAPStatus convert_status = SnapshotManager::instance()->convert_rowset_ids(
+                    data_dir, local_data_path, clone_req.tablet_id, clone_req.schema_hash,
+                    tablet);
+            if (convert_status != OLAP_SUCCESS) {
+                status = DORIS_ERROR;
+            }
+        }
+
+        // Release snapshot, if failed, ignore it. OLAP engine will drop useless snapshot
+        TAgentResult release_snapshot_result;
+        agent_client.release_snapshot(
+                make_snapshot_result.snapshot_path,
+                &release_snapshot_result);
+        if (release_snapshot_result.status.status_code != TStatusCode::OK) {
+            LOG(WARNING) << "release snapshot failed. src_file_path: " << *src_file_path
+                         << ". signature: " << signature;
+        }
+
+        if (status == DORIS_SUCCESS) {
+            break;
+        }
+    } // clone copy from one backend
+    return status;
+}
+
+// Converts the snapshot stored in clone_dir to the new snapshot format.
+//
+// The header file <clone_dir>/<tablet_id>.hdr is read as an OLAPHeaderMessage,
+// the files present in clone_dir before the conversion are recorded, the
+// converter is run with clone_dir as both of its directory arguments, the
+// recorded files are removed, and the converted tablet meta is saved back to
+// the header file path.
+//
+// Returns OLAP_SUCCESS on success; OLAP_ERR_DIR_NOT_EXIST when clone_dir is
+// missing, OLAP_ERR_IO_ERROR when the header cannot be opened,
+// OLAP_ERR_PARSE_PROTOBUF_ERROR when it cannot be parsed or copied, or the
+// status returned by dir_walk / the converter / TabletMeta::save.
+OLAPStatus EngineCloneTask::_convert_to_new_snapshot(DataDir& data_dir,
+                                                     const string& clone_dir,
+                                                     int64_t tablet_id) {
+    // check clone dir existed
+    if (!check_dir_existed(clone_dir)) {
+        LOG(WARNING) << "clone dir not existed when clone. clone_dir=" << clone_dir.c_str();
+        return OLAP_ERR_DIR_NOT_EXIST;
+    }
+
+    // load src header
+    const string cloned_meta_file = clone_dir + "/" + std::to_string(tablet_id) + ".hdr";
+    FileHeader<OLAPHeaderMessage> file_header;
+    FileHandler file_handler;
+    if (file_handler.open(cloned_meta_file.c_str(), O_RDONLY) != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to open ordinal file. file=" << cloned_meta_file;
+        return OLAP_ERR_IO_ERROR;
+    }
+
+    // In file_header.unserialize(), it validates file length, signature,
+    // checksum of protobuf.
+    if (file_header.unserialize(&file_handler) != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to unserialize tablet_meta. file='" << cloned_meta_file;
+        return OLAP_ERR_PARSE_PROTOBUF_ERROR;
+    }
+
+    // Record the directory content before converting, so that only the files
+    // that existed beforehand are removed afterwards.
+    set<string> clone_files;
+    const OLAPStatus walk_status = dir_walk(clone_dir, NULL, &clone_files);
+    if (walk_status != OLAP_SUCCESS) {
+        LOG(WARNING) << "failed to dir walk when clone. [clone_dir=" << clone_dir << "]";
+        return walk_status;
+    }
+
+    OLAPHeaderMessage olap_header_msg;
+    try {
+        olap_header_msg.CopyFrom(file_header.message());
+    } catch (...) {
+        LOG(WARNING) << "fail to copy protocol buffer object. file='" << cloned_meta_file;
+        return OLAP_ERR_PARSE_PROTOBUF_ERROR;
+    }
+
+    OlapSnapshotConverter converter;
+    TabletMetaPB tablet_meta_pb;
+    vector<RowsetMetaPB> pending_rowsets;
+    const OLAPStatus convert_status = converter.to_new_snapshot(
+            olap_header_msg, clone_dir, clone_dir, data_dir, &tablet_meta_pb,
+            &pending_rowsets, false);
+    if (convert_status != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to convert snapshot to new format. dir='" << clone_dir;
+        return convert_status;
+    }
+
+    // remove all files
+    vector<string> files_to_delete;
+    for (const auto& file_name : clone_files) {
+        files_to_delete.push_back(clone_dir + "/" + file_name);
+    }
+    remove_files(files_to_delete);
+
+    const OLAPStatus save_status = TabletMeta::save(cloned_meta_file, tablet_meta_pb);
+    if (save_status != OLAP_SUCCESS) {
+        LOG(WARNING) << "fail to save converted tablet meta to dir='" << clone_dir;
+        return save_status;
+    }
+
+    return OLAP_SUCCESS;
+}
+
+// only incremental clone use this method
+OLAPStatus EngineCloneTask::_finish_clone(TabletSharedPtr tablet, const 
string& clone_dir,
+                                         int64_t committed_version, bool 
is_incremental_clone, 
+                                         int32_t snapshot_version) {
 
 Review comment:
   snapshot_version is not used

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to