gavinchou commented on code in PR #47300:
URL: https://github.com/apache/doris/pull/47300#discussion_r2045228390
##########
be/src/olap/tablet_meta.cpp:
##########
@@ -475,6 +475,22 @@ Status TabletMeta::load_from_file(const string& file_path,
TabletMetaPB* tablet_
return Status::OK();
}
+Status TabletMeta::create_from_buffer(const uint8_t* buffer, size_t
buffer_size) {
+ FileHeader<TabletMetaPB> file_header(""); // empty file path
+ RETURN_IF_ERROR(file_header.deserialize_from_memory(buffer, buffer_size));
+
+ TabletMetaPB tablet_meta_pb;
+ try {
+ tablet_meta_pb.CopyFrom(file_header.message());
+ } catch (...) {
+ return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
Review Comment:
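Should print `e.what()`: catch `const std::exception& e` instead of `catch (...)` so the exception message can be included in the returned error.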
##########
be/src/cloud/cloud_snapshot_mgr.h:
##########
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "cloud/cloud_tablet.h"
+#include "common/status.h"
+#include "olap/rowset/rowset_fwd.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_fwd.h"
+
+namespace doris {
+class CloudStorageEngine;
+class RowsetMetaPB;
+class MemTrackerLimiter;
+
+// In cloud mode, snapshot only includes tablet metas.
+class CloudSnapshotMgr {
+public:
+ CloudSnapshotMgr(CloudStorageEngine& engine);
+
+ ~CloudSnapshotMgr() = default;
+
+ Status make_snapshot(int64_t target_tablet_id, StorageResource&
storage_resource,
Review Comment:
所有公有的接口都要加上注释:接口是什么行为,有什么假设,包括每个参数和返回值的意义。
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -963,6 +967,596 @@ static void set_schema_in_existed_rowset(MetaServiceCode&
code, std::string& msg
}
}
+void scan_snapshot_rowset(
+ Transaction* txn, const std::string& instance_id, int64_t tablet_id,
MetaServiceCode& code,
+ std::string& msg,
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>*
snapshot_rs_metas) {
+ std::stringstream ss;
+ SnapshotRowsetKeyInfo rs_key_info0 {instance_id, tablet_id, 0};
+ SnapshotRowsetKeyInfo rs_key_info1 {instance_id, tablet_id + 1, 0};
+ std::string snapshot_rs_key0;
+ std::string snapshot_rs_key1;
+ snapshot_rowset_key(rs_key_info0, &snapshot_rs_key0);
+ snapshot_rowset_key(rs_key_info1, &snapshot_rs_key1);
+
+ int num_rowsets = 0;
+ std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+ (int*)0x01, [snapshot_rs_key0, snapshot_rs_key1,
&num_rowsets](int*) {
+ LOG(INFO) << "get snapshot rs meta, num_rowsets=" <<
num_rowsets << " range=["
+ << hex(snapshot_rs_key0) << "," <<
hex(snapshot_rs_key1) << "]";
+ });
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ TxnErrorCode err = txn->get(snapshot_rs_key0, snapshot_rs_key1, &it,
true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get snapshot rs meta while committing,"
+ << " tablet_id=" << tablet_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ while (it->has_next()) {
+ auto [k, v] = it->next();
+ LOG(INFO) << "range_get snapshot_rs_key=" << hex(k) << "
tablet_id=" << tablet_id;
+ snapshot_rs_metas->emplace_back();
+ if (!snapshot_rs_metas->back().second.ParseFromArray(v.data(),
v.size())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed snapshot rowset meta, tablet_id=" << tablet_id
+ << " key=" << hex(k);
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ snapshot_rs_metas->back().first = std::string(k.data(), k.size());
+ ++num_rowsets;
+ if (!it->has_next()) snapshot_rs_key0 = k;
+ }
+ snapshot_rs_key0.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (it->more());
+ return;
+}
+
+void MetaServiceImpl::make_snapshot(::google::protobuf::RpcController*
controller,
+ const SnapshotRequest* request,
SnapshotResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(make_snapshot);
+ if (!request->has_tablet_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty tablet_id";
+ return;
+ }
+
+ bool is_restore = request->has_is_restore() && request->is_restore();
+ if (is_restore) {
+ if (!request->has_tablet_meta() ||
!request->tablet_meta().rs_metas_size()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = !request->has_tablet_meta() ? "no tablet meta" : "no rowset
meta";
+ return;
+ }
+ if (!request->tablet_meta().has_schema() &&
!request->tablet_meta().has_schema_version()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "tablet_meta must have either schema or schema_version";
+ return;
+ }
+ }
+
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+
+ RPC_RATE_LIMIT(make_snapshot)
+
+ std::unique_ptr<Transaction> txn0;
+ TxnErrorCode err = txn_kv_->create_txn(&txn0);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+
+ // validate request
+ TabletIndexPB tablet_idx;
+ get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(),
tablet_idx);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+
+ auto key = snapshot_tablet_key({instance_id, tablet_idx.tablet_id()});
+ std::string val;
+ err = txn0->get(key, &val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to check snapshot {} existence, err={}",
tablet_idx.tablet_id(),
+ err);
+ return;
+ }
+ if (err == TxnErrorCode::TXN_OK) {
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromString(val)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "malformed snapshot";
+ LOG_WARNING(msg);
+ return;
+ }
+ if (snapshot_pb.state() != SnapshotPB::DROPPED) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("snapshot {} already exists, state: {}",
tablet_idx.tablet_id(),
+ SnapshotPB::State_Name(snapshot_pb.state()));
+ return;
+ }
+ }
+
+ TabletMetaCloudPB tablet_meta;
+ std::vector<doris::RowsetMetaCloudPB> rs_metas;
+ if (is_restore) {
+ tablet_meta = request->tablet_meta();
+ rs_metas.assign(tablet_meta.rs_metas().begin(),
tablet_meta.rs_metas().end());
+ } else {
+ // backup not implemented
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "not implemented";
+ return;
+ }
+ tablet_meta.clear_rs_metas(); // strip off rs meta
+
+ // 1. save tablet snapshot
+ std::string to_save_val;
+ {
+ SnapshotPB pb;
Review Comment:
如果是dropped的snapshot也可以直接覆盖吗?
##########
be/src/cloud/cloud_meta_mgr.h:
##########
@@ -80,6 +81,12 @@ class CloudMetaMgr {
Status precommit_txn(const StreamLoadContext& ctx);
+ Status make_snapshot(const TabletMetaPB& tablet_meta, bool is_restore);
Review Comment:
接口需要详细加注释,比如像下边的 vault 那样。
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -963,6 +967,596 @@ static void set_schema_in_existed_rowset(MetaServiceCode&
code, std::string& msg
}
}
+void scan_snapshot_rowset(
+ Transaction* txn, const std::string& instance_id, int64_t tablet_id,
MetaServiceCode& code,
+ std::string& msg,
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>*
snapshot_rs_metas) {
+ std::stringstream ss;
+ SnapshotRowsetKeyInfo rs_key_info0 {instance_id, tablet_id, 0};
+ SnapshotRowsetKeyInfo rs_key_info1 {instance_id, tablet_id + 1, 0};
+ std::string snapshot_rs_key0;
+ std::string snapshot_rs_key1;
+ snapshot_rowset_key(rs_key_info0, &snapshot_rs_key0);
+ snapshot_rowset_key(rs_key_info1, &snapshot_rs_key1);
+
+ int num_rowsets = 0;
+ std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+ (int*)0x01, [snapshot_rs_key0, snapshot_rs_key1,
&num_rowsets](int*) {
+ LOG(INFO) << "get snapshot rs meta, num_rowsets=" <<
num_rowsets << " range=["
+ << hex(snapshot_rs_key0) << "," <<
hex(snapshot_rs_key1) << "]";
+ });
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ TxnErrorCode err = txn->get(snapshot_rs_key0, snapshot_rs_key1, &it,
true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get snapshot rs meta while committing,"
+ << " tablet_id=" << tablet_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ while (it->has_next()) {
+ auto [k, v] = it->next();
+ LOG(INFO) << "range_get snapshot_rs_key=" << hex(k) << "
tablet_id=" << tablet_id;
+ snapshot_rs_metas->emplace_back();
+ if (!snapshot_rs_metas->back().second.ParseFromArray(v.data(),
v.size())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed snapshot rowset meta, tablet_id=" << tablet_id
+ << " key=" << hex(k);
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ snapshot_rs_metas->back().first = std::string(k.data(), k.size());
+ ++num_rowsets;
+ if (!it->has_next()) snapshot_rs_key0 = k;
+ }
+ snapshot_rs_key0.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (it->more());
+ return;
+}
+
+void MetaServiceImpl::make_snapshot(::google::protobuf::RpcController*
controller,
+ const SnapshotRequest* request,
SnapshotResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(make_snapshot);
+ if (!request->has_tablet_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty tablet_id";
+ return;
+ }
+
+ bool is_restore = request->has_is_restore() && request->is_restore();
+ if (is_restore) {
+ if (!request->has_tablet_meta() ||
!request->tablet_meta().rs_metas_size()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = !request->has_tablet_meta() ? "no tablet meta" : "no rowset
meta";
+ return;
+ }
+ if (!request->tablet_meta().has_schema() &&
!request->tablet_meta().has_schema_version()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "tablet_meta must have either schema or schema_version";
+ return;
+ }
+ }
+
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+
+ RPC_RATE_LIMIT(make_snapshot)
+
+ std::unique_ptr<Transaction> txn0;
+ TxnErrorCode err = txn_kv_->create_txn(&txn0);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+
+ // validate request
+ TabletIndexPB tablet_idx;
+ get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(),
tablet_idx);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+
+ auto key = snapshot_tablet_key({instance_id, tablet_idx.tablet_id()});
+ std::string val;
+ err = txn0->get(key, &val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to check snapshot {} existence, err={}",
tablet_idx.tablet_id(),
+ err);
+ return;
+ }
+ if (err == TxnErrorCode::TXN_OK) {
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromString(val)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "malformed snapshot";
+ LOG_WARNING(msg);
+ return;
+ }
+ if (snapshot_pb.state() != SnapshotPB::DROPPED) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("snapshot {} already exists, state: {}",
tablet_idx.tablet_id(),
+ SnapshotPB::State_Name(snapshot_pb.state()));
+ return;
+ }
+ }
+
+ TabletMetaCloudPB tablet_meta;
+ std::vector<doris::RowsetMetaCloudPB> rs_metas;
+ if (is_restore) {
+ tablet_meta = request->tablet_meta();
+ rs_metas.assign(tablet_meta.rs_metas().begin(),
tablet_meta.rs_metas().end());
+ } else {
+ // backup not implemented
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "not implemented";
+ return;
+ }
+ tablet_meta.clear_rs_metas(); // strip off rs meta
+
+ // 1. save tablet snapshot
+ std::string to_save_val;
+ {
+ SnapshotPB pb;
+ pb.set_tablet_id(tablet_idx.tablet_id());
+ pb.mutable_tablet_meta()->Swap(&tablet_meta);
+ pb.set_creation_time(::time(nullptr));
+ pb.set_expiration(request->expiration());
Review Comment:
这个 tablet snapshot 里还需要存一些在 commit 的时候用来对账的信息,比如有多少个 rowset、size 是多少。
因为后边 commit tablet snapshot 是可以"断点续传"的,所以一定要做足检查,才能保证无论 ms 怎么重启、宕机,恢复出来的结果一定是对的。
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+ const std::string task_name = "recycle_snapshots";
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
+ int64_t total_recycle_snapshot_rs_number = 0;
+
+ SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+ SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+ std::string snapshot_key0;
+ std::string snapshot_key1;
+ snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+ snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+ LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+ int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ register_recycle_task(task_name, start_time);
+
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ unregister_recycle_task(task_name);
+ int64_t cost =
+
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
+ LOG_INFO("recycle snapshots finished, cost={}s", cost)
+ .tag("instance_id", instance_id_)
+ .tag("num_scanned", num_scanned)
+ .tag("num_expired", num_expired)
+ .tag("num_recycled", num_recycled)
+ .tag("total_recycle_snapshot_rs_number",
total_recycle_snapshot_rs_number);
+ });
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+ if (config::force_immediate_recycle || snapshot.state() ==
SnapshotPB::DROPPED) {
+ return 0L;
Review Comment:
这里打印一条日志,把 force flag 和 snapshot 都打出来。
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+ const std::string task_name = "recycle_snapshots";
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
+ int64_t total_recycle_snapshot_rs_number = 0;
+
+ SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+ SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+ std::string snapshot_key0;
+ std::string snapshot_key1;
+ snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+ snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+ LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+ int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ register_recycle_task(task_name, start_time);
+
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ unregister_recycle_task(task_name);
+ int64_t cost =
+
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
+ LOG_INFO("recycle snapshots finished, cost={}s", cost)
+ .tag("instance_id", instance_id_)
+ .tag("num_scanned", num_scanned)
+ .tag("num_expired", num_expired)
+ .tag("num_recycled", num_recycled)
+ .tag("total_recycle_snapshot_rs_number",
total_recycle_snapshot_rs_number);
+ });
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+ if (config::force_immediate_recycle || snapshot.state() ==
SnapshotPB::DROPPED) {
+ return 0L;
+ }
+ int64_t expiration = snapshot.expiration() > 0
+ ? snapshot.creation_time() +
snapshot.expiration()
+ : snapshot.creation_time();
+ int64_t final_expiration = expiration + config::retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_snapshot_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
+ };
+
+ std::vector<std::string_view> snapshot_keys;
+ auto recycle_func = [&, this](std::string_view k, std::string_view v) ->
int {
+ ++num_scanned;
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromArray(v.data(), v.size())) {
+ LOG_WARNING("malformed recycle partition value").tag("key",
hex(k));
+ return -1;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time < calc_expiration(snapshot_pb)) { // not expired
+ return 0;
+ }
+ ++num_expired;
+
+ int64_t tablet_id = snapshot_pb.tablet_id();
+ bool is_restore = snapshot_pb.is_restore();
+ LOG(INFO) << "begin to recycle expired snapshot, instance_id=" <<
instance_id_
+ << " tablet_id=" << snapshot_pb.tablet_id() << "
is_restore=" << is_restore;
+
+ std::string snapshot_rs_key0 = snapshot_rowset_key({instance_id_,
tablet_id, 0});
+ std::string snapshot_rs_key1 = snapshot_rowset_key({instance_id_,
tablet_id + 1, 0});
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to recycle snapshot")
+ .tag("err", err)
+ .tag("tablet id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("reason", "failed to create txn");
+ return -1;
+ }
+
+ std::string msg;
+ MetaServiceCode code = MetaServiceCode::OK;
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
snapshot_rs_metas;
+ scan_snapshot_rowset(txn.get(), instance_id_, tablet_id, code, msg,
&snapshot_rs_metas);
+ if (code != MetaServiceCode::OK) {
+ LOG_WARNING("scan snapshot rowsets failed when recycle snapshot")
+ .tag("tablet id", tablet_id)
+ .tag("msg", msg)
+ .tag("code", code)
+ .tag("instance id", instance_id_);
+ return -1;
+ }
+
+ if (is_restore && recycle_tablet(tablet_id) != 0) {
Review Comment:
这里也加一个注释:
通过回收 tablet 来把所有的数据和 kv 都回收了。
另外,下边的 else 是不是也可以直接通过 recycle tablet 来做?就是说 else 里的优化没必要做,这样不用有额外学习和解释的成本,代码也是统一的。
##########
be/src/cloud/cloud_snapshot_loader.cpp:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_loader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include <unordered_map>
+
+#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "common/logging.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/remote_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace {
+bool _end_with(std::string_view str, std::string_view match) {
+ return str.size() >= match.size() &&
+ str.compare(str.size() - match.size(), match.size(), match) == 0;
+}
+} // namespace
+
+CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv*
env, int64_t job_id,
+ int64_t task_id, const
TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& broker_prop)
+ : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop),
_engine(engine) {};
+
+Status CloudSnapshotLoader::init(TStorageBackendType::type type, const
std::string& location,
+ std::string vault_id) {
+ RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));
+ _storage_resource = _engine.get_storage_resource(vault_id);
+ if (!_storage_resource) {
+ return Status::InternalError("vault id not found, vault id {}",
vault_id);
+ }
+ return Status::OK();
+}
+
+io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
+ return _storage_resource->fs;
+}
+
+Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::map<int64_t,
std::vector<std::string>>* tablet_files) {
+ return Status::NotSupported("upload not supported");
+}
+
+Status CloudSnapshotLoader::download(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::vector<int64_t>*
downloaded_tablet_ids) {
+ if (!_remote_fs || !_storage_resource) {
+ return Status::InternalError("Storage backend not initialized.");
+ }
+
+ LOG(INFO) << "begin to transfer snapshot files. num: " <<
src_to_dest_path.size()
+ << ", broker addr: " << _broker_addr << ", job: " << _job_id
+ << ", task id: " << _task_id;
+
+ // check if job has already been cancelled
+ int tmp_counter = 1;
+ RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
+
+ Status status = Status::OK();
+
+ // 1. for each src path, transfer files to target path
+ int report_counter = 0;
+ int total_num = src_to_dest_path.size();
+ int finished_num = 0;
+ for (const auto& iter : src_to_dest_path) {
+ const std::string& remote_path = iter.first;
+ const std::string& tablet_str = iter.second;
+ int64_t target_tablet_id = -1;
+ try {
+ target_tablet_id = std::stoll(tablet_str);
+ } catch (std::exception& e) {
+ return Status::InternalError("failed to parse target tablet id {},
{}", tablet_str,
+ e.what());
+ }
+ const std::string target_path =
_storage_resource->remote_tablet_path(target_tablet_id);
+
+ // 1.1. check target path not exists
+ bool target_path_exist = false;
+ if (!storage_fs()->exists(target_path, &target_path_exist).ok() ||
target_path_exist) {
+ std::stringstream ss;
+ ss << "failed to download snapshot files, target path already
exists: " << target_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ downloaded_tablet_ids->push_back(target_tablet_id);
+
+ int64_t remote_tablet_id;
+ RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path,
&remote_tablet_id));
+ VLOG_CRITICAL << "get target tablet id: " << target_tablet_id
+ << ", remote tablet id: " << remote_tablet_id;
+
+ // 1.2. get remote files
+ std::map<std::string, FileStat> remote_files;
+ RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files));
+ if (remote_files.empty()) {
+ std::stringstream ss;
+ ss << "get nothing from remote path: " << remote_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ auto remote_hdr_file_path = [&remote_files, &remote_path](std::string&
full_hdr_path,
+ size_t*
hdr_file_len) {
+ for (auto iter = remote_files.begin(); iter !=
remote_files.end();) {
+ if (_end_with(iter->first, ".hdr")) {
+ *hdr_file_len = iter->second.size;
+ full_hdr_path = remote_path + "/" + iter->first + "." +
iter->second.md5;
+ // remove hdr file from remote_files
+ iter = remote_files.erase(iter);
+ return true;
+ } else {
+ ++iter;
+ }
+ }
+ return false;
+ };
+
+ size_t hdr_file_len;
+ std::string full_remote_hdr_path;
+ if (!remote_hdr_file_path(full_remote_hdr_path, &hdr_file_len)) {
+ std::stringstream ss;
+ ss << "failed to find hdr file from remote_path: " << remote_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ // 1.3. download hdr file
+ io::FileReaderOptions reader_options {
+ .cache_type = io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = false,
+ .cache_base_path = "",
+ .file_size = static_cast<int64_t>(hdr_file_len),
+ };
+ LOG(INFO) << "download hdr file: " << full_remote_hdr_path;
+ io::FileReaderSPtr hdr_reader = nullptr;
+ RETURN_IF_ERROR(_remote_fs->open_file(full_remote_hdr_path,
&hdr_reader, &reader_options));
+ std::unique_ptr<char[]> read_buf =
std::make_unique_for_overwrite<char[]>(hdr_file_len);
+ size_t read_len = 0;
+ Slice hdr_slice(read_buf.get(), hdr_file_len);
+ RETURN_IF_ERROR(hdr_reader->read_at(0, hdr_slice, &read_len));
+ if (read_len != hdr_file_len) {
+ std::stringstream ss;
+ ss << "failed to read hdr file: " << full_remote_hdr_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ RETURN_IF_ERROR(
+ _report_every(0, &tmp_counter, finished_num, total_num,
TTaskType::type::DOWNLOAD));
+
+ // 1.4. make snapshot
+ std::unordered_map<std::string, std::string> file_mapping;
+ RETURN_IF_ERROR(_engine.cloud_snapshot_mgr().make_snapshot(
+ target_tablet_id, *_storage_resource, file_mapping, true,
&hdr_slice));
+
+ LOG(INFO) << "finish to make snapshot for tablet: " <<
target_tablet_id;
+
+ // 1.5. download files
+ for (auto& iter : remote_files) {
+ RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num,
total_num,
+ TTaskType::type::DOWNLOAD));
+ const std::string& remote_file = iter.first;
+ const FileStat& file_stat = iter.second;
+ auto find = file_mapping.find(remote_file);
+ if (find == file_mapping.end()) {
+ continue;
+ }
+ std::string target_file = find->second;
+ std::string full_remote_file = remote_path + "/" + remote_file +
"." + file_stat.md5;
+ std::string full_target_file = target_path + "/" + target_file;
+ LOG(INFO) << "begin to download from " << full_remote_file << " to
"
+ << full_target_file;
+ io::FileReaderOptions reader_options {
+ .cache_type = io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = false,
+ .cache_base_path = "",
+ .file_size = static_cast<int64_t>(file_stat.size),
+ };
+ io::FileReaderSPtr file_reader = nullptr;
+ RETURN_IF_ERROR(_remote_fs->open_file(full_remote_file,
&file_reader, &reader_options));
+ io::FileWriterPtr file_writer = nullptr;
+ RETURN_IF_ERROR(storage_fs()->create_file(full_target_file,
&file_writer));
+ size_t buf_size = config::s3_file_system_local_upload_buffer_size;
+ std::unique_ptr<char[]> transfer_buffer =
+ std::make_unique_for_overwrite<char[]>(buf_size);
+ size_t cur_offset = 0;
+ while (true) {
+ size_t read_len = 0;
+ RETURN_IF_ERROR(file_reader->read_at(
+ cur_offset, Slice {transfer_buffer.get(), buf_size},
&read_len));
+ cur_offset += read_len;
+ if (read_len == 0) {
+ break;
+ }
+ RETURN_IF_ERROR(file_writer->append({transfer_buffer.get(),
read_len}));
+ }
+ RETURN_IF_ERROR(file_writer->close());
Review Comment:
这里再打一条 finish download 的日志,也统计下耗时,后续可以加上 bvar metrics 来统计。
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+ const std::string task_name = "recycle_snapshots";
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
+ int64_t total_recycle_snapshot_rs_number = 0;
+
+ SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+ SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+ std::string snapshot_key0;
+ std::string snapshot_key1;
+ snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+ snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+ LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+ int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ register_recycle_task(task_name, start_time);
+
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ unregister_recycle_task(task_name);
+ int64_t cost =
+
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
+ LOG_INFO("recycle snapshots finished, cost={}s", cost)
+ .tag("instance_id", instance_id_)
+ .tag("num_scanned", num_scanned)
+ .tag("num_expired", num_expired)
+ .tag("num_recycled", num_recycled)
+ .tag("total_recycle_snapshot_rs_number",
total_recycle_snapshot_rs_number);
+ });
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+ if (config::force_immediate_recycle || snapshot.state() ==
SnapshotPB::DROPPED) {
+ return 0L;
+ }
+ int64_t expiration = snapshot.expiration() > 0
+ ? snapshot.creation_time() +
snapshot.expiration()
+ : snapshot.creation_time();
+ int64_t final_expiration = expiration + config::retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_snapshot_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
+ };
+
+ std::vector<std::string_view> snapshot_keys;
+ auto recycle_func = [&, this](std::string_view k, std::string_view v) ->
int {
+ ++num_scanned;
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromArray(v.data(), v.size())) {
+ LOG_WARNING("malformed recycle partition value").tag("key",
hex(k));
+ return -1;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time < calc_expiration(snapshot_pb)) { // not expired
+ return 0;
+ }
+ ++num_expired;
+
+ int64_t tablet_id = snapshot_pb.tablet_id();
+ bool is_restore = snapshot_pb.is_restore();
+ LOG(INFO) << "begin to recycle expired snapshot, instance_id=" <<
instance_id_
+ << " tablet_id=" << snapshot_pb.tablet_id() << "
is_restore=" << is_restore;
+
+ std::string snapshot_rs_key0 = snapshot_rowset_key({instance_id_,
tablet_id, 0});
+ std::string snapshot_rs_key1 = snapshot_rowset_key({instance_id_,
tablet_id + 1, 0});
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to recycle snapshot")
+ .tag("err", err)
+ .tag("tablet id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("reason", "failed to create txn");
+ return -1;
+ }
+
+ std::string msg;
+ MetaServiceCode code = MetaServiceCode::OK;
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
snapshot_rs_metas;
+ scan_snapshot_rowset(txn.get(), instance_id_, tablet_id, code, msg,
&snapshot_rs_metas);
+ if (code != MetaServiceCode::OK) {
+ LOG_WARNING("scan snapshot rowsets failed when recycle snapshot")
+ .tag("tablet id", tablet_id)
+ .tag("msg", msg)
+ .tag("code", code)
+ .tag("instance id", instance_id_);
+ return -1;
+ }
+
+ if (is_restore && recycle_tablet(tablet_id) != 0) {
+ LOG_WARNING("failed to recycle tablet snapshot")
+ .tag("tablet_id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("is_restore", is_restore);
+ return -1;
+ } else {
+ txn.reset();
+ err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to recycle snapshot")
+ .tag("err", err)
+ .tag("tablet id", tablet_id)
+ .tag("instance_id", instance_id_)
+ .tag("reason", "failed to create txn");
+ return -1;
+ }
+
+ // delete all snapshot rowset kv
+ txn->remove(snapshot_rs_key0, snapshot_rs_key1);
Review Comment:
这个 else 要加注释,说明为什么能这么做。
##########
be/src/cloud/cloud_snapshot_loader.cpp:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_loader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include <unordered_map>
+
+#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "common/logging.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/remote_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace {
+bool _end_with(std::string_view str, std::string_view match) {
+ return str.size() >= match.size() &&
+ str.compare(str.size() - match.size(), match.size(), match) == 0;
+}
+} // namespace
+
+CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv*
env, int64_t job_id,
+ int64_t task_id, const
TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& broker_prop)
+ : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop),
_engine(engine) {};
+
+Status CloudSnapshotLoader::init(TStorageBackendType::type type, const
std::string& location,
+ std::string vault_id) {
+ RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));
+ _storage_resource = _engine.get_storage_resource(vault_id);
+ if (!_storage_resource) {
+ return Status::InternalError("vault id not found, vault id {}",
vault_id);
+ }
+ return Status::OK();
+}
+
+io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
+ return _storage_resource->fs;
+}
+
+Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::map<int64_t,
std::vector<std::string>>* tablet_files) {
+ return Status::NotSupported("upload not supported");
+}
+
+Status CloudSnapshotLoader::download(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::vector<int64_t>*
downloaded_tablet_ids) {
+ if (!_remote_fs || !_storage_resource) {
+ return Status::InternalError("Storage backend not initialized.");
+ }
+
+ LOG(INFO) << "begin to transfer snapshot files. num: " <<
src_to_dest_path.size()
+ << ", broker addr: " << _broker_addr << ", job: " << _job_id
+ << ", task id: " << _task_id;
+
+ // check if job has already been cancelled
+ int tmp_counter = 1;
+ RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
+
+ Status status = Status::OK();
+
+ // 1. for each src path, transfer files to target path
+ int report_counter = 0;
+ int total_num = src_to_dest_path.size();
+ int finished_num = 0;
+ for (const auto& iter : src_to_dest_path) {
+ const std::string& remote_path = iter.first;
+ const std::string& tablet_str = iter.second;
+ int64_t target_tablet_id = -1;
+ try {
+ target_tablet_id = std::stoll(tablet_str);
+ } catch (std::exception& e) {
+ return Status::InternalError("failed to parse target tablet id {},
{}", tablet_str,
+ e.what());
+ }
+ const std::string target_path =
_storage_resource->remote_tablet_path(target_tablet_id);
+
+ // 1.1. check target path not exists
+ bool target_path_exist = false;
+ if (!storage_fs()->exists(target_path, &target_path_exist).ok() ||
target_path_exist) {
+ std::stringstream ss;
+ ss << "failed to download snapshot files, target path already
exists: " << target_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ downloaded_tablet_ids->push_back(target_tablet_id);
+
+ int64_t remote_tablet_id;
+ RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path,
&remote_tablet_id));
+ VLOG_CRITICAL << "get target tablet id: " << target_tablet_id
+ << ", remote tablet id: " << remote_tablet_id;
+
+ // 1.2. get remote files
+ std::map<std::string, FileStat> remote_files;
+ RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files));
+ if (remote_files.empty()) {
+ std::stringstream ss;
+ ss << "get nothing from remote path: " << remote_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ auto remote_hdr_file_path = [&remote_files, &remote_path](std::string&
full_hdr_path,
+ size_t*
hdr_file_len) {
+ for (auto iter = remote_files.begin(); iter !=
remote_files.end();) {
+ if (_end_with(iter->first, ".hdr")) {
+ *hdr_file_len = iter->second.size;
+ full_hdr_path = remote_path + "/" + iter->first + "." +
iter->second.md5;
+ // remove hdr file from remote_files
+ iter = remote_files.erase(iter);
+ return true;
+ } else {
+ ++iter;
+ }
+ }
+ return false;
+ };
+
+ size_t hdr_file_len;
+ std::string full_remote_hdr_path;
+ if (!remote_hdr_file_path(full_remote_hdr_path, &hdr_file_len)) {
+ std::stringstream ss;
+ ss << "failed to find hdr file from remote_path: " << remote_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ // 1.3. download hdr file
+ io::FileReaderOptions reader_options {
+ .cache_type = io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = false,
+ .cache_base_path = "",
+ .file_size = static_cast<int64_t>(hdr_file_len),
+ };
+ LOG(INFO) << "download hdr file: " << full_remote_hdr_path;
+ io::FileReaderSPtr hdr_reader = nullptr;
+ RETURN_IF_ERROR(_remote_fs->open_file(full_remote_hdr_path,
&hdr_reader, &reader_options));
+ std::unique_ptr<char[]> read_buf =
std::make_unique_for_overwrite<char[]>(hdr_file_len);
+ size_t read_len = 0;
+ Slice hdr_slice(read_buf.get(), hdr_file_len);
+ RETURN_IF_ERROR(hdr_reader->read_at(0, hdr_slice, &read_len));
+ if (read_len != hdr_file_len) {
+ std::stringstream ss;
+ ss << "failed to read hdr file: " << full_remote_hdr_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ RETURN_IF_ERROR(
+ _report_every(0, &tmp_counter, finished_num, total_num,
TTaskType::type::DOWNLOAD));
+
+ // 1.4. make snapshot
+ std::unordered_map<std::string, std::string> file_mapping;
+ RETURN_IF_ERROR(_engine.cloud_snapshot_mgr().make_snapshot(
+ target_tablet_id, *_storage_resource, file_mapping, true,
&hdr_slice));
+
+ LOG(INFO) << "finish to make snapshot for tablet: " <<
target_tablet_id;
+
+ // 1.5. download files
+ for (auto& iter : remote_files) {
+ RETURN_IF_ERROR(_report_every(10, &report_counter, finished_num,
total_num,
+ TTaskType::type::DOWNLOAD));
+ const std::string& remote_file = iter.first;
+ const FileStat& file_stat = iter.second;
+ auto find = file_mapping.find(remote_file);
+ if (find == file_mapping.end()) {
+ continue;
+ }
+ std::string target_file = find->second;
+ std::string full_remote_file = remote_path + "/" + remote_file +
"." + file_stat.md5;
+ std::string full_target_file = target_path + "/" + target_file;
+ LOG(INFO) << "begin to download from " << full_remote_file << " to
"
+ << full_target_file;
+ io::FileReaderOptions reader_options {
+ .cache_type = io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = false,
+ .cache_base_path = "",
+ .file_size = static_cast<int64_t>(file_stat.size),
+ };
+ io::FileReaderSPtr file_reader = nullptr;
+ RETURN_IF_ERROR(_remote_fs->open_file(full_remote_file,
&file_reader, &reader_options));
+ io::FileWriterPtr file_writer = nullptr;
+ RETURN_IF_ERROR(storage_fs()->create_file(full_target_file,
&file_writer));
+ size_t buf_size = config::s3_file_system_local_upload_buffer_size;
+ std::unique_ptr<char[]> transfer_buffer =
+ std::make_unique_for_overwrite<char[]>(buf_size);
+ size_t cur_offset = 0;
+ while (true) {
+ size_t read_len = 0;
+ RETURN_IF_ERROR(file_reader->read_at(
+ cur_offset, Slice {transfer_buffer.get(), buf_size},
&read_len));
+ cur_offset += read_len;
+ if (read_len == 0) {
Review Comment:
关键：这个结束条件不一定是对的。循环结束之后，最好再校验一下源文件长度和目标文件长度是否一致（可靠性增强校验，开关可以后续再加）。
##########
be/src/cloud/cloud_snapshot_loader.cpp:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_loader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include <unordered_map>
+
+#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "common/logging.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/remote_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace {
+bool _end_with(std::string_view str, std::string_view match) {
+ return str.size() >= match.size() &&
+ str.compare(str.size() - match.size(), match.size(), match) == 0;
+}
+} // namespace
+
+CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv*
env, int64_t job_id,
+ int64_t task_id, const
TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& broker_prop)
+ : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop),
_engine(engine) {};
+
+Status CloudSnapshotLoader::init(TStorageBackendType::type type, const
std::string& location,
+ std::string vault_id) {
+ RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));
+ _storage_resource = _engine.get_storage_resource(vault_id);
+ if (!_storage_resource) {
+ return Status::InternalError("vault id not found, vault id {}",
vault_id);
+ }
+ return Status::OK();
+}
+
+io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
+ return _storage_resource->fs;
+}
+
+Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::map<int64_t,
std::vector<std::string>>* tablet_files) {
+ return Status::NotSupported("upload not supported");
+}
+
+Status CloudSnapshotLoader::download(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::vector<int64_t>*
downloaded_tablet_ids) {
+ if (!_remote_fs || !_storage_resource) {
+ return Status::InternalError("Storage backend not initialized.");
+ }
+
+ LOG(INFO) << "begin to transfer snapshot files. num: " <<
src_to_dest_path.size()
+ << ", broker addr: " << _broker_addr << ", job: " << _job_id
+ << ", task id: " << _task_id;
+
+ // check if job has already been cancelled
+ int tmp_counter = 1;
+ RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
+
+ Status status = Status::OK();
+
+ // 1. for each src path, transfer files to target path
+ int report_counter = 0;
+ int total_num = src_to_dest_path.size();
+ int finished_num = 0;
+ for (const auto& iter : src_to_dest_path) {
+ const std::string& remote_path = iter.first;
+ const std::string& tablet_str = iter.second;
+ int64_t target_tablet_id = -1;
+ try {
+ target_tablet_id = std::stoll(tablet_str);
+ } catch (std::exception& e) {
+ return Status::InternalError("failed to parse target tablet id {},
{}", tablet_str,
+ e.what());
+ }
+ const std::string target_path =
_storage_resource->remote_tablet_path(target_tablet_id);
+
+ // 1.1. check target path not exists
+ bool target_path_exist = false;
+ if (!storage_fs()->exists(target_path, &target_path_exist).ok() ||
target_path_exist) {
+ std::stringstream ss;
+ ss << "failed to download snapshot files, target path already
exists: " << target_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ downloaded_tablet_ids->push_back(target_tablet_id);
Review Comment:
严格来说，这一步是不是要放到 checksum 校验之后？
##########
fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionInfo.java:
##########
@@ -465,13 +465,15 @@ public void readFields(DataInput in) throws IOException {
}
idToInMemory.put(partitionId, in.readBoolean());
- if (Config.isCloudMode()) {
- // HACK: the origin implementation of the cloud mode has code
likes:
- //
- // idToPersistent.put(partitionId, in.readBoolean());
- //
- // keep the compatibility here.
- in.readBoolean();
+ if (Env.getCurrentEnvJournalVersion() > FeMetaVersion.VERSION_129)
{
+ if (Config.isCloudMode()) {
+ // HACK: the origin implementation of the cloud mode has
code likes:
+ //
+ // idToPersistent.put(partitionId, in.readBoolean());
+ //
+ // keep the compatibility here.
+ in.readBoolean();
+ }
Review Comment:
Please add the explanation above as a comment in this code; I was confused by it too.
##########
be/src/agent/task_worker_pool.cpp:
##########
@@ -1266,6 +1268,54 @@ void download_callback(StorageEngine& engine, ExecEnv*
env, const TAgentTaskRequ
remove_task_info(req.task_type, req.signature);
}
+void download_callback(CloudStorageEngine& engine, ExecEnv* env, const
TAgentTaskRequest& req) {
+ const auto& download_request = req.download_req;
+ LOG(INFO) << "get download task. signature=" << req.signature
+ << ", job_id=" << download_request.job_id
+ << ", task detail: " <<
apache::thrift::ThriftDebugString(download_request);
+
+ std::vector<int64_t> transferred_tablet_ids;
+
+ auto status = Status::OK();
+ if (download_request.__isset.remote_tablet_snapshots) {
+ status = Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
+ "remote tablet snapshot is not supported.");
+ } else {
+ std::unique_ptr<CloudSnapshotLoader> loader =
std::make_unique<CloudSnapshotLoader>(
+ engine, env, download_request.job_id, req.signature,
download_request.broker_addr,
+ download_request.broker_prop);
+ status = loader->init(download_request.__isset.storage_backend
+ ? download_request.storage_backend
+ : TStorageBackendType::type::BROKER,
+ download_request.__isset.location ?
download_request.location : "",
+ download_request.vault_id);
+ if (status.ok()) {
+ status = loader->download(download_request.src_dest_map,
&transferred_tablet_ids);
Review Comment:
对于 tablet 个数达到百万甚至千万级别的表，这里可能有风险：请求体太大，单个请求可能放不下。
##########
be/src/cloud/cloud_snapshot_mgr.cpp:
##########
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_mgr.h"
+
+#include <fmt/format.h>
+#include <gen_cpp/olap_file.pb.h>
+
+#include <map>
+#include <unordered_map>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "io/fs/local_file_system.h"
+#include "olap/data_dir.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/pb_helper.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_meta.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+#include "olap/utils.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
+#include "util/slice.h"
+#include "util/uid_util.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+CloudSnapshotMgr::CloudSnapshotMgr(CloudStorageEngine& engine) :
_engine(engine) {
+ _mem_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"CloudSnapshotMgr");
+}
+
+Status CloudSnapshotMgr::make_snapshot(int64_t target_tablet_id,
StorageResource& storage_resource,
+ std::unordered_map<std::string,
std::string>& file_mapping,
+ bool is_restore, const Slice* slice) {
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ if (is_restore && slice == nullptr) {
+ return Status::Error<INVALID_ARGUMENT>("slice cannot be null in
restore.");
+ }
+
+ CloudTabletSPtr target_tablet =
DORIS_TRY(_engine.tablet_mgr().get_tablet(target_tablet_id));
+ if (target_tablet == nullptr) {
+ return Status::Error<TABLE_NOT_FOUND>("failed to get tablet.
tablet={}", target_tablet_id);
+ }
+
+ TabletMeta tablet_meta;
+ if (is_restore) {
+ // 1. deserialize tablet meta from memory
+ RETURN_IF_ERROR(tablet_meta.create_from_buffer((const
uint8_t*)slice->data, slice->size));
+ TabletMetaPB tablet_meta_pb;
+ tablet_meta.to_meta_pb(&tablet_meta_pb);
+
+ tablet_meta_pb.clear_rs_metas(); // copy the rs meta
+ if (tablet_meta.all_rs_metas().size() > 0) {
+
tablet_meta_pb.mutable_inc_rs_metas()->Reserve(tablet_meta.all_rs_metas().size());
+ for (auto& rs : tablet_meta.all_rs_metas()) {
+ rs->to_rowset_pb(tablet_meta_pb.add_rs_metas());
+ }
+ }
+ tablet_meta_pb.clear_stale_rs_metas(); // copy the stale rs meta
+ if (tablet_meta.all_stale_rs_metas().size() > 0) {
+ tablet_meta_pb.mutable_stale_rs_metas()->Reserve(
+ tablet_meta.all_stale_rs_metas().size());
+ for (auto& rs : tablet_meta.all_stale_rs_metas()) {
+ rs->to_rowset_pb(tablet_meta_pb.add_stale_rs_metas());
+ }
+ }
+
+ // 2. convert rowsets
+ TabletMetaPB new_tablet_meta_pb;
+ RETURN_IF_ERROR(convert_rowsets(&new_tablet_meta_pb, tablet_meta_pb,
target_tablet_id,
+ target_tablet, storage_resource,
file_mapping));
+
+ // 3. send make snapshot request
+ RETURN_IF_ERROR(_engine.meta_mgr().make_snapshot(new_tablet_meta_pb,
true));
+ return Status::OK();
+ }
+
+ // backup not implemented
+
+ LOG(INFO) << "success to make snapshot. [tablet_id=" << target_tablet_id
<< "]";
+ return Status::OK();
+}
+
+Status CloudSnapshotMgr::commit_snapshot(int64_t tablet_id) {
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ CloudTabletSPtr tablet =
DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id));
+ if (tablet == nullptr) {
+ return Status::Error<TABLE_NOT_FOUND>("failed to get tablet.
tablet={}", tablet_id);
+ }
+ RETURN_IF_ERROR(_engine.meta_mgr().commit_snapshot(tablet_id));
+ tablet->clear_cache();
+ LOG(INFO) << "success to commit snapshot. [tablet_id=" << tablet_id << "]";
+ return Status::OK();
+}
+
+Status CloudSnapshotMgr::release_snapshot(int64_t tablet_id) {
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ RETURN_IF_ERROR(_engine.meta_mgr().release_snapshot(tablet_id));
+ LOG(INFO) << "success to release snapshot. [tablet_id=" << tablet_id <<
"]";
+ return Status::OK();
+}
+
+Status CloudSnapshotMgr::convert_rowsets(
+ TabletMetaPB* out, const TabletMetaPB& in, int64_t tablet_id,
+ CloudTabletSPtr& target_tablet, StorageResource& storage_resource,
+ std::unordered_map<std::string, std::string>& file_mapping) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
+ // deep copy
+ *out = in;
+
+ out->clear_rs_metas();
+ out->clear_inc_rs_metas();
+ out->clear_stale_rs_metas();
+ // modify id
+ out->set_tablet_id(tablet_id);
+ *out->mutable_tablet_uid() = TabletUid::gen_uid().to_proto();
+ out->set_table_id(target_tablet->table_id());
+ out->set_partition_id(target_tablet->partition_id());
+ out->set_index_id(target_tablet->index_id());
+ PUniqueId* cooldown_meta_id = out->mutable_cooldown_meta_id();
+ cooldown_meta_id->set_hi(0);
+ cooldown_meta_id->set_lo(0);
+
+ TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+ tablet_schema->init_from_pb(in.schema());
+
+ std::unordered_map<Version, RowsetMetaPB*, HashOfVersion> rs_version_map;
+ std::unordered_map<RowsetId, RowsetId> rowset_id_mapping;
+ for (auto&& rowset_meta_pb : in.rs_metas()) {
+ RowsetMetaPB* new_rowset_meta_pb = out->add_rs_metas();
+ RETURN_IF_ERROR(_create_rowset_meta(new_rowset_meta_pb,
rowset_meta_pb, tablet_id,
+ target_tablet, storage_resource,
tablet_schema,
+ file_mapping, rowset_id_mapping));
+ Version rowset_version = {rowset_meta_pb.start_version(),
rowset_meta_pb.end_version()};
+ rs_version_map[rowset_version] = new_rowset_meta_pb;
+ }
+
+ for (auto&& stale_rowset_pb : in.stale_rs_metas()) {
+ Version rowset_version = {stale_rowset_pb.start_version(),
stale_rowset_pb.end_version()};
+ auto exist_rs = rs_version_map.find(rowset_version);
+ if (exist_rs != rs_version_map.end()) {
+ continue;
+ }
+ RowsetMetaPB* new_rowset_meta_pb = out->add_stale_rs_metas();
+ RETURN_IF_ERROR(_create_rowset_meta(new_rowset_meta_pb,
stale_rowset_pb, tablet_id,
+ target_tablet, storage_resource,
tablet_schema,
+ file_mapping, rowset_id_mapping));
+ }
+
+ if (!rowset_id_mapping.empty() && in.has_delete_bitmap()) {
+ const auto& old_del_bitmap_pb = in.delete_bitmap();
+ DeleteBitmapPB* new_del_bitmap_pb = out->mutable_delete_bitmap();
+ const int rst_ids_size = old_del_bitmap_pb.rowset_ids_size();
+ if (rst_ids_size > 0) {
+ new_del_bitmap_pb->mutable_rowset_ids()->Reserve(rst_ids_size);
+ }
+ LOG(INFO) << "convert delete bitmap rowset_ids. [rowset_ids_size=" <<
rst_ids_size << "]";
+ for (size_t i = 0; i < rst_ids_size; ++i) {
+ RowsetId rst_id;
+ rst_id.init(old_del_bitmap_pb.rowset_ids(i));
+ auto it = rowset_id_mapping.find(rst_id);
+ // It should not happen, if we can't convert some rowid in delete
bitmap, the
+ // data might be inconsist.
+ CHECK(it != rowset_id_mapping.end())
+ << "can't find rowset_id " << rst_id.to_string() << " in
convert_rowset_ids";
+ new_del_bitmap_pb->set_rowset_ids(i, it->second.to_string());
+ }
+ }
+ return Status::OK();
+}
+
+Status CloudSnapshotMgr::_create_rowset_meta(
+ RowsetMetaPB* new_rowset_meta_pb, const RowsetMetaPB& source_meta_pb,
+ int64_t target_tablet_id, CloudTabletSPtr& target_tablet,
StorageResource& storage_resource,
+ TabletSchemaSPtr tablet_schema, std::unordered_map<std::string,
std::string>& file_mapping,
+ std::unordered_map<RowsetId, RowsetId>& rowset_id_mapping) {
+ RowsetId dst_rs_id = _engine.next_rowset_id();
+ RowsetWriterContext context;
+ context.rowset_id = dst_rs_id;
+ context.tablet_id = target_tablet_id;
+ context.partition_id = target_tablet->partition_id();
+ context.index_id = target_tablet->index_id();
+ // Note: use origin txn id
+ context.txn_id = source_meta_pb.txn_id();
+ context.txn_expiration = 0;
+ context.rowset_state = source_meta_pb.rowset_state();
+ context.storage_resource = storage_resource;
+ context.tablet = target_tablet;
+ context.version = {source_meta_pb.start_version(),
source_meta_pb.end_version()};
+ context.segments_overlap = source_meta_pb.segments_overlap_pb();
+ context.tablet_schema_hash = source_meta_pb.tablet_schema_hash();
+ if (source_meta_pb.has_tablet_schema()) {
+ context.tablet_schema = std::make_shared<TabletSchema>();
+ context.tablet_schema->init_from_pb(source_meta_pb.tablet_schema());
+ } else {
+ context.tablet_schema = tablet_schema;
+ }
+ context.newest_write_timestamp = source_meta_pb.newest_write_timestamp();
+
+ auto rs_writer = DORIS_TRY(RowsetFactory::create_rowset_writer(_engine,
context, false));
+ // prepare rowsets
+ //
RETURN_IF_ERROR(_engine.meta_mgr().prepare_rowset(*rs_writer->rowset_meta()));
Review Comment:
remove unused code
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
Review Comment:
加注释，分步骤（1、2、3、4、5）说明这个函数具体做了什么。
##########
be/src/cloud/cloud_snapshot_mgr.cpp:
##########
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_mgr.h"
+
+#include <fmt/format.h>
+#include <gen_cpp/olap_file.pb.h>
+
+#include <map>
+#include <unordered_map>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "io/fs/local_file_system.h"
+#include "olap/data_dir.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/pb_helper.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_meta.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+#include "olap/utils.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
+#include "util/slice.h"
+#include "util/uid_util.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+CloudSnapshotMgr::CloudSnapshotMgr(CloudStorageEngine& engine) :
_engine(engine) {
+ _mem_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"CloudSnapshotMgr");
+}
+
+Status CloudSnapshotMgr::make_snapshot(int64_t target_tablet_id,
StorageResource& storage_resource,
+ std::unordered_map<std::string,
std::string>& file_mapping,
+ bool is_restore, const Slice* slice) {
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ if (is_restore && slice == nullptr) {
+ return Status::Error<INVALID_ARGUMENT>("slice cannot be null in
restore.");
+ }
+
+ CloudTabletSPtr target_tablet =
DORIS_TRY(_engine.tablet_mgr().get_tablet(target_tablet_id));
+ if (target_tablet == nullptr) {
+ return Status::Error<TABLE_NOT_FOUND>("failed to get tablet.
tablet={}", target_tablet_id);
+ }
+
+ TabletMeta tablet_meta;
+ if (is_restore) {
+ // 1. deserialize tablet meta from memory
+ RETURN_IF_ERROR(tablet_meta.create_from_buffer((const
uint8_t*)slice->data, slice->size));
+ TabletMetaPB tablet_meta_pb;
+ tablet_meta.to_meta_pb(&tablet_meta_pb);
+
+ tablet_meta_pb.clear_rs_metas(); // copy the rs meta
+ if (tablet_meta.all_rs_metas().size() > 0) {
+
tablet_meta_pb.mutable_inc_rs_metas()->Reserve(tablet_meta.all_rs_metas().size());
+ for (auto& rs : tablet_meta.all_rs_metas()) {
+ rs->to_rowset_pb(tablet_meta_pb.add_rs_metas());
+ }
+ }
+ tablet_meta_pb.clear_stale_rs_metas(); // copy the stale rs meta
+ if (tablet_meta.all_stale_rs_metas().size() > 0) {
Review Comment:
应该不需要 stale rowset？既不需要处理，也没法处理？
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+ const std::string task_name = "recycle_snapshots";
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
+ int64_t total_recycle_snapshot_rs_number = 0;
+
+ SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+ SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+ std::string snapshot_key0;
+ std::string snapshot_key1;
+ snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+ snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+ LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+ int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ register_recycle_task(task_name, start_time);
+
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ unregister_recycle_task(task_name);
+ int64_t cost =
+
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
+ LOG_INFO("recycle snapshots finished, cost={}s", cost)
+ .tag("instance_id", instance_id_)
+ .tag("num_scanned", num_scanned)
+ .tag("num_expired", num_expired)
+ .tag("num_recycled", num_recycled)
+ .tag("total_recycle_snapshot_rs_number",
total_recycle_snapshot_rs_number);
+ });
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+ if (config::force_immediate_recycle || snapshot.state() ==
SnapshotPB::DROPPED) {
+ return 0L;
+ }
+ int64_t expiration = snapshot.expiration() > 0
+ ? snapshot.creation_time() +
snapshot.expiration()
+ : snapshot.creation_time();
+ int64_t final_expiration = expiration + config::retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_snapshot_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
+ };
+
+ std::vector<std::string_view> snapshot_keys;
+ auto recycle_func = [&, this](std::string_view k, std::string_view v) ->
int {
Review Comment:
这个流程和 make snapshot 流程中“判断为 dropped 就直接覆盖”的做法放在一起，是有正确性问题的：
有可能 recycler 读到 dropped 后开始 recycle tablet，回收到一半（已经回收了 restore 正在写的数据），
而 restore 流程又先于回收流程结束，commit 了 snapshot。
这种情况下恢复出来的数据是有丢失的。
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+ const std::string task_name = "recycle_snapshots";
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
+ int64_t total_recycle_snapshot_rs_number = 0;
+
+ SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+ SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+ std::string snapshot_key0;
+ std::string snapshot_key1;
+ snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+ snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+ LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+ int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ register_recycle_task(task_name, start_time);
+
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ unregister_recycle_task(task_name);
+ int64_t cost =
+
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
+ LOG_INFO("recycle snapshots finished, cost={}s", cost)
+ .tag("instance_id", instance_id_)
+ .tag("num_scanned", num_scanned)
+ .tag("num_expired", num_expired)
+ .tag("num_recycled", num_recycled)
+ .tag("total_recycle_snapshot_rs_number",
total_recycle_snapshot_rs_number);
+ });
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+ if (config::force_immediate_recycle || snapshot.state() ==
SnapshotPB::DROPPED) {
+ return 0L;
+ }
+ int64_t expiration = snapshot.expiration() > 0
+ ? snapshot.creation_time() +
snapshot.expiration()
+ : snapshot.creation_time();
+ int64_t final_expiration = expiration + config::retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_snapshot_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
+ };
+
+ std::vector<std::string_view> snapshot_keys;
+ auto recycle_func = [&, this](std::string_view k, std::string_view v) ->
int {
+ ++num_scanned;
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromArray(v.data(), v.size())) {
+ LOG_WARNING("malformed recycle partition value").tag("key",
hex(k));
+ return -1;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time < calc_expiration(snapshot_pb)) { // not expired
+ return 0;
+ }
+ ++num_expired;
+
+ int64_t tablet_id = snapshot_pb.tablet_id();
+ bool is_restore = snapshot_pb.is_restore();
+ LOG(INFO) << "begin to recycle expired snapshot, instance_id=" <<
instance_id_
Review Comment:
建议把整个 snapshot pb 的内容都打印出来,这个量很有限,不用担心会很多。
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -963,6 +967,596 @@ static void set_schema_in_existed_rowset(MetaServiceCode&
code, std::string& msg
}
}
+void scan_snapshot_rowset(
+ Transaction* txn, const std::string& instance_id, int64_t tablet_id,
MetaServiceCode& code,
+ std::string& msg,
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>*
snapshot_rs_metas) {
+ std::stringstream ss;
+ SnapshotRowsetKeyInfo rs_key_info0 {instance_id, tablet_id, 0};
+ SnapshotRowsetKeyInfo rs_key_info1 {instance_id, tablet_id + 1, 0};
+ std::string snapshot_rs_key0;
+ std::string snapshot_rs_key1;
+ snapshot_rowset_key(rs_key_info0, &snapshot_rs_key0);
+ snapshot_rowset_key(rs_key_info1, &snapshot_rs_key1);
+
+ int num_rowsets = 0;
+ std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+ (int*)0x01, [snapshot_rs_key0, snapshot_rs_key1,
&num_rowsets](int*) {
+ LOG(INFO) << "get snapshot rs meta, num_rowsets=" <<
num_rowsets << " range=["
+ << hex(snapshot_rs_key0) << "," <<
hex(snapshot_rs_key1) << "]";
+ });
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ TxnErrorCode err = txn->get(snapshot_rs_key0, snapshot_rs_key1, &it,
true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get snapshot rs meta while committing,"
+ << " tablet_id=" << tablet_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ while (it->has_next()) {
+ auto [k, v] = it->next();
+ LOG(INFO) << "range_get snapshot_rs_key=" << hex(k) << "
tablet_id=" << tablet_id;
+ snapshot_rs_metas->emplace_back();
+ if (!snapshot_rs_metas->back().second.ParseFromArray(v.data(),
v.size())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed snapshot rowset meta, tablet_id=" << tablet_id
+ << " key=" << hex(k);
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ snapshot_rs_metas->back().first = std::string(k.data(), k.size());
+ ++num_rowsets;
+ if (!it->has_next()) snapshot_rs_key0 = k;
+ }
+ snapshot_rs_key0.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (it->more());
+ return;
+}
+
+void MetaServiceImpl::make_snapshot(::google::protobuf::RpcController*
controller,
+ const SnapshotRequest* request,
SnapshotResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(make_snapshot);
+ if (!request->has_tablet_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty tablet_id";
+ return;
+ }
+
+ bool is_restore = request->has_is_restore() && request->is_restore();
+ if (is_restore) {
+ if (!request->has_tablet_meta() ||
!request->tablet_meta().rs_metas_size()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = !request->has_tablet_meta() ? "no tablet meta" : "no rowset
meta";
+ return;
+ }
+ if (!request->tablet_meta().has_schema() &&
!request->tablet_meta().has_schema_version()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "tablet_meta must have either schema or schema_version";
+ return;
+ }
+ }
+
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+
+ RPC_RATE_LIMIT(make_snapshot)
+
+ std::unique_ptr<Transaction> txn0;
+ TxnErrorCode err = txn_kv_->create_txn(&txn0);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+
+ // validate request
+ TabletIndexPB tablet_idx;
+ get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(),
tablet_idx);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+
+ auto key = snapshot_tablet_key({instance_id, tablet_idx.tablet_id()});
+ std::string val;
+ err = txn0->get(key, &val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to check snapshot {} existence, err={}",
tablet_idx.tablet_id(),
+ err);
+ return;
+ }
+ if (err == TxnErrorCode::TXN_OK) {
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromString(val)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "malformed snapshot";
+ LOG_WARNING(msg);
+ return;
+ }
+ if (snapshot_pb.state() != SnapshotPB::DROPPED) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("snapshot {} already exists, state: {}",
tablet_idx.tablet_id(),
+ SnapshotPB::State_Name(snapshot_pb.state()));
+ return;
+ }
+ }
+
+ TabletMetaCloudPB tablet_meta;
+ std::vector<doris::RowsetMetaCloudPB> rs_metas;
+ if (is_restore) {
+ tablet_meta = request->tablet_meta();
+ rs_metas.assign(tablet_meta.rs_metas().begin(),
tablet_meta.rs_metas().end());
+ } else {
+ // backup not implemented
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "not implemented";
+ return;
+ }
+ tablet_meta.clear_rs_metas(); // strip off rs meta
+
+ // 1. save tablet snapshot
+ std::string to_save_val;
+ {
+ SnapshotPB pb;
+ pb.set_tablet_id(tablet_idx.tablet_id());
+ pb.mutable_tablet_meta()->Swap(&tablet_meta);
+ pb.set_creation_time(::time(nullptr));
+ pb.set_expiration(request->expiration());
+ pb.set_is_restore(is_restore);
+ pb.set_state(SnapshotPB::PREPARED);
+ pb.SerializeToString(&to_save_val);
+ }
+ LOG_INFO("put tablet snapshot")
+ .tag("snapshot_tablet_key", hex(key))
+ .tag("tablet_id", tablet_idx.tablet_id())
+ .tag("is_restore", is_restore)
+ .tag("state", SnapshotPB::PREPARED);
+ txn0->put(key, to_save_val);
+ err = txn0->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ msg = fmt::format("failed to commit txn: {}", err);
+ return;
+ }
+
+ // 2. save rs snapshots
+ for (auto& rowset_meta : rs_metas) {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+ // put snapshot rowset kv
+ std::string snapshot_rs_key;
+ std::string snapshot_rs_val;
+ SnapshotRowsetKeyInfo rs_key_info {instance_id, tablet_idx.tablet_id(),
+ rowset_meta.end_version()};
+ snapshot_rowset_key(rs_key_info, &snapshot_rs_key);
+ if (!rowset_meta.SerializeToString(&snapshot_rs_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ msg = "failed to serialize rowset meta";
+ return;
+ }
+ txn->put(snapshot_rs_key, snapshot_rs_val);
+ LOG_INFO("put rowset snapshot")
+ .tag("snapshot_rowset_key", hex(snapshot_rs_key))
+ .tag("tablet_id", tablet_idx.tablet_id())
+ .tag("rowset_id", rowset_meta.rowset_id_v2())
+ .tag("rowset_size", snapshot_rs_key.size() +
snapshot_rs_val.size())
+ .tag("rowset_meta", rowset_meta.DebugString());
+
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "failed to make snapshot,"
+ << " tablet_id=" << tablet_idx.tablet_id()
+ << " rowset_id=" << rowset_meta.rowset_id_v2() << " err=" <<
err;
+ msg = ss.str();
+ return;
+ }
+ }
+}
+
+void MetaServiceImpl::commit_snapshot(::google::protobuf::RpcController*
controller,
+ const SnapshotRequest* request,
SnapshotResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(commit_snapshot);
+ if (!request->has_tablet_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty tablet_id";
+ return;
+ }
+
+ bool is_restore = request->has_is_restore() && request->is_restore();
+ if (!is_restore) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "commit snapshot only support for restore";
+ return;
+ }
+
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+
+ RPC_RATE_LIMIT(commit_snapshot)
+
+ std::unique_ptr<Transaction> txn0;
+ TxnErrorCode err = txn_kv_->create_txn(&txn0);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+
+ TabletIndexPB tablet_idx;
+ get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(),
tablet_idx);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+
+ // 1. get tablet snapshot
+ auto key = snapshot_tablet_key({instance_id, request->tablet_id()});
+ std::string val;
+ err = txn0->get(key, &val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "snapshot not exists or has been recycled";
+ return;
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to check snapshot existence, err={}", err);
+ LOG_WARNING(msg);
+ return;
+ }
+
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromString(val)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "malformed snapshot";
+ LOG_WARNING(msg);
+ return;
+ }
+
+ if (snapshot_pb.state() != SnapshotPB::PREPARED) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("snapshot {} with invalid state: {}",
tablet_idx.tablet_id(),
+ SnapshotPB::State_Name(snapshot_pb.state()));
+ return;
+ }
+
+ // 2. get rs snapshots
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
snapshot_rs_metas;
+ scan_snapshot_rowset(txn0.get(), instance_id, request->tablet_id(), code,
msg,
Review Comment:
这个 scan 不一定能在一个 kv 事务里完成,可能超过 10MB 或者超过 5 秒。
其他用到这个函数的地方也有类似问题。
这意味着可能要拆成多个 kv 事务,需要着重考虑并发场景:commit 的时候还有写 snapshot rowset 的情况,
这种情况下可能是有问题的。
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
+ const std::string task_name = "recycle_snapshots";
+ int64_t num_scanned = 0;
+ int64_t num_expired = 0;
+ int64_t num_recycled = 0;
+ int64_t total_recycle_snapshot_rs_number = 0;
+
+ SnapshotTabletKeyInfo snapshot_key_info0 {instance_id_, 0};
+ SnapshotTabletKeyInfo snapshot_key_info1 {instance_id_, INT64_MAX};
+ std::string snapshot_key0;
+ std::string snapshot_key1;
+ snapshot_tablet_key(snapshot_key_info0, &snapshot_key0);
+ snapshot_tablet_key(snapshot_key_info1, &snapshot_key1);
+
+ LOG_INFO("begin to recycle snapshots").tag("instance_id", instance_id_);
+
+ int64_t start_time =
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count();
+ register_recycle_task(task_name, start_time);
+
+ std::unique_ptr<int, std::function<void(int*)>>
defer_log_statistics((int*)0x01, [&](int*) {
+ unregister_recycle_task(task_name);
+ int64_t cost =
+
duration_cast<seconds>(steady_clock::now().time_since_epoch()).count() -
start_time;
+ LOG_INFO("recycle snapshots finished, cost={}s", cost)
+ .tag("instance_id", instance_id_)
+ .tag("num_scanned", num_scanned)
+ .tag("num_expired", num_expired)
+ .tag("num_recycled", num_recycled)
+ .tag("total_recycle_snapshot_rs_number",
total_recycle_snapshot_rs_number);
+ });
+
+ int64_t earlest_ts = std::numeric_limits<int64_t>::max();
+
+ auto calc_expiration = [&earlest_ts, this](const SnapshotPB& snapshot) {
+ if (config::force_immediate_recycle || snapshot.state() ==
SnapshotPB::DROPPED) {
+ return 0L;
+ }
+ int64_t expiration = snapshot.expiration() > 0
+ ? snapshot.creation_time() +
snapshot.expiration()
+ : snapshot.creation_time();
+ int64_t final_expiration = expiration + config::retention_seconds;
+ if (earlest_ts > final_expiration) {
+ earlest_ts = final_expiration;
+ g_bvar_recycler_recycle_snapshot_earlest_ts.put(instance_id_,
earlest_ts);
+ }
+ return final_expiration;
+ };
+
+ std::vector<std::string_view> snapshot_keys;
+ auto recycle_func = [&, this](std::string_view k, std::string_view v) ->
int {
+ ++num_scanned;
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromArray(v.data(), v.size())) {
+ LOG_WARNING("malformed recycle partition value").tag("key",
hex(k));
+ return -1;
+ }
+ int64_t current_time = ::time(nullptr);
+ if (current_time < calc_expiration(snapshot_pb)) { // not expired
+ return 0;
+ }
+ ++num_expired;
+
+ int64_t tablet_id = snapshot_pb.tablet_id();
+ bool is_restore = snapshot_pb.is_restore();
Review Comment:
这个字段用一个状态机来表示可能更合适一些:
添加一个和 DROPPED 同等级的状态。
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -963,6 +967,596 @@ static void set_schema_in_existed_rowset(MetaServiceCode&
code, std::string& msg
}
}
+void scan_snapshot_rowset(
+ Transaction* txn, const std::string& instance_id, int64_t tablet_id,
MetaServiceCode& code,
+ std::string& msg,
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>*
snapshot_rs_metas) {
+ std::stringstream ss;
+ SnapshotRowsetKeyInfo rs_key_info0 {instance_id, tablet_id, 0};
+ SnapshotRowsetKeyInfo rs_key_info1 {instance_id, tablet_id + 1, 0};
+ std::string snapshot_rs_key0;
+ std::string snapshot_rs_key1;
+ snapshot_rowset_key(rs_key_info0, &snapshot_rs_key0);
+ snapshot_rowset_key(rs_key_info1, &snapshot_rs_key1);
+
+ int num_rowsets = 0;
+ std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+ (int*)0x01, [snapshot_rs_key0, snapshot_rs_key1,
&num_rowsets](int*) {
+ LOG(INFO) << "get snapshot rs meta, num_rowsets=" <<
num_rowsets << " range=["
+ << hex(snapshot_rs_key0) << "," <<
hex(snapshot_rs_key1) << "]";
+ });
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ TxnErrorCode err = txn->get(snapshot_rs_key0, snapshot_rs_key1, &it,
true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get snapshot rs meta while committing,"
+ << " tablet_id=" << tablet_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ while (it->has_next()) {
+ auto [k, v] = it->next();
+ LOG(INFO) << "range_get snapshot_rs_key=" << hex(k) << "
tablet_id=" << tablet_id;
+ snapshot_rs_metas->emplace_back();
+ if (!snapshot_rs_metas->back().second.ParseFromArray(v.data(),
v.size())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed snapshot rowset meta, tablet_id=" << tablet_id
+ << " key=" << hex(k);
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ snapshot_rs_metas->back().first = std::string(k.data(), k.size());
+ ++num_rowsets;
+ if (!it->has_next()) snapshot_rs_key0 = k;
+ }
+ snapshot_rs_key0.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (it->more());
+ return;
+}
+
+void MetaServiceImpl::make_snapshot(::google::protobuf::RpcController*
controller,
+ const SnapshotRequest* request,
SnapshotResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(make_snapshot);
+ if (!request->has_tablet_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty tablet_id";
+ return;
+ }
+
+ bool is_restore = request->has_is_restore() && request->is_restore();
+ if (is_restore) {
+ if (!request->has_tablet_meta() ||
!request->tablet_meta().rs_metas_size()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = !request->has_tablet_meta() ? "no tablet meta" : "no rowset
meta";
+ return;
+ }
+ if (!request->tablet_meta().has_schema() &&
!request->tablet_meta().has_schema_version()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "tablet_meta must have either schema or schema_version";
+ return;
+ }
+ }
+
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+
+ RPC_RATE_LIMIT(make_snapshot)
+
+ std::unique_ptr<Transaction> txn0;
+ TxnErrorCode err = txn_kv_->create_txn(&txn0);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+
+ // validate request
+ TabletIndexPB tablet_idx;
+ get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(),
tablet_idx);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+
+ auto key = snapshot_tablet_key({instance_id, tablet_idx.tablet_id()});
+ std::string val;
+ err = txn0->get(key, &val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to check snapshot {} existence, err={}",
tablet_idx.tablet_id(),
+ err);
+ return;
+ }
+ if (err == TxnErrorCode::TXN_OK) {
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromString(val)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "malformed snapshot";
+ LOG_WARNING(msg);
+ return;
+ }
+ if (snapshot_pb.state() != SnapshotPB::DROPPED) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("snapshot {} already exists, state: {}",
tablet_idx.tablet_id(),
+ SnapshotPB::State_Name(snapshot_pb.state()));
+ return;
+ }
+ }
+
+ TabletMetaCloudPB tablet_meta;
+ std::vector<doris::RowsetMetaCloudPB> rs_metas;
+ if (is_restore) {
+ tablet_meta = request->tablet_meta();
+ rs_metas.assign(tablet_meta.rs_metas().begin(),
tablet_meta.rs_metas().end());
+ } else {
+ // backup not implemented
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "not implemented";
+ return;
+ }
+ tablet_meta.clear_rs_metas(); // strip off rs meta
+
+ // 1. save tablet snapshot
+ std::string to_save_val;
+ {
+ SnapshotPB pb;
+ pb.set_tablet_id(tablet_idx.tablet_id());
+ pb.mutable_tablet_meta()->Swap(&tablet_meta);
+ pb.set_creation_time(::time(nullptr));
+ pb.set_expiration(request->expiration());
+ pb.set_is_restore(is_restore);
+ pb.set_state(SnapshotPB::PREPARED);
+ pb.SerializeToString(&to_save_val);
+ }
+ LOG_INFO("put tablet snapshot")
+ .tag("snapshot_tablet_key", hex(key))
+ .tag("tablet_id", tablet_idx.tablet_id())
+ .tag("is_restore", is_restore)
+ .tag("state", SnapshotPB::PREPARED);
+ txn0->put(key, to_save_val);
+ err = txn0->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ msg = fmt::format("failed to commit txn: {}", err);
+ return;
+ }
+
+ // 2. save rs snapshots
+ for (auto& rowset_meta : rs_metas) {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+ // put snapshot rowset kv
+ std::string snapshot_rs_key;
+ std::string snapshot_rs_val;
+ SnapshotRowsetKeyInfo rs_key_info {instance_id, tablet_idx.tablet_id(),
+ rowset_meta.end_version()};
+ snapshot_rowset_key(rs_key_info, &snapshot_rs_key);
+ if (!rowset_meta.SerializeToString(&snapshot_rs_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ msg = "failed to serialize rowset meta";
+ return;
+ }
+ txn->put(snapshot_rs_key, snapshot_rs_val);
+ LOG_INFO("put rowset snapshot")
+ .tag("snapshot_rowset_key", hex(snapshot_rs_key))
+ .tag("tablet_id", tablet_idx.tablet_id())
+ .tag("rowset_id", rowset_meta.rowset_id_v2())
+ .tag("rowset_size", snapshot_rs_key.size() +
snapshot_rs_val.size())
+ .tag("rowset_meta", rowset_meta.DebugString());
+
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
Review Comment:
失败了之后 tablet snapshot 不用处理吗?
后续有什么流程驱动 tablet snapshot?
##########
gensrc/proto/cloud.proto:
##########
@@ -468,6 +468,20 @@ message RecycleStagePB {
optional StagePB stage = 3;
}
+message SnapshotPB {
+ enum State {
+ UNKNOWN = 0;
Review Comment:
多加一些 state 用来表示状态,比如 downloading 之类。
##########
be/src/cloud/cloud_snapshot_loader.cpp:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_loader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include <unordered_map>
+
+#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "common/logging.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/remote_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace {
+bool _end_with(std::string_view str, std::string_view match) {
+ return str.size() >= match.size() &&
+ str.compare(str.size() - match.size(), match.size(), match) == 0;
+}
+} // namespace
+
+CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv*
env, int64_t job_id,
+ int64_t task_id, const
TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& broker_prop)
+ : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop),
_engine(engine) {};
+
+Status CloudSnapshotLoader::init(TStorageBackendType::type type, const
std::string& location,
+ std::string vault_id) {
+ RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location));
+ _storage_resource = _engine.get_storage_resource(vault_id);
+ if (!_storage_resource) {
+ return Status::InternalError("vault id not found, vault id {}",
vault_id);
+ }
+ return Status::OK();
+}
+
+io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() {
+ return _storage_resource->fs;
+}
+
+Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::map<int64_t,
std::vector<std::string>>* tablet_files) {
+ return Status::NotSupported("upload not supported");
+}
+
+Status CloudSnapshotLoader::download(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::vector<int64_t>*
downloaded_tablet_ids) {
+ if (!_remote_fs || !_storage_resource) {
+ return Status::InternalError("Storage backend not initialized.");
+ }
+
+ LOG(INFO) << "begin to transfer snapshot files. num: " <<
src_to_dest_path.size()
+ << ", broker addr: " << _broker_addr << ", job: " << _job_id
+ << ", task id: " << _task_id;
+
+ // check if job has already been cancelled
+ int tmp_counter = 1;
+ RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
+
+ Status status = Status::OK();
+
+ // 1. for each src path, transfer files to target path
Review Comment:
这里的注释可以举个例子,说明 remote path 里都可能会有哪些内容(也就是文件列表)。
##########
be/src/cloud/cloud_snapshot_mgr.cpp:
##########
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_mgr.h"
+
+#include <fmt/format.h>
+#include <gen_cpp/olap_file.pb.h>
+
+#include <map>
+#include <unordered_map>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "common/config.h"
+#include "common/logging.h"
+#include "common/status.h"
+#include "io/fs/local_file_system.h"
+#include "olap/data_dir.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/pb_helper.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/storage_policy.h"
+#include "olap/tablet_meta.h"
+#include "olap/tablet_schema.h"
+#include "olap/tablet_schema_cache.h"
+#include "olap/utils.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
+#include "util/slice.h"
+#include "util/uid_util.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+CloudSnapshotMgr::CloudSnapshotMgr(CloudStorageEngine& engine) :
_engine(engine) {
+ _mem_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER,
"CloudSnapshotMgr");
+}
+
+Status CloudSnapshotMgr::make_snapshot(int64_t target_tablet_id,
StorageResource& storage_resource,
+ std::unordered_map<std::string,
std::string>& file_mapping,
+ bool is_restore, const Slice* slice) {
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ if (is_restore && slice == nullptr) {
+ return Status::Error<INVALID_ARGUMENT>("slice cannot be null in
restore.");
+ }
+
+ CloudTabletSPtr target_tablet =
DORIS_TRY(_engine.tablet_mgr().get_tablet(target_tablet_id));
+ if (target_tablet == nullptr) {
+ return Status::Error<TABLE_NOT_FOUND>("failed to get tablet.
tablet={}", target_tablet_id);
+ }
+
+ TabletMeta tablet_meta;
+ if (is_restore) {
+ // 1. deserialize tablet meta from memory
+ RETURN_IF_ERROR(tablet_meta.create_from_buffer((const
uint8_t*)slice->data, slice->size));
+ TabletMetaPB tablet_meta_pb;
+ tablet_meta.to_meta_pb(&tablet_meta_pb);
+
+ tablet_meta_pb.clear_rs_metas(); // copy the rs meta
+ if (tablet_meta.all_rs_metas().size() > 0) {
+
tablet_meta_pb.mutable_inc_rs_metas()->Reserve(tablet_meta.all_rs_metas().size());
+ for (auto& rs : tablet_meta.all_rs_metas()) {
+ rs->to_rowset_pb(tablet_meta_pb.add_rs_metas());
+ }
+ }
+ tablet_meta_pb.clear_stale_rs_metas(); // copy the stale rs meta
+ if (tablet_meta.all_stale_rs_metas().size() > 0) {
+ tablet_meta_pb.mutable_stale_rs_metas()->Reserve(
+ tablet_meta.all_stale_rs_metas().size());
+ for (auto& rs : tablet_meta.all_stale_rs_metas()) {
+ rs->to_rowset_pb(tablet_meta_pb.add_stale_rs_metas());
+ }
+ }
+
+ // 2. convert rowsets
+ TabletMetaPB new_tablet_meta_pb;
+ RETURN_IF_ERROR(convert_rowsets(&new_tablet_meta_pb, tablet_meta_pb,
target_tablet_id,
+ target_tablet, storage_resource,
file_mapping));
+
+ // 3. send make snapshot request
+ RETURN_IF_ERROR(_engine.meta_mgr().make_snapshot(new_tablet_meta_pb,
true));
+ return Status::OK();
+ }
+
+ // backup not implemented
+
+ LOG(INFO) << "success to make snapshot. [tablet_id=" << target_tablet_id
<< "]";
+ return Status::OK();
+}
+
+Status CloudSnapshotMgr::commit_snapshot(int64_t tablet_id) {
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ CloudTabletSPtr tablet =
DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id));
+ if (tablet == nullptr) {
+ return Status::Error<TABLE_NOT_FOUND>("failed to get tablet.
tablet={}", tablet_id);
+ }
+ RETURN_IF_ERROR(_engine.meta_mgr().commit_snapshot(tablet_id));
+ tablet->clear_cache();
+ LOG(INFO) << "success to commit snapshot. [tablet_id=" << tablet_id << "]";
+ return Status::OK();
+}
+
+Status CloudSnapshotMgr::release_snapshot(int64_t tablet_id) {
+ SCOPED_ATTACH_TASK(_mem_tracker);
+ RETURN_IF_ERROR(_engine.meta_mgr().release_snapshot(tablet_id));
+ LOG(INFO) << "success to release snapshot. [tablet_id=" << tablet_id <<
"]";
+ return Status::OK();
+}
+
+Status CloudSnapshotMgr::convert_rowsets(
+ TabletMetaPB* out, const TabletMetaPB& in, int64_t tablet_id,
+ CloudTabletSPtr& target_tablet, StorageResource& storage_resource,
+ std::unordered_map<std::string, std::string>& file_mapping) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker);
+ // deep copy
+ *out = in;
+
+ out->clear_rs_metas();
+ out->clear_inc_rs_metas();
+ out->clear_stale_rs_metas();
+ // modify id
+ out->set_tablet_id(tablet_id);
+ *out->mutable_tablet_uid() = TabletUid::gen_uid().to_proto();
+ out->set_table_id(target_tablet->table_id());
+ out->set_partition_id(target_tablet->partition_id());
+ out->set_index_id(target_tablet->index_id());
+ PUniqueId* cooldown_meta_id = out->mutable_cooldown_meta_id();
+ cooldown_meta_id->set_hi(0);
+ cooldown_meta_id->set_lo(0);
+
+ TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+ tablet_schema->init_from_pb(in.schema());
+
+ std::unordered_map<Version, RowsetMetaPB*, HashOfVersion> rs_version_map;
+ std::unordered_map<RowsetId, RowsetId> rowset_id_mapping;
+ for (auto&& rowset_meta_pb : in.rs_metas()) {
+ RowsetMetaPB* new_rowset_meta_pb = out->add_rs_metas();
+ RETURN_IF_ERROR(_create_rowset_meta(new_rowset_meta_pb,
rowset_meta_pb, tablet_id,
+ target_tablet, storage_resource,
tablet_schema,
+ file_mapping, rowset_id_mapping));
+ Version rowset_version = {rowset_meta_pb.start_version(),
rowset_meta_pb.end_version()};
+ rs_version_map[rowset_version] = new_rowset_meta_pb;
+ }
+
+ for (auto&& stale_rowset_pb : in.stale_rs_metas()) {
+ Version rowset_version = {stale_rowset_pb.start_version(),
stale_rowset_pb.end_version()};
+ auto exist_rs = rs_version_map.find(rowset_version);
+ if (exist_rs != rs_version_map.end()) {
+ continue;
+ }
+ RowsetMetaPB* new_rowset_meta_pb = out->add_stale_rs_metas();
+ RETURN_IF_ERROR(_create_rowset_meta(new_rowset_meta_pb,
stale_rowset_pb, tablet_id,
+ target_tablet, storage_resource,
tablet_schema,
+ file_mapping, rowset_id_mapping));
+ }
+
+ if (!rowset_id_mapping.empty() && in.has_delete_bitmap()) {
Review Comment:
variant 不需要单独处理吗?
##########
be/src/cloud/cloud_snapshot_loader.cpp:
##########
@@ -0,0 +1,237 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_snapshot_loader.h"
+
+#include <gen_cpp/Types_types.h>
+
+#include <unordered_map>
+
+#include "cloud/cloud_snapshot_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "common/logging.h"
+#include "io/fs/broker_file_system.h"
+#include "io/fs/file_system.h"
+#include "io/fs/hdfs_file_system.h"
+#include "io/fs/path.h"
+#include "io/fs/remote_file_system.h"
+#include "io/fs/s3_file_system.h"
+#include "olap/olap_common.h"
+#include "olap/olap_define.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/tablet.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace { // unnamed namespace: the helper below is local to this translation unit
+bool _end_with(std::string_view str, std::string_view match) { // true iff `str` ends with `match`; an empty `match` always matches
+ return str.size() >= match.size() && // length guard: keeps the start offset below from underflowing
+ str.compare(str.size() - match.size(), match.size(), match) == 0; // compare the last match.size() chars of `str` against `match`
+}
+} // namespace
+
+CloudSnapshotLoader::CloudSnapshotLoader(CloudStorageEngine& engine, ExecEnv* // ctor: forwards all arguments except `engine` to BaseSnapshotLoader
env, int64_t job_id,
+ int64_t task_id, const
TNetworkAddress& broker_addr,
+ const std::map<std::string,
std::string>& broker_prop)
+ : BaseSnapshotLoader(env, job_id, task_id, broker_addr, broker_prop), // base class receives env/job/task/broker settings unchanged
_engine(engine) {}; // `_engine` is initialised from `engine`; the body is empty and the ';' after '}' is a redundant empty declaration
+
+Status CloudSnapshotLoader::init(TStorageBackendType::type type, const // Initialises the base loader, then resolves the storage resource for `vault_id`.
std::string& location,
+ std::string vault_id) { // Returns OK on success, InternalError when no storage resource is found for `vault_id`.
+ RETURN_IF_ERROR(BaseSnapshotLoader::init(type, location)); // base init runs first; presumably returns early on a non-OK status (macro defined elsewhere)
+ _storage_resource = _engine.get_storage_resource(vault_id); // look up the resource by vault id; result is tested for emptiness below
+ if (!_storage_resource) {
+ return Status::InternalError("vault id not found, vault id {}", // the "{}" placeholder is presumably filled with `vault_id` by Status' formatting
vault_id);
+ }
+ return Status::OK();
+}
+
+io::RemoteFileSystemSPtr CloudSnapshotLoader::storage_fs() { // Returns the `fs` member of the storage resource assigned in init().
+ return _storage_resource->fs; // NOTE(review): dereferenced without an emptiness check here — callers appear to need a successful init() first; confirm
+}
+
+Status CloudSnapshotLoader::upload(const std::map<std::string, std::string>& // Upload is not implemented by this loader.
src_to_dest_path,
+ std::map<int64_t,
std::vector<std::string>>* tablet_files) { // Neither argument is read; `tablet_files` is never written.
+ return Status::NotSupported("upload not supported"); // unconditional: every call returns NotSupported
+}
+
+Status CloudSnapshotLoader::download(const std::map<std::string, std::string>&
src_to_dest_path,
+ std::vector<int64_t>*
downloaded_tablet_ids) {
+ if (!_remote_fs || !_storage_resource) {
+ return Status::InternalError("Storage backend not initialized.");
+ }
+
+ LOG(INFO) << "begin to transfer snapshot files. num: " <<
src_to_dest_path.size()
+ << ", broker addr: " << _broker_addr << ", job: " << _job_id
+ << ", task id: " << _task_id;
+
+ // check if job has already been cancelled
+ int tmp_counter = 1;
+ RETURN_IF_ERROR(_report_every(0, &tmp_counter, 0, 0,
TTaskType::type::DOWNLOAD));
+
+ Status status = Status::OK();
+
+ // 1. for each src path, transfer files to target path
+ int report_counter = 0;
+ int total_num = src_to_dest_path.size();
+ int finished_num = 0;
+ for (const auto& iter : src_to_dest_path) {
+ const std::string& remote_path = iter.first;
+ const std::string& tablet_str = iter.second;
+ int64_t target_tablet_id = -1;
+ try {
+ target_tablet_id = std::stoll(tablet_str);
+ } catch (std::exception& e) {
+ return Status::InternalError("failed to parse target tablet id {},
{}", tablet_str,
+ e.what());
+ }
+ const std::string target_path =
_storage_resource->remote_tablet_path(target_tablet_id);
+
+ // 1.1. check target path not exists
+ bool target_path_exist = false;
+ if (!storage_fs()->exists(target_path, &target_path_exist).ok() ||
target_path_exist) {
+ std::stringstream ss;
+ ss << "failed to download snapshot files, target path already
exists: " << target_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ downloaded_tablet_ids->push_back(target_tablet_id);
+
+ int64_t remote_tablet_id;
+ RETURN_IF_ERROR(_get_tablet_id_from_remote_path(remote_path,
&remote_tablet_id));
+ VLOG_CRITICAL << "get target tablet id: " << target_tablet_id
+ << ", remote tablet id: " << remote_tablet_id;
+
+ // 1.2. get remote files
+ std::map<std::string, FileStat> remote_files;
+ RETURN_IF_ERROR(_list_with_checksum(remote_path, &remote_files));
+ if (remote_files.empty()) {
+ std::stringstream ss;
+ ss << "get nothing from remote path: " << remote_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ auto remote_hdr_file_path = [&remote_files, &remote_path](std::string&
full_hdr_path,
+ size_t*
hdr_file_len) {
+ for (auto iter = remote_files.begin(); iter !=
remote_files.end();) {
+ if (_end_with(iter->first, ".hdr")) {
+ *hdr_file_len = iter->second.size;
+ full_hdr_path = remote_path + "/" + iter->first + "." +
iter->second.md5;
+ // remove hdr file from remote_files
+ iter = remote_files.erase(iter);
+ return true;
+ } else {
+ ++iter;
+ }
+ }
+ return false;
+ };
+
+ size_t hdr_file_len;
+ std::string full_remote_hdr_path;
+ if (!remote_hdr_file_path(full_remote_hdr_path, &hdr_file_len)) {
+ std::stringstream ss;
+ ss << "failed to find hdr file from remote_path: " << remote_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ // 1.3. download hdr file
+ io::FileReaderOptions reader_options {
+ .cache_type = io::FileCachePolicy::NO_CACHE,
+ .is_doris_table = false,
+ .cache_base_path = "",
+ .file_size = static_cast<int64_t>(hdr_file_len),
+ };
+ LOG(INFO) << "download hdr file: " << full_remote_hdr_path;
+ io::FileReaderSPtr hdr_reader = nullptr;
+ RETURN_IF_ERROR(_remote_fs->open_file(full_remote_hdr_path,
&hdr_reader, &reader_options));
+ std::unique_ptr<char[]> read_buf =
std::make_unique_for_overwrite<char[]>(hdr_file_len);
+ size_t read_len = 0;
+ Slice hdr_slice(read_buf.get(), hdr_file_len);
+ RETURN_IF_ERROR(hdr_reader->read_at(0, hdr_slice, &read_len));
+ if (read_len != hdr_file_len) {
+ std::stringstream ss;
+ ss << "failed to read hdr file: " << full_remote_hdr_path;
+ LOG(WARNING) << ss.str();
+ return Status::InternalError(ss.str());
+ }
+
+ RETURN_IF_ERROR(
+ _report_every(0, &tmp_counter, finished_num, total_num,
TTaskType::type::DOWNLOAD));
+
+ // 1.4. make snapshot
+ std::unordered_map<std::string, std::string> file_mapping;
+ RETURN_IF_ERROR(_engine.cloud_snapshot_mgr().make_snapshot(
Review Comment:
make_snapshot 就等价于 downloading了吗?
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2167,6 +2230,175 @@ bool is_txn_aborted(std::shared_ptr<TxnKv> txn_kv,
const std::string& instance_i
return false;
}
+int InstanceRecycler::recycle_snapshots() {
Review Comment:
看完了这个函数实现 回过头来想,这个流程是不是可以不用单独做。
给 recycle tablet里 多加一个回收环节也行?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]