gavinchou commented on code in PR #58459:
URL: https://github.com/apache/doris/pull/58459#discussion_r2633746912
##########
cloud/src/recycler/recycler.cpp:
##########
@@ -1693,6 +1614,223 @@ int64_t calculate_restore_job_expired_time(
return final_expiration;
}
+// Aborts the load transaction `txn_id` that produced a rowset being recycled.
+// Reads the txn index (to obtain the db_id) and the txn info; only a txn still in
+// TXN_STATUS_PREPARED is aborted, any other status is left untouched.
+// Returns 0 when the txn was aborted or needs no abort, and -1 when a KV read, parse,
+// the abort itself, or the KV commit fails.
+int InstanceRecycler::abort_txn_for_related_rowset(int64_t txn_id) {
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to create txn").tag("txn_id", txn_id).tag("err", err);
+        return -1;
+    }
+
+    // The txn index supplies the db_id needed to build the txn info key.
+    TxnIndexPB txn_idx_pb;
+    std::string index_val;
+    err = txn->get(txn_index_key({instance_id_, txn_id}), &index_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to get txn index").tag("txn_id", txn_id).tag("err", err);
+        return -1;
+    }
+    if (!txn_idx_pb.ParseFromString(index_val)) {
+        LOG_WARNING("failed to parse txn index").tag("txn_id", txn_id);
+        return -1;
+    }
+
+    TxnInfoPB txn_info;
+    std::string info_val;
+    err = txn->get(txn_info_key({instance_id_, txn_idx_pb.tablet_index().db_id(), txn_id}),
+                   &info_val);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG_WARNING("failed to get txn info").tag("txn_id", txn_id).tag("err", err);
+        return -1;
+    }
+    if (!txn_info.ParseFromString(info_val)) {
+        LOG_WARNING("failed to parse txn info").tag("txn_id", txn_id);
+        return -1;
+    }
+
+    // Only txns still in PREPARED status are aborted.
+    if (txn_info.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
+        LOG_INFO("txn is not prepared status, txn_id={} status={}", txn_id, txn_info.status());
+        return 0;
+    }
+
+    AbortTxnRequest req;
+    req.set_txn_id(txn_id);
+    MetaServiceCode code = MetaServiceCode::OK;
+    std::string msg;
+    std::stringstream ss;
+
+    LOG(INFO) << "begin abort txn for related rowset, txn_id=" << txn_id
+              << " instance_id=" << instance_id_ << " txn_info=" << txn_info.ShortDebugString();
+
+    _abort_txn(instance_id_, &req, txn.get(), txn_info, ss, code, msg);
+    // NOTE(review): assumes `_abort_txn` reports failure through `code`/`msg`. On failure the
+    // pending mutations in `txn` are not committed and -1 is returned.
+    if (code != MetaServiceCode::OK) {
+        LOG(WARNING) << "failed to abort txn for related rowset, txn_id=" << txn_id
+                     << " instance_id=" << instance_id_ << " code=" << code << " msg=" << msg;
+        return -1;
+    }
+
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::COMMIT>(err);
+        ss << "failed to commit kv txn, txn_id=" << txn_info.txn_id() << " err=" << err;
+        msg = ss.str();
+        // Without this log the commit failure would be silent: `msg` is not returned.
+        LOG(WARNING) << msg << " instance_id=" << instance_id_ << " code=" << code;
+        return -1;
+    }
+
+    LOG(INFO) << "finish abort txn for related rowset, txn_id=" << txn_id
+              << " instance_id=" << instance_id_ << " txn_info=" << txn_info.ShortDebugString()
+              << " code=" << code << " msg=" << msg;
+
+    return 0;
+}
+
+// Aborts the tablet job (a compaction or the schema change) whose id equals
+// `rowset_meta.job_id()`, if such a job is still recorded under the tablet's job key.
+// Returns 0 when the job was aborted or no matching job is recorded, the non-zero result of
+// `get_tablet_idx` when the tablet index lookup fails, and -1 on any other failure.
+int InstanceRecycler::abort_job_for_related_rowset(const RowsetMetaCloudPB& rowset_meta) {
+    TabletIndexPB tablet_idx;
+    int ret = get_tablet_idx(txn_kv_.get(), instance_id_, rowset_meta.tablet_id(), tablet_idx);
+    if (ret != 0) {
+        LOG(WARNING) << "failed to get tablet index, tablet_id=" << rowset_meta.tablet_id()
+                     << " instance_id=" << instance_id_ << " ret=" << ret;
+        return ret;
+    }
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv_->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to create txn, instance_id=" << instance_id_ << " err=" << err;
+        return -1;
+    }
+
+    std::string job_key =
+            job_tablet_key({instance_id_, tablet_idx.table_id(), tablet_idx.index_id(),
+                            tablet_idx.partition_id(), tablet_idx.tablet_id()});
+    std::string job_val;
+    err = txn->get(job_key, &job_val);
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        LOG(INFO) << "job not exists, instance_id=" << instance_id_
+                  << " tablet_id=" << tablet_idx.tablet_id();
+        return 0;
+    }
+    if (err != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to get job, instance_id=" << instance_id_
+                     << " tablet_id=" << tablet_idx.tablet_id() << " err=" << err;
+        return -1;
+    }
+
+    TabletJobInfoPB job_pb;
+    if (!job_pb.ParseFromString(job_val)) {
+        LOG(WARNING) << "failed to parse job, instance_id=" << instance_id_
+                     << " tablet_id=" << tablet_idx.tablet_id();
+        return -1;
+    }
+
+    // Locate the sub-job that produced this rowset. The recorded job has both a compaction
+    // list and a schema change field, so the schema change is still checked when no
+    // compaction matches (previously it was skipped whenever any compaction existed).
+    const std::string& rs_job_id = rowset_meta.job_id();
+    int matched_compaction = -1;
+    bool matched_schema_change = false;
+    if (!rs_job_id.empty()) {
+        for (int i = 0; i < job_pb.compaction_size(); ++i) {
+            if (job_pb.compaction(i).id() == rs_job_id) {
+                matched_compaction = i;
+                break;
+            }
+        }
+        matched_schema_change = matched_compaction < 0 && job_pb.has_schema_change() &&
+                                job_pb.schema_change().id() == rs_job_id;
+    }
+
+    if (matched_compaction < 0 && !matched_schema_change) {
+        LOG(INFO) << "there is no job for related rowset, directly recycle rowset data, "
+                     "instance_id="
+                  << instance_id_ << " tablet_id=" << tablet_idx.tablet_id();
+        return 0;
+    }
+
+    LOG(INFO) << "begin to abort job for related rowset, job_id=" << rs_job_id
+              << " instance_id=" << instance_id_ << " tablet_id=" << tablet_idx.tablet_id();
+
+    FinishTabletJobRequest req;
+    FinishTabletJobResponse res;
+    req.set_action(FinishTabletJobRequest::ABORT);
+    // Keep every other field of the recorded job, but send only the matched sub-job.
+    // NOTE(review): forwarding the whole recorded job could abort a different compaction, or a
+    // compaction instead of the schema change, if `_finish_tablet_job` acts on the first
+    // compaction entry of the request. Confirm against its implementation.
+    TabletJobInfoPB* req_job = req.mutable_job();
+    req_job->CopyFrom(job_pb);
+    req_job->clear_compaction();
+    req_job->clear_schema_change();
+    if (matched_compaction >= 0) {
+        req_job->add_compaction()->CopyFrom(job_pb.compaction(matched_compaction));
+    } else {
+        req_job->mutable_schema_change()->CopyFrom(job_pb.schema_change());
+    }
+
+    MetaServiceCode code = MetaServiceCode::OK;
+    std::string msg;
+    std::stringstream ss;
+    _finish_tablet_job(&req, &res, instance_id_, txn, txn_kv_.get(),
+                       delete_bitmap_lock_white_list_.get(), resource_mgr_.get(), code, msg, ss);
+    if (code != MetaServiceCode::OK) {
+        LOG(WARNING) << "failed to abort job, instance_id=" << instance_id_
+                     << " tablet_id=" << tablet_idx.tablet_id() << " code=" << code
+                     << " msg=" << msg;
+        return -1;
+    }
+
+    LOG(INFO) << "finish abort job for related rowset, job_id=" << rs_job_id
+              << " instance_id=" << instance_id_ << " tablet_id=" << tablet_idx.tablet_id()
+              << " code=" << code << " msg=" << msg;
+    return 0;
+}
+
+// Before a rowset is recycled, aborts whatever produced it.
+// - Load rowsets (those with a `load_id`) go to abort_txn_for_related_rowset.
+// - Compaction / schema change rowsets (those with a `job_id`) go to
+//   abort_job_for_related_rowset.
+// `T` is either RecycleRowsetPB (a recycle-rowset key value) or RowsetMetaCloudPB.
+// Returns 0 when nothing needs aborting, otherwise the result of the selected abort helper.
+template <typename T>
+int InstanceRecycler::abort_txn_or_job_for_recycle(T& rowset_meta_pb) {
+    RowsetMetaCloudPB* rs_meta = nullptr;
+    bool is_prepared_state = true;
+
+    if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
+        // For keys that are not in the RecycleRowsetPB::PREPARE state we do not need to check
+        // the job or txn state, because tmp_rowset_key already exists when this key is
+        // generated.
+        is_prepared_state = rowset_meta_pb.type() == RecycleRowsetPB::PREPARE;
+        rs_meta = rowset_meta_pb.mutable_rowset_meta();
+    } else {
+        rs_meta = &rowset_meta_pb;
+    }
+
+    DCHECK(rs_meta != nullptr);
+
+    // Apply the rule above to load rowsets too. Otherwise a non-PREPARE key for a load rowset
+    // would still look up its (possibly already recycled) txn and could fail with -1.
+    if (!is_prepared_state) {
+        return 0;
+    }
+
+    if (rs_meta->has_load_id()) {
+        // load
+        return abort_txn_for_related_rowset(rs_meta->txn_id());
+    }
+    if (rs_meta->has_job_id()) {
+        // compaction / schema change
+        return abort_job_for_related_rowset(*rs_meta);
+    }
+    return 0;
+}
+
+template <typename T>
+int mark_rowset_as_recycled(TxnKv* txn_kv, const std::string& instance_id,
std::string_view key,
+ T& rowset_meta_pb) {
+ RowsetMetaCloudPB* rs_meta;
+
+ if constexpr (std::is_same_v<T, RecycleRowsetPB>) {
+ rs_meta = rowset_meta_pb.mutable_rowset_meta();
+ } else {
+ rs_meta = &rowset_meta_pb;
+ }
+
+ bool need_write_back = false;
+ if ((!rs_meta->has_is_recycled() || !rs_meta->is_recycled())) {
+ rs_meta->set_is_recycled(true);
+ need_write_back = true;
+ }
+
+ if (need_write_back) {
+ std::unique_ptr<Transaction> txn;
+ TxnErrorCode err = txn_kv->create_txn(&txn);
Review Comment:
Should not create a new transaction just for this put: a write-only txn cannot
guarantee that the key has not been modified by another procedure in the meantime.
Either use the same txn we scanned with, or read the rowset again and then put it
within that new txn.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]