This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 64ee35ea81ec081a00dceb73b26311ac251cf612 Author: abmdocrt <[email protected]> AuthorDate: Mon Jul 10 14:06:11 2023 +0800 [Enhancement](Compaction) Calculate all committed rowsets delete bitmaps during compaction (#20907) Here we will calculate all the rowsets delete bitmaps which are committed but not published to reduce the calculation pressure of publish phase. Step1: collect this tablet's all committed rowsets' delete bitmaps. Step2: calculate all rowsets' delete bitmaps which are published during compaction. Step3: write back updated delete bitmap and tablet info. --- be/src/olap/compaction.cpp | 32 ++++++++++++++++++++++++++++++-- be/src/olap/tablet.cpp | 7 +++---- be/src/olap/tablet.h | 2 +- be/src/olap/tablet_meta.cpp | 7 ++++++- be/src/olap/tablet_meta.h | 7 +++++++ be/src/olap/txn_manager.cpp | 22 ++++++++++++++++++++++ be/src/olap/txn_manager.h | 30 +++++++++++++++++++++++++----- be/test/olap/tablet_meta_test.cpp | 2 +- 8 files changed, 95 insertions(+), 14 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 023101fc0f..57ed2e2558 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -25,6 +25,7 @@ #include <cstdlib> #include <list> #include <map> +#include <memory> #include <mutex> #include <ostream> #include <set> @@ -50,6 +51,7 @@ #include "olap/tablet.h" #include "olap/tablet_meta.h" #include "olap/task/engine_checksum_task.h" +#include "olap/txn_manager.h" #include "olap/utils.h" #include "runtime/memory/mem_tracker_limiter.h" #include "util/time.h" @@ -578,7 +580,8 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { // of incremental data later.
_tablet->calc_compaction_output_rowset_delete_bitmap( _input_rowsets, _rowid_conversion, 0, version.second + 1, &missed_rows, - &location_map, &output_rowset_delete_bitmap); + &location_map, _tablet->tablet_meta()->delete_bitmap(), + &output_rowset_delete_bitmap); std::size_t missed_rows_size = missed_rows.size(); if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { if (stats != nullptr && stats->merged_rows != missed_rows_size) { @@ -594,16 +597,41 @@ Status Compaction::modify_rowsets(const Merger::Statistics* stats) { RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, location_map)); location_map.clear(); + { std::lock_guard<std::mutex> wrlock_(_tablet->get_rowset_update_lock()); std::lock_guard<std::shared_mutex> wrlock(_tablet->get_header_lock()); SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); + // Here we will calculate all the rowsets delete bitmaps which are committed but not published to reduce the calculation pressure + // of publish phase. + // All rowsets which need to recalculate have been published so we don't need to acquire lock. + // Step1: collect this tablet's all committed rowsets' delete bitmaps + CommitTabletTxnInfoVec commit_tablet_txn_info_vec {}; + StorageEngine::instance()->txn_manager()->get_all_commit_tablet_txn_info_by_tablet( + _tablet, &commit_tablet_txn_info_vec); + + // Step2: calculate all rowsets' delete bitmaps which are published during compaction. + for (auto& it : commit_tablet_txn_info_vec) { + DeleteBitmap output_delete_bitmap(_tablet->tablet_id()); + _tablet->calc_compaction_output_rowset_delete_bitmap( + _input_rowsets, _rowid_conversion, 0, UINT64_MAX, &missed_rows, + &location_map, *it.delete_bitmap.get(), &output_delete_bitmap); + it.delete_bitmap->merge(output_delete_bitmap); + // Step3: write back updated delete bitmap and tablet info. 
+ it.rowset_ids.insert(_output_rowset->rowset_id()); + StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap( + it.partition_id, it.transaction_id, _tablet->tablet_id(), + _tablet->schema_hash(), _tablet->tablet_uid(), true, it.delete_bitmap, + it.rowset_ids); + } + // Convert the delete bitmap of the input rowsets to output rowset for // incremental data. _tablet->calc_compaction_output_rowset_delete_bitmap( _input_rowsets, _rowid_conversion, version.second, UINT64_MAX, &missed_rows, - &location_map, &output_rowset_delete_bitmap); + &location_map, _tablet->tablet_meta()->delete_bitmap(), + &output_rowset_delete_bitmap); if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { DCHECK_EQ(missed_rows.size(), missed_rows_size); if (missed_rows.size() != missed_rows_size) { diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 4b6e688e0c..8eb2d0fd30 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3313,7 +3313,7 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap( const std::vector<RowsetSharedPtr>& input_rowsets, const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version, std::set<RowLocation>* missed_rows, std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>* location_map, - DeleteBitmap* output_rowset_delete_bitmap) { + const DeleteBitmap& input_delete_bitmap, DeleteBitmap* output_rowset_delete_bitmap) { RowLocation src; RowLocation dst; for (auto& rowset : input_rowsets) { @@ -3321,9 +3321,8 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap( for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) { src.segment_id = seg_id; DeleteBitmap subset_map(tablet_id()); - _tablet_meta->delete_bitmap().subset({rowset->rowset_id(), seg_id, start_version}, - {rowset->rowset_id(), seg_id, end_version}, - &subset_map); + input_delete_bitmap.subset({rowset->rowset_id(), seg_id, start_version}, + {rowset->rowset_id(), seg_id, end_version}, 
&subset_map); // traverse all versions and convert rowid for (auto iter = subset_map.delete_bitmap.begin(); iter != subset_map.delete_bitmap.end(); ++iter) { diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 724ab81243..8b764326ed 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -477,7 +477,7 @@ public: const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version, std::set<RowLocation>* missed_rows, std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>* location_map, - DeleteBitmap* output_rowset_delete_bitmap); + const DeleteBitmap& input_delete_bitmap, DeleteBitmap* output_rowset_delete_bitmap); void merge_delete_bitmap(const DeleteBitmap& delete_bitmap); Status check_rowid_conversion( RowsetSharedPtr dst_rowset, diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 55e9e695d0..5fcfadffff 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -941,6 +941,11 @@ bool DeleteBitmap::contains_agg(const BitmapKey& bmk, uint32_t row_id) const { return get_agg(bmk)->contains(row_id); } +bool DeleteBitmap::empty() const { + std::shared_lock l(lock); + return delete_bitmap.empty(); +} + bool DeleteBitmap::contains_agg_without_cache(const BitmapKey& bmk, uint32_t row_id) const { std::shared_lock l(lock); DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), 0}; @@ -983,7 +988,7 @@ void DeleteBitmap::subset(const BitmapKey& start, const BitmapKey& end, roaring::Roaring roaring; DCHECK(start < end); std::shared_lock l(lock); - for (auto it = delete_bitmap.upper_bound(start); it != delete_bitmap.end(); ++it) { + for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) { auto& [k, bm] = *it; if (k >= end) { break; diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 2c6a98b111..70830b82de 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -356,6 +356,13 @@ public: */ bool 
contains(const BitmapKey& bmk, uint32_t row_id) const; + /** + * Checks if this delete bitmap is empty + * + * @return true if empty + */ + bool empty() const; + /** * Sets the bitmap of specific segment, it's may be insertion or replacement * diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 7f2eac5209..432f96e707 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -621,6 +621,28 @@ void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) { } } +void TxnManager::get_all_commit_tablet_txn_info_by_tablet( + const TabletSharedPtr& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec) { + for (int32_t i = 0; i < _txn_map_shard_size; i++) { + std::shared_lock txn_rdlock(_txn_map_locks[i]); + for (const auto& it : _txn_tablet_maps[i]) { + auto tablet_load_it = it.second.find(tablet->get_tablet_info()); + if (tablet_load_it != it.second.end()) { + TPartitionId partition_id = it.first.first; + TTransactionId transaction_id = it.first.second; + const RowsetSharedPtr& rowset = tablet_load_it->second.rowset; + const DeleteBitmapPtr& delete_bitmap = tablet_load_it->second.delete_bitmap; + const RowsetIdUnorderedSet& rowset_ids = tablet_load_it->second.rowset_ids; + if (!rowset || !delete_bitmap) { + continue; + } + commit_tablet_txn_info_vec->push_back(CommitTabletTxnInfo( + partition_id, transaction_id, rowset, delete_bitmap, rowset_ids)); + } + } + } +} + bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid) { pair<int64_t, int64_t> key(partition_id, transaction_id); diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 42476c5acb..d024e71e16 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -78,6 +78,24 @@ struct TabletTxnInfo { TabletTxnInfo() {} }; +struct CommitTabletTxnInfo { + CommitTabletTxnInfo(TPartitionId partition_id, TTransactionId transaction_id, 
+ RowsetSharedPtr rowset, DeleteBitmapPtr delete_bitmap, + RowsetIdUnorderedSet rowset_ids) + : transaction_id(transaction_id), + partition_id(partition_id), + rowset(rowset), + delete_bitmap(delete_bitmap), + rowset_ids(rowset_ids) {} + TTransactionId transaction_id; + TPartitionId partition_id; + RowsetSharedPtr rowset; + DeleteBitmapPtr delete_bitmap; + RowsetIdUnorderedSet rowset_ids; +}; + +using CommitTabletTxnInfoVec = std::vector<CommitTabletTxnInfo>; + // txn manager is used to manage mapping between tablet and txns class TxnManager { public: @@ -172,6 +190,8 @@ public: TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids); + void get_all_commit_tablet_txn_info_by_tablet( + const TabletSharedPtr& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec); int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version); void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id); @@ -195,11 +215,11 @@ private: } }; - typedef std::unordered_map<TxnKey, std::map<TabletInfo, TabletTxnInfo>, TxnKeyHash, TxnKeyEqual> - txn_tablet_map_t; - typedef std::unordered_map<int64_t, std::unordered_set<int64_t>> txn_partition_map_t; - typedef std::unordered_map<int64_t, std::map<int64_t, DeltaWriter*>> - txn_tablet_delta_writer_map_t; + using txn_tablet_map_t = std::unordered_map<TxnKey, std::map<TabletInfo, TabletTxnInfo>, + TxnKeyHash, TxnKeyEqual>; + using txn_partition_map_t = std::unordered_map<int64_t, std::unordered_set<int64_t>>; + using txn_tablet_delta_writer_map_t = + std::unordered_map<int64_t, std::map<int64_t, DeltaWriter*>>; std::shared_mutex& _get_txn_map_lock(TTransactionId transactionId); diff --git a/be/test/olap/tablet_meta_test.cpp b/be/test/olap/tablet_meta_test.cpp index 962f619c11..993fbf2d49 100644 --- a/be/test/olap/tablet_meta_test.cpp +++ b/be/test/olap/tablet_meta_test.cpp @@ -179,7 +179,7 @@ TEST(TabletMetaTest, TestDeleteBitmap) { 
ASSERT_EQ(d.cardinality(), 4); ASSERT_EQ(db_upper.get({RowsetId {2, 0, 1, 1}, 1, 2}, &d), 0); ASSERT_EQ(d.cardinality(), 1); - ASSERT_EQ(db_upper.delete_bitmap.size(), 20); + ASSERT_EQ(db_upper.delete_bitmap.size(), 21); } { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
