dataroaring commented on code in PR #53824:
URL: https://github.com/apache/doris/pull/53824#discussion_r2262064561


##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -267,6 +282,248 @@ TabletSchemaSPtr CloudTablet::merged_tablet_schema() 
const {
     return _merged_tablet_schema;
 }
 
+bool CloudTablet::split_rowsets_by_version_overlap(
+        const std::vector<RowsetSharedPtr>& input_rowsets,
+        std::vector<RowsetSharedPtr>* new_rowsets,
+        std::vector<RowsetSharedPtr>* overlapping_rowsets) {
+    auto max_version = max_version_unlocked();
+    for (auto rs : input_rowsets) {
+        if (rs->version().first > max_version) {
+            new_rowsets->push_back(rs);
+        } else if (rs->version().second <= max_version) {
+            overlapping_rowsets->push_back(rs);
+        } else {
+            new_rowsets->clear();
+            overlapping_rowsets->clear();
+            return false;
+        }
+    }
+    return true;
+}
+
+WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
+    std::shared_lock rlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == 
_rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    return _rowset_warm_up_states[rowset_id].first;
+}
+
+bool CloudTablet::add_rowset_warmup_state(RowsetMetaSharedPtr rowset, 
WarmUpState state) {
+    std::lock_guard wlock(_meta_lock);
+    return add_rowset_warmup_state_unlocked(rowset, state);
+}
+
+bool CloudTablet::add_rowset_warmup_state_unlocked(RowsetMetaSharedPtr rowset, 
WarmUpState state) {
+    if (_rowset_warm_up_states.find(rowset->rowset_id()) != 
_rowset_warm_up_states.end()) {
+        return false;
+    }
+    if (state == WarmUpState::TRIGGERED_BY_JOB) {
+        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
+    } else if (state == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
+    }
+    _rowset_warm_up_states[rowset->rowset_id()] = std::make_pair(state, 
rowset->num_segments());
+    return true;
+}
+
+WarmUpState CloudTablet::complete_rowset_segment_warmup(RowsetId rowset_id, 
Status status) {
+    std::lock_guard wlock(_meta_lock);
+    if (_rowset_warm_up_states.find(rowset_id) == 
_rowset_warm_up_states.end()) {
+        return WarmUpState::NONE;
+    }
+    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << 
", " << status;
+    if (status.ok()) {
+        g_file_cache_warm_up_segment_complete_num << 1;
+        _rowset_warm_up_states[rowset_id].second--;
+        if (_rowset_warm_up_states[rowset_id].second == 0) {
+            g_file_cache_warm_up_rowset_complete_num << 1;
+            _rowset_warm_up_states[rowset_id].first = WarmUpState::DONE;
+        }
+        return _rowset_warm_up_states[rowset_id].first;
+    }
+    // !status.ok()
+    g_file_cache_warm_up_segment_complete_num << 1;
+    g_file_cache_warm_up_rowset_complete_num << 1;
+    _rowset_warm_up_states.erase(rowset_id);
+    return WarmUpState::NONE;
+}
+
+void CloudTablet::warm_up_rowset_unlocked(RowsetSharedPtr rowset, bool 
version_overlap,
+                                          bool delay_add_rowset) {
+    if (_rowset_warm_up_states.find(rowset->rowset_id()) != 
_rowset_warm_up_states.end()) {
+        return;
+    }
+    if (delay_add_rowset) {
+        g_file_cache_query_driven_warmup_delayed_rowset_num << 1;
+        LOG(INFO) << "triggered a warm up for overlapping rowset " << 
rowset->version()
+                  << ", will add it to tablet meta latter";
+    }
+    // warmup rowset data in background
+    bool download_task_submitted = false;
+    for (int seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
+        const auto& rowset_meta = rowset->rowset_meta();
+        constexpr int64_t interval = 600; // 10 mins
+        // When BE restart and receive the `load_sync` rpc, it will sync all 
historical rowsets first time.
+        // So we need to filter out the old rowsets avoid to download the 
whole table.
+        if (!version_overlap &&
+            ::time(nullptr) - rowset_meta->newest_write_timestamp() >= 
interval) {
+            continue;
+        }
+
+        auto storage_resource = rowset_meta->remote_storage_resource();
+        if (!storage_resource) {
+            LOG(WARNING) << storage_resource.error();
+            continue;
+        }
+
+        int64_t expiration_time =
+                _tablet_meta->ttl_seconds() == 0 || 
rowset_meta->newest_write_timestamp() <= 0
+                        ? 0
+                        : rowset_meta->newest_write_timestamp() + 
_tablet_meta->ttl_seconds();
+        g_file_cache_cloud_tablet_submitted_segment_num << 1;
+        if (rowset->rowset_meta()->segment_file_size(seg_id) > 0) {
+            g_file_cache_cloud_tablet_submitted_segment_size
+                    << rowset->rowset_meta()->segment_file_size(seg_id);
+        }
+        auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
+        // clang-format off
+        
_engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta 
{
+                .path = 
storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+                .file_size = rowset->rowset_meta()->segment_file_size(seg_id),
+                .file_system = storage_resource.value()->fs,
+                .ctx =
+                        {
+                                .expiration_time = expiration_time,
+                                .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                        },
+                .download_done {[self, rowset, delay_add_rowset](Status st) {
+                    self->warm_up_done_cb(rowset, st, delay_add_rowset);
+                    if (!st) {
+                        LOG_WARNING("add rowset warm up error ").error(st);
+                    }
+                }},
+        });
+        download_task_submitted = true;
+
+        auto download_idx_file = [&](const io::Path& idx_path, int64_t 
idx_size) {
+            io::DownloadFileMeta meta {
+                    .path = idx_path,
+                    .file_size = idx_size,
+                    .file_system = storage_resource.value()->fs,
+                    .ctx =
+                            {
+                                    .expiration_time = expiration_time,
+                                    .is_dryrun = 
config::enable_reader_dryrun_when_download_file_cache,
+                            },
+                     // TODO: consider index file for warm up state management
+                    .download_done {[](Status st) {
+                        if (!st) {
+                            LOG_WARNING("add rowset warm up error ").error(st);
+                        }
+                    }},
+            };
+            
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
+            g_file_cache_cloud_tablet_submitted_index_num << 1;
+            g_file_cache_cloud_tablet_submitted_index_size << idx_size;
+        };
+        // clang-format on
+        auto schema_ptr = rowset_meta->tablet_schema();
+        auto idx_version = schema_ptr->get_inverted_index_storage_format();
+        if (idx_version == InvertedIndexStorageFormatPB::V1) {
+            std::unordered_map<int64_t, int64_t> index_size_map;
+            auto&& inverted_index_info = 
rowset_meta->inverted_index_file_info(seg_id);
+            for (const auto& info : inverted_index_info.index_info()) {
+                if (info.index_file_size() != -1) {
+                    index_size_map[info.index_id()] = info.index_file_size();
+                } else {
+                    VLOG_DEBUG << "Invalid index_file_size for segment_id " << 
seg_id
+                               << ", index_id " << info.index_id();
+                }
+            }
+            for (const auto& index : schema_ptr->inverted_indexes()) {
+                auto idx_path = storage_resource.value()->remote_idx_v1_path(
+                        *rowset_meta, seg_id, index->index_id(), 
index->get_index_suffix());
+                download_idx_file(idx_path, index_size_map[index->index_id()]);
+            }
+        } else {
+            if (schema_ptr->has_inverted_index()) {
+                auto&& inverted_index_info = 
rowset_meta->inverted_index_file_info(seg_id);
+                int64_t idx_size = 0;
+                if (inverted_index_info.has_index_size()) {
+                    idx_size = inverted_index_info.index_size();
+                } else {
+                    VLOG_DEBUG << "index_size is not set for segment " << 
seg_id;
+                }
+                auto idx_path = 
storage_resource.value()->remote_idx_v2_path(*rowset_meta, seg_id);
+                download_idx_file(idx_path, idx_size);
+            }
+        }
+    }
+    if (download_task_submitted) {
+        VLOG_DEBUG << "warm up rowset " << rowset->version() << " triggerd by 
sync rowset";
+        add_rowset_warmup_state_unlocked(rowset->rowset_meta(),
+                                         
WarmUpState::TRIGGERED_BY_SYNC_ROWSET);
+    }
+}
+
+bool CloudTablet::is_warm_up_conflict_with_compaction() {
+    std::shared_lock rdlock(_meta_lock);
+    for (auto& [rowset_id, state] : _rowset_warm_up_states) {
+        if (state.first != WarmUpState::DONE) {
+            return true;
+        }
+    }
+    return false;
+}
+
+void CloudTablet::warm_up_done_cb(RowsetSharedPtr rowset, Status status, bool 
delay_add_rowset) {
+    if (delay_add_rowset) {
+        DBUG_EXECUTE_IF("CloudTablet.warm_up_done_cb.inject_sleep_s", {
+            auto sleep_time = dp->param("sleep", 3);
+            LOG_INFO("CloudTablet.warm_up_done_cb.inject_sleep {} s", 
sleep_time)
+                    .tag("tablet_id", tablet_id());
+            std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
+        });
+    }
+    VLOG_DEBUG << "warm up rowset " << rowset->version() << " done";
+    auto res = complete_rowset_segment_warmup(rowset->rowset_id(), status);
+    if (res != WarmUpState::DONE && res != WarmUpState::NONE) {
+        if (res == WarmUpState::TRIGGERED_BY_JOB) {
+            LOG(WARNING) << "should not happen, rowset: " << rowset->version()
+                         << " warm up is triggered by warm up job but use "
+                            "CloudTablet::warm_up_done_cb as done callback";
+        }
+        // none success or failure
+        return;
+    }
+    if (config::enable_query_driven_warmup && delay_add_rowset) {
+        g_file_cache_query_driven_warmup_delayed_rowset_add_num << 1;
+        LOG(INFO) << "warm up completed, rowset: " << rowset->rowset_id()
+                  << ", version: " << rowset->version();
+        std::unique_lock<std::shared_mutex> meta_lock(_meta_lock);
+        for (auto [ver, rs] : _rs_version_map) {
+            // e.g. ver = [5-10]
+            // if rowset->version() is [5-8], it should not be added

Review Comment:
   The function add_rowsets relies on two assumptions:
   1. Newly added rowsets come after the max version.
   2. Some existing rowsets are replaced, e.g. by compaction or schema change (sc).
   
   It does not handle rowsets that are not aligned with the existing ones, e.g. if
   the existing rowsets are [5,10][10,20], then [5,15] should not be added.
   
   I am not sure if this case can happen.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to