dataroaring commented on code in PR #57770:
URL: https://github.com/apache/doris/pull/57770#discussion_r2512061073


##########
cloud/src/recycler/recycler.cpp:
##########
@@ -2062,6 +2444,178 @@ int InstanceRecycler::delete_rowset_data(const 
RowsetMetaCloudPB& rs_meta_pb) {
     return accessor->delete_files(file_paths);
 }
 
+int InstanceRecycler::process_merged_file_segment_index(
+        const doris::RowsetMetaCloudPB& rs_meta_pb,
+        std::vector<std::string>* merged_files_to_delete) {
+    if (merged_files_to_delete == nullptr) {
+        return -1;
+    }
+    const auto& index_map = rs_meta_pb.merge_file_segment_index();
+    if (index_map.empty()) {
+        return 0;
+    }
+    struct MergeSmallFileInfo {
+        std::string small_file_path;
+    };
+    std::unordered_map<std::string, std::vector<MergeSmallFileInfo>> 
merged_file_updates;
+    merged_file_updates.reserve(index_map.size());
+    for (const auto& [small_path, index_pb] : index_map) {
+        if (!index_pb.has_merge_file_path() || 
index_pb.merge_file_path().empty()) {
+            continue;
+        }
+        
merged_file_updates[index_pb.merge_file_path()].push_back(MergeSmallFileInfo 
{small_path});
+    }
+    if (merged_file_updates.empty()) {
+        return 0;
+    }
+
+    int ret = 0;
+    for (auto& [merged_file_path, small_files] : merged_file_updates) {
+        if (small_files.empty()) {
+            continue;
+        }
+
+        bool success = false;
+        do {
+            std::unique_ptr<Transaction> txn;
+            TxnErrorCode err = txn_kv_->create_txn(&txn);
+            if (err != TxnErrorCode::TXN_OK) {
+                LOG_WARNING("failed to create txn when updating merged file 
ref count")
+                        .tag("instance_id", instance_id_)
+                        .tag("merged_file_path", merged_file_path)
+                        .tag("rowset_id", rs_meta_pb.rowset_id_v2())
+                        .tag("tablet_id", rs_meta_pb.tablet_id())
+                        .tag("err", err);
+                ret = -1;
+                break;
+            }
+
+            std::string merged_key = merge_file_key({instance_id_, 
merged_file_path});
+            std::string merged_val;
+            err = txn->get(merged_key, &merged_val);
+            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                LOG_WARNING("merged file info not found when recycling rowset")
+                        .tag("instance_id", instance_id_)
+                        .tag("merged_file_path", merged_file_path)
+                        .tag("rowset_id", rs_meta_pb.rowset_id_v2())
+                        .tag("tablet_id", rs_meta_pb.tablet_id());
+                ret = -1;
+                break;
+            }
+            if (err != TxnErrorCode::TXN_OK) {
+                LOG_WARNING("failed to get merged file info when recycling 
rowset")
+                        .tag("instance_id", instance_id_)
+                        .tag("merged_file_path", merged_file_path)
+                        .tag("rowset_id", rs_meta_pb.rowset_id_v2())
+                        .tag("tablet_id", rs_meta_pb.tablet_id())
+                        .tag("err", err);
+                ret = -1;
+                break;
+            }
+
+            cloud::MergedFileInfoPB merge_info;
+            if (!merge_info.ParseFromString(merged_val)) {
+                LOG_WARNING("failed to parse merged file info when recycling 
rowset")
+                        .tag("instance_id", instance_id_)
+                        .tag("merged_file_path", merged_file_path)
+                        .tag("rowset_id", rs_meta_pb.rowset_id_v2())
+                        .tag("tablet_id", rs_meta_pb.tablet_id());
+                ret = -1;
+                break;
+            }
+
+            auto* small_file_entries = merge_info.mutable_small_files();
+            int64_t changed_files = 0;
+            for (const auto& small_file_info : small_files) {
+                bool found = false;
+                for (auto& small_file_entry : *small_file_entries) {
+                    if (small_file_entry.path() == 
small_file_info.small_file_path) {
+                        if (!small_file_entry.deleted()) {
+                            small_file_entry.set_deleted(true);
+                            ++changed_files;
+                        }
+                        found = true;
+                        break;
+                    }
+                }
+                if (!found) {
+                    LOG_WARNING("merged file info missing small file entry")
+                            .tag("instance_id", instance_id_)
+                            .tag("merged_file_path", merged_file_path)
+                            .tag("small_file_path", 
small_file_info.small_file_path)
+                            .tag("rowset_id", rs_meta_pb.rowset_id_v2())
+                            .tag("tablet_id", rs_meta_pb.tablet_id());
+                }
+            }
+
+            if (changed_files == 0) {
+                success = true;
+                break;
+            }
+
+            int64_t left_file_num = 0;
+            int64_t left_file_bytes = 0;
+            for (const auto& small_file_entry : merge_info.small_files()) {
+                if (!small_file_entry.deleted()) {
+                    ++left_file_num;
+                    left_file_bytes += small_file_entry.size();
+                }
+            }
+            merge_info.set_left_file_num(left_file_num);
+            merge_info.set_left_file_bytes(left_file_bytes);
+            merge_info.set_ref_cnt(left_file_num);
+            if (left_file_num == 0) {
+                merge_info.set_state(cloud::MergedFileInfoPB::RECYCLING);
+            }
+
+            std::string updated_val;
+            if (!merge_info.SerializeToString(&updated_val)) {
+                LOG_WARNING("failed to serialize merged file info when 
recycling rowset")
+                        .tag("instance_id", instance_id_)
+                        .tag("merged_file_path", merged_file_path)
+                        .tag("rowset_id", rs_meta_pb.rowset_id_v2())
+                        .tag("tablet_id", rs_meta_pb.tablet_id());
+                ret = -1;
+                break;
+            }
+
+            txn->put(merged_key, updated_val);
+            err = txn->commit();
+            if (err == TxnErrorCode::TXN_OK) {
+                success = true;
+                if (left_file_num == 0) {
+                    merged_files_to_delete->push_back(merged_file_path);

Review Comment:
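   Please add an INFO-level log here when a merged file is queued for deletion. Include the instance_id, merged_file_path, rowset_id and tablet_id tags, so that the decision to delete the whole merged file can be traced.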



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to