xy720 commented on code in PR #47300:
URL: https://github.com/apache/doris/pull/47300#discussion_r2227249312
##########
cloud/src/meta-service/meta_service.cpp:
##########
@@ -963,6 +967,596 @@ static void set_schema_in_existed_rowset(MetaServiceCode&
code, std::string& msg
}
}
+void scan_snapshot_rowset(
+ Transaction* txn, const std::string& instance_id, int64_t tablet_id,
MetaServiceCode& code,
+ std::string& msg,
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>*
snapshot_rs_metas) {
+ std::stringstream ss;
+ SnapshotRowsetKeyInfo rs_key_info0 {instance_id, tablet_id, 0};
+ SnapshotRowsetKeyInfo rs_key_info1 {instance_id, tablet_id + 1, 0};
+ std::string snapshot_rs_key0;
+ std::string snapshot_rs_key1;
+ snapshot_rowset_key(rs_key_info0, &snapshot_rs_key0);
+ snapshot_rowset_key(rs_key_info1, &snapshot_rs_key1);
+
+ int num_rowsets = 0;
+ std::unique_ptr<int, std::function<void(int*)>> defer_log_range(
+ (int*)0x01, [snapshot_rs_key0, snapshot_rs_key1,
&num_rowsets](int*) {
+ LOG(INFO) << "get snapshot rs meta, num_rowsets=" <<
num_rowsets << " range=["
+ << hex(snapshot_rs_key0) << "," <<
hex(snapshot_rs_key1) << "]";
+ });
+
+ std::unique_ptr<RangeGetIterator> it;
+ do {
+ TxnErrorCode err = txn->get(snapshot_rs_key0, snapshot_rs_key1, &it,
true);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ ss << "failed to get snapshot rs meta while committing,"
+ << " tablet_id=" << tablet_id << " err=" << err;
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ while (it->has_next()) {
+ auto [k, v] = it->next();
+ LOG(INFO) << "range_get snapshot_rs_key=" << hex(k) << "
tablet_id=" << tablet_id;
+ snapshot_rs_metas->emplace_back();
+ if (!snapshot_rs_metas->back().second.ParseFromArray(v.data(),
v.size())) {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ ss << "malformed snapshot rowset meta, tablet_id=" << tablet_id
+ << " key=" << hex(k);
+ msg = ss.str();
+ LOG(WARNING) << msg;
+ return;
+ }
+ snapshot_rs_metas->back().first = std::string(k.data(), k.size());
+ ++num_rowsets;
+ if (!it->has_next()) snapshot_rs_key0 = k;
+ }
+ snapshot_rs_key0.push_back('\x00'); // Update to next smallest key for
iteration
+ } while (it->more());
+ return;
+}
+
+void MetaServiceImpl::make_snapshot(::google::protobuf::RpcController*
controller,
+ const SnapshotRequest* request,
SnapshotResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(make_snapshot);
+ if (!request->has_tablet_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty tablet_id";
+ return;
+ }
+
+ bool is_restore = request->has_is_restore() && request->is_restore();
+ if (is_restore) {
+ if (!request->has_tablet_meta() ||
!request->tablet_meta().rs_metas_size()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = !request->has_tablet_meta() ? "no tablet meta" : "no rowset
meta";
+ return;
+ }
+ if (!request->tablet_meta().has_schema() &&
!request->tablet_meta().has_schema_version()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "tablet_meta must have either schema or schema_version";
+ return;
+ }
+ }
+
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+
+ RPC_RATE_LIMIT(make_snapshot)
+
+ std::unique_ptr<Transaction> txn0;
+ TxnErrorCode err = txn_kv_->create_txn(&txn0);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+
+ // validate request
+ TabletIndexPB tablet_idx;
+ get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(),
tablet_idx);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+
+ auto key = snapshot_tablet_key({instance_id, tablet_idx.tablet_id()});
+ std::string val;
+ err = txn0->get(key, &val);
+ if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND)
{
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to check snapshot {} existence, err={}",
tablet_idx.tablet_id(),
+ err);
+ return;
+ }
+ if (err == TxnErrorCode::TXN_OK) {
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromString(val)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "malformed snapshot";
+ LOG_WARNING(msg);
+ return;
+ }
+ if (snapshot_pb.state() != SnapshotPB::DROPPED) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("snapshot {} already exists, state: {}",
tablet_idx.tablet_id(),
+ SnapshotPB::State_Name(snapshot_pb.state()));
+ return;
+ }
+ }
+
+ TabletMetaCloudPB tablet_meta;
+ std::vector<doris::RowsetMetaCloudPB> rs_metas;
+ if (is_restore) {
+ tablet_meta = request->tablet_meta();
+ rs_metas.assign(tablet_meta.rs_metas().begin(),
tablet_meta.rs_metas().end());
+ } else {
+ // backup not implemented
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "not implemented";
+ return;
+ }
+ tablet_meta.clear_rs_metas(); // strip off rs meta
+
+ // 1. save tablet snapshot
+ std::string to_save_val;
+ {
+ SnapshotPB pb;
+ pb.set_tablet_id(tablet_idx.tablet_id());
+ pb.mutable_tablet_meta()->Swap(&tablet_meta);
+ pb.set_creation_time(::time(nullptr));
+ pb.set_expiration(request->expiration());
+ pb.set_is_restore(is_restore);
+ pb.set_state(SnapshotPB::PREPARED);
+ pb.SerializeToString(&to_save_val);
+ }
+ LOG_INFO("put tablet snapshot")
+ .tag("snapshot_tablet_key", hex(key))
+ .tag("tablet_id", tablet_idx.tablet_id())
+ .tag("is_restore", is_restore)
+ .tag("state", SnapshotPB::PREPARED);
+ txn0->put(key, to_save_val);
+ err = txn0->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ msg = fmt::format("failed to commit txn: {}", err);
+ return;
+ }
+
+ // 2. save rs snapshots
+ for (auto& rowset_meta : rs_metas) {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+ // put snapshot rowset kv
+ std::string snapshot_rs_key;
+ std::string snapshot_rs_val;
+ SnapshotRowsetKeyInfo rs_key_info {instance_id, tablet_idx.tablet_id(),
+ rowset_meta.end_version()};
+ snapshot_rowset_key(rs_key_info, &snapshot_rs_key);
+ if (!rowset_meta.SerializeToString(&snapshot_rs_val)) {
+ code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
+ msg = "failed to serialize rowset meta";
+ return;
+ }
+ txn->put(snapshot_rs_key, snapshot_rs_val);
+ LOG_INFO("put rowset snapshot")
+ .tag("snapshot_rowset_key", hex(snapshot_rs_key))
+ .tag("tablet_id", tablet_idx.tablet_id())
+ .tag("rowset_id", rowset_meta.rowset_id_v2())
+ .tag("rowset_size", snapshot_rs_key.size() +
snapshot_rs_val.size())
+ .tag("rowset_meta", rowset_meta.DebugString());
+
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "failed to make snapshot,"
+ << " tablet_id=" << tablet_idx.tablet_id()
+ << " rowset_id=" << rowset_meta.rowset_id_v2() << " err=" <<
err;
+ msg = ss.str();
+ return;
+ }
+ }
+}
+
+void MetaServiceImpl::commit_snapshot(::google::protobuf::RpcController*
controller,
+ const SnapshotRequest* request,
SnapshotResponse* response,
+ ::google::protobuf::Closure* done) {
+ RPC_PREPROCESS(commit_snapshot);
+ if (!request->has_tablet_id()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty tablet_id";
+ return;
+ }
+
+ bool is_restore = request->has_is_restore() && request->is_restore();
+ if (!is_restore) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "commit snapshot only support for restore";
+ return;
+ }
+
+ instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
+ if (instance_id.empty()) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "empty instance_id";
+ LOG(INFO) << msg << ", cloud_unique_id=" << request->cloud_unique_id();
+ return;
+ }
+
+ RPC_RATE_LIMIT(commit_snapshot)
+
+ std::unique_ptr<Transaction> txn0;
+ TxnErrorCode err = txn_kv_->create_txn(&txn0);
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::CREATE>(err);
+ msg = "failed to init txn";
+ return;
+ }
+
+ TabletIndexPB tablet_idx;
+ get_tablet_idx(code, msg, txn0.get(), instance_id, request->tablet_id(),
tablet_idx);
+ if (code != MetaServiceCode::OK) {
+ return;
+ }
+
+ // 1. get tablet snapshot
+ auto key = snapshot_tablet_key({instance_id, request->tablet_id()});
+ std::string val;
+ err = txn0->get(key, &val);
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "snapshot not exists or has been recycled";
+ return;
+ }
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::READ>(err);
+ msg = fmt::format("failed to check snapshot existence, err={}", err);
+ LOG_WARNING(msg);
+ return;
+ }
+
+ SnapshotPB snapshot_pb;
+ if (!snapshot_pb.ParseFromString(val)) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = "malformed snapshot";
+ LOG_WARNING(msg);
+ return;
+ }
+
+ if (snapshot_pb.state() != SnapshotPB::PREPARED) {
+ code = MetaServiceCode::INVALID_ARGUMENT;
+ msg = fmt::format("snapshot {} with invalid state: {}",
tablet_idx.tablet_id(),
+ SnapshotPB::State_Name(snapshot_pb.state()));
+ return;
+ }
+
+ // 2. get rs snapshots
+ std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>
snapshot_rs_metas;
+ scan_snapshot_rowset(txn0.get(), instance_id, request->tablet_id(), code,
msg,
Review Comment:
已加入todo (English: this has been added as a TODO.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]