This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bc78d93afb2 [cloud](topn-opt) support topn 2 phase read in cloud mode 
(#31548)
bc78d93afb2 is described below

commit bc78d93afb20ce3a50803a491b9e8eabf31d2e4d
Author: lihangyu <[email protected]>
AuthorDate: Thu Feb 29 18:58:57 2024 +0800

    [cloud](topn-opt) support topn 2 phase read in cloud mode (#31548)
---
 be/src/cloud/cloud_storage_engine.cpp              |   6 +
 be/src/cloud/cloud_storage_engine.h                |   1 -
 be/src/common/config.cpp                           |   1 +
 be/src/common/config.h                             |   1 +
 be/src/exec/rowid_fetcher.cpp                      | 197 +++++++++++++++++++
 be/src/exec/rowid_fetcher.h                        |   6 +
 be/src/olap/olap_server.cpp                        |   6 +
 be/src/olap/storage_engine.cpp                     |  40 ++--
 be/src/olap/storage_engine.h                       |  24 +--
 be/src/runtime/descriptors.h                       |   2 +
 be/src/service/internal_service.cpp                | 214 +--------------------
 be/src/service/internal_service.h                  |  25 ++-
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  23 +--
 .../pipeline/cloud_p0/conf/session_variables.sql   |   1 -
 14 files changed, 300 insertions(+), 247 deletions(-)

diff --git a/be/src/cloud/cloud_storage_engine.cpp 
b/be/src/cloud/cloud_storage_engine.cpp
index 8ca0d503e74..439c767211f 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -115,6 +115,12 @@ Status CloudStorageEngine::start_bg_threads() {
             [this]() { this->_sync_tablets_thread_callback(); }, 
&_bg_threads.emplace_back()));
     LOG(INFO) << "sync tablets thread started";
 
+    RETURN_IF_ERROR(Thread::create(
+            "CloudStorageEngine", "evict_querying_rowset_thread",
+            [this]() { this->_evict_quring_rowset_thread_callback(); },
+            &_evict_quering_rowset_thread));
+    LOG(INFO) << "evict quering thread started";
+
     // TODO(plat1ko): lease_compaction_thread
 
     // TODO(plat1ko): check_bucket_enable_versioning_thread
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
index 7ccfeaa784e..16b1cc5a1e0 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -67,7 +67,6 @@ private:
     void _sync_tablets_thread_callback();
 
     std::atomic_bool _stopped {false};
-    CountDownLatch _stop_background_threads_latch {1};
 
     std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
     std::unique_ptr<CloudTabletMgr> _tablet_mgr;
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index f7f4cc99a4b..dc3b7ae0449 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -278,6 +278,7 @@ DEFINE_mInt32(tablet_lookup_cache_stale_sweep_time_sec, 
"30");
 DEFINE_mInt32(point_query_row_cache_stale_sweep_time_sec, "300");
 DEFINE_mInt32(disk_stat_monitor_interval, "5");
 DEFINE_mInt32(unused_rowset_monitor_interval, "30");
+DEFINE_mInt32(quering_rowsets_evict_interval, "30");
 DEFINE_String(storage_root_path, "${DORIS_HOME}/storage");
 DEFINE_mString(broken_storage_path, "");
 
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 6fcbecd8f71..b95969b761b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -321,6 +321,7 @@ DECLARE_mInt32(tablet_lookup_cache_stale_sweep_time_sec);
 DECLARE_mInt32(point_query_row_cache_stale_sweep_time_sec);
 DECLARE_mInt32(disk_stat_monitor_interval);
 DECLARE_mInt32(unused_rowset_monitor_interval);
+DECLARE_mInt32(quering_rowsets_evict_interval);
 DECLARE_String(storage_root_path);
 DECLARE_mString(broken_storage_path);
 
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 6a7e21d81cc..c921be9509f 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -30,6 +30,7 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <memory>
 #include <ostream>
 #include <string>
 #include <unordered_map>
@@ -37,11 +38,19 @@
 #include <vector>
 
 #include "bthread/countdown_event.h"
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/config.h"
 #include "common/config.h"
 #include "common/consts.h"
 #include "common/exception.h"
 #include "exec/tablet_info.h" // DorisNodesInfo
 #include "olap/olap_common.h"
+#include "olap/rowset/beta_rowset.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_fwd.h"
+#include "olap/tablet_manager.h"
 #include "olap/tablet_schema.h"
 #include "olap/utils.h"
 #include "runtime/descriptors.h"
@@ -275,4 +284,192 @@ Status RowIDFetcher::fetch(const vectorized::ColumnPtr& 
column_row_ids,
     return Status::OK();
 }
 
+template <typename Func>  // Func: a callable invoked with no arguments; its result type is the return type
+auto scope_timer_run(Func fn, int64_t* cost) -> decltype(fn()) {  // runs fn() once and adds the time it took to *cost
+    MonotonicStopWatch watch;
+    watch.start();
+    auto res = fn();
+    *cost += watch.elapsed_time() / 1000 / 1000;  // accumulates (+=) so one counter can total many calls; this file labels elapsed_time() / 1000 as microseconds, so this is milliseconds
+    return res;  // the value produced by fn()
+}
+
+struct IteratorKey {  // key of the column-iterator cache in read_by_rowids: one entry per (tablet, rowset, segment, slot)
+    int64_t tablet_id;
+    RowsetId rowset_id;
+    uint64_t segment_id;
+    int slot_id;
+
+    // Member-wise equality; required because the key is used in a std::unordered_map (std::equal_to).
+    bool operator==(const IteratorKey& rhs) const {
+        return tablet_id == rhs.tablet_id && rowset_id == rhs.rowset_id &&
+               segment_id == rhs.segment_id && slot_id == rhs.slot_id;
+    }
+};
+
+struct HashOfIteratorKey {  // hash functor for IteratorKey; covers the same fields IteratorKey::operator== compares
+    size_t operator()(const IteratorKey& key) const {
+        size_t seed = 0;  // every hash64 call below takes the previous result as its seed, chaining all fields into one value
+        seed = HashUtil::hash64(&key.tablet_id, sizeof(key.tablet_id), seed);
+        seed = HashUtil::hash64(&key.rowset_id.hi, sizeof(key.rowset_id.hi), 
seed);
+        seed = HashUtil::hash64(&key.rowset_id.mi, sizeof(key.rowset_id.mi), 
seed);
+        seed = HashUtil::hash64(&key.rowset_id.lo, sizeof(key.rowset_id.lo), 
seed);
+        seed = HashUtil::hash64(&key.segment_id, sizeof(key.segment_id), seed);
+        seed = HashUtil::hash64(&key.slot_id, sizeof(key.slot_id), seed);
+        return seed;  // the rowset id is hashed through its hi/mi/lo parts rather than as one object
+    }
+};
+
+struct IteratorItem {  // value of the column-iterator cache: an iterator plus the segment it belongs to
+    std::unique_ptr<ColumnIterator> iterator;  // handed to Segment::seek_and_read_by_rowid; presumably created there on first use -- confirm
+    // Keeps the segment referenced while the iterator is cached, to avoid use after release.
+    SegmentSharedPtr segment;
+};
+
+Status RowIdStorageReader::read_by_rowids(const PMultiGetRequest& request,
+                                          PMultiGetResponse* response) {
+    // Reads the rows named by request.row_locs() one location at a time and fills `response` with binary row data (row store) or one serialized block (column store).
+    OlapReaderStatistics stats;
+    vectorized::Block result_block;
+    int64_t acquire_tablet_ms = 0;
+    int64_t acquire_rowsets_ms = 0;
+    int64_t acquire_segments_ms = 0;
+    int64_t lookup_row_data_ms = 0;
+
+    // Rebuild the tuple descriptor and its slot descriptors from the request.
+    TupleDescriptor desc(request.desc());
+    std::vector<SlotDescriptor> slots;
+    slots.reserve(request.slots().size());  // reserved up front: desc stores &slots.back(), so the vector must not reallocate
+    for (const auto& pslot : request.slots()) {
+        slots.push_back(SlotDescriptor(pslot));
+        desc.add_slot(&slots.back());
+    }
+
+    // Build the read schema from the column descriptions carried in the request.
+    TabletSchema full_read_schema;
+    for (const ColumnPB& column_pb : request.column_desc()) {
+        full_read_schema.append_column(TabletColumn(column_pb));
+    }
+
+    std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey> 
iterator_map;
+    // read row by row
+    for (size_t i = 0; i < request.row_locs_size(); ++i) {  // NOTE(review): size_t compared with row_locs_size(), presumably an int -- confirm
+        const auto& row_loc = request.row_locs(i);
+        MonotonicStopWatch watch;
+        watch.start();
+        BaseTabletSPtr tablet = scope_timer_run(
+                [&]() {
+                    auto res = ExecEnv::get_tablet(row_loc.tablet_id());
+                    return !res.has_value() ? nullptr
+                                            : 
std::dynamic_pointer_cast<BaseTablet>(res.value());
+                },
+                &acquire_tablet_ms);
+        RowsetId rowset_id;
+        rowset_id.init(row_loc.rowset_id());
+        if (!tablet) {
+            continue;  // tablet not found: the location is skipped silently and is not echoed into response->row_locs()
+        }
+        // We ensured its rowset is not released when init Tablet reader 
param, rowset->update_delayed_expired_timestamp();
+        BetaRowsetSharedPtr rowset = 
std::static_pointer_cast<BetaRowset>(scope_timer_run(
+                [&]() {
+                    return 
ExecEnv::GetInstance()->storage_engine().get_quering_rowset(rowset_id);
+                },
+                &acquire_rowsets_ms));
+        if (!rowset) {
+            LOG(INFO) << "no such rowset " << rowset_id;
+            continue;  // rowset no longer held in the querying-rowset map: skipped, also not echoed into the response
+        }
+        size_t row_size = 0;
+        Defer _defer([&]() {
+            LOG_EVERY_N(INFO, 100)
+                    << "multiget_data single_row, cost(us):" << 
watch.elapsed_time() / 1000
+                    << ", row_size:" << row_size;
+            *response->add_row_locs() = row_loc;
+        });  // NOTE(review): Defer presumably fires on scope exit, so any location reaching this point is echoed back, including the `continue` and early-return paths below -- confirm
+        SegmentCacheHandle segment_cache;
+        RETURN_IF_ERROR(scope_timer_run(
+                [&]() {
+                    return SegmentLoader::instance()->load_segments(rowset, 
&segment_cache, true);
+                },
+                &acquire_segments_ms));
+        // find the loaded segment whose id matches row_loc.segment_id()
+        auto it = std::find_if(segment_cache.get_segments().cbegin(),
+                               segment_cache.get_segments().cend(),
+                               [&row_loc](const segment_v2::SegmentSharedPtr& 
seg) {
+                                   return seg->id() == row_loc.segment_id();
+                               });
+        if (it == segment_cache.get_segments().end()) {
+            continue;  // the rowset has no segment with the requested id
+        }
+        segment_v2::SegmentSharedPtr segment = *it;
+        GlobalRowLoacation row_location(row_loc.tablet_id(), 
rowset->rowset_id(),
+                                        row_loc.segment_id(), 
row_loc.ordinal_id());
+        // fetch by row store, more efficient way
+        if (request.fetch_row_store()) {
+            CHECK(tablet->tablet_schema()->store_row_column());  // NOTE(review): CHECK on a request-driven condition presumably aborts the process; consider returning an error Status -- confirm
+            RowLocation loc(rowset_id, segment->id(), row_loc.ordinal_id());
+            string* value = response->add_binary_row_data();
+            RETURN_IF_ERROR(scope_timer_run(
+                    [&]() {
+                        return tablet->lookup_row_data({}, loc, rowset, &desc, 
stats, *value);
+                    },
+                    &lookup_row_data_ms));
+            row_size = value->size();
+            continue;  // row-store path is complete for this location
+        }
+
+        // fetch by column store
+        if (result_block.is_empty_column()) {  // the block is created lazily, on the first column-store row
+            result_block = vectorized::Block(desc.slots(), 
request.row_locs().size());
+        }
+        VLOG_DEBUG << "Read row location "
+                   << fmt::format("{}, {}, {}, {}", row_location.tablet_id,
+                                  
row_location.row_location.rowset_id.to_string(),
+                                  row_location.row_location.segment_id,
+                                  row_location.row_location.row_id);
+        for (int x = 0; x < desc.slots().size(); ++x) {  // one read per requested slot, appended to the column at the same position
+            auto row_id = 
static_cast<segment_v2::rowid_t>(row_loc.ordinal_id());
+            vectorized::MutableColumnPtr column =
+                    result_block.get_by_position(x).column->assume_mutable();
+            IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
+                                      .rowset_id = rowset_id,
+                                      .segment_id = row_loc.segment_id(),
+                                      .slot_id = desc.slots()[x]->id()};
+            IteratorItem& iterator_item = iterator_map[iterator_key];  // operator[] default-constructs the entry on first use
+            if (iterator_item.segment == nullptr) {
+                // hold the reference
+                iterator_map[iterator_key].segment = segment;  // NOTE(review): second lookup of the same key; iterator_item.segment refers to the same entry
+            }
+            segment = iterator_item.segment;  // from here on use the segment pinned in the cache entry
+            RETURN_IF_ERROR(segment->seek_and_read_by_rowid(full_read_schema, 
desc.slots()[x],
+                                                            row_id, column, 
stats,
+                                                            
iterator_item.iterator));
+        }
+    }
+    // serialize block if not empty
+    if (!result_block.is_empty_column()) {
+        VLOG_DEBUG << "dump block:" << result_block.dump_data(0, 10)
+                   << ", be_exec_version:" << request.be_exec_version();
+        [[maybe_unused]] size_t compressed_size = 0;
+        [[maybe_unused]] size_t uncompressed_size = 0;
+        int be_exec_version = request.has_be_exec_version() ? 
request.be_exec_version() : 0;
+        RETURN_IF_ERROR(result_block.serialize(be_exec_version, 
response->mutable_block(),
+                                               &uncompressed_size, 
&compressed_size,
+                                               
segment_v2::CompressionTypePB::LZ4));
+    }
+
+    LOG(INFO) << "Query stats: "
+              << fmt::format(
+                         "hit_cached_pages:{}, total_pages_read:{}, 
compressed_bytes_read:{}, "
+                         "io_latency:{}ns, "
+                         "uncompressed_bytes_read:{},"
+                         "bytes_read:{},"
+                         "acquire_tablet_ms:{}, acquire_rowsets_ms:{}, 
acquire_segments_ms:{}, "
+                         "lookup_row_data_ms:{}",
+                         stats.cached_pages_num, stats.total_pages_num, 
stats.compressed_bytes_read,
+                         stats.io_ns, stats.uncompressed_bytes_read, 
stats.bytes_read,
+                         acquire_tablet_ms, acquire_rowsets_ms, 
acquire_segments_ms,
+                         lookup_row_data_ms);
+    return Status::OK();  // reached when no RETURN_IF_ERROR above returned early; skipped locations do not make the call fail
+}
+
 } // namespace doris
diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h
index ae57c295a5f..78184ae8feb 100644
--- a/be/src/exec/rowid_fetcher.h
+++ b/be/src/exec/rowid_fetcher.h
@@ -26,6 +26,7 @@
 
 #include "common/status.h"
 #include "exec/tablet_info.h" // DorisNodesInfo
+#include "olap/storage_engine.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type.h"
 
@@ -67,4 +68,9 @@ private:
     FetchOption _fetch_option;
 };
 
+class RowIdStorageReader {  // declares only a static entry point for reading rows by row location
+public:
+    static Status read_by_rowids(const PMultiGetRequest& request, 
PMultiGetResponse* response);
+};
+
 } // namespace doris
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index c4239898892..242e71e3342 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -129,6 +129,12 @@ Status StorageEngine::start_bg_threads() {
             &_unused_rowset_monitor_thread));
     LOG(INFO) << "unused rowset monitor thread started";
 
+    RETURN_IF_ERROR(Thread::create(
+            "StorageEngine", "evict_querying_rowset_thread",
+            [this]() { this->_evict_quring_rowset_thread_callback(); },
+            &_evict_quering_rowset_thread));
+    LOG(INFO) << "evict quering thread started";
+
     // start thread for monitoring the snapshot and trash folder
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "garbage_sweeper_thread",
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 5e735dfb77f..0a94c264110 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -108,6 +108,7 @@ bvar::Adder<uint64_t> 
unused_rowsets_counter("ununsed_rowsets_counter");
 BaseStorageEngine::BaseStorageEngine(Type type, const UniqueId& backend_uid)
         : _type(type),
           
_rowset_id_generator(std::make_unique<UniqueRowsetIdGenerator>(backend_uid)),
+          _stop_background_threads_latch(1),
           _segment_meta_mem_tracker(std::make_shared<MemTracker>(
                   "SegmentMeta", 
ExecEnv::GetInstance()->experimental_mem_tracker())) {}
 
@@ -149,7 +150,6 @@ StorageEngine::StorageEngine(const EngineOptions& options)
           _is_all_cluster_id_exist(true),
           _stopped(false),
           
_segcompaction_mem_tracker(std::make_shared<MemTracker>("SegCompaction")),
-          _stop_background_threads_latch(1),
           _tablet_manager(new TabletManager(*this, 
config::tablet_map_shard_size)),
           _txn_manager(new TxnManager(*this, config::txn_map_shard_size, 
config::txn_shard_size)),
           _default_rowset_type(BETA_ROWSET),
@@ -1081,12 +1081,7 @@ void StorageEngine::start_delete_unused_rowset() {
     {
         std::lock_guard<std::mutex> lock(_gc_mutex);
         for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
-            uint64_t now = UnixSeconds();
             auto&& rs = it->second;
-            if (now > rs->delayed_expired_timestamp()) {
-                // We delay the GC time of this rowset since it's maybe still 
needed, see #20732
-                evict_querying_rowset(it->second->rowset_id());
-            }
             if (rs.use_count() == 1 && rs->need_delete_file()) {
                 // remote rowset data will be reclaimed by 
`remove_unused_remote_files`
                 if (rs->is_local()) {
@@ -1281,6 +1276,19 @@ bool BaseStorageEngine::notify_listener(std::string_view 
name) {
     return found;
 }
 
+void BaseStorageEngine::_evict_quring_rowset_thread_callback() {  // thread body: evict expired querying rowsets, wait, repeat
+    int32_t interval = config::quering_rowsets_evict_interval;  // re-read inside the loop before it is first used
+    do {
+        _evict_querying_rowset();
+        interval = config::quering_rowsets_evict_interval;  // read on every pass, so a changed config value applies to the next wait
+        if (interval <= 0) {  // a non-positive interval is replaced by 1 second for this pass
+            LOG(WARNING) << "quering_rowsets_evict_interval config is illegal: 
" << interval
+                         << ", force set to 1";
+            interval = 1;
+        }
+    } while 
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));  // wait_for presumably returns true once the stop latch is counted down, which ends the loop -- confirm
+}
+
 // check whether any unused rowsets's id equal to rowset_id
 bool StorageEngine::check_rowset_id_in_unused_rowsets(const RowsetId& 
rowset_id) {
     std::lock_guard<std::mutex> lock(_gc_mutex);
@@ -1396,12 +1404,12 @@ Status 
StorageEngine::get_compaction_status_json(std::string* result) {
     return Status::OK();
 }
 
-void StorageEngine::add_quering_rowset(RowsetSharedPtr rs) {
+void BaseStorageEngine::add_quering_rowset(RowsetSharedPtr rs) {
     std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
     _querying_rowsets.emplace(rs->rowset_id(), rs);
 }
 
-RowsetSharedPtr StorageEngine::get_quering_rowset(RowsetId rs_id) {
+RowsetSharedPtr BaseStorageEngine::get_quering_rowset(RowsetId rs_id) {
     std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
     auto it = _querying_rowsets.find(rs_id);
     if (it != _querying_rowsets.end()) {
@@ -1410,9 +1418,19 @@ RowsetSharedPtr 
StorageEngine::get_quering_rowset(RowsetId rs_id) {
     return nullptr;
 }
 
-void StorageEngine::evict_querying_rowset(RowsetId rs_id) {
-    std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);
-    _querying_rowsets.erase(rs_id);
+void BaseStorageEngine::_evict_querying_rowset() {  // removes every entry of _querying_rowsets whose delayed_expired_timestamp() has passed
+    {
+        std::lock_guard<std::mutex> lock(_quering_rowsets_mutex);  // the same mutex is taken by add_quering_rowset / get_quering_rowset
+        for (auto it = _querying_rowsets.begin(); it != 
_querying_rowsets.end();) {
+            uint64_t now = UnixSeconds();  // NOTE(review): re-read for every entry; could be hoisted above the loop
+            // We delay the GC time of this rowset since it's maybe still 
needed, see #20732
+            if (now > it->second->delayed_expired_timestamp()) {
+                it = _querying_rowsets.erase(it);  // unordered_map::erase returns the iterator following the removed element
+            } else {
+                ++it;
+            }
+        }
+    }
+}
 
 bool StorageEngine::add_broken_path(std::string path) {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6e04269b5f2..6ebfb9479f4 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -129,7 +129,14 @@ public:
         return _segment_meta_mem_tracker;
     }
 
+    void add_quering_rowset(RowsetSharedPtr rs);
+
+    RowsetSharedPtr get_quering_rowset(RowsetId rs_id);
+
 protected:
+    void _evict_querying_rowset();
+    void _evict_quring_rowset_thread_callback();
+
     int32_t _effective_cluster_id = -1;
     HeartbeatFlags* _heartbeat_flags = nullptr;
 
@@ -140,6 +147,7 @@ protected:
     std::unique_ptr<RowsetIdGenerator> _rowset_id_generator;
     std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
     std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;
+    CountDownLatch _stop_background_threads_latch;
 
     // This mem tracker is only for tracking memory use by segment meta data 
such as footer or index page.
     // The memory consumed by querying is tracked in segment iterator.
@@ -147,6 +155,11 @@ protected:
     // is similar to `-2912341218700198079`. So, temporarily put it in 
experimental type tracker.
     // maybe have to use ColumnReader count as segment meta size.
     std::shared_ptr<MemTracker> _segment_meta_mem_tracker;
+
+    // Hold reference of quering rowsets
+    std::mutex _quering_rowsets_mutex;
+    std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId> 
_querying_rowsets;
+    scoped_refptr<Thread> _evict_quering_rowset_thread;
 };
 
 class StorageEngine final : public BaseStorageEngine {
@@ -266,12 +279,6 @@ public:
                                 int64_t transaction_id, bool is_recover);
     int64_t get_pending_publish_min_version(int64_t tablet_id);
 
-    void add_quering_rowset(RowsetSharedPtr rs);
-
-    RowsetSharedPtr get_quering_rowset(RowsetId rs_id);
-
-    void evict_querying_rowset(RowsetId rs_id);
-
     bool add_broken_path(std::string path);
     bool remove_broken_path(std::string path);
 
@@ -406,14 +413,9 @@ private:
     PendingRowsetSet _pending_local_rowsets;
     PendingRowsetSet _pending_remote_rowsets;
 
-    // Hold reference of quering rowsets
-    std::mutex _quering_rowsets_mutex;
-    std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId> 
_querying_rowsets;
-
     // Count the memory consumption of segment compaction tasks.
     std::shared_ptr<MemTracker> _segcompaction_mem_tracker;
 
-    CountDownLatch _stop_background_threads_latch;
     scoped_refptr<Thread> _unused_rowset_monitor_thread;
     // thread to monitor snapshot expiry
     scoped_refptr<Thread> _garbage_sweeper_thread;
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index fff1ed339d5..3bc9099477c 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -121,6 +121,7 @@ private:
     friend class SchemaScanner;
     friend class OlapTableSchemaParam;
     friend class PInternalServiceImpl;
+    friend class RowIdStorageReader;
     friend class Tablet;
     friend class TabletSchema;
 
@@ -410,6 +411,7 @@ private:
     friend class SchemaScanner;
     friend class OlapTableSchemaParam;
     friend class PInternalServiceImpl;
+    friend class RowIdStorageReader;
     friend class TabletSchema;
 
     const TupleId _id;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 2abd1867908..185bb34b391 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -57,6 +57,7 @@
 #include "common/logging.h"
 #include "common/signal_handler.h"
 #include "common/status.h"
+#include "exec/rowid_fetcher.h"
 #include "gen_cpp/BackendService.h"
 #include "gen_cpp/PaloInternalService_types.h"
 #include "gen_cpp/internal_service.pb.h"
@@ -181,25 +182,6 @@ private:
     google::protobuf::Closure* _done = nullptr;
 };
 
-template <typename T>
-concept CanCancel = requires(T* response) { response->mutable_status(); };
-
-template <CanCancel T>
-void offer_failed(T* response, google::protobuf::Closure* done, const 
FifoThreadPool& pool) {
-    brpc::ClosureGuard closure_guard(done);
-    response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-    response->mutable_status()->add_error_msgs("fail to offer request to the 
work pool, pool=" +
-                                               pool.get_info());
-    LOG(WARNING) << "cancelled due to fail to offer request to the work pool, 
pool="
-                 << pool.get_info();
-}
-
-template <typename T>
-void offer_failed(T* response, google::protobuf::Closure* done, const 
FifoThreadPool& pool) {
-    brpc::ClosureGuard closure_guard(done);
-    LOG(WARNING) << "fail to offer request to the work pool, pool=" << 
pool.get_info();
-}
-
 PInternalService::PInternalService(ExecEnv* exec_env)
         : _exec_env(exec_env),
           _heavy_work_pool(config::brpc_heavy_work_pool_threads != -1
@@ -1738,201 +1720,17 @@ void 
PInternalServiceImpl::response_slave_tablet_pull_rowset(
     }
 }
 
-template <typename Func>
-auto scope_timer_run(Func fn, int64_t* cost) -> decltype(fn()) {
-    MonotonicStopWatch watch;
-    watch.start();
-    auto res = fn();
-    *cost += watch.elapsed_time() / 1000 / 1000;
-    return res;
-}
-
-struct IteratorKey {
-    int64_t tablet_id;
-    RowsetId rowset_id;
-    uint64_t segment_id;
-    int slot_id;
-
-    // unordered map std::equal_to
-    bool operator==(const IteratorKey& rhs) const {
-        return tablet_id == rhs.tablet_id && rowset_id == rhs.rowset_id &&
-               segment_id == rhs.segment_id && slot_id == rhs.slot_id;
-    }
-};
-
-struct HashOfIteratorKey {
-    size_t operator()(const IteratorKey& key) const {
-        size_t seed = 0;
-        seed = HashUtil::hash64(&key.tablet_id, sizeof(key.tablet_id), seed);
-        seed = HashUtil::hash64(&key.rowset_id.hi, sizeof(key.rowset_id.hi), 
seed);
-        seed = HashUtil::hash64(&key.rowset_id.mi, sizeof(key.rowset_id.mi), 
seed);
-        seed = HashUtil::hash64(&key.rowset_id.lo, sizeof(key.rowset_id.lo), 
seed);
-        seed = HashUtil::hash64(&key.segment_id, sizeof(key.segment_id), seed);
-        seed = HashUtil::hash64(&key.slot_id, sizeof(key.slot_id), seed);
-        return seed;
-    }
-};
-
-struct IteratorItem {
-    std::unique_ptr<ColumnIterator> iterator;
-    // for holding the reference of segment to avoid use after release
-    SegmentSharedPtr segment;
-};
-
-Status PInternalServiceImpl::_multi_get(const PMultiGetRequest& request,
-                                        PMultiGetResponse* response) {
-    OlapReaderStatistics stats;
-    vectorized::Block result_block;
-    int64_t acquire_tablet_ms = 0;
-    int64_t acquire_rowsets_ms = 0;
-    int64_t acquire_segments_ms = 0;
-    int64_t lookup_row_data_ms = 0;
-
-    // init desc
-    TupleDescriptor desc(request.desc());
-    std::vector<SlotDescriptor> slots;
-    slots.reserve(request.slots().size());
-    for (const auto& pslot : request.slots()) {
-        slots.push_back(SlotDescriptor(pslot));
-        desc.add_slot(&slots.back());
-    }
-
-    // init read schema
-    TabletSchema full_read_schema;
-    for (const ColumnPB& column_pb : request.column_desc()) {
-        full_read_schema.append_column(TabletColumn(column_pb));
-    }
-
-    std::unordered_map<IteratorKey, IteratorItem, HashOfIteratorKey> 
iterator_map;
-    // read row by row
-    for (size_t i = 0; i < request.row_locs_size(); ++i) {
-        const auto& row_loc = request.row_locs(i);
-        MonotonicStopWatch watch;
-        watch.start();
-        TabletSharedPtr tablet = scope_timer_run(
-                [&]() {
-                    return 
_engine.tablet_manager()->get_tablet(row_loc.tablet_id(),
-                                                                true /*include 
deleted*/);
-                },
-                &acquire_tablet_ms);
-        RowsetId rowset_id;
-        rowset_id.init(row_loc.rowset_id());
-        if (!tablet) {
-            continue;
-        }
-        // We ensured it's rowset is not released when init Tablet reader 
param, rowset->update_delayed_expired_timestamp();
-        BetaRowsetSharedPtr rowset = 
std::static_pointer_cast<BetaRowset>(scope_timer_run(
-                [&]() { return _engine.get_quering_rowset(rowset_id); }, 
&acquire_rowsets_ms));
-        if (!rowset) {
-            LOG(INFO) << "no such rowset " << rowset_id;
-            continue;
-        }
-        size_t row_size = 0;
-        Defer _defer([&]() {
-            LOG_EVERY_N(INFO, 100)
-                    << "multiget_data single_row, cost(us):" << 
watch.elapsed_time() / 1000
-                    << ", row_size:" << row_size;
-            *response->add_row_locs() = row_loc;
-        });
-        SegmentCacheHandle segment_cache;
-        RETURN_IF_ERROR(scope_timer_run(
-                [&]() {
-                    return SegmentLoader::instance()->load_segments(rowset, 
&segment_cache, true);
-                },
-                &acquire_segments_ms));
-        // find segment
-        auto it = std::find_if(segment_cache.get_segments().cbegin(),
-                               segment_cache.get_segments().cend(),
-                               [&row_loc](const segment_v2::SegmentSharedPtr& 
seg) {
-                                   return seg->id() == row_loc.segment_id();
-                               });
-        if (it == segment_cache.get_segments().end()) {
-            continue;
-        }
-        segment_v2::SegmentSharedPtr segment = *it;
-        GlobalRowLoacation row_location(row_loc.tablet_id(), 
rowset->rowset_id(),
-                                        row_loc.segment_id(), 
row_loc.ordinal_id());
-        // fetch by row store, more effcient way
-        if (request.fetch_row_store()) {
-            CHECK(tablet->tablet_schema()->store_row_column());
-            RowLocation loc(rowset_id, segment->id(), row_loc.ordinal_id());
-            string* value = response->add_binary_row_data();
-            RETURN_IF_ERROR(scope_timer_run(
-                    [&]() {
-                        return tablet->lookup_row_data({}, loc, rowset, &desc, 
stats, *value);
-                    },
-                    &lookup_row_data_ms));
-            row_size = value->size();
-            continue;
-        }
-
-        // fetch by column store
-        if (result_block.is_empty_column()) {
-            result_block = vectorized::Block(desc.slots(), 
request.row_locs().size());
-        }
-        VLOG_DEBUG << "Read row location "
-                   << fmt::format("{}, {}, {}, {}", row_location.tablet_id,
-                                  
row_location.row_location.rowset_id.to_string(),
-                                  row_location.row_location.segment_id,
-                                  row_location.row_location.row_id);
-        for (int x = 0; x < desc.slots().size(); ++x) {
-            auto row_id = 
static_cast<segment_v2::rowid_t>(row_loc.ordinal_id());
-            vectorized::MutableColumnPtr column =
-                    result_block.get_by_position(x).column->assume_mutable();
-            IteratorKey iterator_key {.tablet_id = tablet->tablet_id(),
-                                      .rowset_id = rowset_id,
-                                      .segment_id = row_loc.segment_id(),
-                                      .slot_id = desc.slots()[x]->id()};
-            IteratorItem& iterator_item = iterator_map[iterator_key];
-            if (iterator_item.segment == nullptr) {
-                // hold the reference
-                iterator_map[iterator_key].segment = segment;
-            }
-            segment = iterator_item.segment;
-            RETURN_IF_ERROR(segment->seek_and_read_by_rowid(full_read_schema, 
desc.slots()[x],
-                                                            row_id, column, 
stats,
-                                                            
iterator_item.iterator));
-        }
-    }
-    // serialize block if not empty
-    if (!result_block.is_empty_column()) {
-        VLOG_DEBUG << "dump block:" << result_block.dump_data(0, 10)
-                   << ", be_exec_version:" << request.be_exec_version();
-        [[maybe_unused]] size_t compressed_size = 0;
-        [[maybe_unused]] size_t uncompressed_size = 0;
-        int be_exec_version = request.has_be_exec_version() ? 
request.be_exec_version() : 0;
-        RETURN_IF_ERROR(result_block.serialize(be_exec_version, 
response->mutable_block(),
-                                               &uncompressed_size, 
&compressed_size,
-                                               
segment_v2::CompressionTypePB::LZ4));
-    }
-
-    LOG(INFO) << "Query stats: "
-              << fmt::format(
-                         "hit_cached_pages:{}, total_pages_read:{}, 
compressed_bytes_read:{}, "
-                         "io_latency:{}ns, "
-                         "uncompressed_bytes_read:{},"
-                         "bytes_read:{},"
-                         "acquire_tablet_ms:{}, acquire_rowsets_ms:{}, 
acquire_segments_ms:{}, "
-                         "lookup_row_data_ms:{}",
-                         stats.cached_pages_num, stats.total_pages_num, 
stats.compressed_bytes_read,
-                         stats.io_ns, stats.uncompressed_bytes_read, 
stats.bytes_read,
-                         acquire_tablet_ms, acquire_rowsets_ms, 
acquire_segments_ms,
-                         lookup_row_data_ms);
-    return Status::OK();
-}
-
-void PInternalServiceImpl::multiget_data(google::protobuf::RpcController* 
controller,
-                                         const PMultiGetRequest* request,
-                                         PMultiGetResponse* response,
-                                         google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.try_offer([request, response, done, this]() {
+void PInternalService::multiget_data(google::protobuf::RpcController* 
controller,
+                                     const PMultiGetRequest* request, 
PMultiGetResponse* response,
+                                     google::protobuf::Closure* done) {
+    bool ret = _light_work_pool.try_offer([request, response, done]() {
         signal::set_signal_task_id(request->query_id());
         // multi get data by rowid
         MonotonicStopWatch watch;
         watch.start();
         brpc::ClosureGuard closure_guard(done);
         response->mutable_status()->set_status_code(0);
-        Status st = _multi_get(*request, response);
+        Status st = RowIdStorageReader::read_by_rowids(*request, response);
         st.to_protobuf(response->mutable_status());
         LOG(INFO) << "multiget_data finished, cost(us):" << 
watch.elapsed_time() / 1000;
     });
diff --git a/be/src/service/internal_service.h 
b/be/src/service/internal_service.h
index b9d3432982c..1cf2b5ae42f 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <brpc/closure_guard.h>
 #include <gen_cpp/internal_service.pb.h>
 
 #include <string>
@@ -38,6 +39,25 @@ class PHandShakeResponse;
 class LoadStreamMgr;
 class RuntimeState;
 
+template <typename T>
+concept CanCancel = requires(T* response) { response->mutable_status(); };
+
+template <typename T>
+void offer_failed(T* response, google::protobuf::Closure* done, const 
FifoThreadPool& pool) {
+    brpc::ClosureGuard closure_guard(done);
+    LOG(WARNING) << "fail to offer request to the work pool, pool=" << 
pool.get_info();
+}
+
+template <CanCancel T>
+void offer_failed(T* response, google::protobuf::Closure* done, const 
FifoThreadPool& pool) {
+    brpc::ClosureGuard closure_guard(done);
+    response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
+    response->mutable_status()->add_error_msgs("fail to offer request to the 
work pool, pool=" +
+                                               pool.get_info());
+    LOG(WARNING) << "cancelled due to fail to offer request to the work pool, 
pool="
+                 << pool.get_info();
+}
+
 class PInternalService : public PBackendService {
 public:
     PInternalService(ExecEnv* exec_env);
@@ -183,6 +203,9 @@ public:
                             PGetWalQueueSizeResponse* response,
                             google::protobuf::Closure* done) override;
 
+    void multiget_data(google::protobuf::RpcController* controller, const 
PMultiGetRequest* request,
+                       PMultiGetResponse* response, google::protobuf::Closure* 
done) override;
+
 private:
     void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* 
controller,
                                         const PExecPlanFragmentRequest* 
request,
@@ -233,8 +256,6 @@ public:
                                            const PTabletWriteSlaveDoneRequest* 
request,
                                            PTabletWriteSlaveDoneResult* 
response,
                                            google::protobuf::Closure* done) 
override;
-    void multiget_data(google::protobuf::RpcController* controller, const 
PMultiGetRequest* request,
-                       PMultiGetResponse* response, google::protobuf::Closure* 
done) override;
 
     void tablet_fetch_data(google::protobuf::RpcController* controller,
                            const PTabletKeyLookupRequest* request,
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 920e25f0904..e8353f9544d 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -395,19 +395,16 @@ Status NewOlapScanner::_init_tablet_reader_params(
 
     // If this is a Two-Phase read query, and we need to delay the release of 
Rowset
     // by rowset->update_delayed_expired_timestamp().This could expand the 
lifespan of Rowset
-    // TODO(plat1ko): CloudStorageEngine
-    if (!config::is_cloud_mode()) {
-        if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) {
-            constexpr static int delayed_s = 60;
-            for (auto rs_reader : _tablet_reader_params.rs_splits) {
-                uint64_t delayed_expired_timestamp =
-                        UnixSeconds() + 
_tablet_reader_params.runtime_state->execution_timeout() +
-                        delayed_s;
-                
rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp(
-                        delayed_expired_timestamp);
-                
ExecEnv::GetInstance()->storage_engine().to_local().add_quering_rowset(
-                        rs_reader.rs_reader->rowset());
-            }
+    if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) {
+        constexpr static int delayed_s = 60;
+        for (auto rs_reader : _tablet_reader_params.rs_splits) {
+            uint64_t delayed_expired_timestamp =
+                    UnixSeconds() + 
_tablet_reader_params.runtime_state->execution_timeout() +
+                    delayed_s;
+            rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp(
+                    delayed_expired_timestamp);
+            ExecEnv::GetInstance()->storage_engine().add_quering_rowset(
+                    rs_reader.rs_reader->rowset());
         }
     }
 
diff --git a/regression-test/pipeline/cloud_p0/conf/session_variables.sql 
b/regression-test/pipeline/cloud_p0/conf/session_variables.sql
index 8885476ee06..96a849bc1ad 100644
--- a/regression-test/pipeline/cloud_p0/conf/session_variables.sql
+++ b/regression-test/pipeline/cloud_p0/conf/session_variables.sql
@@ -3,4 +3,3 @@ set global insert_visible_timeout_ms=60000;
 set global enable_auto_analyze=false;
 set global enable_audit_plugin=true;
 set global enable_memtable_on_sink_node=false;
-set global enable_two_phase_read_opt = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to