This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f80986f1e [feature](merge-cloud) Sync cloud tablet rowsets before scan (#30639)
d8f80986f1e is described below

commit d8f80986f1e56f16a2e0b226af5c77cb6784a6da
Author: walter <[email protected]>
AuthorDate: Wed Jan 31 23:01:48 2024 +0800

    [feature](merge-cloud) Sync cloud tablet rowsets before scan (#30639)
---
 be/src/cloud/cloud_tablet.cpp                      | 18 ++++++++++
 be/src/cloud/cloud_tablet.h                        |  3 ++
 be/src/olap/base_tablet.cpp                        | 31 +++++++++++++++++
 be/src/olap/base_tablet.h                          | 11 ++++++
 be/src/olap/full_compaction.cpp                    |  2 +-
 be/src/olap/parallel_scanner_builder.cpp           |  2 +-
 be/src/olap/parallel_scanner_builder.h             |  6 ----
 be/src/olap/schema_change.cpp                      |  8 ++---
 be/src/olap/snapshot_manager.cpp                   |  6 ++--
 be/src/olap/tablet.cpp                             | 39 +++-------------------
 be/src/olap/tablet.h                               |  7 ++--
 be/src/olap/task/engine_checksum_task.cpp          |  3 +-
 be/src/olap/task/engine_storage_migration_task.cpp |  6 ++--
 be/src/vec/exec/scan/new_olap_scan_node.cpp        | 26 ++++++++++-----
 14 files changed, 102 insertions(+), 66 deletions(-)

diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 24c924fb6bc..6c0b50807c6 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -31,6 +31,7 @@
 #include "olap/olap_define.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_fwd.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/rowset/segment_v2/inverted_index_desc.h"
 
@@ -48,6 +49,23 @@ bool CloudTablet::exceed_version_limit(int32_t limit) {
     return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
 }
 
+Status CloudTablet::capture_consistent_rowsets_unlocked(
+        const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) 
const {
+    Versions version_path;
+    auto st = 
_timestamped_version_tracker.capture_consistent_versions(spec_version, 
&version_path);
+    if (!st.ok()) {
+        // Check no missed versions or req version is merged
+        auto missed_versions = get_missed_versions(spec_version.second);
+        if (missed_versions.empty()) {
+            st.set_code(VERSION_ALREADY_MERGED); // Reset error code
+        }
+        st.append(" tablet_id=" + std::to_string(tablet_id()));
+        return st;
+    }
+    VLOG_DEBUG << "capture consitent versions: " << version_path;
+    return _capture_consistent_rowsets_unlocked(version_path, rowsets);
+}
+
 Status CloudTablet::capture_rs_readers(const Version& spec_version,
                                        std::vector<RowSetSplits>* rs_splits,
                                        bool skip_missing_version) {
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index e057d23fd51..10e13904961 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -37,6 +37,9 @@ public:
     Status capture_rs_readers(const Version& spec_version, 
std::vector<RowSetSplits>* rs_splits,
                               bool skip_missing_version) override;
 
+    Status capture_consistent_rowsets_unlocked(
+            const Version& spec_version, std::vector<RowsetSharedPtr>* 
rowsets) const override;
+
     size_t tablet_footprint() override {
         return _approximate_data_size.load(std::memory_order_relaxed);
     }
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 2e82740887d..83c6bb36fbf 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -1028,4 +1028,35 @@ void BaseTablet::_rowset_ids_difference(const 
RowsetIdUnorderedSet& cur,
     }
 }
 
+Status BaseTablet::_capture_consistent_rowsets_unlocked(
+        const std::vector<Version>& version_path, 
std::vector<RowsetSharedPtr>* rowsets) const {
+    DCHECK(rowsets != nullptr);
+    rowsets->reserve(version_path.size());
+    for (const auto& version : version_path) {
+        bool is_find = false;
+        do {
+            auto it = _rs_version_map.find(version);
+            if (it != _rs_version_map.end()) {
+                is_find = true;
+                rowsets->push_back(it->second);
+                break;
+            }
+
+            auto it_expired = _stale_rs_version_map.find(version);
+            if (it_expired != _stale_rs_version_map.end()) {
+                is_find = true;
+                rowsets->push_back(it_expired->second);
+                break;
+            }
+        } while (false);
+
+        if (!is_find) {
+            return Status::Error<CAPTURE_ROWSET_ERROR>(
+                    "fail to find Rowset for version. tablet={}, version={}", 
tablet_id(),
+                    version.to_string());
+        }
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 0e3501b2730..5152f0a541d 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -35,6 +35,11 @@ class RowsetWriter;
 class CalcDeleteBitmapToken;
 class SegmentCacheHandle;
 
+struct TabletWithVersion {
+    BaseTabletSPtr tablet;
+    int64_t version;
+};
+
 // Base class for all tablet classes
 class BaseTablet {
 public:
@@ -82,6 +87,9 @@ public:
     virtual Result<std::unique_ptr<RowsetWriter>> 
create_rowset_writer(RowsetWriterContext& context,
                                                                        bool 
vertical) = 0;
 
+    virtual Status capture_consistent_rowsets_unlocked(
+            const Version& spec_version, std::vector<RowsetSharedPtr>* 
rowsets) const = 0;
+
     virtual Status capture_rs_readers(const Version& spec_version,
                                       std::vector<RowSetSplits>* rs_splits,
                                       bool skip_missing_version) = 0;
@@ -208,6 +216,9 @@ protected:
                                        const RowsetIdUnorderedSet& pre,
                                        RowsetIdUnorderedSet* to_add, 
RowsetIdUnorderedSet* to_del);
 
+    Status _capture_consistent_rowsets_unlocked(const std::vector<Version>& 
version_path,
+                                                std::vector<RowsetSharedPtr>* 
rowsets) const;
+
     void sort_block(vectorized::Block& in_block, vectorized::Block& 
output_block);
 
     mutable std::shared_mutex _meta_lock;
diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 9e8525f6168..f98ac2a24e7 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -150,7 +150,7 @@ Status 
FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedP
     int64_t max_version = _tablet->max_version().second;
     DCHECK(max_version >= rowset->version().second);
     if (max_version > rowset->version().second) {
-        RETURN_IF_ERROR(_tablet->capture_consistent_rowsets(
+        RETURN_IF_ERROR(_tablet->capture_consistent_rowsets_unlocked(
                 {rowset->version().second + 1, max_version}, &tmp_rowsets));
     }
 
diff --git a/be/src/olap/parallel_scanner_builder.cpp 
b/be/src/olap/parallel_scanner_builder.cpp
index d25bcdc7a2f..e7f51881288 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -170,7 +170,7 @@ Status ParallelScannerBuilder<ParentType>::_load() {
         auto& rowsets = _all_rowsets[tablet_id];
         {
             std::shared_lock read_lock(tablet->get_header_lock());
-            RETURN_IF_ERROR(tablet->capture_consistent_rowsets({0, version}, 
&rowsets));
+            RETURN_IF_ERROR(tablet->capture_consistent_rowsets_unlocked({0, 
version}, &rowsets));
         }
 
         for (auto& rowset : rowsets) {
diff --git a/be/src/olap/parallel_scanner_builder.h 
b/be/src/olap/parallel_scanner_builder.h
index b9d659abc27..8b23ce27735 100644
--- a/be/src/olap/parallel_scanner_builder.h
+++ b/be/src/olap/parallel_scanner_builder.h
@@ -32,14 +32,8 @@ namespace vectorized {
 class VScanner;
 }
 
-using TabletSPtr = std::shared_ptr<Tablet>;
 using VScannerSPtr = std::shared_ptr<vectorized::VScanner>;
 
-struct TabletWithVersion {
-    TabletSPtr tablet;
-    int64_t version;
-};
-
 template <typename ParentType>
 class ParallelScannerBuilder {
 public:
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 320f6a3e645..1543a6dfd5d 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1375,8 +1375,8 @@ Status 
SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr ne
                   << "double write rowsets for version: " << alter_version + 1 
<< "-" << max_version
                   << " new_tablet=" << new_tablet->tablet_id();
         std::shared_lock<std::shared_mutex> 
rlock(new_tablet->get_header_lock());
-        RETURN_IF_ERROR(
-                new_tablet->capture_consistent_rowsets({alter_version + 1, 
max_version}, &rowsets));
+        RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets_unlocked(
+                {alter_version + 1, max_version}, &rowsets));
     }
     for (auto rowset_ptr : rowsets) {
         std::lock_guard<std::mutex> 
rwlock(new_tablet->get_rowset_update_lock());
@@ -1394,8 +1394,8 @@ Status 
SchemaChangeHandler::_calc_delete_bitmap_for_mow_table(TabletSharedPtr ne
         LOG(INFO) << "alter table for unique with merge-on-write, calculate 
delete bitmap of "
                   << "incremental rowsets for version: " << max_version + 1 << 
"-"
                   << new_max_version << " new_tablet=" << 
new_tablet->tablet_id();
-        RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets({max_version + 
1, new_max_version},
-                                                               &rowsets));
+        RETURN_IF_ERROR(new_tablet->capture_consistent_rowsets_unlocked(
+                {max_version + 1, new_max_version}, &rowsets));
     }
     for (auto&& rowset_ptr : rowsets) {
         RETURN_IF_ERROR(Tablet::update_delete_bitmap_without_lock(new_tablet, 
rowset_ptr));
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 53920b7692a..f8235a83af5 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -516,13 +516,13 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
                     res = check_version_continuity(consistent_rowsets);
                     if (res.ok() && max_cooldowned_version < version) {
                         // Pick consistent rowsets of remaining required 
version
-                        res = ref_tablet->capture_consistent_rowsets(
+                        res = ref_tablet->capture_consistent_rowsets_unlocked(
                                 {max_cooldowned_version + 1, version}, 
&consistent_rowsets);
                     }
                 } else {
                     // get shortest version path
-                    res = ref_tablet->capture_consistent_rowsets(Version(0, 
version),
-                                                                 
&consistent_rowsets);
+                    res = 
ref_tablet->capture_consistent_rowsets_unlocked(Version(0, version),
+                                                                          
&consistent_rowsets);
                 }
                 if (!res.ok()) {
                     LOG(WARNING) << "fail to select versions to span. res=" << 
res;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 4eac632f0f0..9730c149f74 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -391,8 +391,8 @@ Status Tablet::revise_tablet_meta(const 
std::vector<RowsetSharedPtr>& to_add,
         }
 
         if (calc_delete_bitmap_ver.first <= calc_delete_bitmap_ver.second) {
-            Status res =
-                    capture_consistent_rowsets(calc_delete_bitmap_ver, 
&calc_delete_bitmap_rowsets);
+            Status res = 
capture_consistent_rowsets_unlocked(calc_delete_bitmap_ver,
+                                                             
&calc_delete_bitmap_rowsets);
             // Because the data in memory has been changed, can't return an 
error.
             CHECK(res.ok()) << "fail to capture_consistent_rowsets, res: " << 
res;
 
@@ -858,8 +858,8 @@ void Tablet::acquire_version_and_rowsets(
     }
 }
 
-Status Tablet::capture_consistent_rowsets(const Version& spec_version,
-                                          std::vector<RowsetSharedPtr>* 
rowsets) const {
+Status Tablet::capture_consistent_rowsets_unlocked(const Version& spec_version,
+                                                   
std::vector<RowsetSharedPtr>* rowsets) const {
     std::vector<Version> version_path;
     RETURN_IF_ERROR(
             capture_consistent_versions_unlocked(spec_version, &version_path, 
false, false));
@@ -867,37 +867,6 @@ Status Tablet::capture_consistent_rowsets(const Version& 
spec_version,
     return Status::OK();
 }
 
-Status Tablet::_capture_consistent_rowsets_unlocked(const 
std::vector<Version>& version_path,
-                                                    
std::vector<RowsetSharedPtr>* rowsets) const {
-    DCHECK(rowsets != nullptr);
-    rowsets->reserve(version_path.size());
-    for (auto& version : version_path) {
-        bool is_find = false;
-        do {
-            auto it = _rs_version_map.find(version);
-            if (it != _rs_version_map.end()) {
-                is_find = true;
-                rowsets->push_back(it->second);
-                break;
-            }
-
-            auto it_expired = _stale_rs_version_map.find(version);
-            if (it_expired != _stale_rs_version_map.end()) {
-                is_find = true;
-                rowsets->push_back(it_expired->second);
-                break;
-            }
-        } while (false);
-
-        if (!is_find) {
-            return Status::Error<CAPTURE_ROWSET_ERROR>(
-                    "fail to find Rowset for version. tablet={}, version={}", 
tablet_id(),
-                    version.to_string());
-        }
-    }
-    return Status::OK();
-}
-
 Status Tablet::capture_rs_readers(const Version& spec_version, 
std::vector<RowSetSplits>* rs_splits,
                                   bool skip_missing_version) {
     std::shared_lock rlock(_meta_lock);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index ddc1c276fea..7b993d1af32 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -168,8 +168,9 @@ public:
     void acquire_version_and_rowsets(
             std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets) 
const;
 
-    Status capture_consistent_rowsets(const Version& spec_version,
-                                      std::vector<RowsetSharedPtr>* rowsets) 
const;
+    Status capture_consistent_rowsets_unlocked(
+            const Version& spec_version, std::vector<RowsetSharedPtr>* 
rowsets) const override;
+
     // If skip_missing_version is true, skip versions if they are missing.
     Status capture_rs_readers(const Version& spec_version, 
std::vector<RowSetSplits>* rs_splits,
                               bool skip_missing_version) override;
@@ -470,8 +471,6 @@ private:
     /// Delete stale rowset by version. This method not only delete the 
version in expired rowset map,
     /// but also delete the version in rowset meta vector.
     void _delete_stale_rowset_by_version(const Version& version);
-    Status _capture_consistent_rowsets_unlocked(const std::vector<Version>& 
version_path,
-                                                std::vector<RowsetSharedPtr>* 
rowsets) const;
 
     uint32_t _calc_cumulative_compaction_score(
             std::shared_ptr<CumulativeCompactionPolicy> 
cumulative_compaction_policy);
diff --git a/be/src/olap/task/engine_checksum_task.cpp 
b/be/src/olap/task/engine_checksum_task.cpp
index 4099c378377..67b90dfc27b 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -81,7 +81,8 @@ Status EngineChecksumTask::_compute_checksum() {
     vectorized::Block block;
     {
         std::shared_lock rdlock(tablet->get_header_lock());
-        Status acquire_reader_st = tablet->capture_consistent_rowsets(version, 
&input_rowsets);
+        Status acquire_reader_st =
+                tablet->capture_consistent_rowsets_unlocked(version, 
&input_rowsets);
         if (!acquire_reader_st.ok()) {
             LOG(WARNING) << "fail to captute consistent rowsets. tablet=" << 
tablet->tablet_id()
                          << "res=" << acquire_reader_st;
diff --git a/be/src/olap/task/engine_storage_migration_task.cpp 
b/be/src/olap/task/engine_storage_migration_task.cpp
index e778a0b8f8d..acee9d2a834 100644
--- a/be/src/olap/task/engine_storage_migration_task.cpp
+++ b/be/src/olap/task/engine_storage_migration_task.cpp
@@ -84,8 +84,8 @@ Status EngineStorageMigrationTask::_get_versions(int32_t 
start_version, int32_t*
                    << ", start_version=" << start_version << ", end_version=" 
<< *end_version;
         return Status::OK();
     }
-    return _tablet->capture_consistent_rowsets(Version(start_version, 
*end_version),
-                                               consistent_rowsets);
+    return _tablet->capture_consistent_rowsets_unlocked(Version(start_version, 
*end_version),
+                                                        consistent_rowsets);
 }
 
 bool EngineStorageMigrationTask::_is_timeout() {
@@ -93,7 +93,7 @@ bool EngineStorageMigrationTask::_is_timeout() {
     int64_t timeout = std::max<int64_t>(config::migration_task_timeout_secs,
                                         _tablet->tablet_local_size() >> 20);
     if (time_elapsed > timeout) {
-        LOG(WARNING) << "migration failed due to timeout, time_eplapsed=" << 
time_elapsed
+        LOG(WARNING) << "migration failed due to timeout, time_elapsed=" << 
time_elapsed
                      << ", tablet=" << _tablet->tablet_id();
         return true;
     }
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index dc82ebd9e81..82617210230 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -34,7 +34,9 @@
 #include <variant>
 #include <vector>
 
-#include "common/config.h"
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_tablet.h"
+#include "cloud/config.h"
 #include "common/logging.h"
 #include "common/object_pool.h"
 #include "common/status.h"
@@ -494,24 +496,32 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
     bool is_dup_mow_key = false;
     size_t segment_count = 0;
     std::vector<TabletReader::ReadSource> tablets_read_source;
-    tablets_read_source.reserve(_scan_ranges.size());
     std::vector<std::vector<size_t>> tablet_rs_seg_count;
+    std::vector<TabletWithVersion> tablets_to_scan;
+
+    tablets_read_source.reserve(_scan_ranges.size());
     tablet_rs_seg_count.reserve(_scan_ranges.size());
-    std::vector<std::pair<BaseTabletSPtr, int64_t /* version */>> 
tablets_to_scan;
     tablets_to_scan.reserve(_scan_ranges.size());
 
-    std::vector<TabletWithVersion> tablets;
-
     for (auto&& scan_range : _scan_ranges) {
         auto tablet = DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
         int64_t version = 0;
         std::from_chars(scan_range->version.data(),
                         scan_range->version.data() + 
scan_range->version.size(), version);
-        tablets.emplace_back(
-                TabletWithVersion {std::dynamic_pointer_cast<Tablet>(tablet), 
version});
         tablets_to_scan.emplace_back(std::move(tablet), version);
     }
 
+    if (config::is_cloud_mode()) {
+        std::vector<std::function<Status()>> tasks;
+        tasks.reserve(_scan_ranges.size());
+        for (auto&& [tablet, version] : tablets_to_scan) {
+            tasks.emplace_back([tablet, version]() {
+                return 
std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(version);
+            });
+        }
+        RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
+    }
+
     bool enable_parallel_scan = _state->enable_parallel_scan();
 
     // Split tablet segment by scanner, only use in pipeline in duplicate key
@@ -564,7 +574,7 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
         }
 
         ParallelScannerBuilder<NewOlapScanNode> scanner_builder(
-                this, tablets, _scanner_profile, key_ranges, _state, 
_limit_per_scanner,
+                this, tablets_to_scan, _scanner_profile, key_ranges, _state, 
_limit_per_scanner,
                 is_dup_mow_key, _olap_scan_node.is_preaggregation);
 
         int max_scanners_count = _state->parallel_scan_max_scanners_count();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to