dataroaring commented on code in PR #53908:
URL: https://github.com/apache/doris/pull/53908#discussion_r2287137422


##########
be/src/cloud/cloud_meta_mgr.cpp:
##########
@@ -1004,6 +1105,204 @@ Status 
CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
     return Status::OK();
 }
 
+Status CloudMetaMgr::sync_tablet_delete_bitmap_v2(
+        CloudTablet* tablet, int64_t old_max_version, std::ranges::range 
auto&& rs_metas,
+        const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* 
delete_bitmap,
+        bool full_sync, SyncRowsetStats* sync_stats,
+        std::map<std::string, std::string> rowset_to_resource, bool all_sync) {
+    // TODO support sync_tablet_delete_bitmap_by_cache, now sync from ms to 
check the correctness
+
+    GetDeleteBitmapRequest req;
+    GetDeleteBitmapResponse res;
+    req.set_cloud_unique_id(config::cloud_unique_id);
+    req.set_tablet_id(tablet->tablet_id());
+    req.set_store_version(2);
+    if (all_sync) {
+        for (auto& [rowset_id, _] : rowset_to_resource) {
+            req.add_rowset_ids(rowset_id);
+        }
+    } else {
+        DCHECK_EQ(rowset_to_resource.size(), 0);
+        req.set_base_compaction_cnt(stats.base_compaction_cnt());
+        req.set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
+        req.set_cumulative_point(stats.cumulative_point());
+        // When there are many delete bitmaps that need to be synchronized, it
+        // may take a longer time, especially when loading the tablet for the
+        // first time, so set a relatively long timeout time.
+        *(req.mutable_idx()) = idx;
+        for (const auto& rs_meta : rs_metas) {
+            req.add_rowset_ids(rs_meta.rowset_id_v2());
+            rowset_to_resource[rs_meta.rowset_id_v2()] = rs_meta.resource_id();
+        }
+    }
+
+    if (sync_stats) {
+        sync_stats->get_remote_delete_bitmap_rowsets_num += 
req.rowset_ids_size();
+    }
+
+    VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
+
+    auto start = std::chrono::steady_clock::now();
+    auto st = retry_rpc("get delete bitmap", req, &res, 
&MetaService_Stub::get_delete_bitmap);
+    auto end = std::chrono::steady_clock::now();
+    if (st.code() == ErrorCode::THRIFT_RPC_ERROR) {
+        return st;
+    }
+
+    if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
+        return Status::NotFound("failed to get delete bitmap: {}", 
res.status().msg());
+    }
+    // The delete bitmap of stale rowsets will be removed when commit 
compaction job,
+    // then delete bitmap of stale rowsets cannot be obtained. But the rowsets 
obtained
+    // by sync_tablet_rowsets may include these stale rowsets. When this case 
happened, the
+    // error code of ROWSETS_EXPIRED will be returned, we need to retry sync 
rowsets again.
+    //
+    // Be query thread             meta-service          Be compaction thread
+    //      |                            |                         |
+    //      |        get rowset          |                         |
+    //      |--------------------------->|                         |
+    //      |    return get rowset       |                         |
+    //      |<---------------------------|                         |
+    //      |                            |        commit job       |
+    //      |                            |<------------------------|
+    //      |                            |    return commit job    |
+    //      |                            |------------------------>|
+    //      |      get delete bitmap     |                         |
+    //      |--------------------------->|                         |
+    //      |  return get delete bitmap  |                         |
+    //      |<---------------------------|                         |
+    //      |                            |                         |
+    if (res.status().code() == MetaServiceCode::ROWSETS_EXPIRED) {
+        return Status::Error<ErrorCode::ROWSETS_EXPIRED, false>("failed to get 
delete bitmap: {}",
+                                                                
res.status().msg());
+    }
+    if (res.status().code() != MetaServiceCode::OK) {
+        return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to get 
delete bitmap: {}",
+                                                               
res.status().msg());
+    }
+    const auto& rowset_ids = res.rowset_ids();
+    const auto& delete_bitmap_storages = res.delete_bitmap_storages();
+    if (rowset_ids.size() != delete_bitmap_storages.size()) {
+        return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
+                "get delete bitmap data wrong, 
rowset_ids.size={},delete_bitmap_storages.size={}",
+                rowset_ids.size(), delete_bitmap_storages.size());
+    }
+    if (sync_stats) {
+        sync_stats->get_remote_delete_bitmap_rpc_ns +=
+                std::chrono::duration_cast<std::chrono::nanoseconds>(end - 
start).count();
+        sync_stats->get_remote_delete_bitmap_key_count += 
delete_bitmap_storages.size();
+    }
+
+    RowsetIdUnorderedSet all_rs_ids;
+    if (all_sync) {
+        LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
+                  << ", old_max_version=" << old_max_version
+                  << ", all rowset num=" << rowset_to_resource.size()
+                  << ". rowset has delete bitmap num=" << rowset_ids.size();
+    } else {
+        if (old_max_version > 0) {
+            RETURN_IF_ERROR(tablet->get_all_rs_id(old_max_version, 
&all_rs_ids));
+        }
+        for (const auto& rs_meta : rs_metas) {
+            RowsetId rs_id;
+            rs_id.init(rs_meta.rowset_id_v2());
+            all_rs_ids.emplace(rs_id);
+        }
+        LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
+                  << ", old_max_version=" << old_max_version
+                  << ", new rowset num=" << rs_metas.size()
+                  << ", rowset has delete bitmap num=" << rowset_ids.size()
+                  << ". all rowset num=" << all_rs_ids.size();
+    }
+
+    std::mutex result_mtx;
+    Status result;
+    auto merge_delete_bitmap = [&](const std::string& rowset_id, 
DeleteBitmapPB& dbm) {
+        if (dbm.rowset_ids_size() != dbm.segment_ids_size() ||
+            dbm.rowset_ids_size() != dbm.versions_size() ||
+            dbm.rowset_ids_size() != dbm.segment_delete_bitmaps_size()) {
+            return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
+                    "get delete bitmap data wrong, rowset_id={}"
+                    
"rowset_ids.size={},segment_ids.size={},vers.size={},delete_bitmaps.size={}",
+                    rowset_id, dbm.rowset_ids_size(), dbm.segment_ids_size(), 
dbm.versions_size(),
+                    dbm.segment_delete_bitmaps_size());
+        }
+        LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
+                  << ", rowset_id=" << rowset_id
+                  << ", delete_bitmap num=" << 
dbm.segment_delete_bitmaps_size();
+        std::lock_guard lock(result_mtx);
+        for (int j = 0; j < dbm.rowset_ids_size(); j++) {
+            RowsetId rst_id;
+            rst_id.init(dbm.rowset_ids(j));
+            if (!all_sync && !all_rs_ids.contains(rst_id)) {
+                LOG(INFO) << "skip merge delete bitmap for tablet_id=" << 
tablet->tablet_id()
+                          << ", rowset_id=" << rowset_id << ", unused 
rowset_id=" << rst_id;
+                continue;
+            }
+            delete_bitmap->merge(
+                    {rst_id, dbm.segment_ids(j), dbm.versions(j)},
+                    
roaring::Roaring::readSafe(dbm.segment_delete_bitmaps(j).data(),
+                                               
dbm.segment_delete_bitmaps(j).length()));
+            if (sync_stats) {
+                sync_stats->get_remote_delete_bitmap_bytes +=
+                        dbm.segment_delete_bitmaps(j).length();
+            }
+        }
+        return Status::OK();
+    };
+    auto get_delete_bitmap_from_file = [&](const std::string& rowset_id) {
+        LOG(INFO) << "get delete bitmap for tablet_id=" << tablet->tablet_id()
+                  << ", rowset_id=" << rowset_id << " from file";
+        if (rowset_to_resource.find(rowset_id) == rowset_to_resource.end()) {
+            return Status::InternalError("vault id not found for tablet_id={}, 
rowset_id={}",
+                                         tablet->tablet_id(), rowset_id);
+        }
+        auto resource_id = rowset_to_resource[rowset_id];
+        CloudStorageEngine& engine = 
ExecEnv::GetInstance()->storage_engine().to_cloud();
+        auto storage_resource = engine.get_storage_resource(resource_id);
+        if (!storage_resource) {
+            return Status::InternalError("vault id not found, maybe not sync, 
vault id {}",
+                                         resource_id);
+        }
+        DeleteBitmapFileReader reader(tablet->tablet_id(), rowset_id, 
storage_resource);
+        RETURN_IF_ERROR(reader.init());
+        DeleteBitmapPB dbm;
+        RETURN_IF_ERROR(reader.read(dbm));
+        RETURN_IF_ERROR(reader.close());
+        return merge_delete_bitmap(rowset_id, dbm);
+    };
+    CloudStorageEngine& engine = 
ExecEnv::GetInstance()->storage_engine().to_cloud();
+    std::unique_ptr<ThreadPoolToken> token = 
engine.sync_delete_bitmap_thread_pool().new_token(
+            ThreadPool::ExecutionMode::CONCURRENT);
+    for (int i = 0; i < rowset_ids.size(); i++) {
+        auto& rowset_id = rowset_ids[i];
+        if (delete_bitmap_storages[i].store_in_fdb()) {
+            DeleteBitmapPB dbm = delete_bitmap_storages[i].delete_bitmap();
+            RETURN_IF_ERROR(merge_delete_bitmap(rowset_id, dbm));
+        } else {
+            auto submit_st = token->submit_func([&]() {
+                auto status = get_delete_bitmap_from_file(rowset_id);
+                if (!status.ok()) {
+                    LOG(WARNING) << "failed to get delete bitmap for 
tablet_id="
+                                 << tablet->tablet_id() << ", rowset_id=" << 
rowset_id
+                                 << " from file, st=" << status.to_string();
+                    std::lock_guard lock(result_mtx);
+                    if (result.ok()) {
+                        result = status;
+                    }
+                }
+            });
+            RETURN_IF_ERROR(submit_st);
+        }
+    }
+    // wait for all finished
+    token->wait();

Review Comment:
   We should not block the bthread's underlying pthread here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to