This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 720aaf9dd66 fix compile
720aaf9dd66 is described below
commit 720aaf9dd66216f5aa1db67b20178f0b7ed39fe0
Author: yiguolei <[email protected]>
AuthorDate: Fri Mar 15 18:13:41 2024 +0800
fix compile
---
be/src/cloud/cloud_tablet_mgr.cpp | 353 -------------------------
be/src/cloud/cloud_txn_delete_bitmap_cache.cpp | 183 -------------
be/src/cloud/cloud_txn_delete_bitmap_cache.h | 98 -------
3 files changed, 634 deletions(-)
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp
b/be/src/cloud/cloud_tablet_mgr.cpp
deleted file mode 100644
index 7fe86fedde3..00000000000
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ /dev/null
@@ -1,353 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "cloud/cloud_tablet_mgr.h"
-
-#include <bthread/countdown_event.h>
-
-#include "cloud/cloud_meta_mgr.h"
-#include "cloud/cloud_storage_engine.h"
-#include "cloud/cloud_tablet.h"
-#include "cloud/config.h"
-#include "common/status.h"
-#include "olap/lru_cache.h"
-#include "runtime/memory/cache_policy.h"
-
-namespace doris {
-namespace {
-
-// port from
-//
https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go
-template <typename Key, typename Val>
-class SingleFlight {
-public:
- SingleFlight() = default;
-
- SingleFlight(const SingleFlight&) = delete;
- void operator=(const SingleFlight&) = delete;
-
- using Loader = std::function<Val(const Key&)>;
-
- // Do executes and returns the results of the given function, making
- // sure that only one execution is in-flight for a given key at a
- // time. If a duplicate comes in, the duplicate caller waits for the
- // original to complete and receives the same results.
- Val load(const Key& key, Loader loader) {
- std::unique_lock lock(_call_map_mtx);
-
- auto it = _call_map.find(key);
- if (it != _call_map.end()) {
- auto call = it->second;
- lock.unlock();
- if (int ec = call->event.wait(); ec != 0) {
- throw std::system_error(std::error_code(ec,
std::system_category()),
- "CountdownEvent wait failed");
- }
- return call->val;
- }
- auto call = std::make_shared<Call>();
- _call_map.emplace(key, call);
- lock.unlock();
-
- call->val = loader(key);
- call->event.signal();
-
- lock.lock();
- _call_map.erase(key);
- lock.unlock();
-
- return call->val;
- }
-
-private:
- // `Call` is an in-flight or completed `load` call
- struct Call {
- bthread::CountdownEvent event;
- Val val;
- };
-
- std::mutex _call_map_mtx;
- std::unordered_map<Key, std::shared_ptr<Call>> _call_map;
-};
-
-SingleFlight<int64_t /* tablet_id */, std::shared_ptr<CloudTablet>>
s_singleflight_load_tablet;
-
-} // namespace
-
-// tablet_id -> cached tablet
-// This map owns all cached tablets. The lifetime of tablet can be longer than
the LRU handle.
-// It's also used for scenarios where users want to access the tablet by
`tablet_id` without changing the LRU order.
-// TODO(plat1ko): multi shard to increase concurrency
-class CloudTabletMgr::TabletMap {
-public:
- void put(std::shared_ptr<CloudTablet> tablet) {
- std::lock_guard lock(_mtx);
- _map[tablet->tablet_id()] = std::move(tablet);
- }
-
- void erase(CloudTablet* tablet) {
- std::lock_guard lock(_mtx);
- auto it = _map.find(tablet->tablet_id());
- // According to the implementation of `LRUCache`, `deleter` may be
called after a tablet
- // with same tablet id insert into cache and `TabletMap`. So we MUST
check if the tablet
- // instance to be erased is the same one in the map.
- if (it != _map.end() && it->second.get() == tablet) {
- _map.erase(it);
- }
- }
-
- std::shared_ptr<CloudTablet> get(int64_t tablet_id) {
- std::lock_guard lock(_mtx);
- if (auto it = _map.find(tablet_id); it != _map.end()) {
- return it->second;
- }
- return nullptr;
- }
-
- size_t size() { return _map.size(); }
-
- void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)>
visitor) {
- std::lock_guard lock(_mtx);
- for (auto& [_, tablet] : _map) {
- visitor(tablet);
- }
- }
-
-private:
- std::mutex _mtx;
- std::unordered_map<int64_t, std::shared_ptr<CloudTablet>> _map;
-};
-
-// TODO(plat1ko): Prune cache
-CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine)
- : _engine(engine),
- _tablet_map(std::make_unique<TabletMap>()),
- _cache(std::make_unique<LRUCachePolicy>(
- CachePolicy::CacheType::CLOUD_TABLET_CACHE,
config::tablet_cache_capacity,
- LRUCacheType::NUMBER, 0, config::tablet_cache_shards)) {}
-
-CloudTabletMgr::~CloudTabletMgr() = default;
-
-Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t
tablet_id,
- bool
warmup_data) {
- // LRU value type. `Value`'s lifetime MUST NOT be longer than
`CloudTabletMgr`
- class Value : public LRUCacheValueBase {
- public:
- Value(const std::shared_ptr<CloudTablet>& tablet, TabletMap&
tablet_map)
- :
LRUCacheValueBase(CachePolicy::CacheType::CLOUD_TABLET_CACHE),
- tablet(tablet),
- tablet_map(tablet_map) {}
- ~Value() override { tablet_map.erase(tablet.get()); }
-
- // FIXME(plat1ko): The ownership of tablet seems to belong to
'TabletMap', while `Value`
- // only requires a reference.
- std::shared_ptr<CloudTablet> tablet;
- TabletMap& tablet_map;
- };
-
- auto tablet_id_str = std::to_string(tablet_id);
- CacheKey key(tablet_id_str);
- auto* handle = _cache->lookup(key);
- if (handle == nullptr) {
- auto load_tablet = [this, &key,
- warmup_data](int64_t tablet_id) ->
std::shared_ptr<CloudTablet> {
- TabletMetaSharedPtr tablet_meta;
- auto st = _engine.meta_mgr().get_tablet_meta(tablet_id,
&tablet_meta);
- if (!st.ok()) {
- LOG(WARNING) << "failed to tablet " << tablet_id << ": " << st;
- return nullptr;
- }
-
- auto tablet = std::make_shared<CloudTablet>(_engine,
std::move(tablet_meta));
- auto value = std::make_unique<Value>(tablet, *_tablet_map);
- // MUST sync stats to let compaction scheduler work correctly
- st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(),
warmup_data);
- if (!st.ok()) {
- LOG(WARNING) << "failed to sync tablet " << tablet_id << ": "
<< st;
- return nullptr;
- }
-
- auto* handle = _cache->insert(key, value.release(), 1,
sizeof(CloudTablet),
- CachePriority::NORMAL);
- auto ret = std::shared_ptr<CloudTablet>(
- tablet.get(), [this, handle](...) {
_cache->release(handle); });
- _tablet_map->put(std::move(tablet));
- return ret;
- };
-
- auto tablet = s_singleflight_load_tablet.load(tablet_id,
std::move(load_tablet));
- if (tablet == nullptr) {
- return ResultError(Status::InternalError("failed to get tablet
{}", tablet_id));
- }
- return tablet;
- }
-
- CloudTablet* tablet_raw_ptr =
reinterpret_cast<Value*>(_cache->value(handle))->tablet.get();
- auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr,
- [this, handle](...) {
_cache->release(handle); });
- return tablet;
-}
-
-void CloudTabletMgr::erase_tablet(int64_t tablet_id) {
- auto tablet_id_str = std::to_string(tablet_id);
- CacheKey key(tablet_id_str.data(), tablet_id_str.size());
- _cache->erase(key);
-}
-
-void CloudTabletMgr::vacuum_stale_rowsets() {
- LOG_INFO("begin to vacuum stale rowsets");
- std::vector<std::shared_ptr<CloudTablet>> tablets_to_vacuum;
- tablets_to_vacuum.reserve(_tablet_map->size());
- _tablet_map->traverse([&tablets_to_vacuum](auto&& t) {
- if (t->has_stale_rowsets()) {
- tablets_to_vacuum.push_back(t);
- }
- });
- int num_vacuumed = 0;
- for (auto& t : tablets_to_vacuum) {
- num_vacuumed += t->delete_expired_stale_rowsets();
- }
- LOG_INFO("finish vacuum stale rowsets").tag("num_vacuumed", num_vacuumed);
-}
-
-std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() {
- std::vector<std::weak_ptr<CloudTablet>> weak_tablets;
- weak_tablets.reserve(_tablet_map->size());
- _tablet_map->traverse([&weak_tablets](auto& t) {
weak_tablets.push_back(t); });
- return weak_tablets;
-}
-
-void CloudTabletMgr::sync_tablets() {
- LOG_INFO("begin to sync tablets");
- int64_t last_sync_time_bound = ::time(nullptr) -
config::tablet_sync_interval_s;
-
- auto weak_tablets = get_weak_tablets();
-
- // sort by last_sync_time
- static auto cmp = [](const auto& a, const auto& b) { return a.first <
b.first; };
- std::multiset<std::pair<int64_t, std::weak_ptr<CloudTablet>>,
decltype(cmp)>
- sync_time_tablet_set(cmp);
-
- for (auto& weak_tablet : weak_tablets) {
- if (auto tablet = weak_tablet.lock()) {
- if (tablet->tablet_state() != TABLET_RUNNING) {
- continue;
- }
- int64_t last_sync_time = tablet->last_sync_time_s;
- if (last_sync_time <= last_sync_time_bound) {
- sync_time_tablet_set.emplace(last_sync_time, weak_tablet);
- }
- }
- }
-
- int num_sync = 0;
- for (auto&& [_, weak_tablet] : sync_time_tablet_set) {
- if (auto tablet = weak_tablet.lock()) {
- if (tablet->last_sync_time_s > last_sync_time_bound) {
- continue;
- }
-
- ++num_sync;
- auto st = tablet->sync_meta();
- if (!st) {
- LOG_WARNING("failed to sync tablet meta {}",
tablet->tablet_id()).error(st);
- if (st.is<ErrorCode::NOT_FOUND>()) {
- continue;
- }
- }
-
- st = tablet->sync_rowsets(-1);
- if (!st) {
- LOG_WARNING("failed to sync tablet rowsets {}",
tablet->tablet_id()).error(st);
- }
- }
- }
- LOG_INFO("finish sync tablets").tag("num_sync", num_sync);
-}
-
-Status CloudTabletMgr::get_topn_tablets_to_compact(
- int n, CompactionType compaction_type, const
std::function<bool(CloudTablet*)>& filter_out,
- std::vector<std::shared_ptr<CloudTablet>>* tablets, int64_t*
max_score) {
- DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
- compaction_type == CompactionType::CUMULATIVE_COMPACTION);
- *max_score = 0;
- int64_t max_score_tablet_id = 0;
- // clang-format off
- auto score = [compaction_type](CloudTablet* t) {
- return compaction_type == CompactionType::BASE_COMPACTION ?
t->get_cloud_base_compaction_score()
- : compaction_type == CompactionType::CUMULATIVE_COMPACTION ?
t->get_cloud_cumu_compaction_score()
- : 0;
- };
-
- using namespace std::chrono;
- auto now =
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
- auto skip = [now, compaction_type](CloudTablet* t) {
- if (compaction_type == CompactionType::BASE_COMPACTION) {
- return now - t->last_base_compaction_success_time_ms <
config::base_compaction_freeze_interval_s * 1000;
- }
- // If tablet has too many rowsets but not be compacted for a long
time, compaction should be performed
- // regardless of whether there is a load job recently.
- return now - t->last_cumu_no_suitable_version_ms <
config::min_compaction_failure_interval_ms ||
- (now - t->last_load_time_ms >
config::cu_compaction_freeze_interval_s * 1000
- && now - t->last_cumu_compaction_success_time_ms <
config::cumu_compaction_interval_s * 1000
- && t->fetch_add_approximate_num_rowsets(0) <
config::max_tablet_version_num / 2);
- };
- // We don't schedule tablets that are disabled for compaction
- auto disable = [](CloudTablet* t) { return
t->tablet_meta()->tablet_schema()->disable_auto_compaction(); };
-
- auto [num_filtered, num_disabled, num_skipped] = std::make_tuple(0, 0, 0);
-
- auto weak_tablets = get_weak_tablets();
- std::vector<std::pair<std::shared_ptr<CloudTablet>, int64_t>> buf;
- buf.reserve(n + 1);
- for (auto& weak_tablet : weak_tablets) {
- auto t = weak_tablet.lock();
- if (t == nullptr) { continue; }
-
- int64_t s = score(t.get());
- if (s > *max_score) {
- max_score_tablet_id = t->tablet_id();
- *max_score = s;
- }
-
- if (filter_out(t.get())) { ++num_filtered; continue; }
- if (disable(t.get())) { ++num_disabled; continue; }
- if (skip(t.get())) { ++num_skipped; continue; }
-
- buf.emplace_back(std::move(t), s);
- std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return
a.second > b.second; });
- if (buf.size() > n) { buf.pop_back(); }
- }
-
- LOG_EVERY_N(INFO, 1000) << "get_topn_compaction_score, n=" << n << "
type=" << compaction_type
- << " num_tablets=" << weak_tablets.size() << " num_skipped=" <<
num_skipped
- << " num_disabled=" << num_disabled << " num_filtered=" <<
num_filtered
- << " max_score=" << *max_score << " max_score_tablet=" <<
max_score_tablet_id
- << " tablets=[" << [&buf] { std::stringstream ss; for (auto& i
: buf) ss << i.first->tablet_id() << ":" << i.second << ","; return ss.str();
}() << "]"
- ;
- // clang-format on
-
- tablets->clear();
- tablets->reserve(n + 1);
- for (auto& [t, _] : buf) {
- tablets->emplace_back(std::move(t));
- }
-
- return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
b/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
deleted file mode 100644
index 6f0d90b37bb..00000000000
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.cpp
+++ /dev/null
@@ -1,183 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "cloud/cloud_txn_delete_bitmap_cache.h"
-
-#include <fmt/core.h>
-
-#include <chrono>
-#include <memory>
-#include <shared_mutex>
-
-#include "common/status.h"
-#include "common/sync_point.h"
-#include "olap/olap_common.h"
-#include "olap/tablet_meta.h"
-
-namespace doris {
-
-CloudTxnDeleteBitmapCache::CloudTxnDeleteBitmapCache(size_t size_in_bytes)
- :
LRUCachePolicy(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE,
size_in_bytes,
- LRUCacheType::SIZE, 86400, 4),
- _stop_latch(1) {}
-
-CloudTxnDeleteBitmapCache::~CloudTxnDeleteBitmapCache() {
- _stop_latch.count_down();
- _clean_thread->join();
-}
-
-Status CloudTxnDeleteBitmapCache::init() {
- auto st = Thread::create(
- "CloudTxnDeleteBitmapCache", "clean_txn_dbm_thread",
- [this]() { this->_clean_thread_callback(); }, &_clean_thread);
- if (!st.ok()) {
- LOG(WARNING) << "failed to create thread for
CloudTxnDeleteBitmapCache, error: " << st;
- }
- return st;
-}
-
-Status CloudTxnDeleteBitmapCache::get_tablet_txn_info(
- TTransactionId transaction_id, int64_t tablet_id, RowsetSharedPtr*
rowset,
- DeleteBitmapPtr* delete_bitmap, RowsetIdUnorderedSet* rowset_ids,
int64_t* txn_expiration,
- std::shared_ptr<PartialUpdateInfo>* partial_update_info) {
- {
- std::shared_lock<std::shared_mutex> rlock(_rwlock);
- TxnKey key(transaction_id, tablet_id);
- auto iter = _txn_map.find(key);
- if (iter == _txn_map.end()) {
- return Status::Error<ErrorCode::NOT_FOUND, false>(
- "not found txn info, tablet_id={}, transaction_id={}",
tablet_id,
- transaction_id);
- }
- *rowset = iter->second.rowset;
- *txn_expiration = iter->second.txn_expiration;
- *partial_update_info = iter->second.partial_update_info;
- }
- std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
- CacheKey key(key_str);
- Cache::Handle* handle = lookup(key);
-
- DeleteBitmapCacheValue* val =
- handle == nullptr ? nullptr :
reinterpret_cast<DeleteBitmapCacheValue*>(value(handle));
- if (val) {
- *delete_bitmap = val->delete_bitmap;
- *rowset_ids = val->rowset_ids;
- // must call release handle to reduce the reference count,
- // otherwise there will be memory leak
- release(handle);
- } else {
- LOG_INFO("cache missed when get delete bitmap")
- .tag("txn_id", transaction_id)
- .tag("tablet_id", tablet_id);
- // Becasue of the rowset_ids become empty, all delete bitmap
- // will be recalculate in CalcDeleteBitmapTask
- *delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id);
- }
- return Status::OK();
-}
-
-void CloudTxnDeleteBitmapCache::set_tablet_txn_info(
- TTransactionId transaction_id, int64_t tablet_id, DeleteBitmapPtr
delete_bitmap,
- const RowsetIdUnorderedSet& rowset_ids, RowsetSharedPtr rowset,
int64_t txn_expiration,
- std::shared_ptr<PartialUpdateInfo> partial_update_info) {
- if (txn_expiration <= 0) {
- txn_expiration = duration_cast<std::chrono::seconds>(
-
std::chrono::system_clock::now().time_since_epoch())
- .count() +
- 120;
- }
- {
- std::unique_lock<std::shared_mutex> wlock(_rwlock);
- TxnKey txn_key(transaction_id, tablet_id);
- _txn_map[txn_key] = TxnVal(rowset, txn_expiration,
std::move(partial_update_info));
- _expiration_txn.emplace(txn_expiration, txn_key);
- }
- std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
- CacheKey key(key_str);
-
- auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
- size_t charge = sizeof(DeleteBitmapCacheValue);
- for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
- charge += v.getSizeInBytes();
- }
- auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL);
- // must call release handle to reduce the reference count,
- // otherwise there will be memory leak
- release(handle);
- LOG_INFO("set txn related delete bitmap")
- .tag("txn_id", transaction_id)
- .tag("expiration", txn_expiration)
- .tag("tablet_id", tablet_id)
- .tag("delete_bitmap_size", charge);
-}
-
-void CloudTxnDeleteBitmapCache::update_tablet_txn_info(TTransactionId
transaction_id,
- int64_t tablet_id,
- DeleteBitmapPtr
delete_bitmap,
- const
RowsetIdUnorderedSet& rowset_ids) {
- std::string key_str = fmt::format("{}/{}", transaction_id, tablet_id);
- CacheKey key(key_str);
-
- auto val = new DeleteBitmapCacheValue(delete_bitmap, rowset_ids);
- size_t charge = sizeof(DeleteBitmapCacheValue);
- for (auto& [k, v] : val->delete_bitmap->delete_bitmap) {
- charge += v.getSizeInBytes();
- }
- auto* handle = insert(key, val, charge, charge, CachePriority::NORMAL);
- // must call release handle to reduce the reference count,
- // otherwise there will be memory leak
- release(handle);
- LOG_INFO("update txn related delete bitmap")
- .tag("txn_id", transaction_id)
- .tag("tablt_id", tablet_id)
- .tag("delete_bitmap_size", charge);
-}
-
-void CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info() {
-
TEST_SYNC_POINT_RETURN_WITH_VOID("CloudTxnDeleteBitmapCache::remove_expired_tablet_txn_info");
- std::unique_lock<std::shared_mutex> wlock(_rwlock);
- while (!_expiration_txn.empty()) {
- auto iter = _expiration_txn.begin();
- int64_t current_time = duration_cast<std::chrono::seconds>(
-
std::chrono::system_clock::now().time_since_epoch())
- .count();
- if (iter->first > current_time) {
- break;
- }
- auto txn_iter = _txn_map.find(iter->second);
- if (iter->first == txn_iter->second.txn_expiration) {
- LOG_INFO("clean expired delete bitmap")
- .tag("txn_id", txn_iter->first.txn_id)
- .tag("expiration", txn_iter->second.txn_expiration)
- .tag("tablt_id", txn_iter->first.tablet_id);
- std::string key_str = std::to_string(txn_iter->first.txn_id) + "/"
+
- std::to_string(txn_iter->first.tablet_id);
// Cache key container
- CacheKey cache_key(key_str);
- erase(cache_key);
- _txn_map.erase(iter->second);
- }
- _expiration_txn.erase(iter);
- }
-}
-
-void CloudTxnDeleteBitmapCache::_clean_thread_callback() {
- do {
- remove_expired_tablet_txn_info();
- } while (!_stop_latch.wait_for(std::chrono::seconds(300)));
-}
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/cloud/cloud_txn_delete_bitmap_cache.h
b/be/src/cloud/cloud_txn_delete_bitmap_cache.h
deleted file mode 100644
index 3b1d1d1d857..00000000000
--- a/be/src/cloud/cloud_txn_delete_bitmap_cache.h
+++ /dev/null
@@ -1,98 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <mutex>
-
-#include "olap/lru_cache.h"
-#include "olap/olap_common.h"
-#include "olap/partial_update_info.h"
-#include "olap/rowset/rowset.h"
-#include "olap/tablet_meta.h"
-#include "util/countdown_latch.h"
-
-namespace doris {
-
-// Record transaction related delete bitmaps using a lru cache.
-class CloudTxnDeleteBitmapCache : public LRUCachePolicy {
-public:
- CloudTxnDeleteBitmapCache(size_t size_in_bytes);
-
- ~CloudTxnDeleteBitmapCache() override;
-
- Status init();
-
- Status get_tablet_txn_info(TTransactionId transaction_id, int64_t
tablet_id,
- RowsetSharedPtr* rowset, DeleteBitmapPtr*
delete_bitmap,
- RowsetIdUnorderedSet* rowset_ids, int64_t*
txn_expiration,
- std::shared_ptr<PartialUpdateInfo>*
partial_update_info);
-
- void set_tablet_txn_info(TTransactionId transaction_id, int64_t tablet_id,
- DeleteBitmapPtr delete_bitmap, const
RowsetIdUnorderedSet& rowset_ids,
- RowsetSharedPtr rowset, int64_t txn_expirationm,
- std::shared_ptr<PartialUpdateInfo>
partial_update_info);
-
- void update_tablet_txn_info(TTransactionId transaction_id, int64_t
tablet_id,
- DeleteBitmapPtr delete_bitmap,
- const RowsetIdUnorderedSet& rowset_ids);
-
- void remove_expired_tablet_txn_info();
-
-private:
- void _clean_thread_callback();
-
- class DeleteBitmapCacheValue : public LRUCacheValueBase {
- public:
- DeleteBitmapPtr delete_bitmap;
- // records rowsets calc in commit txn
- RowsetIdUnorderedSet rowset_ids;
-
- DeleteBitmapCacheValue(DeleteBitmapPtr delete_bitmap_, const
RowsetIdUnorderedSet& ids_)
- :
LRUCacheValueBase(CachePolicy::CacheType::CLOUD_TXN_DELETE_BITMAP_CACHE),
- delete_bitmap(std::move(delete_bitmap_)),
- rowset_ids(ids_) {}
- };
-
- struct TxnKey {
- TTransactionId txn_id;
- int64_t tablet_id;
- TxnKey(TTransactionId txn_id_, int64_t tablet_id_)
- : txn_id(txn_id_), tablet_id(tablet_id_) {}
- auto operator<=>(const TxnKey&) const = default;
- };
-
- struct TxnVal {
- RowsetSharedPtr rowset;
- int64_t txn_expiration;
- std::shared_ptr<PartialUpdateInfo> partial_update_info;
- TxnVal() : txn_expiration(0) {};
- TxnVal(RowsetSharedPtr rowset_, int64_t txn_expiration_,
- std::shared_ptr<PartialUpdateInfo> partial_update_info_)
- : rowset(std::move(rowset_)),
- txn_expiration(txn_expiration_),
- partial_update_info(std::move(partial_update_info_)) {}
- };
-
- std::map<TxnKey, TxnVal> _txn_map;
- std::multimap<int64_t, TxnKey> _expiration_txn;
- std::shared_mutex _rwlock;
- scoped_refptr<Thread> _clean_thread;
- CountDownLatch _stop_latch;
-};
-
-} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]