gavinchou commented on code in PR #58459:
URL: https://github.com/apache/doris/pull/58459#discussion_r2633714911
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -1693,6 +1614,223 @@ int64_t calculate_restore_job_expired_time(
return final_expiration;
}
+int InstanceRecycler::abort_txn_for_related_rowset(int64_t txn_id) {
+ AbortTxnRequest req;
+ TxnInfoPB txn_info;
+ MetaServiceCode code = MetaServiceCode::OK;
+ std::string msg;
+ std::stringstream ss;
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to create txn").tag("err", err);
+ return -1;
+ }
+
+ // get txn index
+ TxnIndexPB txn_idx_pb;
+ auto index_key = txn_index_key({instance_id_, txn_id});
+ std::string index_val;
+ err = txn->get(index_key, &index_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to get txn index").tag("err", err);
+ return -1;
+ }
+ if (!txn_idx_pb.ParseFromString(index_val)) {
+ LOG_WARNING("failed to parse txn index").tag("err", err);
+ return -1;
+ }
+
+ auto info_key = txn_info_key({instance_id_,
txn_idx_pb.tablet_index().db_id(), txn_id});
+ std::string info_val;
+ err = txn->get(info_key, &info_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG_WARNING("failed to get txn info").tag("err", err);
+ return -1;
+ }
+ if (!txn_info.ParseFromString(info_val)) {
+ LOG_WARNING("failed to parse txn info").tag("err", err);
+ return -1;
+ }
+
+ if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+ LOG_INFO("txn is not prepared status, txn_id={} status={}", txn_id,
txn_info.status());
+ return 0;
+ }
+
+ req.set_txn_id(txn_id);
+
+ LOG(INFO) << "begin abort txn for related rowset, txn_id=" << txn_id
+ << " instance_id=" << instance_id_ << " txn_info=" <<
txn_info.ShortDebugString();
+
+ _abort_txn(instance_id_, &req, txn.get(), txn_info, ss, code, msg);
+ err = txn->commit();
+ if (err != TxnErrorCode::TXN_OK) {
+ code = cast_as<ErrCategory::COMMIT>(err);
+ ss << "failed to commit kv txn, txn_id=" << txn_info.txn_id() << "
err=" << err;
+ msg = ss.str();
+ return -1;
+ }
+
+ LOG(INFO) << "finish abort txn for related rowset, txn_id=" << txn_id
+ << " instance_id=" << instance_id_ << " txn_info=" <<
txn_info.ShortDebugString()
+ << " code=" << code << " msg=" << msg;
+
+ return 0;
+}
+
+int InstanceRecycler::abort_job_for_related_rowset(const RowsetMetaCloudPB&
rowset_meta) {
+ FinishTabletJobRequest req;
+ FinishTabletJobResponse res;
+ req.set_action(FinishTabletJobRequest::ABORT);
+ MetaServiceCode code = MetaServiceCode::OK;
+ std::string msg;
+ std::stringstream ss;
+
+ TabletIndexPB tablet_idx;
+ int ret = get_tablet_idx(txn_kv_.get(), instance_id_,
rowset_meta.tablet_id(), tablet_idx);
+ if (ret != 0) {
+ LOG(WARNING) << "failed to get tablet index, tablet_id=" <<
rowset_meta.tablet_id()
+ << " instance_id=" << instance_id_ << " ret=" << ret;
+ return ret;
+ }
+
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv_->create_txn(&txn);
+ if (err != TxnErrorCode::TXN_OK) {
+ LOG(WARNING) << "failed to create txn, instance_id=" << instance_id_
<< " err=" << err;
+ return -1;
+ }
+
+ std::string job_key =
+ job_tablet_key({instance_id_, tablet_idx.table_id(),
tablet_idx.index_id(),
+ tablet_idx.partition_id(),
tablet_idx.tablet_id()});
+ std::string job_val;
+ err = txn->get(job_key, &job_val);
+ if (err != TxnErrorCode::TXN_OK) {
+ if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+ LOG(INFO) << "job not exists, instance_id=" << instance_id_
+ << " tablet_id=" << tablet_idx.tablet_id();
+ return 0;
+ }
+ LOG(WARNING) << "failed to get job, instance_id=" << instance_id_
+ << " tablet_id=" << tablet_idx.tablet_id() << " err=" <<
err;
+ return -1;
+ }
+
+ TabletJobInfoPB job_pb;
+ if (!job_pb.ParseFromString(job_val)) {
+ LOG(WARNING) << "failed to parse job, instance_id=" << instance_id_
+ << " tablet_id=" << tablet_idx.tablet_id();
+ return -1;
+ }
+
+ std::string job_id {};
+ if (!job_pb.compaction().empty()) {
+ for (const auto& c : job_pb.compaction()) {
+ if (c.id() == rowset_meta.job_id()) {
+ job_id = c.id();
+ break;
+ }
+ }
+ } else if (job_pb.has_schema_change()) {
+ job_id = job_pb.schema_change().id();
+ }
+
+ if (!job_id.empty() && rowset_meta.job_id() == job_id) {
+ LOG(INFO) << "begin to abort job for related rowset, job_id=" <<
rowset_meta.job_id()
+ << " instance_id=" << instance_id_ << " tablet_id=" <<
tablet_idx.tablet_id();
+ req.mutable_job()->CopyFrom(job_pb);
+ req.set_action(FinishTabletJobRequest::ABORT);
+ _finish_tablet_job(&req, &res, instance_id_, txn, txn_kv_.get(),
+ delete_bitmap_lock_white_list_.get(),
resource_mgr_.get(), code, msg,
+ ss);
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << "failed to abort job, instance_id=" << instance_id_
+ << " tablet_id=" << tablet_idx.tablet_id() << "
code=" << code
+ << " msg=" << msg;
+ return -1;
+ }
+ LOG(INFO) << "finish abort job for related rowset, job_id=" <<
rowset_meta.job_id()
+ << " instance_id=" << instance_id_ << " tablet_id=" <<
tablet_idx.tablet_id()
+ << " code=" << code << " msg=" << msg;
+ } else {
+ LOG(INFO) << "there is no job for related rowset, directly recycle
rowset data, "
Review Comment:
future todo: LOG job_id and rowset id
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]