This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b552f77e95 [opt](topn)Optimize the time for topn to lazy materialized 
reading of external tables (#52114)
6b552f77e95 is described below

commit 6b552f77e95d163e5f9abe5db2abb50523770497
Author: daidai <[email protected]>
AuthorDate: Mon Jul 14 13:16:46 2025 +0800

    [opt](topn)Optimize the time for topn to lazy materialized reading of 
external tables (#52114)
    
    ### What problem does this PR solve?
    Related PR: #51329
    
    Problem Summary:
    Top-N lazy materialization was introduced in PR #51329, but the
    implementation had performance issues when reading external tables. This
    PR optimizes that read path.
    1. Previously, the materialization phase read a single row from the
    file on each call. This PR groups the requested rows by scan range and
    reads all rows belonging to a scan range from the file in one pass.
    2. Previously, the materialization phase read files on a single thread.
    This PR creates a scan task for each scan range and submits the tasks to
    the workload group's scan scheduler to improve read speed.
    3. Previously, the runtime profile was transmitted only through Thrift.
    This PR adds a protobuf representation of the runtime profile and attaches
    the `RowIDFetcher` profile information to `MATERIALIZATION_OPERATOR`.
    An example follows (cluster with 1 FE and 2 BEs):
    SQL: select * from ali_hive.tpch100_orc.lineitem order by l_partkey
    limit 10;
    ```
    MATERIALIZATION_OPERATOR  (id=3):(ExecTime:  2.645ms)
            -  BlocksProduced:  5
            -  CloseTime:  0ns
            -  ExecTime:  2.645ms
            -  InitTime:  0ns
            -  MemoryUsage:  0.00
            -  MemoryUsagePeak:  0.00
            -  OpenTime:  0ns
            -  ProjectionTime:  528.913us
            -  RowsProduced:  10
            -  WaitForDependency[MATERIALIZATION_COUNTER_DEPENDENCY]Time:  
12sec874ms
        RowIDFetcher:  BackendId:1750838859134:
                -  FileReadBytes:  {[2.89  MB,  ],  [9.51  MB,  ],  [6.81  MB,  
],  [4.74  MB,  ],  [22.33  MB,  ],  }
                -  FileReadLines:  {[1,  ],  [1,  ],  [1,  ],  [1,  ],  [1,  ], 
 }
                -  FileReadTime:  {[102.960ms,],  [104.028ms,],  [99.817ms,],  
[98.260ms,],  [120.129ms,],  }
                -  GetBlockAvgTime:  {14ms,  2ms,  2ms,  1ms,  3ms,  }
                -  InitReaderAvgTime:  {14ms,  2ms,  2ms,  1ms,  3ms,  }
                -  ScannersRunningTime:  {130ms,  124ms,  116ms,  113ms,  
151ms,  }
        RowIDFetcher:  BackendId:1750936290862:
                -  FileReadBytes:  {[13.80  MB,  ],  [21.28  MB,  ],  [8.18  
MB,  ],  [16.69  MB,  ],  [19.16  MB,  ],  }
                -  FileReadLines:  {[1,  ],  [1,  ],  [1,  ],  [1,  ],  [1,  ], 
 }
                -  FileReadTime:  {[113.031ms,],  [132.087ms,],  [105.361ms,],  
[117.245ms,],  [125.535ms,],  }
                -  GetBlockAvgTime:  {2ms,  2ms,  2ms,  1ms,  3ms,  }
                -  InitReaderAvgTime:  {2ms,  2ms,  2ms,  1ms,  3ms,  }
                -  ScannersRunningTime:  {144ms,  160ms,  127ms,  142ms,  
159ms,  }
    ```
---
 be/src/exec/rowid_fetcher.cpp                      | 299 ++++++++++++++++++---
 be/src/exec/rowid_fetcher.h                        |  26 +-
 be/src/olap/id_manager.h                           |   6 +-
 be/src/pipeline/dependency.cpp                     |  35 +++
 be/src/pipeline/dependency.h                       |   6 +
 be/src/pipeline/exec/file_scan_operator.cpp        |   2 +-
 .../exec/materialization_source_operator.cpp       |  11 +
 be/src/runtime/runtime_state.cpp                   |   6 +-
 be/src/runtime/runtime_state.h                     |   3 +-
 be/src/util/runtime_profile.cpp                    | 140 ++++++++++
 be/src/util/runtime_profile.h                      | 122 +++++++++
 be/src/util/runtime_profile_counter_tree_node.cpp  |  50 ++++
 be/src/util/runtime_profile_counter_tree_node.h    |   6 +
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  76 +++---
 .../exec/format/parquet/vparquet_group_reader.cpp  |   1 +
 .../exec/format/parquet/vparquet_group_reader.h    |   1 -
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |   8 +-
 be/src/vec/exec/scan/file_scanner.cpp              |  26 +-
 be/src/vec/exec/scan/file_scanner.h                |  15 +-
 be/test/util/runtime_profile_test.cpp              | 229 ++++++++++++++++
 .../vec/exec/format/parquet/parquet_read_lines.cpp |   6 +-
 be/test/vec/exec/orc/orc_read_lines.cpp            |   6 +-
 gensrc/proto/internal_service.proto                |   2 +
 gensrc/proto/runtime_profile.proto                 |  91 +++++++
 24 files changed, 1063 insertions(+), 110 deletions(-)

diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index b8f00e4d16c..9369ffb43a2 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -45,6 +45,7 @@
 #include "common/config.h"
 #include "common/consts.h"
 #include "common/exception.h"
+#include "common/signal_handler.h"
 #include "exec/tablet_info.h" // DorisNodesInfo
 #include "olap/olap_common.h"
 #include "olap/rowset/beta_rowset.h"
@@ -58,6 +59,7 @@
 #include "runtime/fragment_mgr.h"  // FragmentMgr
 #include "runtime/runtime_state.h" // RuntimeState
 #include "runtime/types.h"
+#include "runtime/workload_group/workload_group_manager.h"
 #include "util/brpc_client_cache.h" // BrpcClientCache
 #include "util/defer_op.h"
 #include "vec/columns/column.h"
@@ -488,8 +490,9 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequestV2& request,
         int64_t acquire_segments_ms = 0;
         int64_t lookup_row_data_ms = 0;
 
-        int64_t external_init_reader_ms = 0;
-        int64_t external_get_block_ms = 0;
+        int64_t external_init_reader_avg_ms = 0;
+        int64_t external_get_block_avg_ms = 0;
+        size_t external_scan_range_cnt = 0;
 
         // Add counters for different file mapping types
         std::unordered_map<FileMappingType, int64_t> file_type_counts;
@@ -507,6 +510,7 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequestV2& request,
 
         for (int i = 0; i < request.request_block_descs_size(); ++i) {
             const auto& request_block_desc = request.request_block_descs(i);
+            PMultiGetBlockV2* pblock = response->add_blocks();
             if (request_block_desc.row_id_size() >= 1) {
                 // Since this block belongs to the same table, we only need to 
take the first type for judgment.
                 auto first_file_id = request_block_desc.file_id(0);
@@ -542,9 +546,10 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequestV2& request,
                                 &acquire_segments_ms, &lookup_row_data_ms));
                     } else {
                         RETURN_IF_ERROR(read_batch_external_row(
-                                request_block_desc, id_file_map, slots, 
first_file_mapping,
-                                tquery_id, result_blocks[i], 
&external_init_reader_ms,
-                                &external_get_block_ms));
+                                request.wg_id(), request_block_desc, 
id_file_map, slots,
+                                first_file_mapping, tquery_id, 
result_blocks[i],
+                                pblock->mutable_profile(), 
&external_init_reader_avg_ms,
+                                &external_get_block_avg_ms, 
&external_scan_range_cnt));
                     }
                 } catch (const Exception& e) {
                     return Status::Error<false>(e.code(), "Row id fetch failed 
because {}",
@@ -558,9 +563,9 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequestV2& request,
             [[maybe_unused]] size_t compressed_size = 0;
             [[maybe_unused]] size_t uncompressed_size = 0;
             int be_exec_version = request.has_be_exec_version() ? 
request.be_exec_version() : 0;
-            RETURN_IF_ERROR(result_blocks[i].serialize(
-                    be_exec_version, response->add_blocks()->mutable_block(), 
&uncompressed_size,
-                    &compressed_size, segment_v2::CompressionTypePB::LZ4));
+            RETURN_IF_ERROR(result_blocks[i].serialize(be_exec_version, 
pblock->mutable_block(),
+                                                       &uncompressed_size, 
&compressed_size,
+                                                       
segment_v2::CompressionTypePB::LZ4));
         }
 
         // Build file type statistics string
@@ -579,12 +584,14 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequestV2& request,
                              "io_latency:{}ns, uncompressed_bytes_read:{}, 
bytes_read:{}, "
                              "acquire_tablet_ms:{}, acquire_rowsets_ms:{}, 
acquire_segments_ms:{}, "
                              "lookup_row_data_ms:{}, file_types:[{}]; "
-                             "External table : init_reader_ms:{}, 
get_block_ms:{}",
+                             "External table : init_reader_ms:{}, 
get_block_ms:{}, "
+                             "external_scan_range_cnt:{}",
                              stats.cached_pages_num, stats.total_pages_num,
                              stats.compressed_bytes_read, stats.io_ns,
                              stats.uncompressed_bytes_read, stats.bytes_read, 
acquire_tablet_ms,
                              acquire_rowsets_ms, acquire_segments_ms, 
lookup_row_data_ms,
-                             file_type_stats, external_init_reader_ms, 
external_get_block_ms);
+                             file_type_stats, external_init_reader_avg_ms,
+                             external_get_block_avg_ms, 
external_scan_range_cnt);
     }
 
     if (request.has_gc_id_map() && request.gc_id_map()) {
@@ -635,22 +642,23 @@ Status RowIdStorageReader::read_batch_doris_format_row(
     return Status::OK();
 }
 
-Status RowIdStorageReader::read_batch_external_row(const PRequestBlockDesc& 
request_block_desc,
-                                                   std::shared_ptr<IdFileMap> 
id_file_map,
-                                                   
std::vector<SlotDescriptor>& slots,
-                                                   
std::shared_ptr<FileMapping> first_file_mapping,
-                                                   const TUniqueId& query_id,
-                                                   vectorized::Block& 
result_block,
-                                                   int64_t* init_reader_ms, 
int64_t* get_block_ms) {
+const std::string RowIdStorageReader::ScannersRunningTimeProfile = 
"ScannersRunningTime";
+const std::string RowIdStorageReader::InitReaderAvgTimeProfile = 
"InitReaderAvgTime";
+const std::string RowIdStorageReader::GetBlockAvgTimeProfile = 
"GetBlockAvgTime";
+const std::string RowIdStorageReader::FileReadLinesProfile = "FileReadLines";
+
+Status RowIdStorageReader::read_batch_external_row(
+        const uint64_t workload_group_id, const PRequestBlockDesc& 
request_block_desc,
+        std::shared_ptr<IdFileMap> id_file_map, std::vector<SlotDescriptor>& 
slots,
+        std::shared_ptr<FileMapping> first_file_mapping, const TUniqueId& 
query_id,
+        vectorized::Block& result_block, PRuntimeProfileTree* pprofile, 
int64_t* init_reader_avg_ms,
+        int64_t* get_block_avg_ms, size_t* scan_range_cnt) {
     TFileScanRangeParams rpc_scan_params;
     TupleDescriptor tuple_desc(request_block_desc.desc(), false);
     std::unordered_map<std::string, int> colname_to_slot_id;
-    std::unique_ptr<RuntimeState> runtime_state = nullptr;
-    std::unique_ptr<RuntimeProfile> runtime_profile;
-    runtime_profile = std::make_unique<RuntimeProfile>("ExternalRowIDFetcher");
-
-    std::unique_ptr<vectorized::FileScanner> vfile_scanner_ptr = nullptr;
+    std::shared_ptr<RuntimeState> runtime_state = nullptr;
 
+    int max_file_scanners = 0;
     {
         if (result_block.is_empty_column()) [[likely]] {
             result_block = vectorized::Block(slots, 
request_block_desc.row_id_size());
@@ -698,16 +706,38 @@ Status RowIdStorageReader::read_batch_external_row(const 
PRequestBlockDesc& requ
          * To ensure the same behavior as the scan stage, I get query_options 
query_globals from id_file_map, then create runtime_state
          * and pass it to vfile_scanner so that the runtime_state information 
is the same as the scan stage and the behavior is also consistent.
          */
-        runtime_state = RuntimeState::create_unique(query_id, -1, 
query_options, query_globals,
-                                                    ExecEnv::GetInstance());
-
-        vfile_scanner_ptr = vectorized::FileScanner::create_unique(
-                runtime_state.get(), runtime_profile.get(), &rpc_scan_params, 
&colname_to_slot_id,
-                &tuple_desc);
+        runtime_state = RuntimeState::create_shared(
+                query_id, -1, query_options, query_globals, 
ExecEnv::GetInstance(),
+                ExecEnv::GetInstance()->rowid_storage_reader_tracker());
 
-        
RETURN_IF_ERROR(vfile_scanner_ptr->prepare_for_read_one_line(first_scan_range_desc));
+        max_file_scanners = id_file_map->get_max_file_scanners();
     }
 
+    // Hash(TFileRangeDesc) => { all the rows that need to be read and their 
positions in the result block. } +  file mapping
+    std::map<std::string,
+             std::pair<std::map<segment_v2::rowid_t, size_t>, 
std::shared_ptr<FileMapping>>>
+            scan_rows;
+
+    // Block corresponding to the order of `scan_rows` map.
+    std::vector<vectorized::Block> scan_blocks;
+
+    // row_id (Indexing of vectors) => < In which block, which line in the 
block >
+    std::vector<std::pair<size_t, size_t>> row_id_block_idx;
+
+    // Count the time/bytes it takes to read each TFileRangeDesc. (for profile)
+    std::vector<ExternalFetchStatistics> fetch_statistics;
+
+    auto hash_file_range = [](const TFileRangeDesc& file_range_desc) {
+        std::string value;
+        value.resize(file_range_desc.path.size() + 
sizeof(file_range_desc.start_offset));
+        auto* ptr = value.data();
+
+        memcpy(ptr, &file_range_desc.start_offset, 
sizeof(file_range_desc.start_offset));
+        ptr += sizeof(file_range_desc.start_offset);
+        memcpy(ptr, file_range_desc.path.data(), file_range_desc.path.size());
+        return value;
+    };
+
     for (size_t j = 0; j < request_block_desc.row_id_size(); ++j) {
         auto file_id = request_block_desc.file_id(j);
         auto file_mapping = id_file_map->get_file_mapping(file_id);
@@ -717,19 +747,212 @@ Status RowIdStorageReader::read_batch_external_row(const 
PRequestBlockDesc& requ
                     BackendOptions::get_localhost(), print_id(query_id), 
file_id);
         }
 
-        auto& external_info = file_mapping->get_external_file_info();
-        auto& scan_range_desc = external_info.scan_range_desc;
+        const auto& external_info = file_mapping->get_external_file_info();
+        const auto& scan_range_desc = external_info.scan_range_desc;
 
-        // Clear to avoid reading iceberg position delete file...
-        scan_range_desc.table_format_params.iceberg_params = TIcebergFileDesc 
{};
+        auto scan_range_hash = hash_file_range(scan_range_desc);
+        if (scan_rows.contains(scan_range_hash)) {
+            
scan_rows.at(scan_range_hash).first.emplace(request_block_desc.row_id(j), j);
+        } else {
+            std::map<segment_v2::rowid_t, size_t> tmp 
{{request_block_desc.row_id(j), j}};
+            scan_rows.emplace(scan_range_hash, std::make_pair(tmp, 
file_mapping));
+        }
+    }
 
-        // Clear to avoid reading hive transactional delete delta file...
-        scan_range_desc.table_format_params.transactional_hive_params = 
TTransactionalHiveDesc {};
+    scan_blocks.resize(scan_rows.size());
+    row_id_block_idx.resize(request_block_desc.row_id_size());
+    fetch_statistics.resize(scan_rows.size());
+
+    // Get the workload group for subsequent scan task submission.
+    std::vector<uint64_t> workload_group_ids;
+    workload_group_ids.emplace_back(workload_group_id);
+    auto wg = 
ExecEnv::GetInstance()->workload_group_mgr()->get_group(workload_group_ids);
+    doris::pipeline::TaskScheduler* exec_sched = nullptr;
+    vectorized::SimplifiedScanScheduler* scan_sched = nullptr;
+    vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr;
+    wg->get_query_scheduler(&exec_sched, &scan_sched, &remote_scan_sched);
+    DCHECK(remote_scan_sched);
+
+    int64_t scan_running_time = 0;
+    RETURN_IF_ERROR(scope_timer_run(
+            [&]() -> Status {
+                // Make sure to insert data into result_block only after all 
scan tasks have been executed.
+                std::atomic<int> producer_count {0};
+                std::condition_variable cv;
+                std::mutex mtx;
+
+                //semaphore: Limit the number of scan tasks submitted at one 
time
+                std::counting_semaphore semaphore {max_file_scanners};
+
+                size_t idx = 0;
+                for (const auto& [_, scan_info] : scan_rows) {
+                    semaphore.acquire();
+                    RETURN_IF_ERROR(
+                            
remote_scan_sched->submit_scan_task(vectorized::SimplifiedScanTask(
+                                    [&, scan_info, idx]() {
+                                        auto& row_ids = scan_info.first;
+                                        auto& file_mapping = scan_info.second;
+
+                                        SCOPED_ATTACH_TASK(
+                                                ExecEnv::GetInstance()
+                                                        
->rowid_storage_reader_tracker());
+                                        signal::set_signal_task_id(query_id);
+
+                                        scan_blocks[idx] =
+                                                vectorized::Block(slots, 
scan_info.first.size());
+
+                                        size_t j = 0;
+                                        std::list<int64_t> read_ids;
+                                        //Generate an ordered list with the 
help of the orderliness of the map.
+                                        for (const auto& [row_id, 
result_block_idx] : row_ids) {
+                                            read_ids.emplace_back(row_id);
+                                            row_id_block_idx[result_block_idx] 
=
+                                                    std::make_pair(idx, j);
+                                            j++;
+                                        }
+
+                                        auto& external_info =
+                                                
file_mapping->get_external_file_info();
+                                        auto& scan_range_desc = 
external_info.scan_range_desc;
+
+                                        // Clear to avoid reading iceberg 
position delete file...
+                                        
scan_range_desc.table_format_params.iceberg_params =
+                                                TIcebergFileDesc {};
+
+                                        // Clear to avoid reading hive 
transactional delete delta file...
+                                        scan_range_desc.table_format_params
+                                                .transactional_hive_params =
+                                                TTransactionalHiveDesc {};
+
+                                        std::unique_ptr<RuntimeProfile> 
sub_runtime_profile =
+                                                
std::make_unique<RuntimeProfile>(
+                                                        
"ExternalRowIDFetcher");
+                                        {
+                                            
std::unique_ptr<vectorized::FileScanner>
+                                                    vfile_scanner_ptr =
+                                                            
vectorized::FileScanner::create_unique(
+                                                                    
runtime_state.get(),
+                                                                    
sub_runtime_profile.get(),
+                                                                    
&rpc_scan_params,
+                                                                    
&colname_to_slot_id,
+                                                                    
&tuple_desc);
+
+                                            RETURN_IF_ERROR(
+                                                    
vfile_scanner_ptr->prepare_for_read_lines(
+                                                            scan_range_desc));
+                                            RETURN_IF_ERROR(
+                                                    
vfile_scanner_ptr->read_lines_from_range(
+                                                            scan_range_desc, 
read_ids,
+                                                            &scan_blocks[idx], 
external_info,
+                                                            
&fetch_statistics[idx].init_reader_ms,
+                                                            
&fetch_statistics[idx].get_block_ms));
+                                        }
+
+                                        auto file_read_bytes_counter =
+                                                
sub_runtime_profile->get_counter(
+                                                        
vectorized::FileScanner::
+                                                                
FileReadBytesProfile);
+
+                                        if (file_read_bytes_counter != 
nullptr) {
+                                            
fetch_statistics[idx].file_read_bytes =
+                                                    PrettyPrinter::print(
+                                                            
file_read_bytes_counter->value(),
+                                                            
file_read_bytes_counter->type());
+                                        }
+
+                                        auto file_read_times_counter =
+                                                
sub_runtime_profile->get_counter(
+                                                        
vectorized::FileScanner::
+                                                                
FileReadTimeProfile);
+                                        if (file_read_times_counter != 
nullptr) {
+                                            
fetch_statistics[idx].file_read_times =
+                                                    PrettyPrinter::print(
+                                                            
file_read_times_counter->value(),
+                                                            
file_read_times_counter->type());
+                                        }
+
+                                        semaphore.release();
+                                        if (++producer_count == 
scan_rows.size()) {
+                                            std::lock_guard<std::mutex> 
lock(mtx);
+                                            cv.notify_one();
+                                        }
+                                        return Status::OK();
+                                    },
+                                    nullptr)));
+                    idx++;
+                }
+
+                {
+                    std::unique_lock<std::mutex> lock(mtx);
+                    cv.wait(lock, [&] { return producer_count == 
scan_rows.size(); });
+                }
+                return Status::OK();
+            },
+            &scan_running_time));
+
+    // Insert the read data into result_block.
+    for (size_t column_id = 0; column_id < result_block.get_columns().size(); 
column_id++) {
+        auto dst_col =
+                
const_cast<vectorized::IColumn*>(result_block.get_columns()[column_id].get());
+
+        std::vector<const vectorized::IColumn*> scan_src_columns;
+        scan_src_columns.reserve(row_id_block_idx.size());
+        std::vector<size_t> scan_positions;
+        scan_positions.reserve(row_id_block_idx.size());
+        for (const auto& [pos_block, block_idx] : row_id_block_idx) {
+            DCHECK(scan_blocks.size() > pos_block);
+            DCHECK(scan_blocks[pos_block].get_columns().size() > column_id);
+            
scan_src_columns.emplace_back(scan_blocks[pos_block].get_columns()[column_id].get());
+            scan_positions.emplace_back(block_idx);
+        }
+        dst_col->insert_from_multi_column(scan_src_columns, scan_positions);
+    }
 
-        RETURN_IF_ERROR(vfile_scanner_ptr->read_one_line_from_range(
-                scan_range_desc, request_block_desc.row_id(j), &result_block, 
external_info,
-                init_reader_ms, get_block_ms));
+    // Statistical runtime profile information.
+    std::unique_ptr<RuntimeProfile> runtime_profile =
+            std::make_unique<RuntimeProfile>("ExternalRowIDFetcher");
+    {
+        runtime_profile->add_info_string(ScannersRunningTimeProfile,
+                                         std::to_string(scan_running_time) + 
"ms");
+        fmt::memory_buffer file_read_lines_buffer;
+        format_to(file_read_lines_buffer, "[");
+        fmt::memory_buffer file_read_bytes_buffer;
+        format_to(file_read_bytes_buffer, "[");
+        fmt::memory_buffer file_read_times_buffer;
+        format_to(file_read_times_buffer, "[");
+
+        size_t idx = 0;
+        for (const auto& [_, scan_info] : scan_rows) {
+            format_to(file_read_lines_buffer, "{}, ", scan_info.first.size());
+            *init_reader_avg_ms = fetch_statistics[idx].init_reader_ms;
+            *get_block_avg_ms += fetch_statistics[idx].get_block_ms;
+            format_to(file_read_bytes_buffer, "{}, ", 
fetch_statistics[idx].file_read_bytes);
+            format_to(file_read_times_buffer, "{}, ", 
fetch_statistics[idx].file_read_times);
+            idx++;
+        }
+
+        format_to(file_read_lines_buffer, "]");
+        format_to(file_read_bytes_buffer, "]");
+        format_to(file_read_times_buffer, "]");
+
+        *init_reader_avg_ms /= fetch_statistics.size();
+        *get_block_avg_ms /= fetch_statistics.size();
+        runtime_profile->add_info_string(InitReaderAvgTimeProfile,
+                                         std::to_string(*init_reader_avg_ms) + 
"ms");
+        runtime_profile->add_info_string(GetBlockAvgTimeProfile,
+                                         std::to_string(*init_reader_avg_ms) + 
"ms");
+        runtime_profile->add_info_string(FileReadLinesProfile,
+                                         
fmt::to_string(file_read_lines_buffer));
+        
runtime_profile->add_info_string(vectorized::FileScanner::FileReadBytesProfile,
+                                         
fmt::to_string(file_read_bytes_buffer));
+        
runtime_profile->add_info_string(vectorized::FileScanner::FileReadTimeProfile,
+                                         
fmt::to_string(file_read_times_buffer));
     }
+
+    runtime_profile->to_proto(pprofile, 2);
+
+    *scan_range_cnt = scan_rows.size();
+
     return Status::OK();
 }
 
diff --git a/be/src/exec/rowid_fetcher.h b/be/src/exec/rowid_fetcher.h
index c3cc48db6d4..08d306f7bb6 100644
--- a/be/src/exec/rowid_fetcher.h
+++ b/be/src/exec/rowid_fetcher.h
@@ -86,6 +86,12 @@ struct RowStoreReadStruct {
 
 class RowIdStorageReader {
 public:
+    //external profile info key.
+    static const std::string ScannersRunningTimeProfile;
+    static const std::string InitReaderAvgTimeProfile;
+    static const std::string GetBlockAvgTimeProfile;
+    static const std::string FileReadLinesProfile;
+
     static Status read_by_rowids(const PMultiGetRequest& request, 
PMultiGetResponse* response);
     static Status read_by_rowids(const PMultiGetRequestV2& request, 
PMultiGetResponseV2* response);
 
@@ -107,13 +113,19 @@ private:
             int64_t* acquire_tablet_ms, int64_t* acquire_rowsets_ms, int64_t* 
acquire_segments_ms,
             int64_t* lookup_row_data_ms);
 
-    static Status read_batch_external_row(const PRequestBlockDesc& 
request_block_desc,
-                                          std::shared_ptr<IdFileMap> 
id_file_map,
-                                          std::vector<SlotDescriptor>& slots,
-                                          std::shared_ptr<FileMapping> 
first_file_mapping,
-                                          const TUniqueId& query_id,
-                                          vectorized::Block& result_block, 
int64_t* init_reader_ms,
-                                          int64_t* get_block_ms);
+    static Status read_batch_external_row(
+            const uint64_t workload_group_id, const PRequestBlockDesc& 
request_block_desc,
+            std::shared_ptr<IdFileMap> id_file_map, 
std::vector<SlotDescriptor>& slots,
+            std::shared_ptr<FileMapping> first_file_mapping, const TUniqueId& 
query_id,
+            vectorized::Block& result_block, PRuntimeProfileTree* pprofile,
+            int64_t* init_reader_avg_ms, int64_t* get_block_avg_ms, size_t* 
scan_range_cnt);
+
+    struct ExternalFetchStatistics {
+        int64_t init_reader_ms = 0;
+        int64_t get_block_ms = 0;
+        std::string file_read_bytes;
+        std::string file_read_times;
+    };
 };
 
 template <typename Func>
diff --git a/be/src/olap/id_manager.h b/be/src/olap/id_manager.h
index 94765e5eab1..4b9974e0ee3 100644
--- a/be/src/olap/id_manager.h
+++ b/be/src/olap/id_manager.h
@@ -197,12 +197,13 @@ public:
 
     int64_t get_delayed_expired_timestamp() { return 
delayed_expired_timestamp; }
 
-    void set_external_scan_params(QueryContext* query_ctx) {
+    void set_external_scan_params(QueryContext* query_ctx, int 
max_file_scanners) {
         std::call_once(once_flag_for_external, [&] {
             DCHECK(query_ctx != nullptr);
             _query_global = query_ctx->get_query_globals();
             _query_options = query_ctx->get_query_options();
             _file_scan_range_params_map = 
query_ctx->file_scan_range_params_map;
+            _max_file_scanners = max_file_scanners;
         });
     }
 
@@ -214,6 +215,8 @@ public:
         return _file_scan_range_params_map;
     }
 
+    int get_max_file_scanners() const { return _max_file_scanners; }
+
 private:
     std::shared_mutex _mtx;
     uint32_t _init_id = 0;
@@ -225,6 +228,7 @@ private:
     TQueryOptions _query_options;
     std::map<int, TFileScanRangeParams> _file_scan_range_params_map;
     std::once_flag once_flag_for_external;
+    int _max_file_scanners = 10;
 
     // use in Doris Format to keep temp rowsets, preventing them from being 
deleted by compaction
     std::unordered_map<std::pair<int64_t, RowsetId>, RowsetSharedPtr> 
_temp_rowset_maps;
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 0292feb2d2c..035178763fb 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -21,6 +21,7 @@
 #include <mutex>
 
 #include "common/logging.h"
+#include "exec/rowid_fetcher.h"
 #include "pipeline/exec/multi_cast_data_streamer.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/pipeline_task.h"
@@ -28,6 +29,7 @@
 #include "runtime/memory/mem_tracker.h"
 #include "runtime_filter/runtime_filter_consumer.h"
 #include "util/brpc_client_cache.h"
+#include "vec/exec/scan/file_scanner.h"
 #include "vec/exprs/vectorized_agg_fn.h"
 #include "vec/exprs/vslot_ref.h"
 #include "vec/spill/spill_stream_manager.h"
@@ -480,6 +482,11 @@ Status 
MaterializationSharedState::merge_multi_response(vectorized::Block* block
             DCHECK(rpc_struct.callback->response_->blocks_size() > i);
             RETURN_IF_ERROR(
                     
partial_block.deserialize(rpc_struct.callback->response_->blocks(i).block()));
+            if (rpc_struct.callback->response_->blocks(i).has_profile()) {
+                auto response_profile = RuntimeProfile::from_proto(
+                        rpc_struct.callback->response_->blocks(i).profile());
+                _update_profile_info(backend_id, response_profile.get());
+            }
 
             if (!partial_block.is_empty_column()) {
                 _block_maps[backend_id] = 
std::make_pair(std::move(partial_block), 0);
@@ -533,6 +540,34 @@ Status 
MaterializationSharedState::merge_multi_response(vectorized::Block* block
     return Status::OK();
 }
 
+void MaterializationSharedState::_update_profile_info(int64_t backend_id,
+                                                      RuntimeProfile* response_profile) {
+    // Pulls a fixed list of info strings out of one row-id fetch response and appends each
+    // value to the text buffer kept per backend and per key in `backend_profile_info_string`.
+    // try_emplace creates an empty per-backend map on first use and leaves an existing one alone.
+    auto& per_key_buffers = backend_profile_info_string.try_emplace(backend_id).first->second;
+
+    auto append_info = [&per_key_buffers, response_profile](const std::string& info_key) {
+        const auto* reported = response_profile->get_info_string(info_key);
+        if (reported == nullptr) [[unlikely]] {
+            LOG(WARNING) << "Get row id fetch rpc profile success, but no info key :" << info_key;
+            return;
+        }
+        // Values from successive responses accumulate in the same buffer, each followed by ", ".
+        auto& buffer = per_key_buffers.try_emplace(info_key).first->second;
+        fmt::format_to(buffer, "{}, ", *reported);
+    };
+
+    append_info(RowIdStorageReader::ScannersRunningTimeProfile);
+    append_info(RowIdStorageReader::InitReaderAvgTimeProfile);
+    append_info(RowIdStorageReader::GetBlockAvgTimeProfile);
+    append_info(RowIdStorageReader::FileReadLinesProfile);
+    append_info(vectorized::FileScanner::FileReadBytesProfile);
+    append_info(vectorized::FileScanner::FileReadTimeProfile);
+}
+
 void MaterializationSharedState::create_counter_dependency(int operator_id, 
int node_id,
                                                            const std::string& 
name) {
     auto dep =
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 847ed44e2ac..eec06b115cc 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -824,6 +824,10 @@ public:
 
     void create_counter_dependency(int operator_id, int node_id, const 
std::string& name);
 
+private:
+    void _update_profile_info(int64_t backend_id, RuntimeProfile* 
response_profile);
+
+public:
     bool rpc_struct_inited = false;
     AtomicStatus rpc_status;
 
@@ -838,6 +842,8 @@ public:
     // Register each line in which block to ensure the order of the result.
     // Zero means NULL value.
     std::vector<std::vector<int64_t>> block_order_results;
+    // backend id => <rpc profile info string key, rpc profile info string 
value>.
+    std::map<int64_t, std::map<std::string, fmt::memory_buffer>> 
backend_profile_info_string;
 };
 #include "common/compile_check_end.h"
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 99081d18518..4f2a459e206 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -39,7 +39,7 @@ Status 
FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
 
     auto& id_file_map = state()->get_id_file_map();
     if (id_file_map != nullptr) {
-        id_file_map->set_external_scan_params(state()->get_query_ctx());
+        id_file_map->set_external_scan_params(state()->get_query_ctx(), 
_max_scanners);
     }
 
     auto& p = _parent->cast<FileScanOperatorX>();
diff --git a/be/src/pipeline/exec/materialization_source_operator.cpp 
b/be/src/pipeline/exec/materialization_source_operator.cpp
index 396ce319fb9..23d17c0167d 100644
--- a/be/src/pipeline/exec/materialization_source_operator.cpp
+++ b/be/src/pipeline/exec/materialization_source_operator.cpp
@@ -51,6 +51,17 @@ Status 
MaterializationSourceOperatorX::get_block(RuntimeState* state, vectorized
             max_rpc_time = std::max(max_rpc_time, 
rpc_struct.rpc_timer.elapsed_time());
         }
         COUNTER_SET(local_state._max_rpc_timer, (int64_t)max_rpc_time);
+
+        for (const auto& [backend_id, child_info] :
+             local_state._shared_state->backend_profile_info_string) {
+            auto child_profile = local_state.operator_profile()->create_child(
+                    "RowIDFetcher: BackendId:" + std::to_string(backend_id));
+            for (const auto& [info_key, info_value] :
+                 
local_state._shared_state->backend_profile_info_string[backend_id]) {
+                child_profile->add_info_string(info_key, "{" + 
fmt::to_string(info_value) + "}");
+            }
+            local_state.operator_profile()->add_child(child_profile, true);
+        }
     }
 
     return Status::OK();
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index af1670125be..ba459b7323b 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -132,7 +132,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, 
int32_t fragment_id,
 
 RuntimeState::RuntimeState(const TUniqueId& query_id, int32_t fragment_id,
                            const TQueryOptions& query_options, const 
TQueryGlobals& query_globals,
-                           ExecEnv* exec_env)
+                           ExecEnv* exec_env,
+                           const std::shared_ptr<MemTrackerLimiter>& 
query_mem_tracker)
         : _profile("PipelineX  " + std::to_string(fragment_id)),
           _load_channel_profile("<unnamed>"),
           _obj_pool(new ObjectPool()),
@@ -150,7 +151,8 @@ RuntimeState::RuntimeState(const TUniqueId& query_id, 
int32_t fragment_id,
           _error_row_number(0) {
     Status status = init(TUniqueId(), query_options, query_globals, exec_env);
     DCHECK(status.ok());
-    init_mem_trackers("<unnamed>");
+    _query_mem_tracker = query_mem_tracker;
+    DCHECK(_query_mem_tracker != nullptr);
 }
 
 RuntimeState::RuntimeState(const TQueryGlobals& query_globals)
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cf9333ff980..38331732e2c 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -97,7 +97,8 @@ public:
     // Only used in the materialization phase of delayed materialization,
     // when there may be no corresponding QueryContext.
     RuntimeState(const TUniqueId& query_id, int32_t fragment_id, const 
TQueryOptions& query_options,
-                 const TQueryGlobals& query_globals, ExecEnv* exec_env);
+                 const TQueryGlobals& query_globals, ExecEnv* exec_env,
+                 const std::shared_ptr<MemTrackerLimiter>& query_mem_tracker);
 
     // RuntimeState for executing expr in fe-support.
     RuntimeState(const TQueryGlobals& query_globals);
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index 3a5e066c561..251e3ec23af 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -58,6 +58,17 @@ std::unique_ptr<RuntimeProfile> 
RuntimeProfile::from_thrift(const TRuntimeProfil
     return res;
 }
 
+std::unique_ptr<RuntimeProfile> RuntimeProfile::from_proto(const PRuntimeProfileTree& tree) {
+    // Builds a standalone profile from a serialized tree. The first node supplies the name.
+    // A tree without nodes has no root to name the profile after, so an unnamed one is returned.
+    if (tree.nodes_size() == 0) {
+        return std::make_unique<RuntimeProfile>("");
+    }
+
+    auto profile = std::make_unique<RuntimeProfile>(tree.nodes(0).name());
+    profile->update(tree);
+    return profile;
+}
+
 RuntimeProfile::RuntimeProfile(const std::string& name, bool 
is_averaged_profile)
         : _pool(new ObjectPool()),
           _name(name),
@@ -151,6 +162,12 @@ void RuntimeProfile::update(const TRuntimeProfileTree& 
thrift_profile) {
     DCHECK_EQ(idx, thrift_profile.nodes.size());
 }
 
+void RuntimeProfile::update(const PRuntimeProfileTree& proto_profile) {
+    // `cursor` is advanced by the recursive overload as it consumes nodes.
+    int cursor = 0;
+    const auto& flattened_nodes = proto_profile.nodes();
+    update(flattened_nodes, &cursor);
+    // A well-formed tree is consumed completely.
+    DCHECK_EQ(cursor, proto_profile.nodes_size());
+}
+
 void RuntimeProfile::update(const std::vector<TRuntimeProfileNode>& nodes, 
int* idx) {
     DCHECK_LT(*idx, nodes.size());
     const TRuntimeProfileNode& node = nodes[*idx];
@@ -233,6 +250,82 @@ void RuntimeProfile::update(const 
std::vector<TRuntimeProfileNode>& nodes, int*
     }
 }
 
+void RuntimeProfile::update(const google::protobuf::RepeatedPtrField<PRuntimeProfileNode>& nodes,
+                            int* idx) {
+    // Merges the flattened subtree that starts at nodes[*idx] into this profile: counters,
+    // child-counter relations, info strings, and then one subtree per declared child.
+    // *idx is advanced past every node that gets consumed.
+    DCHECK_LT(*idx, nodes.size());
+    // The node list comes from a serialized message. Keep an explicit bounds check so that a
+    // malformed list cannot be indexed out of range in builds where DCHECK is compiled out.
+    if (*idx < 0 || *idx >= nodes.size()) {
+        LOG(WARNING) << "Malformed runtime profile: node index " << *idx
+                     << " is out of range, node count is " << nodes.size();
+        return;
+    }
+    const PRuntimeProfileNode& node = nodes.Get(*idx);
+
+    {
+        std::lock_guard<std::mutex> l(_counter_map_lock);
+
+        for (const auto& pcounter : node.counters()) {
+            const std::string& name = pcounter.name();
+            auto j = _counter_map.find(name);
+
+            if (j == _counter_map.end()) {
+                _counter_map[name] =
+                        _pool->add(new Counter(unit_to_thrift(pcounter.type()), pcounter.value()));
+            } else {
+                // An existing counter is only overwritten when the units agree.
+                if (unit_to_proto(j->second->type()) != pcounter.type()) {
+                    LOG(ERROR) << "Cannot update counters with the same name (" << name
+                               << ") but different types.";
+                } else {
+                    j->second->set(pcounter.value());
+                }
+            }
+        }
+
+        for (const auto& kv : node.child_counters_map()) {
+            std::set<std::string>* child_counters =
+                    find_or_insert(&_child_counter_map, kv.first, std::set<std::string>());
+            for (const auto& child_name : kv.second.child_counters()) {
+                child_counters->insert(child_name);
+            }
+        }
+    }
+
+    {
+        std::lock_guard<std::mutex> l(_info_strings_lock);
+        const auto& info_map = node.info_strings();
+
+        for (const std::string& key : node.info_strings_display_order()) {
+            auto it = info_map.find(key);
+            DCHECK(it != info_map.end());
+            if (it == info_map.end()) {
+                // The key is listed in the display order but has no value in the map.
+                // Skip it rather than dereference the end iterator.
+                continue;
+            }
+
+            auto existing = _info_strings.find(key);
+            if (existing == _info_strings.end()) {
+                _info_strings.insert(std::make_pair(key, it->second));
+                _info_strings_display_order.push_back(key);
+            } else {
+                _info_strings[key] = it->second;
+            }
+        }
+    }
+
+    ++*idx;
+
+    {
+        std::lock_guard<std::mutex> l(_children_lock);
+        for (int i = 0; i < node.num_children(); ++i) {
+            // num_children is taken from the message; stop if it promises more nodes than exist.
+            if (*idx >= nodes.size()) {
+                LOG(WARNING) << "Malformed runtime profile: node " << node.name() << " declares "
+                             << node.num_children() << " children but the node list ended after "
+                             << i;
+                break;
+            }
+            const PRuntimeProfileNode& pchild = nodes.Get(*idx);
+            RuntimeProfile* child = nullptr;
+
+            auto j = _child_map.find(pchild.name());
+            if (j != _child_map.end()) {
+                child = j->second;
+            } else {
+                child = _pool->add(new RuntimeProfile(pchild.name()));
+                child->_metadata = pchild.metadata();
+                child->_timestamp = pchild.timestamp();
+                _child_map[pchild.name()] = child;
+                _children.emplace_back(child, pchild.indent());
+            }
+
+            // The recursive call consumes the child's whole subtree and advances *idx past it.
+            child->update(nodes, idx);
+        }
+    }
+}
+
 void RuntimeProfile::divide(int n) {
     DCHECK_GT(n, 0);
     std::map<std::string, Counter*>::iterator iter;
@@ -637,6 +730,53 @@ void 
RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes, int64_t
     }
 }
 
+void RuntimeProfile::to_proto(PRuntimeProfileTree* tree, int64_t profile_level) {
+    // Start from an empty node list, then let the recursive overload append this profile
+    // and its descendants.
+    tree->clear_nodes();
+    auto* flattened_nodes = tree->mutable_nodes();
+    to_proto(flattened_nodes, profile_level);
+}
+
+void RuntimeProfile::to_proto(google::protobuf::RepeatedPtrField<PRuntimeProfileNode>* nodes,
+                              int64_t profile_level) {
+    // Appends this profile to `nodes`, followed depth-first by each child subtree.
+    PRuntimeProfileNode* self_node = nodes->Add();
+    self_node->set_indent(true); // provisional: a parent's loop below rewrites it per child
+    self_node->set_timestamp(_timestamp);
+    self_node->set_metadata(_metadata);
+    self_node->set_name(_name);
+
+    {
+        std::lock_guard<std::mutex> counter_guard(_counter_map_lock);
+        RuntimeProfileCounterTreeNode pruned = RuntimeProfileCounterTreeNode::from_map(
+                _counter_map, _child_counter_map, ROOT_COUNTER);
+        pruned = RuntimeProfileCounterTreeNode::prune_the_tree(pruned, profile_level);
+        pruned.to_proto(self_node->mutable_counters(), self_node->mutable_child_counters_map());
+    }
+
+    {
+        std::lock_guard<std::mutex> info_guard(_info_strings_lock);
+        auto& serialized_info = *self_node->mutable_info_strings();
+        for (const auto& [key, value] : _info_strings) {
+            serialized_info[key] = value;
+        }
+        for (const auto& ordered_key : _info_strings_display_order) {
+            self_node->add_info_strings_display_order(ordered_key);
+        }
+    }
+
+    // Copy the child list while holding the lock; the recursion happens after it is released.
+    ChildVector snapshot;
+    {
+        std::lock_guard<std::mutex> children_guard(_children_lock);
+        snapshot = _children;
+    }
+    self_node->set_num_children(snapshot.size());
+
+    for (const auto& [child_profile, child_indent] : snapshot) {
+        const int child_pos = nodes->size(); // slot the child's own node is about to occupy
+        child_profile->to_proto(nodes, profile_level);
+        (*nodes)[child_pos].set_indent(child_indent);
+    }
+}
+
 int64_t RuntimeProfile::units_per_second(const RuntimeProfile::Counter* 
total_counter,
                                          const RuntimeProfile::Counter* timer) 
{
     DCHECK(total_counter->type() == TUnit::BYTES || total_counter->type() == 
TUnit::UNIT);
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 0670eed5177..86f4bff4d5c 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -22,6 +22,7 @@
 
 #include <gen_cpp/Metrics_types.h>
 #include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/runtime_profile.pb.h>
 #include <glog/logging.h>
 #include <stdint.h>
 
@@ -99,6 +100,86 @@ class RuntimeProfile {
 public:
     static std::unique_ptr<RuntimeProfile> from_thrift(const 
TRuntimeProfileTree& node);
 
+    static std::unique_ptr<RuntimeProfile> from_proto(const 
PRuntimeProfileTree& tree);
+
+    // Maps a thrift TUnit::type onto the PProfileUnit enumerator of the same name.
+    // A value outside the listed set trips DCHECK and is reported as NONE.
+    static PProfileUnit unit_to_proto(const TUnit::type& type) {
+        switch (type) {
+        case TUnit::UNIT:
+            return PProfileUnit::UNIT;
+        case TUnit::UNIT_PER_SECOND:
+            return PProfileUnit::UNIT_PER_SECOND;
+        case TUnit::CPU_TICKS:
+            return PProfileUnit::CPU_TICKS;
+        case TUnit::BYTES:
+            return PProfileUnit::BYTES;
+        case TUnit::BYTES_PER_SECOND:
+            return PProfileUnit::BYTES_PER_SECOND;
+        case TUnit::TIME_NS:
+            return PProfileUnit::TIME_NS;
+        case TUnit::DOUBLE_VALUE:
+            return PProfileUnit::DOUBLE_VALUE;
+        case TUnit::NONE:
+            return PProfileUnit::NONE;
+        case TUnit::TIME_MS:
+            return PProfileUnit::TIME_MS;
+        case TUnit::TIME_S:
+            return PProfileUnit::TIME_S;
+        default:
+            DCHECK(false);
+            return PProfileUnit::NONE;
+        }
+    }
+
+    // Inverse of unit_to_proto: maps a PProfileUnit onto the TUnit::type of the same name.
+    // A value outside the listed set trips DCHECK and is reported as NONE.
+    static TUnit::type unit_to_thrift(const PProfileUnit& unit) {
+        switch (unit) {
+        case PProfileUnit::UNIT:
+            return TUnit::UNIT;
+        case PProfileUnit::UNIT_PER_SECOND:
+            return TUnit::UNIT_PER_SECOND;
+        case PProfileUnit::CPU_TICKS:
+            return TUnit::CPU_TICKS;
+        case PProfileUnit::BYTES:
+            return TUnit::BYTES;
+        case PProfileUnit::BYTES_PER_SECOND:
+            return TUnit::BYTES_PER_SECOND;
+        case PProfileUnit::TIME_NS:
+            return TUnit::TIME_NS;
+        case PProfileUnit::DOUBLE_VALUE:
+            return TUnit::DOUBLE_VALUE;
+        case PProfileUnit::NONE:
+            return TUnit::NONE;
+        case PProfileUnit::TIME_MS:
+            return TUnit::TIME_MS;
+        case PProfileUnit::TIME_S:
+            return TUnit::TIME_S;
+        default:
+            DCHECK(false);
+            return TUnit::NONE;
+        }
+    }
+
     // The root counter name for all top level counters.
     static const std::string ROOT_COUNTER;
     class Counter {
@@ -135,6 +216,15 @@ public:
             return counter;
         }
 
+        // Serializes this counter as a PProfileCounter carrying `name`, the counter's
+        // value(), its unit converted with unit_to_proto, and its level.
+        virtual PProfileCounter to_proto(const std::string& name) const {
+            PProfileCounter counter;
+            counter.set_name(name);
+            counter.set_value(this->value());
+            counter.set_type(unit_to_proto(this->type()));
+            // The level field must carry the counter's level; the value is already in `value`.
+            counter.set_level(this->level());
+            return counter;
+        }
+
         virtual void pretty_print(std::ostream* s, const std::string& prefix,
                                   const std::string& name) const {
             std::ostream& stream = *s;
@@ -192,6 +282,15 @@ public:
             return counter;
         }
 
+        // Serializes the counter with current_value() as its value; to_proto_peak() emits the
+        // companion entry through the base-class serialization.
+        PProfileCounter to_proto(const std::string& name) const override {
+            PProfileCounter counter;
+            counter.set_name(name);
+            counter.set_value(current_value());
+            counter.set_type(unit_to_proto(this->type()));
+            // The level field must carry the counter's level, not its value.
+            counter.set_level(this->level());
+            return counter;
+        }
+
         TCounter to_thrift_peak(std::string name) {
             TCounter counter;
             counter.name = std::move(name);
@@ -201,6 +300,10 @@ public:
             return counter;
         }
 
+        PProfileCounter to_proto_peak(const std::string& name) const {
+            return Counter::to_proto(name); // qualified call: uses the base-class serialization, which reports value() rather than current_value()
+        }
+
         virtual void pretty_print(std::ostream* s, const std::string& prefix,
                                   const std::string& name) const override {
             std::ostream& stream = *s;
@@ -364,6 +467,14 @@ public:
             return counter;
         }
 
+        // Serializes this entry with only a name, a fixed level of 2 and the description text;
+        // value and type are left unset.
+        PProfileCounter to_proto(const std::string& name) const override {
+            PProfileCounter entry;
+            entry.set_description(_description);
+            entry.set_level(2);
+            entry.set_name(name);
+            return entry;
+        }
     private:
         const std::string _description;
         const std::string _name;
@@ -401,6 +512,9 @@ public:
     // the key has already been registered.
     void update(const TRuntimeProfileTree& thrift_profile);
 
+    //Similar to `void update(const TRuntimeProfileTree& thrift_profile)`
+    void update(const PRuntimeProfileTree& proto_profile);
+
     // Add a counter with 'name'/'type'.  Returns a counter object that the 
caller can
     // update.  The counter is owned by the RuntimeProfile object.
     // If parent_counter_name is a non-empty string, the counter is added as a 
child of
@@ -476,6 +590,11 @@ public:
     void to_thrift(TRuntimeProfileTree* tree, int64_t profile_level = 2);
     void to_thrift(std::vector<TRuntimeProfileNode>* nodes, int64_t 
profile_level = 2);
 
+    // Similar to `to_thrift`.
+    void to_proto(PRuntimeProfileTree* tree, int64_t profile_level = 2);
+    void to_proto(google::protobuf::RepeatedPtrField<PRuntimeProfileNode>* 
nodes,
+                  int64_t profile_level = 2);
+
     // Divides all counters by n
     void divide(int n);
 
@@ -596,6 +715,9 @@ private:
     // On return, *idx points to the node immediately following this subtree.
     void update(const std::vector<TRuntimeProfileNode>& nodes, int* idx);
 
+    // Similar to `void update(const std::vector<TRuntimeProfileNode>& nodes, 
int* idx)`
+    void update(const google::protobuf::RepeatedPtrField<PRuntimeProfileNode>& 
nodes, int* idx);
+
     // Helper function to compute compute the fraction of the total time spent 
in
     // this profile and its children.
     // Called recursively.
diff --git a/be/src/util/runtime_profile_counter_tree_node.cpp 
b/be/src/util/runtime_profile_counter_tree_node.cpp
index bff458af139..cbbb02bdadf 100644
--- a/be/src/util/runtime_profile_counter_tree_node.cpp
+++ b/be/src/util/runtime_profile_counter_tree_node.cpp
@@ -98,6 +98,43 @@ void RuntimeProfileCounterTreeNode::to_thrift(
     }
 }
 
+void RuntimeProfileCounterTreeNode::to_proto(
+        google::protobuf::RepeatedPtrField<PProfileCounter>* proto_counters,
+        google::protobuf::Map<std::string, PProfileChildCounterSet>* 
child_counter_map) const {
+    if (name != RuntimeProfile::ROOT_COUNTER && counter != nullptr) { // the root placeholder and counter-less nodes emit no counter of their own
+        if (auto* highWaterMarkCounter =
+                    
dynamic_cast<RuntimeProfile::HighWaterMarkCounter*>(counter)) {
+            // Convert both current and peak values; the peak is emitted as a second counter named <name>Peak
+            *proto_counters->Add() = highWaterMarkCounter->to_proto(name);
+            *proto_counters->Add() = highWaterMarkCounter->to_proto_peak(name 
+ "Peak");
+
+            
(*(*child_counter_map)[highWaterMarkCounter->parent_name()].mutable_child_counters())
+                    .Add(name + "Peak"); // list the Peak counter under the counter's parent_name()
+
+        } else if (auto* nonZeroCounter = 
dynamic_cast<RuntimeProfile::NonZeroCounter*>(counter)) {
+            if (nonZeroCounter->value() > 0) {
+                *proto_counters->Add() = to_proto();
+            } else {
+                // Skip zero-valued counter and remove from parent's child map
+                auto it = 
child_counter_map->find(nonZeroCounter->parent_name());
+                if (it != child_counter_map->end()) {
+                    auto* set = it->second.mutable_child_counters();
+                    auto remove_it = std::find(set->begin(), set->end(), name);
+                    if (remove_it != set->end()) set->erase(remove_it);
+                }
+                return; // a skipped counter also ends the traversal here: its children are not visited
+            }
+        } else {
+            *proto_counters->Add() = to_proto();
+        }
+    }
+
+    for (const auto& child : children) {
+        (*child_counter_map)[name].add_child_counters(child.name); // recorded before recursing; the zero-value branch above searches for the name to remove it again
+        child.to_proto(proto_counters, child_counter_map);
+    }
+}
+
 TCounter RuntimeProfileCounterTreeNode::to_thrift() const {
     TCounter tcounter;
     if (counter != nullptr) {
@@ -107,4 +144,17 @@ TCounter RuntimeProfileCounterTreeNode::to_thrift() const {
     }
     return tcounter;
 }
+
+PProfileCounter RuntimeProfileCounterTreeNode::to_proto() const {
+    // Delegate to the wrapped counter when there is one.
+    if (counter != nullptr) {
+        return counter->to_proto(name);
+    }
+
+    // No counter attached: emit an entry that carries only this node's name.
+    PProfileCounter name_only;
+    name_only.set_name(name);
+    return name_only;
+}
+
 } // namespace doris
diff --git a/be/src/util/runtime_profile_counter_tree_node.h 
b/be/src/util/runtime_profile_counter_tree_node.h
index 7963febe457..85284797cb6 100644
--- a/be/src/util/runtime_profile_counter_tree_node.h
+++ b/be/src/util/runtime_profile_counter_tree_node.h
@@ -50,9 +50,15 @@ public:
     void to_thrift(std::vector<TCounter>& tcounter,
                    std::map<std::string, std::set<std::string>>& 
child_counter_map) const;
 
+    void to_proto(
+            google::protobuf::RepeatedPtrField<PProfileCounter>* 
proto_counters,
+            google::protobuf::Map<std::string, PProfileChildCounterSet>* 
child_counter_map) const;
+
     // Conver this node to a TCounter object.
     TCounter to_thrift() const;
 
+    PProfileCounter to_proto() const;
+
 private:
     std::string name;
     // counter is not owned by this class
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 1a94e921052..852eafdf4c8 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -26,6 +26,7 @@
 
 #include <algorithm>
 #include <cctype>
+
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
 #include <exception>
@@ -77,6 +78,7 @@
 #include "vec/data_types/data_type_struct.h"
 #include "vec/exec/format/orc/orc_file_reader.h"
 #include "vec/exec/format/table/transactional_hive_common.h"
+#include "vec/exec/scan/file_scanner.h"
 #include "vec/exprs/vbloom_predicate.h"
 #include "vec/exprs/vdirect_in_predicate.h"
 #include "vec/exprs/vectorized_fn_call.h"
@@ -246,10 +248,11 @@ void OrcReader::_init_profile() {
     if (_profile != nullptr) {
         static const char* orc_profile = "OrcReader";
         ADD_TIMER_WITH_LEVEL(_profile, orc_profile, 1);
-        _orc_profile.read_time = ADD_TIMER_WITH_LEVEL(_profile, 
"FileReadTime", 1);
+        _orc_profile.read_time =
+                ADD_TIMER_WITH_LEVEL(_profile, 
FileScanner::FileReadTimeProfile, 1);
         _orc_profile.read_calls = ADD_COUNTER_WITH_LEVEL(_profile, 
"FileReadCalls", TUnit::UNIT, 1);
-        _orc_profile.read_bytes =
-                ADD_COUNTER_WITH_LEVEL(_profile, "FileReadBytes", 
TUnit::BYTES, 1);
+        _orc_profile.read_bytes = ADD_COUNTER_WITH_LEVEL(
+                _profile, FileScanner::FileReadBytesProfile, TUnit::BYTES, 1);
         _orc_profile.column_read_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ColumnReadTime", 
orc_profile, 1);
         _orc_profile.get_batch_time =
@@ -1086,44 +1089,47 @@ Status OrcReader::set_fill_columns(
         _row_reader_options.include(_read_file_cols);
         _row_reader_options.setEnableLazyDecoding(true);
 
-        uint64_t number_of_stripes = _reader->getNumberOfStripes();
-        auto all_stripes_needed = 
_reader->getNeedReadStripes(_row_reader_options);
+        //orc reader should not use the tiny stripe optimization when reading 
by row id.
+        if (!_read_line_mode_mode) {
+            uint64_t number_of_stripes = _reader->getNumberOfStripes();
+            auto all_stripes_needed = 
_reader->getNeedReadStripes(_row_reader_options);
 
-        int64_t range_end_offset = _range_start_offset + _range_size;
+            int64_t range_end_offset = _range_start_offset + _range_size;
 
-        bool all_tiny_stripes = true;
-        std::vector<io::PrefetchRange> tiny_stripe_ranges;
+            bool all_tiny_stripes = true;
+            std::vector<io::PrefetchRange> tiny_stripe_ranges;
 
-        for (uint64_t i = 0; i < number_of_stripes; i++) {
-            std::unique_ptr<orc::StripeInformation> strip_info = 
_reader->getStripe(i);
-            uint64_t strip_start_offset = strip_info->getOffset();
-            uint64_t strip_end_offset = strip_start_offset + 
strip_info->getLength();
+            for (uint64_t i = 0; i < number_of_stripes; i++) {
+                std::unique_ptr<orc::StripeInformation> strip_info = 
_reader->getStripe(i);
+                uint64_t strip_start_offset = strip_info->getOffset();
+                uint64_t strip_end_offset = strip_start_offset + 
strip_info->getLength();
 
-            if (strip_start_offset >= range_end_offset || strip_end_offset < 
_range_start_offset ||
-                !all_stripes_needed[i]) {
-                continue;
-            }
-            if (strip_info->getLength() > _orc_tiny_stripe_threshold_bytes) {
-                all_tiny_stripes = false;
-                break;
-            }
+                if (strip_start_offset >= range_end_offset ||
+                    strip_end_offset < _range_start_offset || 
!all_stripes_needed[i]) {
+                    continue;
+                }
+                if (strip_info->getLength() > 
_orc_tiny_stripe_threshold_bytes) {
+                    all_tiny_stripes = false;
+                    break;
+                }
 
-            tiny_stripe_ranges.emplace_back(strip_start_offset, 
strip_end_offset);
-        }
-        if (all_tiny_stripes && number_of_stripes > 0) {
-            std::vector<io::PrefetchRange> prefetch_merge_ranges =
-                    
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
-                                                                 
_orc_max_merge_distance_bytes,
-                                                                 
_orc_once_max_read_bytes);
-            auto range_finder =
-                    
std::make_shared<io::LinearProbeRangeFinder>(std::move(prefetch_merge_ranges));
+                tiny_stripe_ranges.emplace_back(strip_start_offset, 
strip_end_offset);
+            }
+            if (all_tiny_stripes && number_of_stripes > 0) {
+                std::vector<io::PrefetchRange> prefetch_merge_ranges =
+                        
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
+                                                                     
_orc_max_merge_distance_bytes,
+                                                                     
_orc_once_max_read_bytes);
+                auto range_finder = 
std::make_shared<io::LinearProbeRangeFinder>(
+                        std::move(prefetch_merge_ranges));
 
-            auto* orc_input_stream_ptr = 
static_cast<ORCFileInputStream*>(_reader->getStream());
-            orc_input_stream_ptr->set_all_tiny_stripes();
-            auto& orc_file_reader = orc_input_stream_ptr->get_file_reader();
-            auto orc_inner_reader = orc_input_stream_ptr->get_inner_reader();
-            orc_file_reader = 
std::make_shared<io::RangeCacheFileReader>(_profile, orc_inner_reader,
-                                                                         
range_finder);
+                auto* orc_input_stream_ptr = 
static_cast<ORCFileInputStream*>(_reader->getStream());
+                orc_input_stream_ptr->set_all_tiny_stripes();
+                auto& orc_file_reader = 
orc_input_stream_ptr->get_file_reader();
+                auto orc_inner_reader = 
orc_input_stream_ptr->get_inner_reader();
+                orc_file_reader = std::make_shared<io::RangeCacheFileReader>(
+                        _profile, orc_inner_reader, range_finder);
+            }
         }
 
         if (!_lazy_read_ctx.can_lazy_read) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index fc69a889ea7..7d14a87ea1d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -74,6 +74,7 @@ struct IOContext;
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
+static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = 
std::numeric_limits<uint32_t>::max();
 
 RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
                                const std::vector<std::string>& read_columns,
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 376d7d10f48..002163bec9a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -63,7 +63,6 @@ class RowGroup;
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
 // TODO: we need to determine it by test.
-static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = 
std::numeric_limits<uint32_t>::max();
 
 class RowGroupReader : public ProfileCollector {
 public:
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index c8450cc2de2..529fcabc545 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -50,6 +50,7 @@
 #include "vec/exec/format/parquet/vparquet_file_metadata.h"
 #include "vec/exec/format/parquet/vparquet_group_reader.h"
 #include "vec/exec/format/parquet/vparquet_page_index.h"
+#include "vec/exec/scan/file_scanner.h"
 #include "vec/exprs/vbloom_predicate.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
@@ -159,13 +160,14 @@ void ParquetReader::_init_profile() {
         _parquet_profile.row_group_filter_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RowGroupFilterTime", 
parquet_profile, 1);
 
-        _parquet_profile.file_read_time = ADD_TIMER_WITH_LEVEL(_profile, 
"FileReadTime", 1);
+        _parquet_profile.file_read_time =
+                ADD_TIMER_WITH_LEVEL(_profile, 
FileScanner::FileReadTimeProfile, 1);
         _parquet_profile.file_read_calls =
                 ADD_COUNTER_WITH_LEVEL(_profile, "FileReadCalls", TUnit::UNIT, 
1);
         _parquet_profile.file_meta_read_calls =
                 ADD_COUNTER_WITH_LEVEL(_profile, "FileMetaReadCalls", 
TUnit::UNIT, 1);
-        _parquet_profile.file_read_bytes =
-                ADD_COUNTER_WITH_LEVEL(_profile, "FileReadBytes", 
TUnit::BYTES, 1);
+        _parquet_profile.file_read_bytes = ADD_COUNTER_WITH_LEVEL(
+                _profile, FileScanner::FileReadBytesProfile, TUnit::BYTES, 1);
         _parquet_profile.decompress_time =
                 ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecompressTime", 
parquet_profile, 1);
         _parquet_profile.decompress_cnt = ADD_CHILD_COUNTER_WITH_LEVEL(
diff --git a/be/src/vec/exec/scan/file_scanner.cpp 
b/be/src/vec/exec/scan/file_scanner.cpp
index 6502e151140..2fcd067ad71 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -95,6 +95,9 @@ class ShardedKVCache;
 namespace doris::vectorized {
 using namespace ErrorCode;
 
+const std::string FileScanner::FileReadBytesProfile = "FileReadBytes";
+const std::string FileScanner::FileReadTimeProfile = "FileReadTime";
+
 FileScanner::FileScanner(
         RuntimeState* state, pipeline::FileScanLocalState* local_state, 
int64_t limit,
         std::shared_ptr<vectorized::SplitSourceConnector> split_source, 
RuntimeProfile* profile,
@@ -1349,7 +1352,7 @@ Status FileScanner::_generate_truncate_columns(bool 
need_to_get_parsed_schema) {
     return Status::OK();
 }
 
-Status FileScanner::prepare_for_read_one_line(const TFileRangeDesc& range) {
+Status FileScanner::prepare_for_read_lines(const TFileRangeDesc& range) {
     _current_range = range;
 
     RETURN_IF_ERROR(_init_io_ctx());
@@ -1366,10 +1369,10 @@ Status FileScanner::prepare_for_read_one_line(const 
TFileRangeDesc& range) {
     return Status::OK();
 }
 
-Status FileScanner::read_one_line_from_range(const TFileRangeDesc& range,
-                                             const segment_v2::rowid_t rowid, 
Block* result_block,
-                                             const ExternalFileMappingInfo& 
external_info,
-                                             int64_t* init_reader_ms, int64_t* 
get_block_ms) {
+Status FileScanner::read_lines_from_range(const TFileRangeDesc& range,
+                                          const std::list<int64_t>& row_ids, 
Block* result_block,
+                                          const ExternalFileMappingInfo& 
external_info,
+                                          int64_t* init_reader_ms, int64_t* 
get_block_ms) {
     _current_range = range;
     RETURN_IF_ERROR(_generate_parititon_columns());
 
@@ -1389,7 +1392,8 @@ Status FileScanner::read_one_line_from_range(const 
TFileRangeDesc& range,
                                             ? 
ExecEnv::GetInstance()->file_meta_cache()
                                             : nullptr,
                                     false);
-                    
RETURN_IF_ERROR(parquet_reader->set_read_lines_mode({rowid}));
+
+                    
RETURN_IF_ERROR(parquet_reader->set_read_lines_mode(row_ids));
                     
RETURN_IF_ERROR(_init_parquet_reader(std::move(parquet_reader)));
                     break;
                 }
@@ -1399,7 +1403,7 @@ Status FileScanner::read_one_line_from_range(const 
TFileRangeDesc& range,
                                                                  1, 
_state->timezone(),
                                                                  
_io_ctx.get(), false);
 
-                    RETURN_IF_ERROR(orc_reader->set_read_lines_mode({rowid}));
+                    RETURN_IF_ERROR(orc_reader->set_read_lines_mode(row_ids));
                     RETURN_IF_ERROR(_init_orc_reader(std::move(orc_reader)));
                     break;
                 }
@@ -1419,11 +1423,15 @@ Status FileScanner::read_one_line_from_range(const 
TFileRangeDesc& range,
 
     RETURN_IF_ERROR(scope_timer_run(
             [&]() -> Status {
-                bool eof = false;
-                return _get_block_impl(_state, result_block, &eof);
+                while (!_cur_reader_eof) {
+                    bool eof = false;
+                    RETURN_IF_ERROR(_get_block_impl(_state, result_block, 
&eof));
+                }
+                return Status::OK();
             },
             get_block_ms));
 
+    _cur_reader->collect_profile_before_close();
     RETURN_IF_ERROR(_cur_reader->close());
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/file_scanner.h 
b/be/src/vec/exec/scan/file_scanner.h
index b4bea913bbf..a602aec96f9 100644
--- a/be/src/vec/exec/scan/file_scanner.h
+++ b/be/src/vec/exec/scan/file_scanner.h
@@ -62,6 +62,10 @@ class FileScanner : public Scanner {
 public:
     static constexpr const char* NAME = "FileScanner";
 
+    // sub profile name (for parquet/orc)
+    static const std::string FileReadBytesProfile;
+    static const std::string FileReadTimeProfile;
+
     FileScanner(RuntimeState* state, pipeline::FileScanLocalState* parent, 
int64_t limit,
                 std::shared_ptr<vectorized::SplitSourceConnector> split_source,
                 RuntimeProfile* profile, ShardedKVCache* kv_cache,
@@ -89,12 +93,11 @@ public:
               _col_name_to_slot_id(colname_to_slot_id),
               _real_tuple_desc(tuple_desc) {};
 
-    Status read_one_line_from_range(const TFileRangeDesc& range, const 
segment_v2::rowid_t rowid,
-                                    Block* result_block,
-                                    const ExternalFileMappingInfo& 
external_info,
-                                    int64_t* init_reader_ms, int64_t* 
get_block_ms);
+    Status read_lines_from_range(const TFileRangeDesc& range, const 
std::list<int64_t>& row_ids,
+                                 Block* result_block, const 
ExternalFileMappingInfo& external_info,
+                                 int64_t* init_reader_ms, int64_t* 
get_block_ms);
 
-    Status prepare_for_read_one_line(const TFileRangeDesc& range);
+    Status prepare_for_read_lines(const TFileRangeDesc& range);
 
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) 
override;
@@ -119,7 +122,7 @@ protected:
     TFileRangeDesc _current_range;
 
     std::unique_ptr<GenericReader> _cur_reader;
-    bool _cur_reader_eof;
+    bool _cur_reader_eof = false;
     const std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range = nullptr;
     // File source slot descriptors
     std::vector<SlotDescriptor*> _file_slot_descs;
diff --git a/be/test/util/runtime_profile_test.cpp 
b/be/test/util/runtime_profile_test.cpp
index 011b8208116..1a178b44ec5 100644
--- a/be/test/util/runtime_profile_test.cpp
+++ b/be/test/util/runtime_profile_test.cpp
@@ -104,6 +104,82 @@ TEST(RuntimeProfileTest, Basic) {
     EXPECT_EQ(counter_updated->value(), 1);
 }
 
+TEST(RuntimeProfileTest, ProtoBasic) {
+    // Round-trips a three-node profile (parent + two children) through PRuntimeProfileTree,
+    // then checks that merge() accumulates counter values while update() overwrites them.
+    RuntimeProfile profile_a("ProfileA");
+    RuntimeProfile profile_a1("ProfileA1");
+    RuntimeProfile profile_a2("ProfileA2");
+
+    profile_a.add_child(&profile_a1, true);
+    profile_a.add_child(&profile_a2, true);
+
+    PRuntimeProfileTree proto_profile;
+
+    // Test empty serialization: one node per profile even before any counter is added
+    profile_a.to_proto(&proto_profile);
+    EXPECT_EQ(proto_profile.nodes_size(), 3);
+    proto_profile.clear_nodes();
+
+    // Updating/setting counter: update() applies a delta, set() overwrites the value
+    RuntimeProfile::Counter* counter_a = profile_a.add_counter("A", TUnit::UNIT);
+    ASSERT_TRUE(counter_a != nullptr);
+    counter_a->update(10);
+    counter_a->update(-5);
+    EXPECT_EQ(counter_a->value(), 5);
+    counter_a->set(1L);
+    EXPECT_EQ(counter_a->value(), 1);
+
+    RuntimeProfile::Counter* counter_b = profile_a2.add_counter("B", TUnit::BYTES);
+    EXPECT_TRUE(counter_b != nullptr);
+
+    std::stringstream ss;
+    // Serialize to proto
+    profile_a.to_proto(&proto_profile, 2);
+    profile_a.pretty_print(&ss, " ", 4);
+    std::cout << "Profile A:\n" << ss.str() << std::endl;
+    ss.str("");
+    ss.clear();
+
+    ASSERT_EQ(proto_profile.nodes_size(), 3);
+
+    // Deserialize from proto
+    std::unique_ptr<RuntimeProfile> from_proto = RuntimeProfile::from_proto(proto_profile);
+    from_proto->pretty_print(&ss, "", 4);
+    std::cout << "From proto profile:\n" << ss.str() << std::endl;
+
+    RuntimeProfile::Counter* counter_merged = from_proto->get_counter("A");
+    ASSERT_NE(counter_merged, nullptr);
+    EXPECT_EQ(counter_merged->value(), 1);
+    EXPECT_TRUE(from_proto->get_counter("Not there") == nullptr);
+
+    // merge
+    RuntimeProfile merged_profile("Merged");
+    merged_profile.merge(from_proto.get());
+    counter_merged = merged_profile.get_counter("A");
+    // get_counter() yields nullptr for an unknown name, so stop here instead of dereferencing
+    ASSERT_NE(counter_merged, nullptr);
+    EXPECT_EQ(counter_merged->value(), 1);
+
+    // merge 2 more times, counters should get aggregated
+    merged_profile.merge(from_proto.get());
+    merged_profile.merge(from_proto.get());
+    EXPECT_EQ(counter_merged->value(), 3);
+
+    // update
+    RuntimeProfile updated_profile("updated");
+    updated_profile.update(proto_profile);
+    RuntimeProfile::Counter* counter_updated = updated_profile.get_counter("A");
+    ASSERT_NE(counter_updated, nullptr);
+    EXPECT_EQ(counter_updated->value(), 1);
+
+    // update 2 more times, counters should stay the same
+    updated_profile.update(proto_profile);
+    updated_profile.update(proto_profile);
+    EXPECT_EQ(counter_updated->value(), 1);
+}
+
 void ValidateCounter(RuntimeProfile* profile, const string& name, int64_t 
value) {
     RuntimeProfile::Counter* counter = profile->get_counter(name);
     ASSERT_TRUE(counter != nullptr);
@@ -227,6 +303,118 @@ TEST(RuntimeProfileTest, MergeAndupdate) {
     profile2.pretty_print(&dummy);
 }
 
+TEST(RuntimeProfileTest, ProtoMergeAndUpdate) {
+    // Serializes profile1 to protobuf, then checks merge() and update() against profile2.
+    RuntimeProfile profile1("Parent1");
+    RuntimeProfile p1_child1("Child1");
+    RuntimeProfile p1_child2("Child2");
+    profile1.add_child(&p1_child1, true);
+    profile1.add_child(&p1_child2, true);
+
+    RuntimeProfile profile2("Parent2");
+    RuntimeProfile p2_child1("Child1");
+    RuntimeProfile p2_child3("Child3");
+    profile2.add_child(&p2_child1, true);
+    profile2.add_child(&p2_child3, true);
+
+    // Create parent-level counters
+    RuntimeProfile::Counter* parent1_shared = profile1.add_counter("Parent Shared", TUnit::UNIT);
+    RuntimeProfile::Counter* parent2_shared = profile2.add_counter("Parent Shared", TUnit::UNIT);
+    RuntimeProfile::Counter* parent1_only = profile1.add_counter("Parent 1 Only", TUnit::UNIT);
+    RuntimeProfile::Counter* parent2_only = profile2.add_counter("Parent 2 Only", TUnit::UNIT);
+    parent1_shared->update(1);
+    parent2_shared->update(3);
+    parent1_only->update(2);
+    parent2_only->update(5);
+
+    // Create child-level counters; each "Parent N Only" counter lives in parent N's Child1
+    RuntimeProfile::Counter* p1_c1_shared = p1_child1.add_counter("Child1 Shared", TUnit::UNIT);
+    RuntimeProfile::Counter* p1_c1_only =
+            p1_child1.add_counter("Child1 Parent 1 Only", TUnit::UNIT);
+    RuntimeProfile::Counter* p1_c2 = p1_child2.add_counter("Child2", TUnit::UNIT);
+
+    RuntimeProfile::Counter* p2_c1_shared = p2_child1.add_counter("Child1 Shared", TUnit::UNIT);
+    RuntimeProfile::Counter* p2_c1_only =
+            p2_child1.add_counter("Child1 Parent 2 Only", TUnit::UNIT);
+    RuntimeProfile::Counter* p2_c3 = p2_child3.add_counter("Child3", TUnit::UNIT);
+
+    p1_c1_shared->update(10);
+    p1_c1_only->update(50);
+    p2_c1_shared->update(20);
+    p2_c1_only->update(100);
+    p2_c3->update(30);
+    p1_c2->update(40);
+
+    // Serialize profile1 to proto
+    PRuntimeProfileTree proto_profile1;
+    profile1.to_proto(&proto_profile1, 2);
+
+    // Deserialize from proto and merge with profile2
+    std::unique_ptr<RuntimeProfile> merged_profile = RuntimeProfile::from_proto(proto_profile1);
+    merged_profile->merge(&profile2);
+
+    std::stringstream ss;
+    merged_profile->pretty_print(&ss);
+    std::cout << "Merged profile:\n" << ss.str() << std::endl;
+
+    EXPECT_EQ(4, merged_profile->num_counters());
+    ValidateCounter(merged_profile.get(), "Parent Shared", 4);
+    ValidateCounter(merged_profile.get(), "Parent 1 Only", 2);
+    ValidateCounter(merged_profile.get(), "Parent 2 Only", 5);
+
+    std::vector<RuntimeProfile*> children;
+    merged_profile->get_children(&children);
+    EXPECT_EQ(children.size(), 3);
+
+    for (RuntimeProfile* profile : children) {
+        if (profile->name() == "Child1") {
+            EXPECT_EQ(4, profile->num_counters());
+            ValidateCounter(profile, "Child1 Shared", 30);
+            ValidateCounter(profile, "Child1 Parent 1 Only", 50);
+            ValidateCounter(profile, "Child1 Parent 2 Only", 100);
+        } else if (profile->name() == "Child2") {
+            EXPECT_EQ(2, profile->num_counters());
+            ValidateCounter(profile, "Child2", 40);
+        } else if (profile->name() == "Child3") {
+            EXPECT_EQ(2, profile->num_counters());
+            ValidateCounter(profile, "Child3", 30);
+        } else {
+            FAIL() << "Unexpected child profile name: " << profile->name();
+        }
+    }
+
+    // Test update: shared counters take profile1's values, profile2-only counters are kept
+    profile2.update(proto_profile1);
+    EXPECT_EQ(4, profile2.num_counters());
+    ValidateCounter(&profile2, "Parent Shared", 1);
+    ValidateCounter(&profile2, "Parent 1 Only", 2);
+    ValidateCounter(&profile2, "Parent 2 Only", 5);
+
+    profile2.get_children(&children);
+    EXPECT_EQ(children.size(), 3);
+
+    for (RuntimeProfile* profile : children) {
+        if (profile->name() == "Child1") {
+            EXPECT_EQ(4, profile->num_counters());
+            ValidateCounter(profile, "Child1 Shared", 10);
+            ValidateCounter(profile, "Child1 Parent 1 Only", 50);
+            ValidateCounter(profile, "Child1 Parent 2 Only", 100);
+        } else if (profile->name() == "Child2") {
+            EXPECT_EQ(2, profile->num_counters());
+            ValidateCounter(profile, "Child2", 40);
+        } else if (profile->name() == "Child3") {
+            EXPECT_EQ(2, profile->num_counters());
+            ValidateCounter(profile, "Child3", 30);
+        } else {
+            FAIL() << "Unexpected child profile name: " << profile->name();
+        }
+    }
+
+    // Ensure pretty_print doesn't crash
+    std::stringstream dummy;
+    profile2.pretty_print(&dummy);
+}
+
 TEST(RuntimeProfileTest, DerivedCounters) {
     ObjectPool pool;
     RuntimeProfile profile("Profile");
@@ -290,6 +478,47 @@ TEST(RuntimeProfileTest, InfoStringTest) {
     EXPECT_EQ(*update_dst_profile.get_info_string("Foo"), "Bar");
 }
 
+TEST(RuntimeProfileTest, ProtoInfoStringTest) {
+    // Info strings must survive to_proto()/from_proto() and be overwritten by update().
+    RuntimeProfile profile("Profile");
+
+    EXPECT_TRUE(profile.get_info_string("Key") == nullptr);
+
+    profile.add_info_string("Key", "Value");
+    const std::string* value = profile.get_info_string("Key");
+    ASSERT_TRUE(value != nullptr);
+    EXPECT_EQ(*value, "Value");
+
+    // Convert it to proto
+    PRuntimeProfileTree pprofile;
+    profile.to_proto(&pprofile);
+
+    // Convert it back from proto
+    std::unique_ptr<RuntimeProfile> from_proto = RuntimeProfile::from_proto(pprofile);
+    value = from_proto->get_info_string("Key");
+    ASSERT_TRUE(value != nullptr);
+    EXPECT_EQ(*value, "Value");
+
+    // Test update
+    RuntimeProfile update_dst_profile("Profile2");
+    update_dst_profile.update(pprofile);
+    value = update_dst_profile.get_info_string("Key");
+    ASSERT_TRUE(value != nullptr);
+    EXPECT_EQ(*value, "Value");
+
+    // Modify original profile, convert again, and update dst
+    profile.add_info_string("Key", "NewValue");
+    profile.add_info_string("Foo", "Bar");
+    EXPECT_EQ(*profile.get_info_string("Key"), "NewValue");
+    EXPECT_EQ(*profile.get_info_string("Foo"), "Bar");
+
+    profile.to_proto(&pprofile);
+    update_dst_profile.update(pprofile);
+
+    EXPECT_EQ(*update_dst_profile.get_info_string("Key"), "NewValue");
+    EXPECT_EQ(*update_dst_profile.get_info_string("Foo"), "Bar");
+}
+
 // This case could be added back when we fixed the issue.
 // TEST(RuntimeProfileTest, AddSameCounter) {
 //     RuntimeProfile profile("Profile");
diff --git a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp 
b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp
index 981bedeb854..bd1fba17843 100644
--- a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp
@@ -214,15 +214,15 @@ static void read_parquet_lines(std::vector<std::string> 
numeric_types,
 
     auto vf = FileScanner::create_unique(&runtime_state, 
runtime_profile.get(), &scan_params,
                                          &colname_to_slot_id, tuple_desc);
-    EXPECT_TRUE(vf->prepare_for_read_one_line(scan_range).ok());
+    EXPECT_TRUE(vf->prepare_for_read_lines(scan_range).ok());
     ExternalFileMappingInfo external_info(0, scan_range, false);
     int64_t init_reader_ms = 0;
     int64_t get_block_ms = 0;
 
     auto read_lines_tmp2 = read_lines;
     while (!read_lines_tmp2.empty()) {
-        auto st = vf->read_one_line_from_range(scan_range, 
read_lines_tmp2.front(), block.get(),
-                                               external_info, &init_reader_ms, 
&get_block_ms);
+        auto st = vf->read_lines_from_range(scan_range, 
{read_lines_tmp2.front()}, block.get(),
+                                            external_info, &init_reader_ms, 
&get_block_ms);
         std::cout << st.to_string() << "\n";
         EXPECT_TRUE(st.ok());
 
diff --git a/be/test/vec/exec/orc/orc_read_lines.cpp 
b/be/test/vec/exec/orc/orc_read_lines.cpp
index 4db22d8fbd0..532eae03314 100644
--- a/be/test/vec/exec/orc/orc_read_lines.cpp
+++ b/be/test/vec/exec/orc/orc_read_lines.cpp
@@ -188,12 +188,12 @@ static void read_orc_line(int64_t line, std::string 
block_dump) {
 
     auto vf = FileScanner::create_unique(runtime_state.get(), 
runtime_profile.get(), &params,
                                          &colname_to_slot_id, tuple_desc);
-    EXPECT_TRUE(vf->prepare_for_read_one_line(range).ok());
+    EXPECT_TRUE(vf->prepare_for_read_lines(range).ok());
     ExternalFileMappingInfo external_info(0, range, false);
     int64_t init_reader_ms = 0;
     int64_t get_block_ms = 0;
-    auto st = vf->read_one_line_from_range(range, line, block.get(), 
external_info, &init_reader_ms,
-                                           &get_block_ms);
+    auto st = vf->read_lines_from_range(range, {line}, block.get(), 
external_info, &init_reader_ms,
+                                        &get_block_ms);
     EXPECT_TRUE(st.ok());
     EXPECT_EQ(block->dump_data(1), block_dump);
 }
diff --git a/gensrc/proto/internal_service.proto 
b/gensrc/proto/internal_service.proto
index 2b5d935d7f3..ec408b68202 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -26,6 +26,7 @@ import "descriptors.proto";
 import "types.proto";
 import "olap_common.proto";
 import "olap_file.proto";
+import "runtime_profile.proto";
 
 option cc_generic_services = true;
 
@@ -795,6 +796,7 @@ message PMultiGetBlockV2 {
     };
     optional RowFormat format = 2;
     repeated bytes binary_row_data = 3;
+    optional PRuntimeProfileTree profile = 4;
 }
 
 message PMultiGetResponseV2 {
diff --git a/gensrc/proto/runtime_profile.proto 
b/gensrc/proto/runtime_profile.proto
new file mode 100644
index 00000000000..3d8503bec6a
--- /dev/null
+++ b/gensrc/proto/runtime_profile.proto
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto2";
+
+package doris;
+option java_package = "org.apache.doris.proto";
+option cc_generic_services = true;
+// Similar to gensrc/thrift/RuntimeProfile.thrift, but implemented in protobuf.
+
+// Counter unit types (equivalent to Metrics.TUnit). NOTE(review): presumably the numbers must match TUnit - confirm.
+enum PProfileUnit {
+  // A dimensionless numerical quantity
+  UNIT = 0;
+  // Rate of a dimensionless numerical quantity
+  UNIT_PER_SECOND = 1;
+  CPU_TICKS = 2;
+  BYTES = 3;
+  BYTES_PER_SECOND = 4;
+  TIME_NS = 5;
+  DOUBLE_VALUE = 6;
+  // No units at all; may not be a numerical quantity.
+  // It is currently used as a label, so do not treat it as
+  // a real counter.
+  NONE = 7;
+  TIME_MS = 8;
+  TIME_S = 9;
+}
+
+// A single performance counter (one element of PRuntimeProfileNode.counters)
+message PProfileCounter {
+  required string name = 1;  // Counter name
+  required PProfileUnit type = 2;  // Unit type of this counter
+  required int64 value = 3;  // Counter value, carried as a 64-bit integer
+  optional int64 level = 4;  // NOTE(review): presumably the profile level of the counter - confirm
+  optional string description = 5;  // Optional description text
+}
+
+// A set of child counter names; value type of PRuntimeProfileNode.child_counters_map (map<string, set<string>>)
+message PProfileChildCounterSet {
+  repeated string child_counters = 1;  // Child counter names; `repeated` does not enforce uniqueness
+}
+
+// A single runtime profile node (one element of PRuntimeProfileTree.nodes)
+message PRuntimeProfileNode {
+  required string name = 1;
+  required int32 num_children = 2;  // NOTE(review): presumably direct children only - confirm
+
+  // Flattened counters for this node and all its children
+  repeated PProfileCounter counters = 3;
+
+  // Node metadata (e.g., node id)
+  required int64 metadata = 4;
+
+  // Whether the child is indented
+  required bool indent = 5;
+
+  // Key-value info strings describing additional metadata
+  map<string, string> info_strings = 6;
+
+  // Order to display info strings (protobuf map fields do not preserve ordering)
+  repeated string info_strings_display_order = 7;
+
+  // Map from parent counter name to a set of child counter names
+  map<string, PProfileChildCounterSet> child_counters_map = 8;
+
+  // Timestamp for this node. NOTE(review): unit and epoch are not stated here - confirm
+  required int64 timestamp = 9;
+
+  // Deprecated field. NOTE(review): consider annotating it with [deprecated = true]
+  optional bool deprecated_is_sink = 10;
+}
+
+// A runtime profile tree flattened into a list of nodes (in-order traversal)
+message PRuntimeProfileTree {
+  repeated PRuntimeProfileNode nodes = 1;  // One entry per profile in the tree
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to