This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e98bbb5  Refactor clone task (#2285)
e98bbb5 is described below

commit e98bbb5bc599a5056280ea3134a9d5a9f1238b6a
Author: ZHAO Chun <[email protected]>
AuthorDate: Sun Nov 24 22:36:10 2019 +0800

    Refactor clone task (#2285)
    
    In the previous implementation, the clone task would continue downloading
    files even if an error happened, which may cause unexpected problems. This
    change list refactors it so that when an error happens, the clone task
    fails as a whole and tries to clone from another remote source.
    
    Besides the above change, I call FileUtils::remove_all and create_dir
    instead of the boost ones, which may throw exceptions. What's more,
    AgentServerClient is replaced with ThriftRpcHelper; with this change
    connections can be reused.
---
 be/src/agent/task_worker_pool.h                    |   3 -
 be/src/agent/utils.cpp                             |  54 ---
 be/src/agent/utils.h                               |  40 --
 be/src/olap/snapshot_manager.cpp                   |   2 +-
 be/src/olap/snapshot_manager.h                     |   2 +-
 be/src/olap/task/engine_clone_task.cpp             | 514 +++++++++++----------
 be/src/olap/task/engine_clone_task.h               |  31 +-
 be/src/olap/task/engine_storage_migration_task.cpp |   2 +-
 be/src/runtime/snapshot_loader.cpp                 |   2 +-
 9 files changed, 291 insertions(+), 359 deletions(-)

diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index ac7aacb..0f3a55f 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -139,9 +139,6 @@ private:
     AgentUtils* _agent_utils;
     MasterServerClient* _master_client;
     ExecEnv* _env;
-#ifdef BE_TEST
-    AgentServerClient* _agent_client;
-#endif
 
     std::deque<TAgentTaskRequest> _tasks;
     Mutex _worker_thread_lock;
diff --git a/be/src/agent/utils.cpp b/be/src/agent/utils.cpp
index 6c70fef..b7515e8 100644
--- a/be/src/agent/utils.cpp
+++ b/be/src/agent/utils.cpp
@@ -59,60 +59,6 @@ using apache::thrift::transport::TTransportException;
 
 namespace doris {
 
-
-AgentServerClient::AgentServerClient(const TBackend backend) :
-        _socket(new TSocket(backend.host, backend.be_port)),
-        _transport(new TBufferedTransport(_socket)),
-        _protocol(new TBinaryProtocol(_transport)),
-        _agent_service_client(_protocol) {
-}
-
-AgentServerClient::~AgentServerClient() {
-    if (_transport != NULL) {
-        _transport->close();
-    }
-}
-
-AgentStatus AgentServerClient::make_snapshot(
-        const TSnapshotRequest& snapshot_request,
-        TAgentResult* result) {
-    AgentStatus status = DORIS_SUCCESS;
-
-    TAgentResult thrift_result;
-    try {
-        _transport->open();
-        _agent_service_client.make_snapshot(thrift_result, snapshot_request);
-        *result = thrift_result;
-        _transport->close();
-    } catch (TException& e) {
-        OLAP_LOG_WARNING("agent clinet make snapshot, "
-                         "get exception, error: %s", e.what());
-        _transport->close();
-        status = DORIS_ERROR;
-    }
-
-    return status;
-}
-
-AgentStatus AgentServerClient::release_snapshot(
-        const string& snapshot_path,
-        TAgentResult* result) {
-    AgentStatus status = DORIS_SUCCESS;
-
-    try {
-        _transport->open();
-        _agent_service_client.release_snapshot(*result, snapshot_path);
-        _transport->close();
-    } catch (TException& e) {
-        OLAP_LOG_WARNING("agent clinet make snapshot, "
-                         "get exception, error: %s", e.what());
-        _transport->close();
-        status = DORIS_ERROR;
-    }
-    
-    return status;
-}
-
 MasterServerClient::MasterServerClient(
         const TMasterInfo& master_info,
         FrontendServiceClientCache* client_cache) :
diff --git a/be/src/agent/utils.h b/be/src/agent/utils.h
index 234be89..0445269 100644
--- a/be/src/agent/utils.h
+++ b/be/src/agent/utils.h
@@ -34,46 +34,6 @@
 
 namespace doris {
 
-// client cache
-// All service client should be defined in client_cache.h
-//class MasterServiceClient;
-//typedef ClientCache<MasterServiceClient> MasterServiceClientCache;
-//typedef ClientConnection<MasterServiceClient> MasterServiceConnection;
-
-class AgentServerClient {
-public:
-    explicit AgentServerClient(const TBackend backend);
-    virtual ~AgentServerClient();
-    
-    // Make a snapshot of tablet
-    //
-    // Input parameters:
-    // * tablet_id: The id of tablet to make snapshot
-    // * schema_hash: The schema hash of tablet to make snapshot
-    //
-    // Output parameters:
-    // * result: The result of make snapshot
-    virtual AgentStatus make_snapshot(
-            const TSnapshotRequest& snapshot_request,
-            TAgentResult* result);
-
-    // Release the snapshot
-    //
-    // Input parameters:
-    // * snapshot_path: The path of snapshot
-    //
-    // Output parameters:
-    // * result: The result of release snapshot
-    virtual AgentStatus release_snapshot(const std::string& snapshot_path, 
TAgentResult* result);
-
-private:
-    boost::shared_ptr<apache::thrift::transport::TTransport> _socket;
-    boost::shared_ptr<apache::thrift::transport::TTransport> _transport;
-    boost::shared_ptr<apache::thrift::protocol::TProtocol> _protocol;
-    BackendServiceClient _agent_service_client;
-    DISALLOW_COPY_AND_ASSIGN(AgentServerClient);
-};  // class AgentServerClient
-
 class MasterServerClient {
 public:
     MasterServerClient(const TMasterInfo& master_info, 
FrontendServiceClientCache* client_cache);
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 062be75..0358dbd 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -123,7 +123,7 @@ OLAPStatus SnapshotManager::release_snapshot(const string& 
snapshot_path) {
 
 // TODO support beta rowset
 OLAPStatus SnapshotManager::convert_rowset_ids(const string& clone_dir, 
int64_t tablet_id,
-    const int32_t& schema_hash, TabletSharedPtr tablet) {
+    const int32_t& schema_hash) {
     OLAPStatus res = OLAP_SUCCESS;   
     // check clone dir existed
     if (!FileUtils::check_exist(clone_dir)) {
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index 005e8be..1be86bf 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -65,7 +65,7 @@ public:
     static SnapshotManager* instance();
 
     OLAPStatus convert_rowset_ids(const string& clone_dir, int64_t tablet_id,
-            const int32_t& schema_hash, TabletSharedPtr tablet);
+            const int32_t& schema_hash);
 
 private:
     SnapshotManager()
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 773d705..e029b47 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -19,16 +19,24 @@
 
 #include <set>
 
+#include "gutil/strings/stringpiece.h"
+#include "gutil/strings/split.h"
+#include "gutil/strings/substitute.h"
+#include "http/http_client.h"
 #include "http/http_client.h"
 #include "olap/olap_snapshot_converter.h"
 #include "olap/snapshot_manager.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
+#include "util/thrift_rpc_helper.h"
 
 #include "env/env.h"
 
 using std::set;
 using std::stringstream;
+using strings::Substitute;
+using strings::Split;
+using strings::SkipWhitespace;
 
 namespace doris {
 
@@ -95,11 +103,10 @@ OLAPStatus EngineCloneTask::execute() {
                 _set_tablet_info(DORIS_SUCCESS, is_new_tablet);
                 return OLAP_SUCCESS;
             }
-            status = _clone_copy(*(tablet->data_dir()), _clone_req, 
_signature, local_data_path,
+            status = _clone_copy(*(tablet->data_dir()), local_data_path,
                                 &src_host, &src_file_path, _error_msgs,
                                 &missed_versions,
-                                &allow_incremental_clone, 
-                                tablet);
+                                &allow_incremental_clone);
         } else {
             LOG(INFO) << "current tablet has invalid rowset that's version == 
commit_version but version hash not equal"
                       << " clone req commit_version=" <<  
_clone_req.committed_version
@@ -108,7 +115,7 @@ OLAPStatus EngineCloneTask::execute() {
                       << " tablet info = " << tablet->full_name();
         }
         if (status == DORIS_SUCCESS && allow_incremental_clone) {
-            OLAPStatus olap_status = _finish_clone(tablet, local_data_path, 
_clone_req.committed_version, allow_incremental_clone);
+            OLAPStatus olap_status = _finish_clone(tablet.get(), 
local_data_path, _clone_req.committed_version, allow_incremental_clone);
             if (olap_status != OLAP_SUCCESS) {
                 LOG(WARNING) << "failed to finish incremental clone. [table=" 
<< tablet->full_name()
                              << " res=" << olap_status << "]";
@@ -116,17 +123,18 @@ OLAPStatus EngineCloneTask::execute() {
                 status = DORIS_ERROR;
             }
         } else {
+            bool allow_incremental_clone = false;
             // begin to full clone if incremental failed
             LOG(INFO) << "begin to full clone. [table=" << tablet->full_name();
-            status = _clone_copy(*(tablet->data_dir()), _clone_req, 
_signature, local_data_path,
+            status = _clone_copy(*(tablet->data_dir()), local_data_path,
                                 &src_host, &src_file_path,  _error_msgs,
-                                NULL, NULL, tablet);
+                                NULL, &allow_incremental_clone);
             if (status == DORIS_SUCCESS) {
                 LOG(INFO) << "download successfully when full clone. [table=" 
<< tablet->full_name()
                           << " src_host=" << src_host.host << " 
src_file_path=" << src_file_path
                           << " local_data_path=" << local_data_path << "]";
 
-                OLAPStatus olap_status = _finish_clone(tablet, 
local_data_path, _clone_req.committed_version, false);
+                OLAPStatus olap_status = _finish_clone(tablet.get(), 
local_data_path, _clone_req.committed_version, false);
 
                 if (olap_status != OLAP_SUCCESS) {
                     LOG(WARNING) << "fail to finish full clone. [table=" << 
tablet->full_name()
@@ -159,14 +167,13 @@ OLAPStatus EngineCloneTask::execute() {
                             << "/" << _clone_req.schema_hash;
 
         if (status == DORIS_SUCCESS) {
+            bool allow_incremental_clone = false;
             status = _clone_copy(*store,
-                                _clone_req,
-                                _signature,
                                 tablet_dir_stream.str(),
                                 &src_host,
                                 &src_file_path,
                                 _error_msgs,
-                                nullptr, nullptr, nullptr);
+                                nullptr, &allow_incremental_clone);
         }
 
         if (status == DORIS_SUCCESS) {
@@ -286,269 +293,105 @@ void EngineCloneTask::_set_tablet_info(AgentStatus 
status, bool is_new_tablet) {
 
 AgentStatus EngineCloneTask::_clone_copy(
         DataDir& data_dir,
-        const TCloneReq& clone_req,
-        int64_t signature,
         const string& local_data_path,
         TBackend* src_host,
-        string* src_file_path,
+        string* snapshot_path,
         vector<string>* error_msgs,
         const vector<Version>* missed_versions,
-        bool* allow_incremental_clone, 
-        TabletSharedPtr tablet) {
+        bool* allow_incremental_clone) {
     AgentStatus status = DORIS_SUCCESS;
-    std::string token = _master_info.token;
-    for (auto src_backend : clone_req.src_backends) {
-        stringstream http_host_stream;
-        http_host_stream << "http://"; << src_backend.host << ":" << 
src_backend.http_port;
-        string http_host = http_host_stream.str();
+
+    std::string local_path = local_data_path + "/";
+    const auto& token = _master_info.token;
+
+    int timeout_s = 0;
+    if (_clone_req.__isset.timeout_s) {
+        timeout_s = _clone_req.timeout_s;
+    }
+
+    for (auto& src : _clone_req.src_backends) {
         // Make snapshot in remote olap engine
-        *src_host = src_backend;
-        AgentServerClient agent_client(*src_host);
-        TAgentResult make_snapshot_result;
-        status = DORIS_SUCCESS;
-
-        LOG(INFO) << "pre make snapshot. backend_ip: " << src_host->host;
-        TSnapshotRequest snapshot_request;
-        snapshot_request.__set_tablet_id(clone_req.tablet_id);
-        snapshot_request.__set_schema_hash(clone_req.schema_hash);
-        // This is a new version be, should set preferred version to 2
-        
snapshot_request.__set_preferred_snapshot_version(PREFERRED_SNAPSHOT_VERSION);
-        if (missed_versions != NULL) {
-            // TODO: missing version composed of singleton delta.
-            // if not, this place should be rewrote.
-            vector<int64_t> snapshot_versions;
-            for (Version version : *missed_versions) {
-               snapshot_versions.push_back(version.first); 
-            } 
-            snapshot_request.__set_missing_version(snapshot_versions);
-        }
-        if (clone_req.__isset.timeout_s) {
-            snapshot_request.__set_timeout(clone_req.timeout_s);
-        }
-
-        agent_client.make_snapshot(
-                snapshot_request,
-                &make_snapshot_result);
-        if (make_snapshot_result.__isset.allow_incremental_clone) {
-            // During upgrading, some BE nodes still be installed an old 
previous old.
-            // which incremental clone is not ready in those nodes.
-            // should add a symbol to indicate it.
-            *allow_incremental_clone = 
make_snapshot_result.allow_incremental_clone;
-        }
-        if (make_snapshot_result.status.status_code == TStatusCode::OK) {
-            if (make_snapshot_result.__isset.snapshot_path) {
-                *src_file_path = make_snapshot_result.snapshot_path;
-                if (src_file_path->at(src_file_path->length() - 1) != '/') {
-                    src_file_path->append("/");
-                }
-                LOG(INFO) << "make snapshot success. backend_ip: " << 
src_host->host << ". src_file_path: "
-                          << *src_file_path << ". signature: " << signature;
-            } else {
-                LOG(WARNING) << "clone make snapshot success, "
-                                 "but get src file path failed. signature: " 
<< signature;
-                status = DORIS_ERROR;
-                continue;
-            }
+        *src_host = src;
+        int32_t snapshot_version = 0;
+        // make snapsthot
+        auto st = _make_snapshot(src.host, src.be_port, 
+                                 _clone_req.tablet_id, _clone_req.schema_hash,
+                                 timeout_s,
+                                 missed_versions,
+                                 snapshot_path,
+                                 allow_incremental_clone,
+                                 &snapshot_version);
+        if (st.ok()) {
+            LOG(INFO) << "success to make snapshot. ip=" << src.host << ", 
port=" << src.be_port
+                << ", tablet=" << _clone_req.tablet_id << ", schema_hash=" << 
_clone_req.schema_hash
+                << ", snapshot_path=" << *snapshot_path
+                << ", signature=" << _signature;
         } else {
-            LOG(WARNING) << "make snapshot failed. tablet_id: " << 
clone_req.tablet_id
-                         << ". schema_hash: " << clone_req.schema_hash << ". 
backend_ip: " << src_host->host
-                         << ". backend_port: " << src_host->be_port << ". 
signature: " << signature;
+            LOG(WARNING) << "fail to make snapshot, ip=" << src.host << ", 
port=" << src.be_port
+                << ", tablet=" << _clone_req.tablet_id << ", schema_hash=" << 
_clone_req.schema_hash
+                << ", signature=" << _signature << ", error=" << 
st.to_string();
             error_msgs->push_back("make snapshot failed. backend_ip: " + 
src_host->host);
+
             status = DORIS_ERROR;
             continue;
         }
 
-        // Get remote and local full path
-        stringstream src_file_full_path_stream;
-        stringstream local_file_full_path_stream;
-
-        if (status == DORIS_SUCCESS) {
-            src_file_full_path_stream << *src_file_path
-                                      << "/" << clone_req.tablet_id
-                                      << "/" << clone_req.schema_hash << "/";
-            local_file_full_path_stream << local_data_path  << "/";
-        }
-        string src_file_full_path = src_file_full_path_stream.str();
-        string local_file_full_path = local_file_full_path_stream.str();
-        // Check local path exist, if exist, remove it, then create the dir
-        // local_file_full_path = tabletid/clone, for a specific tablet, there 
should be only one folder
-        // if this folder exists, then should remove it
-        // for example, BE clone from BE 1 to download file 1 with version 
(2,2), but clone from BE 1 failed
-        // then it will try to clone from BE 2, but it will find the file 1 
already exist, but file 1 with same
-        // name may have different versions.
-        if (status == DORIS_SUCCESS) {
-            boost::filesystem::path local_file_full_dir(local_file_full_path);
-            if (boost::filesystem::exists(local_file_full_dir)) {
-                boost::filesystem::remove_all(local_file_full_dir);
-            }
-            boost::filesystem::create_directories(local_file_full_dir);
+        std::string remote_url_prefix;
+        {
+            // TODO(zc): if snapshot path has been returned from source, it is 
some strange to 
+            // concat talbet_id and schema hash here.
+            std::stringstream ss;
+            ss << "http://"; << src.host << ":" << src.http_port
+                << HTTP_REQUEST_PREFIX
+                << HTTP_REQUEST_TOKEN_PARAM << token
+                << HTTP_REQUEST_FILE_PARAM
+                << *snapshot_path
+                << "/" << _clone_req.tablet_id
+                << "/" << _clone_req.schema_hash << "/";
+
+            remote_url_prefix = ss.str();
         }
 
-        // Get remove dir file list
-        HttpClient client;
-        std::string remote_file_path = http_host + HTTP_REQUEST_PREFIX
-            + HTTP_REQUEST_TOKEN_PARAM + token
-            + HTTP_REQUEST_FILE_PARAM + src_file_full_path;
-
-        string file_list_str;
-        auto list_files_cb = [&remote_file_path, &file_list_str] (HttpClient* 
client) {
-            RETURN_IF_ERROR(client->init(remote_file_path));
-            client->set_timeout_ms(LIST_REMOTE_FILE_TIMEOUT * 1000);
-            RETURN_IF_ERROR(client->execute(&file_list_str));
-            return Status::OK();
-        };
-
-        Status download_status = HttpClient::execute_with_retry(
-            DOWNLOAD_FILE_MAX_RETRY, 1, list_files_cb);
-
-        vector<string> file_name_list;
-        if (!download_status.ok()) {
-            LOG(WARNING) << "clone get remote file list failed over max time. 
" 
-                         << " backend_ip: " << src_host->host
-                         << " src_file_path: " << remote_file_path
-                         << " signature: " << signature;
+        st = _download_files(&data_dir, remote_url_prefix, local_path);
+        if (!st.ok()) {
+            LOG(WARNING) << "fail to download and convert tablet, remote="
+                << remote_url_prefix << ", error=" << st.to_string();
             status = DORIS_ERROR;
-        } else {
-            size_t start_position = 0;
-            size_t end_position = file_list_str.find("\n");
-
-            // Split file name from file_list_str
-            while (end_position != string::npos) {
-                string file_name = file_list_str.substr(
-                        start_position, end_position - start_position);
-                // If the header file is not exist, the table could't loaded 
by olap engine.
-                // Avoid of data is not complete, we copy the header file at 
last.
-                // The header file's name is end of .hdr.
-                if (file_name.size() > 4 && file_name.substr(file_name.size() 
- 4, 4) == ".hdr") {
-                    file_name_list.push_back(file_name);
-                } else {
-                    file_name_list.insert(file_name_list.begin(), file_name);
-                }
-
-                start_position = end_position + 1;
-                end_position = file_list_str.find("\n", start_position);
-            }
-            if (start_position != file_list_str.size()) {
-                string file_name = file_list_str.substr(
-                        start_position, file_list_str.size() - start_position);
-                if (file_name.size() > 4 && file_name.substr(file_name.size() 
- 4, 4) == ".hdr") {
-                    file_name_list.push_back(file_name);
-                } else {
-                    file_name_list.insert(file_name_list.begin(), file_name);
-                }
-            }
+            // when there is an error, keep this program executing to release 
snapshot
         }
 
-        // Get copy from remote
-        uint64_t total_file_size = 0;
-        MonotonicStopWatch watch;
-        watch.start();
-        for (auto& file_name : file_name_list) {
-            remote_file_path = http_host + HTTP_REQUEST_PREFIX
-                + HTTP_REQUEST_TOKEN_PARAM + token
-                + HTTP_REQUEST_FILE_PARAM + src_file_full_path + file_name;
-
-            // get file length
-            uint64_t file_size = 0;
-            auto get_file_size_cb = [&remote_file_path, &file_size] 
(HttpClient* client) {
-                RETURN_IF_ERROR(client->init(remote_file_path));
-                client->set_timeout_ms(GET_LENGTH_TIMEOUT * 1000);
-                RETURN_IF_ERROR(client->head());
-                file_size = client->get_content_length();
-                return Status::OK();
-            };
-            download_status = HttpClient::execute_with_retry(
-                DOWNLOAD_FILE_MAX_RETRY, 1, get_file_size_cb);
-            if (!download_status.ok()) {
-                LOG(WARNING) << "clone copy get file length failed over max 
time. remote_path="
-                    << remote_file_path
-                    << ", signature=" << signature;
+        if (status == DORIS_SUCCESS && snapshot_version == 1) {
+            auto olap_st = _convert_to_new_snapshot(local_path, 
_clone_req.tablet_id);
+            if (olap_st != OLAP_SUCCESS) {
+                LOG(WARNING) << "fail to convert to new snapshot, path=" << 
local_path
+                    << ", tablet_id=" << _clone_req.tablet_id
+                    << ", error=" << olap_st;
                 status = DORIS_ERROR;
-                break;
-            }
-
-            // check disk capacity
-            if (data_dir.reach_capacity_limit(file_size)) {
-                status = DORIS_DISK_REACH_CAPACITY_LIMIT;
-                break;
             }
-
-            total_file_size += file_size;
-            uint64_t estimate_timeout = file_size / 
config::download_low_speed_limit_kbps / 1024;
-            if (estimate_timeout < config::download_low_speed_time) {
-                estimate_timeout = config::download_low_speed_time;
-            }
-
-            std::string local_file_path = local_file_full_path + file_name;
-
-            auto download_cb = [&remote_file_path,
-                                estimate_timeout,
-                                &local_file_path,
-                                file_size] (HttpClient* client) {
-                RETURN_IF_ERROR(client->init(remote_file_path));
-                client->set_timeout_ms(estimate_timeout * 1000);
-                RETURN_IF_ERROR(client->download(local_file_path));
-
-                // Check file length
-                uint64_t local_file_size = 
boost::filesystem::file_size(local_file_path);
-                if (local_file_size != file_size) {
-                    LOG(WARNING) << "download file length error"
-                        << ", remote_path=" << remote_file_path
-                        << ", file_size=" << file_size
-                        << ", local_file_size=" << local_file_size;
-                    return Status::InternalError("downloaded file size is not 
equal");
-                }
-                chmod(local_file_path.c_str(), S_IRUSR | S_IWUSR);
-                return Status::OK();
-            };
-            download_status = HttpClient::execute_with_retry(
-                DOWNLOAD_FILE_MAX_RETRY, 1, download_cb);
-            if (!download_status.ok()) {
-                LOG(WARNING) << "download file failed over max retry."
-                    << ", remote_path=" << remote_file_path
-                    << ", signature=" << signature
-                    << ", errormsg=" << download_status.get_error_msg();
-                status = DORIS_ERROR;
-                break;
-            }
-        } // Clone files from remote backend
-
-        uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
-        total_time_ms = total_time_ms > 0 ? total_time_ms : 0;
-        double copy_rate = 0.0;
-        if (total_time_ms > 0) {
-            copy_rate = total_file_size / ((double) total_time_ms) / 1000;
-        }
-        _copy_size = (int64_t) total_file_size;
-        _copy_time_ms = (int64_t) total_time_ms;
-        LOG(INFO) << "succeed to copy tablet " << signature
-                  << ", total file size: " << total_file_size << " B"
-                  << ", cost: " << total_time_ms << " ms"
-                  << ", rate: " << copy_rate << " MB/s";
-        if (make_snapshot_result.snapshot_version == 1) {
-            OLAPStatus convert_status = 
_convert_to_new_snapshot(local_data_path, clone_req.tablet_id);
-            if (convert_status != OLAP_SUCCESS) {
+        }
+        if (status == DORIS_SUCCESS) {
+            // change all rowset ids because they maybe its id same with local 
rowset
+            auto olap_st = SnapshotManager::instance()->convert_rowset_ids(
+                local_path, _clone_req.tablet_id, _clone_req.schema_hash);
+            if (olap_st != OLAP_SUCCESS) {
+                LOG(WARNING) << "fail to convert rowset ids, path=" << 
local_path
+                    << ", tablet_id=" << _clone_req.tablet_id
+                    << ", schema_hash=" << _clone_req.schema_hash
+                    << ", error=" << olap_st;
                 status = DORIS_ERROR;
             }
-        } 
-        // change all rowset ids because they maybe its id same with local 
rowset
-        OLAPStatus convert_status = 
SnapshotManager::instance()->convert_rowset_ids(
-            local_data_path, clone_req.tablet_id, clone_req.schema_hash, 
tablet);
-        if (convert_status != OLAP_SUCCESS) {
-            status = DORIS_ERROR;
         }
-        
 
         // Release snapshot, if failed, ignore it. OLAP engine will drop 
useless snapshot
-        TAgentResult release_snapshot_result;
-        agent_client.release_snapshot(
-                make_snapshot_result.snapshot_path,
-                &release_snapshot_result);
-        if (release_snapshot_result.status.status_code != TStatusCode::OK) {
-            LOG(WARNING) << "release snapshot failed. src_file_path: " << 
*src_file_path
-                         << ". signature: " << signature;
+        st = _release_snapshot(src.host, src.be_port, *snapshot_path);
+        if (st.ok()) {
+            LOG(INFO) << "success to release snapshot, ip=" << src.host << ", 
port=" << src.be_port
+                << ", snapshot_path=" << snapshot_path;
+        } else {
+            LOG(WARNING) << "fail to release snapshot, ip=" << src.host << ", 
port=" << src.be_port
+                << ", snapshot_path=" << snapshot_path << ", error=" << 
st.to_string();
+            // DON'T change the status
         }
-
         if (status == DORIS_SUCCESS) {
             break;
         }
@@ -556,6 +399,175 @@ AgentStatus EngineCloneTask::_clone_copy(
     return status;
 }
 
+// Ask the backend at ip:port to make a snapshot of a tablet.
+//
+// Params:
+//   ip, port                 address the make_snapshot RPC is sent to
+//   tablet_id, schema_hash   identify the tablet in the request
+//   timeout_s                put into the request only when it is > 0
+//   missed_versions          optional; when non-null, the first component of each
+//                            version is sent as request.missing_version
+//   snapshot_path            [out] path reported by the remote side, always ends with '/'
+//   allow_incremental_clone  [out] written only when the result carries this field
+//   snapshot_version         [out] result.snapshot_version
+//
+// Returns the RPC error if the call fails, the status carried by the result if it
+// is not OK, InternalError if no usable snapshot path was reported, OK otherwise.
+Status EngineCloneTask::_make_snapshot(
+        const std::string& ip, int port,
+        TTableId tablet_id,
+        TSchemaHash schema_hash,
+        int timeout_s,
+        const std::vector<Version>* missed_versions,
+        std::string* snapshot_path,
+        bool* allow_incremental_clone,
+        int32_t* snapshot_version) {
+    TSnapshotRequest request;
+    request.__set_tablet_id(tablet_id);
+    request.__set_schema_hash(schema_hash);
+    // This is a new version be, should set preferred version to 2
+    request.__set_preferred_snapshot_version(PREFERRED_SNAPSHOT_VERSION);
+    if (missed_versions != nullptr) {
+        // TODO: missing version composed of singleton delta.
+        // if not, this place should be rewritten.
+        request.__isset.missing_version = true;
+        for (const auto& version : *missed_versions) {
+            request.missing_version.push_back(version.first);
+        }
+    }
+    if (timeout_s > 0) {
+        request.__set_timeout(timeout_s);
+    }
+
+    TAgentResult result;
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<BackendServiceClient>(
+        ip, port,
+        [&request, &result] (BackendServiceConnection& client) {
+            client->make_snapshot(result, request);
+        }));
+    if (result.status.status_code != TStatusCode::OK) {
+        return Status(result.status);
+    }
+
+    // A path that is set but empty is as unusable as a missing one. Reject it here
+    // instead of indexing the last character of an empty string below.
+    if (!result.__isset.snapshot_path || result.snapshot_path.empty()) {
+        return Status::InternalError("success snapshot without snapshot path");
+    }
+    *snapshot_path = result.snapshot_path;
+    if (snapshot_path->back() != '/') {
+        snapshot_path->append("/");
+    }
+
+    if (result.__isset.allow_incremental_clone) {
+        // During an upgrade some BE nodes may still run an older version in which
+        // incremental clone is not ready, so the field can be absent. The caller's
+        // value is only overwritten when the remote side actually sent it.
+        *allow_incremental_clone = result.allow_incremental_clone;
+    }
+    *snapshot_version = result.snapshot_version;
+    return Status::OK();
+}
+
+// Tell the backend at ip:port to release the snapshot stored at snapshot_path.
+// Returns the RPC error if the call itself fails, otherwise the status carried
+// in the remote result.
+Status EngineCloneTask::_release_snapshot(
+        const std::string& ip, int port,
+        const std::string& snapshot_path) {
+    TAgentResult release_result;
+    auto release_cb = [&release_result, &snapshot_path] (BackendServiceConnection& conn) {
+        conn->release_snapshot(release_result, snapshot_path);
+    };
+    RETURN_IF_ERROR(ThriftRpcHelper::rpc<BackendServiceClient>(ip, port, release_cb));
+    return Status(release_result.status);
+}
+
+// Download every file listed under remote_url_prefix into local_path.
+//
+// local_path is removed and re-created first. The remote listing is then fetched,
+// and each file goes through: HEAD request for its length, capacity check against
+// data_dir, download, and a size comparison of the local copy. Every HTTP step is
+// retried through HttpClient::execute_with_retry; the first step that still fails
+// aborts the whole download and its status is returned.
+//
+// Params:
+//   data_dir           consulted for the disk capacity check before each download
+//   remote_url_prefix  URL that returns the newline separated file list; file names
+//                      are appended to it verbatim to build each file URL
+//   local_path         destination directory; file names are appended to it verbatim
+//
+// On success _copy_size and _copy_time_ms are updated and OK is returned.
+Status EngineCloneTask::_download_files(
+        DataDir* data_dir,
+        const std::string& remote_url_prefix,
+        const std::string& local_path) {
+    // For a specific tablet there should be only one clone folder (tabletid/clone).
+    // If it already exists it must be removed before downloading: for example, a
+    // clone from BE 1 may have downloaded file 1 with version (2,2) and then failed;
+    // when retrying from BE 2 a file with the same name may hold different versions.
+    RETURN_IF_ERROR(FileUtils::remove_all(local_path));
+    RETURN_IF_ERROR(FileUtils::create_dir(local_path));
+
+    // Get remote dir file list
+    string file_list_str;
+    auto list_files_cb = [&remote_url_prefix, &file_list_str] (HttpClient* client) {
+        RETURN_IF_ERROR(client->init(remote_url_prefix));
+        client->set_timeout_ms(LIST_REMOTE_FILE_TIMEOUT * 1000);
+        RETURN_IF_ERROR(client->execute(&file_list_str));
+        return Status::OK();
+    };
+    RETURN_IF_ERROR(HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, list_files_cb));
+    vector<string> file_name_list = strings::Split(file_list_str, "\n", strings::SkipWhitespace());
+
+    // If the header file does not exist, the tablet can't be loaded by the olap engine.
+    // To avoid exposing incomplete data, the header file is copied last.
+    // The header file's name ends with .hdr.
+    // "i + 1 < size()" keeps the bound correct for an empty list; "size() - 1" would
+    // wrap around on the unsigned size and index out of range.
+    for (size_t i = 0; i + 1 < file_name_list.size(); ++i) {
+        StringPiece sp(file_name_list[i]);
+        if (sp.ends_with(".hdr")) {
+            std::swap(file_name_list[i], file_name_list.back());
+            break;
+        }
+    }
+
+    // Get copy from remote
+    uint64_t total_file_size = 0;
+    MonotonicStopWatch watch;
+    watch.start();
+    for (auto& file_name : file_name_list) {
+        auto remote_file_url = remote_url_prefix + file_name;
+
+        // get file length
+        uint64_t file_size = 0;
+        auto get_file_size_cb = [&remote_file_url, &file_size] (HttpClient* client) {
+            RETURN_IF_ERROR(client->init(remote_file_url));
+            client->set_timeout_ms(GET_LENGTH_TIMEOUT * 1000);
+            RETURN_IF_ERROR(client->head());
+            file_size = client->get_content_length();
+            return Status::OK();
+        };
+        RETURN_IF_ERROR(HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, get_file_size_cb));
+        // check disk capacity
+        if (data_dir->reach_capacity_limit(file_size)) {
+            return Status::InternalError("Disk reach capacity limit");
+        }
+
+        total_file_size += file_size;
+        // Timeout derived from the file size and the configured low speed limit,
+        // but never below config::download_low_speed_time.
+        uint64_t estimate_timeout = file_size / config::download_low_speed_limit_kbps / 1024;
+        if (estimate_timeout < config::download_low_speed_time) {
+            estimate_timeout = config::download_low_speed_time;
+        }
+
+        std::string local_file_path = local_path + file_name;
+
+        auto download_cb = [&remote_file_url,
+                            estimate_timeout,
+                            &local_file_path,
+                            file_size] (HttpClient* client) {
+            RETURN_IF_ERROR(client->init(remote_file_url));
+            client->set_timeout_ms(estimate_timeout * 1000);
+            RETURN_IF_ERROR(client->download(local_file_path));
+
+            // Check file length. The error_code overload is used on purpose: the
+            // other overload throws when the size cannot be read, and nothing
+            // in this function catches exceptions.
+            boost::system::error_code ec;
+            uint64_t local_file_size = boost::filesystem::file_size(local_file_path, ec);
+            if (ec) {
+                LOG(WARNING) << "fail to get size of downloaded file"
+                    << ", local_path=" << local_file_path
+                    << ", error=" << ec.message();
+                return Status::InternalError("fail to get downloaded file size");
+            }
+            if (local_file_size != file_size) {
+                LOG(WARNING) << "download file length error"
+                    << ", remote_path=" << remote_file_url
+                    << ", file_size=" << file_size
+                    << ", local_file_size=" << local_file_size;
+                return Status::InternalError("downloaded file size is not equal");
+            }
+            // Restrict the file to owner read/write; the result is not checked.
+            chmod(local_file_path.c_str(), S_IRUSR | S_IWUSR);
+            return Status::OK();
+        };
+        RETURN_IF_ERROR(HttpClient::execute_with_retry(DOWNLOAD_FILE_MAX_RETRY, 1, download_cb));
+    } // Clone files from remote backend
+
+    uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
+    double copy_rate = 0.0;
+    if (total_time_ms > 0) {
+        // bytes per millisecond divided by 1000 gives MB/s
+        copy_rate = total_file_size / static_cast<double>(total_time_ms) / 1000;
+    }
+    _copy_size = static_cast<int64_t>(total_file_size);
+    _copy_time_ms = static_cast<int64_t>(total_time_ms);
+    LOG(INFO) << "succeed to copy tablet " << _signature
+        << ", total file size: " << total_file_size << " B"
+        << ", cost: " << total_time_ms << " ms"
+        << ", rate: " << copy_rate << " MB/s";
+    return Status::OK();
+}
+
 OLAPStatus EngineCloneTask::_convert_to_new_snapshot(const string& clone_dir, 
int64_t tablet_id) {
     OLAPStatus res = OLAP_SUCCESS;
     // check clone dir existed
@@ -622,7 +634,7 @@ OLAPStatus EngineCloneTask::_convert_to_new_snapshot(const 
string& clone_dir, in
 }
 
 // only incremental clone use this method
-OLAPStatus EngineCloneTask::_finish_clone(TabletSharedPtr tablet, const 
string& clone_dir,
+OLAPStatus EngineCloneTask::_finish_clone(Tablet* tablet, const string& 
clone_dir,
                                          int64_t committed_version, bool 
is_incremental_clone) {
     OLAPStatus res = OLAP_SUCCESS;
     vector<string> linked_success_files;
@@ -731,7 +743,7 @@ OLAPStatus EngineCloneTask::_finish_clone(TabletSharedPtr 
tablet, const string&
     return res;
 }
 
-OLAPStatus EngineCloneTask::_clone_incremental_data(TabletSharedPtr tablet, 
const TabletMeta& cloned_tablet_meta,
+OLAPStatus EngineCloneTask::_clone_incremental_data(Tablet* tablet, const 
TabletMeta& cloned_tablet_meta,
                                               int64_t committed_version) {
     LOG(INFO) << "begin to incremental clone. tablet=" << tablet->full_name()
               << ", committed_version=" << committed_version;
@@ -765,7 +777,7 @@ OLAPStatus 
EngineCloneTask::_clone_incremental_data(TabletSharedPtr tablet, cons
     return clone_res;
 }
 
-OLAPStatus EngineCloneTask::_clone_full_data(TabletSharedPtr tablet, 
TabletMeta* cloned_tablet_meta) {
+OLAPStatus EngineCloneTask::_clone_full_data(Tablet* tablet, TabletMeta* 
cloned_tablet_meta) {
     Version cloned_max_version = cloned_tablet_meta->max_version();
     LOG(INFO) << "begin to full clone. tablet=" << tablet->full_name()
               << ", cloned_max_version=" << cloned_max_version.first
diff --git a/be/src/olap/task/engine_clone_task.h 
b/be/src/olap/task/engine_clone_task.h
index a5995cf..52d1d1c 100644
--- a/be/src/olap/task/engine_clone_task.h
+++ b/be/src/olap/task/engine_clone_task.h
@@ -45,29 +45,46 @@ public:
 
 private:
     
-    virtual OLAPStatus _finish_clone(TabletSharedPtr tablet, const 
std::string& clone_dir,
+    virtual OLAPStatus _finish_clone(Tablet* tablet, const std::string& 
clone_dir,
                                     int64_t committed_version, bool 
is_incremental_clone);
     
-    OLAPStatus _clone_incremental_data(TabletSharedPtr tablet, const 
TabletMeta& cloned_tablet_meta,
+    OLAPStatus _clone_incremental_data(Tablet* tablet, const TabletMeta& 
cloned_tablet_meta,
                                      int64_t committed_version);
 
-    OLAPStatus _clone_full_data(TabletSharedPtr tablet, TabletMeta* 
cloned_tablet_meta);
+    OLAPStatus _clone_full_data(Tablet* tablet, TabletMeta* 
cloned_tablet_meta);
 
     AgentStatus _clone_copy(DataDir& data_dir,
-        const TCloneReq& clone_req,
-        int64_t signature,
         const string& local_data_path,
         TBackend* src_host,
         string* src_file_path,
         vector<string>* error_msgs,
         const vector<Version>* missing_versions,
-        bool* allow_incremental_clone, 
-        TabletSharedPtr tablet);
+        bool* allow_incremental_clone);
         
     OLAPStatus _convert_to_new_snapshot(const string& clone_dir, int64_t 
tablet_id);
 
     void _set_tablet_info(AgentStatus status, bool is_new_tablet);
 
+    // Download tablet files from the remote backend: every file listed under
+    // remote_url_prefix is fetched into local_path, which is removed and
+    // re-created first. data_dir is consulted for the disk capacity check.
+    // Returns the first error encountered, OK when all files were downloaded.
+    Status _download_files(
+        DataDir* data_dir,
+        const std::string& remote_url_prefix,
+        const std::string& local_path);
+
+    // Send a make_snapshot request for (tablet_id, schema_hash) to the backend
+    // at ip:port. timeout_s is put into the request only when it is > 0, and
+    // missed_versions (may be null) is forwarded as the missing versions.
+    // On success snapshot_path receives the remote snapshot path with a
+    // trailing '/', snapshot_version receives the version from the result, and
+    // allow_incremental_clone is written only if the result carries that field.
+    Status _make_snapshot(
+        const std::string& ip, int port,
+        TTableId tablet_id,
+        TSchemaHash schema_hash,
+        int timeout_s,
+        const std::vector<Version>* missed_versions,
+        std::string* snapshot_path,
+        bool* allow_incremental_clone,
+        int32_t* snapshot_version);
+
+    // Send a release_snapshot request for snapshot_path to the backend at
+    // ip:port. Returns the RPC error if the call fails, otherwise the status
+    // carried in the remote result.
+    Status _release_snapshot(
+        const std::string& ip, int port,
+        const std::string& snapshot_path);
+
 private:
     const TCloneReq& _clone_req;
     vector<string>* _error_msgs;
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp 
b/be/src/olap/task/engine_storage_migration_task.cpp
index a48cd39..9475a1f 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -193,7 +193,7 @@ OLAPStatus 
EngineStorageMigrationTask::_storage_medium_migrate(
 
         // it will change rowset id and its create time
         // rowset create time is useful when load tablet from meta to check 
which tablet is the tablet to load
-        res = 
SnapshotManager::instance()->convert_rowset_ids(schema_hash_path, tablet_id, 
schema_hash, nullptr);
+        res = 
SnapshotManager::instance()->convert_rowset_ids(schema_hash_path, tablet_id, 
schema_hash);
         if (res != OLAP_SUCCESS) {
             LOG(WARNING) << "failed to convert rowset id when do storage 
migration"
                          << " path = " << schema_hash_path;
diff --git a/be/src/runtime/snapshot_loader.cpp 
b/be/src/runtime/snapshot_loader.cpp
index a864b52..4da8dbd 100644
--- a/be/src/runtime/snapshot_loader.cpp
+++ b/be/src/runtime/snapshot_loader.cpp
@@ -567,7 +567,7 @@ Status SnapshotLoader::move(
 
     // rename the rowset ids and tabletid info in rowset meta
     OLAPStatus convert_status = 
SnapshotManager::instance()->convert_rowset_ids(
-        snapshot_path, tablet_id, schema_hash, tablet);
+        snapshot_path, tablet_id, schema_hash);
     if (convert_status != OLAP_SUCCESS) {
         std::stringstream ss;
         ss << "failed to convert rowsetids in snapshot: " << snapshot_path


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to