github-actions[bot] commented on code in PR #59754:
URL: https://github.com/apache/doris/pull/59754#discussion_r2925144809


##########
be/src/cloud/cloud_txn_delete_bitmap_cache.cpp:
##########
@@ -96,6 +97,46 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
     return st;
 }
 
+Result<std::pair<RowsetSharedPtr, DeleteBitmapPtr>>
+CloudTxnDeleteBitmapCache::get_rowset_and_delete_bitmap(TTransactionId transaction_id,
+                                                        int64_t tablet_id) {
+    RowsetSharedPtr rowset;
+    {
+        std::shared_lock<std::shared_mutex> rlock(_rwlock);
+        TxnKey txn_key(transaction_id, tablet_id);
+        if (_empty_rowset_markers.contains(txn_key)) {
+            return std::make_pair(nullptr, nullptr);
+        }
+        auto iter = _txn_map.find(txn_key);
+        if (iter == _txn_map.end()) {
+            return ResultError(Status::InternalError<false>(""));
+        }
+        if (!(iter->second.publish_status &&
+              *(iter->second.publish_status) == PublishStatus::SUCCEED)) {
+            return ResultError(Status::InternalError<false>(""));
+        }
+        rowset = iter->second.rowset;
+    }
+
+    std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
+    CacheKey key(key_str);
+    Cache::Handle* handle = lookup(key);
+
+    DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
+        handle = nullptr;
+        LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
+                     "when get delete bitmap, txn_id:"
+                  << transaction_id << ", tablet_id: " << tablet_id;
+    });
+    DeleteBitmapCacheValue* val =
+            handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
+    if (!val) {
+        return ResultError(Status::InternalError<false>(""));
+    }
+
+    return std::make_pair(rowset, val->delete_bitmap);
+}

Review Comment:
   **Bug (High): Cache::Handle leak** - The `handle` obtained from `lookup()` 
is never released on the success path. This will prevent LRU eviction of this 
cache entry. Every other caller of `lookup()` in the codebase releases the 
handle, including the adjacent `get_delete_bitmap()` method in this same file 
(which has an explicit comment: "must call release handle to reduce the 
reference count, otherwise there will be memory leak").
   
   Suggested fix:
   ```cpp
   auto result = std::make_pair(rowset, val->delete_bitmap);
   release(handle);
   return result;
   ```
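   Or use `Defer`, declared after the `!val` check so that it only runs for a non-null handle and also covers any return path added later: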
   ```cpp
   Defer defer {[this, handle] { release(handle); }};
   return std::make_pair(rowset, val->delete_bitmap);
   ```
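
   To make the guard approach concrete, here is a minimal, self-contained sketch. `FakeCache`, `FakeHandle`, and `ScopeGuard` are hypothetical stand-ins, not the Doris `Cache` or `Defer` types; the only assumption is that `lookup()` pins an entry until a matching `release()`:

   ```cpp
   #include <cassert>
   #include <functional>
   #include <memory>
   #include <utility>

   // Hypothetical stand-in for a ref-counted cache handle (not Cache::Handle).
   struct FakeHandle {
       int refs = 0;
   };

   // Hypothetical cache: lookup() pins the single entry, release() unpins it.
   struct FakeCache {
       FakeHandle entry;
       std::shared_ptr<int> payload = std::make_shared<int>(42);

       FakeHandle* lookup() {
           ++entry.refs;
           return &entry;
       }
       void release(FakeHandle* h) { --h->refs; }
   };

   // Minimal scope guard, similar in spirit to a Defer helper.
   class ScopeGuard {
   public:
       explicit ScopeGuard(std::function<void()> fn) : _fn(std::move(fn)) {}
       ~ScopeGuard() { _fn(); }
       ScopeGuard(const ScopeGuard&) = delete;
       ScopeGuard& operator=(const ScopeGuard&) = delete;

   private:
       std::function<void()> _fn;
   };

   // Leaky variant: the handle is never released, so the entry stays pinned.
   std::shared_ptr<int> get_leaky(FakeCache& cache) {
       FakeHandle* h = cache.lookup();
       (void)h;
       return cache.payload;
   }

   // Guarded variant: the guard releases the handle on every return path.
   std::shared_ptr<int> get_guarded(FakeCache& cache) {
       FakeHandle* h = cache.lookup();
       if (h == nullptr) {
           return nullptr;
       }
       ScopeGuard guard([&cache, h] { cache.release(h); });
       return cache.payload;
   }
   ```

   The return value is initialized before local destructors run, so the guard releases the handle only after the `shared_ptr` has been copied out.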



##########
be/src/cloud/cloud_delete_task.cpp:
##########
@@ -109,13 +109,15 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
     tablet->fetch_add_approximate_cumu_num_rowsets(1);
 
     // TODO(liaoxin) delete operator don't send calculate delete bitmap task from fe,
-    //  then we don't need to set_txn_related_delete_bitmap here.
+    //  then we don't need to set_txn_related_info here.
     if (tablet->enable_unique_key_merge_on_write()) {
         DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
         RowsetIdUnorderedSet rowset_ids;
         engine.txn_delete_bitmap_cache().set_tablet_txn_info(
                 request.transaction_id, tablet->tablet_id(), delete_bitmap, rowset_ids, rowset,
                 request.timeout, nullptr);
+    } else {
+        engine.meta_mgr().cache_committed_rowset(rowset->rowset_meta(), context.txn_expiration);
     }
     }

Review Comment:
   **Issue (Medium): Missing config guard** - This `cache_committed_rowset()` 
call is not gated by `config::enable_cloud_make_rs_visible_on_be`, unlike the 
equivalent path in `cloud_rowset_builder.cpp` (line 162). When the feature is 
disabled (the default), this will still add entries to `CloudCommittedRSMgr`, 
consuming memory unnecessarily.
   
   Also, there is a pre-existing concern: if `commit_rowset()` on line ~106 
above returns an error status, this code still executes unconditionally (the 
`st` return value is not checked before reaching here). Consider adding an 
early return on `!st.ok()` after `commit_rowset`.
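
   The intended control flow for both points can be sketched in isolation. Everything below is a hypothetical stand-in (`FakeStatus`, `FakeCommittedCache`, `finish_delete`), with `feature_enabled` playing the role of `config::enable_cloud_make_rs_visible_on_be`; the merge-on-write branch is left out for brevity:

   ```cpp
   #include <cassert>
   #include <string>
   #include <vector>

   // Hypothetical stand-in for a status object; not the Doris Status type.
   struct FakeStatus {
       bool success = true;
       bool ok() const { return success; }
   };

   // Hypothetical stand-in for the committed-rowset cache.
   struct FakeCommittedCache {
       std::vector<std::string> rowsets;
       void cache_committed_rowset(const std::string& rowset_id) {
           rowsets.push_back(rowset_id);
       }
   };

   // Returns early when the commit failed, and caches the rowset only when
   // the table is not merge-on-write and the feature flag is enabled.
   FakeStatus finish_delete(const FakeStatus& commit_status, bool merge_on_write,
                            bool feature_enabled, FakeCommittedCache& cache,
                            const std::string& rowset_id) {
       if (!commit_status.ok()) {
           return commit_status;  // nothing is cached for a failed commit
       }
       if (!merge_on_write && feature_enabled) {
           cache.cache_committed_rowset(rowset_id);
       }
       return commit_status;
   }
   ```

   With this shape, a failed commit or a disabled feature leaves the cache untouched.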



##########
be/src/cloud/cloud_committed_rs_mgr.cpp:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_committed_rs_mgr.h"
+
+#include <chrono>
+
+#include "cloud/config.h"
+#include "common/logging.h"
+#include "storage/rowset/rowset_meta.h"
+#include "util/thread.h"
+
+namespace doris {
+
+CloudCommittedRSMgr::CloudCommittedRSMgr() : _stop_latch(1) {}
+
+CloudCommittedRSMgr::~CloudCommittedRSMgr() {
+    _stop_latch.count_down();
+    if (_clean_thread) {
+        _clean_thread->join();
+    }
+}
+
+Status CloudCommittedRSMgr::init() {
+    auto st = Thread::create(
+            "CloudCommittedRSMgr", "clean_committed_rs_thread",
+            [this]() { this->_clean_thread_callback(); }, &_clean_thread);
+    if (!st.ok()) {
+        LOG(WARNING) << "failed to create thread for CloudCommittedRSMgr, error: " << st;
+    }
+    return st;
+}
+
+void CloudCommittedRSMgr::add_committed_rowset(int64_t txn_id, int64_t tablet_id,
+                                               RowsetMetaSharedPtr rowset_meta,
+                                               int64_t expiration_time) {
+    int64_t txn_expiration_min =
+            duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
+                    .count() +
+            config::tablet_txn_info_min_expired_seconds;
+    expiration_time = std::max(txn_expiration_min, expiration_time);
+    std::unique_lock<std::shared_mutex> wlock(_rwlock);
+    TxnTabletKey key(txn_id, tablet_id);
+    _committed_rs_map.insert_or_assign(key, CommittedRowsetValue(rowset_meta, expiration_time));
+    _expiration_map.emplace(expiration_time, key);
+    LOG(INFO) << "add pending rowset, txn_id=" << txn_id << ", tablet_id=" << tablet_id
+              << ", rowset_id=" << rowset_meta->rowset_id().to_string()
+              << ", expiration_time=" << expiration_time;
+}
+
+Result<std::pair<RowsetMetaSharedPtr, int64_t>> CloudCommittedRSMgr::get_committed_rowset(
+        int64_t txn_id, int64_t tablet_id) {
+    std::shared_lock<std::shared_mutex> rlock(_rwlock);
+    TxnTabletKey key(txn_id, tablet_id);
+    if (auto it = _empty_rowset_markers.find(key); it != _empty_rowset_markers.end()) {
+        return std::make_pair(nullptr, it->second);
+    }
+    auto iter = _committed_rs_map.find(key);
+    if (iter == _committed_rs_map.end()) {
+        return ResultError(Status::Error<ErrorCode::NOT_FOUND>(
+                "committed rowset not found, txn_id={}, tablet_id={}", txn_id, tablet_id));
+    }
+    return std::make_pair(iter->second.rowset_meta, iter->second.expiration_time);
+}
+
+void CloudCommittedRSMgr::remove_committed_rowset(int64_t txn_id, int64_t tablet_id) {
+    std::unique_lock<std::shared_mutex> wlock(_rwlock);
+    _committed_rs_map.erase({txn_id, tablet_id});
+}
+
+void CloudCommittedRSMgr::remove_expired_committed_rowsets() {
+    std::unique_lock<std::shared_mutex> wlock(_rwlock);
+    int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
+                                   std::chrono::system_clock::now().time_since_epoch())
+                                   .count();
+
+    while (!_expiration_map.empty()) {
+        auto iter = _expiration_map.begin();
+        if (!_committed_rs_map.contains(iter->second) &&
+            !_empty_rowset_markers.contains(iter->second)) {
+            _expiration_map.erase(iter);
+            continue;
+        }
+        int64_t expiration_time = iter->first;
+        if (expiration_time > current_time) {
+            break;
+        }
+
+        auto key = iter->second;
+        _committed_rs_map.erase(key);
+        _empty_rowset_markers.erase(key);

Review Comment:
   **Bug (High): Premature deletion of newer entries** - When
   `add_committed_rowset()` or `mark_empty_rowset()` is called twice for the same
   `(txn_id, tablet_id)` with a later expiration time, `insert_or_assign` replaces
   the value in `_committed_rs_map`, but the old entry in `_expiration_map` is NOT
   removed. During cleanup, the key of that stale expiration entry still passes
   the `_committed_rs_map.contains()` check (the key exists, now with the newer
   value), so the code falls through to the erase calls here and removes the
   entry prematurely.
   
   Example: add(key, exp=100), then add(key, exp=200). At time 150, cleanup
   finds the stale entry {100->key}; `_committed_rs_map.contains(key)` is true and
   `100 <= 150`, so it erases an entry that should remain valid until time 200.
   
   Fix: Compare the expiration time from `_expiration_map` against the 
authoritative expiration stored in `_committed_rs_map` / 
`_empty_rowset_markers`. If they differ, the `_expiration_map` entry is stale 
and should just be erased without touching the primary maps:
   ```cpp
   auto it_rs = _committed_rs_map.find(key);
   if (it_rs != _committed_rs_map.end() && it_rs->second.expiration_time != expiration_time) {
       _expiration_map.erase(iter);
       continue;
   }
   auto it_empty = _empty_rowset_markers.find(key);
   if (it_empty != _empty_rowset_markers.end() && it_empty->second != expiration_time) {
       _expiration_map.erase(iter);
       continue;
   }
   ```
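
   The stale-entry scenario and the comparison-based fix can be reproduced with a small standalone model. `ExpiringMap` is a hypothetical simplification: a single primary map stands in for `_committed_rs_map` / `_empty_rowset_markers`, an integer stands in for the `(txn_id, tablet_id)` key, and the expiration index is assumed to be a `std::multimap`:

   ```cpp
   #include <cassert>
   #include <cstdint>
   #include <map>

   using Key = int64_t;  // stands in for (txn_id, tablet_id)

   // Hypothetical model of a primary map plus an expiration index.
   struct ExpiringMap {
       std::map<Key, int64_t> primary;           // key -> authoritative expiration
       std::multimap<int64_t, Key> expirations;  // expiration -> key, may be stale

       void add(Key key, int64_t expiration) {
           primary.insert_or_assign(key, expiration);
           expirations.emplace(expiration, key);  // the older index entry stays behind
       }

       void remove_expired(int64_t now) {
           while (!expirations.empty()) {
               auto iter = expirations.begin();
               if (iter->first > now) {
                   break;  // index is ordered, so nothing later has expired
               }
               auto it = primary.find(iter->second);
               // Erase from the primary map only when the index entry matches the
               // authoritative expiration; otherwise the index entry is stale.
               if (it != primary.end() && it->second == iter->first) {
                   primary.erase(it);
               }
               expirations.erase(iter);
           }
       }
   };
   ```

   With `add(1, 100)` followed by `add(1, 200)`, a cleanup at time 150 drops only the stale index entry, and the key survives until a cleanup at time 200 or later.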



##########
be/src/cloud/cloud_committed_rs_mgr.h:
##########
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+

Review Comment:
   **Issue (Medium): Missing compile_check headers** - This new header file 
does not include `compile_check_begin.h` / `compile_check_end.h`. Per project 
convention (all other files in `be/src/cloud/` include them), new header files 
should include paired compile_check headers.
   
   Add inside the namespace:
   ```cpp
   namespace doris {
   #include "common/compile_check_begin.h"
   // ... class definition ...
   #include "common/compile_check_end.h"
   } // namespace doris
   ```
   
   The same applies to the corresponding `.cpp` file, `cloud_committed_rs_mgr.cpp`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

