xy720 commented on code in PR #47300:
URL: https://github.com/apache/doris/pull/47300#discussion_r2226892036
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -963,6 +967,596 @@ static void set_schema_in_existed_rowset(MetaServiceCode& code, std::string& msg
}
}
+void scan_snapshot_rowset(
+        Transaction* txn, const std::string& instance_id, int64_t tablet_id, MetaServiceCode& code,
+        std::string& msg,
+        std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>* snapshot_rs_metas) {
+    std::stringstream ss;
+    SnapshotRowsetKeyInfo rs_key_info0 {instance_id, tablet_id, 0};
+    SnapshotRowsetKeyInfo rs_key_info1 {instance_id, tablet_id + 1, 0};
+    std::string snapshot_rs_key0;
+    std::string snapshot_rs_key1;
+    snapshot_rowset_key(rs_key_info0, &snapshot_rs_key0);
+    snapshot_rowset_key(rs_key_info1, &snapshot_rs_key1);
+
+    int num_rowsets = 0;
+    std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+            (int*)0x01, [snapshot_rs_key0, snapshot_rs_key1, &num_rowsets](int*) {
+                LOG(INFO) << "get snapshot rs meta, num_rowsets=" << num_rowsets << " range=["
+                          << hex(snapshot_rs_key0) << "," << hex(snapshot_rs_key1) << "]";
+            });
+
+    std::unique_ptr<RangeGetIterator> it;
+    do {
+        TxnErrorCode err = txn->get(snapshot_rs_key0, snapshot_rs_key1, &it, true);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get snapshot rs meta while committing,"
+               << " tablet_id=" << tablet_id << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+            LOG(INFO) << "range_get snapshot_rs_key=" << hex(k) << " tablet_id=" << tablet_id;
+            snapshot_rs_metas->emplace_back();
+            if (!snapshot_rs_metas->back().second.ParseFromArray(v.data(), v.size())) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "malformed snapshot rowset meta, tablet_id=" << tablet_id
+                   << " key=" << hex(k);
+                msg = ss.str();
+                LOG(WARNING) << msg;
+                return;
+            }
+            snapshot_rs_metas->back().first = std::string(k.data(), k.size());
+            ++num_rowsets;
+            if (!it->has_next()) snapshot_rs_key0 = k;
+        }
+        snapshot_rs_key0.push_back('\x00'); // Update to next smallest key for iteration
+    } while (it->more());
+    return;
+}
+
+void MetaServiceImpl::make_snapshot(::google::protobuf::RpcController* controller,
+                                    const SnapshotRequest* request, SnapshotResponse* response,
+                                    ::google::protobuf::Closure* done) {
+    RPC_PREPROCESS(make_snapshot);
+    if (!request->has_tablet_id()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty tablet_id";
+        return;
+    }
+
+    bool is_restore = request->has_is_restore() && request->is_restore();
+    if (is_restore) {
+        if (!request->has_tablet_meta() || !request->tablet_meta().rs_metas_size()) {
+            code = MetaServiceCode::INVALID_ARGUMENT;
+            msg = !request->has_tablet_meta() ? "no tablet meta" : "no rowset meta";
+            return;
+        }
+        if (!request->tablet_meta().has_schema() && !request->tablet_meta().has_schema_version()) {
+            code = MetaServiceCode::INVALID_ARGUMENT;
+            msg = "tablet_meta must have either schema or schema_version";
+            return;
+        }
+    }
+
+    instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+    if (instance_id.empty()) {
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "empty instance_id";
+        LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+        return;
+    }
+
+    RPC_RATE_LIMIT(make_snapshot)
+
+    std::unique_ptr<Transaction> txn0;
+    TxnErrorCode err = txn_kv_->create_txn(&txn0);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::CREATE>(err);
+        msg = "failed to init txn";
+        return;
+    }
+
+    // validate request
+    TabletIndexPB tablet_idx;
+    get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(), tablet_idx);
+    if (code != MetaServiceCode::OK) {
+        return;
+    }
+
+    auto key = snapshot_tablet_key({instance_id, tablet_idx.tablet_id()});
+    std::string val;
+    err = txn0->get(key, &val);
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        code = cast_as<ErrCategory::READ>(err);
+        msg = fmt::format("failed to check snapshot {} existence, err={}", tablet_idx.tablet_id(),
+                          err);
+        return;
+    }
+    if (err == TxnErrorCode::TXN_OK) {
+        SnapshotPB snapshot_pb;
+        if (!snapshot_pb.ParseFromString(val)) {
+            code = MetaServiceCode::INVALID_ARGUMENT;
+            msg = "malformed snapshot";
+            LOG_WARNING(msg);
+            return;
+        }
+        if (snapshot_pb.state() != SnapshotPB::DROPPED) {
+            code = MetaServiceCode::INVALID_ARGUMENT;
+            msg = fmt::format("snapshot {} already exists, state: {}", tablet_idx.tablet_id(),
+                              SnapshotPB::State_Name(snapshot_pb.state()));
+            return;
+        }
+    }
+
+    TabletMetaCloudPB tablet_meta;
+    std::vector<doris::RowsetMetaCloudPB> rs_metas;
+    if (is_restore) {
+        tablet_meta = request->tablet_meta();
+        rs_metas.assign(tablet_meta.rs_metas().begin(), tablet_meta.rs_metas().end());
+    } else {
+        // backup not implemented
+        code = MetaServiceCode::INVALID_ARGUMENT;
+        msg = "not implemented";
+        return;
+    }
+    tablet_meta.clear_rs_metas(); // strip off rs meta
+
+    // 1. save tablet snapshot
+    std::string to_save_val;
+    {
+        SnapshotPB pb;
+        pb.set_tablet_id(tablet_idx.tablet_id());
+        pb.mutable_tablet_meta()->Swap(&tablet_meta);
+        pb.set_creation_time(::time(nullptr));
+        pb.set_expiration(request->expiration());
+        pb.set_is_restore(is_restore);
+        pb.set_state(SnapshotPB::PREPARED);
+        pb.SerializeToString(&to_save_val);
+    }
+    LOG_INFO("put tablet snapshot")
+            .tag("snapshot_tablet_key", hex(key))
+            .tag("tablet_id", tablet_idx.tablet_id())
+            .tag("is_restore", is_restore)
+            .tag("state", SnapshotPB::PREPARED);
+    txn0->put(key, to_save_val);
+    err = txn0->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        msg = fmt::format("failed to commit txn: {}", err);
+        return;
+    }
+
+    // 2. save rs snapshots
+    for (auto& rowset_meta : rs_metas) {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv_->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::CREATE>(err);
+            msg = "failed to init txn";
+            return;
+        }
+        // put snapshot rowset kv
+        std::string snapshot_rs_key;
+        std::string snapshot_rs_val;
+        SnapshotRowsetKeyInfo rs_key_info {instance_id, tablet_idx.tablet_id(), rowset_meta.end_version()};
+        snapshot_rowset_key(rs_key_info, &snapshot_rs_key);
+        if (!rowset_meta.SerializeToString(&snapshot_rs_val)) {
+            code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+            msg = "failed to serialize rowset meta";
+            return;
+        }
+        txn->put(snapshot_rs_key, snapshot_rs_val);
+        LOG_INFO("put rowset snapshot")
+                .tag("snapshot_rowset_key", hex(snapshot_rs_key))
+                .tag("tablet_id", tablet_idx.tablet_id())
+                .tag("rowset_id", rowset_meta.rowset_id_v2())
+                .tag("rowset_size", snapshot_rs_key.size() + snapshot_rs_val.size())
+                .tag("rowset_meta", rowset_meta.DebugString());
+
+        err = txn->commit();
+        if (err != TxnErrorCode::TXN_OK) {
Review Comment:
If any step here fails, the entire restore job fails: within a restore job, either the requests for all tablets succeed, or the whole job fails. Any data left behind in the meta service afterwards is cleaned up by the recycler.
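A minimal caller-side sketch of that contract (hypothetical, not code from this PR; MakeSnapshotFn stands in for the per-tablet make_snapshot RPC issued by the restore driver):

#include <cstdint>
#include <functional>
#include <vector>

using MakeSnapshotFn = std::function<bool(int64_t tablet_id)>; // true on success

// All-or-nothing: the first per-tablet failure fails the whole restore job.
// Snapshot KVs already written in PREPARED state are simply left behind;
// they expire and are reclaimed by InstanceRecycler::recycle_snapshots().
bool make_snapshots_or_fail_job(const std::vector<int64_t>& tablet_ids,
                                const MakeSnapshotFn& make_snapshot) {
    for (int64_t tablet_id : tablet_ids) {
        if (!make_snapshot(tablet_id)) {
            return false; // caller (FE) marks the whole restore job as failed
        }
    }
    return true;
}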
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+    const std::string task_name = "recycle_snapshots";
+    int64_t num_scanned = 0;
+    int64_t num_expired = 0;
+    int64_t num_recycled = 0;
+    int64_t total_recycle_snapshot_rs_number = 0;
+
+    SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+    SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+    std::string snapshot_key0;
+    std::string snapshot_key1;
+    snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+    snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+    LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+    int64_t start_time = duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+    register_recycle_task(task_name, start_time);
+
+    std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
+        unregister_recycle_task(task_name);
+        int64_t cost =
+                duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() - start_time;
+        LOG_INFO("recycle snapshots finished, cost={}s", cost)
+                .tag("instance_id", instance_id_)
+                .tag("num_scanned", num_scanned)
+                .tag("num_expired", num_expired)
+                .tag("num_recycled", num_recycled)
+                .tag("total_recycle_snapshot_rs_number", total_recycle_snapshot_rs_number);
+    });
+
+    int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+    auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+        if (config::force_immediate_recycle || snapshot.state() == SnapshotPB::DROPPED) {
+            return 0L;
+        }
+        int64_t expiration = snapshot.expiration() > 0
+                                     ? snapshot.creation_time() + snapshot.expiration()
+                                     : snapshot.creation_time();
+        int64_t final_expiration = expiration + config::retention_seconds;
+        if (earlest_ts > final_expiration) {
+            earlest_ts = final_expiration;
+            g_bvar_recycler_recycle_snapshot_earlest_ts.put(instance_id_, earlest_ts);
+        }
+        return final_expiration;
+    };
+
+    std::vector<std::string_view> snapshot_keys;
+    auto recycle_func = [&, this](std::string_view k, std::string_view v) -> int {
+        ++num_scanned;
+        SnapshotPB snapshot_pb;
+        if (!snapshot_pb.ParseFromArray(v.data(), v.size())) {
+            LOG_WARNING("malformed recycle partition value").tag("key", hex(k));
+            return -1;
+        }
+        int64_t current_time = ::time(nullptr);
+        if (current_time < calc_expiration(snapshot_pb)) { // not expired
+            return 0;
+        }
+        ++num_expired;
+
+        int64_t tablet_id = snapshot_pb.tablet_id();
+        bool is_restore = snapshot_pb.is_restore();
Review Comment:
is_restore has been removed; this interface is now only used by restore jobs.
##########
be/src/cloud/cloud_snapshot_loader.cpp:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_loader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include <unordered_map>
+
+#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "common/logging.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/remote_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace {
+bool _end_with(std::string_view str, std::string_view match) {
+ return str.size() >= match.size() &&
+ str.compare(str.size() - match.size(), match.size(), match) == 0;
+}
+} // namespace
+
+CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv* env, int64_t job_id,
+                                         int64_t task_id, const TNetworkAddress& broker_addr,
+                                         const std::map<std::string, std::string>& broker_prop)
+        : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop), _engine(engine) {};
+
+Status CloudSnapshotLoader::init(TStorageBackendType::type type, const std::string& location,
+                                 std::string vault_id) {
+    RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));
+    _storage_resource = _engine.get_storage_resource(vault_id);
+    if (!_storage_resource) {
+        return Status::InternalError("vault id not found, vault id {}", vault_id);
+    }
+    return Status::OK();
+}
+
+io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
+ return _storage_resource->fs;
+}
+
+Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path,
+                                   std::map<int64_t, std::vector<std::string>>* tablet_files) {
+    return Status::NotSupported("upload not supported");
+}
+
+Status CloudSnapshotLoader::download(const std::map<std::string, std::string>& src_to_dest_path,
+                                     std::vector<int64_t>* downloaded_tablet_ids) {
+    if (!_remote_fs || !_storage_resource) {
+        return Status::InternalError("Storage backend not initialized.");
+    }
+
+    LOG(INFO) << "begin to transfer snapshot files. num: " << src_to_dest_path.size()
+              << ", broker addr: " << _broker_addr << ", job: " << _job_id
+              << ", task id: " << _task_id;
+
+    // check if job has already been cancelled
+    int tmp_counter = 1;
+    RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0, TTaskType::type::DOWNLOAD));
+
+    Status status = Status::OK();
+
+    // 1. for each src path, transfer files to target path
+    int report_counter = 0;
+    int total_num = src_to_dest_path.size();
+    int finished_num = 0;
+    for (const auto& iter : src_to_dest_path) {
+        const std::string& remote_path = iter.first;
+        const std::string& tablet_str = iter.second;
+        int64_t target_tablet_id = -1;
+        try {
+            target_tablet_id = std::stoll(tablet_str);
+        } catch (std::exception& e) {
+            return Status::InternalError("failed to parse target tablet id {}, {}", tablet_str,
+                                         e.what());
+        }
+        const std::string target_path = _storage_resource->remote_tablet_path(target_tablet_id);
+
+        // 1.1. check target path not exists
+        bool target_path_exist = false;
+        if (!storage_fs()->exists(target_path, &target_path_exist).ok() || target_path_exist) {
+            std::stringstream ss;
+            ss << "failed to download snapshot files, target path already exists: " << target_path;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+
+        downloaded_tablet_ids->push_back(target_tablet_id);
+
+        int64_t remote_tablet_id;
+        RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path, &remote_tablet_id));
+        VLOG_CRITICAL << "get target tablet id: " << target_tablet_id
+                      << ", remote tablet id: " << remote_tablet_id;
+
+        // 1.2. get remote files
+        std::map<std::string, FileStat> remote_files;
+        RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files));
+        if (remote_files.empty()) {
+            std::stringstream ss;
+            ss << "get nothing from remote path: " << remote_path;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+
+        auto remote_hdr_file_path = [&remote_files, &remote_path](std::string& full_hdr_path,
+                                                                  size_t* hdr_file_len) {
+            for (auto iter = remote_files.begin(); iter != remote_files.end();) {
+                if (_end_with(iter->first, ".hdr")) {
+                    *hdr_file_len = iter->second.size;
+                    full_hdr_path = remote_path + "/" + iter->first + "." + iter->second.md5;
+                    // remove hdr file from remote_files
+                    iter = remote_files.erase(iter);
+                    return true;
+                } else {
+                    ++iter;
+                }
+            }
+            return false;
+        };
+
+        size_t hdr_file_len;
+        std::string full_remote_hdr_path;
+        if (!remote_hdr_file_path(full_remote_hdr_path, &hdr_file_len)) {
+            std::stringstream ss;
+            ss << "failed to find hdr file from remote_path: " << remote_path;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+
+        // 1.3. download hdr file
+        io::FileReaderOptions reader_options {
+                .cache_type = io::FileCachePolicy::NO_CACHE,
+                .is_doris_table = false,
+                .cache_base_path = "",
+                .file_size = static_cast<int64_t>(hdr_file_len),
+        };
+        LOG(INFO) << "download hdr file: " << full_remote_hdr_path;
+        io::FileReaderSPtr hdr_reader = nullptr;
+        RETURN_IF_ERROR(_remote_fs->open_file(full_remote_hdr_path, &hdr_reader, &reader_options));
+        std::unique_ptr<char[]> read_buf = std::make_unique_for_overwrite<char[]>(hdr_file_len);
+        size_t read_len = 0;
+        Slice hdr_slice(read_buf.get(), hdr_file_len);
+        RETURN_IF_ERROR(hdr_reader->read_at(0, hdr_slice, &read_len));
+        if (read_len != hdr_file_len) {
+            std::stringstream ss;
+            ss << "failed to read hdr file: " << full_remote_hdr_path;
+            LOG(WARNING) << ss.str();
+            return Status::InternalError(ss.str());
+        }
+
+        RETURN_IF_ERROR(
+                _report_every(0, &tmp_counter, finished_num, total_num, TTaskType::type::DOWNLOAD));
+
+        // 1.4. make snapshot
+        std::unordered_map<std::string, std::string> file_mapping;
+        RETURN_IF_ERROR(_engine.cloud_snapshot_mgr().make_snapshot(
+                target_tablet_id, *_storage_resource, file_mapping, true, &hdr_slice));
+
+        LOG(INFO) << "finish to make snapshot for tablet: " << target_tablet_id;
+
+        // 1.5. download files
+        for (auto& iter : remote_files) {
+            RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num, total_num,
+                                          TTaskType::type::DOWNLOAD));
+            const std::string& remote_file = iter.first;
+            const FileStat& file_stat = iter.second;
+            auto find = file_mapping.find(remote_file);
+            if (find == file_mapping.end()) {
+                continue;
+            }
+            std::string target_file = find->second;
+            std::string full_remote_file = remote_path + "/" + remote_file + "." + file_stat.md5;
+            std::string full_target_file = target_path + "/" + target_file;
+            LOG(INFO) << "begin to download from " << full_remote_file << " to "
+                      << full_target_file;
+            io::FileReaderOptions reader_options {
+                    .cache_type = io::FileCachePolicy::NO_CACHE,
+                    .is_doris_table = false,
+                    .cache_base_path = "",
+                    .file_size = static_cast<int64_t>(file_stat.size),
+            };
+            io::FileReaderSPtr file_reader = nullptr;
+            RETURN_IF_ERROR(_remote_fs->open_file(full_remote_file, &file_reader, &reader_options));
+            io::FileWriterPtr file_writer = nullptr;
+            RETURN_IF_ERROR(storage_fs()->create_file(full_target_file, &file_writer));
+            size_t buf_size = config::s3_file_system_local_upload_buffer_size;
+            std::unique_ptr<char[]> transfer_buffer =
+                    std::make_unique_for_overwrite<char[]>(buf_size);
+            size_t cur_offset = 0;
+            while (true) {
+                size_t read_len = 0;
+                RETURN_IF_ERROR(file_reader->read_at(
+                        cur_offset, Slice {transfer_buffer.get(), buf_size}, &read_len));
+                cur_offset += read_len;
+                if (read_len == 0) {
Review Comment:
Added to the follow-up TODO list.
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+ const std::string task_name = "recycle_snapshots";
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
+ int64_t total_recycle_snapshot_rs_number = 0;
+
+ SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+ SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+ std::string snapshot_key0;
+ std::string snapshot_key1;
+ snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+ snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+ LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+ int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ register_recycle_task(task_name, start_time);
+
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ unregister_recycle_task(task_name);
+ int64_t cost =
+
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
+ LOG_INFO("recycle snapshots finished, cost={}s", cost)
+ .tag("instance_id", instance_id_)
+ .tag("num_scanned", num_scanned)
+ .tag("num_expired", num_expired)
+ .tag("num_recycled", num_recycled)
+ .tag("total_recycle_snapshot_rs_number",
total_recycle_snapshot_rs_number);
+ });
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+ if (config::force_immediate_recycle || snapshot.state() ==
SnapshotPB::DROPPED) {
+ return 0L;
Review Comment:
done
##########
be/src/cloud/cloud_snapshot_loader.cpp:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_loader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include <unordered_map>
+
+#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "common/logging.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/remote_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace {
+bool _end_with(std::string_view str, std::string_view match) {
+ return str.size() >= match.size() &&
+ str.compare(str.size() - match.size(), match.size(), match) == 0;
+}
+} // namespace
+
+CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv*
env, int64_t job_id,
+ int64_t task_id, const
TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& broker_prop)
+ : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop),
_engine(engine) {};
+
+Status CloudSnapshotLoader::init(TStorageBackendType::type type, const
std::string& location,
+ std::string vault_id) {
+ RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));
+ _storage_resource = _engine.get_storage_resource(vault_id);
+ if (!_storage_resource) {
+ return Status::InternalError("vault id not found, vault id {}",
vault_id);
+ }
+ return Status::OK();
+}
+
+io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
+ return _storage_resource->fs;
+}
+
+Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::map<int64_t,
std::vector<std::string>>* tablet_files) {
+ return Status::NotSupported("upload not supported");
+}
+
+Status CloudSnapshotLoader::download(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::vector<int64_t>*
downloaded_tablet_ids) {
+ if (!_remote_fs || !_storage_resource) {
+ return Status::InternalError("Storage backend not initialized.");
+ }
+
+ LOG(INFO) << "begin to transfer snapshot files. num: " <<
src_to_dest_path.size()
+ << ", broker addr: " << _broker_addr << ", job: " << _job_id
+ << ", task id: " << _task_id;
+
+ // check if job has already been cancelled
+ int tmp_counter = 1;
+ RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
+
+ Status status = Status::OK();
+
+ // 1. for each src path, transfer files to target path
+ int report_counter = 0;
+ int total_num = src_to_dest_path.size();
+ int finished_num = 0;
+ for (const auto& iter : src_to_dest_path) {
+ const std::string& remote_path = iter.first;
+ const std::string& tablet_str = iter.second;
+ int64_t target_tablet_id = -1;
+ try {
+ target_tablet_id = std::stoll(tablet_str);
+ } catch (std::exception& e) {
+ return Status::InternalError("failed to parse target tablet id {},
{}", tablet_str,
+ e.what());
+ }
+ const std::string target_path =
_storage_resource->remote_tablet_path(target_tablet_id);
+
+ // 1.1. check target path not exists
+ bool target_path_exist = false;
+ if (!storage_fs()->exists(target_path, &target_path_exist).ok() ||
target_path_exist) {
+ std::stringstream ss;
+ ss << "failed to download snapshot files, target path already
exists: " << target_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ downloaded_tablet_ids->push_back(target_tablet_id);
+
+ int64_t remote_tablet_id;
+ RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path,
&remote_tablet_id));
+ VLOG_CRITICAL << "get target tablet id: " << target_tablet_id
+ << ", remote tablet id: " << remote_tablet_id;
+
+ // 1.2. get remote files
+ std::map<std::string, FileStat> remote_files;
+ RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files));
+ if (remote_files.empty()) {
+ std::stringstream ss;
+ ss << "get nothing from remote path: " << remote_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ auto remote_hdr_file_path = [&remote_files, &remote_path](std::string&
full_hdr_path,
+ size_t*
hdr_file_len) {
+ for (auto iter = remote_files.begin(); iter !=
remote_files.end();) {
+ if (_end_with(iter->first, ".hdr")) {
+ *hdr_file_len = iter->second.size;
+ full_hdr_path = remote_path + "/" + iter->first + "." +
iter->second.md5;
+ // remove hdr file from remote_files
+ iter = remote_files.erase(iter);
+ return true;
+ } else {
+ ++iter;
+ }
+ }
+ return false;
+ };
+
+ size_t hdr_file_len;
+ std::string full_remote_hdr_path;
+ if (!remote_hdr_file_path(full_remote_hdr_path, &hdr_file_len)) {
+ std::stringstream ss;
+ ss << "failed to find hdr file from remote_path: " << remote_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ // 1.3. download hdr file
+ io::FileReaderOptions reader_options {
+ .cache_type = io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = false,
+ .cache_base_path = "",
+ .file_size = static_cast<int64_t>(hdr_file_len),
+ };
+ LOG(INFO) << "download hdr file: " << full_remote_hdr_path;
+ io::FileReaderSPtr hdr_reader = nullptr;
+ RETURN_IF_ERROR(_remote_fs->open_file(full_remote_hdr_path,
&hdr_reader, &reader_options));
+ std::unique_ptr<char[]> read_buf =
std::make_unique_for_overwrite<char[]>(hdr_file_len);
+ size_t read_len = 0;
+ Slice hdr_slice(read_buf.get(), hdr_file_len);
+ RETURN_IF_ERROR(hdr_reader->read_at(0, hdr_slice, &read_len));
+ if (read_len != hdr_file_len) {
+ std::stringstream ss;
+ ss << "failed to read hdr file: " << full_remote_hdr_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ RETURN_IF_ERROR(
+ _report_every(0, &tmp_counter, finished_num, total_num,
TTaskType::type::DOWNLOAD));
+
+ // 1.4. make snapshot
+ std::unordered_map<std::string, std::string> file_mapping;
+ RETURN_IF_ERROR(_engine.cloud_snapshot_mgr().make_snapshot(
Review Comment:
Make snapshot and downloading are merged here because, with compute-storage separation, the BE no longer needs to generate a snapshot locally (in the coupled deployment this used to be a separate standalone step); it can be done directly in the download or upload phase.
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+ const std::string task_name = "recycle_snapshots";
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
+ int64_t total_recycle_snapshot_rs_number = 0;
+
+ SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+ SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+ std::string snapshot_key0;
+ std::string snapshot_key1;
+ snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+ snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+ LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+ int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ register_recycle_task(task_name, start_time);
+
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ unregister_recycle_task(task_name);
+ int64_t cost =
+
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
+ LOG_INFO("recycle snapshots finished, cost={}s", cost)
+ .tag("instance_id", instance_id_)
+ .tag("num_scanned", num_scanned)
+ .tag("num_expired", num_expired)
+ .tag("num_recycled", num_recycled)
+ .tag("total_recycle_snapshot_rs_number",
total_recycle_snapshot_rs_number);
+ });
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+ if (config::force_immediate_recycle || snapshot.state() ==
SnapshotPB::DROPPED) {
+ return 0L;
+ }
+ int64_t expiration = snapshot.expiration() > 0
+ ? snapshot.creation_time() +
snapshot.expiration()
+ : snapshot.creation_time();
+ int64_t final_expiration = expiration + config::retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_snapshot_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
+ };
+
+ std::vector<std::string_view> snapshot_keys;
+ auto recycle_func = [&, this](std::string_view k, std::string_view v) ->
int {
Review Comment:
Same as above: this flow should be strictly guaranteed by the FE. At any given time a database can have only one running restore job, and within a job the state transitions of each stage, and the requests that may be sent in each state, are deterministic. For example, "set dropped" can only be executed by the finish-restore-job request; once finish restore job completes, the whole job is already Cancelled or Finished, and the FE guarantees that no further commit requests will arrive afterwards.
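A small sketch of the invariant being relied on (names are invented for illustration, not taken from this PR or the FE code): only the finish/cancel step of a restore job may mark the meta-service snapshot DROPPED, and commit requests are only sent before the job reaches a terminal state, so a DROPPED snapshot cannot race with a later commit.

enum class RestoreJobState { PENDING, SNAPSHOTING, DOWNLOADING, COMMITTING, FINISHED, CANCELLED };

// "set dropped" is only issued while finishing or cancelling the job.
bool may_set_snapshot_dropped(RestoreJobState state) {
    return state == RestoreJobState::FINISHED || state == RestoreJobState::CANCELLED;
}

// Commit requests are only sent while the job is still committing.
bool may_send_commit(RestoreJobState state) {
    return state == RestoreJobState::COMMITTING;
}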
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+ const std::string task_name = "recycle_snapshots";
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
+ int64_t total_recycle_snapshot_rs_number = 0;
+
+ SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+ SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+ std::string snapshot_key0;
+ std::string snapshot_key1;
+ snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+ snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+ LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+ int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ register_recycle_task(task_name, start_time);
+
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ unregister_recycle_task(task_name);
+ int64_t cost =
+
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
+ LOG_INFO("recycle snapshots finished, cost={}s", cost)
+ .tag("instance_id", instance_id_)
+ .tag("num_scanned", num_scanned)
+ .tag("num_expired", num_expired)
+ .tag("num_recycled", num_recycled)
+ .tag("total_recycle_snapshot_rs_number",
total_recycle_snapshot_rs_number);
+ });
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+ if (config::force_immediate_recycle || snapshot.state() ==
SnapshotPB::DROPPED) {
+ return 0L;
+ }
+ int64_t expiration = snapshot.expiration() > 0
+ ? snapshot.creation_time() +
snapshot.expiration()
+ : snapshot.creation_time();
+ int64_t final_expiration = expiration + config::retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_snapshot_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
+ };
+
+ std::vector<std::string_view> snapshot_keys;
+ auto recycle_func = [&, this](std::string_view k, std::string_view v) ->
int {
+ ++num_scanned;
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromArray(v.data(), v.size())) {
+ LOG_WARNING("malformed recycle partition value").tag("key",
hex(k));
+ return -1;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time < calc_expiration(snapshot_pb)) { // not expired
+ return 0;
+ }
+ ++num_expired;
+
+ int64_t tablet_id = snapshot_pb.tablet_id();
+ bool is_restore = snapshot_pb.is_restore();
+ LOG(INFO) << "begin to recycle expired snapshot, instance_id=" <<
instance_id_
+ << " tablet_id=" << snapshot_pb.tablet_id() << "
is_restore=" << is_restore;
+
+ std::string snapshot_rs_key0 = snapshot_rowset_key({instance_id_,
tablet_id, 0});
+ std::string snapshot_rs_key1 = snapshot_rowset_key({instance_id_,
tablet_id + 1, 0});
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to recycle snapshot")
+ .tag("err", err)
+ .tag("tablet id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("reason", "failed to create txn");
+ return -1;
+ }
+
+ std::string msg;
+ MetaServiceCode code = MetaServiceCode::OK;
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
snapshot_rs_metas;
+ scan_snapshot_rowset(txn.get(), instance_id_, tablet_id, code, msg,
&snapshot_rs_metas);
+ if (code != MetaServiceCode::OK) {
+ LOG_WARNING("scan snapshot rowsets failed when recycle snapshot")
+ .tag("tablet id", tablet_id)
+ .tag("msg", msg)
+ .tag("code", code)
+ .tag("instance id", instance_id_);
+ return -1;
+ }
+
+ if (is_restore && recycle_tablet(tablet_id) != 0) {
+ LOG_WARNING("failed to recycle tablet snapshot")
+ .tag("tablet_id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("is_restore", is_restore);
+ return -1;
+ } else {
+ txn.reset();
+ err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to recycle snapshot")
+ .tag("err", err)
+ .tag("tablet id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("reason", "failed to create txn");
+ return -1;
+ }
+
+ // delete all snapshot rowset kv
+ txn->remove(snapshot_rs_key0, snapshot_rs_key1);
Review Comment:
done
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+ const std::string task_name = "recycle_snapshots";
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
+ int64_t total_recycle_snapshot_rs_number = 0;
+
+ SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+ SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+ std::string snapshot_key0;
+ std::string snapshot_key1;
+ snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+ snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+ LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+ int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ register_recycle_task(task_name, start_time);
+
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ unregister_recycle_task(task_name);
+ int64_t cost =
+
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
+ LOG_INFO("recycle snapshots finished, cost={}s", cost)
+ .tag("instance_id", instance_id_)
+ .tag("num_scanned", num_scanned)
+ .tag("num_expired", num_expired)
+ .tag("num_recycled", num_recycled)
+ .tag("total_recycle_snapshot_rs_number",
total_recycle_snapshot_rs_number);
+ });
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+ if (config::force_immediate_recycle || snapshot.state() ==
SnapshotPB::DROPPED) {
+ return 0L;
+ }
+ int64_t expiration = snapshot.expiration() > 0
+ ? snapshot.creation_time() +
snapshot.expiration()
+ : snapshot.creation_time();
+ int64_t final_expiration = expiration + config::retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_snapshot_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
+ };
+
+ std::vector<std::string_view> snapshot_keys;
+ auto recycle_func = [&, this](std::string_view k, std::string_view v) ->
int {
+ ++num_scanned;
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromArray(v.data(), v.size())) {
+ LOG_WARNING("malformed recycle partition value").tag("key",
hex(k));
+ return -1;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time < calc_expiration(snapshot_pb)) { // not expired
+ return 0;
+ }
+ ++num_expired;
+
+ int64_t tablet_id = snapshot_pb.tablet_id();
+ bool is_restore = snapshot_pb.is_restore();
+ LOG(INFO) << "begin to recycle expired snapshot, instance_id=" <<
instance_id_
Review Comment:
done
##########
be/src/cloud/cloud_snapshot_loader.cpp:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_loader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include <unordered_map>
+
+#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "common/logging.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/remote_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace {
+bool _end_with(std::string_view str, std::string_view match) {
+ return str.size() >= match.size() &&
+ str.compare(str.size() - match.size(), match.size(), match) == 0;
+}
+} // namespace
+
+CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv*
env, int64_t job_id,
+ int64_t task_id, const
TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& broker_prop)
+ : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop),
_engine(engine) {};
+
+Status CloudSnapshotLoader::init(TStorageBackendType::type type, const
std::string& location,
+ std::string vault_id) {
+ RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));
+ _storage_resource = _engine.get_storage_resource(vault_id);
+ if (!_storage_resource) {
+ return Status::InternalError("vault id not found, vault id {}",
vault_id);
+ }
+ return Status::OK();
+}
+
+io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
+ return _storage_resource->fs;
+}
+
+Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::map<int64_t,
std::vector<std::string>>* tablet_files) {
+ return Status::NotSupported("upload not supported");
+}
+
+Status CloudSnapshotLoader::download(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::vector<int64_t>*
downloaded_tablet_ids) {
+ if (!_remote_fs || !_storage_resource) {
+ return Status::InternalError("Storage backend not initialized.");
+ }
+
+ LOG(INFO) << "begin to transfer snapshot files. num: " <<
src_to_dest_path.size()
+ << ", broker addr: " << _broker_addr << ", job: " << _job_id
+ << ", task id: " << _task_id;
+
+ // check if job has already been cancelled
+ int tmp_counter = 1;
+ RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
+
+ Status status = Status::OK();
+
+ // 1. for each src path, transfer files to target path
Review Comment:
This is the path of a remote repo; I have added a comment to explain it.
##########
be/src/cloud/cloud_snapshot_loader.cpp:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_loader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include <unordered_map>
+
+#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "common/logging.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/remote_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace {
+bool _end_with(std::string_view str, std::string_view match) {
+ return str.size() >= match.size() &&
+ str.compare(str.size() - match.size(), match.size(), match) == 0;
+}
+} // namespace
+
+CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv*
env, int64_t job_id,
+ int64_t task_id, const
TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& broker_prop)
+ : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop),
_engine(engine) {};
+
+Status CloudSnapshotLoader::init(TStorageBackendType::type type, const
std::string& location,
+ std::string vault_id) {
+ RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));
+ _storage_resource = _engine.get_storage_resource(vault_id);
+ if (!_storage_resource) {
+ return Status::InternalError("vault id not found, vault id {}",
vault_id);
+ }
+ return Status::OK();
+}
+
+io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
+ return _storage_resource->fs;
+}
+
+Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::map<int64_t,
std::vector<std::string>>* tablet_files) {
+ return Status::NotSupported("upload not supported");
+}
+
+Status CloudSnapshotLoader::download(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::vector<int64_t>*
downloaded_tablet_ids) {
+ if (!_remote_fs || !_storage_resource) {
+ return Status::InternalError("Storage backend not initialized.");
+ }
+
+ LOG(INFO) << "begin to transfer snapshot files. num: " <<
src_to_dest_path.size()
+ << ", broker addr: " << _broker_addr << ", job: " << _job_id
+ << ", task id: " << _task_id;
+
+ // check if job has already been cancelled
+ int tmp_counter = 1;
+ RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
+
+ Status status = Status::OK();
+
+ // 1. for each src path, transfer files to target path
+ int report_counter = 0;
+ int total_num = src_to_dest_path.size();
+ int finished_num = 0;
+ for (const auto& iter : src_to_dest_path) {
+ const std::string& remote_path = iter.first;
+ const std::string& tablet_str = iter.second;
+ int64_t target_tablet_id = -1;
+ try {
+ target_tablet_id = std::stoll(tablet_str);
+ } catch (std::exception& e) {
+ return Status::InternalError("failed to parse target tablet id {},
{}", tablet_str,
+ e.what());
+ }
+ const std::string target_path =
_storage_resource->remote_tablet_path(target_tablet_id);
+
+ // 1.1. check target path not exists
+ bool target_path_exist = false;
+ if (!storage_fs()->exists(target_path, &target_path_exist).ok() ||
target_path_exist) {
+ std::stringstream ss;
+ ss << "failed to download snapshot files, target path already
exists: " << target_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ downloaded_tablet_ids->push_back(target_tablet_id);
+
+ int64_t remote_tablet_id;
+ RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path,
&remote_tablet_id));
+ VLOG_CRITICAL << "get target tablet id: " << target_tablet_id
+ << ", remote tablet id: " << remote_tablet_id;
+
+ // 1.2. get remote files
+ std::map<std::string, FileStat> remote_files;
+ RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files));
+ if (remote_files.empty()) {
+ std::stringstream ss;
+ ss << "get nothing from remote path: " << remote_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ auto remote_hdr_file_path = [&remote_files, &remote_path](std::string&
full_hdr_path,
+ size_t*
hdr_file_len) {
+ for (auto iter = remote_files.begin(); iter !=
remote_files.end();) {
+ if (_end_with(iter->first, ".hdr")) {
+ *hdr_file_len = iter->second.size;
+ full_hdr_path = remote_path + "/" + iter->first + "." +
iter->second.md5;
+ // remove hdr file from remote_files
+ iter = remote_files.erase(iter);
+ return true;
+ } else {
+ ++iter;
+ }
+ }
+ return false;
+ };
+
+ size_t hdr_file_len;
+ std::string full_remote_hdr_path;
+ if (!remote_hdr_file_path(full_remote_hdr_path, &hdr_file_len)) {
+ std::stringstream ss;
+ ss << "failed to find hdr file from remote_path: " << remote_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ // 1.3. download hdr file
+ io::FileReaderOptions reader_options {
+ .cache_type = io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = false,
+ .cache_base_path = "",
+ .file_size = static_cast<int64_t>(hdr_file_len),
+ };
+ LOG(INFO) << "download hdr file: " << full_remote_hdr_path;
+ io::FileReaderSPtr hdr_reader = nullptr;
+ RETURN_IF_ERROR(_remote_fs->open_file(full_remote_hdr_path,
&hdr_reader, &reader_options));
+ std::unique_ptr<char[]> read_buf =
std::make_unique_for_overwrite<char[]>(hdr_file_len);
+ size_t read_len = 0;
+ Slice hdr_slice(read_buf.get(), hdr_file_len);
+ RETURN_IF_ERROR(hdr_reader->read_at(0, hdr_slice, &read_len));
+ if (read_len != hdr_file_len) {
+ std::stringstream ss;
+ ss << "failed to read hdr file: " << full_remote_hdr_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ RETURN_IF_ERROR(
+ _report_every(0, &tmp_counter, finished_num, total_num,
TTaskType::type::DOWNLOAD));
+
+ // 1.4. make snapshot
+ std::unordered_map<std::string, std::string> file_mapping;
+ RETURN_IF_ERROR(_engine.cloud_snapshot_mgr().make_snapshot(
+ target_tablet_id, *_storage_resource, file_mapping, true,
&hdr_slice));
+
+ LOG(INFO) << "finish to make snapshot for tablet: " <<
target_tablet_id;
+
+ // 1.5. download files
+ for (auto& iter : remote_files) {
+ RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num,
total_num,
+ TTaskType::type::DOWNLOAD));
+ const std::string& remote_file = iter.first;
+ const FileStat& file_stat = iter.second;
+ auto find = file_mapping.find(remote_file);
+ if (find == file_mapping.end()) {
+ continue;
+ }
+ std::string target_file = find->second;
+ std::string full_remote_file = remote_path + "/" + remote_file +
"." + file_stat.md5;
+ std::string full_target_file = target_path + "/" + target_file;
+ LOG(INFO) << "begin to download from " << full_remote_file << " to
"
+ << full_target_file;
+ io::FileReaderOptions reader_options {
+ .cache_type = io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = false,
+ .cache_base_path = "",
+ .file_size = static_cast<int64_t>(file_stat.size),
+ };
+ io::FileReaderSPtr file_reader = nullptr;
+ RETURN_IF_ERROR(_remote_fs->open_file(full_remote_file,
&file_reader, &reader_options));
+ io::FileWriterPtr file_writer = nullptr;
+ RETURN_IF_ERROR(storage_fs()->create_file(full_target_file,
&file_writer));
+ size_t buf_size = config::s3_file_system_local_upload_buffer_size;
+ std::unique_ptr<char[]> transfer_buffer =
+ std::make_unique_for_overwrite<char[]>(buf_size);
+ size_t cur_offset = 0;
+ while (true) {
+ size_t read_len = 0;
+ RETURN_IF_ERROR(file_reader->read_at(
+ cur_offset, Slice {transfer_buffer.get(), buf_size},
&read_len));
+ cur_offset += read_len;
+ if (read_len == 0) {
+ break;
+ }
+ RETURN_IF_ERROR(file_writer->append({transfer_buffer.get(),
read_len}));
+ }
+ RETURN_IF_ERROR(file_writer->close());
Review Comment:
Added to the follow-up TODO list.
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -963,6 +967,596 @@ static void set_schema_in_existed_rowset(MetaServiceCode&
code, std::string& msg
}
}
+void scan_snapshot_rowset(
+ Transaction* txn, const std::string& instance_id, int64_t tablet_id,
MetaServiceCode& code,
+ std::string& msg,
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>*
snapshot_rs_metas) {
+ std::stringstream ss;
+ SnapshotRowsetKeyInfo rs_key_info0 {instance_id, tablet_id, 0};
+ SnapshotRowsetKeyInfo rs_key_info1 {instance_id, tablet_id + 1, 0};
+ std::string snapshot_rs_key0;
+ std::string snapshot_rs_key1;
+ snapshot_rowset_key(rs_key_info0, &snapshot_rs_key0);
+ snapshot_rowset_key(rs_key_info1, &snapshot_rs_key1);
+
+ int num_rowsets = 0;
+ std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+ (int*)0x01, [snapshot_rs_key0, snapshot_rs_key1,
&num_rowsets](int*) {
+ LOG(INFO) << "get snapshot rs meta, num_rowsets=" <<
num_rowsets << " range=["
+ << hex(snapshot_rs_key0) << "," <<
hex(snapshot_rs_key1) << "]";
+ });
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ TxnErrorCode err = txn->get(snapshot_rs_key0, snapshot_rs_key1, &it,
true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get snapshot rs meta while committing,"
+ << " tablet_id=" << tablet_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ while (it->has_next()) {
+ auto [k, v] = it->next();
+ LOG(INFO) << "range_get snapshot_rs_key=" << hex(k) << "
tablet_id=" << tablet_id;
+ snapshot_rs_metas->emplace_back();
+ if (!snapshot_rs_metas->back().second.ParseFromArray(v.data(),
v.size())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed snapshot rowset meta, tablet_id=" << tablet_id
+ << " key=" << hex(k);
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ snapshot_rs_metas->back().first = std::string(k.data(), k.size());
+ ++num_rowsets;
+ if (!it->has_next()) snapshot_rs_key0 = k;
+ }
+ snapshot_rs_key0.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (it->more());
+ return;
+}
+
+void MetaServiceImpl::make_snapshot(::google::protobuf::RpcController*
controller,
+ const SnapshotRequest* request,
SnapshotResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(make_snapshot);
+ if (!request->has_tablet_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty tablet_id";
+ return;
+ }
+
+ bool is_restore = request->has_is_restore() && request->is_restore();
+ if (is_restore) {
+ if (!request->has_tablet_meta() ||
!request->tablet_meta().rs_metas_size()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = !request->has_tablet_meta() ? "no tablet meta" : "no rowset
meta";
+ return;
+ }
+ if (!request->tablet_meta().has_schema() &&
!request->tablet_meta().has_schema_version()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "tablet_meta must have either schema or schema_version";
+ return;
+ }
+ }
+
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+
+ RPC_RATE_LIMIT(make_snapshot)
+
+ std::unique_ptr<Transaction> txn0;
+ TxnErrorCode err = txn_kv_->create_txn(&txn0);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+
+ // validate request
+ TabletIndexPB tablet_idx;
+ get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(),
tablet_idx);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+
+ auto key = snapshot_tablet_key({instance_id, tablet_idx.tablet_id()});
+ std::string val;
+ err = txn0->get(key, &val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to check snapshot {} existence, err={}",
tablet_idx.tablet_id(),
+ err);
+ return;
+ }
+ if (err == TxnErrorCode::TXN_OK) {
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromString(val)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "malformed snapshot";
+ LOG_WARNING(msg);
+ return;
+ }
+ if (snapshot_pb.state() != SnapshotPB::DROPPED) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("snapshot {} already exists, state: {}",
tablet_idx.tablet_id(),
+ SnapshotPB::State_Name(snapshot_pb.state()));
+ return;
+ }
+ }
+
+ TabletMetaCloudPB tablet_meta;
+ std::vector<doris::RowsetMetaCloudPB> rs_metas;
+ if (is_restore) {
+ tablet_meta = request->tablet_meta();
+ rs_metas.assign(tablet_meta.rs_metas().begin(),
tablet_meta.rs_metas().end());
+ } else {
+ // backup not implemented
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "not implemented";
+ return;
+ }
+ tablet_meta.clear_rs_metas(); // strip off rs meta
+
+ // 1. save tablet snapshot
+ std::string to_save_val;
+ {
+ SnapshotPB pb;
+ pb.set_tablet_id(tablet_idx.tablet_id());
+ pb.mutable_tablet_meta()->Swap(&tablet_meta);
+ pb.set_creation_time(::time(nullptr));
+ pb.set_expiration(request->expiration());
Review Comment:
Validation of this data has been added.
##########
cloud/src/meta-service/keys.h:
##########
@@ -69,6 +69,9 @@
//
// 0x01 "storage_vault" ${instance_id} "vault" ${resource_id}
-> StorageVaultPB
//
+// 0x01 "snapshot" ${instance_id} "tablet" ${tablet_id}
-> SnapshotPB
Review Comment:
The key and the related code have all been updated.
##########
be/src/cloud/cloud_snapshot_loader.cpp:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_loader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include <unordered_map>
+
+#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "common/logging.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/remote_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace {
+bool _end_with(std::string_view str, std::string_view match) {
+ return str.size() >= match.size() &&
+ str.compare(str.size() - match.size(), match.size(), match) == 0;
+}
+} // namespace
+
+CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv*
env, int64_t job_id,
+ int64_t task_id, const
TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& broker_prop)
+ : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop),
_engine(engine) {};
+
+Status CloudSnapshotLoader::init(TStorageBackendType::type type, const
std::string& location,
+ std::string vault_id) {
+ RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));
+ _storage_resource = _engine.get_storage_resource(vault_id);
+ if (!_storage_resource) {
+ return Status::InternalError("vault id not found, vault id {}",
vault_id);
+ }
+ return Status::OK();
+}
+
+io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
+ return _storage_resource->fs;
+}
+
+Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::map<int64_t,
std::vector<std::string>>* tablet_files) {
+ return Status::NotSupported("upload not supported");
+}
+
+Status CloudSnapshotLoader::download(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::vector<int64_t>*
downloaded_tablet_ids) {
+ if (!_remote_fs || !_storage_resource) {
+ return Status::InternalError("Storage backend not initialized.");
+ }
+
+ LOG(INFO) << "begin to transfer snapshot files. num: " <<
src_to_dest_path.size()
+ << ", broker addr: " << _broker_addr << ", job: " << _job_id
+ << ", task id: " << _task_id;
+
+ // check if job has already been cancelled
+ int tmp_counter = 1;
+ RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
+
+ Status status = Status::OK();
+
+ // 1. for each src path, transfer files to target path
+ int report_counter = 0;
+ int total_num = src_to_dest_path.size();
+ int finished_num = 0;
+ for (const auto& iter : src_to_dest_path) {
+ const std::string& remote_path = iter.first;
+ const std::string& tablet_str = iter.second;
+ int64_t target_tablet_id = -1;
+ try {
+ target_tablet_id = std::stoll(tablet_str);
+ } catch (std::exception& e) {
+ return Status::InternalError("failed to parse target tablet id {},
{}", tablet_str,
+ e.what());
+ }
+ const std::string target_path =
_storage_resource->remote_tablet_path(target_tablet_id);
+
+ // 1.1. check target path not exists
+ bool target_path_exist = false;
+ if (!storage_fs()->exists(target_path, &target_path_exist).ok() ||
target_path_exist) {
+ std::stringstream ss;
+ ss << "failed to download snapshot files, target path already
exists: " << target_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ downloaded_tablet_ids->push_back(target_tablet_id);
Review Comment:
This is kept consistent with the flow in the coupled (non-cloud) deployment.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]