This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 64ee35ea81ec081a00dceb73b26311ac251cf612
Author: abmdocrt <[email protected]>
AuthorDate: Mon Jul 10 14:06:11 2023 +0800

    [Enhancement](Compaction) Calculate all committed rowsets' delete bitmaps 
when doing compaction (#20907)
    
    Here we calculate the delete bitmaps of all rowsets that are committed 
but not yet published, to reduce the calculation pressure on the publish phase.
    
    Step1: collect the delete bitmaps of all committed rowsets of this tablet.
    
    Step2: calculate the delete bitmaps of all rowsets that are published during 
compaction.
    
    Step3: write back updated delete bitmap and tablet info.
---
 be/src/olap/compaction.cpp        | 32 ++++++++++++++++++++++++++++++--
 be/src/olap/tablet.cpp            |  7 +++----
 be/src/olap/tablet.h              |  2 +-
 be/src/olap/tablet_meta.cpp       |  7 ++++++-
 be/src/olap/tablet_meta.h         |  7 +++++++
 be/src/olap/txn_manager.cpp       | 22 ++++++++++++++++++++++
 be/src/olap/txn_manager.h         | 30 +++++++++++++++++++++++++-----
 be/test/olap/tablet_meta_test.cpp |  2 +-
 8 files changed, 95 insertions(+), 14 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 023101fc0f..57ed2e2558 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -25,6 +25,7 @@
 #include <cstdlib>
 #include <list>
 #include <map>
+#include <memory>
 #include <mutex>
 #include <ostream>
 #include <set>
@@ -50,6 +51,7 @@
 #include "olap/tablet.h"
 #include "olap/tablet_meta.h"
 #include "olap/task/engine_checksum_task.h"
+#include "olap/txn_manager.h"
 #include "olap/utils.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "util/time.h"
@@ -578,7 +580,8 @@ Status Compaction::modify_rowsets(const Merger::Statistics* 
stats) {
         // of incremental data later.
         _tablet->calc_compaction_output_rowset_delete_bitmap(
                 _input_rowsets, _rowid_conversion, 0, version.second + 1, 
&missed_rows,
-                &location_map, &output_rowset_delete_bitmap);
+                &location_map, _tablet->tablet_meta()->delete_bitmap(),
+                &output_rowset_delete_bitmap);
         std::size_t missed_rows_size = missed_rows.size();
         if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
             if (stats != nullptr && stats->merged_rows != missed_rows_size) {
@@ -594,16 +597,41 @@ Status Compaction::modify_rowsets(const 
Merger::Statistics* stats) {
 
         RETURN_IF_ERROR(_tablet->check_rowid_conversion(_output_rowset, 
location_map));
         location_map.clear();
+
         {
             std::lock_guard<std::mutex> 
wrlock_(_tablet->get_rowset_update_lock());
             std::lock_guard<std::shared_mutex> 
wrlock(_tablet->get_header_lock());
             SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
 
+            // Here we will calculate all the rowsets delete bitmaps which are 
committed but not published to reduce the calculation pressure
+            // of publish phase.
+            // All rowsets which need to recalculate have been published so we 
don't need to acquire lock.
+            // Step1: collect this tablet's all committed rowsets' delete 
bitmaps
+            CommitTabletTxnInfoVec commit_tablet_txn_info_vec {};
+            
StorageEngine::instance()->txn_manager()->get_all_commit_tablet_txn_info_by_tablet(
+                    _tablet, &commit_tablet_txn_info_vec);
+
+            // Step2: calculate all rowsets' delete bitmaps which are 
published during compaction.
+            for (auto& it : commit_tablet_txn_info_vec) {
+                DeleteBitmap output_delete_bitmap(_tablet->tablet_id());
+                _tablet->calc_compaction_output_rowset_delete_bitmap(
+                        _input_rowsets, _rowid_conversion, 0, UINT64_MAX, 
&missed_rows,
+                        &location_map, *it.delete_bitmap.get(), 
&output_delete_bitmap);
+                it.delete_bitmap->merge(output_delete_bitmap);
+                // Step3: write back updated delete bitmap and tablet info.
+                it.rowset_ids.insert(_output_rowset->rowset_id());
+                
StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
+                        it.partition_id, it.transaction_id, 
_tablet->tablet_id(),
+                        _tablet->schema_hash(), _tablet->tablet_uid(), true, 
it.delete_bitmap,
+                        it.rowset_ids);
+            }
+
             // Convert the delete bitmap of the input rowsets to output rowset 
for
             // incremental data.
             _tablet->calc_compaction_output_rowset_delete_bitmap(
                     _input_rowsets, _rowid_conversion, version.second, 
UINT64_MAX, &missed_rows,
-                    &location_map, &output_rowset_delete_bitmap);
+                    &location_map, _tablet->tablet_meta()->delete_bitmap(),
+                    &output_rowset_delete_bitmap);
             if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) 
{
                 DCHECK_EQ(missed_rows.size(), missed_rows_size);
                 if (missed_rows.size() != missed_rows_size) {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 4b6e688e0c..8eb2d0fd30 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3313,7 +3313,7 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap(
         const std::vector<RowsetSharedPtr>& input_rowsets, const 
RowIdConversion& rowid_conversion,
         uint64_t start_version, uint64_t end_version, std::set<RowLocation>* 
missed_rows,
         std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, 
RowLocation>>>* location_map,
-        DeleteBitmap* output_rowset_delete_bitmap) {
+        const DeleteBitmap& input_delete_bitmap, DeleteBitmap* 
output_rowset_delete_bitmap) {
     RowLocation src;
     RowLocation dst;
     for (auto& rowset : input_rowsets) {
@@ -3321,9 +3321,8 @@ void Tablet::calc_compaction_output_rowset_delete_bitmap(
         for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
             src.segment_id = seg_id;
             DeleteBitmap subset_map(tablet_id());
-            _tablet_meta->delete_bitmap().subset({rowset->rowset_id(), seg_id, 
start_version},
-                                                 {rowset->rowset_id(), seg_id, 
end_version},
-                                                 &subset_map);
+            input_delete_bitmap.subset({rowset->rowset_id(), seg_id, 
start_version},
+                                       {rowset->rowset_id(), seg_id, 
end_version}, &subset_map);
             // traverse all versions and convert rowid
             for (auto iter = subset_map.delete_bitmap.begin();
                  iter != subset_map.delete_bitmap.end(); ++iter) {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 724ab81243..8b764326ed 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -477,7 +477,7 @@ public:
             const RowIdConversion& rowid_conversion, uint64_t start_version, 
uint64_t end_version,
             std::set<RowLocation>* missed_rows,
             std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, 
RowLocation>>>* location_map,
-            DeleteBitmap* output_rowset_delete_bitmap);
+            const DeleteBitmap& input_delete_bitmap, DeleteBitmap* 
output_rowset_delete_bitmap);
     void merge_delete_bitmap(const DeleteBitmap& delete_bitmap);
     Status check_rowid_conversion(
             RowsetSharedPtr dst_rowset,
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 55e9e695d0..5fcfadffff 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -941,6 +941,11 @@ bool DeleteBitmap::contains_agg(const BitmapKey& bmk, 
uint32_t row_id) const {
     return get_agg(bmk)->contains(row_id);
 }
 
+bool DeleteBitmap::empty() const {
+    std::shared_lock l(lock);
+    return delete_bitmap.empty();
+}
+
 bool DeleteBitmap::contains_agg_without_cache(const BitmapKey& bmk, uint32_t 
row_id) const {
     std::shared_lock l(lock);
     DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), 0};
@@ -983,7 +988,7 @@ void DeleteBitmap::subset(const BitmapKey& start, const 
BitmapKey& end,
     roaring::Roaring roaring;
     DCHECK(start < end);
     std::shared_lock l(lock);
-    for (auto it = delete_bitmap.upper_bound(start); it != 
delete_bitmap.end(); ++it) {
+    for (auto it = delete_bitmap.lower_bound(start); it != 
delete_bitmap.end(); ++it) {
         auto& [k, bm] = *it;
         if (k >= end) {
             break;
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 2c6a98b111..70830b82de 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -356,6 +356,13 @@ public:
      */
     bool contains(const BitmapKey& bmk, uint32_t row_id) const;
 
+    /**
+     * Checks if this delete bitmap is empty
+     *
+     * @return true if empty
+     */
+    bool empty() const;
+
     /**
      * Sets the bitmap of specific segment, it's may be insertion or 
replacement
      *
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 7f2eac5209..432f96e707 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -621,6 +621,28 @@ void 
TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) {
     }
 }
 
+void TxnManager::get_all_commit_tablet_txn_info_by_tablet(
+        const TabletSharedPtr& tablet, CommitTabletTxnInfoVec* 
commit_tablet_txn_info_vec) {
+    for (int32_t i = 0; i < _txn_map_shard_size; i++) {
+        std::shared_lock txn_rdlock(_txn_map_locks[i]);
+        for (const auto& it : _txn_tablet_maps[i]) {
+            auto tablet_load_it = it.second.find(tablet->get_tablet_info());
+            if (tablet_load_it != it.second.end()) {
+                TPartitionId partition_id = it.first.first;
+                TTransactionId transaction_id = it.first.second;
+                const RowsetSharedPtr& rowset = tablet_load_it->second.rowset;
+                const DeleteBitmapPtr& delete_bitmap = 
tablet_load_it->second.delete_bitmap;
+                const RowsetIdUnorderedSet& rowset_ids = 
tablet_load_it->second.rowset_ids;
+                if (!rowset || !delete_bitmap) {
+                    continue;
+                }
+                commit_tablet_txn_info_vec->push_back(CommitTabletTxnInfo(
+                        partition_id, transaction_id, rowset, delete_bitmap, 
rowset_ids));
+            }
+        }
+    }
+}
+
 bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId 
transaction_id,
                          TTabletId tablet_id, SchemaHash schema_hash, 
TabletUid tablet_uid) {
     pair<int64_t, int64_t> key(partition_id, transaction_id);
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 42476c5acb..d024e71e16 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -78,6 +78,24 @@ struct TabletTxnInfo {
     TabletTxnInfo() {}
 };
 
+struct CommitTabletTxnInfo {
+    CommitTabletTxnInfo(TPartitionId partition_id, TTransactionId 
transaction_id,
+                        RowsetSharedPtr rowset, DeleteBitmapPtr delete_bitmap,
+                        RowsetIdUnorderedSet rowset_ids)
+            : transaction_id(transaction_id),
+              partition_id(partition_id),
+              rowset(rowset),
+              delete_bitmap(delete_bitmap),
+              rowset_ids(rowset_ids) {}
+    TTransactionId transaction_id;
+    TPartitionId partition_id;
+    RowsetSharedPtr rowset;
+    DeleteBitmapPtr delete_bitmap;
+    RowsetIdUnorderedSet rowset_ids;
+};
+
+using CommitTabletTxnInfoVec = std::vector<CommitTabletTxnInfo>;
+
 // txn manager is used to manage mapping between tablet and txns
 class TxnManager {
 public:
@@ -172,6 +190,8 @@ public:
                                        TabletUid tablet_uid, bool 
unique_key_merge_on_write,
                                        DeleteBitmapPtr delete_bitmap,
                                        const RowsetIdUnorderedSet& rowset_ids);
+    void get_all_commit_tablet_txn_info_by_tablet(
+            const TabletSharedPtr& tablet, CommitTabletTxnInfoVec* 
commit_tablet_txn_info_vec);
 
     int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version);
     void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t 
txn_id);
@@ -195,11 +215,11 @@ private:
         }
     };
 
-    typedef std::unordered_map<TxnKey, std::map<TabletInfo, TabletTxnInfo>, 
TxnKeyHash, TxnKeyEqual>
-            txn_tablet_map_t;
-    typedef std::unordered_map<int64_t, std::unordered_set<int64_t>> 
txn_partition_map_t;
-    typedef std::unordered_map<int64_t, std::map<int64_t, DeltaWriter*>>
-            txn_tablet_delta_writer_map_t;
+    using txn_tablet_map_t = std::unordered_map<TxnKey, std::map<TabletInfo, 
TabletTxnInfo>,
+                                                TxnKeyHash, TxnKeyEqual>;
+    using txn_partition_map_t = std::unordered_map<int64_t, 
std::unordered_set<int64_t>>;
+    using txn_tablet_delta_writer_map_t =
+            std::unordered_map<int64_t, std::map<int64_t, DeltaWriter*>>;
 
     std::shared_mutex& _get_txn_map_lock(TTransactionId transactionId);
 
diff --git a/be/test/olap/tablet_meta_test.cpp 
b/be/test/olap/tablet_meta_test.cpp
index 962f619c11..993fbf2d49 100644
--- a/be/test/olap/tablet_meta_test.cpp
+++ b/be/test/olap/tablet_meta_test.cpp
@@ -179,7 +179,7 @@ TEST(TabletMetaTest, TestDeleteBitmap) {
         ASSERT_EQ(d.cardinality(), 4);
         ASSERT_EQ(db_upper.get({RowsetId {2, 0, 1, 1}, 1, 2}, &d), 0);
         ASSERT_EQ(d.cardinality(), 1);
-        ASSERT_EQ(db_upper.delete_bitmap.size(), 20);
+        ASSERT_EQ(db_upper.delete_bitmap.size(), 21);
     }
 
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to