chaoyli commented on a change in pull request #1200: Merge refactor code into 
master
URL: https://github.com/apache/incubator-doris/pull/1200#discussion_r288398752
 
 

 ##########
 File path: be/src/olap/task/engine_clone_task.cpp
 ##########
 @@ -0,0 +1,903 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/task/engine_clone_task.h"
+
+#include <set>
+
+#include "olap/olap_snapshot_converter.h"
+#include "olap/snapshot_manager.h"
+#include "olap/rowset/alpha_rowset.h"
+#include "olap/rowset/alpha_rowset_writer.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_id_generator.h"
+#include "olap/rowset/rowset_writer.h"
+
+using std::set;
+using std::stringstream;
+
+namespace doris {
+
// Components of the remote download request: the path prefix (which ends in
// '?') followed by the query parameters appended after it.
const std::string HTTP_REQUEST_PREFIX{"/api/_tablet/_download?"};
const std::string HTTP_REQUEST_TOKEN_PARAM{"token="};
const std::string HTTP_REQUEST_FILE_PARAM{"&file="};
// Limits for remote file operations. NOTE(review): the unit of the timeout is
// not visible here (presumably seconds) — confirm at the use site.
constexpr uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
constexpr uint32_t LIST_REMOTE_FILE_TIMEOUT = 15;
+
+EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, 
+                    const TMasterInfo& master_info,  
+                    int64_t signature, 
+                    vector<string>* error_msgs, 
+                    vector<TTabletInfo>* tablet_infos,
+                    AgentStatus* res_status) :
+    _clone_req(clone_req),
+    _error_msgs(error_msgs), 
+    _tablet_infos(tablet_infos),
+    _res_status(res_status),
+    _signature(signature), 
+    _master_info(master_info) {}
+
+OLAPStatus EngineCloneTask::execute() {
+    AgentStatus status = DORIS_SUCCESS;
+    string src_file_path;
+    TBackend src_host;
+    // Check local tablet exist or not
+    int32_t snapshot_version = 1;
+    TabletSharedPtr tablet =
+            StorageEngine::instance()->tablet_manager()->get_tablet(
+            _clone_req.tablet_id, _clone_req.schema_hash);
+    bool is_new_tablet = tablet == nullptr;
+    // try to repair a tablet with missing version
+    if (tablet != nullptr) {
+        LOG(INFO) << "clone tablet exist yet, begin to incremental clone. "
+                    << "signature:" << _signature
+                    << ", tablet_id:" << _clone_req.tablet_id
+                    << ", schema_hash:" << _clone_req.schema_hash
+                    << ", committed_version:" << _clone_req.committed_version;
+
+        // try to incremental clone
+        vector<Version> missed_versions;
+        tablet->calc_missed_versions(_clone_req.committed_version, 
&missed_versions);
+        LOG(INFO) << "finish to calculate missed versions when clone. "
+                  << "tablet=" << tablet->full_name()
+                  << ", committed_version=" << _clone_req.committed_version
+                  << ", missed_versions_size=" << missed_versions.size();
+        // if missed version size is 0, then it is useless to clone from 
remote be, it means local data is 
+        // completed. Or remote be will just return header not the rowset 
files. clone will failed.
+        if (missed_versions.size() == 0) {
+            LOG(INFO) << "missed version size = 0, skip clone and reture 
success";
+            return OLAP_SUCCESS;
+        }
+        // get download path
+        string local_data_path = tablet->tablet_path() + CLONE_PREFIX;
+
+        bool allow_incremental_clone = false;
+        status = _clone_copy(*(tablet->data_dir()), _clone_req, _signature, 
local_data_path,
+                            &src_host, &src_file_path, _error_msgs,
+                            &missed_versions,
+                            &allow_incremental_clone, 
+                            &snapshot_version, tablet);
+        if (status == DORIS_SUCCESS && allow_incremental_clone) {
+            OLAPStatus olap_status = _finish_clone(tablet, local_data_path, 
_clone_req.committed_version, allow_incremental_clone, snapshot_version);
+            if (olap_status != OLAP_SUCCESS) {
+                LOG(WARNING) << "failed to finish incremental clone. [table=" 
<< tablet->full_name()
+                             << " res=" << olap_status << "]";
+                _error_msgs->push_back("incremental clone error.");
+                status = DORIS_ERROR;
+            }
+        } else {
+            // begin to full clone if incremental failed
+            LOG(INFO) << "begin to full clone. [table=" << tablet->full_name();
+            status = _clone_copy(*(tablet->data_dir()), _clone_req, 
_signature, local_data_path,
+                                &src_host, &src_file_path,  _error_msgs,
+                                NULL, NULL, &snapshot_version, tablet);
+            if (status == DORIS_SUCCESS) {
+                LOG(INFO) << "download successfully when full clone. [table=" 
<< tablet->full_name()
+                          << " src_host=" << src_host.host << " 
src_file_path=" << src_file_path
+                          << " local_data_path=" << local_data_path << "]";
+
+                OLAPStatus olap_status = _finish_clone(tablet, 
local_data_path, _clone_req.committed_version, false, snapshot_version);
+
+                if (olap_status != OLAP_SUCCESS) {
+                    LOG(WARNING) << "fail to finish full clone. [table=" << 
tablet->full_name()
+                                 << " res=" << olap_status << "]";
+                    _error_msgs->push_back("full clone error.");
+                    status = DORIS_ERROR;
+                }
+            }
+        }
+    } else {
+        LOG(INFO) << "clone tablet not exist, begin clone a new tablet from 
remote be. "
+                    << "signature:" << _signature
+                    << ", tablet_id:" << _clone_req.tablet_id
+                    << ", schema_hash:" << _clone_req.schema_hash
+                    << ", committed_version:" << _clone_req.committed_version;
+        // create a new tablet in this be
+        // Get local disk from olap
+        string local_shard_root_path;
+        DataDir* store = nullptr;
+        OLAPStatus olap_status = StorageEngine::instance()->obtain_shard_path(
+            _clone_req.storage_medium, &local_shard_root_path, &store);
+        if (olap_status != OLAP_SUCCESS) {
+            LOG(WARNING) << "clone get local root path failed. signature: " << 
_signature;
+            _error_msgs->push_back("clone get local root path failed.");
+            status = DORIS_ERROR;
+        }
+        stringstream tablet_dir_stream;
+        tablet_dir_stream << local_shard_root_path
+                            << "/" << _clone_req.tablet_id
+                            << "/" << _clone_req.schema_hash;
+
+        if (status == DORIS_SUCCESS) {
+            status = _clone_copy(*store,
+                                _clone_req,
+                                _signature,
+                                tablet_dir_stream.str(),
+                                &src_host,
+                                &src_file_path,
+                                _error_msgs,
+                                nullptr, nullptr, &snapshot_version, nullptr);
+        }
+
+        if (status == DORIS_SUCCESS) {
+            LOG(INFO) << "clone copy done. src_host: " << src_host.host
+                        << " src_file_path: " << src_file_path;
+            stringstream schema_hash_path_stream;
+            schema_hash_path_stream << local_shard_root_path
+                                    << "/" << _clone_req.tablet_id
+                                    << "/" << _clone_req.schema_hash;
+            string header_path = 
TabletMeta::construct_header_file_path(schema_hash_path_stream.str(), 
+                _clone_req.tablet_id);
+            OLAPStatus reset_id_status = 
TabletMeta::reset_tablet_uid(header_path);
+            if (reset_id_status != OLAP_SUCCESS) {
+                LOG(WARNING) << "errors while set tablet uid: '" << 
header_path;
+                _error_msgs->push_back("errors while set tablet uid.");
+                status = DORIS_ERROR;
+            } else {
+                OLAPStatus load_header_status =  
StorageEngine::instance()->tablet_manager()->load_tablet_from_dir(
+                    store, _clone_req.tablet_id, _clone_req.schema_hash, 
schema_hash_path_stream.str(), false);
+                if (load_header_status != OLAP_SUCCESS) {
+                    LOG(WARNING) << "load header failed. 
local_shard_root_path: '" << local_shard_root_path
+                                << "' schema_hash: " << _clone_req.schema_hash 
<< ". status: " << load_header_status
+                                << ". signature: " << _signature;
+                    _error_msgs->push_back("load header failed.");
+                    status = DORIS_ERROR;
+                }
+            }
+            // clone success, delete .hdr file because tablet meta is stored 
in rocksdb
+            string cloned_meta_file = tablet_dir_stream.str() + "/" + 
std::to_string(_clone_req.tablet_id) + ".hdr";
+            remove_dir(cloned_meta_file);
+        }
+        // Clean useless dir, if failed, ignore it.
+        if (status != DORIS_SUCCESS && status != DORIS_CREATE_TABLE_EXIST) {
+            stringstream local_data_path_stream;
+            local_data_path_stream << local_shard_root_path
+                                    << "/" << _clone_req.tablet_id;
+            string local_data_path = local_data_path_stream.str();
+            LOG(INFO) << "clone failed. want to delete local dir: " << 
local_data_path
+                        << ". signature: " << _signature;
+            try {
+                boost::filesystem::path local_path(local_data_path);
+                if (boost::filesystem::exists(local_path)) {
+                    boost::filesystem::remove_all(local_path);
+                }
+            } catch (boost::filesystem::filesystem_error e) {
+                // Ignore the error, OLAP will delete it
+                LOG(WARNING) << "clone delete useless dir failed. "
+                             << " error: " << e.what()
+                             << " local dir: " << local_data_path.c_str()
+                             << " signature: " << _signature;
+            }
+        }
+    }
+
+    // Get clone tablet info
+    if (status == DORIS_SUCCESS || status == DORIS_CREATE_TABLE_EXIST) {
+        TTabletInfo tablet_info;
+        tablet_info.__set_tablet_id(_clone_req.tablet_id);
+        tablet_info.__set_schema_hash(_clone_req.schema_hash);
+        OLAPStatus get_tablet_info_status = 
StorageEngine::instance()->tablet_manager()->report_tablet_info(&tablet_info);
+        if (get_tablet_info_status != OLAP_SUCCESS) {
+            LOG(WARNING) << "clone success, but get tablet info failed."
+                         << " tablet id: " <<  _clone_req.tablet_id
+                         << " schema hash: " << _clone_req.schema_hash
+                         << " signature: " << _signature;
+            _error_msgs->push_back("clone success, but get tablet info 
failed.");
+            status = DORIS_ERROR;
+        } else if (
+            (_clone_req.__isset.committed_version
+                    && _clone_req.__isset.committed_version_hash)
+                    && (tablet_info.version < _clone_req.committed_version ||
+                        (tablet_info.version == _clone_req.committed_version
+                        && tablet_info.version_hash != 
_clone_req.committed_version_hash))) {
+            LOG(WARNING) << "failed to clone tablet. tablet_id:" << 
_clone_req.tablet_id
+                      << ", schema_hash:" << _clone_req.schema_hash
+                      << ", signature:" << _signature
+                      << ", version:" << tablet_info.version
+                      << ", version_hash:" << tablet_info.version_hash
+                      << ", expected_version: " << _clone_req.committed_version
+                      << ", version_hash:" << 
_clone_req.committed_version_hash;
+            // if it is a new tablet and clone failed, then remove the tablet
+            // if it is incremental clone, then must not drop the tablet
+            if (is_new_tablet) {
+                // we need to check if this cloned table's version is what we 
expect.
+                // if not, maybe this is a stale remaining table which is 
waiting for drop.
+                // we drop it.
+                LOG(WARNING) << "begin to drop the stale tablet. tablet_id:" 
<< _clone_req.tablet_id
+                            << ", schema_hash:" << _clone_req.schema_hash
+                            << ", signature:" << _signature
+                            << ", version:" << tablet_info.version
+                            << ", version_hash:" << tablet_info.version_hash
+                            << ", expected_version: " << 
_clone_req.committed_version
+                            << ", version_hash:" << 
_clone_req.committed_version_hash;
+                OLAPStatus drop_status = 
StorageEngine::instance()->tablet_manager()->drop_tablet(_clone_req.tablet_id, 
+                    _clone_req.schema_hash);
+                if (drop_status != OLAP_SUCCESS && drop_status != 
OLAP_ERR_TABLE_NOT_FOUND) {
+                    // just log
+                    LOG(WARNING) << "drop stale cloned table failed! tabelt 
id: " << _clone_req.tablet_id;
+                }
+            }
+            status = DORIS_ERROR;
+        } else {
+            LOG(INFO) << "clone get tablet info success. tablet_id:" << 
_clone_req.tablet_id
+                        << ", schema_hash:" << _clone_req.schema_hash
+                        << ", signature:" << _signature
+                        << ", version:" << tablet_info.version
+                        << ", version_hash:" << tablet_info.version_hash;
+            _tablet_infos->push_back(tablet_info);
+        }
+    }
+    *_res_status = status;
+    return OLAP_SUCCESS;
+}
+
+AgentStatus EngineCloneTask::_clone_copy(
+        DataDir& data_dir,
+        const TCloneReq& clone_req,
+        int64_t signature,
+        const string& local_data_path,
+        TBackend* src_host,
+        string* src_file_path,
+        vector<string>* error_msgs,
+        const vector<Version>* missed_versions,
+        bool* allow_incremental_clone, 
+        int32_t* snapshot_version, 
+        TabletSharedPtr tablet) {
+    AgentStatus status = DORIS_SUCCESS;
+
+    std::string token = _master_info.token;
+    for (auto src_backend : clone_req.src_backends) {
+        stringstream http_host_stream;
+        http_host_stream << "http://"; << src_backend.host << ":" << 
src_backend.http_port;
+        string http_host = http_host_stream.str();
+        // Make snapshot in remote olap engine
+        *src_host = src_backend;
+        AgentServerClient agent_client(*src_host);
+        TAgentResult make_snapshot_result;
+        status = DORIS_SUCCESS;
+
+        LOG(INFO) << "pre make snapshot. backend_ip: " << src_host->host;
+        TSnapshotRequest snapshot_request;
+        snapshot_request.__set_tablet_id(clone_req.tablet_id);
+        snapshot_request.__set_schema_hash(clone_req.schema_hash);
+        // This is a new version be, should set preferred version to 2
+        
snapshot_request.__set_preferred_snapshot_version(PREFERRED_SNAPSHOT_VERSION);
+        if (missed_versions != NULL) {
+            // TODO: missing version composed of singleton delta.
+            // if not, this place should be rewrote.
+            vector<int64_t> snapshot_versions;
+            for (Version version : *missed_versions) {
+               snapshot_versions.push_back(version.first); 
+            } 
+            snapshot_request.__set_missing_version(snapshot_versions);
+        }
+        agent_client.make_snapshot(
+                snapshot_request,
+                &make_snapshot_result);
+        *snapshot_version = make_snapshot_result.snapshot_version;
 
 Review comment:
   If snapshot_version < 2, or make_snapshot_result does not have the snapshot_version
field set, shouldn't there be a check for that here?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to