github-actions[bot] commented on code in PR #59754:
URL: https://github.com/apache/doris/pull/59754#discussion_r2925144809
##########
be/src/cloud/cloud_txn_delete_bitmap_cache.cpp:
##########
@@ -96,6 +97,46 @@ Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
return st;
}
+Result<std::pair<RowsetSharedPtr, DeleteBitmapPtr>>
+CloudTxnDeleteBitmapCache::get_rowset_and_delete_bitmap(TTransactionId transaction_id,
+ int64_t tablet_id) {
+ RowsetSharedPtr rowset;
+ {
+ std::shared_lock<std::shared_mutex> rlock(_rwlock);
+ TxnKey txn_key(transaction_id, tablet_id);
+ if (_empty_rowset_markers.contains(txn_key)) {
+ return std::make_pair(nullptr, nullptr);
+ }
+ auto iter = _txn_map.find(txn_key);
+ if (iter == _txn_map.end()) {
+ return ResultError(Status::InternalError<false>(""));
+ }
+ if (!(iter->second.publish_status &&
+ *(iter->second.publish_status) == PublishStatus::SUCCEED)) {
+ return ResultError(Status::InternalError<false>(""));
+ }
+ rowset = iter->second.rowset;
+ }
+
+ std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
+ CacheKey key(key_str);
+ Cache::Handle* handle = lookup(key);
+
+ DBUG_EXECUTE_IF("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss", {
+ handle = nullptr;
+ LOG(INFO) << "CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss, make cache missed "
+ "when get delete bitmap, txn_id:"
+ << transaction_id << ", tablet_id: " << tablet_id;
+ });
+ DeleteBitmapCacheValue* val =
+ handle == nullptr ? nullptr : reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
+ if (!val) {
+ return ResultError(Status::InternalError<false>(""));
+ }
+
+ return std::make_pair(rowset, val->delete_bitmap);
+}
Review Comment:
**Bug (High): `Cache::Handle` leak** - The `handle` returned by `lookup()`
is never released on the success path, so the reference it holds is never
dropped and the LRU cache cannot evict this entry. Every other caller of
`lookup()` in the codebase releases the handle, including the adjacent
`get_delete_bitmap()` method in this same file (which has an explicit comment:
"must call release handle to reduce the reference count, otherwise there will
be memory leak").
Suggested fix:
```cpp
auto result = std::make_pair(rowset, val->delete_bitmap);
release(handle);
return result;
```
Or use `Defer`, declared after the `!val` check, so the release also covers any
return path added later:
```cpp
Defer defer {[this, handle] { release(handle); }};
return std::make_pair(rowset, val->delete_bitmap);
```
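
For illustration only, the leak and the guard-based fix can be reproduced outside Doris with a small model. Everything below (`MockHandle`, `MockCache`, `ScopeGuard`, `get_leaky`, `get_guarded`) is a hypothetical stand-in, not the real `Cache` or `Defer` API; the sketch only assumes that `lookup()` takes a reference and `release()` drops it.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for a ref-counted cache entry (not the Doris Cache API).
struct MockHandle {
    int refs = 0;                // references currently held by callers
    std::shared_ptr<int> value;  // cached payload
};

struct MockCache {
    MockHandle entry;
    // lookup() pins the entry; every successful lookup must be paired with release().
    MockHandle* lookup() {
        ++entry.refs;
        return &entry;
    }
    void release(MockHandle* h) { --h->refs; }
};

// Minimal scope guard playing the role that Defer plays in the suggestion above.
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> fn) : _fn(std::move(fn)) {}
    ~ScopeGuard() { _fn(); }
    ScopeGuard(const ScopeGuard&) = delete;
    ScopeGuard& operator=(const ScopeGuard&) = delete;

private:
    std::function<void()> _fn;
};

// Leaky variant: returns the payload but never releases the handle.
std::shared_ptr<int> get_leaky(MockCache& cache) {
    MockHandle* h = cache.lookup();
    return h->value;
}

// Guarded variant: the handle is released on every return path.
std::shared_ptr<int> get_guarded(MockCache& cache) {
    MockHandle* h = cache.lookup();
    ScopeGuard guard([&cache, h] { cache.release(h); });
    if (!h->value) {
        return nullptr;
    }
    // The returned shared_ptr is copied before the guard's destructor runs,
    // so the payload stays alive after the handle is released.
    return h->value;
}
```

In this model, calling `get_leaky()` leaves `entry.refs` one higher per call, while `get_guarded()` leaves it unchanged.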
##########
be/src/cloud/cloud_delete_task.cpp:
##########
@@ -109,13 +109,15 @@ Status CloudDeleteTask::execute(CloudStorageEngine& engine, const TPushReq& requ
tablet->fetch_add_approximate_cumu_num_rowsets(1);
// TODO(liaoxin) delete operator don't send calculate delete bitmap task from fe,
- // then we don't need to set_txn_related_delete_bitmap here.
+ // then we don't need to set_txn_related_info here.
if (tablet->enable_unique_key_merge_on_write()) {
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
RowsetIdUnorderedSet rowset_ids;
engine.txn_delete_bitmap_cache().set_tablet_txn_info(
request.transaction_id, tablet->tablet_id(), delete_bitmap, rowset_ids, rowset,
request.timeout, nullptr);
+ } else {
+ engine.meta_mgr().cache_committed_rowset(rowset->rowset_meta(), context.txn_expiration);
}
Review Comment:
**Issue (Medium): Missing config guard** - This `cache_committed_rowset()`
call is not gated by `config::enable_cloud_make_rs_visible_on_be`, unlike the
equivalent path in `cloud_rowset_builder.cpp` (line 162). When the feature is
disabled (the default), this will still add entries to `CloudCommittedRSMgr`,
consuming memory unnecessarily.
Also, there is a pre-existing concern: if `commit_rowset()` on line ~106
above returns an error status, this code still executes unconditionally (the
`st` return value is not checked before reaching here). Consider adding an
early return on `!st.ok()` after `commit_rowset`.
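
As a rough sketch of the control flow being asked for (check the commit status first, then gate the caching on the feature flag), consider the model below. All names (`Status`, `Config`, `CommittedCache`, `finish_delete`) are hypothetical simplifications and do not match the real Doris signatures.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified stand-ins for the real Doris types.
struct Status {
    bool ok_flag = true;
    bool ok() const { return ok_flag; }
};

struct Config {
    // Models config::enable_cloud_make_rs_visible_on_be (disabled by default).
    bool enable_make_rs_visible_on_be = false;
};

struct CommittedCache {
    std::vector<std::string> entries;
    void cache_committed_rowset(const std::string& rowset_id) { entries.push_back(rowset_id); }
};

// Models the tail of the delete task after commit_rowset() has returned.
Status finish_delete(const Config& config, const Status& commit_status, bool merge_on_write,
                     CommittedCache& cache, const std::string& rowset_id) {
    // Early return: a failed commit must not leave any cached state behind.
    if (!commit_status.ok()) {
        return commit_status;
    }
    if (merge_on_write) {
        // The merge-on-write path registers txn info instead (omitted in this sketch).
    } else if (config.enable_make_rs_visible_on_be) {
        // Only cache when the feature is enabled, mirroring the rowset builder path.
        cache.cache_committed_rowset(rowset_id);
    }
    return Status{};
}
```

With the flag left at its default, no entry is cached; with the flag enabled, an entry is cached only when the commit succeeded and the tablet is not merge-on-write.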
##########
be/src/cloud/cloud_committed_rs_mgr.cpp:
##########
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_committed_rs_mgr.h"
+
+#include <chrono>
+
+#include "cloud/config.h"
+#include "common/logging.h"
+#include "storage/rowset/rowset_meta.h"
+#include "util/thread.h"
+
+namespace doris {
+
+CloudCommittedRSMgr::CloudCommittedRSMgr() : _stop_latch(1) {}
+
+CloudCommittedRSMgr::~CloudCommittedRSMgr() {
+ _stop_latch.count_down();
+ if (_clean_thread) {
+ _clean_thread->join();
+ }
+}
+
+Status CloudCommittedRSMgr::init() {
+ auto st = Thread::create(
+ "CloudCommittedRSMgr", "clean_committed_rs_thread",
+ [this]() { this->_clean_thread_callback(); }, &_clean_thread);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to create thread for CloudCommittedRSMgr, error: " << st;
+ }
+ return st;
+}
+
+void CloudCommittedRSMgr::add_committed_rowset(int64_t txn_id, int64_t tablet_id,
+ RowsetMetaSharedPtr rowset_meta,
+ int64_t expiration_time) {
+ int64_t txn_expiration_min =
+ duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
+ .count() +
+ config::tablet_txn_info_min_expired_seconds;
+ expiration_time = std::max(txn_expiration_min, expiration_time);
+ std::unique_lock<std::shared_mutex> wlock(_rwlock);
+ TxnTabletKey key(txn_id, tablet_id);
+ _committed_rs_map.insert_or_assign(key, CommittedRowsetValue(rowset_meta, expiration_time));
+ _expiration_map.emplace(expiration_time, key);
+ LOG(INFO) << "add pending rowset, txn_id=" << txn_id << ", tablet_id=" << tablet_id
+ << ", rowset_id=" << rowset_meta->rowset_id().to_string()
+ << ", expiration_time=" << expiration_time;
+}
+
+Result<std::pair<RowsetMetaSharedPtr, int64_t>> CloudCommittedRSMgr::get_committed_rowset(
+ int64_t txn_id, int64_t tablet_id) {
+ std::shared_lock<std::shared_mutex> rlock(_rwlock);
+ TxnTabletKey key(txn_id, tablet_id);
+ if (auto it = _empty_rowset_markers.find(key); it != _empty_rowset_markers.end()) {
+ return std::make_pair(nullptr, it->second);
+ }
+ auto iter = _committed_rs_map.find(key);
+ if (iter == _committed_rs_map.end()) {
+ return ResultError(Status::Error<ErrorCode::NOT_FOUND>(
+ "committed rowset not found, txn_id={}, tablet_id={}", txn_id, tablet_id));
+ }
+ return std::make_pair(iter->second.rowset_meta, iter->second.expiration_time);
+}
+
+void CloudCommittedRSMgr::remove_committed_rowset(int64_t txn_id, int64_t tablet_id) {
+ std::unique_lock<std::shared_mutex> wlock(_rwlock);
+ _committed_rs_map.erase({txn_id, tablet_id});
+}
+
+void CloudCommittedRSMgr::remove_expired_committed_rowsets() {
+ std::unique_lock<std::shared_mutex> wlock(_rwlock);
+ int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
+ std::chrono::system_clock::now().time_since_epoch())
+ .count();
+
+ while (!_expiration_map.empty()) {
+ auto iter = _expiration_map.begin();
+ if (!_committed_rs_map.contains(iter->second) &&
+ !_empty_rowset_markers.contains(iter->second)) {
+ _expiration_map.erase(iter);
+ continue;
+ }
+ int64_t expiration_time = iter->first;
+ if (expiration_time > current_time) {
+ break;
+ }
+
+ auto key = iter->second;
+ _committed_rs_map.erase(key);
+ _empty_rowset_markers.erase(key);
Review Comment:
**Bug (High): Premature deletion of newer entries** - When
`add_committed_rowset()` or `mark_empty_rowset()` is called twice for the same
`(txn_id, tablet_id)` with a later expiration time, `insert_or_assign` replaces
the value in `_committed_rs_map`, but the old entry in `_expiration_map` is NOT
removed. During cleanup, the stale expiration entry passes the
`_committed_rs_map.contains()` check (the key still exists, now with the newer
value), so this code path executes and erases the entry prematurely.
Example: add(key, exp=100), then add(key, exp=200). At time 150, cleanup
finds the stale entry {100->key}, `_committed_rs_map.contains(key)` is true, and
`100 < 150`, so it erases an entry that should stay valid until time 200.
Fix: Compare the expiration time from `_expiration_map` against the
authoritative expiration stored in `_committed_rs_map` /
`_empty_rowset_markers`. If they differ, the `_expiration_map` entry is stale
and should just be erased without touching the primary maps:
```cpp
auto it_rs = _committed_rs_map.find(key);
if (it_rs != _committed_rs_map.end() && it_rs->second.expiration_time != expiration_time) {
    // Stale index entry: the primary map holds a different (newer) expiration.
    _expiration_map.erase(iter);
    continue;
}
auto it_empty = _empty_rowset_markers.find(key);
if (it_empty != _empty_rowset_markers.end() && it_empty->second != expiration_time) {
    // Same check for the empty-rowset markers.
    _expiration_map.erase(iter);
    continue;
}
```
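
The scenario above can be checked in isolation with a reduced model. This is a sketch under assumptions, not the PR's code: `ExpiringIndex` is a hypothetical class, the key is collapsed to a single integer, the secondary index is assumed to be a `std::multimap`, and the expiry check runs before the staleness check, which differs slightly from the loop order in the diff.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <unordered_map>

// Hypothetical reduced model of a primary map plus an expiration-ordered index.
class ExpiringIndex {
public:
    // Re-adding a key overwrites the authoritative expiration but leaves the
    // old index entry behind, as insert_or_assign + emplace does in the diff.
    void add(int64_t key, int64_t expiration) {
        _primary[key] = expiration;
        _by_expiration.emplace(expiration, key);
    }

    bool contains(int64_t key) const { return _primary.count(key) > 0; }

    void remove_expired(int64_t now) {
        while (!_by_expiration.empty()) {
            auto it = _by_expiration.begin();
            if (it->first > now) {
                break;  // the earliest index entry has not expired yet
            }
            auto primary_it = _primary.find(it->second);
            // Erase the primary entry only when this index entry matches the
            // authoritative expiration; otherwise the index entry is stale.
            if (primary_it != _primary.end() && primary_it->second == it->first) {
                _primary.erase(primary_it);
            }
            _by_expiration.erase(it);
        }
    }

private:
    std::unordered_map<int64_t, int64_t> _primary;   // key -> authoritative expiration
    std::multimap<int64_t, int64_t> _by_expiration;  // expiration -> key
};
```

Replaying the example (add with expiration 100, add again with 200, clean up at time 150) keeps the key in this model, and a later cleanup at time 250 removes it.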
##########
be/src/cloud/cloud_committed_rs_mgr.h:
##########
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
Review Comment:
**Issue (Medium): Missing compile_check headers** - This new header file
does not include `compile_check_begin.h` / `compile_check_end.h`. Per project
convention (all other files in `be/src/cloud/` include them), new header files
should include paired compile_check headers.
Add inside the namespace:
```cpp
namespace doris {
#include "common/compile_check_begin.h"
// ... class definition ...
#include "common/compile_check_end.h"
} // namespace doris
```
Same applies to the corresponding `.cpp` file `cloud_committed_rs_mgr.cpp`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]