This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new df2a963e742 [Refactor](block) pick some trim block pr #57737 #57860 #58124 (#58209)
df2a963e742 is described below

commit df2a963e742cc0ef7a088c96fdbe53e55e68527d
Author: Pxl <[email protected]>
AuthorDate: Fri Nov 21 16:53:59 2025 +0800

    [Refactor](block) pick some trim block pr #57737 #57860 #58124 (#58209)
    
    pick from #57737 #57860 #58124
    
    ---------
    
    Co-authored-by: HappenLee <[email protected]>
---
 be/src/common/consts.h                             |   2 -
 be/src/exec/rowid_fetcher.cpp                      |  16 +-
 be/src/olap/base_tablet.cpp                        |   7 +-
 be/src/olap/partial_update_info.cpp                |  28 +--
 be/src/olap/push_handler.cpp                       |   4 +-
 be/src/olap/tablet_schema.h                        |   1 -
 be/src/pipeline/exec/materialization_opertor.cpp   |   5 +-
 be/src/pipeline/exec/scan_operator.cpp             |   7 -
 be/src/pipeline/exec/schema_scan_operator.cpp      |   4 +-
 be/src/pipeline/exec/schema_scan_operator.h        |   5 +
 be/src/runtime/tablets_channel.cpp                 |   4 +-
 .../arrow_flight/arrow_flight_batch_reader.cpp     |   5 +-
 .../aggregate_functions/aggregate_function_sort.h  |   8 +-
 be/src/vec/core/block.cpp                          | 211 ++-------------------
 be/src/vec/core/block.h                            |  70 ++-----
 be/src/vec/core/sort_block.cpp                     |   9 +-
 be/src/vec/core/sort_description.h                 |  11 +-
 .../vec/exec/format/arrow/arrow_stream_reader.cpp  |   9 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  85 ++++++---
 .../exec/format/parquet/vparquet_group_reader.cpp  |  95 +++++++---
 be/src/vec/exec/format/table/equality_delete.cpp   |  42 ++--
 be/src/vec/exec/format/table/iceberg_reader.cpp    |  22 ++-
 be/src/vec/exec/format/wal/wal_reader.cpp          |   4 +-
 be/src/vec/exec/jni_connector.cpp                  |   4 +-
 be/src/vec/exec/scan/file_scanner.cpp              |  21 +-
 be/src/vec/exec/scan/olap_scanner.cpp              |  16 +-
 be/src/vec/exec/scan/scanner.cpp                   |   7 -
 be/src/vec/functions/function_helpers.cpp          |   8 -
 be/src/vec/olap/block_reader.cpp                   |   1 -
 be/src/vec/olap/vcollect_iterator.cpp              |  15 +-
 be/src/vec/olap/vertical_block_reader.cpp          |   1 -
 be/src/vec/runtime/vdata_stream_recvr.cpp          |   4 +-
 be/src/vec/runtime/vdata_stream_recvr.h            |   7 +-
 be/src/vec/sink/varrow_flight_result_writer.cpp    |   8 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |   5 +-
 be/src/vec/sink/vtablet_block_convertor.cpp        |   4 -
 be/src/vec/sink/writer/vtablet_writer.cpp          |   3 +-
 be/src/vec/sink/writer/vwal_writer.cpp             |   3 +-
 be/src/vec/spill/spill_reader.cpp                  |   4 +-
 be/src/vec/spill/spill_writer.cpp                  |   3 +-
 be/test/olap/wal/wal_reader_writer_test.cpp        |  10 +-
 be/test/pipeline/exec/vdata_stream_recvr_test.cpp  |   3 +-
 .../operator/materialization_shared_state_test.cpp |  18 +-
 be/test/testutil/mock/mock_data_stream_sender.h    |  10 +-
 be/test/vec/core/block_test.cpp                    | 156 +++++++--------
 be/test/vec/data_types/common_data_type_test.h     |   7 +-
 .../vec/exec/format/parquet/parquet_read_lines.cpp |   6 +-
 be/test/vec/exec/orc/orc_read_lines.cpp            |   6 +-
 48 files changed, 443 insertions(+), 541 deletions(-)

diff --git a/be/src/common/consts.h b/be/src/common/consts.h
index 4618ffe7d74..ca4662c839e 100644
--- a/be/src/common/consts.h
+++ b/be/src/common/consts.h
@@ -24,8 +24,6 @@ namespace BeConsts {
 const std::string CSV = "csv";
 const std::string CSV_WITH_NAMES = "csv_with_names";
 const std::string CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
-const std::string BLOCK_TEMP_COLUMN_PREFIX = "__TEMP__";
-const std::string BLOCK_TEMP_COLUMN_SCANNER_FILTERED = 
"__TEMP__scanner_filtered";
 const std::string ROWID_COL = "__DORIS_ROWID_COL__";
 const std::string GLOBAL_ROWID_COL = "__DORIS_GLOBAL_ROWID_COL__";
 const std::string ROW_STORE_COL = "__DORIS_ROW_STORE_COL__";
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 0f4619e1303..4e09c9ac36c 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -182,7 +182,11 @@ Status RowIDFetcher::_merge_rpc_results(const 
PMultiGetRequest& request,
         }
         // Merge partial blocks
         vectorized::Block partial_block;
-        RETURN_IF_ERROR(partial_block.deserialize(resp.block()));
+        [[maybe_unused]] size_t uncompressed_size = 0;
+        [[maybe_unused]] int64_t uncompressed_time = 0;
+
+        RETURN_IF_ERROR(
+                partial_block.deserialize(resp.block(), &uncompressed_size, 
&uncompressed_time));
         if (partial_block.is_empty_column()) {
             return Status::OK();
         }
@@ -493,9 +497,10 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequest& request,
                    << ", be_exec_version:" << request.be_exec_version();
         [[maybe_unused]] size_t compressed_size = 0;
         [[maybe_unused]] size_t uncompressed_size = 0;
+        [[maybe_unused]] int64_t compress_time = 0;
         int be_exec_version = request.has_be_exec_version() ? 
request.be_exec_version() : 0;
         RETURN_IF_ERROR(result_block.serialize(be_exec_version, 
response->mutable_block(),
-                                               &uncompressed_size, 
&compressed_size,
+                                               &uncompressed_size, 
&compressed_size, &compress_time,
                                                
segment_v2::CompressionTypePB::LZ4));
     }
 
@@ -600,10 +605,11 @@ Status RowIdStorageReader::read_by_rowids(const 
PMultiGetRequestV2& request,
 
             [[maybe_unused]] size_t compressed_size = 0;
             [[maybe_unused]] size_t uncompressed_size = 0;
+            [[maybe_unused]] int64_t compress_time = 0;
             int be_exec_version = request.has_be_exec_version() ? 
request.be_exec_version() : 0;
-            RETURN_IF_ERROR(result_blocks[i].serialize(be_exec_version, 
pblock->mutable_block(),
-                                                       &uncompressed_size, 
&compressed_size,
-                                                       
segment_v2::CompressionTypePB::LZ4));
+            RETURN_IF_ERROR(result_blocks[i].serialize(
+                    be_exec_version, pblock->mutable_block(), 
&uncompressed_size, &compressed_size,
+                    &compress_time, segment_v2::CompressionTypePB::LZ4));
         }
 
         // Build file type statistics string
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 9cde2bbdf45..2c858e557bb 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -941,11 +941,10 @@ Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr 
input_rowset, uint32_t
 
 const signed char* BaseTablet::get_delete_sign_column_data(const 
vectorized::Block& block,
                                                            size_t 
rows_at_least) {
-    if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
-                block.try_get_by_name(DELETE_SIGN);
-        delete_sign_column != nullptr) {
+    if (int pos = block.get_position_by_name(DELETE_SIGN); pos != -1) {
+        const vectorized::ColumnWithTypeAndName& delete_sign_column = 
block.get_by_position(pos);
         const auto& delete_sign_col =
-                reinterpret_cast<const 
vectorized::ColumnInt8&>(*(delete_sign_column->column));
+                assert_cast<const 
vectorized::ColumnInt8&>(*(delete_sign_column.column));
         if (delete_sign_col.size() >= rows_at_least) {
             return delete_sign_col.get_data().data();
         }
diff --git a/be/src/olap/partial_update_info.cpp 
b/be/src/olap/partial_update_info.cpp
index 5e6082c5c13..928ae80b38f 100644
--- a/be/src/olap/partial_update_info.cpp
+++ b/be/src/olap/partial_update_info.cpp
@@ -311,9 +311,7 @@ Status FixedReadPlan::read_columns_by_plan(
         const signed char* __restrict cur_delete_signs) const {
     if (force_read_old_delete_signs) {
         // always read delete sign column from historical data
-        if (const vectorized::ColumnWithTypeAndName* old_delete_sign_column =
-                    block.try_get_by_name(DELETE_SIGN);
-            old_delete_sign_column == nullptr) {
+        if (block.get_position_by_name(DELETE_SIGN) == -1) {
             auto del_col_cid = tablet_schema.field_index(DELETE_SIGN);
             cids_to_read.emplace_back(del_col_cid);
             block.swap(tablet_schema.create_block_by_cids(cids_to_read));
@@ -384,7 +382,10 @@ Status FixedReadPlan::fill_missing_columns(
                                          old_value_block, &read_index, true, 
nullptr));
 
     const auto* old_delete_signs = 
BaseTablet::get_delete_sign_column_data(old_value_block);
-    DCHECK(old_delete_signs != nullptr);
+    if (old_delete_signs == nullptr) {
+        return Status::InternalError("old delete signs column not found, 
block: {}",
+                                     old_value_block.dump_structure());
+    }
     // build default value columns
     auto default_value_block = old_value_block.clone_empty();
     RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
@@ -420,27 +421,30 @@ Status FixedReadPlan::fill_missing_columns(
             }
 
             if (should_use_default) {
-                // clang-format off
                 if (tablet_column.has_default_value()) {
                     
missing_col->insert_from(*mutable_default_value_columns[i], 0);
                 } else if (tablet_column.is_nullable()) {
-                    auto* nullable_column = 
assert_cast<vectorized::ColumnNullable*, 
TypeCheckOnRelease::DISABLE>(missing_col.get());
+                    auto* nullable_column =
+                            
assert_cast<vectorized::ColumnNullable*>(missing_col.get());
                     nullable_column->insert_many_defaults(1);
                 } else if (tablet_schema.auto_increment_column() == 
tablet_column.name()) {
-                    const auto& column = 
*DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name()));
+                    const auto& column =
+                            
*DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name()));
                     DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
                     auto* auto_inc_column =
-                            assert_cast<vectorized::ColumnInt64*, 
TypeCheckOnRelease::DISABLE>(missing_col.get());
-                    
auto_inc_column->insert(vectorized::Field::create_field<TYPE_BIGINT>(
-assert_cast<const vectorized::ColumnInt64*, TypeCheckOnRelease::DISABLE>(
-block->get_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL).column.get())->get_element(idx)));
+                            
assert_cast<vectorized::ColumnInt64*>(missing_col.get());
+                    int pos = 
block->get_position_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL);
+                    if (pos == -1) {
+                        return Status::InternalError("auto increment column 
not found in block {}",
+                                                     block->dump_structure());
+                    }
+                    
auto_inc_column->insert_from(*block->get_by_position(pos).column.get(), idx);
                 } else {
                     // If the control flow reaches this branch, the column 
neither has default value
                     // nor is nullable. It means that the row's delete sign is 
marked, and the value
                     // columns are useless and won't be read. So we can just 
put arbitary values in the cells
                     
missing_col->insert(tablet_column.get_vec_type()->get_default());
                 }
-                // clang-format on
             } else {
                 
missing_col->insert_from(*old_value_block.get_by_position(i).column,
                                          pos_in_old_block);
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 07586acc5a2..e48585b1aac 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -478,10 +478,10 @@ Status PushBrokerReader::_cast_to_input_block() {
         if (slot_desc->type()->get_primitive_type() == 
PrimitiveType::TYPE_VARIANT) {
             continue;
         }
-        auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
         // remove nullable here, let the get_function decide whether nullable
         auto return_type = slot_desc->get_data_type_ptr();
         idx = _src_block_name_to_idx[slot_desc->col_name()];
+        auto& arg = _src_block_ptr->get_by_position(idx);
         // bitmap convert:src -> to_base64 -> bitmap_from_base64
         if (slot_desc->type()->get_primitive_type() == TYPE_BITMAP) {
             auto base64_return_type = 
vectorized::DataTypeFactory::instance().create_data_type(
@@ -491,7 +491,7 @@ Status PushBrokerReader::_cast_to_input_block() {
             RETURN_IF_ERROR(func_to_base64->execute(nullptr, *_src_block_ptr, 
{idx}, idx,
                                                     arg.column->size()));
             _src_block_ptr->get_by_position(idx).type = 
std::move(base64_return_type);
-            auto& arg_base64 = 
_src_block_ptr->get_by_name(slot_desc->col_name());
+            auto& arg_base64 = _src_block_ptr->get_by_position(idx);
             auto func_bitmap_from_base64 =
                     vectorized::SimpleFunctionFactory::instance().get_function(
                             "bitmap_from_base64", {arg_base64}, return_type);
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 57463316cdf..b839231464b 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -463,7 +463,6 @@ public:
     void set_skip_write_index_on_load(bool skip) { _skip_write_index_on_load = 
skip; }
     bool skip_write_index_on_load() const { return _skip_write_index_on_load; }
     int32_t delete_sign_idx() const { return _delete_sign_idx; }
-    void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = 
delete_sign_idx; }
     bool has_sequence_col() const { return _sequence_col_idx != -1; }
     int32_t sequence_col_idx() const { return _sequence_col_idx; }
     void set_version_col_idx(int32_t version_col_idx) { _version_col_idx = 
version_col_idx; }
diff --git a/be/src/pipeline/exec/materialization_opertor.cpp 
b/be/src/pipeline/exec/materialization_opertor.cpp
index 63fcd7b47a6..689d345dd0a 100644
--- a/be/src/pipeline/exec/materialization_opertor.cpp
+++ b/be/src/pipeline/exec/materialization_opertor.cpp
@@ -59,8 +59,11 @@ Status MaterializationSharedState::merge_multi_response() {
     for (int i = 0; i < block_order_results.size(); ++i) {
         for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
             vectorized::Block partial_block;
+            size_t uncompressed_size = 0;
+            int64_t uncompressed_time = 0;
             DCHECK(rpc_struct.response.blocks_size() > i);
-            
RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block()));
+            
RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block(),
+                                                      &uncompressed_size, 
&uncompressed_time));
             if (rpc_struct.response.blocks(i).has_profile()) {
                 auto response_profile =
                         
RuntimeProfile::from_proto(rpc_struct.response.blocks(i).profile());
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 60536cbbbc9..61e42c81503 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1318,13 +1318,6 @@ Status 
ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized:
                                                 bool* eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    // in inverted index apply logic, in order to optimize query performance,
-    // we built some temporary columns into block, these columns only used in 
scan node level,
-    // remove them when query leave scan node to avoid other nodes use 
block->columns() to make a wrong decision
-    Defer drop_block_temp_column {[&]() {
-        std::unique_lock l(local_state._block_lock);
-        block->erase_tmp_columns();
-    }};
 
     if (state->is_cancelled()) {
         if (local_state._scanner_ctx) {
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp 
b/be/src/pipeline/exec/schema_scan_operator.cpp
index e57fbd75573..79987c001de 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -179,6 +179,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) {
         int j = 0;
         for (; j < columns_desc.size(); ++j) {
             if (boost::iequals(_dest_tuple_desc->slots()[i]->col_name(), 
columns_desc[j].name)) {
+                _slot_offsets[i] = j;
                 break;
             }
         }
@@ -250,11 +251,10 @@ Status SchemaScanOperatorX::get_block(RuntimeState* 
state, vectorized::Block* bl
         if (src_block.rows()) {
             // block->check_number_of_rows();
             for (int i = 0; i < _slot_num; ++i) {
-                auto* dest_slot_desc = _dest_tuple_desc->slots()[i];
                 vectorized::MutableColumnPtr column_ptr =
                         std::move(*block->get_by_position(i).column).mutate();
                 column_ptr->insert_range_from(
-                        
*src_block.get_by_name(dest_slot_desc->col_name()).column, 0,
+                        
*src_block.safe_get_by_position(_slot_offsets[i]).column, 0,
                         src_block.rows());
             }
             RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, 
block,
diff --git a/be/src/pipeline/exec/schema_scan_operator.h 
b/be/src/pipeline/exec/schema_scan_operator.h
index 6846d9b59af..0634b9d5693 100644
--- a/be/src/pipeline/exec/schema_scan_operator.h
+++ b/be/src/pipeline/exec/schema_scan_operator.h
@@ -19,6 +19,8 @@
 
 #include <stdint.h>
 
+#include <unordered_map>
+
 #include "common/status.h"
 #include "exec/schema_scanner.h"
 #include "operator.h"
@@ -85,6 +87,9 @@ private:
     int _tuple_idx;
     // slot num need to fill in and return
     int _slot_num;
+
+    // slot index mapping to src column index
+    std::unordered_map<int, int> _slot_offsets;
 };
 
 #include "common/compile_check_end.h"
diff --git a/be/src/runtime/tablets_channel.cpp 
b/be/src/runtime/tablets_channel.cpp
index acab5e12935..460879978cd 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -569,7 +569,9 @@ Status BaseTabletsChannel::_write_block_data(
         std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs,
         PTabletWriterAddBlockResult* response) {
     vectorized::Block send_data;
-    RETURN_IF_ERROR(send_data.deserialize(request.block()));
+    [[maybe_unused]] size_t uncompressed_size = 0;
+    [[maybe_unused]] int64_t uncompressed_time = 0;
+    RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, 
&uncompressed_time));
     CHECK(send_data.rows() == request.tablet_ids_size())
             << "block rows: " << send_data.rows()
             << ", tablet_ids_size: " << request.tablet_ids_size();
diff --git a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp 
b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
index 07e46cfcfed..24fc977d027 100644
--- a/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
+++ b/be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
@@ -268,7 +268,10 @@ arrow::Status ArrowFlightBatchRemoteReader::_fetch_data() {
         {
             SCOPED_ATOMIC_TIMER(&_deserialize_block_timer);
             _block = vectorized::Block::create_shared();
-            st = _block->deserialize(callback->response_->block());
+            [[maybe_unused]] size_t uncompressed_size = 0;
+            [[maybe_unused]] int64_t uncompressed_time = 0;
+            st = _block->deserialize(callback->response_->block(), 
&uncompressed_size,
+                                     &uncompressed_time);
             ARROW_RETURN_NOT_OK(to_arrow_status(st));
             break;
         }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h 
b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index 79fdf4979b1..3d8bc2480c2 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -72,8 +72,10 @@ struct AggregateFunctionSortData {
         PBlock pblock;
         size_t uncompressed_bytes = 0;
         size_t compressed_bytes = 0;
+        int64_t compressed_time = 0;
         auto st = block.serialize(state->be_exec_version(), &pblock, 
&uncompressed_bytes,
-                                  &compressed_bytes, 
segment_v2::CompressionTypePB::NO_COMPRESSION);
+                                  &compressed_bytes, &compressed_time,
+                                  
segment_v2::CompressionTypePB::NO_COMPRESSION);
         if (!st.ok()) {
             throw doris::Exception(st);
         }
@@ -87,7 +89,9 @@ struct AggregateFunctionSortData {
 
         PBlock pblock;
         pblock.ParseFromString(data);
-        auto st = block.deserialize(pblock);
+        [[maybe_unused]] size_t uncompressed_size = 0;
+        [[maybe_unused]] int64_t uncompressed_time = 0;
+        auto st = block.deserialize(pblock, &uncompressed_size, 
&uncompressed_time);
         // If memory allocate failed during deserialize, st is not ok, throw 
exception here to
         // stop the query.
         if (!st.ok()) {
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index c09a7188d3e..dbaf8ee023b 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -82,13 +82,9 @@ template void 
clear_blocks<Block>(moodycamel::ConcurrentQueue<Block>&,
 template void clear_blocks<BlockUPtr>(moodycamel::ConcurrentQueue<BlockUPtr>&,
                                       RuntimeProfile::Counter* 
memory_used_counter);
 
-Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {
-    initialize_index_by_name();
-}
+Block::Block(std::initializer_list<ColumnWithTypeAndName> il) : data {il} {}
 
-Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {
-    initialize_index_by_name();
-}
+Block::Block(ColumnsWithTypeAndName data_) : data {std::move(data_)} {}
 
 Block::Block(const std::vector<SlotDescriptor*>& slots, size_t block_size,
              bool ignore_trivial_slot) {
@@ -112,7 +108,8 @@ Block::Block(const std::vector<SlotDescriptor>& slots, 
size_t block_size,
     *this = Block(slot_ptrs, block_size, ignore_trivial_slot);
 }
 
-Status Block::deserialize(const PBlock& pblock) {
+Status Block::deserialize(const PBlock& pblock, size_t* uncompressed_bytes,
+                          int64_t* decompress_time) {
     swap(Block());
     int be_exec_version = pblock.has_be_exec_version() ? 
pblock.be_exec_version() : 0;
     
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
@@ -121,7 +118,7 @@ Status Block::deserialize(const PBlock& pblock) {
     std::string compression_scratch;
     if (pblock.compressed()) {
         // Decompress
-        SCOPED_RAW_TIMER(&_decompress_time_ns);
+        SCOPED_RAW_TIMER(decompress_time);
         const char* compressed_data = pblock.column_values().c_str();
         size_t compressed_size = pblock.column_values().size();
         size_t uncompressed_size = 0;
@@ -144,7 +141,7 @@ Status Block::deserialize(const PBlock& pblock) {
                                             compression_scratch.data());
             DCHECK(success) << "snappy::RawUncompress failed";
         }
-        _decompressed_bytes = uncompressed_size;
+        *uncompressed_bytes = uncompressed_size;
         buf = compression_scratch.data();
     } else {
         buf = pblock.column_values().data();
@@ -158,22 +155,14 @@ Status Block::deserialize(const PBlock& pblock) {
                 buf = type->deserialize(buf, &data_column, 
pblock.be_exec_version()));
         data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
     }
-    initialize_index_by_name();
 
     return Status::OK();
 }
 
 void Block::reserve(size_t count) {
-    index_by_name.reserve(count);
     data.reserve(count);
 }
 
-void Block::initialize_index_by_name() {
-    for (size_t i = 0, size = data.size(); i < size; ++i) {
-        index_by_name[data[i].name] = i;
-    }
-}
-
 void Block::insert(size_t position, const ColumnWithTypeAndName& elem) {
     if (position > data.size()) {
         throw Exception(ErrorCode::INTERNAL_ERROR,
@@ -181,13 +170,6 @@ void Block::insert(size_t position, const 
ColumnWithTypeAndName& elem) {
                         data.size(), dump_names());
     }
 
-    for (auto& name_pos : index_by_name) {
-        if (name_pos.second >= position) {
-            ++name_pos.second;
-        }
-    }
-
-    index_by_name.emplace(elem.name, position);
     data.emplace(data.begin() + position, elem);
 }
 
@@ -198,30 +180,20 @@ void Block::insert(size_t position, 
ColumnWithTypeAndName&& elem) {
                         data.size(), dump_names());
     }
 
-    for (auto& name_pos : index_by_name) {
-        if (name_pos.second >= position) {
-            ++name_pos.second;
-        }
-    }
-
-    index_by_name.emplace(elem.name, position);
     data.emplace(data.begin() + position, std::move(elem));
 }
 
 void Block::clear_names() {
-    index_by_name.clear();
     for (auto& entry : data) {
         entry.name.clear();
     }
 }
 
 void Block::insert(const ColumnWithTypeAndName& elem) {
-    index_by_name.emplace(elem.name, data.size());
     data.emplace_back(elem);
 }
 
 void Block::insert(ColumnWithTypeAndName&& elem) {
-    index_by_name.emplace(elem.name, data.size());
     data.emplace_back(std::move(elem));
 }
 
@@ -235,13 +207,6 @@ void Block::erase_tail(size_t start) {
     DCHECK(start <= data.size()) << fmt::format(
             "Position out of bound in Block::erase(), max position = {}", 
data.size());
     data.erase(data.begin() + start, data.end());
-    for (auto it = index_by_name.begin(); it != index_by_name.end();) {
-        if (it->second >= start) {
-            index_by_name.erase(it++);
-        } else {
-            ++it;
-        }
-    }
 }
 
 void Block::erase(size_t position) {
@@ -253,36 +218,7 @@ void Block::erase(size_t position) {
 }
 
 void Block::erase_impl(size_t position) {
-    bool need_maintain_index_by_name = true;
-    if (position + 1 == data.size()) {
-        index_by_name.erase(data.back().name);
-        need_maintain_index_by_name = false;
-    }
-
     data.erase(data.begin() + position);
-
-    if (need_maintain_index_by_name) {
-        for (auto it = index_by_name.begin(); it != index_by_name.end();) {
-            if (it->second == position) {
-                index_by_name.erase(it++);
-            } else {
-                if (it->second > position) {
-                    --it->second;
-                }
-                ++it;
-            }
-        }
-    }
-}
-
-void Block::erase(const String& name) {
-    auto index_it = index_by_name.find(name);
-    if (index_it == index_by_name.end()) {
-        throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, 
name={}, block_names={}",
-                        name, dump_names());
-    }
-
-    erase_impl(index_it->second);
 }
 
 ColumnWithTypeAndName& Block::safe_get_by_position(size_t position) {
@@ -303,54 +239,13 @@ const ColumnWithTypeAndName& 
Block::safe_get_by_position(size_t position) const
     return data[position];
 }
 
-ColumnWithTypeAndName& Block::get_by_name(const std::string& name) {
-    auto it = index_by_name.find(name);
-    if (index_by_name.end() == it) {
-        throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, 
name={}, block_names={}",
-                        name, dump_names());
-    }
-
-    return data[it->second];
-}
-
-const ColumnWithTypeAndName& Block::get_by_name(const std::string& name) const 
{
-    auto it = index_by_name.find(name);
-    if (index_by_name.end() == it) {
-        throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, 
name={}, block_names={}",
-                        name, dump_names());
-    }
-
-    return data[it->second];
-}
-
-ColumnWithTypeAndName* Block::try_get_by_name(const std::string& name) {
-    auto it = index_by_name.find(name);
-    if (index_by_name.end() == it) {
-        return nullptr;
-    }
-    return &data[it->second];
-}
-
-const ColumnWithTypeAndName* Block::try_get_by_name(const std::string& name) 
const {
-    auto it = index_by_name.find(name);
-    if (index_by_name.end() == it) {
-        return nullptr;
-    }
-    return &data[it->second];
-}
-
-bool Block::has(const std::string& name) const {
-    return index_by_name.end() != index_by_name.find(name);
-}
-
-size_t Block::get_position_by_name(const std::string& name) const {
-    auto it = index_by_name.find(name);
-    if (index_by_name.end() == it) {
-        throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, 
name={}, block_names={}",
-                        name, dump_names());
+int Block::get_position_by_name(const std::string& name) const {
+    for (int i = 0; i < data.size(); i++) {
+        if (data[i].name == name) {
+            return i;
+        }
     }
-
-    return it->second;
+    return -1;
 }
 
 void Block::check_number_of_rows(bool allow_null_columns) const {
@@ -768,7 +663,6 @@ DataTypes Block::get_data_types() const {
 
 void Block::clear() {
     data.clear();
-    index_by_name.clear();
 }
 
 void Block::clear_column_data(int64_t column_size) noexcept {
@@ -789,15 +683,6 @@ void Block::clear_column_data(int64_t column_size) 
noexcept {
     }
 }
 
-void Block::erase_tmp_columns() noexcept {
-    auto all_column_names = get_names();
-    for (auto& name : all_column_names) {
-        if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) {
-            erase(name);
-        }
-    }
-}
-
 void Block::clear_column_mem_not_keep(const std::vector<bool>& 
column_keep_flags,
                                       bool need_keep_first) {
     if (data.size() >= column_keep_flags.size()) {
@@ -819,17 +704,14 @@ void Block::clear_column_mem_not_keep(const 
std::vector<bool>& column_keep_flags
 void Block::swap(Block& other) noexcept {
     SCOPED_SKIP_MEMORY_CHECK();
     data.swap(other.data);
-    index_by_name.swap(other.index_by_name);
 }
 
 void Block::swap(Block&& other) noexcept {
     SCOPED_SKIP_MEMORY_CHECK();
     data = std::move(other.data);
-    index_by_name = std::move(other.index_by_name);
 }
 
 void Block::shuffle_columns(const std::vector<int>& result_column_ids) {
-    index_by_name.clear();
     Container tmp_data;
     tmp_data.reserve(result_column_ids.size());
     for (const int result_column_id : result_column_ids) {
@@ -958,7 +840,8 @@ Status Block::filter_block(Block* block, size_t 
filter_column_id, size_t column_
 
 Status Block::serialize(int be_exec_version, PBlock* pblock,
                         /*std::string* compressed_buffer,*/ size_t* 
uncompressed_bytes,
-                        size_t* compressed_bytes, 
segment_v2::CompressionTypePB compression_type,
+                        size_t* compressed_bytes, int64_t* compress_time,
+                        segment_v2::CompressionTypePB compression_type,
                         bool allow_transfer_large_data) const {
     
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(be_exec_version));
     pblock->set_be_exec_version(be_exec_version);
@@ -998,7 +881,7 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
 
     // compress
     if (compression_type != segment_v2::NO_COMPRESSION && 
content_uncompressed_size > 0) {
-        SCOPED_RAW_TIMER(&_compress_time_ns);
+        SCOPED_RAW_TIMER(compress_time);
         pblock->set_compression_type(compression_type);
         pblock->set_uncompressed_size(serialize_bytes);
 
@@ -1030,24 +913,6 @@ Status Block::serialize(int be_exec_version, PBlock* 
pblock,
     return Status::OK();
 }
 
-MutableBlock::MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, 
int reserve_size,
-                           bool ignore_trivial_slot) {
-    for (auto* const tuple_desc : tuple_descs) {
-        for (auto* const slot_desc : tuple_desc->slots()) {
-            if (ignore_trivial_slot && !slot_desc->is_materialized()) {
-                continue;
-            }
-            _data_types.emplace_back(slot_desc->get_data_type_ptr());
-            _columns.emplace_back(_data_types.back()->create_column());
-            if (reserve_size != 0) {
-                _columns.back()->reserve(reserve_size);
-            }
-            _names.push_back(slot_desc->col_name());
-        }
-    }
-    initialize_index_by_name();
-}
-
 size_t MutableBlock::rows() const {
     for (const auto& column : _columns) {
         if (column) {
@@ -1063,7 +928,6 @@ void MutableBlock::swap(MutableBlock& another) noexcept {
     _columns.swap(another._columns);
     _data_types.swap(another._data_types);
     _names.swap(another._names);
-    index_by_name.swap(another.index_by_name);
 }
 
 void MutableBlock::add_row(const Block* block, int row) {
@@ -1126,31 +990,6 @@ Status MutableBlock::add_rows(const Block* block, const 
std::vector<int64_t>& ro
     return Status::OK();
 }
 
-void MutableBlock::erase(const String& name) {
-    auto index_it = index_by_name.find(name);
-    if (index_it == index_by_name.end()) {
-        throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, 
name={}, block_names={}",
-                        name, dump_names());
-    }
-
-    auto position = index_it->second;
-
-    _columns.erase(_columns.begin() + position);
-    _data_types.erase(_data_types.begin() + position);
-    _names.erase(_names.begin() + position);
-
-    for (auto it = index_by_name.begin(); it != index_by_name.end();) {
-        if (it->second == position) {
-            index_by_name.erase(it++);
-        } else {
-            if (it->second > position) {
-                --it->second;
-            }
-            ++it;
-        }
-    }
-}
-
 Block MutableBlock::to_block(int start_column) {
     return to_block(start_column, (int)_columns.size());
 }
@@ -1290,26 +1129,6 @@ void MutableBlock::clear_column_data() noexcept {
     }
 }
 
-void MutableBlock::initialize_index_by_name() {
-    for (size_t i = 0, size = _names.size(); i < size; ++i) {
-        index_by_name[_names[i]] = i;
-    }
-}
-
-bool MutableBlock::has(const std::string& name) const {
-    return index_by_name.end() != index_by_name.find(name);
-}
-
-size_t MutableBlock::get_position_by_name(const std::string& name) const {
-    auto it = index_by_name.find(name);
-    if (index_by_name.end() == it) {
-        throw Exception(ErrorCode::INTERNAL_ERROR, "No such name in Block, 
name={}, block_names={}",
-                        name, dump_names());
-    }
-
-    return it->second;
-}
-
 std::string MutableBlock::dump_names() const {
     std::string out;
     for (auto it = _names.begin(); it != _names.end(); ++it) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index b3ecb58ebed..b99f2ec09ad 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -73,14 +73,7 @@ class Block {
 
 private:
     using Container = ColumnsWithTypeAndName;
-    using IndexByName = phmap::flat_hash_map<String, size_t>;
     Container data;
-    IndexByName index_by_name;
-
-    int64_t _decompress_time_ns = 0;
-    int64_t _decompressed_bytes = 0;
-
-    mutable int64_t _compress_time_ns = 0;
 
 public:
     Block() = default;
@@ -113,8 +106,6 @@ public:
     void erase_tail(size_t start);
     /// remove the columns at the specified positions
     void erase(const std::set<size_t>& positions);
-    /// remove the column with the specified name
-    void erase(const String& name);
     // T was std::set<int>, std::vector<int>, std::list<int>
     template <class T>
     void erase_not_in(const T& container) {
@@ -125,7 +116,13 @@ public:
         std::swap(data, new_data);
     }
 
-    void initialize_index_by_name();
+    std::unordered_map<std::string, uint32_t> get_name_to_pos_map() const {
+        std::unordered_map<std::string, uint32_t> name_to_index_map;
+        for (uint32_t i = 0; i < data.size(); ++i) {
+            name_to_index_map[data[i].name] = i;
+        }
+        return name_to_index_map;
+    }
 
     /// References are invalidated after calling functions above.
     ColumnWithTypeAndName& get_by_position(size_t position) {
@@ -151,13 +148,6 @@ public:
     ColumnWithTypeAndName& safe_get_by_position(size_t position);
     const ColumnWithTypeAndName& safe_get_by_position(size_t position) const;
 
-    ColumnWithTypeAndName& get_by_name(const std::string& name);
-    const ColumnWithTypeAndName& get_by_name(const std::string& name) const;
-
-    // return nullptr when no such column name
-    ColumnWithTypeAndName* try_get_by_name(const std::string& name);
-    const ColumnWithTypeAndName* try_get_by_name(const std::string& name) 
const;
-
     Container::iterator begin() { return data.begin(); }
     Container::iterator end() { return data.end(); }
     Container::const_iterator begin() const { return data.begin(); }
@@ -165,9 +155,9 @@ public:
     Container::const_iterator cbegin() const { return data.cbegin(); }
     Container::const_iterator cend() const { return data.cend(); }
 
-    bool has(const std::string& name) const;
-
-    size_t get_position_by_name(const std::string& name) const;
+    // Get position of column by name. Returns -1 if there is no column with 
that name.
+    // ATTN: this method is O(N). Prefer maintaining a name -> position map in 
the caller if you need to call it frequently.
+    int get_position_by_name(const std::string& name) const;
 
     const ColumnsWithTypeAndName& get_columns_with_type_and_name() const;
 
@@ -291,10 +281,11 @@ public:
 
     // serialize block to PBlock
     Status serialize(int be_exec_version, PBlock* pblock, size_t* 
uncompressed_bytes,
-                     size_t* compressed_bytes, segment_v2::CompressionTypePB 
compression_type,
+                     size_t* compressed_bytes, int64_t* compress_time,
+                     segment_v2::CompressionTypePB compression_type,
                      bool allow_transfer_large_data = false) const;
 
-    Status deserialize(const PBlock& pblock);
+    Status deserialize(const PBlock& pblock, size_t* uncompressed_bytes, 
int64_t* decompress_time);
 
     std::unique_ptr<Block> create_same_struct_block(size_t size, bool 
is_reserve = false) const;
 
@@ -362,15 +353,6 @@ public:
     // for String type or Array<String> type
     void shrink_char_type_column_suffix_zero(const std::vector<size_t>& 
char_type_idx);
 
-    int64_t get_decompress_time() const { return _decompress_time_ns; }
-    int64_t get_decompressed_bytes() const { return _decompressed_bytes; }
-    int64_t get_compress_time() const { return _compress_time_ns; }
-
-    // remove tmp columns in block
-    // in inverted index apply logic, in order to optimize query performance,
-    // we built some temporary columns into block
-    void erase_tmp_columns() noexcept;
-
     void clear_column_mem_not_keep(const std::vector<bool>& column_keep_flags,
                                    bool need_keep_first);
 
@@ -391,9 +373,6 @@ private:
     DataTypes _data_types;
     std::vector<std::string> _names;
 
-    using IndexByName = phmap::flat_hash_map<String, size_t>;
-    IndexByName index_by_name;
-
 public:
     static MutableBlock build_mutable_block(Block* block) {
         return block == nullptr ? MutableBlock() : MutableBlock(block);
@@ -401,27 +380,19 @@ public:
     MutableBlock() = default;
     ~MutableBlock() = default;
 
-    MutableBlock(const std::vector<TupleDescriptor*>& tuple_descs, int 
reserve_size = 0,
-                 bool igore_trivial_slot = false);
-
     MutableBlock(Block* block)
             : _columns(block->mutate_columns()),
               _data_types(block->get_data_types()),
-              _names(block->get_names()) {
-        initialize_index_by_name();
-    }
+              _names(block->get_names()) {}
     MutableBlock(Block&& block)
             : _columns(block.mutate_columns()),
               _data_types(block.get_data_types()),
-              _names(block.get_names()) {
-        initialize_index_by_name();
-    }
+              _names(block.get_names()) {}
 
     void operator=(MutableBlock&& m_block) {
         _columns = std::move(m_block._columns);
         _data_types = std::move(m_block._data_types);
         _names = std::move(m_block._names);
-        initialize_index_by_name();
     }
 
     size_t rows() const;
@@ -552,7 +523,6 @@ public:
                     _columns[i] = _data_types[i]->create_column();
                 }
             }
-            initialize_index_by_name();
         } else {
             if (_columns.size() != block.columns()) {
                 return Status::Error<ErrorCode::INTERNAL_ERROR>(
@@ -599,9 +569,6 @@ public:
     Status add_rows(const Block* block, size_t row_begin, size_t length);
     Status add_rows(const Block* block, const std::vector<int64_t>& rows);
 
-    /// remove the column with the specified name
-    void erase(const String& name);
-
     std::string dump_data(size_t row_limit = 100) const;
     std::string dump_data_json(size_t row_limit = 100) const;
 
@@ -627,15 +594,8 @@ public:
 
     std::vector<std::string>& get_names() { return _names; }
 
-    bool has(const std::string& name) const;
-
-    size_t get_position_by_name(const std::string& name) const;
-
     /** Get a list of column names separated by commas. */
     std::string dump_names() const;
-
-private:
-    void initialize_index_by_name();
 };
 
 struct IteratorRowRef {
diff --git a/be/src/vec/core/sort_block.cpp b/be/src/vec/core/sort_block.cpp
index 20bf1f952af..75aa9d85a12 100644
--- a/be/src/vec/core/sort_block.cpp
+++ b/be/src/vec/core/sort_block.cpp
@@ -32,10 +32,7 @@ ColumnsWithSortDescriptions 
get_columns_with_sort_description(const Block& block
 
     for (size_t i = 0; i < size; ++i) {
         const IColumn* column =
-                !description[i].column_name.empty()
-                        ? 
block.get_by_name(description[i].column_name).column.get()
-                        : 
block.safe_get_by_position(description[i].column_number).column.get();
-
+                
block.safe_get_by_position(description[i].column_number).column.get();
         res.emplace_back(column, description[i]);
     }
 
@@ -53,9 +50,7 @@ void sort_block(Block& src_block, Block& dest_block, const 
SortDescription& desc
         bool reverse = description[0].direction == -1;
 
         const IColumn* column =
-                !description[0].column_name.empty()
-                        ? 
src_block.get_by_name(description[0].column_name).column.get()
-                        : 
src_block.safe_get_by_position(description[0].column_number).column.get();
+                
src_block.safe_get_by_position(description[0].column_number).column.get();
 
         IColumn::Permutation perm;
         column->get_permutation(reverse, limit, 
description[0].nulls_direction, perm);
diff --git a/be/src/vec/core/sort_description.h 
b/be/src/vec/core/sort_description.h
index cdee17f7651..4d1543d6904 100644
--- a/be/src/vec/core/sort_description.h
+++ b/be/src/vec/core/sort_description.h
@@ -20,20 +20,15 @@
 
 #pragma once
 
-#include "cstddef"
-#include "memory"
-#include "string"
-#include "vec/core/field.h"
 #include "vector"
 
 namespace doris::vectorized {
 
 /// Description of the sorting rule by one column.
 struct SortColumnDescription {
-    std::string column_name; /// The name of the column.
-    int column_number;       /// Column number (used if no name is given).
-    int direction;           /// 1 - ascending, -1 - descending.
-    int nulls_direction;     /// 1 - NULLs and NaNs are greater, -1 - less.
+    int column_number;   /// Position of the sort column in the block.
+    int direction;       /// 1 - ascending, -1 - descending.
+    int nulls_direction; /// 1 - NULLs and NaNs are greater, -1 - less.
 
     SortColumnDescription(int column_number_, int direction_, int 
nulls_direction_)
             : column_number(column_number_),
diff --git a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp 
b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
index def1645f323..dfd4cdcfc91 100644
--- a/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
+++ b/be/src/vec/exec/format/arrow/arrow_stream_reader.cpp
@@ -103,12 +103,17 @@ Status ArrowStreamReader::get_next_block(Block* block, 
size_t* read_rows, bool*
         auto num_columns = batch.num_columns();
         for (int c = 0; c < num_columns; ++c) {
             arrow::Array* column = batch.column(c).get();
-
             std::string column_name = batch.schema()->field(c)->name();
 
             try {
                 const vectorized::ColumnWithTypeAndName& column_with_name =
-                        block->get_by_name(column_name);
+                        block->safe_get_by_position(c);
+
+                if (column_with_name.name != column_name) {
+                    return Status::InternalError("Column name mismatch: 
expected {}, got {}",
+                                                 column_with_name.name, 
column_name);
+                }
+
                 
RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
                         column_with_name.column->assume_mutable_ref(), column, 
0, num_rows, _ctzz));
             } catch (Exception& e) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 7da93423dca..d72439e3c34 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -1277,9 +1277,10 @@ Status OrcReader::_fill_partition_columns(
         const std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>&
                 partition_columns) {
     DataTypeSerDe::FormatOptions _text_formatOptions;
+    // todo: maybe do not need to build name to index map every time
+    auto name_to_pos_map = block->get_name_to_pos_map();
     for (const auto& kv : partition_columns) {
-        auto doris_column = block->get_by_name(kv.first).column;
-        auto* col_ptr = const_cast<IColumn*>(doris_column.get());
+        auto col_ptr = 
block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
         const auto& [value, slot_desc] = kv.second;
         auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
         Slice slice(value.data(), value.size());
@@ -1305,10 +1306,18 @@ Status OrcReader::_fill_partition_columns(
 Status OrcReader::_fill_missing_columns(
         Block* block, uint64_t rows,
         const std::unordered_map<std::string, VExprContextSPtr>& 
missing_columns) {
+    // todo: maybe do not need to build name to index map every time
+    auto name_to_pos_map = block->get_name_to_pos_map();
+    std::set<size_t> positions_to_erase;
     for (const auto& kv : missing_columns) {
+        if (!name_to_pos_map.contains(kv.first)) {
+            return Status::InternalError("Failed to find missing column: {}, 
block: {}", kv.first,
+                                         block->dump_structure());
+        }
         if (kv.second == nullptr) {
             // no default column, fill with null
-            auto mutable_column = 
block->get_by_name(kv.first).column->assume_mutable();
+            auto mutable_column =
+                    
block->get_by_position(name_to_pos_map[kv.first]).column->assume_mutable();
             auto* nullable_column = 
static_cast<vectorized::ColumnNullable*>(mutable_column.get());
             nullable_column->insert_many_defaults(rows);
         } else {
@@ -1328,15 +1337,16 @@ Status OrcReader::_fill_missing_columns(
                 mutable_column->resize(rows);
                 // result_column_ptr maybe a ColumnConst, convert it to a 
normal column
                 result_column_ptr = 
result_column_ptr->convert_to_full_column_if_const();
-                auto origin_column_type = block->get_by_name(kv.first).type;
+                auto origin_column_type = 
block->get_by_position(name_to_pos_map[kv.first]).type;
                 bool is_nullable = origin_column_type->is_nullable();
                 block->replace_by_position(
-                        block->get_position_by_name(kv.first),
+                        name_to_pos_map[kv.first],
                         is_nullable ? make_nullable(result_column_ptr) : 
result_column_ptr);
-                block->erase(result_column_id);
+                positions_to_erase.insert(result_column_id);
             }
         }
     }
+    block->erase(positions_to_erase);
     return Status::OK();
 }
 
@@ -2009,8 +2019,10 @@ Status OrcReader::_get_next_block_impl(Block* block, 
size_t* read_rows, bool* eo
         std::vector<orc::ColumnVectorBatch*> batch_vec;
         _fill_batch_vec(batch_vec, _batch.get(), 0);
 
+        // todo: maybe do not need to build name to index map every time
+        auto name_to_pos_map = block->get_name_to_pos_map();
         for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
-            auto& column_with_type_and_name = block->get_by_name(col_name);
+            auto& column_with_type_and_name = 
block->get_by_position(name_to_pos_map[col_name]);
             auto& column_ptr = column_with_type_and_name.column;
             auto& column_type = column_with_type_and_name.type;
             auto file_column_name = 
_table_info_node_ptr->children_file_column_name(col_name);
@@ -2076,10 +2088,17 @@ Status OrcReader::_get_next_block_impl(Block* block, 
size_t* read_rows, bool* eo
             }
         }
 
+        // todo: maybe do not need to build name to index map every time
+        auto name_to_pos_map = block->get_name_to_pos_map();
         if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
             for (auto& dict_filter_cols : _dict_filter_cols) {
                 MutableColumnPtr dict_col_ptr = ColumnInt32::create();
-                size_t pos = 
block->get_position_by_name(dict_filter_cols.first);
+                if (!name_to_pos_map.contains(dict_filter_cols.first)) {
+                    return Status::InternalError(
+                            "Failed to find dict filter column '{}' in block 
{}",
+                            dict_filter_cols.first, block->dump_structure());
+                }
+                auto pos = name_to_pos_map[dict_filter_cols.first];
                 auto& column_with_type_and_name = block->get_by_position(pos);
                 auto& column_type = column_with_type_and_name.type;
                 if (column_type->is_nullable()) {
@@ -2101,7 +2120,7 @@ Status OrcReader::_get_next_block_impl(Block* block, 
size_t* read_rows, bool* eo
         _fill_batch_vec(batch_vec, _batch.get(), 0);
 
         for (auto& col_name : _lazy_read_ctx.all_read_columns) {
-            auto& column_with_type_and_name = block->get_by_name(col_name);
+            auto& column_with_type_and_name = 
block->get_by_position(name_to_pos_map[col_name]);
             auto& column_ptr = column_with_type_and_name.column;
             auto& column_type = column_with_type_and_name.type;
             auto file_column_name = 
_table_info_node_ptr->children_file_column_name(col_name);
@@ -2212,19 +2231,27 @@ void OrcReader::_build_delete_row_filter(const Block* 
block, size_t rows) {
     if (_delete_rows != nullptr) {
         _delete_rows_filter_ptr = std::make_unique<IColumn::Filter>(rows, 1);
         auto* __restrict _pos_delete_filter_data = 
_delete_rows_filter_ptr->data();
+        // todo: maybe do not need to build name to index map every time
+        auto name_to_pos_map = block->get_name_to_pos_map();
         const auto& original_transaction_column = assert_cast<const 
ColumnInt64&>(*remove_nullable(
-                
block->get_by_name(TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE).column));
-        const auto& bucket_id_column = assert_cast<const ColumnInt32&>(
-                
*remove_nullable(block->get_by_name(TransactionalHive::BUCKET_LOWER_CASE).column));
-        const auto& row_id_column = assert_cast<const ColumnInt64&>(
-                
*remove_nullable(block->get_by_name(TransactionalHive::ROW_ID_LOWER_CASE).column));
+                block->get_by_position(
+                             
name_to_pos_map[TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE])
+                        .column));
+        const auto& bucket_id_column = assert_cast<const 
ColumnInt32&>(*remove_nullable(
+                
block->get_by_position(name_to_pos_map[TransactionalHive::BUCKET_LOWER_CASE])
+                        .column));
+        const auto& row_id_column = assert_cast<const 
ColumnInt64&>(*remove_nullable(
+                
block->get_by_position(name_to_pos_map[TransactionalHive::ROW_ID_LOWER_CASE])
+                        .column));
         for (int i = 0; i < rows; ++i) {
             auto original_transaction = original_transaction_column.get_int(i);
             auto bucket_id = bucket_id_column.get_int(i);
             auto row_id = row_id_column.get_int(i);
 
-            TransactionalHiveReader::AcidRowID transactional_row_id = 
{original_transaction,
-                                                                       
bucket_id, row_id};
+            TransactionalHiveReader::AcidRowID transactional_row_id = {
+                    .original_transaction = original_transaction,
+                    .bucket = bucket_id,
+                    .row_id = row_id};
             if (_delete_rows->contains(transactional_row_id)) {
                 _pos_delete_filter_data[i] = 0;
             }
@@ -2238,9 +2265,15 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, 
uint16_t* sel, uint16_t s
     size_t origin_column_num = block->columns();
 
     if (!_dict_cols_has_converted && !_dict_filter_cols.empty()) {
+        // todo: maybe do not need to build name to index map every time
+        auto name_to_pos_map = block->get_name_to_pos_map();
         for (auto& dict_filter_cols : _dict_filter_cols) {
+            if (!name_to_pos_map.contains(dict_filter_cols.first)) {
+                return Status::InternalError("Failed to find dict filter 
column '{}' in block {}",
+                                             dict_filter_cols.first, 
block->dump_structure());
+            }
             MutableColumnPtr dict_col_ptr = ColumnInt32::create();
-            size_t pos = block->get_position_by_name(dict_filter_cols.first);
+            auto pos = name_to_pos_map[dict_filter_cols.first];
             auto& column_with_type_and_name = block->get_by_position(pos);
             auto& column_type = column_with_type_and_name.type;
             if (column_type->is_nullable()) {
@@ -2266,8 +2299,10 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, 
uint16_t* sel, uint16_t s
                                
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
                                
TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
     }
+    // todo: maybe do not need to build name to index map every time
+    auto name_to_pos_map = block->get_name_to_pos_map();
     for (auto& table_col_name : table_col_names) {
-        auto& column_with_type_and_name = block->get_by_name(table_col_name);
+        auto& column_with_type_and_name = 
block->get_by_position(name_to_pos_map[table_col_name]);
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
         auto file_column_name = 
_table_info_node_ptr->children_file_column_name(table_col_name);
@@ -2319,13 +2354,13 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, 
uint16_t* sel, uint16_t s
     if (can_filter_all) {
         for (auto& col : table_col_names) {
             // clean block to read predicate columns and acid columns
-            block->get_by_name(col).column->assume_mutable()->clear();
+            
block->get_by_position(name_to_pos_map[col]).column->assume_mutable()->clear();
         }
         for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
-            block->get_by_name(col.first).column->assume_mutable()->clear();
+            
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
         }
         for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
-            block->get_by_name(col.first).column->assume_mutable()->clear();
+            
block->get_by_position(name_to_pos_map[col.first]).column->assume_mutable()->clear();
         }
         Block::erase_useless_column(block, origin_column_num);
         RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, nullptr));
@@ -2639,8 +2674,14 @@ Status OrcReader::_convert_dict_cols_to_string_cols(
         return Status::OK();
     }
     if (!_dict_filter_cols.empty()) {
+        // todo: maybe do not need to build name to index map every time
+        auto name_to_pos_map = block->get_name_to_pos_map();
         for (auto& dict_filter_cols : _dict_filter_cols) {
-            size_t pos = block->get_position_by_name(dict_filter_cols.first);
+            if (!name_to_pos_map.contains(dict_filter_cols.first)) {
+                return Status::InternalError("Failed to find dict filter 
column '{}' in block {}",
+                                             dict_filter_cols.first, 
block->dump_structure());
+            }
+            auto pos = name_to_pos_map[dict_filter_cols.first];
             ColumnWithTypeAndName& column_with_type_and_name = 
block->get_by_position(pos);
             const ColumnPtr& column = column_with_type_and_name.column;
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 96feeceea8d..c0a4b2a1704 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -25,6 +25,7 @@
 
 #include <algorithm>
 #include <boost/iterator/iterator_facade.hpp>
+#include <memory>
 #include <ostream>
 
 #include "common/config.h"
@@ -392,25 +393,32 @@ Status RowGroupReader::_read_column_data(Block* block,
                                          FilterMap& filter_map) {
     size_t batch_read_rows = 0;
     bool has_eof = false;
+    // todo: maybe do not need to build name to index map every time
+    auto name_to_idx = block->get_name_to_pos_map();
     for (auto& read_col_name : table_columns) {
-        auto& column_with_type_and_name = block->get_by_name(read_col_name);
+        auto& column_with_type_and_name = 
block->safe_get_by_position(name_to_idx[read_col_name]);
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
         bool is_dict_filter = false;
         for (auto& _dict_filter_col : _dict_filter_cols) {
             if (_dict_filter_col.first == read_col_name) {
                 MutableColumnPtr dict_column = ColumnInt32::create();
-                size_t pos = block->get_position_by_name(read_col_name);
+                if (!name_to_idx.contains(read_col_name)) {
+                    return Status::InternalError(
+                            "Wrong read column '{}' in parquet file, block: 
{}", read_col_name,
+                            block->dump_structure());
+                }
                 if (column_type->is_nullable()) {
-                    block->get_by_position(pos).type =
+                    block->get_by_position(name_to_idx[read_col_name]).type =
                             
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
                     block->replace_by_position(
-                            pos,
+                            name_to_idx[read_col_name],
                             ColumnNullable::create(std::move(dict_column),
                                                    
ColumnUInt8::create(dict_column->size(), 0)));
                 } else {
-                    block->get_by_position(pos).type = 
std::make_shared<DataTypeInt32>();
-                    block->replace_by_position(pos, std::move(dict_column));
+                    block->get_by_position(name_to_idx[read_col_name]).type =
+                            std::make_shared<DataTypeInt32>();
+                    block->replace_by_position(name_to_idx[read_col_name], 
std::move(dict_column));
                 }
                 is_dict_filter = true;
                 break;
@@ -511,20 +519,25 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t 
batch_size, size_t* re
         }
 
         const uint8_t* __restrict filter_map_data = result_filter.data();
-        filter_map_ptr.reset(new FilterMap());
+        filter_map_ptr = std::make_unique<FilterMap>();
         RETURN_IF_ERROR(filter_map_ptr->init(filter_map_data, pre_read_rows, 
can_filter_all));
         if (filter_map_ptr->filter_all()) {
             {
                 SCOPED_RAW_TIMER(&_predicate_filter_time);
-                for (auto& col : _lazy_read_ctx.predicate_columns.first) {
+                auto name_to_idx = block->get_name_to_pos_map();
+                for (const auto& col : _lazy_read_ctx.predicate_columns.first) 
{
                     // clean block to read predicate columns
-                    block->get_by_name(col).column->assume_mutable()->clear();
+                    
block->get_by_position(name_to_idx[col]).column->assume_mutable()->clear();
                 }
-                for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
-                    
block->get_by_name(col.first).column->assume_mutable()->clear();
+                for (const auto& col : 
_lazy_read_ctx.predicate_partition_columns) {
+                    block->get_by_position(name_to_idx[col.first])
+                            .column->assume_mutable()
+                            ->clear();
                 }
-                for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
-                    
block->get_by_name(col.first).column->assume_mutable()->clear();
+                for (const auto& col : 
_lazy_read_ctx.predicate_missing_columns) {
+                    block->get_by_position(name_to_idx[col.first])
+                            .column->assume_mutable()
+                            ->clear();
                 }
                 if (_row_id_column_iterator_pair.first != nullptr) {
                     block->get_by_position(_row_id_column_iterator_pair.second)
@@ -660,10 +673,12 @@ Status RowGroupReader::_fill_partition_columns(
         const std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>&
                 partition_columns) {
     DataTypeSerDe::FormatOptions _text_formatOptions;
-    for (auto& kv : partition_columns) {
-        auto doris_column = block->get_by_name(kv.first).column;
-        IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
-        auto& [value, slot_desc] = kv.second;
+    auto name_to_idx = block->get_name_to_pos_map();
+    for (const auto& kv : partition_columns) {
+        auto doris_column = 
block->get_by_position(name_to_idx[kv.first]).column;
+        // obtained from block*, it is a mutable object.
+        auto* col_ptr = const_cast<IColumn*>(doris_column.get());
+        const auto& [value, slot_desc] = kv.second;
         auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
         Slice slice(value.data(), value.size());
         uint64_t num_deserialized = 0;
@@ -688,15 +703,23 @@ Status RowGroupReader::_fill_partition_columns(
 Status RowGroupReader::_fill_missing_columns(
         Block* block, size_t rows,
         const std::unordered_map<std::string, VExprContextSPtr>& 
missing_columns) {
-    for (auto& kv : missing_columns) {
+    // todo: maybe do not need to build name to index map every time
+    auto name_to_idx = block->get_name_to_pos_map();
+    std::set<size_t> positions_to_erase;
+    for (const auto& kv : missing_columns) {
+        if (!name_to_idx.contains(kv.first)) {
+            return Status::InternalError("Missing column: {} not found in 
block {}", kv.first,
+                                         block->dump_structure());
+        }
         if (kv.second == nullptr) {
             // no default column, fill with null
-            auto mutable_column = 
block->get_by_name(kv.first).column->assume_mutable();
+            auto mutable_column =
+                    
block->get_by_position(name_to_idx[kv.first]).column->assume_mutable();
             auto* nullable_column = 
assert_cast<vectorized::ColumnNullable*>(mutable_column.get());
             nullable_column->insert_many_defaults(rows);
         } else {
             // fill with default value
-            auto& ctx = kv.second;
+            const auto& ctx = kv.second;
             auto origin_column_num = block->columns();
             int result_column_id = -1;
             // PT1 => dest primitive type
@@ -711,15 +734,16 @@ Status RowGroupReader::_fill_missing_columns(
                 mutable_column->resize(rows);
                 // result_column_ptr maybe a ColumnConst, convert it to a 
normal column
                 result_column_ptr = 
result_column_ptr->convert_to_full_column_if_const();
-                auto origin_column_type = block->get_by_name(kv.first).type;
+                auto origin_column_type = 
block->get_by_position(name_to_idx[kv.first]).type;
                 bool is_nullable = origin_column_type->is_nullable();
                 block->replace_by_position(
-                        block->get_position_by_name(kv.first),
+                        name_to_idx[kv.first],
                         is_nullable ? make_nullable(result_column_ptr) : 
result_column_ptr);
-                block->erase(result_column_id);
+                positions_to_erase.insert(result_column_id);
             }
         }
     }
+    block->erase(positions_to_erase);
     return Status::OK();
 }
 
@@ -1071,13 +1095,20 @@ Status 
RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes,
 }
 
 void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
+    // todo: maybe do not need to build name to index map every time
+    auto name_to_idx = block->get_name_to_pos_map();
     for (auto& dict_filter_cols : _dict_filter_cols) {
-        size_t pos = block->get_position_by_name(dict_filter_cols.first);
-        ColumnWithTypeAndName& column_with_type_and_name = 
block->get_by_position(pos);
+        if (!name_to_idx.contains(dict_filter_cols.first)) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Wrong read column '{}' in parquet file, block: 
{}",
+                            dict_filter_cols.first, block->dump_structure());
+        }
+        ColumnWithTypeAndName& column_with_type_and_name =
+                block->get_by_position(name_to_idx[dict_filter_cols.first]);
         const ColumnPtr& column = column_with_type_and_name.column;
-        if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*column)) {
+        if (const auto* nullable_column = 
check_and_get_column<ColumnNullable>(*column)) {
             const ColumnPtr& nested_column = 
nullable_column->get_nested_column_ptr();
-            const ColumnInt32* dict_column = assert_cast<const 
ColumnInt32*>(nested_column.get());
+            const auto* dict_column = assert_cast<const 
ColumnInt32*>(nested_column.get());
             DCHECK(dict_column);
 
             MutableColumnPtr string_column =
@@ -1087,16 +1118,18 @@ void 
RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
             column_with_type_and_name.type =
                     
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
             block->replace_by_position(
-                    pos, ColumnNullable::create(std::move(string_column),
-                                                
nullable_column->get_null_map_column_ptr()));
+                    name_to_idx[dict_filter_cols.first],
+                    ColumnNullable::create(std::move(string_column),
+                                           
nullable_column->get_null_map_column_ptr()));
         } else {
-            const ColumnInt32* dict_column = assert_cast<const 
ColumnInt32*>(column.get());
+            const auto* dict_column = assert_cast<const 
ColumnInt32*>(column.get());
             MutableColumnPtr string_column =
                     
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
                             dict_column);
 
             column_with_type_and_name.type = 
std::make_shared<DataTypeString>();
-            block->replace_by_position(pos, std::move(string_column));
+            block->replace_by_position(name_to_idx[dict_filter_cols.first],
+                                       std::move(string_column));
         }
     }
 }
diff --git a/be/src/vec/exec/format/table/equality_delete.cpp 
b/be/src/vec/exec/format/table/equality_delete.cpp
index 7f8452f4b18..48914a02144 100644
--- a/be/src/vec/exec/format/table/equality_delete.cpp
+++ b/be/src/vec/exec/format/table/equality_delete.cpp
@@ -45,15 +45,16 @@ Status SimpleEqualityDelete::_build_set() {
 
 Status SimpleEqualityDelete::filter_data_block(Block* data_block) {
     SCOPED_TIMER(equality_delete_time);
-    auto* column_and_type = data_block->try_get_by_name(_delete_column_name);
-    if (column_and_type == nullptr) {
-        return Status::InternalError("Can't find the delete column '{}' in 
data file",
-                                     _delete_column_name);
+    int pos = data_block->get_position_by_name(_delete_column_name);
+    if (pos == -1) {
+        return Status::InternalError("Column '{}' not found in data block: 
{}", _delete_column_name,
+                                     data_block->dump_structure());
     }
-    if (column_and_type->type->get_primitive_type() != _delete_column_type) {
+    auto column_and_type = data_block->get_by_position(pos);
+    if (column_and_type.type->get_primitive_type() != _delete_column_type) {
         return Status::InternalError(
                 "Not support type change in column '{}', src type: {}, target 
type: {}",
-                _delete_column_name, column_and_type->type->get_name(), 
(int)_delete_column_type);
+                _delete_column_name, column_and_type.type->get_name(), 
(int)_delete_column_type);
     }
     size_t rows = data_block->rows();
     // _filter: 1 => in _hybrid_set; 0 => not in _hybrid_set
@@ -64,12 +65,12 @@ Status SimpleEqualityDelete::filter_data_block(Block* 
data_block) {
         _filter->assign(rows, UInt8(0));
     }
 
-    if (column_and_type->column->is_nullable()) {
+    if (column_and_type.column->is_nullable()) {
         const NullMap& null_map =
-                reinterpret_cast<const 
ColumnNullable*>(column_and_type->column.get())
+                reinterpret_cast<const 
ColumnNullable*>(column_and_type.column.get())
                         ->get_null_map_data();
         _hybrid_set->find_batch_nullable(
-                
remove_nullable(column_and_type->column)->assume_mutable_ref(), rows, null_map,
+                remove_nullable(column_and_type.column)->assume_mutable_ref(), 
rows, null_map,
                 *_filter);
         if (_hybrid_set->contain_null()) {
             auto* filter_data = _filter->data();
@@ -78,7 +79,7 @@ Status SimpleEqualityDelete::filter_data_block(Block* 
data_block) {
             }
         }
     } else {
-        _hybrid_set->find_batch(column_and_type->column->assume_mutable_ref(), 
rows, *_filter);
+        _hybrid_set->find_batch(column_and_type.column->assume_mutable_ref(), 
rows, *_filter);
     }
     // should reverse _filter
     auto* filter_data = _filter->data();
@@ -108,19 +109,22 @@ Status MultiEqualityDelete::_build_set() {
 Status MultiEqualityDelete::filter_data_block(Block* data_block) {
     SCOPED_TIMER(equality_delete_time);
     size_t column_index = 0;
-    for (std::string column_name : _delete_block->get_names()) {
-        auto* column_and_type = data_block->try_get_by_name(column_name);
-        if (column_and_type == nullptr) {
-            return Status::InternalError("Can't find the delete column '{}' in 
data file",
-                                         column_name);
+
+    // todo: maybe do not need to build name to index map every time
+    auto name_to_pos_map = data_block->get_name_to_pos_map();
+    for (auto delete_col : _delete_block->get_columns_with_type_and_name()) {
+        const std::string& column_name = delete_col.name;
+        auto column_and_type = 
data_block->safe_get_by_position(name_to_pos_map[column_name]);
+        if (name_to_pos_map.contains(column_name) == false) {
+            return Status::InternalError("Column '{}' not found in data block: 
{}", column_name,
+                                         data_block->dump_structure());
         }
-        if 
(!_delete_block->get_by_name(column_name).type->equals(*column_and_type->type)) 
{
+        if (!delete_col.type->equals(*column_and_type.type)) {
             return Status::InternalError(
                     "Not support type change in column '{}', src type: {}, 
target type: {}",
-                    column_name, 
_delete_block->get_by_name(column_name).type->get_name(),
-                    column_and_type->type->get_name());
+                    column_name, delete_col.type->get_name(), 
column_and_type.type->get_name());
         }
-        _data_column_index[column_index++] = 
data_block->get_position_by_name(column_name);
+        _data_column_index[column_index++] = name_to_pos_map[column_name];
     }
     size_t rows = data_block->rows();
     _data_hashes.clear();
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index a9640e353b1..9b78f11a177 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -223,20 +223,32 @@ void IcebergTableReader::_generate_equality_delete_block(
 }
 
 Status IcebergTableReader::_expand_block_if_need(Block* block) {
+    std::set<std::string> names;
+    auto block_names = block->get_names();
+    names.insert(block_names.begin(), block_names.end());
     for (auto& col : _expand_columns) {
         col.column->assume_mutable()->clear();
-        if (block->try_get_by_name(col.name)) {
+        if (names.contains(col.name)) {
             return Status::InternalError("Wrong expand column '{}'", col.name);
         }
+        names.insert(col.name);
         block->insert(col);
     }
     return Status::OK();
 }
 
 Status IcebergTableReader::_shrink_block_if_need(Block* block) {
+    // todo: maybe do not need to build name to index map every time
+    auto name_to_pos_map = block->get_name_to_pos_map();
+    std::set<size_t> positions_to_erase;
     for (const std::string& expand_col : _expand_col_names) {
-        block->erase(expand_col);
+        if (!name_to_pos_map.contains(expand_col)) {
+            return Status::InternalError("Wrong erase column '{}', block: {}", 
expand_col,
+                                         block->dump_names());
+        }
+        positions_to_erase.emplace(name_to_pos_map[expand_col]);
     }
+    block->erase(positions_to_erase);
     return Status::OK();
 }
 
@@ -383,9 +395,11 @@ void 
IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& d
 void IcebergTableReader::_gen_position_delete_file_range(Block& block, 
DeleteFile* position_delete,
                                                          size_t read_rows,
                                                          bool 
file_path_column_dictionary_coded) {
-    ColumnPtr path_column = block.get_by_name(ICEBERG_FILE_PATH).column;
+    // todo: maybe do not need to build name to index map every time
+    auto name_to_pos_map = block.get_name_to_pos_map();
+    ColumnPtr path_column = 
block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column;
     DCHECK_EQ(path_column->size(), read_rows);
-    ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column;
+    ColumnPtr pos_column = 
block.get_by_position(name_to_pos_map[ICEBERG_ROW_POS]).column;
     using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
     const int64_t* src_data = assert_cast<const 
ColumnType&>(*pos_column).get_data().data();
     IcebergTableReader::PositionDeleteRange range;
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp 
b/be/src/vec/exec/format/wal/wal_reader.cpp
index bb9b4318e9c..2390f210d3e 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -60,7 +60,9 @@ Status WalReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
                                         _wal_path);
     }
     Block src_block;
-    RETURN_IF_ERROR(src_block.deserialize(pblock));
+    size_t uncompressed_size = 0;
+    int64_t uncompressed_time = 0;
+    RETURN_IF_ERROR(src_block.deserialize(pblock, &uncompressed_size, 
&uncompressed_time));
     //convert to dst block
     Block dst_block;
     int index = 0;
diff --git a/be/src/vec/exec/jni_connector.cpp 
b/be/src/vec/exec/jni_connector.cpp
index 91457cb7718..7168639c59a 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -324,8 +324,10 @@ Status JniConnector::_fill_block(Block* block, size_t 
num_rows) {
     SCOPED_RAW_TIMER(&_fill_block_watcher);
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    // todo: maybe do not need to build name to index map every time
+    auto name_to_pos_map = block->get_name_to_pos_map();
     for (int i = 0; i < _column_names.size(); ++i) {
-        auto& column_with_type_and_name = block->get_by_name(_column_names[i]);
+        auto& column_with_type_and_name = 
block->get_by_position(name_to_pos_map[_column_names[i]]);
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
         RETURN_IF_ERROR(_fill_column(_table_meta, column_ptr, column_type, 
num_rows));
diff --git a/be/src/vec/exec/scan/file_scanner.cpp 
b/be/src/vec/exec/scan/file_scanner.cpp
index 116fcbd33e7..476eae26ebe 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -543,6 +543,9 @@ Status FileScanner::_check_output_block_types() {
 Status FileScanner::_init_src_block(Block* block) {
     if (!_is_load) {
         _src_block_ptr = block;
+
+        // todo: maybe do not need to build name to index map every time
+        _src_block_name_to_idx = block->get_name_to_pos_map();
         return Status::OK();
     }
     RETURN_IF_ERROR(_check_output_block_types());
@@ -609,7 +612,7 @@ Status FileScanner::_cast_to_input_block(Block* block) {
             // skip variant type
             continue;
         }
-        auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
+        auto& arg = 
_src_block_ptr->get_by_position(_src_block_name_to_idx[slot_desc->col_name()]);
         auto return_type = slot_desc->get_data_type_ptr();
         // remove nullable here, let the get_function decide whether nullable
         auto data_type = 
get_data_type_with_default_argument(remove_nullable(return_type));
@@ -637,7 +640,9 @@ Status FileScanner::_fill_columns_from_path(size_t rows) {
     }
     DataTypeSerDe::FormatOptions _text_formatOptions;
     for (auto& kv : _partition_col_descs) {
-        auto doris_column = _src_block_ptr->get_by_name(kv.first).column;
+        auto doris_column =
+                
_src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).column;
+        // _src_block_ptr points to a mutable block created by this class 
itself, so const_cast can be used here.
         IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
         auto& [value, slot_desc] = kv.second;
         auto _text_serde = slot_desc->get_data_type_ptr()->get_serde();
@@ -669,7 +674,8 @@ Status FileScanner::_fill_missing_columns(size_t rows) {
     for (auto& kv : _missing_col_descs) {
         if (kv.second == nullptr) {
             // no default column, fill with null
-            auto mutable_column = 
_src_block_ptr->get_by_name(kv.first).column->assume_mutable();
+            auto mutable_column = 
_src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first])
+                                          .column->assume_mutable();
             auto* nullable_column = 
static_cast<vectorized::ColumnNullable*>(mutable_column.get());
             nullable_column->insert_many_defaults(rows);
         } else {
@@ -689,10 +695,15 @@ Status FileScanner::_fill_missing_columns(size_t rows) {
                 mutable_column->resize(rows);
                 // result_column_ptr maybe a ColumnConst, convert it to a 
normal column
                 result_column_ptr = 
result_column_ptr->convert_to_full_column_if_const();
-                auto origin_column_type = 
_src_block_ptr->get_by_name(kv.first).type;
+                auto origin_column_type =
+                        
_src_block_ptr->get_by_position(_src_block_name_to_idx[kv.first]).type;
                 bool is_nullable = origin_column_type->is_nullable();
+                if (!_src_block_name_to_idx.contains(kv.first)) {
+                    return Status::InternalError("Column {} not found in src 
block {}", kv.first,
+                                                 
_src_block_ptr->dump_structure());
+                }
                 _src_block_ptr->replace_by_position(
-                        _src_block_ptr->get_position_by_name(kv.first),
+                        _src_block_name_to_idx[kv.first],
                         is_nullable ? make_nullable(result_column_ptr) : 
result_column_ptr);
                 _src_block_ptr->erase(result_column_id);
             }
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp 
b/be/src/vec/exec/scan/olap_scanner.cpp
index 3f56d9e5be8..c0c0c35f0be 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -427,8 +427,8 @@ Status OlapScanner::_init_tablet_reader_params(
     DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", 
DBUG_BLOCK);
 
     if (!_state->skip_storage_engine_merge()) {
-        TOlapScanNode& olap_scan_node =
-                
((pipeline::OlapScanLocalState*)_local_state)->olap_scan_node();
+        auto* olap_scan_local_state = 
(pipeline::OlapScanLocalState*)_local_state;
+        TOlapScanNode& olap_scan_node = 
olap_scan_local_state->olap_scan_node();
         // order by table keys optimization for topn
         // will only read head/tail of data file since it's already sorted by 
keys
         if (olap_scan_node.__isset.sort_info && 
!olap_scan_node.sort_info.is_asc_order.empty()) {
@@ -440,16 +440,20 @@ Status OlapScanner::_init_tablet_reader_params(
             _tablet_reader_params.read_orderby_key_num_prefix_columns =
                     olap_scan_node.sort_info.is_asc_order.size();
             _tablet_reader_params.read_orderby_key_limit = _limit;
-            _tablet_reader_params.filter_block_conjuncts = _conjuncts;
+
+            if (_tablet_reader_params.read_orderby_key_limit > 0 &&
+                olap_scan_local_state->_storage_no_merge()) {
+                _tablet_reader_params.filter_block_conjuncts = _conjuncts;
+                _conjuncts.clear();
+            }
         }
 
         // set push down topn filter
         _tablet_reader_params.topn_filter_source_node_ids =
-                ((pipeline::OlapScanLocalState*)_local_state)
-                        ->get_topn_filter_source_node_ids(_state, true);
+                olap_scan_local_state->get_topn_filter_source_node_ids(_state, 
true);
         if (!_tablet_reader_params.topn_filter_source_node_ids.empty()) {
             _tablet_reader_params.topn_filter_target_node_id =
-                    
((pipeline::OlapScanLocalState*)_local_state)->parent()->node_id();
+                    olap_scan_local_state->parent()->node_id();
         }
     }
 
diff --git a/be/src/vec/exec/scan/scanner.cpp b/be/src/vec/exec/scan/scanner.cpp
index c46c4f4a90f..c9391960c8e 100644
--- a/be/src/vec/exec/scan/scanner.cpp
+++ b/be/src/vec/exec/scan/scanner.cpp
@@ -113,8 +113,6 @@ Status Scanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
                 RETURN_IF_ERROR(_get_block_impl(state, block, eof));
                 if (*eof) {
                     DCHECK(block->rows() == 0);
-                    // clear TEMP columns to avoid column align problem
-                    block->erase_tmp_columns();
                     break;
                 }
                 _num_rows_read += block->rows();
@@ -145,11 +143,6 @@ Status Scanner::get_block(RuntimeState* state, Block* 
block, bool* eof) {
 }
 
 Status Scanner::_filter_output_block(Block* block) {
-    Defer clear_tmp_block([&]() { block->erase_tmp_columns(); });
-    if (block->has(BeConsts::BLOCK_TEMP_COLUMN_SCANNER_FILTERED)) {
-        // scanner filter_block is already done (only by _topn_next 
currently), just skip it
-        return Status::OK();
-    }
     auto old_rows = block->rows();
     Status st = VExprContext::filter_block(_conjuncts, block, 
block->columns());
     _counter.num_rows_unselected += old_rows - block->rows();
diff --git a/be/src/vec/functions/function_helpers.cpp 
b/be/src/vec/functions/function_helpers.cpp
index 6862e9addb9..877e47b5318 100644
--- a/be/src/vec/functions/function_helpers.cpp
+++ b/be/src/vec/functions/function_helpers.cpp
@@ -97,14 +97,6 @@ std::tuple<Block, ColumnNumbers> 
create_block_with_nested_columns(const Block& b
         }
     }
 
-    // TODO: only support match function, rethink the logic
-    for (const auto& ctn : block) {
-        if (ctn.name.size() > BeConsts::BLOCK_TEMP_COLUMN_PREFIX.size() &&
-            starts_with(ctn.name, BeConsts::BLOCK_TEMP_COLUMN_PREFIX)) {
-            res.insert(ctn);
-        }
-    }
-
     return {std::move(res), std::move(res_args)};
 }
 
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 7f51bf7c1f0..a3186d2d982 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -411,7 +411,6 @@ Status BlockReader::_unique_key_next_block(Block* block, 
bool* eof) {
         block->insert(column_with_type_and_name);
         RETURN_IF_ERROR(Block::filter_block(block, target_columns_size, 
target_columns_size));
         _stats.rows_del_filtered += target_block_row - block->rows();
-        DCHECK(block->try_get_by_name("__DORIS_COMPACTION_FILTER__") == 
nullptr);
         if (UNLIKELY(_reader_context.record_rowids)) {
             DCHECK_EQ(_block_row_locations.size(), block->rows() + 
delete_count);
         }
diff --git a/be/src/vec/olap/vcollect_iterator.cpp 
b/be/src/vec/olap/vcollect_iterator.cpp
index 3f375eb2c82..deb384de0ba 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -93,6 +93,7 @@ void VCollectIterator::init(TabletReader* reader, bool 
ori_data_overlapping, boo
         _topn_limit = _reader->_reader_context.read_orderby_key_limit;
     } else {
         _topn_limit = 0;
+        DCHECK_EQ(_reader->_reader_context.filter_block_conjuncts.size(), 0);
     }
 }
 
@@ -259,8 +260,6 @@ Status VCollectIterator::_topn_next(Block* block) {
         return Status::Error<END_OF_FILE>("");
     }
 
-    // clear TEMP columns to avoid column align problem
-    block->erase_tmp_columns();
     auto clone_block = block->clone_empty();
     /*
     select id, "${tR2}",
@@ -316,8 +315,6 @@ Status VCollectIterator::_topn_next(Block* block) {
                 if (status.is<END_OF_FILE>()) {
                     eof = true;
                     if (block->rows() == 0) {
-                        // clear TEMP columns to avoid column align problem in 
segment iterator
-                        block->erase_tmp_columns();
                         break;
                     }
                 } else {
@@ -328,8 +325,6 @@ Status VCollectIterator::_topn_next(Block* block) {
             // filter block
             RETURN_IF_ERROR(VExprContext::filter_block(
                     _reader->_reader_context.filter_block_conjuncts, block, 
block->columns()));
-            // clear TMPE columns to avoid column align problem in 
mutable_block.add_rows bellow
-            block->erase_tmp_columns();
 
             // update read rows
             read_rows += block->rows();
@@ -452,12 +447,6 @@ Status VCollectIterator::_topn_next(Block* block) {
                << " sorted_row_pos.size()=" << sorted_row_pos.size()
                << " mutable_block.rows()=" << mutable_block.rows();
     *block = mutable_block.to_block();
-    // append a column to indicate scanner filter_block is already done
-    auto filtered_datatype = std::make_shared<DataTypeUInt8>();
-    auto filtered_column = filtered_datatype->create_column_const(
-            block->rows(), Field::create_field<TYPE_BOOLEAN>(1));
-    block->insert(
-            {filtered_column, filtered_datatype, 
BeConsts::BLOCK_TEMP_COLUMN_SCANNER_FILTERED});
 
     _topn_eof = true;
     return block->rows() > 0 ? Status::OK() : Status::Error<END_OF_FILE>("");
@@ -894,8 +883,6 @@ Status 
VCollectIterator::Level1Iterator::_normal_next(Block* block) {
     while (res.is<END_OF_FILE>() && !_children.empty()) {
         _cur_child = std::move(*(_children.begin()));
         _children.pop_front();
-        // clear TEMP columns to avoid column align problem
-        block->erase_tmp_columns();
         res = _cur_child->next(block);
     }
 
diff --git a/be/src/vec/olap/vertical_block_reader.cpp 
b/be/src/vec/olap/vertical_block_reader.cpp
index 5f6e376367d..24bdf4e87a6 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -515,7 +515,6 @@ Status VerticalBlockReader::_unique_key_next_block(Block* 
block, bool* eof) {
             RETURN_IF_ERROR(
                     Block::filter_block(block, target_columns.size(), 
target_columns.size()));
             _stats.rows_del_filtered += block_rows - block->rows();
-            DCHECK(block->try_get_by_name("__DORIS_COMPACTION_FILTER__") == 
nullptr);
             if (UNLIKELY(_reader_context.record_rowids)) {
                 DCHECK_EQ(_block_row_locations.size(), block->rows() + 
delete_count);
             }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 9cd8d4bb8d9..e6ff6f1a8a8 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -101,8 +101,8 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* 
block, bool* eos) {
     RETURN_IF_ERROR(block_item.get_block(next_block));
     size_t block_byte_size = block_item.block_byte_size();
     COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, 
block_item.deserialize_time());
-    COUNTER_UPDATE(_recvr->_decompress_timer, block->get_decompress_time());
-    COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
+    COUNTER_UPDATE(_recvr->_decompress_timer, block_item.decompress_time());
+    COUNTER_UPDATE(_recvr->_decompress_bytes, block_item.decompress_bytes());
     _recvr->_memory_used_counter->update(-(int64_t)block_byte_size);
     INJECT_MOCK_SLEEP(std::lock_guard<std::mutex> l(_lock));
     sub_blocks_memory_usage(block_byte_size);
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 1d752b1bad8..1a1c84f3e67 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -271,7 +271,8 @@ protected:
                 DCHECK(_pblock);
                 SCOPED_RAW_TIMER(&_deserialize_time);
                 _block = Block::create_unique();
-                
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_block->deserialize(*_pblock));
+                RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
+                        _block->deserialize(*_pblock, &_decompress_bytes, 
&_decompress_time));
             }
             block.swap(_block);
             _block.reset();
@@ -280,6 +281,8 @@ protected:
 
         size_t block_byte_size() const { return _block_byte_size; }
         int64_t deserialize_time() const { return _deserialize_time; }
+        int64_t decompress_time() const { return _decompress_time; }
+        size_t decompress_bytes() const { return _decompress_bytes; }
         BlockItem() = default;
         BlockItem(BlockUPtr&& block, size_t block_byte_size)
                 : _block(std::move(block)), _block_byte_size(block_byte_size) 
{}
@@ -292,6 +295,8 @@ protected:
         std::unique_ptr<PBlock> _pblock;
         size_t _block_byte_size = 0;
         int64_t _deserialize_time = 0;
+        int64_t _decompress_time = 0;
+        size_t _decompress_bytes = 0;
     };
 
     std::list<BlockItem> _block_queue;
diff --git a/be/src/vec/sink/varrow_flight_result_writer.cpp 
b/be/src/vec/sink/varrow_flight_result_writer.cpp
index 9105ba9e057..b8280785546 100644
--- a/be/src/vec/sink/varrow_flight_result_writer.cpp
+++ b/be/src/vec/sink/varrow_flight_result_writer.cpp
@@ -47,10 +47,12 @@ Status GetArrowResultBatchCtx::on_data(const 
std::shared_ptr<vectorized::Block>&
     if (_result != nullptr) {
         auto* arrow_buffer = 
assert_cast<ArrowFlightResultBlockBuffer*>(buffer);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
+        int64_t compressed_time = 0;
         SCOPED_TIMER(arrow_buffer->_serialize_batch_ns_timer);
-        RETURN_IF_ERROR(block->serialize(
-                arrow_buffer->_be_exec_version, _result->mutable_block(), 
&uncompressed_bytes,
-                &compressed_bytes, 
arrow_buffer->_fragment_transmission_compression_type, false));
+        RETURN_IF_ERROR(block->serialize(arrow_buffer->_be_exec_version, 
_result->mutable_block(),
+                                         &uncompressed_bytes, 
&compressed_bytes, &compressed_time,
+                                         
arrow_buffer->_fragment_transmission_compression_type,
+                                         false));
         COUNTER_UPDATE(arrow_buffer->_uncompressed_bytes_counter, 
uncompressed_bytes);
         COUNTER_UPDATE(arrow_buffer->_compressed_bytes_counter, 
compressed_bytes);
         _result->set_packet_seq(packet_seq);
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index b4af185adb3..a05ce66ff98 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -347,12 +347,13 @@ Status BlockSerializer::serialize_block(const Block* src, 
PBlock* dest, size_t n
     SCOPED_TIMER(_parent->_serialize_batch_timer);
     dest->Clear();
     size_t uncompressed_bytes = 0, compressed_bytes = 0;
+    int64_t compress_time = 0;
     RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest, 
&uncompressed_bytes,
-                                   &compressed_bytes, 
_parent->compression_type(),
+                                   &compressed_bytes, &compress_time, 
_parent->compression_type(),
                                    _parent->transfer_large_data_by_brpc()));
     COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * 
num_receivers);
     COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * 
num_receivers);
-    COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
+    COUNTER_UPDATE(_parent->_compress_timer, compress_time);
 #ifndef BE_TEST
     
_parent->state()->get_query_ctx()->resource_ctx()->io_context()->update_shuffle_send_bytes(
             compressed_bytes * num_receivers);
diff --git a/be/src/vec/sink/vtablet_block_convertor.cpp 
b/be/src/vec/sink/vtablet_block_convertor.cpp
index e720b5f694e..5bb8f2cbe0f 100644
--- a/be/src/vec/sink/vtablet_block_convertor.cpp
+++ b/be/src/vec/sink/vtablet_block_convertor.cpp
@@ -689,10 +689,6 @@ Status 
OlapTableBlockConvertor::_fill_auto_inc_cols(vectorized::Block* block, si
 
 Status 
OlapTableBlockConvertor::_partial_update_fill_auto_inc_cols(vectorized::Block* 
block,
                                                                    size_t 
rows) {
-    // avoid duplicate PARTIAL_UPDATE_AUTO_INC_COL
-    if (block->has(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL)) {
-        return Status::OK();
-    }
     auto dst_column = vectorized::ColumnInt64::create();
     vectorized::ColumnInt64::Container& dst_values = dst_column->get_data();
     size_t null_value_count = rows;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index d7a269287af..fe51df4dcb2 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -866,8 +866,9 @@ void VNodeChannel::try_send_pending_block(RuntimeState* 
state) {
     if (block.rows() > 0) {
         SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
+        int64_t compressed_time = 0;
         Status st = block.serialize(state->be_exec_version(), 
request->mutable_block(),
-                                    &uncompressed_bytes, &compressed_bytes,
+                                    &uncompressed_bytes, &compressed_bytes, 
&compressed_time,
                                     
state->fragement_transmission_compression_type(),
                                     _parent->_transfer_large_data_by_brpc);
         TEST_INJECTION_POINT_CALLBACK("VNodeChannel::try_send_block", &st);
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp 
b/be/src/vec/sink/writer/vwal_writer.cpp
index 5a64b6f7c43..35436da82f1 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -73,8 +73,9 @@ Status VWalWriter::write_wal(vectorized::Block* block) {
                     { return Status::InternalError("Failed to write wal!"); });
     PBlock pblock;
     size_t uncompressed_bytes = 0, compressed_bytes = 0;
+    int64_t compressed_time = 0;
     RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, 
&uncompressed_bytes,
-                                     &compressed_bytes,
+                                     &compressed_bytes, &compressed_time,
                                      
segment_v2::CompressionTypePB::NO_COMPRESSION));
     RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> 
{&pblock}));
     return Status::OK();
diff --git a/be/src/vec/spill/spill_reader.cpp 
b/be/src/vec/spill/spill_reader.cpp
index de3eea6b625..bb74d20ec33 100644
--- a/be/src/vec/spill/spill_reader.cpp
+++ b/be/src/vec/spill/spill_reader.cpp
@@ -142,7 +142,9 @@ Status SpillReader::read(Block* block, bool* eos) {
             if (!pb_block_.ParseFromArray(result.data, 
cast_set<int>(result.size))) {
                 return Status::InternalError("Failed to read spilled block");
             }
-            RETURN_IF_ERROR(block->deserialize(pb_block_));
+            size_t uncompressed_size = 0;
+            int64_t uncompressed_time = 0;
+            RETURN_IF_ERROR(block->deserialize(pb_block_, &uncompressed_size, 
&uncompressed_time));
         }
         COUNTER_UPDATE(_read_block_data_size, block->bytes());
         COUNTER_UPDATE(_read_rows_count, block->rows());
diff --git a/be/src/vec/spill/spill_writer.cpp 
b/be/src/vec/spill/spill_writer.cpp
index a43ba83ccb9..a43162f43ce 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -115,9 +115,10 @@ Status SpillWriter::_write_internal(const Block& block, 
size_t& written_bytes) {
         {
             PBlock pblock;
             SCOPED_TIMER(_serialize_timer);
+            int64_t compressed_time = 0;
             status = block.serialize(
                     BeExecVersionManager::get_newest_version(), &pblock, 
&uncompressed_bytes,
-                    &compressed_bytes,
+                    &compressed_bytes, &compressed_time,
                     segment_v2::CompressionTypePB::ZSTD); // ZSTD for better 
compression ratio
             RETURN_IF_ERROR(status);
             int64_t pblock_mem = pblock.ByteSizeLong();
diff --git a/be/test/olap/wal/wal_reader_writer_test.cpp 
b/be/test/olap/wal/wal_reader_writer_test.cpp
index 94fc8ff91a2..d443cdfa849 100644
--- a/be/test/olap/wal/wal_reader_writer_test.cpp
+++ b/be/test/olap/wal/wal_reader_writer_test.cpp
@@ -64,8 +64,10 @@ void covert_block_to_pb(
         segment_v2::CompressionTypePB compression_type = 
segment_v2::CompressionTypePB::SNAPPY) {
     size_t uncompressed_bytes = 0;
     size_t compressed_bytes = 0;
-    Status st = block.serialize(BeExecVersionManager::get_newest_version(), 
pblock,
-                                &uncompressed_bytes, &compressed_bytes, 
compression_type);
+    int64_t compressed_time = 0;
+    Status st =
+            block.serialize(BeExecVersionManager::get_newest_version(), 
pblock, &uncompressed_bytes,
+                            &compressed_bytes, &compressed_time, 
compression_type);
     EXPECT_TRUE(st.ok());
     EXPECT_TRUE(uncompressed_bytes >= compressed_bytes);
     EXPECT_EQ(compressed_bytes, pblock->column_values().size());
@@ -132,7 +134,9 @@ TEST_F(WalReaderWriterTest, TestWriteAndRead1) {
             break;
         }
         vectorized::Block block;
-        EXPECT_TRUE(block.deserialize(pblock).ok());
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        EXPECT_TRUE(block.deserialize(pblock, &uncompress_size, 
&uncompressed_time).ok());
         EXPECT_EQ(block_rows, block.rows());
     }
     static_cast<void>(wal_reader.finalize());
diff --git a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp 
b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
index bed8be3e4f3..0bf6691d7d0 100644
--- a/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
+++ b/be/test/pipeline/exec/vdata_stream_recvr_test.cpp
@@ -318,8 +318,9 @@ class MockClosure : public google::protobuf::Closure {
 void to_pblock(Block& block, PBlock* pblock) {
     size_t uncompressed_bytes = 0;
     size_t compressed_bytes = 0;
+    int64_t compressed_time = 0;
     EXPECT_TRUE(block.serialize(BeExecVersionManager::get_newest_version(), 
pblock,
-                                &uncompressed_bytes, &compressed_bytes,
+                                &uncompressed_bytes, &compressed_bytes, 
&compressed_time,
                                 
segment_v2::CompressionTypePB::NO_COMPRESSION));
 }
 
diff --git a/be/test/pipeline/operator/materialization_shared_state_test.cpp 
b/be/test/pipeline/operator/materialization_shared_state_test.cpp
index 51c097207f1..d96653b2dfe 100644
--- a/be/test/pipeline/operator/materialization_shared_state_test.cpp
+++ b/be/test/pipeline/operator/materialization_shared_state_test.cpp
@@ -119,8 +119,9 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponse) {
         auto serialized_block = response_.add_blocks()->mutable_block();
         size_t uncompressed_size = 0;
         size_t compressed_size = 0;
+        int64_t compress_time = 0;
         auto s = resp_block1.serialize(0, serialized_block, 
&uncompressed_size, &compressed_size,
-                                       CompressionTypePB::LZ4);
+                                       &compress_time, CompressionTypePB::LZ4);
         EXPECT_TRUE(s.ok());
 
         _shared_state->rpc_struct_map[_backend_id1].response = 
std::move(response_);
@@ -141,8 +142,9 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponse) {
         auto serialized_block = response_.add_blocks()->mutable_block();
         size_t uncompressed_size = 0;
         size_t compressed_size = 0;
+        int64_t compress_time = 0;
         auto s = resp_block2.serialize(0, serialized_block, 
&uncompressed_size, &compressed_size,
-                                       CompressionTypePB::LZ4);
+                                       &compress_time, CompressionTypePB::LZ4);
         EXPECT_TRUE(s.ok());
 
         _shared_state->rpc_struct_map[_backend_id2].response = 
std::move(response_);
@@ -219,8 +221,9 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponseMultiBlocks) {
         auto serialized_block = response_.add_blocks()->mutable_block();
         size_t uncompressed_size = 0;
         size_t compressed_size = 0;
+        int64_t compress_time = 0;
         auto s = resp_block1.serialize(0, serialized_block, 
&uncompressed_size, &compressed_size,
-                                       CompressionTypePB::LZ4);
+                                       &compress_time, CompressionTypePB::LZ4);
         EXPECT_TRUE(s.ok());
 
         _shared_state->rpc_struct_map[_backend_id1].response = 
std::move(response_);
@@ -240,8 +243,9 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponseMultiBlocks) {
         auto serialized_block = response_.add_blocks()->mutable_block();
         size_t uncompressed_size = 0;
         size_t compressed_size = 0;
+        int64_t compress_time = 0;
         auto s = resp_block2.serialize(0, serialized_block, 
&uncompressed_size, &compressed_size,
-                                       CompressionTypePB::LZ4);
+                                       &compress_time, CompressionTypePB::LZ4);
         EXPECT_TRUE(s.ok());
 
         _shared_state->rpc_struct_map[_backend_id2].response = 
std::move(response_);
@@ -260,8 +264,9 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponseMultiBlocks) {
                 
_shared_state->rpc_struct_map[_backend_id1].response.add_blocks()->mutable_block();
         size_t uncompressed_size = 0;
         size_t compressed_size = 0;
+        int64_t compress_time = 0;
         auto s = resp_block1.serialize(0, serialized_block, 
&uncompressed_size, &compressed_size,
-                                       CompressionTypePB::LZ4);
+                                       &compress_time, CompressionTypePB::LZ4);
         EXPECT_TRUE(s.ok());
         _shared_state->response_blocks[1] = resp_block1.clone_empty();
     }
@@ -278,8 +283,9 @@ TEST_F(MaterializationSharedStateTest, 
TestMergeMultiResponseMultiBlocks) {
                 
_shared_state->rpc_struct_map[_backend_id2].response.add_blocks()->mutable_block();
         size_t uncompressed_size = 0;
         size_t compressed_size = 0;
+        int64_t compress_time = 0;
         auto s = resp_block2.serialize(0, serialized_block, 
&uncompressed_size, &compressed_size,
-                                       CompressionTypePB::LZ4);
+                                       &compress_time, CompressionTypePB::LZ4);
         EXPECT_TRUE(s.ok());
     }
 
diff --git a/be/test/testutil/mock/mock_data_stream_sender.h 
b/be/test/testutil/mock/mock_data_stream_sender.h
index dc9d2542beb..cd076cca521 100644
--- a/be/test/testutil/mock/mock_data_stream_sender.h
+++ b/be/test/testutil/mock/mock_data_stream_sender.h
@@ -54,7 +54,10 @@ struct MockChannel : public Channel {
             return Status::OK();
         }
         Block nblock;
-        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(nblock.deserialize(*_pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
+                nblock.deserialize(*_pblock, &uncompress_size, 
&uncompressed_time));
         if (!nblock.empty()) {
             RETURN_IF_ERROR(_send_block.merge(std::move(nblock)));
         }
@@ -64,7 +67,10 @@ struct MockChannel : public Channel {
     Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
                                 bool eos = false) override {
         Block nblock;
-        
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(nblock.deserialize(*block->get_block()));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
+                nblock.deserialize(*block->get_block(), &uncompress_size, 
&uncompressed_time));
         if (!nblock.empty()) {
             RETURN_IF_ERROR(_send_block.merge(std::move(nblock)));
         }
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 07e4254c7fc..9b316baca60 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -74,8 +74,10 @@ void block_to_pb(
         segment_v2::CompressionTypePB compression_type = 
segment_v2::CompressionTypePB::SNAPPY) {
     size_t uncompressed_bytes = 0;
     size_t compressed_bytes = 0;
-    Status st = block.serialize(BeExecVersionManager::get_newest_version(), 
pblock,
-                                &uncompressed_bytes, &compressed_bytes, 
compression_type);
+    int64_t compress_time = 0;
+    Status st =
+            block.serialize(BeExecVersionManager::get_newest_version(), 
pblock, &uncompressed_bytes,
+                            &compressed_bytes, &compress_time, 
compression_type);
     EXPECT_TRUE(st.ok());
     // const column maybe uncompressed_bytes<compressed_bytes
     // as the serialize_bytes add some additional byets: 
STREAMVBYTE_PADDING=16;
@@ -147,7 +149,9 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -169,7 +173,9 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -192,7 +198,9 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -219,7 +227,9 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -240,7 +250,9 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -263,7 +275,9 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -287,7 +301,9 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -303,7 +319,9 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -328,7 +346,9 @@ void serialize_and_deserialize_test_one() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
         std::string s2 = pblock2.DebugString();
@@ -354,7 +374,9 @@ void serialize_and_deserialize_test_int() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
         std::string s2 = pblock2.DebugString();
@@ -378,7 +400,9 @@ void serialize_and_deserialize_test_int() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
         std::string s2 = pblock2.DebugString();
@@ -403,7 +427,9 @@ void serialize_and_deserialize_test_long() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
         std::string s2 = pblock2.DebugString();
@@ -427,7 +453,9 @@ void serialize_and_deserialize_test_long() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
         std::string s2 = pblock2.DebugString();
@@ -452,7 +480,9 @@ void serialize_and_deserialize_test_string() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY);
         std::string s2 = pblock2.DebugString();
@@ -477,7 +507,9 @@ void serialize_and_deserialize_test_string() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY);
         std::string s2 = pblock2.DebugString();
@@ -505,7 +537,9 @@ void serialize_and_deserialize_test_nullable() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
         std::string s2 = pblock2.DebugString();
@@ -530,7 +564,9 @@ void serialize_and_deserialize_test_nullable() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
         std::string s2 = pblock2.DebugString();
@@ -554,7 +590,9 @@ void serialize_and_deserialize_test_nullable() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY);
         std::string s2 = pblock2.DebugString();
@@ -576,7 +614,9 @@ void serialize_and_deserialize_test_nullable() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY);
         std::string s2 = pblock2.DebugString();
@@ -602,7 +642,9 @@ void serialize_and_deserialize_test_decimal() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
         std::string s2 = pblock2.DebugString();
@@ -626,7 +668,9 @@ void serialize_and_deserialize_test_decimal() {
         std::string s1 = pblock.DebugString();
 
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::LZ4);
         std::string s2 = pblock2.DebugString();
@@ -658,7 +702,9 @@ void serialize_and_deserialize_test_bitmap() {
         std::string s1 = pblock.DebugString();
         std::string bb1 = block.dump_data(0, 1024);
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         std::string bb2 = block2.dump_data(0, 1024);
         EXPECT_EQ(bb1, bb2);
         PBlock pblock2;
@@ -688,7 +734,9 @@ void serialize_and_deserialize_test_bitmap() {
         std::string s1 = pblock.DebugString();
         std::string bb1 = block.dump_data(0, 1024);
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         std::string bb2 = block2.dump_data(0, 1024);
         EXPECT_EQ(bb1, bb2);
         EXPECT_EQ(block.dump_data_json(0, 1024), block2.dump_data_json(0, 
1024));
@@ -709,7 +757,9 @@ void serialize_and_deserialize_test_array() {
         block_to_pb(block, &pblock, segment_v2::CompressionTypePB::SNAPPY);
         std::string s1 = pblock.DebugString();
         vectorized::Block block2;
-        static_cast<void>(block2.deserialize(pblock));
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        static_cast<void>(block2.deserialize(pblock, &uncompress_size, 
&uncompressed_time));
         PBlock pblock2;
         block_to_pb(block2, &pblock2, segment_v2::CompressionTypePB::SNAPPY);
         std::string s2 = pblock2.DebugString();
@@ -1075,26 +1125,6 @@ TEST(BlockTest, ctor) {
     ASSERT_EQ(block.columns(), 2);
     ASSERT_EQ(block.get_by_position(0).type->get_primitive_type(), TYPE_INT);
     ASSERT_TRUE(block.get_by_position(1).type->is_nullable());
-
-    {
-        auto mutable_block =
-                
vectorized::MutableBlock::create_unique(tbl->get_tuple_descs(), 10, false);
-        ASSERT_EQ(mutable_block->columns(), 2);
-        auto mutable_block2 = vectorized::MutableBlock::create_unique();
-        mutable_block->swap(*mutable_block2);
-        ASSERT_EQ(mutable_block->columns(), 0);
-        ASSERT_EQ(mutable_block2->columns(), 2);
-    }
-
-    {
-        auto mutable_block =
-                
vectorized::MutableBlock::create_unique(tbl->get_tuple_descs(), 10, true);
-        ASSERT_EQ(mutable_block->columns(), 1);
-        auto mutable_block2 = vectorized::MutableBlock::create_unique();
-        mutable_block->swap(*mutable_block2);
-        ASSERT_EQ(mutable_block->columns(), 0);
-        ASSERT_EQ(mutable_block2->columns(), 1);
-    }
 }
 
 TEST(BlockTest, insert_erase) {
@@ -1125,39 +1155,20 @@ TEST(BlockTest, insert_erase) {
     block.erase_tail(0);
     ASSERT_EQ(block.columns(), 0);
 
-    EXPECT_ANY_THROW(block.erase("column"));
     column_with_name =
             
vectorized::ColumnHelper::create_column_with_name<vectorized::DataTypeString>({});
     block.insert(0, column_with_name);
-    EXPECT_NO_THROW(block.erase("column"));
-    ASSERT_EQ(block.columns(), 0);
-
-    EXPECT_ANY_THROW(block.safe_get_by_position(0));
+    ASSERT_EQ(block.columns(), 1);
 
-    ASSERT_EQ(block.try_get_by_name("column"), nullptr);
-    EXPECT_ANY_THROW(block.get_by_name("column"));
-    EXPECT_ANY_THROW(block.get_position_by_name("column"));
     block.insert(0, column_with_name);
 
-    EXPECT_NO_THROW(auto item = block.get_by_name("column"));
-    ASSERT_NE(block.try_get_by_name("column"), nullptr);
     EXPECT_EQ(block.get_position_by_name("column"), 0);
 
-    block.insert({nullptr, nullptr, BeConsts::BLOCK_TEMP_COLUMN_PREFIX});
-    EXPECT_NO_THROW(auto item = 
block.get_by_name(BeConsts::BLOCK_TEMP_COLUMN_PREFIX));
-
-    block.erase_tmp_columns();
-    ASSERT_EQ(block.try_get_by_name(BeConsts::BLOCK_TEMP_COLUMN_PREFIX), 
nullptr);
-
     {
         // test const block
         const auto const_block = block;
-        EXPECT_EQ(const_block.try_get_by_name("column2"), nullptr);
-        EXPECT_ANY_THROW(const_block.get_by_name("column2"));
-        EXPECT_ANY_THROW(const_block.get_position_by_name("column2"));
+        EXPECT_EQ(const_block.get_position_by_name("column2"), -1);
 
-        EXPECT_NO_THROW(auto item = const_block.get_by_name("column"));
-        ASSERT_NE(const_block.try_get_by_name("column"), nullptr);
         EXPECT_EQ(const_block.get_position_by_name("column"), 0);
     }
 
@@ -1166,14 +1177,7 @@ TEST(BlockTest, insert_erase) {
 
     block.insert({nullptr, std::make_shared<vectorized::DataTypeString>(), 
"col2"});
 
-    vectorized::MutableBlock mutable_block(&block);
-    mutable_block.erase("col1");
-    ASSERT_EQ(mutable_block.columns(), 2);
-
-    EXPECT_ANY_THROW(mutable_block.erase("col1"));
-    ASSERT_EQ(mutable_block.columns(), 2);
-    mutable_block.erase("col2");
-    ASSERT_EQ(mutable_block.columns(), 1);
+    ASSERT_EQ(block.columns(), 3);
 }
 
 TEST(BlockTest, check_number_of_rows) {
@@ -1352,8 +1356,6 @@ TEST(BlockTest, others) {
 
     mutable_block.clear_column_data();
     ASSERT_EQ(mutable_block.get_column_by_position(0)->size(), 0);
-    ASSERT_TRUE(mutable_block.has("column"));
-    ASSERT_EQ(mutable_block.get_position_by_name("column"), 0);
 
     auto dumped_names = mutable_block.dump_names();
     ASSERT_TRUE(dumped_names.find("column") != std::string::npos);
diff --git a/be/test/vec/data_types/common_data_type_test.h 
b/be/test/vec/data_types/common_data_type_test.h
index 4425dd31c94..745b3d19155 100644
--- a/be/test/vec/data_types/common_data_type_test.h
+++ b/be/test/vec/data_types/common_data_type_test.h
@@ -252,13 +252,16 @@ public:
         auto pblock = std::make_unique<PBlock>();
         size_t uncompressed_bytes = 0;
         size_t compressed_bytes = 0;
+        int64_t compress_time = 0;
         segment_v2::CompressionTypePB compression_type = 
segment_v2::CompressionTypePB::ZSTD;
         Status st = block->serialize(be_exec_version, pblock.get(), 
&uncompressed_bytes,
-                                     &compressed_bytes, compression_type);
+                                     &compressed_bytes, &compress_time, 
compression_type);
         ASSERT_EQ(st.ok(), true);
         // deserialize
         auto block_1 = std::make_shared<Block>();
-        st = block_1->deserialize(*pblock);
+        size_t uncompress_size = 0;
+        int64_t uncompressed_time = 0;
+        st = block_1->deserialize(*pblock, &uncompress_size, 
&uncompressed_time);
         ASSERT_EQ(st.ok(), true);
         // check block_1 and block is same
         for (auto col_idx = 0; col_idx < block->columns(); ++col_idx) {
diff --git a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp 
b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp
index 9eec232f75f..08864a86fef 100644
--- a/be/test/vec/exec/format/parquet/parquet_read_lines.cpp
+++ b/be/test/vec/exec/format/parquet/parquet_read_lines.cpp
@@ -172,8 +172,8 @@ static void read_parquet_lines(std::vector<std::string> 
numeric_types,
     bool eof = false;
     size_t read_row = 0;
     static_cast<void>(p_reader->get_next_block(block.get(), &read_row, &eof));
-    auto row_id_string_column =
-            static_cast<const 
ColumnString&>(*block->get_by_name("row_id").column.get());
+    auto row_id_string_column = static_cast<const ColumnString&>(
+            
*block->get_by_position(block->get_position_by_name("row_id")).column.get());
     auto read_lines_tmp = read_lines;
     for (auto i = 0; i < row_id_string_column.size(); i++) {
         GlobalRowLoacationV2 info =
@@ -184,7 +184,7 @@ static void read_parquet_lines(std::vector<std::string> 
numeric_types,
         EXPECT_EQ(info.backend_id, BackendOptions::get_backend_id());
         EXPECT_EQ(info.version, IdManager::ID_VERSION);
     }
-    block->erase("row_id");
+    block->erase(block->get_position_by_name("row_id"));
 
     EXPECT_EQ(block->dump_data(), block_dump);
     std::cout << block->dump_data();
diff --git a/be/test/vec/exec/orc/orc_read_lines.cpp 
b/be/test/vec/exec/orc/orc_read_lines.cpp
index 7bdd529c7bd..c73d6604b06 100644
--- a/be/test/vec/exec/orc/orc_read_lines.cpp
+++ b/be/test/vec/exec/orc/orc_read_lines.cpp
@@ -157,8 +157,8 @@ static void read_orc_line(int64_t line, std::string 
block_dump) {
     bool eof = false;
     size_t read_row = 0;
     static_cast<void>(reader->get_next_block(block.get(), &read_row, &eof));
-    auto row_id_string_column =
-            static_cast<const 
ColumnString&>(*block->get_by_name("row_id").column.get());
+    auto row_id_string_column = static_cast<const ColumnString&>(
+            
*block->get_by_position(block->get_position_by_name("row_id")).column.get());
     for (auto i = 0; i < row_id_string_column.size(); i++) {
         GlobalRowLoacationV2 info =
                 
*((GlobalRowLoacationV2*)row_id_string_column.get_data_at(i).data);
@@ -167,7 +167,7 @@ static void read_orc_line(int64_t line, std::string 
block_dump) {
         EXPECT_EQ(info.backend_id, BackendOptions::get_backend_id());
         EXPECT_EQ(info.version, IdManager::ID_VERSION);
     }
-    block->erase("row_id");
+    block->erase(block->get_position_by_name("row_id"));
 
     std::cout << block->dump_data();
     EXPECT_EQ(block->dump_data(), block_dump);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to